betway必威-betway必威官方网站
做最好的网站

betway必威官方网站:python并发编程之进程,进程

操作系统的作用

  一般而言,现代计算机系统是一个复杂的系统。

  其一:如果每位应用程序员都必须掌握该系统所有的细节,那就不可能再编写代码了(严重影响了程序员的开发效率:全部掌握这些细节可能需要一万年....)

  其二:并且管理这些部件并加以优化使用,是一件极富挑战性的工作,于是,计算安装了一层软件(系统软件),称为操作系统。它的任务就是为用户程序提供一个更好、更简单、更清晰的计算机模型,并管理刚才提到的所有设备。

  总结:

  程序员无法把所有的硬件操作细节都了解到,管理这些硬件并且加以优化使用是非常繁琐的工作,这个繁琐的工作就是操作系统来干的,有了他,程序员就从这些繁琐的工作中解脱了出来,只需要考虑自己的应用软件的编写就可以了,应用软件直接使用操作系统提供的功能来间接使用硬件。

  精简的说的话,操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。操作系统所处的位置如图

betway必威官方网站 1

  细说的话,操作系统应该分成两部分功能: 

#一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。
例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节),

#二:将应用程序对硬件资源的竞态请求变得有序化
例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,
打印机可能打印一段a的内容然后又去打印c...,操作系统的一个功能就是将这种无序变得有序。

 

参考学习:

一 multiprocessing模块介绍

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
    multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

  multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

阅读目录

一. cpython并发编程之多进程
1.1 multiprocessing模块介绍
1.2 Process类的介绍
1.3 Process类的使用
1.4 进程间通信(IPC)方式一:队列
1.5 进程间通信(IPC)方式二:管道(了解部分)
1.6 进程间通信方式三:共享数据
1.7 进程同步(锁),信号量,事件...
1.8 进程池
二. python并发编程之多线程
2.1 threading模块
2.2 Python GIL(Global Interpreter Lock)
2.3 同步锁
2.4 死锁与递归锁
2.5 信号量Semahpore
2.6 事件Event
2.7 条件Condition(了解)

2.8 定时器Timer
2.9 线程queue
2.10 Python标准模块--concurrent.futures
三.  协程

四. 协程模块greenlet

五. gevent模块(单线程并发)

六. 综合应用

 

进程

二 Process类的介绍

    创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:

betway必威官方网站 2

1 group参数未使用,值始终为None
2 
3 target表示调用对象,即子进程要执行的任务
4 
5 args表示调用对象的位置参数元组,args=(1,2,'egon',)
6 
7 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
8 
9 name为子进程的名称

betway必威官方网站 3

  方法介绍:

betway必威官方网站 4

 1 p.start():启动进程,并调用该子进程中的p.run() 
 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
 3 
 4 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
 5 p.is_alive():如果p仍然运行,返回True
 6 
 7 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  

betway必威官方网站 5

    属性介绍:

betway必威官方网站 6

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 
3 p.name:进程的名称
4 
5 p.pid:进程的pid
6 
7 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
8 
9 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

一. cpython并发编程之多进程

必备的理论基础:

betway必威官方网站 7

#一 操作系统的作用:
    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序

#二 多道技术:
    1.产生背景:针对单核,实现并发
    ps:
    现在的主机一般是多核,那么每个核都会利用多道技术
    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
    cpu中的任意一个,具体由操作系统调度算法决定。

    2.空间上的复用:如内存中同时有多道程序
    3.时间上的复用:复用一个cpu的时间片
       强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样
            才能保证下次切换回来时,能基于上次切走的位置继续运行

betway必威官方网站 8

三 Process类的使用

注意:在windows中Process()必须放到# if __name__ == '__main__':下

 

betway必威官方网站 9betway必威官方网站 10

1 Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
2 If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
3 This is the reason for hiding calls to Process() inside
4 
5 if __name__ == "__main__"
6 since statements inside this if-statement will not get called upon import.
7 由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
8 如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
9 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

详细解释

 

创建并开启子进程的两种方式

betway必威官方网站 11betway必威官方网站 12

 1 #开进程的方法一:
 2 import time
 3 import random
 4 from multiprocessing import Process
 5 def piao(name):
 6     print('%s piaoing' %name)
 7     time.sleep(random.randrange(1,5))
 8     print('%s piao end' %name)
 9 
10 
11 
12 p1=Process(target=piao,args=('egon',)) #必须加,号
13 p2=Process(target=piao,args=('alex',))
14 p3=Process(target=piao,args=('wupeqi',))
15 p4=Process(target=piao,args=('yuanhao',))
16 
17 p1.start()
18 p2.start()
19 p3.start()
20 p4.start()
21 print('主线程')

方法一

betway必威官方网站 13betway必威官方网站 14

 1 #开进程的方法二:
 2 import time
 3 import random
 4 from multiprocessing import Process
 5 
 6 
 7 class Piao(Process):
 8     def __init__(self,name):
 9         super().__init__()
10         self.name=name
11     def run(self):
12         print('%s piaoing' %self.name)
13 
14         time.sleep(random.randrange(1,5))
15         print('%s piao end' %self.name)
16 
17 p1=Piao('egon')
18 p2=Piao('alex')
19 p3=Piao('wupeiqi')
20 p4=Piao('yuanhao')
21 
22 p1.start() #start会自动调用run
23 p2.start()
24 p3.start()
25 p4.start()
26 print('主线程')

方法二

进程直接的内存空间是隔离的

betway必威官方网站 15betway必威官方网站 16

 1 from multiprocessing import Process
 2 n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了
 3 def work():
 4     global n
 5     n=0
 6     print('子进程内: ',n)
 7 
 8 
 9 if __name__ == '__main__':
10     p=Process(target=work)
11     p.start()
12     print('主进程内: ',n)

View Code

betway必威官方网站 17betway必威官方网站 18

 1 from socket import *
 2 from multiprocessing import Process
 3 
 4 server=socket(AF_INET,SOCK_STREAM)
 5 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 6 server.bind(('127.0.0.1',8080))
 7 server.listen(5)
 8 
 9 def talk(conn,client_addr):
10     while True:
11         try:
12             msg=conn.recv(1024)
13             if not msg:break
14             conn.send(msg.upper())
15         except Exception:
16             break
17 
18 if __name__ == '__main__': #windows下start进程一定要写到这下面
19     while True:
20         conn,client_addr=server.accept()
21         p=Process(target=talk,args=(conn,client_addr))
22         p.start()

服务端

betway必威官方网站 19betway必威官方网站 20

 1 from socket import *
 2 
 3 client=socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8080))
 5 
 6 
 7 while True:
 8     msg=input('>>: ').strip()
 9     if not msg:continue
