python multiprocessing.Queue模块源码阅读
之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue的具体工作方式(本文基于Python 3.7.4)。
multiprocessing.Queue定义在queues.py中,除此之外还定义了SimpleQueue和JoinableQueue,是FIFO队列。
类似multiprocessing.Process,首先导入了context,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()的时候,实际上是调用了context中的Queue()方法,设置了ctx。
| |
SimpleQueue
SimpleQueue很简单,其实就是一个带锁的pipe,
- 主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的
之所以
put()和get()可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。 - 用
multiprocessing.Pipe来实现消息传递,支持多读多写
关于
os.pipe和multiprocessing.Pipe
os.pipe:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化multiprocessing.Pipe:使用multiprocessing.Connection实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化
Queue
Queue可以看做在SimpleQueue的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()和put()的无阻塞调用。

初始化
__init__()初始化了这么几个比较重要的变量,
_maxsize:队列最大size_reader,_writer:multiprocessing.Connection实例,负责数据的收发_rlock:multiprocessing.Lock,进程间共享,保证_reader.recv_bytes()并发安全_wlock:multiprocessing.Lock,进程间共享,保证_writer.send_bytes()并发安全_sem:队列长度信号量,计数器初始化为_maxsize_notempty:条件变量,同步生产者放入_buffer和_buffer中数据的发送_buffer:每个进程有自己独立的buffer,线程安全,size其实是由_sem来控制_thread:生产者数据发送线程
put
首先_sem.acquire(),计数器-1,如果不为零,说明队列还可以写入。
| |
然后获得_notempty的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。
| |
feed线程
_start_thread()创建thread对象后,设置daemon属性为True,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。
具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()并发送,直到发送完。
为何_notempty的锁在popleft()就释放了?
- buffer是
collections.deque,本身就是并发安全的 _notempty锁的目的其实是为了保护_notempty的修改,和put()中的目的类似,但并不是为了保护buffer
| |
get
对于block=True的情况
类似SimpleQueue,一直等待_reader.recv_bytes(),直到收到数据。在_reader.recv_bytes()以后,_sem.release(),计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()并在等待,那么_sem.release()会唤醒其中一个等待的进程。
对于block=False的情况
由于不能阻塞,因此不能一直等待_reader.recv_bytes()。
- 如果在
timeout内_rlock都没有获得锁,则返回Empty _reader.poll()(底层还是使用select来实现的)判断是否在timeout内_reader可读,如果不可读,则返回Empty- 最后
_reader.recv_bytes(),并_sem.release()
| |
序列化和反序列化
序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection中定义了一个规则,在header存放长度。
| |
close和join_thread
由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?
在启动feed线程的时候,创建了两个Finalize对象,_finalize_close和_finalize_join,前者的优先级较高,并set了_close和_jointhread。当进程退出的时候,会自动地先后调用这两个函数(基于atexit实现),
_finalize_close:会append元素_sentinel = object()到buffer,feed线程如果看到_sentinel,会调用close()并停止服务_finalize_join:join feed线程
| |
模块同样也提供了close()和join_thread()方法,调用的其实就是_close和_jointhread。如果先手动调用,然后aexit调用,那不会回有问题?首次调用后,都会从util模块的_finalizer_registry中移除,因此不会存在重复调用_finalize_close和_finalize_join的问题。
deadlock
这几个stackoverflow(1,2,3)的问题很类似,在官方的文档中也有提及(python 2,python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。
| |
原因是,Queue使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。
JoinableQueue
JoinableQueue基于Queue实现,覆盖了put(),并新增了,
task_done():表示先前放入队列中的元素被取走了,由消费者调用join():阻塞直到队列中所有元素都被取走
初始化
__init__()调用了Queue的初始化函数,并额外初始化了,
_unfinished_tasks:信号量,表示队列中当前未取走的元素_cond:条件变量,同步join()和task_done()
put
和Queue的put()基本一致。多出来的一点是,每次put()都会对_unfinished_tasks的计数器+1。
| |
task_done和join
task_done()对_unfinished_tasks的计数器进行-1,如果计数器为0,则通知等待在join()的进程。
| |
Reference
- Python os.pipe vs multiprocessing.Pipe
- Daemon is not daemon, but what is it?
- Python 中的 multiprocess.Queue
- Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
- Process.join() and queue don’t work with large numbers
- Script using multiprocessing module does not terminate
- https://docs.python.org/3/library/multiprocessing.html#all-start-methods
- Joining multiprocessing queue takes a long time