之前有记录过python进程间通信的几个方式,现在来看看这个模块的具体的是怎样工作的(本文基于Python 3.7.4)。
multiprocessing.Process
典型的使用方式为,
|
|
以这段代码为例,看看进程的执行过程。
import
import multiprocessing
,导入multiprocessing的时候,做了一些初始化的工作。
- multiprocessing的包导入了
context
,这个模块主要是判断当前系统类型,并相应地使用对应的实现。在sys.platform != 'win32'
的系统上,例如:mac和linux,默认使用fork
来创建子进程。 context
模块中,- 导入了
process
- 定义了
Process
类,这个类继承自process.BaseProcess
,覆盖了process.BaseProcess
的_Popen
方法,目的是根据系统类型,在初始化的时候根据系统类型,选择相应的Popen
实现
- 导入了
process
模块中,- 导入了
util
,注册了_exit_function
,被注册的函数会在解释器正常终止时执行(子进程主动调用,主进程自动调用) - 此时在父进程里,初始化了几个
process
模块的全局变量,_current_process
:代表当前进程_process_counter
:进程数计数器_children
:子进程的集合
- 导入了
创建Process
实例
p = multiprocessing.Process(target=f, args=('world',))
,实际上是调用process.BaseProcess
的__init__()
方法进行初始化,并使用了process
模块的全局变量初始化实例变量,以及获取当前进程的pid,
count = next(_process_counter)
self._identity = _current_process._identity + (count,)
self._config = _current_process._config.copy()
self._parent_pid = os.getpid()
启动
p.start()
创建子进程,首先做了几个检查,
- 如果已经创建了子进程,那么不能再次创建
- 只能start由当前进程创建的
Process
实例 - 不允许创建daemon进程的子进程
接着_children
中清理当前已经结束的进程,然后调用self._Popen(self)
开始创建子进程。使用os.fork()
创建子进程,用法和posix的fork一样,不多说。要注意的一点是,调用os.pipe()
创建了parent_r
和child_w
,而parent_r
将会被用于join()
的实现。
子进程中,执行_bootstrap()
进行初始化和运行target
,
- 初始化了
process
模块的全局变量,_current_process
、_process_counter
、_children
- 清空
util._finalizer_registry
,执行util._run_after_forkers()
run()
运行target
util._exit_function()
做进程结束后的收尾工作util._exit_function()
会调用_run_finalizers()
,这个函数会把优先级高于minpriority
的finalizer
执行一遍1 2 3 4 5 6 7
def _run_finalizers(minpriority=None): ''' Run all finalizers whose exit priority is not None and at least minpriority Finalizers with highest priority are called first; finalizers with the same priority will be called in reverse order of creation. '''
- 默认情况下子进程的
_finalizer_registry
是空的,没有任何的finalizer
会被执行,但可以通过multiprocessing.util.Finalize
手动的进行注册,来完成一些收尾工作,例如:关闭db连接。1 2 3 4 5 6 7
class Finalize(object): ''' Class which supports object finalization using weakrefs ''' def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): # ...... _finalizer_registry[self._key] = self
join
创建子进程的时候,也创建了pipe parent_r
,并set self.sentinel = parent_r
,且关闭了child_w
。此时唯一打开child_w
的进程是子进程,唯一打开parent_r
的是主进程。主进程调用join()
时,实际上是等待parent_r
变为可读状态(wait
返回)。
|
|
那么何时wait
返回?wait
循环调用select
,当parent_r
ready的时候,wait返回,
parent_r
可读- 所有writing side被关闭
当子进程结束的时候,os会关闭这个进程关联的所有fd,又因为主进程已经关闭了child_w
,所以此时parent_r
ready,主进程中的join()
也就返回了。