12.5. select — 高效地等待 I/O

本节目标:学会当输入输出准备就绪时接收通知

select 模块提供了一组访问不同平台下 I/O 监控函数的功能。最轻便高效的接口是 POSIX (可移植性系统接口) 函数 select(),它可以在 Unix 和 Windows 下同时工作。模块中还包含另一个 poll() 函数,不过它是 Unix-only 的 API,还有几个只能在 Unix 变种系统中工作的函数。

注意

最新的 selectors 模块提供以 select 为基础的上层接口 API 。使用 selectors 可以更容易编写代码,除非有使用 select 的必要,否则我们应该使用的是 selectors 模块。

使用 select()

Python 的 select() 函数是部署底层操作系统的直接接口。它监视着套接字,打开的文件和管道(任何调用 fileno() 方法后会返回有效文件描述符的东西)直到它们变得可读可写或有错误发生。 select() 让我们同时监视多个连接变得简单,同时比在 Python 中使用套接字超时写轮询池要有效,因为这些监视发生在操作系统网络层而不是在解释器层。

注意

select() 与 Python 的文件对象结合使用在 Unix 下是有效的,不过在 Windows 下无效。

socket 章节的回显服务器例子可以用 select() 来扩展成面向多个连接的例子。新的例子会创建一个非阻塞的 TCP/IP 套接字并注册监听某个地址。

select_echo_server.py

import select
import socket
import sys
import queue

# 创建 TCP/IP 套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# 绑定套接字到端口
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
      file=sys.stderr)
server.bind(server_address)

# 监听即将到来的连接
server.listen(5)

接下来 select() 中的参数是三个包含向监听器通信渠道的列表。第一个列表中的对象用于检测即将读取的数据,第二个列表包含的对象会在缓存区有空间的时候接收传出的数据,第三个则是接收它们执行期间发生的错误(通常是输入和输出对象的结合)。之后我们要给 select() 写一下输入源以及输出对象。

# 我们想读的套接字
inputs = [server]

# 我们想写的套接字
outputs = []

连接会在服务器主循环中在各自列表中添加,删除。这个版本的服务器会在发送任何数据前等待套接字直到可写(之前的是立即回复),所以每个输出连接都需要配备一个队列作为数据缓存区来发送数据。

# 消息传出队列 格式:(socket:Queue)
message_queues = {}

在服务器端的程序循环部分,我们调用 select() 来阻塞并等待网络活动。

while inputs:

    # 等待至少有一个套接字准备好了进行后续处理。

        print('waiting for the next event', file=sys.stderr)
    readable, writable, exceptional = select.select(inputs,
                                                    outputs,
                                                    inputs)

select() 返回三个新的列表,里面的内容是传入的列表的内容。所有在 可读(readable) 列表里的套接字都有数据缓存并且都可读。所有在 可写(writable) 列表里的套接字的缓冲区都有空闲空间并都可写。那些返回到 exceptional 中的套接字都发生了错误(具体「异常情况」的定义(描述)取决于平台)。

「可读」的套接字会有下面三种情况。如果这个套接字是主「服务器」套接字,一面继续来监听连接,另一面「可读」这个条件要让它准备好接收其他连接。除了要添加新的连接到 inputs 中供监控,还要设置这个客户端套接字为非阻塞状态。

    # inputs 处理
    for s in readable:

        if s is server:
            # 可读的套接字需要准备好接收连接。
            connection, client_address = s.accept()
            print('  connection from', client_address,
                  file=sys.stderr)
            connection.setblocking(0)
            inputs.append(connection)

            # 把我们想发送的数据队列给它。
            message_queues[connection] = queue.Queue()

另一种情况是与已经发送过数据的客户端建立连接。我们用 recv() 读取,然后将它放到队列中,这样我们就可以通过该套接字发送出去然后返回到客户端。

        else:
            data = s.recv(1024)
            if data:
                # 一个有数据的可读客户端
                print('  received {!r} from {}'.format(
                    data, s.getpeername()), file=sys.stderr,
                )
                message_queues[s].put(data)
                # 添加到输出列表用来做响应
                if s not in outputs:
                    outputs.append(s)

如果是一个可读的套接字但 没有 数据,客户端也断开了连接,我们就要准备断开流。

            else:
                # 空结果表明要关闭连接
                print('  closing', client_address,
                      file=sys.stderr)
                # 停止监听该链接的输入
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()

                # 删除这个消息队列
                del message_queues[s]

