python multiprocessing.Queue模块源码阅读

之前有记录过python Queue的使用,以及multiprocessing.Process模块。现在看看multiprocessing.Queue的具体工作方式(本文基于Python 3.7.4)。

multiprocessing.Queue定义在queues.py中,除此之外还定义了SimpleQueueJoinableQueue,是FIFO队列。

类似multiprocessing.Process,首先导入了context,判断当前系统类型,并相应地使用对应的实现。使用的multiprocessing.Queue()的时候,实际上是调用了context中的Queue()方法,设置了ctx

1
2
3
4
def Queue(self, maxsize=0):
    '''Returns a queue object'''
    from .queues import Queue
    return Queue(maxsize, ctx=self.get_context())

SimpleQueue

SimpleQueue很简单,其实就是一个带锁的pipe,

  • 主进程和子进程分别使用各自的lock,实现写入pipe和读取pipe是并发安全的 之所以put()get()可以使用不同的lock,是因为pipe两端的读写已经是并发安全的了。
  • multiprocessing.Pipe来实现消息传递,支持多读多写

关于os.pipemultiprocessing.Pipe

  • os.pipe:在Linux上底层访问的是传统的POSIX pipes,单向,使用encode/decode序列化
  • multiprocessing.Pipe:使用multiprocessing.Connection实现,在Linux上使用POSIX sockets完成数据发送,双向,使用pickle/unpickle序列化

Queue

Queue可以看做在SimpleQueue的基础上,增加了生产者端的发送buffer、支持设置队列大小,以及get()put()的无阻塞调用。

python queues

初始化

__init__()初始化了这么几个比较重要的变量,

  • _maxsize:队列最大size
  • _reader_writermultiprocessing.Connection实例,负责数据的收发
  • _rlockmultiprocessing.Lock,进程间共享,保证_reader.recv_bytes()并发安全
  • _wlockmultiprocessing.Lock,进程间共享,保证_writer.send_bytes()并发安全
  • _sem:队列长度信号量,计数器初始化为_maxsize
  • _notempty:条件变量,同步生产者放入_buffer_buffer中数据的发送
  • _buffer:每个进程有自己独立的buffer,线程安全,size其实是由_sem来控制
  • _thread:生产者数据发送线程

put

首先_sem.acquire(),计数器-1,如果不为零,说明队列还可以写入。

1
2
if not self._sem.acquire(block, timeout):
    raise Full

然后获得_notempty的锁,如果发送线程未创建,则创建。追加元素至buffer后,_notempty.notify()通知发送线程。生产者中数据的发送由单独的线程完成,主线程只负责将数据放入buffer。

1
2
3
4
5
with self._notempty: # acquire保护_notempty和_thread的修改
    if self._thread is None:
        self._start_thread()
    self._buffer.append(obj)
    self._notempty.notify()

feed线程

_start_thread()创建thread对象后,设置daemon属性为True,目的是随主线程的退出而退出,不必手动添加停止的逻辑。接着就启动线程。

具体发送时,不断等待buffer中有元素;如果buffer中有元素,popleft()并发送,直到发送完。

为何_notempty的锁在popleft()就释放了?

  1. buffer是collections.deque,本身就是并发安全的
  2. _notempty锁的目的其实是为了保护_notempty的修改,和put()中的目的类似,但并不是为了保护buffer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
while 1:
    try:
        nacquire()
        try:
            if not buffer:
                nwait()
        finally:
            nrelease()
        try:
            while 1:
                obj = bpopleft()
                if obj is sentinel:
                    debug('feeder thread got sentinel -- exiting')
                    close()
                    return

                # serialize the data before acquiring the lock
                obj = _ForkingPickler.dumps(obj)
                if wacquire is None:
                    send_bytes(obj)
                else:
                    wacquire()
                    try:
                        send_bytes(obj)
                    finally:
                        wrelease()
        except IndexError:
            pass
    except Exception as e:
        # ......

get

对于block=True的情况 类似SimpleQueue,一直等待_reader.recv_bytes(),直到收到数据。在_reader.recv_bytes()以后,_sem.release(),计数器+1,表示从队列消耗了一个元素。此时如果有进程调用_sem.acquire()并在等待,那么_sem.release()会唤醒其中一个等待的进程。

对于block=False的情况 由于不能阻塞,因此不能一直等待_reader.recv_bytes()

  1. 如果在timeout_rlock都没有获得锁,则返回Empty
  2. _reader.poll()(底层还是使用select来实现的)判断是否在timeout_reader可读,如果不可读,则返回Empty
  3. 最后_reader.recv_bytes(),并_sem.release()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get(self, block=True, timeout=None):
    if block and timeout is None:
        with self._rlock:
            res = self._recv_bytes()
        self._sem.release()
    else:
        if block:
            deadline = time.monotonic() + timeout
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                timeout = deadline - time.monotonic()
                if not self._poll(timeout): # 此时的timeout已经减去了等待_rlock.acquire的时间
                    raise Empty
            elif not self._poll():
                raise Empty
            res = self._recv_bytes()
            self._sem.release()
        finally:
            self._rlock.release()
    # unserialize the data after having released the lock
    return _ForkingPickler.loads(res)