10 
11     client.send(msg.encode('utf-8'))
12     msg=client.recv(1024)
13     print(msg.decode('utf-8'))

客户端

每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。
解决方法:进程池


Process对象的join方法

betway必威官方网站 21betway必威官方网站 22

 1 from multiprocessing import Process
 2 import time
 3 import random
 4 
 5 class Piao(Process):
 6     def __init__(self,name):
 7         self.name=name
 8         super().__init__()
 9     def run(self):
10         print('%s is piaoing' %self.name)
11         time.sleep(random.randrange(1,3))
12         print('%s is piao end' %self.name)
13 
14 
15 p=Piao('egon')
16 p.start()
17 p.join(0.0001) #等待p停止,等0.0001秒就不再等了
18 print('开始')

主进程等待子进程

betway必威官方网站 23betway必威官方网站 24

 1 from multiprocessing import Process
 2 import time
 3 import random
 4 def piao(name):
 5     print('%s is piaoing' %name)
 6     time.sleep(random.randint(1,3))
 7     print('%s is piao end' %name)
 8 
 9 p1=Process(target=piao,args=('egon',))
10 p2=Process(target=piao,args=('alex',))
11 p3=Process(target=piao,args=('yuanhao',))
12 p4=Process(target=piao,args=('wupeiqi',))
13 
14 p1.start()
15 p2.start()
16 p3.start()
17 p4.start()
18 
19 #有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
20 #当然不是了,必须明确:p.join()是让谁等?
21 #很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
22 
23 #详细解析如下:
24 #进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
25 #而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
26 #join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
27 # 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
28 p1.join()
29 p2.join()
30 p3.join()
31 p4.join()
32 
33 print('主线程')
34 
35 
36 #上述启动进程与join进程可以简写为
37 # p_l=[p1,p2,p3,p4]
38 # 
39 # for p in p_l:
40 #     p.start()
41 # 
42 # for p in p_l:
43 #     p.join()

有了join 程序就不会串行了吗?

Process对象的其他方法或属性(了解)

betway必威官方网站 25betway必威官方网站 26

 1 #进程对象的其他方法一:terminate,is_alive
 2 from multiprocessing import Process
 3 import time
 4 import random
 5 
 6 class Piao(Process):
 7     def __init__(self,name):
 8         self.name=name
 9         super().__init__()
10 
11     def run(self):
12         print('%s is piaoing' %self.name)
13         time.sleep(random.randrange(1,5))
14         print('%s is piao end' %self.name)
15 
16 
17 p1=Piao('egon1')
18 p1.start()
19 
20 p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
21 print(p1.is_alive()) #结果为True
22 
23 print('开始')
24 print(p1.is_alive()) #结果为False

terminate和is_alive

betway必威官方网站 27betway必威官方网站 28

 1 from multiprocessing import Process
 2 import time
 3 import random
 4 class Piao(Process):
 5     def __init__(self,name):
 6         # self.name=name
 7         # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
 8         #                    #所以加到这里,会覆盖我们的self.name=name
 9 
10         #为我们开启的进程设置名字的做法
11         super().__init__()
12         self.name=name
13 
14     def run(self):
15         print('%s is piaoing' %self.name)
16         time.sleep(random.randrange(1,3))
17         print('%s is piao end' %self.name)
18 
19 p=Piao('egon')
20 p.start()
21 print('开始')
22 print(p.pid) #查看pid

name和pid

 

1.1 multiprocessing模块介绍

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing。
 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

强调: 与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

什么是进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

betway必威官方网站 29betway必威官方网站 30

第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。
第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。[3] 
进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,所有多道程序设计操作系统都建立在进程的基础上。

进程的概念

betway必威官方网站 31betway必威官方网站 32

从理论角度看,是对正在运行的程序过程的抽象;
从实现角度看,是一种数据结构,目的在于清晰地刻画动态系统的内在规律,有效管理和调度进入计算机系统主存储器运行的程序。

操作系统引入进程概念的原因

betway必威官方网站 33betway必威官方网站 34

动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
并发性:任何进程都可以同其他进程一起并发执行
独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;
异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进
结构特征:进程由程序、数据和进程控制块三部分组成。
多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程,能得到不同的结果;但是执行过程中,程序不能发生改变。

进程的特征

betway必威官方网站 35betway必威官方网站 36

程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。
而进程是程序在处理机上的一次执行过程,它是一个动态的概念。
程序可以作为一种软件资料长期存在,而进程是有一定生命期的。
程序是永久的,进程是暂时的。

进程与程序中的区别

 

 注意:同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。

四 守护进程

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

betway必威官方网站 37betway必威官方网站 38

 1 from multiprocessing import Process
 2 import time
 3 import random
 4 
 5 class Piao(Process):
 6     def __init__(self,name):
 7         self.name=name
 8         super().__init__()
 9     def run(self):
10         print('%s is piaoing' %self.name)
11         time.sleep(random.randrange(1,3))
12         print('%s is piao end' %self.name)
13 
14 
15 p=Piao('egon')
16 p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
17 p.start()
18 print('主')

守护进程

betway必威官方网站 39betway必威官方网站 40

 1 #主进程代码运行完毕,守护进程就会结束
 2 from multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def foo():
 6     print(123)
 7     time.sleep(1)
 8     print("end123")
 9 
10 def bar():
11     print(456)
12     time.sleep(3)
13     print("end456")
14 
15 
16 p1=Process(target=foo)
17 p2=Process(target=bar)
18 
19 p1.daemon=True
20 p1.start()
21 p2.start()
22 print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止

守护进程2

1.2 Process类的介绍

创建进程的类:

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'egon',)

kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

name为子进程的名称

方法介绍:

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍:

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

p.name:进程的名称

p.pid:进程的pid

p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

 

进程调度

 要想多个进程交替运行,操作系统必须对这些进程进行调度,这个调度也不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。

betway必威官方网站 41betway必威官方网站 42

先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于作业调度,也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。

先来先服务(FCFS)调度算法

betway必威官方网站 43betway必威官方网站 44

短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法既可用于作业调度,也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。

短作业(进程)优先调度算法(SJ/PF)

betway必威官方网站 45betway必威官方网站 46

时间片轮转(Round Robin,RR)法的基本思路是让每个进程在就绪队列中的等待时间与享受服务的时间成比例。在时间片轮转法中,需要将CPU的处理时间分成固定大小的时间片,例如,几十毫秒至几百毫秒。如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。
      显然,轮转法只能用来调度分配一些可以抢占的资源。这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的进程。CPU是可抢占资源的一种。但打印机等资源是不可抢占的。由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包含有不可抢占资源,所以作业调度不使用轮转法。
