python multiprocessing.Queue模块源码阅读
之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue
的具体工作方式(本文基于Python
3.7.4)。
multiprocessing.Queue
定义在queues.py中,除此之外还定义了SimpleQueue
和JoinableQueue
,是FIFO队列。
类似multiprocessing.Process
,首先导入了context
,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()
的时候,实际上是调用了context
中的Queue()
方法,设置了ctx
。
1
2
3
4def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())
SimpleQueue
SimpleQueue
很简单,其实就是一个带锁的pipe, *
主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的
之所以put()
和get()
可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。
* 用multiprocessing.Pipe
来实现消息传递,支持多读多写
关于
os.pipe
和multiprocessing.Pipe
*os.pipe
:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化 *multiprocessing.Pipe
:使用multiprocessing.Connection
实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化
Queue
Queue
可以看做在SimpleQueue
的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()
和put()
的无阻塞调用。
初始化
__init__()
初始化了这么几个比较重要的变量, *
_maxsize
:队列最大size *
_reader
,_writer
:multiprocessing.Connection
实例,负责数据的收发
*
_rlock
:multiprocessing.Lock
,进程间共享,保证_reader.recv_bytes()
并发安全
*
_wlock
:multiprocessing.Lock
,进程间共享,保证_writer.send_bytes()
并发安全
* _sem
:队列长度信号量,计数器初始化为_maxsize
*
_notempty
:条件变量,同步生产者放入_buffer
和_buffer
中数据的发送
*
_buffer
:每个进程有自己独立的buffer,线程安全,size其实是由_sem
来控制
* _thread
:生产者数据发送线程
put
首先_sem.acquire()
,计数器-1,如果不为零,说明队列还可以写入。
1
2if not self._sem.acquire(block, timeout):
raise Full
然后获得_notempty
的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()
通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。
1
2
3
4
5with self._notempty: # acquire保护_notempty和_thread的修改
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
feed线程
_start_thread()
创建thread对象后,设置daemon
属性为True
,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。
具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()
并发送,直到发送完。
为何_notempty
的锁在popleft()
就释放了? 1.
buffer是collections.deque
,本身就是并发安全的 2.
_notempty
锁的目的其实是为了保护_notempty
的修改,和put()
中的目的类似,但并不是为了保护buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30while 1:
try:
nacquire()
try:
if not buffer:
nwait()
finally:
nrelease()
try:
while 1:
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
close()
return
# serialize the data before acquiring the lock
obj = _ForkingPickler.dumps(obj)
if wacquire is None:
send_bytes(obj)
else:
wacquire()
try:
send_bytes(obj)
finally:
wrelease()
except IndexError:
pass
except Exception as e:
# ......
get
对于block=True
的情况
类似SimpleQueue
,一直等待_reader.recv_bytes()
,直到收到数据。在_reader.recv_bytes()
以后,_sem.release()
,计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()
并在等待,那么_sem.release()
会唤醒其中一个等待的进程。
对于block=False
的情况
由于不能阻塞,因此不能一直等待_reader.recv_bytes()
。
- 如果在
timeout
内_rlock
都没有获得锁,则返回Empty
_reader.poll()
(底层还是使用select
来实现的)判断是否在timeout
内_reader
可读,如果不可读,则返回Empty
- 最后
_reader.recv_bytes()
,并_sem.release()
1 | def get(self, block=True, timeout=None): |
序列化和反序列化
序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection
中定义了一个规则,在header
存放长度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22def _send_bytes(self, buf):
n = len(buf)
# For wire compatibility with 3.2 and lower
header = struct.pack("!i", n)
if n > 16384:
# The payload is large so Nagle's algorithm won't be triggered
# and we'd better avoid the cost of concatenation.
self._send(header)
self._send(buf)
else:
# Issue #20540: concatenate before sending, to avoid delays due
# to Nagle's algorithm on a TCP socket.
# Also note we want to avoid sending a 0-length buffer separately,
# to avoid "broken pipe" errors if the other end closed the pipe.
self._send(header + buf)
def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)
close和join_thread
由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?
在启动feed线程的时候,创建了两个Finalize
对象,_finalize_close
和_finalize_join
,前者的优先级较高,并set了_close
和_jointhread
。当进程退出的时候,会自动地先后调用这两个函数(基于atexit
实现),
1.
_finalize_close
:会append元素_sentinel = object()
到buffer,feed线程如果看到_sentinel
,会调用close()
并停止服务
2. _finalize_join
:join feed线程
1 | if not self._joincancelled: |
模块同样也提供了close()
和join_thread()
方法,调用的其实就是_close
和_jointhread
。如果先手动调用,然后aexit
调用,那不会回有问题?首次调用后,都会从util
模块的_finalizer_registry
中移除,因此不会存在重复调用_finalize_close
和_finalize_join
的问题。
deadlock
这几个stackoverflow(1,2,3)的问题很类似,在官方的文档中也有提及(python 2,python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。
1 | import os |
原因是,Queue
使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()
在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。
JoinableQueue
JoinableQueue
基于Queue
实现,覆盖了put()
,并新增了,
*
task_done()
:表示先前放入队列中的元素被取走了,由消费者调用
* join()
:阻塞直到队列中所有元素都被取走
初始化
__init__()
调用了Queue
的初始化函数,并额外初始化了,
* _unfinished_tasks
:信号量,表示队列中当前未取走的元素 *
_cond
:条件变量,同步join()
和task_done()
put
和Queue
的put()
基本一致。多出来的一点是,每次put()
都会对_unfinished_tasks
的计数器+1。
1
2
3
4
5
6
7
8def put(self, obj, block=True, timeout=None):
# ...
with self._notempty, self._cond:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._unfinished_tasks.release()
self._notempty.notify()
task_done和join
task_done()
对_unfinished_tasks
的计数器进行-1,如果计数器为0,则通知等待在join()
的进程。
1 | def task_done(self): |
Reference
- Python os.pipe vs multiprocessing.Pipe
- Daemon is not daemon, but what is it?
- Python 中的 multiprocess.Queue
- Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
- Process.join() and queue don't work with large numbers
- Script using multiprocessing module does not terminate
- https://docs.python.org/3/library/multiprocessing.html#all-start-methods
- Joining multiprocessing queue takes a long time