Summary of Python Inter-Process Communication


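Python's multiprocessing module lets you create processes through an API similar to the threading module. It offers both local and remote concurrency, and because it uses subprocesses instead of threads it effectively side-steps the Global Interpreter Lock (GIL). Below is a brief summary of the ways Python processes can communicate with each other, with an example for each.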
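To make the "API similar to threading" point concrete, here is a minimal sketch of starting one child process. The helpers greet and run are hypothetical names used only for illustration; Process, start(), join(), exitcode and current_process() are the real multiprocessing API.

```python
import multiprocessing

def greet(name):
    # Executes in the child: a separate interpreter with its own GIL
    print('hello', name, 'from pid', multiprocessing.current_process().pid)

def run():
    # Same shape as threading.Thread(target=..., args=...), plus start() and join()
    p = multiprocessing.Process(target=greet, args=('child',))
    p.start()
    p.join()
    return p.exitcode  # 0 means the child finished normally

if __name__ == '__main__':
    print(run())  # 0
```

As with threads, start() launches the target and join() waits for it; the difference is that the child has its own memory, which is why the mechanisms below are needed to exchange data.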

Message Passing

Pipe

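from multiprocessing import Pipe, Process

def worker(conn):
    # Runs in the child process: receive one object from the pipe and print it
    i = conn.recv()
    print(i)
    conn.close()

if __name__ == '__main__':
    # Pipe() returns two connected Connection objects (duplex by default)
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send(1)  # any picklable object can be sent
    p.join()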
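Because Pipe() is bidirectional by default, the child can also reply on the same connection. A minimal sketch of a request/reply loop, assuming a None sentinel as the "stop" signal (a convention chosen here, not something multiprocessing defines); echo_doubler and run are hypothetical helper names.

```python
from multiprocessing import Pipe, Process

def echo_doubler(conn):
    # Child side: answer every number with its double until the None sentinel arrives
    while True:
        item = conn.recv()
        if item is None:
            break
        conn.send(item * 2)
    conn.close()

def run(values):
    parent_conn, child_conn = Pipe()  # duplex=True: both ends can send and recv
    p = Process(target=echo_doubler, args=(child_conn,))
    p.start()
    replies = []
    for v in values:
        parent_conn.send(v)
        replies.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to stop
    p.join()
    parent_conn.close()
    return replies

if __name__ == '__main__':
    print(run([1, 2, 3]))  # [2, 4, 6]
```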

Queue

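multiprocessing.Queue implements a queue shared between processes using a pipe plus a few locks/semaphores. When a process first puts an item on the queue, a background feeder thread starts and moves objects from a buffer into the pipe. Any picklable object can be passed through the queue.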

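import multiprocessing

def worker(q):
    # Blocks until an item is available, then prints it
    i = q.get()
    print(i)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(1)
    # No more data will be put from this process; wait until the
    # feeder thread has flushed the buffered items into the pipe
    queue.close()
    queue.join_thread()
    p.join()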
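A common pattern is a task queue feeding several workers plus a second queue for results. The sketch below assumes one None sentinel per worker to signal "no more work"; square_worker and run are hypothetical names. Results are drained before join(), because a process that has put items on a queue does not terminate until its feeder thread has flushed them to the pipe, and joining first can deadlock.

```python
import multiprocessing

def square_worker(tasks, results):
    # iter(tasks.get, None) keeps calling get() until it returns the None sentinel
    for n in iter(tasks.get, None):
        results.put(n * n)

def run(numbers, n_workers=2):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=square_worker, args=(tasks, results))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for n in numbers:
        tasks.put(n)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    # Drain the results before joining, otherwise a worker may block on exit
    out = [results.get() for _ in numbers]
    for w in workers:
        w.join()
    return sorted(out)  # completion order across workers is not deterministic

if __name__ == '__main__':
    print(run([1, 2, 3, 4]))  # [1, 4, 9, 16]
```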

Synchronization Primitives

Barrier

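A simple synchronization primitive for a fixed number of processes that need to wait for each other. Once every process has called wait(), all of them are released at the same time. wait() returns an integer in the range 0 to parties - 1 that is different for each process, which is handy for picking one process to do a follow-up step.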

import multiprocessing

def worker(b):
    # Blocks until both processes have arrived, then returns 0 or 1
    i = b.wait()
    print(i)
    if i == 0:
        # Exactly one process gets index 0, so this line prints once
        print('passed the barrier')

if __name__ == '__main__':
    barrier = multiprocessing.Barrier(2)
    p = multiprocessing.Process(target=worker, args=(barrier,))
    p.start()

    q = multiprocessing.Process(target=worker, args=(barrier,))
    q.start()

    p.join()
    q.join()
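To check the claim that every process receives a distinct index, the sketch below collects the return values of wait() through a Queue. report_index and run are hypothetical names; the result should always be the full range 0..parties-1.

```python
import multiprocessing

def report_index(barrier, out):
    # wait() releases all parties together and hands each one a different index
    out.put(barrier.wait())

def run(parties):
    barrier = multiprocessing.Barrier(parties)
    out = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=report_index, args=(barrier, out))
             for _ in range(parties)]
    for p in procs:
        p.start()
    indices = sorted(out.get() for _ in range(parties))
    for p in procs:
        p.join()
    return indices

if __name__ == '__main__':
    print(run(3))  # [0, 1, 2]
```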

Semaphore
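multiprocessing.Semaphore(value) is a counter-based lock: acquire() decrements the counter and blocks while it is zero, release() increments it. A minimal sketch, assuming we want at most slots processes inside a section at once; limited_task and run are hypothetical names, and the shared active/peak counters exist only to observe the limit.

```python
import multiprocessing
import time

def limited_task(sem, active, peak):
    # acquire() blocks while all slots are taken; leaving the with-block releases
    with sem:
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.1)  # simulate work while holding a slot
        with active.get_lock():
            active.value -= 1

def run(n_procs, slots):
    sem = multiprocessing.Semaphore(slots)
    active = multiprocessing.Value('i', 0)  # processes currently inside the semaphore
    peak = multiprocessing.Value('i', 0)    # highest value `active` ever reached
    procs = [multiprocessing.Process(target=limited_task, args=(sem, active, peak))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value

if __name__ == '__main__':
    print(run(n_procs=6, slots=2))  # never more than 2
```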

BoundedSemaphore
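A BoundedSemaphore behaves like Semaphore but treats releasing more often than acquiring as a bug and raises ValueError. The sketch runs in a single process for brevity (the same object can be passed to child processes); over_release is a hypothetical helper. The initial value is 1 here because, per the multiprocessing docs, on macOS a BoundedSemaphore with a larger value is indistinguishable from a plain Semaphore.

```python
import multiprocessing

def over_release(sem):
    """Release `sem` once more than it was acquired; return the error text, or None."""
    sem.acquire()
    sem.release()
    try:
        sem.release()  # one release too many
    except ValueError as exc:
        return str(exc)
    return None

if __name__ == '__main__':
    print(over_release(multiprocessing.BoundedSemaphore(1)))  # an error message
    print(over_release(multiprocessing.Semaphore(1)))         # None: the count just grows
```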

Condition

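A condition variable. Several processes can wait on the same condition variable until another process notifies them. By default a Condition uses an RLock; you can supply your own lock instead, but it must be a multiprocessing Lock or RLock object.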

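import multiprocessing
import time

def consumer(q, c, total):
    # Remove `total` items, sleeping on the condition whenever the list is empty
    for _ in range(total):
        with c:                  # acquire/release the condition's lock
            while len(q) == 0:
                c.wait()         # releases the lock while waiting
            i = q.pop(0)
        print(i)

def producer(q, c):
    for i in range(10):
        with c:
            q.append(i)
            c.notify_all()       # wake every process waiting on c
        time.sleep(1)

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        # A plain list would be copied into each child process;
        # a Manager list is a proxy that all processes share
        queue = manager.list()
        cond = multiprocessing.Condition()
        p1 = multiprocessing.Process(target=producer, args=(queue, cond))
        p1.start()

        p2 = multiprocessing.Process(target=producer, args=(queue, cond))
        p2.start()

        # Two producers append 10 items each, so the consumer takes 20
        q = multiprocessing.Process(target=consumer, args=(queue, cond, 20))
        q.start()

        p1.join()
        p2.join()
        q.join()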
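Condition.wait_for(predicate) folds the "while not ready: wait()" loop into one call: it checks the predicate first and re-checks it after every notification. A minimal sketch, assuming a shared Value counter as the state being waited on; wait_until and run are hypothetical names.

```python
import multiprocessing

def wait_until(cond, counter, target):
    # Returns as soon as the predicate is true, even if it already was before waiting
    with cond:
        cond.wait_for(lambda: counter.value >= target)
    print('counter reached', counter.value)

def run(target):
    cond = multiprocessing.Condition()
    counter = multiprocessing.Value('i', 0)
    waiter = multiprocessing.Process(target=wait_until, args=(cond, counter, target))
    waiter.start()
    for _ in range(target):
        with cond:
            counter.value += 1
            cond.notify_all()
    waiter.join()
    return counter.value

if __name__ == '__main__':
    print(run(5))  # 5
```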

Event

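Event provides a simple way to communicate state between processes. An event works like a flag that is either set or unset: set() raises the flag, clear() lowers it, and wait() blocks until it is set, or until an optional timeout expires.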

import multiprocessing
import time

def wait_for_event(e):
    print('wait_for_event: starting')
    e.wait()  # blocks until the event is set
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    print('wait_for_event_timeout: starting')
    e.wait(t)  # gives up after t seconds, even if the event is still unset
    print('wait_for_event_timeout: e.is_set()->', e.is_set())

if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()  # w2 has already timed out by now, so only w1 sees the event set
    print('main: event is set')

    w1.join()
    w2.join()
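Rather than calling is_set() after the fact, a process can use the return value of wait(timeout): True if the flag was set, False if the timeout expired first. A minimal sketch; wait_and_report and run are hypothetical names, and the result travels back through a Queue.

```python
import multiprocessing

def wait_and_report(e, timeout, out):
    # wait(timeout) returns True if the flag is set, False if the timeout expired
    out.put(e.wait(timeout))

def run(set_first, timeout=0.5):
    e = multiprocessing.Event()
    if set_first:
        e.set()
    out = multiprocessing.Queue()
    p = multiprocessing.Process(target=wait_and_report, args=(e, timeout, out))
    p.start()
    result = out.get()
    p.join()
    return result

if __name__ == '__main__':
    print(run(set_first=True))                # True
    print(run(set_first=False, timeout=0.2))  # False
```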

Lock
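multiprocessing.Lock is a non-recursive mutex shared between processes. The classic use is protecting a read-modify-write on shared data. A minimal sketch, assuming a raw shared int created with Value(..., lock=False) so that the explicit Lock is the only protection; add_many and run are hypothetical names.

```python
import multiprocessing

def add_many(lock, counter, n):
    for _ in range(n):
        # counter.value += 1 is a read followed by a write; without the lock,
        # two processes can read the same old value and one update is lost
        with lock:
            counter.value += 1

def run(n_procs, n):
    lock = multiprocessing.Lock()
    # lock=False gives a raw shared ctypes int with no built-in lock
    counter = multiprocessing.Value('i', 0, lock=False)
    procs = [multiprocessing.Process(target=add_many, args=(lock, counter, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == '__main__':
    print(run(4, 1000))  # 4000
```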

Sharing State

Shared memory map

Data can be stored in a shared memory map by using Value or Array.

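import multiprocessing

def f(n, a):
    # Writes go straight to shared memory, so the parent sees them after join()
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = multiprocessing.Value('d', 0.0)         # shared double
    arr = multiprocessing.Array('i', range(10))   # shared array of signed ints

    p = multiprocessing.Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])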

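Value and Array allocate ctypes objects in shared memory. By default the returned objects are wrapped together with a lock that synchronizes access, so they are process- and thread-safe. The 'd' and 'i' arguments are typecodes of the kind used by the array module: 'd' denotes a double-precision float and 'i' a signed integer.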
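The built-in lock makes each individual read or write atomic, but a compound update such as hist[v] += 1 is a read followed by a write, so it still needs the lock held across both steps. The wrapper exposes that lock through get_lock(). A minimal sketch, assuming a shared histogram filled by several processes; tally and run are hypothetical names.

```python
import multiprocessing

def tally(hist, values):
    for v in values:
        # Hold the array's own lock across the read-modify-write
        with hist.get_lock():
            hist[v] += 1

def run(chunks, n_bins):
    hist = multiprocessing.Array('i', n_bins)  # an int size gives a zero-initialized array
    procs = [multiprocessing.Process(target=tally, args=(hist, chunk)) for chunk in chunks]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return hist[:]

if __name__ == '__main__':
    print(run([[0, 1, 1], [2, 2, 2], [0]], 3))  # [2, 2, 3]
```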
