再造轮子:手把手教你设计协程库

经过前面章节学习,我们知道生成器可以通过 yield 将执行权还给调用者。这时生成器会记住自己的执行进度,重新调度后将恢复执行。利用这个特性,我们可以实现用户态 协程 。但具体如何实施呢?

本节,我们将在 Python 交互式终端上探索协程库的设计思路,最终形成一个精简的协程库,代码量仅 100 来行!麻雀虽小,五脏俱全!借此即可彻底掌握 协程运行原理协程库设计精髓 ,玩转协程式应用开发。

事件循环建模

简而言之,我们需要实现一个 事件循环 ( Event Loop ),它内部有一个 可执行 ( Runnable )协程队列:

事件循环是一个永久循环,每次循环时它先调度可执行队列里的每个协程——即从队列中取出一个可执行协程,然后调用 send 方法驱动它执行:

协程执行的结果可分为两种不同情况。其一,协程没有遇到 IO 操作,一把梭哈到底并最后退出。这时, send 方法抛 StopIteration 异常通知调用者:

其二,协程需要进行 IO 操作,这时它应该通过 yield 让出执行权,并将 IO 操作上下文提交给事件循环。IO 操作由事件循环负责执行,操作上下文必须记录协程信息:

可执行队列处理完毕后,得到成一个个 IO 操作上下文,事件循环负责将它们注册到 epoll ,以便订阅 IO 事件:

接着,事件循环通过 epoll 等待 IO 事件到达。当某个 IO 操作就绪时,事件循环将把对应协程重新放入可执行队列。假设协程 3 等待的 IO 操作已经就绪,epoll 将返回对应 IO 事件,执行 IO 处理函数并将协程放回可执行队列重新调度:

事件循环处理完所有 epoll 事件后,将进入下一次循环。这时,又开始处理可执行队列,周而复始。

epoll

由于事件循环需要同时关注多个 IO 操作,因此需要采用 IO多路复用 技术。那么,什么是 IO 多路复用呢?它又是如何使用的呢?epollLinux 下的 IO 多路复用技术,很有代表性。我们便以 epoll 为例,简单探讨一下。

服务器应用一般需要通过 套接字 ( socket )监听某个端口,等待客户端连接。这个函数用于创建一个监听套接字:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

def create_listen_socket(bind_addr='0.0.0.0', bind_port=55555, backlogs=102400):
    # 创建套接字
    sock = socket(AF_INET, SOCK_STREAM)
    # 设置地址复用选项
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    # 绑定监听地址和端口
    sock.bind((bind_addr, bind_port))
    # 开始监听
    sock.listen(backlogs)

    return sock

我们以默认参数创建一个监听套接字,并调用 accept 方法接受客户端连接:

1
2
>>> s = create_listen_socket()
>>> s.accept()

accept 调用将 阻塞 ,直到有客户端连接上来才会返回。现在,我们通过 telnet 命令模拟客户端连接:

1
$ telnet 127.0.0.1 55555

当客户端连上来后,accept 调用就返回了,返回值是一个元组。元组包含一个与客户端通讯的套接字,以及客户端的地址端口对信息:

1
2
>>> s.accept()
(<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 55555), raddr=('127.0.0.1', 41990)>, ('127.0.0.1', 41990))

如果程序还需要处理另一个监听套接字,事情就尴尬了。假设我们在 s 上等待客户端连接,这时 accept 将阻塞;就算 s2 套接字上来了新连接,也无法提前返回:

1
2
>>> s2 = create_listen_socket(bind_port=44444)
>>> s.accept()

这该怎么办呢?我们先把套接字设置成 非阻塞 状态,accept 就不会一直阻塞了:

1
2
3
4
5
6
7
>>> s.setblocking(False)
>>> s.accept()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fasion/opt/python3.8.5/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable

由于 s 套接字上没有新连接,accept 将抛出 BlockingIOError 异常,以此告知调用者。这时,我们就可以抽出身来处理 s2 了。如果 s2 也没有新连接了,我们又再次检查 s

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
>>> s2.accept()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fasion/opt/python3.8.5/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable
>>> s.accept()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fasion/opt/python3.8.5/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable

最终,我们将在 ss2 间来回 轮询 ,但轮询很浪费 CPU 资源!特别是套接字很多时,更是如此!如果能让内核同时关注多个套接字,当它们中有新连接达到时再通知我们就好了——这就是 epoll 擅长的事。

当监听套接字上有新连接时,它会产生 **读事件 **。因此,我们可以创建一个 epoll 描述符,并将 ss2 注册进去,订阅 读事件 ( EPOLLIN ):

1
2
3
4
>>> import select
>>> ep = select.epoll()
>>> ep.register(s.fileno(), select.EPOLLIN)
>>> ep.register(s2.fileno(), select.EPOLLIN)

接着,我们调用 poll 方法,等待我们感兴趣的事件:

1
>>> events = ep.poll()

poll 将一直阻塞,直到 ss2 上有新连接达到。试着连一下 s2

1
$ telnet 127.0.0.1 44444

poll 立马停止阻塞,并向我们返回了一个事件列表,列表项是一个由 文件描述符事件掩码 组成的元组:

1
2
3
4
5
6
>>> events
[(6, 1)]
>>> for fileno, event in events:
...     print(fileno, event)
...
6 1

这个信息告诉我们,哪个套接字上有什么事件发生。如此一来,程序可以精准处理套接字,无须傻傻 轮询 。这就是 epoll 的强大能力,它让高效处理大规模套接字成为可能。

调度第一个协程

开始研究有 IO 操作的协程之前,我们先拿一个纯计算协程练练手。这是一个只做加法运算的协程:

1
2
3
4
def add(a, b):
    if False:
        yield
    return a + b

if 语句永远不会执行,它只是为了引入 yield 语句,让 Pythonadd 编译成生成器。

现在我们创建一个新协程,并调用 send 方法把它调度起来:

1
2
3
4
5
>>> co = add(1, 2)
>>> co.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 3

正如前面提到的那样,协程一把梭哈到底,StopIteration 异常告诉我们它已经执行完毕,结果是 3 。注意到,协程执行结果(函数返回值)保存在 StopIterationvalue 属性:

1
2
3
4
5
6
>>> import sys
>>> e = sys.last_value
>>> e
StopIteration(3)
>>> e.value
3

我们还可以写一个函数来调度协程,函数只需调用 send 方法,并在协程执行完毕后输出一些提示:

1
2
3
4
5
def schedule_coroutine(co):
    try:
        co.send(None)
    except StopIteration as e:
        print('coroutine {} completed with result {}'.format(co.__name__, e.value))
1
2
3
>>> co = add(2, 3)
>>> schedule_coroutine(co)
coroutine add completed with result 5

IO上下文

如果协程中涉及 IO 操作,则需要在 IO 未就绪时通过 yield 让出执行权。在让出执行权的同时,还需要将 IO 上下文提交给事件循环,由它协助处理。那么,IO 上下文需要包含哪些信息呢?

IOContext 需要保存哪些信息取决于封装程度,但至少要包括协程需要等待的 文件描述符 以及感兴趣的 事件

1
2
3
4
5
class IOContext:

    def __init__(self, fileno, events):
        self.fileno = fileno
        self.events = events

现在我们开始编写一个带 IO 操作的协程,它负责从监听套接字接收新客户端连接:

1
2
3
4
5
6
7
8
def accept_client(sock):
    while True:
        try:
            return sock.accept()
        except BlockingIOError:
            pass

        yield IOContext(sock.fileno(), select.EPOLLIN)

协程主体逻辑是一个循环,它先调用 accept 尝试接收新连接。如果没有连接就绪,accept 会抛 BlockingIOError 异常。 这时,yield 语句让出执行权,并将 IOContext 提交给事件循环。注意到,协程对套接字上的读事件感兴趣。