同样,可写的连接也有多种情况。如果此连接的队列中存在数据,那我们的下一个消息就是要把它发送出去。否则我们要把该连接从输出列表中移除,这样下一次循环 select() 就不会标记这个套接字为要发送数据了。

    # outputs 处理
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # 没有消息在等待,我们要关闭掉。
            print('  ', s.getpeername(), 'queue empty',
                  file=sys.stderr)
            outputs.remove(s)
        else:
            print('  sending {!r} to {}'.format(next_msg,
                                                s.getpeername()),
                  file=sys.stderr)
            s.send(next_msg)

最后,如果套接字中♂出了一个错误,那它就要关闭。

    # 处理 「异常状况」
    for s in exceptional:
        print('exception condition on', s.getpeername(),
              file=sys.stderr)
        # 停止监听此连接的输入。
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # 移除此消息队列。
        del message_queues[s]

客户端程序例子使用两个套接字来演示服务器端是怎么用 select() 同时管理多个连接的。客户端会建立两个 TCP/IP 套接字连接服务器。

select_echo_multiclient.py

import socket
import sys

messages = [
    'This is the message. ',
    'It will be sent ',
    'in parts.',
]
server_address = ('localhost', 10000)

# 创建 TCP/IP 套接字
socks = [
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]

# 连接服务器所监听的套接字端口。
print('connecting to {} port {}'.format(*server_address),
      file=sys.stderr)
for s in socks:
    s.connect(server_address)

之后我们在同一时间让每个套接字都发送一段消息,并在写入新数据后读取所有的响应。

for message in messages:
    outgoing_data = message.encode()

    # 让每个套接字都发送消息。
    for s in socks:
        print('{}: sending {!r}'.format(s.getsockname(),
                                        outgoing_data),
              file=sys.stderr)
        s.send(outgoing_data)

    # 让每个套接字都读取响应。
    for s in socks:
        data = s.recv(1024)
        print('{}: received {!r}'.format(s.getsockname(),
                                         data),
              file=sys.stderr)
        if not data:
            print('closing socket', s.getsockname(),
                  file=sys.stderr)
            s.close()

起一个窗口运行服务器,起另一个窗口运行客户端。我们会看到如下输出,端口数字可能会有所不同。

$ python3 select_echo_server.py
starting up on localhost port 10000
waiting for the next event
  connection from ('127.0.0.1', 61003)
waiting for the next event
  connection from ('127.0.0.1', 61004)
waiting for the next event
  received b'This is the message. ' from ('127.0.0.1', 61003)
  received b'This is the message. ' from ('127.0.0.1', 61004)
waiting for the next event
  sending b'This is the message. ' to ('127.0.0.1', 61003)
  sending b'This is the message. ' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  received b'It will be sent ' from ('127.0.0.1', 61003)
  received b'It will be sent ' from ('127.0.0.1', 61004)
waiting for the next event
  sending b'It will be sent ' to ('127.0.0.1', 61003)
  sending b'It will be sent ' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61003)
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61004)
  sending b'in parts.' to ('127.0.0.1', 61003)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
  sending b'in parts.' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  closing ('127.0.0.1', 61004)
  closing ('127.0.0.1', 61004)
waiting for the next event

这是客户端中的发送和接收的输出。

$ python3 select_echo_multiclient.py
connecting to localhost port 10000
('127.0.0.1', 61003): sending b'This is the message. '
('127.0.0.1', 61004): sending b'This is the message. '
('127.0.0.1', 61003): received b'This is the message. '
('127.0.0.1', 61004): received b'This is the message. '
('127.0.0.1', 61003): sending b'It will be sent '
('127.0.0.1', 61004): sending b'It will be sent '
('127.0.0.1', 61003): received b'It will be sent '
('127.0.0.1', 61004): received b'It will be sent '
('127.0.0.1', 61003): sending b'in parts.'
('127.0.0.1', 61004): sending b'in parts.'
('127.0.0.1', 61003): received b'in parts.'
('127.0.0.1', 61004): received b'in parts.'

带超时时间的非阻塞 I/O

select() 还有第四个可选参数,它接收一个数字(秒)作为断开监控的超时时间(前提是没有任何通道再活动)。我们可以在一个大型程序循环中使用超时时间,这样我们可以在检测网络输入时做些其他事情。

当到了超时时间后,select() 就会返回三个空列表。我们更新一下服务器端例子,给 select() 加上第四个超时参数,然后相应的处理一下 select() 的空列表返回。

select_echo_server_timeout.py

    readable, writable, exceptional = select.select(inputs,
                                                    outputs,
                                                    inputs,
                                                    timeout)

    if not (readable or writable or exceptional):
        print('  timed out, do some other work here',
              file=sys.stderr)
        continue

