歡迎光臨
每天分享高質量文章

Tornado 原始碼閱讀:初步認識

作者:國夫君

來源:見文末

ioloop


`ioloop``tornado`的核心模組,也是個排程模組,各種非同步事件都是由他排程的,所以必須弄清他的執行邏輯

 

原始碼分析


而`ioloop`的核心部分則是 `while True`這個迴圈內部的邏輯,貼上他的程式碼下

   def start(self):

        if self._running:

            raise RuntimeError(“IOLoop is already running”)

        self._setup_logging()

        if self._stopped:

            self._stopped = False

            return

        old_current = getattr(IOLoop._current, “instance”, None)

        IOLoop._current.instance = self

        self._thread_ident = thread.get_ident()

        self._running = True

 

        old_wakeup_fd = None

        if hasattr(signal, ‘set_wakeup_fd’) and os.name == ‘posix’:

 

            try:

                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())

                if old_wakeup_fd != –1:

 

                    signal.set_wakeup_fd(old_wakeup_fd)

                    old_wakeup_fd = None

            except ValueError:

 

                old_wakeup_fd = None

 

        try:

            while True:

 

                with self._callback_lock:

                    callbacks = self._callbacks

                    self._callbacks = []

 

                due_timeouts = []

 

                if self._timeouts:

                    now = self.time()

                    while self._timeouts:

                        if self._timeouts[0].callback is None:

 

                            heapq.heappop(self._timeouts)

                            self._cancellations -= 1

                        elif self._timeouts[0].deadline <= now:

                            due_timeouts.append(heapq.heappop(self._timeouts))

                        else:

                            break

                    if (self._cancellations > 512

                            and self._cancellations > (len(self._timeouts) >> 1)):

                        self._cancellations = 0

                        self._timeouts = [x for x in self._timeouts

                                          if x.callback is not None]

                        heapq.heapify(self._timeouts)

 

                for callback in callbacks:

                    self._run_callback(callback)

                for timeout in due_timeouts:

                    if timeout.callback is not None:

                        self._run_callback(timeout.callback)

 

                callbacks = callback = due_timeouts = timeout = None

 

                if self._callbacks:

 

                    poll_timeout = 0.0

                elif self._timeouts:

 

                    poll_timeout = self._timeouts[0].deadline – self.time()

                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))

                else:

 

                    poll_timeout = _POLL_TIMEOUT

 

                if not self._running:

                    break

 

                if self._blocking_signal_threshold is not None:

 

                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

 

                try:

                    event_pairs = self._impl.poll(poll_timeout)

                except Exception as e:

 

                    if errno_from_exception(e) == errno.EINTR:

                        continue

                    else:

                        raise

 

                if self._blocking_signal_threshold is not None:

                    signal.setitimer(signal.ITIMER_REAL,

                                     self._blocking_signal_threshold, 0)

 

                self._events.update(event_pairs)

                while self._events:

                    fd, events = self._events.popitem()

                    try:

                        fd_obj, handler_func = self._handlers[fd]

                        handler_func(fd_obj, events)

                    except (OSError, IOError) as e:

                        if errno_from_exception(e) == errno.EPIPE:

 

                            pass

                        else:

                            self.handle_callback_exception(self._handlers.get(fd))

                    except Exception:

                        self.handle_callback_exception(self._handlers.get(fd))

                fd_obj = handler_func = None

 

        finally:

 

            self._stopped = False

            if self._blocking_signal_threshold is not None:

                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            IOLoop._current.instance = old_current

            if old_wakeup_fd is not None:

                signal.set_wakeup_fd(old_wakeup_fd)


除去註釋,程式碼其實沒多少行. 由while 內部程式碼可以看出ioloop主要由三部分組成:


1.回呼 callbacks


他是ioloop回呼的基礎部分,透過IOLoop.instance().add_callback()新增到self._callbacks,他們將在每一次loop中被執行.


主要用途是將邏輯分塊,在適合時機將包裝好的callback新增到self._callbacks讓其執行.


例如ioloop中的add_future


def add_future(self, future, callback):

        “””Schedules a callback on the “IOLoop“ when the given

        `.Future` is finished.

 

        The callback is invoked with one argument, the

        `.Future`.

        “””

        assert is_future(future)

        callback = stack_context.wrap(callback)

        future.add_done_callback(

            lambda futureself.add_callback(callback, future))


future物件得到result的時候會呼叫future.add_done_callback新增的callback,再將其轉至ioloop執行


2.定時器 due_timeouts


這是定時器,在指定的事件執行callback.