现在我们创建一个这样的协程,并扮演事件循环,来体会协程调度过程。如果套接字 s 没有就绪连接,send 将收到协程返回的 IOContext ,表明协程期待哪些事件发生:

1
2
3
4
5
6
7
8
>>> co = accept_client(s)
>>> context = co.send(None)
>>> context
<__main__.IOContext object at 0x7fcd58e3ef70>
>>> context.fileno
3
>>> context.events
1

事件循环接到上下文后,需要将当前协程保存到上下文中,并将需要订阅的事件注册到 epoll

1
2
>>> context.co = co
>>> ep.register(context.fileno, context.events)

接着,事件循环在 epoll 上等待相关事件到达:

1
>>> ep.poll()

poll 将保持阻塞,直到有注册事件出现。因此,用 telnet 命令再次连接 s 套接字,poll 将返回:

1
2
>>> ep.poll()
[(3, 1)]

根据 poll 返回的文件描述符 3 ,我们知道 context 这次 IO 操作已经就绪了。这时,可以接着调度对应的协程:

1
2
3
4
>>> context.co.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: (<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 55555), raddr=('127.0.0.1', 51528)>, ('127.0.0.1', 51528))

至此,协程 co 成功接收了一个客户端连接,并退出了。

yield from

现有一个用于计算圆面积的协程,它没有涉及 IO 操作:

1
2
3
4
5
6
import math

def circle_area(r):
    if False:
        yield
    return math.pi * r ** 2

创建一个这样的协程来计算半径为 2 的圆的面积,并调用 send 方法来调度它,协程执行完毕后将返回结果:

1
2
3
4
5
>>> co = circle_area(2)
>>> co.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 12.566370614359172

现在,让我们利用这个协程来计算圆柱体积:

1
2
def cylindrical_volume(r, h):
    return circle_area(r) * h

这样显然是不行的,因为调用 circle_area 返回的是一个代表协程的生成器,需要调度它才能获得计算结果。不过没关系,我们可以这么写:

1
2
3
4
5
6
7
8
def cylindrical_volume(r, h):
    co = circle_area(r)
    while True:
        try:
            yield co.send(None)
        except StopIteration as e:
            floorage = e.value
            return floorage * h

这个是一个协程函数,它先创建一个子协程用于计算底面积,然后用一个永久循环驱动子协程执行。

每次循环时,它先调用 send 方法将执行权交给子协程。如果子协程用 yield 语句归还执行权,这里同样用 yield 将执行权交给调用者,yield 值也一并向上传递。如果子协程退出,它将取出子协程执行结果并完成计算。

1
2
3
4
5
>>> co = cylindrical_volume(2, 3)
>>> co.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 37.69911184307752

因此,cylindrical_volume 就像一个中间人,在调用者和子协程之间来回传递执行权。函数调用很常见,如果涉及协程的函数调用都需要用样板代码传递执行权,那简直就是一个噩梦!为此,Python 引入 yield from

1
2
3
def cylindrical_volume(r, h):
    floorage = yield from circle_area(r)
    return floorage * h

例子中 yield from 的作用相当于上一例子中的 while 循环,因此这两个例子是完全等价的。与业务逻辑无关的样板代码消除后,新函数变得简洁纯粹,更加清晰易懂了!

async await

直接使用生成器实现协程,虽然逻辑上可行,但语义上有点令人摸不着头脑:

1
2
3
>>> co = circle_area(1)
>>> co
<generator object circle_area at 0x10500db50>

为突显协程语义,Python 引入了 async 关键字:

1
2
async def circle_area(r):
    return math.pi * r ** 2

async 关键字标识的函数会被编译成异步函数,调用后得到一个 coroutine 对象:

1
2
3
>>> co = circle_area(1)
>>> co
<coroutine object circle_area at 0x1050f7050>

coroutine 对象与 generator 对象类似,我们可以调用 send 方法来调度 coroutine 对象:

1
2
3
4
>>> co.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 3.141592653589793