同样在客户端每次发送消息后「降」下速来模拟传输过程中的延迟情况。

select_echo_slow_client.py

import socket
import sys
import time

# 创建 TCP/IP 套接字。
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接服务器所监听的套接字端口。
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address),
      file=sys.stderr)
sock.connect(server_address)

time.sleep(1)

messages = [
    'Part one of the message.',
    'Part two of the message.',
]
amount_expected = len(''.join(messages))

try:

    # 发送数据
    for message in messages:
        data = message.encode()
        print('sending {!r}'.format(data), file=sys.stderr)
        sock.sendall(data)
        time.sleep(1.5)

    # 尝试获取响应
    amount_received = 0

    while amount_received < amount_expected:
        data = sock.recv(16)
        amount_received += len(data)
        print('received {!r}'.format(data), file=sys.stderr)

finally:
    print('closing socket', file=sys.stderr)
    sock.close()

运行新版服务器和慢慢的客户端:

$ python3 select_echo_server_timeout.py
starting up on localhost port 10000
waiting for the next event
  timed out, do some other work here
waiting for the next event
  connection from ('127.0.0.1', 61144)
waiting for the next event
  timed out, do some other work here
waiting for the next event
  received b'Part one of the message.' from ('127.0.0.1', 61144)
waiting for the next event
  sending b'Part one of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
  timed out, do some other work here
waiting for the next event
  received b'Part two of the message.' from ('127.0.0.1', 61144)
waiting for the next event
  sending b'Part two of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
  timed out, do some other work here
waiting for the next event
closing ('127.0.0.1', 61144)
waiting for the next event
  timed out, do some other work here

慢慢的客户端输出如下:

$ python3 select_echo_slow_client.py
connecting to localhost port 10000
sending b'Part one of the message.'
sending b'Part two of the message.'
received b'Part one of the '
received b'message.Part two'
received b' of the message.'
closing socket

使用 poll()

poll() 函数的功能与 select() 差不多,不过从底层来看更加高效一些。代价是 poll() 无法在 Windows 下使用,所以使用 poll() 的话可移植性也更低。

使用 poll() 的回显服务器开启套接字的配置代码与之前的例子中的一样。

select_poll_echo_server.py

import select
import socket
import sys
import queue

# 创建一个 TCP/IP 套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# 绑定套接字端口
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
      file=sys.stderr)
server.bind(server_address)

# 监听即将到来的连接
server.listen(5)

# 保存即将发出的消息。
message_queues = {}

poll() 中的超时值是毫秒级的,所以我们传入 1000 来表示停顿一秒。

# 让它不会永远阻塞住,设置个超时时间 (毫秒)
TIMEOUT = 1000

在 Python 中实现 poll() 时要有一个管理着注册过的数据通道的类。通道可以调用 register() 来添加,所添加的标识是该通道所要监控的事件。下表写出了所有对应的标识。

poll() 的事件标识

事件 简介
POLLIN 输入就绪
POLLPRI 优先输入就绪
POLLOUT 可以接收输出
POLLERR 错误
POLLHUP 通道已关闭
POLLNVAL 通道未开启

回显服务器需要设置几个套接字为只读,设置几个为可读可写。我们将合适的标识结合放到 READ_ONLYREAD_WRITE 中。

# 通常的标识设置
READ_ONLY = (
    select.POLLIN |
    select.POLLPRI |
    select.POLLHUP |
    select.POLLERR
)
READ_WRITE = READ_ONLY | select.POLLOUT

我们注册 server 套接字,这样所有要连接的客户端或发送的数据都会触发一个事件。

# 设置 poller
poller = select.poll()
poller.register(server, READ_ONLY)

由于 poll() 返回的是存放套接字文件描述符和事件标识的元组的列表,映射一个文件描述符代号到具体的对象对我们之后访问 socket 是很有必要的。

# 将文件描述符映射到套接字对象
fd_to_socket = {
    server.fileno(): server,
}

服务器端的循环会不断调用 poll() 之后处理返回的「事件」来查看套接字并基于事件的标识采取下一步行动。

while True:

    # 等待直到至少有一个套接字准备好接受处理
    print('waiting for the next event', file=sys.stderr)
    events = poller.poll(TIMEOUT)

    for fd, flag in events:

        # 通过它的文件描述符取出套接字对象
        s = fd_to_socket[fd]