在轮转法中,时间片长度的选取非常重要。首先,时间片长度的选择会直接影响到系统的开销和响应时间。如果时间片长度过短,则调度程序抢占处理机的次数增多。这将使进程上下文切换次数也大大增加,从而加重系统开销。反过来,如果时间片长度选择过长,例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕,则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的进程数来确定的。
      在轮转法中,加入到就绪队列的进程有3种情况:
      一种是分给它的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。
      另一种情况是分给该进程的时间片并未用完,只是因为请求I/O或由于进程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。
      第三种情况就是新创建进程进入就绪队列。
      如果对这些进程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和效率。例如,我们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞原因分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的进程享有不同的优先级,但同一队列内优先级相同。这样,当一个进程在执行完它的时间片之后,或从睡眠中被唤醒以及被创建之后,将进入不同的就绪队列。  

时间片轮转法

betway必威官方网站 47betway必威官方网站 48

前面介绍的各种用作进程调度的算法都有一定的局限性。如短进程优先的调度算法,仅照顾了短进程而忽略了长进程,而且如果并未指明进程的长度,则短进程优先和基于进程长度的抢占式调度算法都将无法使用。
而多级反馈队列调度算法则不必事先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。在采用多级反馈队列调度算法的系统中,调度算法的实施过程如下所述。
(1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i 1个队列的时间片要比第i个队列的时间片长一倍。
(2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第n队列后,在第n 队列便采取按时间片轮转的方式运行。

(3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。

多级反馈队列

五 进程同步(锁)

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

竞争带来的结果就是错乱,如何控制,就是加锁处理

part1:多个进程共享同一打印终端

betway必威官方网站 49betway必威官方网站 50

 1 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱
 2 from multiprocessing import Process
 3 import os,time
 4 def work():
 5     print('%s is running' %os.getpid())
 6     time.sleep(2)
 7     print('%s is done' %os.getpid())
 8 
 9 if __name__ == '__main__':
10     for i in range(3):
11         p=Process(target=work)
12         p.start()

并发运行,效率高,但竞争同一打印终端,带来了打印错乱

betway必威官方网站 51betway必威官方网站 52

 1 #由并发变成了串行,牺牲了运行效率,但避免了竞争
 2 from multiprocessing import Process,Lock
 3 import os,time
 4 def work(lock):
 5     lock.acquire()
 6     print('%s is running' %os.getpid())
 7     time.sleep(2)
 8     print('%s is done' %os.getpid())
 9     lock.release()
10 if __name__ == '__main__':
11     lock=Lock()
12     for i in range(3):
13         p=Process(target=work,args=(lock,))
14         p.start()

加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争

part2:多个进程共享同一文件

文件当数据库,模拟抢票

betway必威官方网站 53betway必威官方网站 54

 1 from multiprocessing import Process
 2 import time, random
 3 import json, os
 4 from multiprocessing import Lock
 5 
 6 def search():
 7     dic = json.load(open('../db/db.txt'))
 8     print('\033[33m%s剩余票数%s\033[33m' % (os.getpid(), dic['count']))
 9 
10 
11 def get():
12     with open('../db/db.txt', 'r', encoding='utf8') as read_f:
13         dic = json.load(read_f)
14     time.sleep(0.1)
15     if dic['count'] > 0:
16         dic['count'] -= 1
17         time.sleep(0.2)
18         with open('../db/db.txt', 'w', encoding='utf8') as write_f:
19             json.dump(dic, write_f)
20             print('\033[33m%s购票成功\033[0m' %os.getpid())
21 
22 
23 def task(mutex):
24     search()
25     with mutex:
26         get()
27 
28 
29 if __name__ == '__main__':
30     mutex = Lock()
31     for i in range(100):
32         p = Process(target=task, args=(mutex,))
33         p.start()

并发运行,效率高,但竞争写同一文件,数据写入错乱

betway必威官方网站 55betway必威官方网站 56

 1 # 文件db的内容为:{"count":1}
 2 # 注意一定要用双引号,不然json无法识别
 3 def search():
 4     with open('../db/db.txt', 'r', encoding='utf8') as f:
 5         dic = json.load(f)
 6     print('\033[34m%s剩余票数%s\033[0m' % (os.getpid(), dic['count']))
 7 
 8 def get():
 9     with open('../db/db.txt', 'r', encoding='utf8') as read_f:
10         dic = json.load(read_f)
11     time.sleep(0.1)
12     if dic['count'] > 0:
13         dic['count'] -= 1
14         with open('../db/db.txt', 'w', encoding='utf8') as write_f:
15             json.dump(dic, write_f)
16         print('\033[32m%s购买成功\033[0m' % os.getpid())
17 
18 
19 def task(mutex):
20     search()
21     with mutex:
22         get()
23 
24 if __name__ == '__main__':
25     mutex = Lock()
26     for i in range(4):
27         p = Process(target=task, args=(mutex,))
28         p.start()

加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

总结:

#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理



#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道 锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

1.3 Process类的使用

进程的并行与并发

并行 : 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )

并发 : 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

区别:

并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

六 队列(推荐使用)

   进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

 创建队列的类(底层就是以管道和锁定的方式实现)

1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

    参数介绍:

1 maxsize是队列中允许最大项数,省略则无大小限制。    

  方法介绍:

    主要方法:

betway必威官方网站 57

betway必威官方网站 58

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

betway必威官方网站 59

betway必威官方网站 60

    其他方法(了解):

1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

  应用:

betway必威官方网站 61betway必威官方网站 62

 1 '''
 2 multiprocessing模块支持进程间通信的两种主要形式:管道和队列
 3 都是基于消息传递实现的,但是队列接口
 4 '''
 5 
 6 from multiprocessing import Process,Queue
 7 import time
 8 q=Queue(3)
 9 
10 
11 #put ,get ,put_nowait,get_nowait,full,empty
12 q.put(3)
13 q.put(3)
14 q.put(3)
15 print(q.full()) #满了
16 
17 print(q.get())
18 print(q.get())
19 print(q.get())
20 print(q.empty()) #空了

队列

    生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

 

betway必威官方网站 63betway必威官方网站 64

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 
 4 def consumer(q):
 5     while True:
 6         res = q.get()
 7         time.sleep(random.randint(1, 3))
 8         print('\033[45m%s 吃了 %s33[0m' % (os.getpid(), res))
 9 
10 
11 def producer(q):
12     for i in range(10):
13         time.sleep(random.randrange(1, 3))
14         res = '包子%s' % i
15         q.put(res)
16         print('\033[44m%s 生产了 %s\033[0m' % (os.getpid(), res))
17 
18 
19 if __name__ == '__main__':
20     q = Queue()
21     p1 = Process(target=consumer, args=(q,))
22     c1 = Process(target=producer, args=(q,))
23     p1.start()
24     c1.start()
25     print('主')

厨师和吃货

 

#生产者消费者模型总结

    #程序中有两类角色
        一类负责生产数据(生产者)
        一类负责处理数据(消费者)

    #引入生产者消费者模型为了解决的问题是:
        平衡生产者与消费者之间的速度差

    #如何实现:
        生产者-》队列——》消费者
    #生产者消费者模型实现类程序的解耦和

 

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

betway必威官方网站 65betway必威官方网站 66

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 
 4 def consumer(q):
 5     while True:
 6         res = q.get()
 7         if res is None: break  # 收到结束信号则结束
 8         time.sleep(random.randint(1, 3))
 9         print('\033[34m%s 吃了 %s\033[0m' % (os.getpid(), res))
10 
11 def producer(q):
12     for i in range(10):
13         res = '包子%s' % i
14         time.sleep(random.randrange(1, 3))
15         q.put(res)
16         print('\033[32m%s 生产了 %s\033[0m' % (os.getpid(), res))
17     q.put(None)  # 发送结束信号
18 
19 if __name__ == '__main__':
20     q = Queue()
21     p1 = Process(target=producer, args=(q,))
22     c1 = Process(target=consumer, args=(q,))
23     p1.start()
24     c1.start()
25     print('主')

发送结束信号

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

betway必威官方网站 67betway必威官方网站 68

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 
 4 def consumer(q):
 5     while True:
 6         res = q.get()
 7         if res is None: break  # 收到结束信号则结束
 8         time.sleep(random.randint(1, 3))
 9         print('\033[35m%s 吃 %s\033[0m' % (os.getpid(), res))
10 
11 
12 def producer(q):
13     for i in range(10):
14         time.sleep(random.randrange(1, 3))
15         res = '包子%s' % i
16         q.put(res)
17         print('\033[32m%s 生产了 %s\033[0m' % (os.getpid(), res))
18 
19 
20 if __name__ == '__main__':
21     q = Queue()
22 
23     p1 = Process(target=producer, args=(q,))
24     c1 = Process(target=consumer, args=(q,))
25     p1.start()
26     c1.start()
27     p1.join()
28     q.put(None)  # 发送结束信号
29     print('主')

主进程发送结束信号

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

betway必威官方网站 69betway必威官方网站 70

from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 收到结束信号则结束
        time.sleep(random.randint(1, 3))
        print('\033[35m%s 吃 %s\033[0m' % (os.getpid(), res))


def producer(q, name):
    for i in range(10):
        time.sleep(random.randrange(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('\033[32m%s 生产了 %s\033[0m' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q, '包子'))
    p2 = Process(target=producer, args=(q, '骨头'))
    p3 = Process(target=producer, args=(q, '泔水'))

    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)  # 有几个消费者就应该发送几次结束信号None
    q.put(None)  # 发送结束信号
    print('主')

有几个消费者发送几次消费信号

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

   #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

betway必威官方网站 71betway必威官方网站 72

 1 from multiprocessing import Process,JoinableQueue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         time.sleep(random.randint(1,3))
 7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 8 
 9         q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
10 
11 def producer(name,q):
12     for i in range(10):
13         time.sleep(random.randint(1,3))
14         res='%s%s' %(name,i)
15         q.put(res)
16         print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
17     q.join()
18 
19 
20 if __name__ == '__main__':
21     q=JoinableQueue()
22     #生产者们:即厨师们
23     p1=Process(target=producer,args=('包子',q))
24     p2=Process(target=producer,args=('骨头',q))
25     p3=Process(target=producer,args=('泔水',q))
26 
27     #消费者们:即吃货们
28     c1=Process(target=consumer,args=(q,))
29     c2=Process(target=consumer,args=(q,))
30     c1.daemon=True
31     c2.daemon=True
32 
33     #开始
34     p_l=[p1,p2,p3,c1,c2]
35     for p in p_l:
36         p.start()
37 
38     p1.join()
39     p2.join()
40     p3.join()
41     print('主') 
42     
43     #主进程等--->p1,p2,p3等---->c1,c2
44     #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
45     #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程

复杂的生产者和消费者模型

1.创建并开启子进程的两种方式

注: 在windows中Process()必须放到# if __name__ == '__main__':下

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这是隐藏对Process()内部调用的原理,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

betway必威官方网站 73betway必威官方网站 74

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2017/6/26 0026

import time
import random
from multiprocessing import Process


def talk(name):
    print("%s is say 'Hello'" % name)
    time.sleep(3)
    print("talking end")

if __name__ == '__main__':
    p1=Process(target=talk,args=('Shuke',))         # args是元组的形式,必须加逗号
    p2=Process(target=talk,args=('Tom',))
    p3=Process(target=talk,args=('Eric',))
    p4=Process(target=talk,args=('Lucy',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()

开启进程(方式一)

betway必威官方网站 75betway必威官方网站 76

import time
import random
from multiprocessing import Process


class Talk(Process):    # 继承Process类

    def __init__(self,name):
        super(Talk, self).__init__()    # 继承父类__init__方法
        self.name=name

    def run(self):          # 必须实现一个run方法,规定
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p2=Talk('Eric')
    p3=Talk('Tome')
    p4=Talk('Lucy')

    p1.start()          # start方法会自动调用run方法运行
    p2.start()
    p3.start()
    p4.start()
    print("主线程")

'''
执行结果:
主线程
Shuke is say 'Hello'
Lucy is say 'Hello'
Tome is say 'Hello'
Eric is say 'Hello'
Tome talking end
Eric talking end
Lucy talking end
Shuke talking end
'''

开启进程(方式二)

并发实现socket通信示例

betway必威官方网站 77betway必威官方网站 78

from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)


def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':  # windows下start进程一定要写到这下面
    while True:
        conn, addr = server.accept()
        p = Process(target=talk, args=(conn, addr))
        p.start()

server端

betway必威官方网站 79betway必威官方网站 80

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

client端

存在的问题:

每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。

解决方法:进程池

同步异步阻塞非阻塞

七 管道

进程间通信(IPC)方式二:管道(不推荐使用,了解即可)

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    

conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

betway必威官方网站 81betway必威官方网站 82

 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def consumer(p,name):
 5     left,right=p
 6     left.close()
 7     while True:
 8         try:
 9             baozi=right.recv()
10             print('%s 收到包子:%s' %(name,baozi))
11         except EOFError:
12             right.close()
13             break
14 def producer(seq,p):
15     left,right=p
16     right.close()
17     for i in seq:
18         left.send(i)
19         # time.sleep(1)
20     else:
21         left.close()
22 if __name__ == '__main__':
23     left,right=Pipe()
24 
25     c1=Process(target=consumer,args=((left,right),'c1'))
26     c1.start()
27 
28 
29     seq=(i for i in range(10))
30     producer(seq,(left,right))
31 
32     right.close()
33     left.close()
34 
35     c1.join()
36     print('主进程')

基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

betway必威官方网站 83betway必威官方网站 84

 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def adder(p,name):
 5     server,client=p
 6     client.close()
 7     while True:
 8         try:
 9             x,y=server.recv()
10         except EOFError:
11             server.close()
12             break
13         res=x y
14         server.send(res)
15     print('server done')
16 if __name__ == '__main__':
17     server,client=Pipe()
18 
19     c1=Process(target=adder,args=((server,client),'c1'))
20     c1.start()
21 
22     server.close()
23 
24     client.send((10,20))
25     print(client.recv())
26     client.close()
27 
28     c1.join()
29     print('主进程')
30 #注意:send()和recv()方法使用pickle模块对对象进行序列化。

管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序

2. Process对象的其他方法和属性

进程对象的其他方法一:terminate,is_alive

betway必威官方网站 85betway必威官方网站 86

import time
import random
from multiprocessing import Process


class Talk(Process):    # 继承Process类

    def __init__(self,name):
        super(Talk, self).__init__()    # 继承父类__init__方法
        self.name=name

    def run(self):          # 必须实现一个run方法,规定
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')

    p1.start()          # start方法会自动调用run方法运行
    p1.terminate()      # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive())# True
    time.sleep(1)       # 模拟CPU调度的延时
    print("====分割线====")
    print(p1.is_alive())# False

'''
执行结果:
True
====分割线====
False
'''

terminate,is_alive

进程对象的其他方法二:p1.daemon=True,p1.join

betway必威官方网站 87betway必威官方网站 88

import time
import random
from multiprocessing import Process


class Talk(Process):

    def __init__(self,name):
        super(Talk, self).__init__()
        self.name=name

    def run(self):
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p1.daemon = True    # 一定要在p1.start()前设置,设置p1为守护进程,禁止p1创建子进程,并且父进程结束,p1跟着一起结束
    p1.start()          # start方法会自动调用run方法运行
    p1.join(0.0001)     # 等待p1停止,等0.0001秒就不再等了

p1.daemon=True,p1.join

剖析p1.join

betway必威官方网站 89betway必威官方网站 90

from multiprocessing import Process

import time
import random
def piao(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

p1=Process(target=piao,args=('egon',))
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('yuanhao',))
p4=Process(target=piao,args=('wupeiqi',))

p1.start()
p2.start()
p3.start()
p4.start()

p1.join()
p2.join()
p3.join()
p4.join()

print('主线程')

#疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
#当然不是了
#注意:进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
#而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过
# 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间


#上述启动进程与join进程可以简写为
p_l=[p1,p2,p3,p4]

for p in p_l:
    p.start()

for p in p_l:
    p.join()

有了join,程序不就是串行了吗???

进程对象的其他属性:name,pid

betway必威官方网站 91betway必威官方网站 92

import time
import random
from multiprocessing import Process


class Talk(Process):

    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        # 为我们开启的进程设置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p1.start()          # start方法会自动调用run方法运行
    print("====")
    print(p1.pid)       # 查看pid

'''
执行结果:
====
20484
Shuke is say 'Hello'
Shuke talking end
'''

属性:name,pid

状态介绍

betway必威官方网站 93

  在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

  (1)就绪(Ready)状态

  当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

  (2)执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

  (3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

      betway必威官方网站 94

八 共享数据

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,

还可以扩展到分布式系统中

进程间通信应该尽量避免使用本节所讲的共享数据的方式

betway必威官方网站 95

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的

虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

betway必威官方网站 96betway必威官方网站 97

 1 from multiprocessing import Manager,Process,Lock
 2 import os
 3 def work(d,lock):
 4     # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
 5         d['count']-=1
 6 
 7 if __name__ == '__main__':
 8     lock=Lock()
 9     with Manager() as m:
10         dic=m.dict({'count':100})
11         p_l=[]
12         for i in range(100):
13             p=Process(target=work,args=(dic,lock))
14             p_l.append(p)
15             p.start()
16         for p in p_l:
17             p.join()
18         print(dic)
19         #{'count': 94}

进程之间操控共享数据

3. 进程同步(锁)

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的

#多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000):
    l=Logger()
    l.start()

#多进程共享一套文件系统
from multiprocessing import Process
import time,random

def work(f,msg):
    f.write(msg)
    f.flush()


f=open('a.txt','w') #在windows上无法把f当做参数传入,可以传入一个文件名,然后在work内用a 的方式打开文件,进行写入测试
for i in range(5):
    p=Process(target=work,args=(f,str(i)))
    p.start()

注: 既然可以用文件共享数据,那么进程间通信用文件作为数据传输介质就可以了啊,可以,但是有问题:

1.效率

2.需要自己加锁处理

需知:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。

进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理。所以,就让我们用文件当做数据库,模拟抢票,(Lock互斥锁),见下文抢票示例。

学习了通过使用共享的文件的方式,实现进程直接的共享,即共享数据的方式,这种方式必须考虑周全同步、锁等问题。而且文件是操作系统提供的抽象,可以作为进程直接通信的介质,与mutiprocess模块无关。

但其实mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。

IPC机制中的队列又是基于(管道 锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

 

同步和异步

      所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列betway必威官方网站,。

betway必威官方网站 98betway必威官方网站 99

比如我去银行办理业务,可能会有两种方式:
第一种 :选择排队等候;
第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;

第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;

第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。

例子

九 信号量(了解)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,
后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。
一旦释放,就有人可以获得一把锁

    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

betway必威官方网站 100betway必威官方网站 101

 1 from multiprocessing import Process,Semaphore
 2 import time,random
 3 
 4 def go_wc(sem,user):
 5     sem.acquire()
 6     print('%s 占到一个茅坑' %user)
 7     time.sleep(random.randint(0,3)) #模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了
 8     sem.release()
 9 
10 if __name__ == '__main__':
11     sem=Semaphore(5)
12     p_l=[]
13     for i in range(13):
14         p=Process(target=go_wc,args=(sem,'user%s' %i,))
15         p.start()
16         p_l.append(p)
17 
18     for i in p_l:
19         i.join()
20     print('============》')

View Code

 十 事件(了解)

betway必威官方网站 102betway必威官方网站 103

 1 python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
 2 
 3     事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
 4 
 5 clear:将“Flag”设置为False
 6 set:将“Flag”设置为True
 7  
 8 
 9 #_*_coding:utf-8_*_
10 #!/usr/bin/env python
11 
12 from multiprocessing import Process,Event
13 import time,random
14 
15 def car(e,n):
16     while True:
17         if not e.is_set(): #Flase
18             print('\033[31m红灯亮\033[0m,car%s等着' %n)
19             e.wait()
20             print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
21             time.sleep(random.randint(3,6))
22             if not e.is_set():
23                 continue
24             print('走你,car', n)
25             break
26 
27 def police_car(e,n):
28     while True:
29         if not e.is_set():
30             print('\033[31m红灯亮\033[0m,car%s等着' % n)
31             e.wait(1)
32             print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
33             break
34 
35 def traffic_lights(e,inverval):
36     while True:
37         time.sleep(inverval)
38         if e.is_set():
39             e.clear() #e.is_set() ---->False
40         else:
41             e.set()
42 
43 if __name__ == '__main__':
44     e=Event()
45     # for i in range(10):
46     #     p=Process(target=car,args=(e,i,))
47     #     p.start()
48 
49     for i in range(5):
50         p = Process(target=police_car, args=(e, i,))
51         p.start()
52     t=Process(target=traffic_lights,args=(e,10))
53     t.start()
54 
55     print('============》')

Event(同线程一样)

1.4 进程间通信(IPC)方式一:队列

 进程彼此之间互相隔离,要实现进程间通信,即IPC,multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的,广泛应用在分布式系统中。

Queue模块有三种队列及构造函数:
  1. Python Queue模块的FIFO队列先进先出。 class Queue.Queue(maxsize)
  2. LIFO类似于堆,即先进后出。 class Queue.LifoQueue(maxsize)
  3. 还有一种是优先级队列级别越低越先出来。 class Queue.PriorityQueue(maxsize)

阻塞与非阻塞

 阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

betway必威官方网站 104betway必威官方网站 105

继续上面的那个例子,不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。
相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。

注意:同步非阻塞形式实际上是效率低下的,想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有。如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的;而异步非阻塞形式却没有这样的问题,因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

例子

十一 进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  1. 很明显需要并发执行的任务通常要远大于核数
  2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

1 Pool([numprocess  [,initializer [, initargs]]]):创建进程池 

    参数介绍:

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

  方法介绍:

    主要方法:

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
3    
4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

   其他方法(了解部分)

 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。
实例具有以下方法 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。
如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
 obj.ready():如果调用完成,返回True obj.successful():如果调用完成且没有引发异常,返回True,
如果在结果就绪之前调用此方法,引发异常 obj.wait([timeout]):等待结果变为可用。 
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。
如果p被垃圾回收,将自动调用此函数

    应用:

betway必威官方网站 106betway必威官方网站 107

 1 from multiprocessing import Pool
 2 import os,time
 3 def work(n):
 4     print('%s run' %os.getpid())
 5     time.sleep(3)
 6     return n**2
 7 
 8 if __name__ == '__main__':
 9     p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
10     res_l=[]
11     for i in range(10):
12         res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限
13         res_l.append(res)
14     print(res_l)

异步调用apply

betway必威官方网站 108betway必威官方网站 109

 1 from multiprocessing import Pool
 2 import os,time
 3 def work(n):
 4     print('%s run' %os.getpid())
 5     time.sleep(3)
 6     return n**2
 7 
 8 if __name__ == '__main__':
 9     p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
10     res_l=[]
11     for i in range(10):
12         res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
13         res_l.append(res)
14 
15     #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
16     p.close()
17     p.join()
18     for res in res_l:
19         print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

异步调用apply_async

betway必威官方网站 110betway必威官方网站 111

 1 #一:使用进程池(异步调用,apply_async)
 2 #coding: utf-8
 3 from multiprocessing import Process,Pool
 4 import time
 5 
 6 def func(msg):
 7     print( "msg:", msg)
 8     time.sleep(1)
 9     return msg
10 
11 if __name__ == "__main__":
12     pool = Pool(processes = 3)
13     res_l=[]
14     for i in range(10):
15         msg = "hello %d" %(i)
16         res=pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
17         res_l.append(res)
18     print("==============================>") #没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了
19 
20     pool.close() #关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
21     pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
22 
23     print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
24     for i in res_l:
25         print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
26 
27 #二:使用进程池(同步调用,apply)
28 #coding: utf-8
29 from multiprocessing import Process,Pool
30 import time
31 
32 def func(msg):
33     print( "msg:", msg)
34     time.sleep(0.1)
35     return msg
36 
37 if __name__ == "__main__":
38     pool = Pool(processes = 3)
39     res_l=[]
40     for i in range(10):
41         msg = "hello %d" %(i)
42         res=pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
43         res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
44     print("==============================>")
45     pool.close()
46     pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
47 
48     print(res_l) #看到的就是最终的结果组成的列表
49     for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法

详解:apply_async与apply

练习2:使用进程池维护固定数目的进程(重写练习1)

betway必威官方网站 112betway必威官方网站 113

 1 #Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
 2 #开启6个客户端,会发现2个客户端处于等待状态
 3 #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
 4 from socket import *
 5 from multiprocessing import Pool
 6 import os
 7 
 8 server=socket(AF_INET,SOCK_STREAM)
 9 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
10 server.bind(('127.0.0.1',8080))
11 server.listen(5)
12 
13 def talk(conn,client_addr):
14     print('进程pid: %s' %os.getpid())
15     while True:
16         try:
17             msg=conn.recv(1024)
18             if not msg:break
19             conn.send(msg.upper())
20         except Exception:
21             break
22 
23 if __name__ == '__main__':
24     p=Pool()
25     while True:
26         conn,client_addr=server.accept()
27         p.apply_async(talk,args=(conn,client_addr))
28         # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

服务端

betway必威官方网站 114betway必威官方网站 115

 1 from socket import *
 2 
 3 client=socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8080))
 5 
 6 
 7 while True:
 8     msg=input('>>: ').strip()
 9     if not msg:continue
10 
11     client.send(msg.encode('utf-8'))
12     msg=client.recv(1024)
13     print(msg.decode('utf-8'))

客户端

发现:并发开启多个客户端,服务端同一时间只有3个不同的pid,干掉一个客户端,另外一个客户端才会进来,被3个进程之一处理

 

  回掉函数:

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

betway必威官方网站 116betway必威官方网站 117

 1 from multiprocessing import Pool
 2 import requests
 3 import json
 4 import os
 5 
 6 def get_page(url):
 7     print('<进程%s> get %s' %(os.getpid(),url))
 8     respone=requests.get(url)
 9     if respone.status_code == 200:
10         return {'url':url,'text':respone.text}
11 
12 def pasrse_page(res):
13     print('<进程%s> parse %s' %(os.getpid(),res['url']))
14     parse_res='url:<%s> size:[%s]n' %(res['url'],len(res['text']))
15     with open('db.txt','a') as f:
16         f.write(parse_res)
17 
18 
19 if __name__ == '__main__':
20     urls=[
21         'https://www.baidu.com',
22         'https://www.python.org',
23         'https://www.openstack.org',
24         'https://help.github.com/',
25         'http://www.sina.com.cn/'
26     ]
27 
28     p=Pool(3)
29     res_l=[]
30     for url in urls:
31         res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
32         res_l.append(res)
33 
34     p.close()
35     p.join()
36     print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
37 
38 '''
39 打印结果:
40 <进程3388> get https://www.baidu.com
41 <进程3389> get https://www.python.org
42 <进程3390> get https://www.openstack.org
43 <进程3388> get https://help.github.com/
44 <进程3387> parse https://www.baidu.com
45 <进程3389> get http://www.sina.com.cn/
46 <进程3387> parse https://www.python.org
47 <进程3387> parse https://help.github.com/
48 <进程3387> parse http://www.sina.com.cn/
49 <进程3387> parse https://www.openstack.org
50 [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>rn...',...}]
51 '''

View Code

betway必威官方网站 118betway必威官方网站 119

 1 from multiprocessing import Pool
 2 import time,random
 3 import requests
 4 import re
 5 
 6 def get_page(url,pattern):
 7     response=requests.get(url)
 8     if response.status_code == 200:
 9         return (response.text,pattern)
10 
11 def parse_page(info):
12     page_content,pattern=info
13     res=re.findall(pattern,page_content)
14     for item in res:
15         dic={
16             'index':item[0],
17             'title':item[1],
18             'actor':item[2].strip()[3:],
19             'time':item[3][5:],
20             'score':item[4] item[5]
21 
22         }
23         print(dic)
24 if __name__ == '__main__':
25     pattern1=re.compile(r'<dd>.*?board-index.*?>(d )<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)
26 
27     url_dic={
28         'http://maoyan.com/board/7':pattern1,
29     }
30 
31     p=Pool()
32     res_l=[]
33     for url,pattern in url_dic.items():
34         res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
35         res_l.append(res)
36 
37     for i in res_l:
38         i.get()
39 
40     # res=requests.get('http://maoyan.com/board/7')
41     # print(re.findall(pattern,res.text))

爬虫案例

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

betway必威官方网站 120betway必威官方网站 121

 1 from multiprocessing import Pool
 2 import time,random,os
 3 
 4 def work(n):
 5     time.sleep(1)
 6     return n**2
 7 if __name__ == '__main__':
 8     p=Pool()
 9 
10     res_l=[]
11     for i in range(10):
12         res=p.apply_async(work,args=(i,))
13         res_l.append(res)
14 
15     p.close()
16     p.join() #等待进程池中所有进程执行完毕
17 
18     nums=[]
19     for res in res_l:
20         nums.append(res.get()) #拿到所有结果
21     print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理

View Code

进程池的其他实现方式:https://docs.python.org/dev/library/concurrent.futures.html

 

Queue类(创建队列)

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递,底层是以管道和锁的方式实现的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。    

方法介绍:

主要方法:

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

其他方法:

q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

应用:

'''
multiprocessing 模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,都是队列接口
'''

from multiprocessing import Process,Queue
import time

q=Queue(5)
q.put([1,2,3])
q.put(('a','b','c'))
q.put(100)
q.put("Hello World")
q.put({'name':'shuke'})
# q.put('队列满了')           # 如果队列元素满了,后续put进入队列的数据将会处于等待状态,直到队列的元素被消费,才可以加入
print(q.qsize())            # 5; 返回队列的大小
print(q.full())             # True

print(q.get())              # [1, 2, 3]
print(q.get())              # ('a', 'b', 'c')
print(q.get())              # 100
print(q.get())              # Hello World
print(q.get())              # {'name': 'shuke'}
# print(q.get())            # 如果队列元素全部被消费完成,会一直卡住,直到队列中被放入新的元素
print(q.empty())            # True

同步/异步与阻塞/非阻塞

  1. 同步阻塞形式

  效率最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。

  1. 异步阻塞形式

  如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面;

异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

  1. 同步非阻塞形式

  实际上是效率低下的。

  想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

  1. 异步非阻塞形式

  效率更高,

  因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换

  比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步 非阻塞的方式了。

  

很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型

betway必威官方网站 122betway必威官方网站 123

from multiprocessing import Process,Queue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))


def consumer(q,name):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        print("%s 消费者消费了: %s"%(name,res))


if __name__ == '__main__':
    q=Queue()
    seq=("苹果%s"% i for i in range(5))

    p=Process(target=consumer,args=(q,'Tom'))       # 以元组的方式传参
    p.start()
    producer(seq,q,'shuke')
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
Tom 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
Tom 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
shuke 生产者生产了: 苹果3
Tom 消费者消费了: 苹果2
shuke 生产者生产了: 苹果4
=====主线程=====
Tom 消费者消费了: 苹果3
Tom 消费者消费了: 苹果4
'''

生产者消费者模型示例(基于队列)

betway必威官方网站 124betway必威官方网站 125

# 生产者发送结束标志给消费者
from multiprocessing import Process,Queue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))


def consumer(q,name):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))


if __name__ == '__main__':
    q=Queue()
    seq=("苹果%s"% i for i in range(5))

    c=Process(target=consumer,args=(q,'Tom'))       # 以元组的方式传参
    c.start()

    producer(seq,q,'shuke')
    q.put(None)
    c.join()    # 主线程等待直到c消费者进程运行结束再继续往下运行
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
Tom 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
Tom 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
Tom 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
Tom 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
Tom 消费者消费了: 苹果4
=====主线程=====
'''

主线程等到消费者结束

进程的创建与结束

JoinableQueue类 (创建队列的另外一个类)

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的消费者通知生产者队列已经被成功处理,通知进程是使用共享的信号和条件变量来实现的。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。

方法介绍:

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

  • q.task_done(): 使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常。
  • q.join(): 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。

betway必威官方网站 126betway必威官方网站 127

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))
    q.join()            # 生产者调用此方法进行阻塞