跟1中的callback類似,透過IOLoop.instance().add_callback在每一次迴圈,會計算timeouts回呼串列裡的事件,執行已到期的callback.


當然不是無節操的迴圈.


因為poll操作會阻塞到有io操作發生,所以只要計算最近的timeout, 然後用這個時間作為self._impl.poll(poll_timeout) 的 poll_timeout ,就可以達到按時運行了


但是,假設poll_timeout的時間很大時,self._impl.poll一直在堵塞中(沒有io事件,但在處理某一個io事件), 那新增剛才1中的callback不是要等很久才會被執行嗎? 答案當然是不會.


ioloop中有個waker物件,他是由兩個fd組成,一個讀一個寫.


ioloop在初始化的時候把waker系結到epoll裡了,add_callback時會觸發waker的讀寫.


這樣ioloop就會在poll中被喚醒了,接著就可以及時處理timeout callback了


用這樣的方式也可以自己封裝一個小的定時器功能玩玩


3.io事件的event loop


處理epoll事件的功能

透過IOLoop.instance().add_handler(fd, handler, events)系結fd event的處理事件

在httpserver.listen的程式碼內,

netutil.py中的netutil.py的add_accept_handler系結accept handler處理客戶端接入的邏輯


如法炮製,其他的io事件也這樣系結,業務邏輯的分塊交由ioloop的callback和future處理


關於epoll的用法的內容.詳情見我第一篇文章吧,哈哈


總結


ioloop由callback(業務分塊), timeout callback(定時任務) io event(io傳輸和解析) 三塊組成,互相配合完成非同步的功能,構建gen,httpclient,iostream等功能。


串聯大致的流程是,tornado 系結io event,處理io傳輸解析,傳輸完成後(結合Future)回呼(callback)業務處理的邏輯和一些固定操作 . 定時器則是較為獨立的模組


Futrue


個人認為Future是tornado僅此ioloop重要的模組,他貫穿全文,所有非同步操作都有他的身影。顧名思義,他主要是關註日後要做的事,類似jquery的Deferred吧


一般的用法是透過ioloop的add_future定義future的done callback, 當future被set_result的時候,future的done callback就會被呼叫. 從而完成Future的功能.


具體可以參考gen.coroutine的實現,本文後面也會講到


他的組成不複雜,只有幾個重要的方法,最重要的是 add_done_callback , set_result


tornado用Future和ioloop,yield實現了gen.coroutine


1. add_done_callback


跟ioloop的callback類似 , 儲存事件完成後的callback在self._callbacks裡


def add_done_callback(self, fn):

        if self._done:

            fn(self)

        else:

            self._callbacks.append(fn)


2.set_result


設定事件的結果,並執行之前儲存好的callback


def set_result(self, result):

        self._result = result

        self._set_done()

 

def _set_done(self):

        self._done = True

        for cb in self._callbacks:

            try:

                cb(self)

            except Exception:

                app_log.exception(‘Exception in callback %r for %r’,

                                  cb, self)

        self._callbacks = None


為了驗證之前所說的,上一段測試程式碼


#! /usr/bin/env python

#coding=utf-8

 

import tornado.web

import tornado.ioloop

 

from tornado.gen import coroutine

from tornado.concurrent import Future

 

 

def test():

    def pp(s):

        print s

 

    future = Future()

    iol = tornado.ioloop.IOLoop.instance()

 

    print ‘init future %s’%future

 

    iol.add_future(future, lambda fpp(‘ioloop callback after future done,future is %s’%f))

 

    #模擬io延遲操作

    iol.add_timeout(iol.time()+5,lambda:future.set_result(‘set future is done’))

 

    print ‘init complete’

    tornado.ioloop.IOLoop.instance().start()

 

if __name__ == “__main__”:

    test() 


執行結果:



gen.coroutine


接著繼續延伸,看看coroutine的實現。


gen.coroutine實現的功能其實是將原來的callback的寫法,用yield的寫法代替. 即以yield為分界,將程式碼分成兩部分.


如:


#! /usr/bin/env python

#coding=utf-8

 

import tornado.ioloop

from tornado.gen import coroutine

from tornado.httpclient import AsyncHTTPClient

 

@coroutine