序列化和反序列化

序列化和反序列化使用的是pickle来实现,那么如何判断消息的边界?Connection中定义了一个规则,在header存放长度。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _send_bytes(self, buf):
    n = len(buf)
    # For wire compatibility with 3.2 and lower
    header = struct.pack("!i", n)
    if n > 16384:
        # The payload is large so Nagle's algorithm won't be triggered
        # and we'd better avoid the cost of concatenation.
        self._send(header)
        self._send(buf)
    else:
        # Issue #20540: concatenate before sending, to avoid delays due
        # to Nagle's algorithm on a TCP socket.
        # Also note we want to avoid sending a 0-length buffer separately,
        # to avoid "broken pipe" errors if the other end closed the pipe.
        self._send(header + buf)

def _recv_bytes(self, maxsize=None):
    buf = self._recv(4)
    size, = struct.unpack("!i", buf.getvalue())
    if maxsize is not None and size > maxsize:
        return None
    return self._recv(size)

close和join_thread

由于生产者启动了一个线程来负责发送,元素首先append到buffer,然后发送。在进程结束时,如何确保元素发送完毕的问题?

在启动feed线程的时候,创建了两个Finalize对象,_finalize_close_finalize_join,前者的优先级较高,并set了_close_jointhread。当进程退出的时候,会自动地先后调用这两个函数(基于atexit实现),

  1. _finalize_close:会append元素_sentinel = object()到buffer,feed线程如果看到_sentinel,会调用close()并停止服务
  2. _finalize_join:join feed线程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
if not self._joincancelled:
    self._jointhread = Finalize(
        self._thread, Queue._finalize_join,
        [weakref.ref(self._thread)],
        exitpriority=-5
        )

# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
    self, Queue._finalize_close,
    [self._buffer, self._notempty],
    exitpriority=10
    )

模块同样也提供了close()join_thread()方法,调用的其实就是_close_jointhread。如果先手动调用,然后aexit调用,那不会回有问题?首次调用后,都会从util模块的_finalizer_registry中移除,因此不会存在重复调用_finalize_close_finalize_join的问题。

deadlock

这几个stackoverflow(123)的问题很类似,在官方的文档中也有提及(python 2python 3),总结下来就是,join一个调用put的进程,且这个进程尚未把buffer中所有的元素写入pipe时,可能会导致死锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import os
import multiprocessing

def work(q):
    q.put('1'*10000000)

if __name__ == "__main__":
    print(os.getpid())
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=work, args=(q,))

    p.start()
    p.join()

    print(q.get())

原因是,Queue使用了os的pipe来进行数据的传输,而pipe的大小是有限的。如果数据过大,_writer.send_bytes()在写入数据到pipe的时候会阻塞。如果此时join这个子进程,那进程本身已经卡住了,join永远等不到进程结束。

JoinableQueue

JoinableQueue基于Queue实现,覆盖了put(),并新增了,

  • task_done():表示先前放入队列中的元素被取走了,由消费者调用
  • join():阻塞直到队列中所有元素都被取走

初始化

__init__()调用了Queue的初始化函数,并额外初始化了,

  • _unfinished_tasks:信号量,表示队列中当前未取走的元素
  • _cond:条件变量,同步join()task_done()

put

Queueput()基本一致。多出来的一点是,每次put()都会对_unfinished_tasks的计数器+1。

1
2
3
4
5
6
7
8
    def put(self, obj, block=True, timeout=None):
        # ...
        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

task_done和join

task_done()_unfinished_tasks的计数器进行-1,如果计数器为0,则通知等待在join()的进程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def task_done(self):
    with self._cond:
        if not self._unfinished_tasks.acquire(False):
            raise ValueError('task_done() called too many times')
        if self._unfinished_tasks._semlock._is_zero():
            self._cond.notify_all()

def join(self):
    with self._cond:
        if not self._unfinished_tasks._semlock._is_zero():
            self._cond.wait()

Reference

  1. Python os.pipe vs multiprocessing.Pipe
  2. Daemon is not daemon, but what is it?
  3. Python 中的 multiprocess.Queue
  4. Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
  5. Process.join() and queue don’t work with large numbers
  6. Script using multiprocessing module does not terminate
  7. https://docs.python.org/3/library/multiprocessing.html#all-start-methods
  8. Joining multiprocessing queue takes a long time
comments powered by Disqus