coroutine 对象的语义更加准确,而且我们再也不需要在函数代码中显式编写 yield 语句了,这未免有点画蛇添足。

青出于蓝而胜于蓝,如果 coroutine 没执行完毕便被意外销毁,Python 将输出警告信息:

1
2
3
4
>>> co = circle_area(2)
>>> del co
__main__:1: RuntimeWarning: coroutine 'circle_area' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

此外,Python 还引入了 await 关键字,代替前面提到的 yield from 语句。与 yield from 类似,await 将执行权交给子协程,并等待它退出。如果子协程需要暂时归还执行权,await 同样承担起中间人角色,在调用者与子协程间来回接棒。

1
2
3
async def cylindrical_volume(r, h):
    floorage = await circle_area(r)
    return floorage * h

无须多言,await 的语义也比 yield from 准确。另外,Python 还引入了 可等待对象 ( awaitable )。例子如下:

1
2
3
4
5
6
7
8
9
class Job:

    def __await__(self):
        print('step 1')
        yield
        print('step 2')
        yield
        print('step 3')
        return 'coding-fan'

可等待对象需要提供 await 魔术方法,实现成普通生成器即可。然后,await 就可以驱动生成器的执行:

1
2
3
async def do_job(job):
    value = await job
    print('job is done with value {}'.format(value))
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
>>> co = do_job(Job())
>>> co.send(None)
step 1
>>> co.send(None)
step 2
>>> co.send(None)
step 3
job is done with value coding-fan
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如果你觉得可等待对象 Job 难以理解,可以将它想象成等价的 yield from 形式,便豁然开朗了:

1
2
3
def do_job(job):
    value = yield from job.__await__()
    print('job is done with value {}'.format(value))

await 本无法驱动普通生成器,可等待对象却另辟蹊径,因而它在协程库中有重要作用。

终极作品

铺垫了这么东西,终于可以亮出我们的终极作品了:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import select

from collections import deque
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

def create_listen_socket(bind_addr='0.0.0.0', bind_port=55555, backlogs=102400):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind((bind_addr, bind_port))
    sock.listen(backlogs)
    return sock

class Future:

    def __init__(self, loop):
        self.loop = loop
        self.done = False
        self.result = None
        self.co = None

    def set_coroutine(self, co):
        self.co = co

    def set_result(self, result):
        self.done = True
        self.result = result

        if self.co:
            self.loop.add_coroutine(self.co)

    def __await__(self):
        if not self.done:
            yield self
        return self.result

class AsyncSocket:

    def __init__(self, sock, loop):
        sock.setblocking(False)

        self.sock = sock
        self.loop = loop

    def fileno(self):
        return self.sock.fileno()

    def create_future_for_events(self, events):
        future = self.loop.create_future()

        def handler(fileno, active_events):
            loop.unregister_from_polling(self.fileno())
            future.set_result(active_events)

        self.loop.register_for_polling(self.fileno(), events, handler)

        return future

    async def accept(self):
        while True:
            try:
                sock, addr = self.sock.accept()
                return AsyncSocket(sock=sock, loop=self.loop), addr
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def recv(self, bufsize):
        while True:
            try:
                return self.sock.recv(bufsize)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def send(self, data):
        while True:
            try:
                return self.sock.send(data)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLOUT)
                await future

