之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue
的具体工作方式(本文基于Python 3.7.4)。
multiprocessing.Queue
定义在queues.py中,除此之外还定义了SimpleQueue
和JoinableQueue
,是FIFO队列。
类似multiprocessing.Process
,首先导入了context
,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()
的时候,实际上是调用了context
中的Queue()
方法,设置了ctx
。
|
|
SimpleQueue
SimpleQueue
很简单,其实就是一个带锁的pipe,
- 主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的
之所以
put()
和get()
可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。 - 用
multiprocessing.Pipe
来实现消息传递,支持多读多写
关于
os.pipe
和multiprocessing.Pipe
os.pipe
:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化multiprocessing.Pipe
:使用multiprocessing.Connection
实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化
Queue
Queue
可以看做在SimpleQueue
的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()
和put()
的无阻塞调用。
初始化
__init__()
初始化了这么几个比较重要的变量,
_maxsize
:队列最大size_reader
,_writer
:multiprocessing.Connection
实例,负责数据的收发_rlock
:multiprocessing.Lock
,进程间共享,保证_reader.recv_bytes()
并发安全_wlock
:multiprocessing.Lock
,进程间共享,保证_writer.send_bytes()
并发安全_sem
:队列长度信号量,计数器初始化为_maxsize
_notempty
:条件变量,同步生产者放入_buffer
和_buffer
中数据的发送_buffer
:每个进程有自己独立的buffer,线程安全,size其实是由_sem
来控制_thread
:生产者数据发送线程
put
首先_sem.acquire()
,计数器-1,如果不为零,说明队列还可以写入。
|
|
然后获得_notempty
的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()
通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。
|
|
feed线程
_start_thread()
创建thread对象后,设置daemon
属性为True
,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。
具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()
并发送,直到发送完。
为何_notempty
的锁在popleft()
就释放了?
- buffer是
collections.deque
,本身就是并发安全的 _notempty
锁的目的其实是为了保护_notempty
的修改,和put()
中的目的类似,但并不是为了保护buffer
|
|
get
对于block=True
的情况
类似SimpleQueue
,一直等待_reader.recv_bytes()
,直到收到数据。在_reader.recv_bytes()
以后,_sem.release()
,计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()
并在等待,那么_sem.release()
会唤醒其中一个等待的进程。
对于block=False
的情况
由于不能阻塞,因此不能一直等待_reader.recv_bytes()
。
- 如果在
timeout
内_rlock
都没有获得锁,则返回Empty
_reader.poll()
(底层还是使用select
来实现的)判断是否在timeout
内_reader
可读,如果不可读,则返回Empty
- 最后
_reader.recv_bytes()
,并_sem.release()
|
|
序列化和反序列化
序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection
中定义了一个规则,在header
存放长度。
|
|
close和join_thread
由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?
在启动feed线程的时候,创建了两个Finalize
对象,_finalize_close
和_finalize_join
,前者的优先级较高,并set了_close
和_jointhread
。当进程退出的时候,会自动地先后调用这两个函数(基于atexit
实现),
_finalize_close
:会append元素_sentinel = object()
到buffer,feed线程如果看到_sentinel
,会调用close()
并停止服务_finalize_join
:join feed线程
|
|
模块同样也提供了close()
和join_thread()
方法,调用的其实就是_close
和_jointhread
。如果先手动调用,然后aexit
调用,那不会回有问题?首次调用后,都会从util
模块的_finalizer_registry
中移除,因此不会存在重复调用_finalize_close
和_finalize_join
的问题。
deadlock
这几个stackoverflow(1,2,3)的问题很类似,在官方的文档中也有提及(python 2,python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。
|
|
原因是,Queue
使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()
在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。
JoinableQueue
JoinableQueue
基于Queue
实现,覆盖了put()
,并新增了,
task_done()
:表示先前放入队列中的元素被取走了,由消费者调用join()
:阻塞直到队列中所有元素都被取走
初始化
__init__()
调用了Queue
的初始化函数,并额外初始化了,
_unfinished_tasks
:信号量,表示队列中当前未取走的元素_cond
:条件变量,同步join()
和task_done()
put
和Queue
的put()
基本一致。多出来的一点是,每次put()
都会对_unfinished_tasks
的计数器+1。
|
|
task_done和join
task_done()
对_unfinished_tasks
的计数器进行-1,如果计数器为0,则通知等待在join()
的进程。
|
|
Reference
- Python os.pipe vs multiprocessing.Pipe
- Daemon is not daemon, but what is it?
- Python 中的 multiprocess.Queue
- Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
- Process.join() and queue don’t work with large numbers
- Script using multiprocessing module does not terminate
- https://docs.python.org/3/library/multiprocessing.html#all-start-methods
- Joining multiprocessing queue takes a long time