select() 一样,当主服务器套接字是 「可读」时,其真实的意思是有一个从客户端来的挂起连接。新的连接需要注册为 READ_ONLY 来监视它所传过来的数据。

        # 处理 inputs
        if flag & (select.POLLIN | select.POLLPRI):

            if s is server:
                # 可读表示准备好接受新连接
                connection, client_address = s.accept()
                print('  connection', client_address,
                      file=sys.stderr)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection
                poller.register(connection, READ_ONLY)

                # 给这个连接一个发送数据用的队列
                message_queues[connection] = queue.Queue()

其他的客户端套接字我们用 recv() 来访问数据等待被读取。

            else:
                data = s.recv(1024)

如果 recv() 返回了任何数据,就会被放到该套接字的待发送队列中,之后该套接字的标识使用 modify() 修改一下,这样 poll() 会监控到这个套接字准备好接收数据了。

                if data:
                    # 可读的客户端套接字存在数据
                    print('  received {!r} from {}'.format(
                        data, s.getpeername()), file=sys.stderr,
                    )
                    message_queues[s].put(data)
                    # 添加输出通道以响应
                    poller.modify(s, READ_WRITE)

recv() 返回空字符串标识客户端断开了连接,所以我们用 unregister() 来告诉 poll 对象忽略该套接字。

                else:
                    # 空的返回结果表示连接断开
                    print('  closing', client_address,
                          file=sys.stderr)
                    # 停止监听该连接的输入
                    poller.unregister(s)
                    s.close()

                    # 删除它的消息队列
                    del message_queues[s]

POLLHUP 标识表示某客户端处于「挂起」状态且没有准确的关闭。这样服务器也要停止轮询该客户端。

        elif flag & select.POLLHUP:
            # 客户端挂起
            print('  closing', client_address, '(HUP)',
                  file=sys.stderr)
            # 停止监听该连接输入
            poller.unregister(s)
            s.close()

处理可写的套接字与是用 select() 的例子差不多,除了给 poller 改为用 modify() 改变套接字的标识,而不是将它从输出列表中删除。

        elif flag & select.POLLOUT:
            # 套接字准备好发送任何数据
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                # 无消息等待,所以我们停止检测
                print(s.getpeername(), 'queue empty',
                      file=sys.stderr)
                poller.modify(s, READ_ONLY)
            else:
                print('  sending {!r} to {}'.format(
                    next_msg, s.getpeername()), file=sys.stderr,
                )
                s.send(next_msg)

最后,任何由 POLLERR 引起的事件我们都关闭套接字。

        elif flag & select.POLLERR:
            print('  exception on', s.getpeername(),
                  file=sys.stderr)
            # 停止监听该连接的输入
            poller.unregister(s)
            s.close()

            # 删除它的消息队列
            del message_queues[s]

我们将使用 poll 版本的服务器端与 select_echo_multiclient.py 一起运行,下面是输出:

$ python3 select_poll_echo_server.py
starting up on localhost port 10000
waiting for the next event
waiting for the next event
waiting for the next event
waiting for the next event
  connection ('127.0.0.1', 61253)
waiting for the next event
  connection ('127.0.0.1', 61254)
waiting for the next event
  received b'This is the message. ' from ('127.0.0.1', 61253)
  received b'This is the message. ' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'This is the message. ' to ('127.0.0.1', 61253)
  sending b'This is the message. ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  received b'It will be sent ' from ('127.0.0.1', 61253)
  received b'It will be sent ' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'It will be sent ' to ('127.0.0.1', 61253)
  sending b'It will be sent ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61253)
  received b'in parts.' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'in parts.' to ('127.0.0.1', 61253)
  sending b'in parts.' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  closing ('127.0.0.1', 61254)
waiting for the next event
  closing ('127.0.0.1', 61254)
waiting for the next event

特定平台选择

select 提供的 epoll 可移植性较低,edge polling API 由 linux 支持。kqueue,使用了 BSD 的 内和队列kevent BSD 的 内核事件 接口。更多有关他们的工作方式请参阅操作系统库文档。

推荐阅读

  • select 标准库文档
  • selectorsselect 的高级封装。
  • Socket Programming HOWOTO – Gordon McMillan 撰写的教学指南,已经包含在标准库文档中。
  • socket – 低级网络通信。
  • SocketServer – 创建网络服务器的框架。
  • asyncio – 异步 I/O 框架。
  • Unix 网络编程, 卷 1: 网络套接字 API,第三版 – 由 W. Richard Stevens,Bill Fenner,和 Andrew M. Rudoff. Published by Addison-Wesley Professional 撰写,2004. ISBN-10: 0131411551
  • Python 网络编程基础, 第三版 – 由 Brandon Rhodes 和 John Goerzen 撰写。 由 Apress 出版社发布, 2014. ISBN-10: 1430258543