class EventLoop:

    def __init__(self):
        self.epoll = select.epoll()

        self.runnables = deque()
        self.handlers = {}

    def create_future(self):
        return Future(loop=self)

    def create_listen_socket(self, bind_addr, bind_port, backlogs=102400):
        sock = create_listen_socket(bind_addr, bind_port, backlogs)
        return AsyncSocket(sock=sock, loop=loop)

    def register_for_polling(self, fileno, events, handler):
        print('register fileno={} for events {}'.format(fileno, events))
        self.handlers[fileno] = handler
        self.epoll.register(fileno, events)

    def unregister_from_polling(self, fileno):
        print('unregister fileno={}'.format(fileno))
        self.epoll.unregister(fileno)
        self.handlers.pop(fileno)

    def add_coroutine(self, co):
        self.runnables.append(co)

    def run_coroutine(self, co):
        try:
            future = co.send(None)
            future.set_coroutine(co)
        except StopIteration as e:
            print('coroutine {} stopped'.format(co.__name__))

    def schedule_runnable_coroutines(self):
        while self.runnables:
            self.run_coroutine(co=self.runnables.popleft())

    def run_forever(self):
        while True:
            self.schedule_runnable_coroutines()

            events = self.epoll.poll(1)
            for fileno, event in events:
                handler = self.handlers.get(fileno)
                if handler:
                    handler(fileno, events)

class TcpServer:

    def __init__(self, loop, bind_addr='0.0.0.0', bind_port=55555):
        self.loop = loop
        self.listen_sock = self.loop.create_listen_socket(bind_addr=bind_addr, bind_port=bind_port)
        self.loop.add_coroutine(self.serve_forever())

    async def serve_client(self, sock):
        while True:
            data = await sock.recv(1024)
            if not data:
                print('client disconnected')
                break

            await sock.send(data.upper())

    async def serve_forever(self):
        while True:
            sock, (addr, port) = await self.listen_sock.accept()
            print('client connected addr={} port={}'.format(addr, port))

            self.loop.add_coroutine(self.serve_client(sock))

if __name__ == '__main__':
    loop = EventLoop()
    server = TcpServer(loop=loop)
    loop.run_forever()

这个程序是一个精简的协程库实现,除了用于演示的应用代码 TcpServer ,整个库也就 100 来行代码!

我们模仿常见协程库,引入 Future ,代表一个在未来才能获取到的数据。Future 一般由协程创建,典型的场景是这样的:协程在等待一个 IO 事件,这时它便创建一个 Future 对象,并把执行权归还给事件循环。

例子中的 Future 类,有 4 个重要的属性:

  • loop ,当前事件循环对象;
  • done ,标识目标数据是否就绪;
  • result ,目标数据;
  • co ,关联协程,Future 就绪后,事件循环 loop 将把它放入可执行队列重新调度;

注意到,Future 是一个 可等待对象 ( awaitable ),它实现了 await 方法。当数据未就绪时,通过 yield 让出执行权,这时事件循环将协程记录在 Future 中。当数据就绪后,事件循环将协程放回可执行队列重新调度。

协程库还将套接字进行 异步化 封装,抽象出 AsyncSocket 类,接口与原生 socket 对象类似。除了保存原生 socket 对象,它还保存事件循环对象,以便通过事件循环订阅 IO 事件。

create_future_for_events 方法创建一个 Future 对象,来等待一个不知何时发生的 IO 事件。创建完 Future 对象后,进一步调用 loop 相关方法,将感兴趣的 IO 事件注册到 epoll 。当相关事件就绪时,事件循环将执行回调函数 handler ,它解除 epoll 注册,并将活跃事件作为目标数据设置到 Future 上(注意 set_result 将唤醒协程)。

然后是套接字系列操作函数,以 accept 为例,它不断尝试调用原生套接字,而原生套接字已被设为非阻塞。如果套接字已就绪,accept 将直接返回新连接,协程无须等待。

否则,accept 方法抛出 BlockingIOError 异常。这时,协程调用 create_future_for_events 方法创建一个 Future 订阅读事件( EPOLLIN ),并等待事件到达。

recvsend 方法封装也是类似的,不同的是 send 需要订阅 可写事件 ( EPOLLOUT )。

好了,终于来到协程库了主角事件循环 EventLoop 对象了,它有 3 个重要属性:

  • epoll ,这是一个 epoll 描述符,用于订阅 IO 事件;
  • runnables ,可执行协程队列;
  • handlersIO 事件回调处理函数映射表;