def consumer(q,name):
    while True:
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))
        q.task_done()       # 使用者使用此方法发出信号,表示q.get()的返回元素已经被消费处理。

if __name__ == '__main__':
    q=JoinableQueue()
    seq=("苹果%s"% i for i in range(5))

    c=Process(target=consumer,args=(q,'Tom'))       # 以元组的方式传参
    c.daemon=True     # 在start之前进行设置为守护进程,在主线程停止时c也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    c.start()

    producer(seq,q,'shuke')
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
Tom 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
Tom 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
Tom 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
Tom 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
Tom 消费者消费了: 苹果4
=====主线程=====
'''

q.join与q.task_done示例

betway必威官方网站 128betway必威官方网站 129

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))
    q.join()


def consumer(q,name):
    while True:
        time.sleep(random.randint(1, 3))
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))
        q.task_done()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=("苹果%s"% i for i in range(5))

    c1=Process(target=consumer,args=(q,'消费者1'))       # 以元组的方式传参
    c2=Process(target=consumer,args=(q,'消费者2'))
    c3=Process(target=consumer,args=(q,'消费者3'))
    c1.daemon=True     # 在start之前进行设置为守护进程,在主线程停止时c也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    c2.daemon=True
    c3.daemon=True
    c1.start()
    c2.start()
    c3.start()

    producer(seq,q,'shuke')
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
消费者3 消费者消费了: 苹果0
shuke 生产者生产了: 苹果1
消费者1 消费者消费了: 苹果1
shuke 生产者生产了: 苹果2
消费者2 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
消费者1 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
消费者3 消费者消费了: 苹果4
=====主线程=====
'''

