python multiprocessing.Process模块源码阅读

之前有记录过python进程间通信的几个方式,现在来看看这个模块的具体的是怎样工作的(本文基于Python 3.7.4)。

multiprocessing.Process典型的使用方式为,

1
2
3
4
5
6
7
8
import multiprocessing

def f(name):
print('hello', name)

p = multiprocessing.Process(target=f, args=('world',))
p.start()
p.join()

以这段代码为例,看看进程的执行过程。

python process

import

import multiprocessing,导入multiprocessing的时候,做了一些初始化的工作。 1. multiprocessing的包导入了context,这个模块主要是判断当前系统类型,并相应地使用对应的实现。在sys.platform != 'win32'的系统上,例如:mac和linux,默认使用fork来创建子进程。 2. context模块中, * 导入了process * 定义了Process类,这个类继承自process.BaseProcess,覆盖了process.BaseProcess_Popen方法,目的是根据系统类型,在初始化的时候根据系统类型,选择相应的Popen实现 3. process模块中, * 导入了util,注册了_exit_function,被注册的函数会在解释器正常终止时执行(子进程主动调用,主进程自动调用) * 此时在父进程里,初始化了几个process模块的全局变量, * _current_process:代表当前进程 * _process_counter:进程数计数器 * _children:子进程的集合

创建Process实例

p = multiprocessing.Process(target=f, args=('world',)),实际上是调用process.BaseProcess__init__()方法进行初始化,并使用了process模块的全局变量初始化实例变量,以及获取当前进程的pid, * count = next(_process_counter) * self._identity = _current_process._identity + (count,) * self._config = _current_process._config.copy() * self._parent_pid = os.getpid()

启动

p.start()创建子进程,首先做了几个检查, 1. 如果已经创建了子进程,那么不能再次创建 2. 只能start由当前进程创建的Process实例 3. 不允许创建daemon进程的子进程

接着_children中清理当前已经结束的进程,然后调用self._Popen(self)开始创建子进程。使用os.fork()创建子进程,用法和posix的fork一样,不多说。要注意的一点是,调用os.pipe()创建了parent_rchild_w,而parent_r将会被用于join()的实现。

子进程中,执行_bootstrap()进行初始化和运行target, 1. 初始化了process模块的全局变量,_current_process_process_counter_children 2. 清空util._finalizer_registry,执行util._run_after_forkers() 3. run()运行target 4. util._exit_function()做进程结束后的收尾工作 * util._exit_function()会调用_run_finalizers(),这个函数会把优先级高于minpriorityfinalizer执行一遍

1
2
3
4
5
6
7
def _run_finalizers(minpriority=None):
'''
Run all finalizers whose exit priority is not None and at least minpriority

Finalizers with highest priority are called first; finalizers with
the same priority will be called in reverse order of creation.
'''
* 默认情况下子进程的_finalizer_registry是空的,没有任何的finalizer会被执行,但可以通过multiprocessing.util.Finalize手动的进行注册,来完成一些收尾工作,例如:关闭db连接。
1
2
3
4
5
6
7
class Finalize(object):
'''
Class which supports object finalization using weakrefs
'''
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
# ......
_finalizer_registry[self._key] = self

join

创建子进程的时候,也创建了pipe parent_r,并set self.sentinel = parent_r,且关闭了child_w。此时唯一打开child_w的进程是子进程,唯一打开parent_r的是主进程。主进程调用join()时,实际上是等待parent_r变为可读状态(wait返回)。

1
2
3
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None

那么何时wait返回?wait循环调用select,当parent_r ready的时候,wait返回, * parent_r可读 * 所有writing side被关闭

当子进程结束的时候,os会关闭这个进程关联的所有fd,又因为主进程已经关闭了child_w,所以此时parent_r ready,主进程中的join()也就返回了。

Reference

  1. multiprocessing — Process-based parallelism
  2. Why should you close a pipe in linux?