Linux IO多路复用

简介

在平常的工作中,遇到 IO 密集型任务,比如要频繁调用外部的 API 接口(网络IO),或者频繁读写文件(磁盘IO)通常代码都会在 IO 任务创建后等待 IO 任务结束并将需要的数据返回。这种同步的编程方式是最简单的,但同时也是效率最低的。比如在一个 Web 后端中,如果是单线程单进程的运行模式,一次就只能处理一个客户端的请求,即使引入了多线程/多进程,虽然可以同时服务多个客户端了,但是如果瞬间几千的并发连接,那么后端就必须创建相对多的线程或进程,这时候 CPU 将会花费大量的时间和资源用于线程上下文切换上,而且这么多的线程,如果用到了锁机制来保证一些共享数据的安全,还将进一步的降低 Web 后端的性能。而 IO 多路复用的出现,则给这种局面带来了转机。

环境

软件 版本
操作系统 Ubuntu 18.04
Python 3.6

教程

在学习 IO 多路复用前,我们先理解一些基本的概念,渐进式的学习 IO 多路复用。

同步/异步

同步和异步的概念,在不同的角度有不同的回答,这里举几个简单的例子来说明一下。

  1. 在楼下食堂买饭,当点完餐后,你就站在台前呆若木鸡的等,直到你点的餐做好了你才端走去吃。因为吃饭和点餐是相互依赖的事情,所以在点餐没有得到结果(食物)之前就什么也不干一直等待,这就是同步。异步就是在点完餐后,你就找个地坐着玩手机,就干点别的 不傻等了。节约了等待过程的时间浪费,但是怎么才能知道自己点的餐好了呢?这时候可以选择每隔五分钟就上吧台问一问,没有好就继续回去玩手机。这种方式也就称之为轮询。还有一种更高效的方式,就是点完餐后服务人员给你一个号,当你的餐准备完毕就广播你的号,你就可以前去领餐了。这种方式就是消息通知。

  2. 在一套 Web 系统中,比如我们要导出员工表,当点击导出的按钮后,过一会就开始自动下载员工表。在浏览器内部,浏览器发起 http 请求我们的后端接口,这时候后端保持着 http 连接,当把员工数据从数据库中找到并绘制成表格后就返回给浏览器,这就是同步。而异步就是当浏览器发起请求后,后端直接返回它一个任务 ID,并提供它一个接口用来查询任务进度。这时候想要知道任务是否已经结束就需要不停的刷新页面查看任务状态,这种轮询的方式是非常比较浪费资源切低效的。最高效的方式就是后端可以主动通知浏览器任务已经结束了,关于主动推的技术目前主流的是 WebSocket。

  3. 在日常的编程中,我们接触最多的就是同步编程的方式了,比如我们使用 requests 库发起一个 http 请求,在请求发出后,函数将以阻塞的方式等待对方返回数据。在阻塞期间,是没有任何一行代码被执行,整个程序都处于挂起状态,直到对方返回数据后才继续往下走,如果有多个不想关的请求需要被发送,比如爬虫程序,那么同步编程的方式效率简直太低了。JavaScript 在浏览器中,通过 XMLHttpRequest 发送 ajax 请求的时候,默认是直接返回的,但是你需要将事后对数据的处理方法通过回调的形式传递给 XMLHttpRequest 对象,当请求的接口响应后,会主动调用对数据处理的回调函数。这种通过非阻塞加回调的方式,就是一种常见的异步编程方式。

由上可以看到,同步就是事物之间串行化的执行过程,异步就是事物之间无需互相等待。同步和异步所关注的就是事件完成后的消息通知机制,是阻塞等待、轮询还是事件通知/回调。

阻塞/非阻塞

阻塞和非阻塞是在调用者的角度来看,例如发起一个 IO 相关的系统调用,或者 wait 另外的线程,sleep 主动挂起自己,都会使程序进行阻塞。而我们大多遇到的阻塞案例,都是进行了 IO 相关的系统调用。

