进程
进程其实和线程在python中使用的代码的格式很相似,他的存在更像是变相的逃避了gil的封锁
在cpython中我们的python相当一个单核心编程,无论你创建了多少个线程,只有一个线程在真正的运算,所以对于多核时代cpython解释器下的python真的很艰难,而我们又不能舍弃cpython下那些丰富的模块,所以我们只好变相的妥协,利用多进程的方式:
multiprocessing就是开启多进程的模块
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
multiprocessing的调用方式
# Process类调用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') # 继承Process类调用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
我们可以看到他也有,start,joing跟我们调用threading一样
进程之间的通讯
multiprocessing.Queue列队
进程之间的通讯,也和线程的queue一样的效果,
不同点在于他们用的子进程是独立于主进程之外的一个独立的内存空间所以需要需要引用进去
这里的rags=(q,)就是引用
import multiprocessing def bar(): print('is bar ok') def foo(q): q.put(bar) if __name__=='__main__': q=multiprocessing.Queue()#创建进程列队 p=multiprocessing.Process(target=foo,args=(q,))#创建进程 p.start()#运行进程 q.get()()#第一个括号是从列队中取出,第二个括号是运行
管道(Pipe)
他就好比在进程和子进程之间挖连上一个水管子,这个水管子分别有send发送和recv接收两个命令
调用方法multiprocessing import Pipe
from multiprocessing import Pipe,process def foo(sk): sk.send('你好') sock,conn=Pipe()#这里建立了通道当然进程和子进程谁用那个都可以 if __name__=='__main__': p=process(target=foo,args=(sock,))#可以理解为我将管道放进子进程 p.start#启动子进程 print(conn.recv())#主进程打印收到的效果
manager数据共享
我们知道了Pipe和queue可让进程之间有了通讯的价值,但是他们之间是否拥有一个可以共同调用的数据呢?这里就需要用到manager
from multiprocessing import Manager,Process def foo(ml,md,i): ml.append(i*i)#将导入的i平方后传回去 md['%s' %i]=(i*i) if __name__=='__main__': manger=Manger()#将Manger实例化 ml=manger.list([1,2,3])#这里的list是数据类型列表的全写 md=manger.dict({'a':1})#dict必须写全这样才可以确认是多进程共享的字典 pl=[]#创建一个join的列表 for i in range(10): p=Process(target=foo,args=(ml,md,l))#建立进程 p.start()#开启进程 pl.append(p) for i in pl: i.join()#开启进程等待 print(ml) print(md)
进程池
进程池就好比我们如果一个for循环下回开启很多个进程,如果我循环100次会开启100个进程,那么问题来了 你确定你电脑这么牛逼????能开启100个进程,卡死你丫的。
可是我的任务量很多,不多开几个不行啊。万一运行了5个之后我还有活要干呢??所以万能的人类就创建了进程池这个东西。
就是为了满足同时只有几个进程在运行,如果有个进程运行完了,那么我们就开一个进程补上,始终保持这么多进程一直到任务的结束
from multiprocessing import Pool import time,random def foo(n): print(n) time.sleep(random.randint(1,3))#随机睡眠1-3秒 if __name__=='__main__': pool_obj=Pool(5)#这里我们控制一下进程的数量,上限5个 for i in range(1000): p=pool_obj.apply_async(func=foo,args=(i,))#用异步的方式开启子进程 pool_obj.close()#当我们完成了任务后需要做的是关闭池子 pool_obj.join()#让我们的系统等待着池子内的一切都完成
进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有以下几个主要方法:
- apply:从进程池里取一个进程并执行
- apply_async:apply的异步版本
- terminate:立刻关闭线程池
- join:主进程等待所有子进程执行完毕,必须在close或terminate之后
- close:等待所有进程结束后,才关闭线程池