0%

python multiprocessing.Queue模块源码阅读

之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue的具体工作方式(本文基于Python 3.7.4)。

multiprocessing.Queue定义在queues.py中,除此之外还定义了SimpleQueueJoinableQueue,是FIFO队列。

类似multiprocessing.Process,首先导入了context,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()的时候,实际上是调用了context中的Queue()方法,设置了ctx

1
2
3
4
def Queue(self, maxsize=0):
'''Returns a queue object'''
from .queues import Queue
return Queue(maxsize, ctx=self.get_context())

SimpleQueue

SimpleQueue很简单,其实就是一个带锁的pipe,

  • 主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的
    之所以put()get()可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。
  • multiprocessing.Pipe来实现消息传递,支持多读多写

关于os.pipemultiprocessing.Pipe

  • os.pipe:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化
  • multiprocessing.Pipe:使用multiprocessing.Connection实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化

Queue

Queue可以看做在SimpleQueue的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()put()的无阻塞调用。

python queues

初始化

__init__()初始化了这么几个比较重要的变量,

  • _maxsize:队列最大size
  • _reader_writermultiprocessing.Connection实例,负责数据的收发
  • _rlockmultiprocessing.Lock,进程间共享,保证_reader.recv_bytes()并发安全
  • _wlockmultiprocessing.Lock,进程间共享,保证_writer.send_bytes()并发安全
  • _sem:队列长度信号量,计数器初始化为_maxsize
  • _notempty:条件变量,同步生产者放入_buffer_buffer中数据的发送
  • _buffer:每个进程有自己独立的buffer,线程安全,size其实是由_sem来控制
  • _thread:生产者数据发送线程

put

首先_sem.acquire(),计数器-1,如果不为零,说明队列还可以写入。

1
2
if not self._sem.acquire(block, timeout):
raise Full

然后获得_notempty的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。

1
2
3
4
5
with self._notempty: # acquire保护_notempty和_thread的修改
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

feed线程

_start_thread()创建thread对象后,设置daemon属性为True,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。

具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()并发送,直到发送完。

为何_notempty的锁在popleft()就释放了?

  1. buffer是collections.deque,本身就是并发安全的
  2. _notempty锁的目的其实是为了保护_notempty的修改,和put()中的目的类似,但并不是为了保护buffer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    while 1:
    try:
    nacquire()
    try:
    if not buffer:
    nwait()
    finally:
    nrelease()
    try:
    while 1:
    obj = bpopleft()
    if obj is sentinel:
    debug('feeder thread got sentinel -- exiting')
    close()
    return

    # serialize the data before acquiring the lock
    obj = _ForkingPickler.dumps(obj)
    if wacquire is None:
    send_bytes(obj)
    else:
    wacquire()
    try:
    send_bytes(obj)
    finally:
    wrelease()
    except IndexError:
    pass
    except Exception as e:
    # ......

get

对于block=True的情况
类似SimpleQueue,一直等待_reader.recv_bytes(),直到收到数据。在_reader.recv_bytes()以后,_sem.release(),计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()并在等待,那么_sem.release()会唤醒其中一个等待的进程。

对于block=False的情况
由于不能阻塞,因此不能一直等待_reader.recv_bytes()

  1. 如果在timeout_rlock都没有获得锁,则返回Empty
  2. _reader.poll()(底层还是使用select来实现的)判断是否在timeout_reader可读,如果不可读,则返回Empty
  3. 最后_reader.recv_bytes(),并_sem.release()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get(self, block=True, timeout=None):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout): # 此时的timeout已经减去了等待_rlock.acquire的时间
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

序列化和反序列化

序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection中定义了一个规则,在header存放长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _send_bytes(self, buf):
n = len(buf)
# For wire compatibility with 3.2 and lower
header = struct.pack("!i", n)
if n > 16384:
# The payload is large so Nagle's algorithm won't be triggered
# and we'd better avoid the cost of concatenation.
self._send(header)
self._send(buf)
else:
# Issue #20540: concatenate before sending, to avoid delays due
# to Nagle's algorithm on a TCP socket.
# Also note we want to avoid sending a 0-length buffer separately,
# to avoid "broken pipe" errors if the other end closed the pipe.
self._send(header + buf)

def _recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)

close和join_thread

由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?

在启动feed线程的时候,创建了两个Finalize对象,_finalize_close_finalize_join,前者的优先级较高,并set了_close_jointhread。当进程退出的时候,会自动地先后调用这两个函数(基于atexit实现),

  1. _finalize_close:会append元素_sentinel = object()到buffer,feed线程如果看到_sentinel,会调用close()并停止服务
  2. _finalize_join:join feed线程
1
2
3
4
5
6
7
8
9
10
11
12
13
if not self._joincancelled:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
)

# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
)

模块同样也提供了close()join_thread()方法,调用的其实就是_close_jointhread。如果先手动调用,然后aexit调用,那不会回有问题?首次调用后,都会从util模块的_finalizer_registry中移除,因此不会存在重复调用_finalize_close_finalize_join的问题。

deadlock

这几个stackoverflow(123)的问题很类似,在官方的文档中也有提及(python 2python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
import multiprocessing

def work(q):
q.put('1'*10000000)

if __name__ == "__main__":
print(os.getpid())
q = multiprocessing.Queue()

p = multiprocessing.Process(target=work, args=(q,))

p.start()
p.join()

print(q.get())

原因是,Queue使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。

JoinableQueue

JoinableQueue基于Queue实现,覆盖了put(),并新增了,

  • task_done():表示先前放入队列中的元素被取走了,由消费者调用
  • join():阻塞直到队列中所有元素都被取走

初始化

__init__()调用了Queue的初始化函数,并额外初始化了,

  • _unfinished_tasks:信号量,表示队列中当前未取走的元素
  • _cond:条件变量,同步join()task_done()

put

Queueput()基本一致。多出来的一点是,每次put()都会对_unfinished_tasks的计数器+1。

1
2
3
4
5
6
7
8
def put(self, obj, block=True, timeout=None):
# ...
with self._notempty, self._cond:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._unfinished_tasks.release()
self._notempty.notify()

task_done和join

task_done()_unfinished_tasks的计数器进行-1,如果计数器为0,则通知等待在join()的进程。

1
2
3
4
5
6
7
8
9
10
11
def task_done(self):
with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

Reference

  1. Python os.pipe vs multiprocessing.Pipe
  2. Daemon is not daemon, but what is it?
  3. Python 中的 multiprocess.Queue
  4. Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
  5. Process.join() and queue don’t work with large numbers
  6. Script using multiprocessing module does not terminate
  7. https://docs.python.org/3/library/multiprocessing.html#all-start-methods
  8. Joining multiprocessing queue takes a long time