def cotest():

    client = AsyncHTTPClient()

    res = yield client.fetch(“http://www.segmentfault.com/”)

    print res

 

if __name__ == “__main__”:

    f = cotest()    

    print f #這裡傳回了一個future哦

    tornado.ioloop.IOLoop.instance().start()


執行結果:



原始碼分析


接下來分析下coroutine的實現


def _make_coroutine_wrapper(func, replace_callback):

 

    @functools.wraps(func)

    def wrapper(*args, **kwargs):

        future = TracebackFuture()

 

        if replace_callback and ‘callback’ in kwargs:

            callback = kwargs.pop(‘callback’)

            IOLoop.current().add_future(

                future, lambda futurecallback(future.result()))

 

        try:

            result = func(*args, **kwargs)

        except (Return, StopIteration) as e:

            result = getattr(e, ‘value’, None)

        except Exception:

            future.set_exc_info(sys.exc_info())

            return future

        else:

            if isinstance(result, types.GeneratorType):

                try:

                    orig_stack_contexts = stack_context._state.contexts

                    yielded = next(result)

                    if stack_context._state.contexts is not orig_stack_contexts:

                        yielded = TracebackFuture()

                        yielded.set_exception(

                            stack_context.StackContextInconsistentError(

                                ‘stack_context inconsistency (probably caused ‘

                                ‘by yield within a “with StackContext” block)’))

                except (StopIteration, Return) as e:

                    future.set_result(getattr(e, ‘value’, None))

                except Exception:

                    future.set_exc_info(sys.exc_info())

                else:

                    Runner(result, future, yielded)

                try:

                    return future

                finally:

                    future = None

        future.set_result(result)

        return future

    return wrapper


如原始碼所示,func執行的結果是GeneratorType ,yielded = next(result), 執行至原函式的yield位置,傳回的是原函式func內部 yield 右邊傳回的物件(必須是Future或Future的list)給yielded.經過Runner(result, future, yielded) 對yielded進行處理.在此就 貼出Runner的程式碼了.


Runner初始化過程,呼叫handle_yield, 檢視yielded是否已done了,否則add_future執行Runner的run方法, run方法中如果yielded物件已完成,用對它的gen呼叫send,傳送完成的結果.


所以yielded在什麼地方被set_result非常重要, 當被set_result的時候,才會send結果給原func,完成整個非同步操作


詳情可以檢視tornado 中重要的物件 iostream,原始碼中iostream的 _handle_connect,如此設定了連線的result.


def _handle_connect(self):

        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)

        if err != 0:

            self.error = socket.error(err, os.strerror(err))

            if self._connect_future is None:

                gen_log.warning(“Connect error on fd %s: %s”,

                                self.socket.fileno(), errno.errorcode[err])

            self.close()

            return

        if self._connect_callback is not None:

            callback = self._connect_callback

            self._connect_callback = None

            self._run_callback(callback)

        if self._connect_future is not None:

            future = self._connect_future

            self._connect_future = None

            future.set_result(self)

        self._connecting = False


最後貼上一個簡單的測試程式碼,演示coroutine,future的用法


import tornado.ioloop

from tornado.gen import coroutine

from tornado.concurrent import Future

 

@coroutine

def asyn_sum(a, b):

    print(“begin calculate:sum %d+%d”%(a,b))

    future = Future()

    future2 = Future()

    iol = tornado.ioloop.IOLoop.instance()

 

    print future

 

    def callback(a, b):

        print(“calculating the sum of %d+%d:”%(a,b))

        future.set_result(a+b)

 

        iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2)

    iol.add_timeout(iol.time()+3,callback, a, b)

 

    result = yield future

 

    print(“after yielded”)

    print(“the %d+%d=%d”%(a, b, result))

 

    yield future2

 

    print ‘after future2’

 

def main():

    f =  asyn_sum(2,3)

 

    print 

    print f

    tornado.ioloop.IOLoop.instance().start()

 

if __name__ == “__main__”:

    main()


執行結果:



為什麼程式碼中個yield都起作用了? 因為Runner.run裡,最後繼續用handle_yield處理了send後傳回的yielded物件,意思是func裡可以有n幹個yield操作


if not self.handle_yield(yielded):

        return



至此,已完成tornado中重要的幾個模組的流程,其他模組也是由此而來.寫了這麼多,越寫越卡,就到此為止先吧。

來源:國夫君

segmentfault.com/a/1190000002971992

《Linux雲端計算及運維架構師高薪實戰班》2018年09月16日即將開課中,120天衝擊Linux運維年薪30萬,改變速約~~~~

    *宣告:推送內容及圖片來源於網路,部分內容會有所改動,版權歸原作者所有,如來源資訊有誤或侵犯權益,請聯絡我們刪除或授權事宜。

    – END –




    更多Linux好文請點選【閱讀原文】

    ↓↓↓

    贊(0)

    分享創造快樂