python的multiprocessing模块是跨平台的多进程模块,multiprocessing具有创建子进程,进程间通信,队列,事件,锁等功能,multiprocessing模块包含Process,Queue,Pipe,Lock等多个组件。

1、Process

创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]])

参数介绍:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=()
kwargs表示调用对象的字典,kwargs={'key':'value'}
name为子进程的名称

Note:需要使用关键字方式指定参数

示例1:创建单进程

from multiprocessing import Processdef func(): print("first process")if __name__ == '__main__': # 创建进程对象,主进程和子进程是异步执行的 p = Process(target=func) # 开启进程 p.start()示例2:传参

from multiprocessing import Processdef func(*args,**kwargs): print("IPADDR:%s PORT:%d"%args) for k in kwargs: print("%s --> %s"%(k,kwargs[k]))if __name__ == '__main__': # 创建进程对象,并传递参数 p = Process(target=func,args=('127.0.0.1',8080),kwargs={'key':'value'}) # 如果主进程中的代码已经结束了,子进程还没结束,主进程会等待子进程 # 开启进程 p.start()示例3:创建多进程

import osfrom multiprocessing import Processdef func(): # os模块的getpid方法可以获取当前进程的pid,getppid方法可以获取当前进程的父进程的pid print("子进程pid:%s,父进程pid:%s"%(os.getpid(),os.getppid()))if __name__ == '__main__': p_l = [] # 创建多个进程 for i in range(10): p = Process(target=func) p.start() p_l.append(p) # 异步执行子进程,最后执行主进程中的代码 for p in p_l: p.join() # 阻塞,使主进程等待子进程结束 print("------主进程------")结果:子进程pid:9944,父进程pid:1484子进程pid:8932,父进程pid:1484子进程pid:8504,父进程pid:1484子进程pid:14884,父进程pid:1484子进程pid:4828,父进程pid:1484子进程pid:14644,父进程pid:1484子进程pid:14908,父进程pid:1484子进程pid:1980,父进程pid:1484子进程pid:14604,父进程pid:1484子进程pid:10008,父进程pid:1484------主进程------

Note :因为在windows操作系统中,没有fork(),在创建子进程的时候会自动运行启动它的文件中的所有代码,因此必须将创建子进程的语句写在ifname=='main':条件语句下。

示例4:使用类继承的方式创建进程

import osfrom multiprocessing import Processclass MyProcess(Process): # 必须继承Process类 def __init__(self,arg1,arg2,arg3): ''' 继承父类的初始化方法,加上自己需要的参数 :param arg1: :param arg2: :param arg3: ''' super().__init__() self.arg1 = arg1 self.arg2 = arg2 self.arg3 = arg3 def run(self): ''' 必须要有run方法的实现 :return: ''' print('子进程:%d ,父进程:%s '%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3) self.walk() # walk方法在子进程中执行 def walk(self): print('子进程:%d'%os.getpid())if __name__ == '__main__': p = MyProcess(1,2,3) p.start() # 会默认调用run方法 p.walk() # walk方法直接在主进程中调用,并没有在子进程中执行 print('父进程:%d '%os.getpid())结果:子进程:1220父进程:1220 子进程:2164 ,父进程:1220 1 2 3子进程:2164示例5:守护进程

在为开启daemon前,主进程会等待子进程结束在结束;
开启daemon后,程序会在主进程结束时结束子进程

import timefrom multiprocessing import Processdef cal_time(second): while True: print("current time:%s"%time.ctime()) time.sleep(second)if __name__ == '__main__': p = Process(target=cal_time,args=(1,)) ''' 守护进程的作用:会随着主进程代码执行结束而结束 守护进程要在start前设置 守护进程中不能再开启子进程 ''' p.daemon = True p.start() for i in range(10): time.sleep(0.2) print('*'*i)未开启daemon结果:子进程一直在运行current time:Tue Feb 12 17:48:44 2019**********current time:Tue Feb 12 17:48:45 2019***********************************current time:Tue Feb 12 17:48:46 2019current time:Tue Feb 12 17:48:47 2019current time:Tue Feb 12 17:48:48 2019current time:Tue Feb 12 17:48:49 2019开启daemon后结果:主进程结束程序就结束了current time:Tue Feb 12 17:49:14 2019**********current time:Tue Feb 12 17:49:15 2019***********************************示例6:属性:name,pid 方法:is_alive(),terminate()

name:查看进程名
pid:查看进程id
is_alive:查看进程是否正在运行
terminate:结束进程

import timefrom multiprocessing import Processdef func(): print("start") time.sleep(3) print("end")if __name__ == '__main__': p = Process(target=func) p.start() time.sleep(3) print("进程名:%s,进程id:%s"%(p.name,p.pid)) # is_alive方法查看进程是否正在运行 print(p.is_alive()) # terminate方法结束进程 p.terminate() time.sleep(3) print(p.is_alive())结果:start进程名:Process-1,进程id:17564TrueFalse2、Lock

进程锁:当多个进程访问共享资源时,进程锁保证同一时间只能有一个任务可以进行修改,程序的运行方法有并发改为串行,这样速度慢了,但是保证了数据的安全