一个生产者 多个消费者

betway必威官方网站 130betway必威官方网站 131

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        # time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生产者生产了: %s"%(name,item))
    q.join()


def consumer(q,name):
    while True:
        # time.sleep(random.randint(1, 3))
        res=q.get()
        if res is None:break
        print("%s 消费者消费了: %s"%(name,res))
        q.task_done()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=["苹果%s"% i for i in range(5)]

    c1=Process(target=consumer,args=(q,'消费者1'))       # 以元组的方式传参
    c2=Process(target=consumer,args=(q,'消费者2'))
    c3=Process(target=consumer,args=(q,'消费者3'))
    c1.daemon=True     # 在start之前进行设置为守护进程,在主线程停止时c也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    c2.daemon=True
    c3.daemon=True
    c1.start()
    c2.start()
    c3.start()

    # producer(seq,q,'shuke')     # 也可以是下面三行的形式,开启一个新的子进程当生产者,不用主线程当生产者
    p=Process(target=producer,args=(seq,q,'shuke'))     # 注意此处参数seq为列表
    p.start()
    p.join()
    print("=====主线程=====")

'''
执行结果:
shuke 生产者生产了: 苹果0
shuke 生产者生产了: 苹果1
消费者3 消费者消费了: 苹果0
shuke 生产者生产了: 苹果2
消费者2 消费者消费了: 苹果1
消费者3 消费者消费了: 苹果2
shuke 生产者生产了: 苹果3
消费者2 消费者消费了: 苹果3
shuke 生产者生产了: 苹果4
消费者3 消费者消费了: 苹果4
=====主线程=====
'''

开启一个子进程当作生产者而不是主线程

 

进程的创建

  但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

  而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程:

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

  无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。  

betway必威官方网站 132betway必威官方网站 133

1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)

  2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。

  关于创建子进程,UNIX和windows

  1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。

  2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。

创建进程

1.5 进程间通信(IPC)方式二:管道(了解部分)

管道也可以说是队列的另外一种形式,下面我们就开始介绍基于管道实现进程之间的消息传递

进程的结束

  1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出错退出(自愿,python a.py中a.py不存在)

  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

  4. 被其他进程杀死(非自愿,如kill -9)

Pipe类(创建管道)

Pipe([duplex]): 在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

方法介绍:

主要方法:

  • conn1.recv(): 接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • conn1.send(obj): 通过连接发送对象。obj是与序列化兼容的任意对象。

其他方法:

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法。
conn1.fileno():返回连接使用的整数文件描述符。
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

基于管道实现进程间通信 (与队列的方式是类似的,队列就是管道加锁实现的)

from multiprocessing import Process,Pipe
import time

def consumer(p,name):
    left,right = p
    left.close()
    while True:
        try:
            fruit = right.recv()
            print("%s 收到水果: %s" % (name,fruit))
        except EOFError:
            right.close()
            break

def producer(seq,p):
    left,right = p
    right.close()
    for item in seq:
        left.send(item)
    else:
        left.close()

