Python多进程-multiprocess
Python中主要通过 multiprocess 包来操作和管理进程。
10多年的蓬溪网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。全网整合营销推广的优势是能够根据用户设备显示端的尺寸不同,自动调整蓬溪建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联从事“蓬溪网站设计”,“蓬溪网站推广”以来,每个客户项目都认真落实执行。
进程启动方式
python 启动进程方式1:
import time
from multiprocessing import Process
def fork(thread_name):
time.sleep(2)
print("subprocess: " + thread_name)
if __name__ == '__main__':
p = Process(target=fork, args=('hello_1',))
p.start() # 启动进程
print("end...")
# 结果输出
end...
subprocess: hello_1
Process类参数说明:
Process([ target [, name [, args [, kwargs]]]]])
target 表示子进程要执行的任务
args 表示调用对象的位置参数元组,args=(1,2,'hello',)
kwargs 表示调用对象的字典,kwargs={'name':'baby','age':18}
name 子进程的名称
python 启动进程方式2:
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, thread_name):
super().__init__()
self.thread_name = thread_name
def run(self):
time.sleep(2)
print("subprocess: " + self.thread_name)
if __name__ == '__main__':
p = MyProcess('hello_1')
p.start()
print("end...")
# 结果输出
end...
subprocess: hello_1
Tip:两种启动进程的方式没有优劣之分~
join方法的应用
在主进程中通过 join 方法,可以让主进程等待子进程执行完毕后,再继续往下执行
import time
from multiprocessing import Process
def fork(thread_name):
time.sleep(2)
print("subprocess: " + thread_name)
if __name__ == '__main__':
p = Process(target=fork, args=('hello_1',))
p.start()
p.join() # 等待子进程执行完毕
print("end...")
# 结果输出
subprocess: hello_1
end...
多个子进程同时运行
import time
from multiprocessing import Process
def fork(thread_name):
time.sleep(2)
print("subprocess: " + thread_name)
if __name__ == '__main__':
p_list = []
for i in range(1, 4):
p = Process(target=fork, args=('hello_' + str(i),))
p.start()
p_list.append(p)
[p.join() for p in p_list] # 等待子进程执行完毕
print("end...")
# 结果输出
subprocess: hello_1
subprocess: hello_2
subprocess: hello_3
end...
如上是通过第一种方式启动子进程,使用继承 Process 类的形式启动子进程示例如下:
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, thread_name):
super().__init__()
self.thread_name = thread_name
def run(self):
time.sleep(2)
print("subprocess: " + self.thread_name)
if __name__ == '__main__':
p_list = []
for i in range(1, 4):
p = MyProcess('hello_' + str(i))
p.start()
p_list.append(p)
[p.join() for p in p_list]
print("end...")
Process类的其他相关方法和属性
import time
from multiprocessing import Process
def fork(thread_name):
time.sleep(2)
print("subprocess: " + thread_name)
if __name__ == '__main__':
p = Process(target=fork, args=('hello',))
p.start()
# 进程的名称
print(p.name) # 输出:Process-1
# 布尔值,True 表示该进程为守护进程,默认为 False,这个值需要在 p.start() 之前设置
print(p.daemon) # 输出:False
# 进程的pid
print(p.pid) # 输出:7980
# 进程的身份验证键,默认是由 os.urandom() 随机生成的32字符的字符串。
print(p.authkey) # 输出:b'\xf2M)\xc8\xf6\xae8\x0c\xbet\xbcAT\xad7%ig9zl\xe5|\xb5|\x7f\xa6\xab\x8a\x8a\x94:'
# 查看进程是否还在运行,若还在运行,则返回 True
print(p.is_alive()) # 输出:True
# 主进程等待子进程 p 执行结束,再继续往下执行
# p.join()
# 强制终止子进程 p
p.terminate()
print('end...')
守护进程
import time
from multiprocessing import Process
def fork(thread_name):
for i in range(5):
time.sleep(1)
print("subprocess: " + thread_name + "..." + str(i))
if __name__ == '__main__':
p = Process(target=fork, args=('hello',))
p.start()
time.sleep(2)
print('end...')
# 输出结果:
subprocess: hello...0
subprocess: hello...1
end...
subprocess: hello...2
subprocess: hello...3
subprocess: hello...4
可以看到主进程的代码先运行完毕,运行完成后,它会等待子进程执行完成后再结束。若是将子进程设置为守护进程,则子进程会随着主进程的代码执行完毕而结束。注意守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children。
import time
from multiprocessing import Process
def fork(thread_name):
for i in range(5):
time.sleep(1)
print("subprocess: " + thread_name + "..." + str(i))
if __name__ == '__main__':
p = Process(target=fork, args=('hello',))
p.daemon = True # 设置进程 p 为守护进程
p.start()
time.sleep(2)
print('end...')
# 输出结果:
subprocess: hello...0
subprocess: hello...1
end...
值得注意的是:守护进程是在主进程代码执行结束后就终止,即主进程的代码执行完毕,守护进程就终止。来看如下示例:
import time
from multiprocessing import Process
def fork_1(thread_name):
for i in range(5):
time.sleep(1)
print("subprocess: " + thread_name + "..." + str(i), end="\n")
def fork_2(thread_name):
for i in range(7):
time.sleep(1)
print("subprocess: " + thread_name + "..." + str(i), end="\n")
if __name__ == '__main__':
p1 = Process(target=fork_1, args=('hello',))
p2 = Process(target=fork_2, args=('hi',))
p1.daemon = True # 设置进程 p1 为守护进程
p1.start()
p2.start()
time.sleep(2)
print('end...')
# 输出结果:
subprocess: hello...0
subprocess: hi...0
subprocess: hello...1
subprocess: hi...1
end...
subprocess: hi...2
subprocess: hi...3
subprocess: hi...4
subprocess: hi...5
subprocess: hi...6
如上示例中,p1 为守护进程,在主进程输出 ‘end…’ 后,即主进程的代码执行完毕后,守护进程 p1 就终止了。但是此时,主进程并没有终止,它需要等待 p2 执行完毕之后再终止。
锁 —— multiprocess.Lock
进程与进程之间数据是隔离的
from multiprocessing import Process
def fork(thread_name):
global n
print("subprocess: " + thread_name + "...n=" + str(n))
n = 1
print("subprocess: " + thread_name + "...n=" + str(n))
if __name__ == '__main__':
n = 100
p = Process(target=fork, args=('hello',))
p.start()
p.join()
print("main...n=" + str(n))
# 输出结果:
subprocess: hello...n=100
subprocess: hello...n=1
main...n=100
通过如上示例可以看出,子进程 p 中的变量 n 和主进程中的变量 n 是两个独立的变量,存放在不同的内存空间,更改其中一个变量并不会影响另一个变量的值。
要想在进程间共享数据,可通过 Manager 类实现。Manager 类中提供了很多可以共享数据的数据类型,包括dict,list,Queue,Pipe 等。注意:Manager 中的数据是不安全的。当多个进程同时访问共享数据的时候,就会产生数据安全问题。
多进程同时抢购余票示例:
from multiprocessing import Process, Manager
def work(m_dict):
if m_dict['count'] > 0:
print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
m_dict['count'] -= 1
if __name__ == '__main__':
m = Manager()
m_dict = m.dict({'count': 20})
p_list = []
for i in range(20):
p = Process(target=work, args=(m_dict, ))
p.start()
p_list.append(p)
for i in p_list:
i.join()
print("end..." + str(m_dict['count']))
# 输出结果:
32940 get ticket 20
32941 get ticket 19
32942 get ticket 18
32939 get ticket 17
32943 get ticket 16
32944 get ticket 15
32946 get ticket 14
32945 get ticket 13
32947 get ticket 12
32948 get ticket 11
32953 get ticket 11
32958 get ticket 9
32957 get ticket 8
32955 get ticket 7
32956 get ticket 7
32954 get ticket 6
32950 get ticket 5
32949 get ticket 5
32951 get ticket 3
32952 get ticket 2
end...1
输出结果中 “ticket 11” 被购买了2次,可以看到当多个进程对同一份数据进行操作的时候,就会引发数据安全问题。
在如上示例中,增加进程数据还有可能出现如下这样的报错:
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
这个报错的触发原因并没有深究,极有可能是 manager 内部原因,在 manager 管理进程的同时不可以进入主进程进行某些交互。可以通过在子进程中 sleep 一下 来避免这个问题(这并不是根本的解决方式)
import time, os
from multiprocessing import Process, Manager
def work(m_dict):
time.sleep(0.5) # sleep 0.5 s,可以绕过这个问题
if m_dict['count'] > 0:
print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
m_dict['count'] -= 1
...
如上的数据安全问题,可以在子进程中加锁来解决,即在同一时刻,仅允许一个进程执行 lock.acquire() 和 lock.release() 之间的代码
import os
from multiprocessing import Process, Manager, Lock
def work(m_dict, lock):
lock.acquire()
if m_dict['count'] > 0:
print("%s get ticket %d" % (str(os.getpid()), m_dict['count']))
m_dict['count'] -= 1
lock.release()
if __name__ == '__main__':
m = Manager()
m_dict = m.dict({'count': 20})
lock = Lock()
p_list = []
for i in range(20):
p = Process(target=work, args=(m_dict, lock))
p.start()
p_list.append(p)
for i in p_list:
i.join()
print("end..." + str(m_dict['count']))
# 输出结果:
33240 get ticket 20
33242 get ticket 19
33241 get ticket 18
33243 get ticket 17
33244 get ticket 16
33245 get ticket 15
33247 get ticket 14
33246 get ticket 13
33249 get ticket 12
33248 get ticket 11
33250 get ticket 10
33251 get ticket 9
33252 get ticket 8
33257 get ticket 7
33258 get ticket 6
33253 get ticket 5
33254 get ticket 4
33255 get ticket 3
33259 get ticket 2
33256 get ticket 1
end...0
Manager() 是通过共享进程来实现多进程之间数据共享。Manager() 返回的对象控制了一个 server 进程,这个 server 进程允许其他进程通过 proxies 来访问。多进程之间数据共享,除了 Manager() 外,还有 Value 、 Array,Value 和 Array 是通过共享内存的方式实现数据共享,同样为了保证数据安全,经常和同步互斥锁配合使用。
关于 Value 、 Array 的具体使用方式可参阅 https://www.cnblogs.com/gengyi/p/8661235.html。
使用 Value 实现上述的抢票示例:
import os
from multiprocessing import Process, Value, Lock
def work(count, lock):
lock.acquire()
if count.value > 0:
print("%s get ticket %d" % (str(os.getpid()), count.value))
count.value -= 1
lock.release()
if __name__ == '__main__':
count = Value('l', 50)
lock = Lock()
p_list = []
for i in range(50):
p = Process(target=work, args=(count, lock))
p.start()
p_list.append(p)
for i in p_list:
i.join()
print("end..." + str(count.value))
队列 —— multiprocess.Queue
from multiprocessing import Queue
queue = Queue(3) # 创建队列:Queue([maxsize]),maxsize 表示队列的最大长度
queue.put('a')
queue.put('b')
queue.put('c')
print(queue.full()) # 输出 True,表示队列已经满了
# 若队列已经满了,继续向队列中插入数据,则程序会阻塞在这里,直到队列的另一端有数据被取出,新的数据才能插入
# put 方法有两个可选参数:block 和 timeout。
# block 默认为 True,表示会阻塞 timeout 指定的时间,如果超时,会抛出 Queue.Full 异常。如果 block 为 False,在 put 时 队列已满,则会立即抛出 Queue.Full 异常。
# timeout 默认为 None,表示会一直阻塞。
# queue.put('d')
# queue.put_nowait() # 等同于 queue.put(block = False)
print(queue.get()) # 'a'
print(queue.get()) # 'b'
print(queue.get()) # 'c'
print(queue.empty()) # 输出 True,表示队列已空
# 若队列已空,继续从该队列中 get 数据,则程序会阻塞在这里,直到队列中新插入了数据。
# get 方法也有两个参数:block 和 timeout,通 put 方法
# block 默认为 True,表示会阻塞 timeout 指定的时间,如果 timeout 之间之内还是没有获取到数据,会抛出 Queue.Empty 异常。block 为 False 时,若队列中有数据,则会立即返回数据,如果队列为空,则会立即抛出 Queue.Empty 异常.
# timeout 默认为 None,表示会一直阻塞。
# queue.get(False)
# queue.get_nowait() # 等同于 queue.get(block = False)
# print(queue.qsize()) # 获取队列的长度,某些系统上,此方法可能引发NotImplementedError异常。
# q.close() # 关闭队列
生产者和消费者示例
from multiprocessing import Process, Queue
import time
def producer(name, production, queue):
for i in range(2):
time.sleep(0.5)
queue.put(production + '_' + str(i))
print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
def consumer(name, queue):
while True:
data = queue.get()
if data is None: break # None 为结束信号
time.sleep(0.3)
print('%s consume %s' % (name, data), end="\n")
if __name__ == '__main__':
queue = Queue()
p_list = []
for index, f in enumerate(['apple', 'pear', 'peach']):
p = Process(target=producer, args=('producer_' + str(index), f, queue))
p_list.append(p)
p.start()
Process(target=consumer, args=('consumer_1', queue)).start()
Process(target=consumer, args=('consumer_2', queue)).start()
[p.join() for p in p_list]
# 有2个消费者,则发送2次 None
queue.put(None)
queue.put(None)
# 输出结果:
producer_1 produce pear_0
producer_2 produce peach_0
producer_0 produce apple_0
consumer_2 consume peach_0
consumer_1 consume pear_0
producer_1 produce pear_1
producer_2 produce peach_1
producer_0 produce apple_1
consumer_2 consume apple_0
consumer_1 consume peach_1
consumer_2 consume pear_1
consumer_1 consume apple_1
通过向队列中插入 None,来告诉消费者生产已经结束。这是一种比较低端的实现方式。
JoinableQueue 类是 Queue 类的扩展,JoinableQueue 类中的 task_done() 方法为消费者调用方法,表示从队列中获取的项目(queue.get() 获取的数据)已经被处理;JoinableQueue 类中的 join() 方法为生产者调用的方法,生产者在调用 join() 方法后会被阻塞,直到队列中的每个项目都被调用 queue.task_done() 方法为止。
如下示例是通过 task_done() 方法 和 join() 方法来实现类似于上述的发送结束信号机制。
from multiprocessing import Process, JoinableQueue
import time
def producer(name, production, queue):
for i in range(2):
time.sleep(0.5)
queue.put(production + '_' + str(i))
print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
queue.join()
def consumer(name, queue):
while True:
data = queue.get()
time.sleep(0.3)
print('%s consume %s' % (name, data), end="\n")
queue.task_done()
if __name__ == '__main__':
queue = JoinableQueue()
p_list = []
for index, f in enumerate(['apple', 'pear', 'peach']):
p = Process(target=producer, args=('producer_' + str(index), f, queue))
p_list.append(p)
p.start()
c1 = Process(target=consumer, args=('consumer_1', queue))
c2 = Process(target=consumer, args=('consumer_2', queue))
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
[p.join() for p in p_list]
print('end...')
输出结果与上一个示例一致。这里将 2个 consumer 设置为守护进程,在等待 producer 完成后,也随主进程的结束而结束。
管道
管道的使用:
from multiprocessing import Process, Pipe
def func(pro, con):
pro.close()
while True:
try:
print(con.recv())
except EOFError:
con.close()
break
if __name__ == '__main__':
pro, con = Pipe() # pro, con 分别表示管道的两端
Process(target=func, args=(pro, con)).start()
con.close() # 这里也可以不关闭
for i in range(5):
pro.send(i)
pro.close()
# 输出结果:
0
1
2
3
4
传给进程的 conn(管道连接)是不会相互影响的,在一个进程中关闭了管道,并不会影响这个管道在另一个进程中的使用。若是在一个进程中,管道的一端没有被用到,那么就应该将这一端关闭。例如在生产者中,应该关闭管道的 con 端(右端),在消费者中应该关闭管道的 pro 端(左端)。
当管道所有的入口都已经关闭(上述示例中,主进程和子进程中管道的入口都为 pro),消费者继续接收数据(调用 recv() 方法),当管道中已经没有数据时,就会抛出 EOFError。
如果管道有入口没有关闭,且该入口没有在向管道发送数据,那么消费者就会阻塞在 recv() 方法上。
如上示例是通过 抛出 EOFError 错误来结束管道,还有另一种方式,就是通过管道中的数据(例如向管道中传递None)来结束管道
from multiprocessing import Process, Pipe
def func(con):
while True:
data = con.recv()
if data is None: break
print(data)
if __name__ == '__main__':
pro, con = Pipe() # con, pro 分别表示管道的两端
Process(target=func, args=(con,)).start()
for i in range(5):
pro.send(i)
pro.send(None)
多个消费者消费管道中的数据示例(加锁):
from multiprocessing import Process, Pipe, Lock
import time
def producer(pro, con, name, production):
con.close()
for i in range(4):
time.sleep(0.5)
pro.send(production + str(i))
print('%s produce %s' % (name, production + '_' + str(i)), end="\n")
pro.close()
def consumer(pro, con, name, lock):
pro.close()
while True:
lock.acquire()
try:
data = con.recv()
time.sleep(0.3)
print('%s consume %s' % (name, data), end="\n")
except EOFError:
con.close()
break
finally:
lock.release()
if __name__ == '__main__':
pro, con = Pipe()
lock = Lock()
Process(target=producer, args=(pro, con, 'producer', 'apple')).start()
Process(target=consumer, args=(pro, con, 'c_1', lock)).start()
Process(target=consumer, args=(pro, con, 'c_2', lock)).start()
pro.close()
con.close()
pipe(管道)是进程数据不安全的,队列进程之间是数据安全的,因为队列的实现就是基于管道和锁实现的。所以管道极少被用到,生产环境中 pipe 一般也很少被用到,使用较多的一般会是队列服务器,例如 rabbitmq,kafka…...
信号量
信号量也是一种锁,信号量与互斥锁区别在于,互斥锁的 acquire() 方法和 release() 方法之间,仅允许一个线程(或进程)执行,而信号量可允许多个线程(或进程)执行。信号量的一种应用就是控制并发执行的线程(或进程)数。
from multiprocessing import Process, Semaphore
import time
def func(semaphore, name):
if semaphore.acquire():
print(name)
time.sleep(2)
semaphore.release()
if __name__ == '__main__':
semaphore = Semaphore(3)
for i in range(9):
Process(target=func, args=(semaphore, 'process_' + str(i), )).start()
事件
Python中的事件(Event)主要用于主线程(进程)控制其他线程(进程)的执行,其主要方法包括 set、wait、clear,is_set。
若事件(Event)的标记取值为 False,则线程(进程)会阻塞在 event.wait() 方法,event.wait() 还可以设置一个参数 timeout,在等待 timeout 指定的时间后停止阻塞,继续运行。
方法说明:
event.set():将 event 的标记设置为 True,所有 阻塞在 event.wait() 的线程(进程)都会继续执行
event.clear():将 event 的标记设置为 False。
event.is_set():判断 event 的标志是否为 True。
如下示例,在主进程中控制子进程在何时继续向下执行。例如在主进程的 time.sleep(3) 处可以执行一些检测工作,确保子进程的运行,若检测没有问题则继续子进程的运行。
from multiprocessing import Process, Event
import time
def worker(name, event):
print('Process_%s is ready' % name)
event.wait()
print('Process_%s is running' % name)
if __name__ == '__main__':
event = Event()
for i in range(0, 2):
Process(target=worker, args=(i, event)).start()
time.sleep(3)
event.set()
# 结果输出:
Process_0 is ready
Process_1 is ready
Process_0 is running
Process_1 is running
如上示例,若主进程一直没有允许子进程继续执行(例如检测工作没有通过),则子进程会一直阻塞在 event.wait() 这儿,我们希望在子进程阻塞过程中会有持续的提示信息,这个可以通过设置 event.wait 方法的 timeout 参数实现。
from multiprocessing import Process, Event
import time
def worker(name, event):
while not event.is_set():
print('Process_%s is ready' % name)
event.wait(1)
print('Process_%s is running' % name)
if __name__ == '__main__':
event = Event()
for i in range(0, 2):
Process(target=worker, args=(i, event)).start()
time.sleep(3)
event.set()
# 结果输出:
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_0 is ready
Process_1 is ready
Process_1 is running
Process_0 is running
进程池
进程的创建和销毁都需要消耗系统资源,且每一台服务器的 cpu 核心数有限,创建过多的进程反而会降低执行效率。这里就可以使用进程池,进程池一启动就会创建固定数量的进程,有执行需要了,就从进程池中获取一个进程处理对应的任务,处理完成后,进程不会被销毁,而是放回进程池中。如果同时需要执行的任务过多,没有获取到进程的任务需要等待,等有空闲的进程了才能运行。
进程池节省了操作系统在创建和销毁进程上所花去的开销,也限制了同一时间能够运行的进程总数,在一定程度上提升了多进程的执行效率。
如下示例是使用进程池启动进程和直接启动进程的效率差距:
from multiprocessing import Process, Pool
import time
def m_add(a):
return a ** a
if __name__ == '__main__':
# print(os.cpu_count()) # 调试环境的 cpu 核数为 8
# 创建进程池
pool = Pool(8)
start_t1 = time.time()
# 使用进程池启动进程
res = pool.map(m_add, range(500))
print(time.time() - start_t1)
p_list = []
start_t2 = time.time()
# 直接启动进程
for i in range(500):
p = Process(target=m_add, args=(i, ))
p_list.append(p)
p.start()
for p in p_list: p.join()
print(time.time() - start_t2)
# 输出结果:
0.003328084945678711
0.6395020484924316
创建进程池:
Pool([numprocess [,initializer [, initargs]]]):
numprocess:进程池中的固定继承数,默认为 cpu 核心数(os.cpu_count())
initializer:每次启动进程需要执行的可调用对象
initargs:传递给 initializer 的参数
Pool 的常用方法:
map(func, iterable):异步提交任务。iterable 为一个可迭代对象,这个可迭代对象的长度是多少,就启动多少个子进程,且可迭代对象的每一个元素会作为参数传递给 func。注意,使用 map 方法开启子进程,只能传递一个参数,若子进程需要多个参数,则这个参数可以使用 元组;将所有子进程的返回结果以列表的形式返回。
apply(func [, args [, kwargs]]):同步提交任务,返回子进程的执行结果。如果需要并发地执行 func,必须从不同线程中调用同一个进程池的 apply() 方法;
apply_async(func [, args [, kwargs]]):异步提交任务,返回 AsyncResult 类的实例,从 AsyncResult 实例中获取执行结果。与 map 方法的区别是,apply_async 方法可以随心所欲地传递参数;
close():结束进程池接受任务;
jion():感知进程池中的任务执行结束。即所有提交进来的任务都已经执行完毕,且没有新的任务提交进来。
Tip:进程池可以有返回值,这是进程池特有的,但是直接起进程,是做不到有返回值的。
apply 方法应用:
import time, os
from multiprocessing import Pool
def worker(i):
print('worker_%s running, pid: %s' % (i, os.getpid()))
time.sleep(1)
return i * i
if __name__ == '__main__':
pool = Pool(3)
res_list = []
for i in range(7):
res = pool.apply(worker, args=(i, )) # 返回的 res 即是子进程的返回结果
res_list.append(res)
print(res_list)
print('...end')
# 输出结果:
worker_0 running, pid: 20584
worker_1 running, pid: 20585
worker_2 running, pid: 20586
worker_3 running, pid: 20584
worker_4 running, pid: 20585
worker_5 running, pid: 20586
worker_6 running, pid: 20584
[0, 1, 4, 9, 16, 25, 36]
...end
在同一个线程中使用 pool.apply 方法提交任务,是提交一个,执行一个,执行完成后才能继续提交下一个任务。如上输出结果也是逐个输出。
apply_async 方法应用:
import time, os
from multiprocessing import Pool
def worker(i):
print('worker_%s running, pid: %s' % (i, os.getpid()))
time.sleep(1)
return i * i
if __name__ == '__main__':
pool = Pool(3)
res_list = []
for i in range(7):
res = pool.apply_async(worker, args=(i, )) # res 为 AsyncResult 类的实例
res_list.append(res)
pool.close()
pool.join()
for i in res_list:
print(i.get())
print('...end')
# 输出结果:
worker_0 running, pid: 20598
worker_1 running, pid: 20599
worker_2 running, pid: 20600
worker_3 running, pid: 20598
worker_4 running, pid: 20599
worker_5 running, pid: 20600
worker_6 running, pid: 20599
0
1
4
9
16
25
36
...end
通过 AsyncResult 对象的 get 方法获取返回值,get 方法会阻塞,即阻塞到子进程执行完毕,然后获取其返回值。
一般使用 apply_async 方法 异步提交任务,需要在主进程中感知任务结束(join方法),并且在 join 方法前面结束进程池接受任务(close方法)
map 方法应用:
import time, os
from multiprocessing import Pool
def worker(i):
print('worker_%s running, pid: %s' % (i, os.getpid()))
time.sleep(1)
return i * i
if __name__ == '__main__':
pool = Pool(3)
res_list = pool.map(worker, range(7))
for i in res_list:
print(i)
print('...end')
# 输出结果:
worker_0 running, pid: 20713
worker_1 running, pid: 20714
worker_2 running, pid: 20715
worker_3 running, pid: 20714
worker_4 running, pid: 20713
worker_5 running, pid: 20715
worker_6 running, pid: 20715
0
1
4
9
16
25
36
...end
map 方法自带 join 方法和 close 方法,map 方法启动子进程后,就不允许再提交任务,且 map 方法会阻塞,直到子进程全部执行完毕,且将所有子进程的返回结果以列表的形式返回。
若是不想阻塞在 map 方法,则可以使用 map_async,只是用了 map_async 方法,需要自己进行 close 和 join。
import time, os
from multiprocessing import Pool
def worker(i):
print('worker_%s running, pid: %s' % (i, os.getpid()))
time.sleep(1)
return i * i
if __name__ == '__main__':
pool = Pool(3)
res_list = pool.map_async(worker, range(7))
pool.close()
pool.join()
for i in res_list.get():
print(i)
print('...end')
返回结果与上述一致。
回调函数
进程池中一个进程处理完任务之后,这进程可以调用一个函数去处理该进程返回的结果,这个函数就是回调函数。回调函数的主要作用是告诉主进程,这里已经执行完毕,主进程可以针对返回结果继续后续的处理。相对于主进程轮询等待子进程的返回结果,利用回调函数可以提高程序的执行效率。
注意回调函数是由主进程执行的,可以将一些比较耗IO的操作放到进程池中执行,由主进程统一处理它们的返回结果。
回调函数简单示例:
from multiprocessing import Pool
def func(info):
print('...' + str(info))
def worker(i):
return i * i
if __name__ == '__main__':
pool = Pool(3)
res_list = []
for i in range(7):
res = pool.apply_async(worker, args=(i, ), callback=func)
res_list.append(res)
pool.close()
pool.join()
print('~end')
# 输出结果:
...0
...4
...9
...1
...16
...36
...25
~end
如下示例中,可以将具体的业务放在 worker 方法中,例如从网络上爬取数据,然后统一由回调函数 func 写到一个文件中。
from multiprocessing import Pool
def func(info):
with open('abc.txt', 'a+') as f:
f.writelines(str(info) + '\n')
def worker(i):
return i * i
if __name__ == '__main__':
pool = Pool()
for i in range(10):
pool.apply_async(worker, (i,), callback=func)
pool.close()
pool.join()
.................^_^
网站名称:Python多进程-multiprocess
网页URL:http://pcwzsj.com/article/pjdhdh.html