python standard library selectors

python 标准库 selectors 高级 I/O 复用库

源码: Lib/selectors.py


概述

它的功能与linux的epoll,还是select模块,poll等类似;实现高效的I/O multiplexing, 常用于非阻塞的socket的编程中

模块定义了一个 BaseSelector的抽象基类, 以及它的子类,包括:EpollSelector, KqueueSelector等模块,

Classes hierarchy:

1
2
3
4
5
6
BaseSelector
+-- SelectSelector
+-- PollSelector
+-- EpollSelector
+-- DevpollSelector
+-- KqueueSelector

模块定义了两个常量,用于描述 event Mask

常数 意义
EVENT_READ 可读
EVENT_WRITE 可写

模块定义了一个 SelectorKey类,是一个 namedtuple 类型, 一般用这个类的实例来描述一个已经注册的文件对象的状态,

常用属性:

属性 描述
fileobj 表示已经注册的文件对象
fd 表示文件对象的描述符,是一个整数,它是文件对象的 fileno()方法的返回值
events 表示注册一个文件对象时,我们等待的events, 即上面的event Mask, 是可读呢还是可写
data 表示注册一个文件对象是邦定的data

class selectors.BaseSelector

定义了一个抽象基类,能够注册和取消注册,通过一个可选的timeout,等待流中的I/O事件。抽象基类无法实例化,所以通常使用DefaultSelector 代替,或者SelectSelector`, KqueueSelector 来实现。如果你想声明一个操作系统平台支持的实现, BaseSelector 和他的子类支持 上下文管理 实现。

  • abstractmethod register(fileobj, events, data=None)

    用于注册一个文件对象或监控I/O事件的抽象方法。fileobj 参数是一个用于监控的对象。它可能是一个整形文件描述符或者是一个拥有 fileno()方法的对象。返回一个新创建的SelectorKey` 类实例,或因为不存在的event mask 或文件描述符,或者该文件对象已经被注册过而引发ValueError 异常。

  • abstractmethod unregister(fileobj)

    用于注销注册文件对象或移除监控I/O事件的抽象方法。该文件对象必须是之前已经注册的对象。返回值关联了SelectorKey 实例,或因为不存在的event mask 或文件描述符,或者该文件对象已经被注册过而引发ValueError 异常。

  • modify(fileobj, events, data=None)

    用于修改一个注册过的文件对象,比如从监听可读变为监听可写;它其实就是register() 后再跟unregister(), 但是使用modify( ) 更高效;返回一个新创建的SelectorKey` 类实例,或因为不存在的event mask 或文件描述符,或者该文件对象已经被注册过而引发ValueError 异常。

  • abstractmethod select(timeout=None)

    用于选择满足我们监听的event的文件对象的抽象方法。

  • close()

    关闭 selector,为了 要确保所有的资源被释放,最后一定要调用该方法。

  • get_key(fileobj)

    返回注册文件对象的 SelectorKey 实例。该实例关联文件对象,若文件对象未注册,将抛出KeyError 异常。

  • abstractmethod get_map()

    返回selectors key 的文件对象的映射。

  • class selectors.DefaultSelector

    默认的selector类,其中一个子类的别名,它自动选择为当前环境中最有效的Selector。

    selectors模块默认会用epoll,如果你的系统中没有epoll(比如windows)则会自动使用select

  • class selectors.SelectSelector

    select.select()-based selector.

  • class selectors.PollSelector

    select.poll()-based selector.

  • class selectors.EpollSelector

    select.epoll()-based selector.fileno()This returns the file descriptor used by the underlying select.epoll() object.

  • class selectors.DevpollSelector

    select.devpoll()-based selector.fileno()This returns the file descriptor used by the underlying select.devpoll() object.3.5 新版功能.

  • class selectors.KqueueSelector

    select.kqueue()-based selector.fileno()This returns the file descriptor used by the underlying select.kqueue() object.

官方示例

Here is a simple echo server implementation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)

一个基于socket的客户端与服务器端实例

基于socket的服务器端实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
selectors_echo_server.py
import selectors
import socket

mysel = selectors.DefaultSelector()
keep_running = True


def read(connection, mask):
"Callback for read events"
global keep_running

client_address = connection.getpeername()
print('read({})'.format(client_address))
data = connection.recv(1024)
if data:
# A readable client socket has data
print(' received {!r}'.format(data))
connection.sendall(data)
else:
# Interpret empty result as closed connection
print(' closing')
mysel.unregister(connection)
connection.close()
# Tell the main loop to stop
keep_running = False


def accept(sock, mask):
"Callback for new connections"
new_connection, addr = sock.accept()
print('accept({})'.format(addr))
new_connection.setblocking(False)
mysel.register(new_connection, selectors.EVENT_READ, read)


server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address))
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.bind(server_address)
server.listen(5)

mysel.register(server, selectors.EVENT_READ, accept)

while keep_running:
print('waiting for I/O')
for key, mask in mysel.select(timeout=1):
callback = key.data
callback(key.fileobj, mask)

print('shutting down')
mysel.close()

基于socket的客户端实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
selectors_echo_client.py
import selectors
import socket

mysel = selectors.DefaultSelector()
keep_running = True
outgoing = [
b'It will be repeated.',
b'This is the message. ',
]
bytes_sent = 0
bytes_received = 0

# Connecting is a blocking operation, so call setblocking()
# after it returns.
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
sock.setblocking(False)

# Set up the selector to watch for when the socket is ready
# to send data as well as when there is data to read.
mysel.register(
sock,
selectors.EVENT_READ | selectors.EVENT_WRITE,
)

while keep_running:
print('waiting for I/O')
for key, mask in mysel.select(timeout=1):
connection = key.fileobj
client_address = connection.getpeername()
print('client({})'.format(client_address))

if mask & selectors.EVENT_READ:
print(' ready to read')
data = connection.recv(1024)
if data:
# A readable client socket has data
print(' received {!r}'.format(data))
bytes_received += len(data)

# Interpret empty result as closed connection,
# and also close when we have received a copy
# of all of the data sent.
keep_running = not (
data or
(bytes_received and
(bytes_received == bytes_sent))
)

if mask & selectors.EVENT_WRITE:
print(' ready to write')
if not outgoing:
# We are out of messages, so we no longer need to
# write anything. Change our registration to let
# us keep reading responses from the server.
print(' switching to read-only')
mysel.modify(sock, selectors.EVENT_READ)
else:
# Send the next message.
next_msg = outgoing.pop()
print(' sending {!r}'.format(next_msg))
sock.sendall(next_msg)
bytes_sent += len(next_msg)

print('shutting down')
mysel.unregister(connection)
connection.close()
mysel.close()

Server和client交互

1
2
3
4
5
6
7
8
9
10
11
12
$ python3 source/selectors/selectors_echo_server.py
starting up on localhost port 10000
waiting for I/O
waiting for I/O
accept(('127.0.0.1', 59850))
waiting for I/O
read(('127.0.0.1', 59850))
received b'This is the message. It will be repeated.'
waiting for I/O
read(('127.0.0.1', 59850))
closing
shutting down
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ python3 source/selectors/selectors_echo_client.py
connecting to localhost port 10000
waiting for I/O
client(('127.0.0.1', 10000))
ready to write
sending b'This is the message. '
waiting for I/O
client(('127.0.0.1', 10000))
ready to write
sending b'It will be repeated.'
waiting for I/O
client(('127.0.0.1', 10000))
ready to write
switching to read-only
waiting for I/O
client(('127.0.0.1', 10000))
ready to read
received b'This is the message. It will be repeated.'
shutting down

参考文档

python标准库之selectors

坚持原创技术分享,您的支持将鼓励我继续创作!