if __name__ == '__main__':
    left,right = Pipe()
    c1=Process(target=consumer,args=((left,right),'Tom'))
    c1.start()

    seq=(i for i in range(5))
    producer(seq,(left,right))
    right.close()
    left.close()

    c1.join()
    print("===主线程===")

'''
执行结果:
Tom 收到水果: 0
Tom 收到水果: 1
Tom 收到水果: 2
Tom 收到水果: 3
Tom 收到水果: 4
===主线程===
'''

 注: 生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点。

管道可以用于双向通信,通常利用在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,如下:

betway必威官方网站 134betway必威官方网站 135

from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主进程')

示例

注: send()和recv()方法使用pickle模块对对象进行序列化。

 

在python程序中的进程操作

  之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块。

1.6 进程间通信方式三:共享数据

展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

注: 进程间通信应该尽量避免使用本节所讲的共享数据的方式

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的,虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此。

betway必威官方网站 136betway必威官方网站 137

from multiprocessing import Process,Manager
import os

def foo(name,d,l):
    l.append(os.getpid())
    d[name]=os.getpid()
if __name__ == '__main__':
    with Manager() as manager:
        d=manager.dict({'name':'shuke'})
        l=manager.list(['init',])

        p_l=[]
        for i in range(5):
            p=Process(target=foo,args=('p%s' %i,d,l))
            p.start()
            p_l.append(p)

        for p in p_l:
            p.join() #必须有join不然会报错

        print(d)
        print(l)
'''
执行结果:
{'p0': 62792, 'p4': 63472, 'name': 'shuke', 'p1': 60336, 'p3': 62704, 'p2': 63196}
['init', 60336, 62704, 62792, 63196, 63472]
'''

示例

 

multiprocess模块

仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

1.7 进程同步(锁),信号量,事件...

模拟抢票(Lock-->互斥锁)

# 文件db的内容为:{"count":1}
# 注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #买票
    # lock.acquire()
    with lock:      # with语法下面的代码块执行完毕会自动释放锁
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩余票数: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模拟网络延迟
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 购票成功' %os.getpid())
        else:
            print('%s 购票失败' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(5):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主线程')

'''
执行结果:
63448 购票成功
13676 购票失败
61668 购票失败
63544 购票失败
17816 购票失败
主线程
'''

betway必威官方网站 138betway必威官方网站 139

#互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁

#信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 占到一个茅坑' %user)
    time.sleep(random.randint(0,3)) #模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

信号量Semahpore(同线程一样)

betway必威官方网站 140betway必威官方网站 141

# python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
# 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True

#_*_coding:utf-8_*_
#!/usr/bin/env python

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m红灯亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m车%s 看见绿灯亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m红灯亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('灯的是%s,警车走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')

Event(同线程一样)

 

本文由betway必威发布于编程开发,转载请注明出处:betway必威官方网站:python并发编程之进程,进程

TAG标签: betway必威
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。