什么是系统调用?简单来说就是内核提供给用户程序的 API 接口,用于操纵内核才有权限访问的内容。例如我们在 Python 中对文件进行读写,我们并不需要关心文件存储在哪种物理介质上,是机械硬盘是U盘还是固态硬盘,这是因为在驱动、内核、C语言库的层层封装下,我们才得以用非常简单的方式对文件进行读写。对于网络也一样,我们不需要关心网卡的类型和性质,只需要简单的调用 socket 模块就可以完成对网络的操控。这些库本质上都只是调用了C语言提供的标准库,C语言的标准库中又调用了内核提供的这些系统调用完成的。那内核又为什么不允许用户进程直接操控硬件呢?是为了自我保护,如果任何进程都能随意的操控硬件,操控内存中的数据则是非常危险的,内核都可能分分钟被恶意程序干掉。

现在再来看看因为 IO 相关的系统调用造成阻塞的原因是什么。Linux 上的 strace 命令可以跟踪用户进程的系统调用,下面写一段因为网络 IO 造成的阻塞的例子:

1
2
3
4
5
6
7
import socket

s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.bind(('',9090))
data, addr = s.recvfrom(4096)
s.sendto(data, addr)
s.close()

使用 strace python echo.py 执行脚本,在一片输出后,停在了 recvfrom(3, 这个地方:

...
socket(AF_INET, SOCK_DGRAM, IPPROTO_IP) = 3
bind(3, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
recvfrom(3, 

我们需要关注的是最后三行,前面打印的都是 Python 虚拟机在启动的时候加载动态链接库、读取脚本内容、分配内存空间等相关的系统调用。当脚本执行到 s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 发起了第一个系统调用 socket,就是创建了一个网络套接字文件,返回的文件描述符是 3,在系统的 /proc 下也可以看到该进程打开的所有文件描述符:

root@yunfwe:~# ps aux |grep echo.py
root     10533  0.0  0.0  18720  1932 pts/0    S+   20:40   0:00 strace python echo.py
root     10535  0.0  0.4  35684  9084 pts/0    S+   20:40   0:00 python echo.py
root     14654  0.0  0.0  14428  1052 pts/2    S+   20:50   0:00 grep --color=auto echo.py
root@yunfwe:~# ls -lh /proc/10535/fd
total 0
lrwx------ 1 root root 64 2019/11/10 20:50:52 0 -> /dev/pts/0
lrwx------ 1 root root 64 2019/11/10 20:50:52 1 -> /dev/pts/0
lrwx------ 1 root root 64 2019/11/10 20:50:52 2 -> /dev/pts/0
lrwx------ 1 root root 64 2019/11/10 20:50:52 3 -> 'socket:[274124191]'
root@yunfwe:~#

程序当前已经打开了 4 个文件描述符,其中 1,2,3 分别对应着 标准输入标准输出错误输出,其中都指向了 /dev/pts/0,这个设备文件就是当前 SSH 连接的虚拟终端,所以程序的输出我们才会在终端上看到。最后一个 socket 类型的文件就是打开的网络套接字了。

接着 s.bind(('',9090)) 的语句发起了第二个系统调用,就是给刚才创建的网络套接字绑定了监听地址和端口。如果没有异常,bind 系统调用将返回 0。接着执行到 data, addr = s.recvfrom(4096) 语句的时候,发起了第三个系统调用,这个系统调用并长时间的阻塞了 Python 进程,这个过程又发生了什么?

进程在进行系统调用的时候,用户进程触发软中断,CPU 接收到中断信号后由用户态切换到内核态去运行相应系统调用的内核函数,此时用户进程是被挂起的(进程被放入等待队列中,这时进程的状态为 sleeping),并等待系统调用执行完毕后 CPU 由内核态切换到用户态才被重新唤醒(进程重新被放入执行队列,只有此队列的进程才可能获得 CPU 执行权,此时进程的状态为 running)。所以,前面在进行 socketbind 系统调用的时候,Python 进程并不是没有被阻塞,而是阻塞的时间太短,我们感知不到而已。

当网卡又数据来到的时候,网卡将产生一个硬中断来通知内核来处理数据,内核发现了有该进程监听的端口的数据的时候,将数据由内核空间的缓冲区拷贝到用户进程空间的缓冲区,并重新将用户进程唤醒(放置到运行队列),这时用户进程就拿到了需要的数据。我们可以使用 nc 命令向此端口发送一些数据,看看用户进程之后的动作。

新建终端,并通过 nc 命令发送数据:

root@yunfwe:~# echo 'hello' |nc -4u 127.0.0.1 9090
hello

Python 进程退出,strace 命令的跟踪如下:

recvfrom(3, "hello\n", 4096, 0, {sa_family=AF_INET, sin_port=htons(53114), sin_addr=inet_addr("127.0.0.1")}, [16]) = 6
sendto(3, "hello\n", 6, 0, {sa_family=AF_INET, sin_port=htons(53114), sin_addr=inet_addr("127.0.0.1")}, 16) = 6
close(3)                                = 0
rt_sigaction(SIGINT, {sa_handler=SIG_DFL, sa_mask=[], sa_flags=SA_RESTORER, sa_restorer=0x7f1fa7148f20}, {sa_handler=0x561f54bcebe0, sa_mask=[], sa_flags=SA_RESTORER, sa_restorer=0x7f1fa7148f20}, 8) = 0
exit_group(0)                           = ?
+++ exited with 0 +++
root@yunfwe:~# 

recvfrom 系统调用将接收到的字符串 hello\n 和客户端地址由内核态返回到用户态,用户态紧接着通过 sendto 系统调用将数据发送给客户端地址,然后 close 系统调用关闭打开的文件描述符,用户进程执行完毕,Python 解释器退出。进程退出也是用户进程生命周期的最后一个系统调用,这时候内核将回收进程所占用的资源(打开的文件、占用的内存等),然后将进程的退出码返回给进程的调用者,这就是该进程的父进程。

题外话:无论是文件 IO 相关的系统调用,还是网络 IO 相关的系统调用,都会发生由 内核缓冲区 -> 用户空间缓冲区,或者 用户空间缓冲区 -> 内核缓冲区的数据复制,对于 Web 静态资源服务器,比如 Nginx 像客户端发送文件数据,需要先将硬盘上读取的文件由内核缓冲区 -> 用户空间缓冲区,然后再由用户空间缓冲区 -> 内核的网络套接字缓冲区 -> 发送到客户端,这个过程是低效的,因此 Linux 内核提供了 sendfile 这个系统调用,这个系统调用将文件描述符(必须是真实的文件)作为输入,套接字描述符作为输出,达到在两个套接字之间直接传递数据,完全在内核内部完成,而避免了数据在内核和用户进程间的复制,并且避免了多余的系统调用。所以 Nginx 作为号称可以榨干计算机最后一丝性能的服务软件,自然也提供了对 sendfile 的支持,只需要在提供静态资源访问的配置块中添加 sendfile on; 即可。

说完了程序为什么会发送阻塞,再说说如何避免阻塞,也就是非阻塞。非阻塞 IO 最简单的实现就是多线程,遇到耗时的 IO 操作,就开启一个线程处理,这样就可以做到不阻塞当前线程了。对于 socket 模块,也可以将套接字对象设置为非阻塞模式:socket.setblocking(false),这样再进行 recv 等系统调用时将抛出一个异常而不是阻塞当前线程。对于文件,Linux 内核提供了 AIO 相关的系统调用,但是市面上也并没有看到太多的应用。当今最常用的非阻塞 IO 的方法就是 IO多路复用,Linux 提供 IO 多路复用相关的几个系统调用就是 selectpollepoll。Python 协程的完整实现,IO 多路复用也是不可少的一个环节,Nginx 出了名和高效,也就是借助了 IO 多路复用的能力。

PS:中断其实也就是一种异步的事件通知机制,可见整套计算机系统的设计中穿插着各种的同步和异步。其他原因造成的阻塞,例如锁、waitsleep 函数等,各位有兴趣可以自己探索一番。

IO多路复用

IO 多路复用是如何在单个线程内并发处理多个网络套接字的一种方式,已经是系统高性能网络服务器的必备技术,Nginx、Redis 等也都用到了 IO 多路复用技术。

select

最先出来,且兼容性最好的 IO 多路复用技术是 select 系统调用。它的实现思路是,传递给它一个包含所有套接字描述符的数组,如果数组内的套接字有数据到来,select 将返回这些可以操作的套接字,并唤醒进程继续处理,否则将一直阻塞,或直到设置的 timeout 超时。Python 的标准库 select.select 提供了对它的封装,下面一个简单的例子看看如何使用。

select.select 函数申明:

select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
  • rlist 对应可读列表,此列表内的套接字有数据来到时会使 select 返回。
  • wlist 对应可写列表,此列表内的套接字可写时返回。
  • xlist 对应异常列表,此列表内的套接字异常,比如已被关闭时返回。
  • 可选的 timeout 参数,如果提供,则 select 会在超时后唤醒进程。因此,我们也可以使用 select.select([],[],[],2) 来模拟 time.sleep 函数。

select 函数返回一个包含三种情况套接字列表的元组,例如返回的 rlist 内的套接字,肯定是已经有数据可读的套接字,因此这时候调用 recv 就不会将进程阻塞了。同时处理多个客户端连接的原因也在这里,当有新的客户端连接,或者客户端有数据发送到服务端时,select 都将唤醒线程去处理,且处理过程中 recv 也不会造成单个套接字的阻塞导致整个线程内所有套接字都无法被处理。

下面看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import select
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
s.setblocking(False)
s.bind(('',9090))
s.listen(5)
print 'Server is listen on 0.0.0.0:9090'

rlist = [s, ]
while True:
rables , _ , xables = select.select(rlist,[],[])
for sock in rables:
if sock is s:
new_sock, addr = sock.accept()
new_sock.setblocking(False)
print 'New client : %s:%s' % addr
rlist.append(new_sock)
else:
msg = sock.recv(4096)
if msg == '' or 'exit' in msg:
sock.close()
rlist.remove(sock)
print 'Have a client exit...'
else:
print 'Recv msg: %s' % msg
sock.send('From server: ' + msg)

将代码运行起来:

root@yunfwe:~# python echo.py 
Server is listen on 0.0.0.0:9090

这时代码被阻塞到了 select 处,接着新开一个终端,通过 nc 命令连接此端口,并发送 hello 字符串:

root@yunfwe:~# nc 127.0.0.1 9090
hello
From server: hello

这时服务端打印如下:

root@yunfwe:~# python echo.py 
Server is listen on 0.0.0.0:9090
New client : 127.0.0.1:41700
Recv msg: hello

这个过程中,select 已经返回了两次,第一次是 nc 客户端接入的时候,执行了获取客户端套接字以及将客户端套接字也加入 rlist 中,第二次返回因为客户端发送了 hello 字符串。sock.recv(4096) 这一步,并不是指从套接字缓存区读取 4096 个字节,而是创建一个长度为 4096 的用户进程缓冲区,然后接收从内核中套接字缓冲区复制来的数据,所以并不一定 recv 返回的数据就有 4096 的长度。接着再新开一个终端再次用 nc 连接服务端:

root@yunfwe:~# nc 127.0.0.1 9090
hi
From server: hi

可以看到,第二个客户端也被正常的服务了!

root@yunfwe:~# python echo.py 
Server is listen on 0.0.0.0:9090
New client : 127.0.0.1:41700
Recv msg: hello

New client : 127.0.0.1:41702
Recv msg: hi

在单线程中,同时处理多个客户端的连接,而且如果设置了 timeout 关键字,就可以在超时后做一些额外的处理工作,IO 多路复用就提供了这么强大的能力。

接着两个客户端都向服务器发送 exit 字符串来退出连接:

root@yunfwe:~# python echo.py 
Server is listen on 0.0.0.0:9090
New client : 127.0.0.1:41700
Recv msg: hello

New client : 127.0.0.1:41702
Recv msg: hi

Have a client exit...
Have a client exit...

以上就是 select 的简单示例,可以看到 select 还是非常简单,那么 select 又是如何完成套接字的监听和唤醒进程的呢?

在上面的例子中,已经有两个客户端连接了,加上服务端监听服务端口的套接字,一共有三个套接字。在进行 select 系统调用时,select 会将调用线程分别放到这三个套接字的等待队列中,这时线程就失去了执行权,当其中任何一个套接字有数据到来,将触发中断处理程序,中断程序会将线程从等待队列移除,然后重新加入执行队列。线程被唤醒后就知道,肯定有至少一个套接字接收了数据,这时只需要遍历 select 返回的列表就可以拿到所有就绪的套接字了。

select 的实现非常简单,而且在多个平台都有实现(Windows,Linux,Mac 都可用),但是简单不一定代表高效,如果需要被同时监听的套接字太多,每次 select 都先检查套接字缓冲区是否已经有准备好的数据,如果有就直接返回,没有就需要将线程加入到每个套接字的等待队列,唤醒也需要从每个套接字的等待队列移除。这个过程需要经历至少一次对套接字列表的遍历,套接字列表越大,每次 select 调用的成本也越高,这也是 Linux 限制 select 默认最高 1024 个描述符的原因之一。

epoll

在 select 的基础上,升级发展出了 poll,但是基本的原理还是一样,select 中遇到的问题,poll 也没有解决,只是 poll 没有了 1024 最大文件描述符的限制。再之后,对 select 进行功能分离和引入 eventpoll 对象来作为代理,优化存储套接字的数据结构,使 IO 多路复用的效率大大提高,这就是 epoll。

在 select 中,每次 select 调用,都会将线程添加到套接字的等待队列,唤醒也需要从所有套接字队列中把线程移除,epoll 添加了一层代理(eventpoll),只需要使用 epoll_create 就可以创建它,然后使用 epoll_ctl 维护要监听的套接字,当添加要监听的套接字的时候,只是将 eventpoll 添加到这些套接字的等待队列,而不是直接将线程添加到等待队列了。当套接字接收到数据,中断处理进程会给 eventpoll 对象的 rdlist (就绪列表)中引用这些准备就绪的套接字。当程序执行到 epoll_wait 时,如果 rdlist 中已经有就绪的套接字就直接返回,否则将线程放到 eventpoll 中的等待队列里完成对线程的阻塞,当套接字再次收到数据,线程才会被再次唤醒。

正因为 rdlist 的存在,eventpoll 就不需要对所有套接字进行遍历来收集准备就绪的套接字了。而且只需要判断 rdlist 的有无数据就可以完成对线程的阻塞和唤醒,epoll_ctl 也使用红黑树来维护要监视的套接字,所以综上,就是 epoll 更高效和快速的原因。

Python 的 select.epoll 实现了对 epoll 的封装,刚才的示例使用 epoll 来改造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import select
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
s.setblocking(False)
s.bind(('',9090))
s.listen(5)
print 'Server is listen on 0.0.0.0:9090'

epoll = select.epoll()
epoll.register(s.fileno(), select.EPOLLIN)

fd_map = {s.fileno(): s}

while True:
events = epoll.poll()
for fd, event in events:
sock = fd_map[fd]
if sock is s:
new_sock, addr = sock.accept()
new_sock.setblocking(False)
print 'New client : %s:%s' % addr
epoll.register(new_sock.fileno(), select.EPOLLIN | select.EPOLLET)
fd_map[new_sock.fileno()] = new_sock
elif event == select.EPOLLIN:
msg = sock.recv(4096)
if msg == '' or 'exit' in msg:
sock.close()
epoll.unregister(fd)
del fd_map[fd]
print 'Have a client exit...'
else:
print 'Recv msg: %s' % msg
sock.send('From server: ' + msg)

程序的执行效果与 select 版一致,就不再演示了。epoll 与 select 在 Python 上的用法还是有些区别的,select 可以直接传递套接字对象,而 epoll 只能传递套接字的文件描述符(sock.fileno()),所以我们需要手动维护一个文件描述符到套接字的映射。epoll.register(s.fileno(), select.EPOLLIN) 就是在一开始的时候,注册端口监听的套接字,之后每次调用 epoll.poll() 的时候,就不需要重新注册了。select.EPOLLIN 就是监听此套接字的可读事件,对应着 select.EPOLLOUT 为可写事件。在注册客户端套接字的时候多出来 select.EPOLLET 是表示将此套接字的监听模式设置为边缘触发模式,接下来的代码对数据的处理逻辑就与 select 版相同了。

关于水平触发边缘触发:水平触发是默认的情况,当有数据来到,通过 recv 函数读取数据时,如果一次性没有读完(例如接收缓冲区太小),则此套接字在下次执行 epoll.poll() 的时候,依然会被返回,直到此套接字缓冲区被读空为止。如果存在大量没有读完的套接字,水平触发模式效率是比较低的。边缘触发则相反,如果数据一次没读完,也只通知一次,直到下次有新的数据到来。所以如果采用边缘触发模式,应该循环读取数据,直到套接字缓存区被读完抛出异常为止。因此,边缘触发的示例中,从套接字缓冲区读取数据的部分应该改为如下形式:

1
2
3
4
5
6
7
elif event == select.EPOLLIN:
msg = ''
while True:
try:
msg += sock.recv(4096)
except socket.error:
break

到这里,希望大家都对这些概念都有了基本的理解。

附录