示例:

import osimport timeimport randomfrom multiprocessing import Process,Lockdef func(lock,n): lock.acquire() #加锁 print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() #释放if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=func,args=(lock,i)) p.start()3、Semaphore

信号量:Lock(锁)可以保证同一时间只能有一个任务对共享数据进行操作,而Semaphore(信号量)可以在同一时间让指定数量的进程操作共享数据。

示例:迷你唱吧

import timeimport randomfrom multiprocessing import Processfrom multiprocessing import Semaphore'''迷你唱吧,20个人,同一时间只能有4个人进去'''def sing(i,sem): sem.acquire() # 加锁 print('%s enter the ktv'%i) time.sleep(random.randint(1,10)) print('%s leave the ktv'%i) sem.release() # 释放if __name__ == '__main__': sem = Semaphore(4) for i in range(20): p = Process(target=sing,args=(i,sem)) p.start()4、Event

事件:Event是进程之间的状态标记通信,因为进程不共享数据,所以事件对象需要以参数形式传递到函数中使用。

主要方法:

e = Event() # 实例化一个事件对象
e.set() # 标记变为非阻塞
e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
e.clear() # 标记变为阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞

示例:红绿灯

import timeimport randomfrom multiprocessing import Eventfrom multiprocessing import Processdef traffic_light(e): while True: if e.is_set(): # True为绿灯 time.sleep(3) # 等3秒后变为红灯 print("红灯亮") e.clear() else: # False为红灯,等3秒后变为绿灯 time.sleep(3) print("绿灯亮") e.set()def car(i,e): e.wait() # 默认是红灯 print("%s 车通过"%i)if __name__ == '__main__': e = Event() # 控制红绿灯的进程 tra = Process(target=traffic_light,args=(e,)) tra.start() for i in range(100): if i%6 == 0: time.sleep(random.randint(1,3)) p = Process(target=car,args=(i,e)) p.start()5、Pipe

管道是进程间通信(IPC)的一种,管道是双向通信的,但它不保证数据安全
创建管道:p1,p2=Pipe()

主要方法:

send():发送数据
recv():接收数据
close():关闭

示例:

def func(p): foo,son = p foo.close() # 不使用主进程的管道一端,先行关闭 while True: try: print(son.recv()) # 子进程在结束数据时,如果管道无数据,且对端没有close,就会报EOFError;如果管道无数据,对端没close,进程会阻塞 except EOFError: breakif __name__ == '__main__': foo,son = Pipe() p = Process(target=func,args=((foo,son),)) p.start() son.close() # 不使用子进程的管道一端,先行关闭 foo.send("hello") foo.send("hello") foo.close()6、Queue

队列:进程之间是独立的,要实现进程间通信(IPC);multiprocessing模块支持两种形式:队列(queue)和管道(pipe),这两种方式都是使用消息传递的,且都是双向通信的,Queue = Pipe+Lock。

队列的两种创建方式:

q = Queue() # 创建队列对象,无长度限制
q1 = Queue(3) # 传参数,创建一个有最大长度限制的队列

队列的主要方法:

q.put(1) # 放入一个数据,对于无长度限制的队列来说,永不阻塞;对于有长度限制的队列来说,放满就阻塞

q.get() # 队列中有数据就取出一个数据,队列中无数据就会阻塞;遵循先进先出原则

q.qsize() # 查看队列的数据大小,不一定准确

示例1:主进程与子进程之间的通信

from multiprocessing import Processfrom multiprocessing import Queuedef queue_put(q): q.put("123") # 子进程队列中放入一个变量if __name__ == '__main__': q = Queue() p = Process(target=queue_put,args=(q,)) p.start() print(q.get()) # 主进程获取到变量示例2:子进程与子进程之间的通信from multiprocessing import Processfrom multiprocessing import Queuedef queue_put(q): q.put("123") # 子进程队列中放入一个变量def queue_get(q): print(q.get()) # 另一个子进程获取到队列中的数据if __name__ == '__main__': q = Queue() p = Process(target=queue_put,args=(q,)) p.start() p1 = Process(target=queue_get,args=(q,)) p1.start()7、JoinableQueue

JoinableQueue也是multiprocessing模块的一种队列的实现,但它与Queue不同的是JoinableQueue允许项目的使用者通知生成者项目已经被成功处理。创建方式同Queue。
主要方法:put与get与Queue一致
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

示例:生产者消费者模型

import timeimport randomfrom multiprocessing import JoinableQueuefrom multiprocessing import Process'''程序执行流程1、生产者生产的数据全部被消费 --> 2、生产者进程结束 --> 3、主进程代码执行结束 --> 4、消费者守护进程结束'''def producer(q,food): for i in range(5): q.put("%s -- %s"%(i,food)) print("生产了 %s"%(food)) time.sleep(random.random()) q.join() # 2、等待消费者消费完所有数据def consumer(q,name): while True: food = q.get() if food == None:break print("%s 吃了 %s"%(name,food)) q.task_done() # 1、消费者每消费一个数据就返回一个task_done给生产者if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=(q,'youtiao')) p1.start() p2 = Process(target=producer,args=(q,'baozi')) p2.start() c1 = Process(target=consumer,args=(q,'daxiong')) c1.daemon = True # 4、消费者守护进程结束 c1.start() c2 = Process(target=consumer,args=(q,'chenglei')) c2.daemon = True c2.start() c3 = Process(target=consumer,args=(q,'niu')) c3.daemon = True c3.start() p1.join() # 3、等待p1执行完毕 p2.join() # 3、等待p2执行完毕8、Manager