register_for_polling 方法注册感兴趣的 IO 事件和处理函数,它以文件描述符为键,将处理函数记录到映射表中,然后调用 epoll 完成事件订阅。unregister_from_polling 方法则刚好相反,用于取消注册。

add_coroutine 将一个可运行的协程加入队列。run_coroutine 则调度一个可执行协程,它调用 send 将执行权交给协程。如果协程执行完毕,它将输出提示;协程需要等待时,会通过 yield 归还执行权并提交 Future 对象,它将协程记录到 Future 上下文。schedule_runnable_coroutines 将可执行协程逐个取出并调度,直到队列为空。

run_forever 是事件循环的主体逻辑,这是一个永久循环。每次循环时,先调度可执行协程;然后通过 poll 等待协程注册的 IO 事件;当有新事件到达时,取出回调函数 handler 函数并调用。

TcpServer 只是一个普通的协程式应用,无须赘述。接下来,我们逐步分析,看看程序启动后都发生什么事情:

  1. 创建事件循环 EventLoop 对象,它将创建 epoll 描述符;
  2. 创建 TcpServer 对象,它通过事件循环 loop 创建监听套接字,并将 serve_forever 协程放入可执行队列;
  3. 事件循环 loop.run_forever 开始执行,它先调度可执行队列;
  4. 可执行队列一开始只有一个协程 TcpServer.serve_forever ,它将开始执行(由 run_coroutine 驱动);
  5. 执行权来到 TcpServer.serve_forever 协程,它调用 AsyncSocket.accept 准备接受一个新连接;
  6. 假设原生套接字未就绪,它将抛出 BlockingIOError 异常;
  7. 由于 IO 未就绪,协程创建一个 Future 对象,用来等待一个未来的 IO 事件( AsyncSocket.accept );
  8. 于此同时,协程调用事件循环 register_for_polling 方法订阅 IO 事件,并注册回调处理函数 handler
  9. future 是一个可等待对象,await future 将执行权交给它的 await 函数;
  10. 由于一开始 future 是未就绪的,这时 yield 将协程执行逐层归还给事件循环,future 对象也被同时上报;
  11. 执行权回到事件循环,run_coroutine 收到协程上报的 future 后将协程设置进去,以便 future 就绪后重新调度协程;
  12. 可执行队列变空后,事件循环开始调用 epoll.poll 等待协程注册的 IO 事件( serve_forever );
  13. 当注册事件到达后,事件循环取出回调处理函数并调用;
  14. handler 先将套接字从 epoll 解除注册,然后调用 set_result 将活跃事件作为目标数据记录到 future 中;
  15. set_result 将协程重新放回可执行队列;
  16. IO 事件处理完毕,进入下一次事件循环;
  17. 事件循环再次调度可执行队列,这时 TcpServer.serve_forever 协程再次拿到执行权;
  18. TcpServer.serve_forever 协程从 yield 语句恢复执行,开始返回目标数据,也就是先前设置的活跃事件;
  19. AsyncSocket.acceptawait future 语句取得活跃事件,然后循环继续;
  20. 循环再次调用原生套接字,这时它早已就绪,得到一个新套接字,简单包装后作为结果返回给调用者;
  21. TcpServer.serve_forever 拿到代表新连接的套接字后,创建一个 serve_client 协程并交给事件循环 loop
  22. TcpServer.serve_forever 进入下一次循环,调用 accept 准备接受下一个客户端连接;
  23. 如果监听套接字未就绪,执行权再次回到事件循环;
  24. 事件循环接着调度可执行队列里面的协程,TcpServer.serve_client 协程也开始执行了;
  25. etc

这看着就像一个精密的机械装置,有条不紊的运行着,环环相扣!

洞悉 Python 虚拟机运行机制,探索高效程序设计之道!

到底如何才能提升我的 Python 开发水平,向更高一级的岗位迈进呢? 如果你有这些问题或者疑惑,请订阅我们的专栏 Python源码深度剖析 ,阅读更多章节:

【Python源码剖析】系列文章首发于公众号【小菜学编程】,敬请关注: