Python进程间通信总结
Python的multiprocessing支持使用类似threading模块的API来创建进程,multiprocessing提供了本地和远程并发,有效的避免了线程中的GIL。下面对Python进程间通信的方法做了简单的总结,并列举了相应的例子。
消息传递
pipe
1 2 3 4 5 6 7 8 9 10 11
| def worker(conn): i = conn.recv() print(i) conn.close()
if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() parent_conn.send(1) p.join()
|
queue
multiprocessing.Queue
使用pepe和locks/semaphores实现了进程间的共享队列。任何picklable对象都可以通过queue
传递。
1 2 3 4 5 6 7 8 9 10 11 12 13
| def worker(q): i = q.get() print(i)
if __name__ == '__main__': queue = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(queue,)) p.start()
queue.put(1) queue.close() queue.join_thread() p.join()
|
同步原语
Barrier
一种简单的同步原语,用于固定数目的进程相互等待。当所有进程都调用wait
以后,所有进程会同时开始执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| def worker(b): i = b.wait() print(i) if i == 0: print('passed the barrier')
if __name__ == '__main__': barrier = multiprocessing.Barrier(2) p = multiprocessing.Process(target=worker, args=(barrier,)) p.start()
q = multiprocessing.Process(target=worker, args=(barrier,)) q.start()
p.join() p.join()
|
Semaphore
BoundedSemaphore
Condition
条件变量。多个进程可以等待同一个条件变量,直到他们被另一个进程通知。条件变量默认使用RLock
,可以提供自己的锁,但必须是Lock
或者RLock
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| def producer(q, c): c.acquire() while len(q) == 0: c.wait()
i = q.pop() print(i) c.release()
def consumer(q, c): for i in range(10): c.acquire() q.append(i) c.notify_all() c.release() time.sleep(1)
if __name__ == '__main__': queue = [] cond = multiprocessing.Condition() p1 = multiprocessing.Process(target=consumer, args=(queue, cond,)) p1.start()
p2 = multiprocessing.Process(target=consumer, args=(queue, cond,)) p2.start()
q = multiprocessing.Process(target=producer, args=(queue, cond,)) q.start()
p1.join() p2.join() q.join()
|
Event
Event
提供了一种简单的方法来实现进程间的状态传递。event类似一个flag,状态是set或者unset。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def wait_for_event(e): print('wait_for_event: starting') e.wait() print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t): print('wait_for_event_timeout: starting') e.wait(t) print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__': e = multiprocessing.Event() w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,)) w1.start()
w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2)) w2.start()
print('main: waiting before calling Event.set()') time.sleep(3) e.set() print('main: event is set')
|
Lock
状态共享
shared memory map
通过使用Value
或Array
,数据可以被存储在shared
memory map中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i]
if __name__ == '__main__': num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join()
print(num.value) print(arr[:])
|
Value
和Array
在shared
memory分配了ctypes对象。默认情况下,返回的值是使用同步方法封装过的对象,即线程安全的。其中,参数d
和i
代表了值的类型。
References
- Passing
Messages to Processes
- multiprocessing
— Process-based parallelism