0%

python multiprocessing.Process模块源码阅读

之前有记录过python进程间通信的几个方式,现在来看看这个模块的具体的是怎样工作的(本文基于Python 3.7.4)。

multiprocessing.Process典型的使用方式为,

1
2
3
4
5
6
7
8
import multiprocessing

def f(name):
print('hello', name)

p = multiprocessing.Process(target=f, args=('world',))
p.start()
p.join()

以这段代码为例,看看进程的执行过程。

python process

import

import multiprocessing,导入multiprocessing的时候,做了一些初始化的工作。

  1. multiprocessing的包导入了context,这个模块主要是判断当前系统类型,并相应地使用对应的实现。在sys.platform != 'win32'的系统上,例如:mac和linux,默认使用fork来创建子进程。
  2. context模块中,
    • 导入了process
    • 定义了Process类,这个类继承自process.BaseProcess,覆盖了process.BaseProcess_Popen方法,目的是根据系统类型,在初始化的时候根据系统类型,选择相应的Popen实现
  3. process模块中,
    • 导入了util,注册了_exit_function,被注册的函数会在解释器正常终止时执行(子进程主动调用,主进程自动调用)
    • 此时在父进程里,初始化了几个process模块的全局变量,
      • _current_process:代表当前进程
      • _process_counter:进程数计数器
      • _children:子进程的集合

创建Process实例

p = multiprocessing.Process(target=f, args=('world',)),实际上是调用process.BaseProcess__init__()方法进行初始化,并使用了process模块的全局变量初始化实例变量,以及获取当前进程的pid,

  • count = next(_process_counter)
  • self._identity = _current_process._identity + (count,)
  • self._config = _current_process._config.copy()
  • self._parent_pid = os.getpid()

启动

p.start()创建子进程,首先做了几个检查,

  1. 如果已经创建了子进程,那么不能再次创建
  2. 只能start由当前进程创建的Process实例
  3. 不允许创建daemon进程的子进程

接着_children中清理当前已经结束的进程,然后调用self._Popen(self)开始创建子进程。使用os.fork()创建子进程,用法和posix的fork一样,不多说。要注意的一点是,调用os.pipe()创建了parent_rchild_w,而parent_r将会被用于join()的实现。

子进程中,执行_bootstrap()进行初始化和运行target

  1. 初始化了process模块的全局变量,_current_process_process_counter_children
  2. 清空util._finalizer_registry,执行util._run_after_forkers()
  3. run()运行target
  4. util._exit_function()做进程结束后的收尾工作
    • util._exit_function()会调用_run_finalizers(),这个函数会把优先级高于minpriorityfinalizer执行一遍
      1
      2
      3
      4
      5
      6
      7
      def _run_finalizers(minpriority=None):
      '''
      Run all finalizers whose exit priority is not None and at least minpriority

      Finalizers with highest priority are called first; finalizers with
      the same priority will be called in reverse order of creation.
      '''
    • 默认情况下子进程的_finalizer_registry是空的,没有任何的finalizer会被执行,但可以通过multiprocessing.util.Finalize手动的进行注册,来完成一些收尾工作,例如:关闭db连接。
      1
      2
      3
      4
      5
      6
      7
      class Finalize(object):
      '''
      Class which supports object finalization using weakrefs
      '''
      def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
      # ......
      _finalizer_registry[self._key] = self

join

创建子进程的时候,也创建了pipe parent_r,并set self.sentinel = parent_r,且关闭了child_w。此时唯一打开child_w的进程是子进程,唯一打开parent_r的是主进程。主进程调用join()时,实际上是等待parent_r变为可读状态(wait返回)。

1
2
3
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None

那么何时wait返回?wait循环调用select,当parent_r ready的时候,wait返回,

  • parent_r可读
  • 所有writing side被关闭

当子进程结束的时候,os会关闭这个进程关联的所有fd,又因为主进程已经关闭了child_w,所以此时parent_r ready,主进程中的join()也就返回了。

Reference

  1. multiprocessing — Process-based parallelism
  2. Why should you close a pipe in linux?