Manager也是multiprocessing模块的一个类,这个类主要提供了进程间通信(IPC)的一个机制,它支持Python所有的数据类型,但不提供数据安全的机制。

示例:

from multiprocessing import Managerfrom multiprocessing import Processdef func(d): print(d) d['num'] -= 10if __name__ == '__main__': m = Manager() d = m.dict({'num':100}) l = [] for i in range(10): p = Process(target=func,args=(d,)) p.start() # p.join() # 同步 l.append(p) for j in l: j.join() # 异步结果:{'num': 100}{'num': 90}{'num': 80}{'num': 70}{'num': 60}{'num': 50}{'num': 40}{'num': 30}{'num': 20}{'num': 10}9、Pool

在执行大量并发任务时,多进程是行之有效的手段之一,但是多进程需要注意几个问题,一是操作系统不可能无限开启进程,一般是有几个核开启几个进程,二是开启进程过多,系统资源占用过多,会导致系统运行速度变慢;那么遇到这种情况时pool(进程池)便是最好的解决方案。
Pool可以指定开启一定数量的进程(一般为CPU核数+1个)等待用户使用,当有新的请求进入时,如果池中有空闲进程,便直接开启;如果池中的进程都在使用,那么该请求就会等待,直到池中有进程结束,重用该进程。

示例1:多进程与进程池效率对比

import timefrom multiprocessing import Processfrom multiprocessing import Pooldef func(i): i -= 1if __name__ == '__main__': # 计算进程池所需事件 start1_time = time.time() # 开始时间 p = Pool(5) # 进程池中创建5个进程 p.map(func,range(100)) # 调用进程执行任务,target = func args = (1,2,3...),第二个参数要是可迭代对象 p.close() # 不允许再向进程池中添加任务 p.join() # 等待进程池中所有进程执行结束 stop1_time = time.time() - start1_time # 结束时间 print("进程池所需时间: %s "%stop1_time) # 计算多进程所需时间 start2_time = time.time() # 开始时间 l = [] for i in range(100): p1 = Process(target=func,args=(i,)) p1.start() l.append(p) for j in l: j.join() stop2_time = time.time() - start2_time print("多进程所需时间: %s"%stop2_time)结果:进程池所需时间: 0.19990277290344238 多进程所需时间: 1.7190303802490234

由上可知,进程池在执行大量并发任务时的效率。

主要方法:

map(self, func, iterable, chunksize=None):将func应用于iterable中的每个元素,收集结果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):异步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):异步提交任务的机制
apply(self, func, args=(), kwds={}):同步提交任务的机制
close():不允许再提交新的任务
join():等待进程池中的进程执行结束在往下执行,此方法只能在close()或teminate()之后调用

执行apply或apply_async方法时,会返回ApplyResult类的实例对象
ApplyResult类有以下方法:
obj.get():获取进程的返回值
obj.ready():调用完成时,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发ValueError异常
obj.wait([timeout]):等待结果变为可用

示例2:apply与apply_async方法

import timefrom multiprocessing import Pool'''apply:同步提交任务的机制apply_async:异步提交任务机制'''def func(i): time.sleep(1) i += 1 print(i)if __name__ == '__main__': p = Pool(5) res_l = [] for i in range(20): # p.apply(func,args=(i,)) # 同步,执行完毕立即获取到返回值 res = p.apply_async(func,args=(i,)) # 异步,通过get获取返回值 res_l.append(res) p.close() # 不允许再提交新的任务 p.join() # 等待进程池中的进程执行结束在往下执行 for res in res_l: print(res.get()) # 使用get来获取apply_aync的结果10、pool的call函数

在进程池中,一个进程任务结束就会返回一个结果,主进程则调用一个函数去处理这个结果,这就是回调函数。回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值;

示例:请求网页

在爬虫中,使用回调比较多,爬虫将访问网页、下载网页的过程放到子进程中去做,分析数据,处理数据让回调函数去做,因为访问网页与下载网页有网络延时,而处理数据只占用很小的时间

import requestsfrom multiprocessing import Pooldef get(url): ret = requests.get(url) return {'url':url, 'status_code':ret.status_code, 'content':ret.text}def parser(dic): print(dic['url'],len(dic['content'])) parse_url = "URL:%s Size:%s"%(dic['url'],len(dic['content'])) with open('url.txt','a') as f: f.write(parse_url+'\n')if __name__ == '__main__': url_l = [ 'http://www.baidu.com', 'http://www.google.com', 'https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5', 'https://www.youtube.com/?app=desktop', 'https://www.facebook.com/' ] p = Pool(5) for i in url_l: p.apply_async(get,args=(i,),callback=parser) p.close() p.join()