Python Multiprocessing
What you learn from books is always shallow; true understanding comes only from practice.

1. Multiprocessing
Because of the Global Interpreter Lock (GIL), multithreading cannot fully utilize multiple cores. For CPU-bound tasks you should use the multiprocessing module instead. Although the two work quite differently under the hood, their interfaces are very similar. The multiprocessing module gives each process its own Python interpreter, which sidesteps the GIL entirely.
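As a minimal sketch of this idea (the `cpu_bound` function and its workload are illustrative, not from the original text), a CPU-bound task can be spread across cores with `multiprocessing.Pool`:

```python
from multiprocessing import Pool, cpu_count

def cpu_bound(n):
    # Sum of squares: a simple CPU-bound workload
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    # One worker process per core; each map item runs in its own process
    with Pool(processes=cpu_count()) as pool:
        results = pool.map(cpu_bound, [10_000] * 4)
    print(results)
```

Each worker runs in a separate interpreter, so the four calls can execute in parallel on a multi-core machine.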
1.1 Basic usage
We will only cover the basic usage here.
- Target function without arguments
```python
from multiprocessing import Process

def worker():
    print('Worker')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker)
        jobs.append(p)
        p.start()
```
```
$ python demo.py
Worker
Worker
Worker
Worker
Worker
```

(Note: the script must not be named `multiprocessing.py`, or it will shadow the standard-library module it imports.)
- Target function with arguments
```python
from multiprocessing import Process

def worker(num):
    print(f'Worker: {num}')

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
```
```
$ python demo.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
```
1.2 Daemon and non-daemon processes
As with threads, a process can be marked as a daemon. A daemon process keeps running in the background without preventing the main program from exiting; when the main program ends, any daemon processes are terminated along with it.
- In the example below, the daemon process has not finished when the main program exits, so it is killed along with it.
```python
from time import sleep
from multiprocessing import Process, current_process

def daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    sleep(2)
    print('Exiting :', p.name, p.pid)

def non_daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    print('Exiting :', p.name, p.pid)

if __name__ == '__main__':
    d = Process(name='daemon', target=daemon, daemon=True)
    n = Process(name='non-daemon', target=non_daemon)
    d.start()
    sleep(1)
    n.start()
```
```
$ python demo.py
Starting: daemon 41295
Starting: non-daemon 41296
Exiting : non-daemon 41296
```
- You can also call join() so that the main program waits for the daemon process to finish before exiting.
```python
from time import sleep
from multiprocessing import Process, current_process

def daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    sleep(2)
    print('Exiting :', p.name, p.pid)

def non_daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    print('Exiting :', p.name, p.pid)

if __name__ == '__main__':
    d = Process(name='daemon', target=daemon, daemon=True)
    n = Process(name='non-daemon', target=non_daemon)
    d.start()
    sleep(1)
    n.start()
    d.join()
    n.join()
```
```
$ python demo.py
Starting: daemon 39312
Starting: non-daemon 39318
Exiting : non-daemon 39318
Exiting : daemon 39312
```
- Without an argument, join() defaults to None and blocks indefinitely; passing a timeout makes it wait at most that many seconds.
```python
from time import sleep
from multiprocessing import Process, current_process

def daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    sleep(2)
    print('Exiting :', p.name, p.pid)

def non_daemon():
    p = current_process()
    print(f'Starting: {p.name} {p.pid}')
    print('Exiting :', p.name, p.pid)

if __name__ == '__main__':
    d = Process(name='daemon', target=daemon, daemon=True)
    n = Process(name='non-daemon', target=non_daemon)
    d.start()
    sleep(1)
    n.start()
    d.join(1)
    # is_alive() tells us whether the process is still running
    print('d.is_alive()', d.is_alive())
    n.join()
```
```
$ python demo.py
Starting: daemon 41297
Starting: non-daemon 41298
Exiting : non-daemon 41298
d.is_alive() True
```
2. Synchronization primitives
multiprocessing provides the same synchronization primitives as the threading module, including Lock, Condition, Event, RLock and Semaphore, with the same API style and similar usage, so we won't go through them all here.
```
>>> dir(multiprocessing)
['Array', 'Value', 'context', 'cpu_count', 'current_process', 'Condition',
 'Event', 'Lock', 'RLock', 'Semaphore', 'Queue', 'SimpleQueue',
 'JoinableQueue', 'Manager', 'managers', 'Pipe', 'Process', 'Pool']
```
2.1 Semaphore
- A semaphore synchronizes on an internal counter: each call to acquire decrements the counter by one, taking a slot, and each call to release increments it by one, freeing that slot. When the counter reaches zero, further calls to acquire block until a slot is released.
```python
import time
import random
import multiprocessing

def limit_run(sema, num):
    p = multiprocessing.current_process()
    with sema:
        print(f'[{p.pid}]: {num} is acquire sema.')
        sleep_time = random.random() * 2
        time.sleep(sleep_time)
        print(f'[{p.pid}]: {num} is release sema.')

if __name__ == '__main__':
    sema = multiprocessing.Semaphore(3)
    processes = []
    for num in range(1, 6):
        p = multiprocessing.Process(name='limit_run', target=limit_run,
                                    args=(sema, num))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
```
```
$ python demo.py
[82222]: 1 is acquire sema.
[82223]: 2 is acquire sema.
[82224]: 3 is acquire sema.
[82223]: 2 is release sema.
[82225]: 4 is acquire sema.
[82224]: 3 is release sema.
[82226]: 5 is acquire sema.
[82222]: 1 is release sema.
[82225]: 4 is release sema.
[82226]: 5 is release sema.
```
2.2 Lock
- A mutex lock is equivalent to a semaphore with a value of 1: only one process may access the resource at a time. Using a lock costs some performance, since other processes must wait for it to be released.
Without a lock:
```python
import time
import multiprocessing as mp
from multiprocessing.sharedctypes import Value

value = Value('i', 0)

def getlock():
    # Read-modify-write without a lock: updates can be lost
    new = value.value + 1
    time.sleep(0.001)
    value.value = new

if __name__ == '__main__':
    processes = []
    for i in range(100):
        p = mp.Process(name='getlock', target=getlock)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(value.value)
```
```
$ python demo.py
91
```
With a lock:
```python
import time
import multiprocessing as mp
from multiprocessing.sharedctypes import Value

lock = mp.Lock()
value = Value('i', 0)

def getlock():
    # The lock serializes the read-modify-write, so no update is lost
    with lock:
        new = value.value + 1
        time.sleep(0.001)
        value.value = new

if __name__ == '__main__':
    processes = []
    for i in range(100):
        p = mp.Process(name='getlock', target=getlock)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(value.value)
```
```
$ python demo.py
100
```
2.3 Reentrant lock - RLock
- A reentrant lock allows the same process to call acquire repeatedly without blocking; note, however, that release must be called the same number of times as acquire before the lock is actually released.
```python
import multiprocessing

lock = multiprocessing.RLock()
print('First try: ', lock.acquire())
print('Second try: ', lock.acquire(0))
```
```
$ python demo.py
First try:  True
Second try:  True
```
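To see that release must balance every acquire, here is a small sketch (not from the original text) that acquires the lock twice and only succeeds in re-taking it once both releases have happened:

```python
import multiprocessing

lock = multiprocessing.RLock()
lock.acquire()
lock.acquire()   # re-entering the same lock does not block
lock.release()   # still held: one acquire remains outstanding
lock.release()   # now fully released
# The lock is free again, so a non-blocking acquire succeeds
print('reacquired:', lock.acquire(block=False))
lock.release()
```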
2.4 Condition
- With a condition, one process waits for a particular condition while another process signals that the condition has been met. The classic illustration of this mechanism is the producer-consumer model.
```python
import time
import multiprocessing as mp

def consumer(cond):
    p = mp.current_process()
    with cond:
        cond.wait()
        print(f'{p.name}: consumer is start...')

def producer(cond):
    p = mp.current_process()
    with cond:
        print(f'{p.name}: producer is start...')
        cond.notify_all()

if __name__ == '__main__':
    cond = mp.Condition()
    processes = []
    for num in range(2):
        c = mp.Process(name='consumer', target=consumer, args=(cond,))
        c.start()
        processes.append(c)
    time.sleep(2)
    p = mp.Process(name='producer', target=producer, args=(cond,))
    p.start()
    processes.append(p)
    for p in processes:
        p.join()
```
```
$ python demo.py
producer: producer is start...
consumer: consumer is start...
consumer: consumer is start...
```
2.5 Event
- With an event, one process waits for a particular signal while another process sets it; again the best illustration is the producer-consumer model. Events differ from conditions: when a Condition is notified, every waiting process wakes up and handles it, whereas with an Event, whichever process gets to the work first handles it.
```python
import time
import random
import multiprocessing as mp

def consumer(event, q):
    p = mp.current_process()
    while True:
        event_is_set = event.wait(10)
        if event_is_set:
            try:
                integer = q.pop()
                print(f'{integer} popped from list by {p.name}')
                event.clear()
            except IndexError:
                # Another consumer already took the item
                pass

def producer(event, q):
    p = mp.current_process()
    while True:
        integer = random.randint(10, 100)
        q.append(integer)
        print(f'{integer} appended to list by {p.name}')
        event.set()
        time.sleep(1)

if __name__ == '__main__':
    event = mp.Event()
    manager = mp.Manager()
    q = manager.list()
    processes = []
    for name in ('consumer1', 'consumer2'):
        c = mp.Process(name=name, target=consumer, args=(event, q))
        print(f'{name} is starting...')
        c.start()
        processes.append(c)
    for name in ('producer',):
        p = mp.Process(name=name, target=producer, args=(event, q))
        print(f'{name} is starting...')
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```
```
$ python demo.py
consumer1 is starting...
consumer2 is starting...
producer is starting...
70 appended to list by producer
70 popped from list by consumer2
59 appended to list by producer
59 popped from list by consumer1
40 appended to list by producer
40 popped from list by consumer2
88 appended to list by producer
88 popped from list by consumer2
......
```
3. Sharing state between processes
The multiprocessing module offers three ways to share state between processes: Queue, sharedctypes (Value/Array) and Manager. "Shared" means the data is visible across processes: a value modified in one process is immediately visible in another. When using multiprocessing, it is best to avoid shared resources where possible; ordinary global variables are not shared with child processes, and only the data structures provided by multiprocessing can be shared.
Sharing method | Class used | Applies to
---|---|---
Queue | `multiprocessing.Queue` | `Process` only
sharedctypes | `multiprocessing.sharedctypes` | `Process` only
Manager | `multiprocessing.Manager` | works with `Pool` as well
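As a minimal sketch of the Queue approach from the table (the `worker` function and message are illustrative), a child process can hand a result back to the parent through a `multiprocessing.Queue`:

```python
from multiprocessing import Process, Queue

def worker(q):
    # The child puts its result on the shared queue
    q.put('result from child')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # blocks until the child puts a value
    p.join()
```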
3.1 Shared memory - sharedctypes
Shared memory is implemented mainly through Value and Array in multiprocessing.sharedctypes.
- Common shared types
- You can use either the short typecode or the full ctypes name
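A minimal sketch of both spellings (the specific values are illustrative): the short single-character typecode and the corresponding ctypes type create the same kind of shared object.

```python
import ctypes
from multiprocessing.sharedctypes import Value, Array

# Short typecode form: 'i' = signed int, 'd' = double
num = Value('i', 7)
arr = Array('d', [0.25, 0.5, 0.75])

# Equivalent full ctypes form
num2 = Value(ctypes.c_int, 7)
arr2 = Array(ctypes.c_double, 3)   # length 3, initialized to zeros

print(num.value, num2.value)
print(list(arr))
```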