查看: 367|回复: 0

Python_进程

[复制链接]
发表于 2020-3-8 10:26:53 | 显示全部楼层 |阅读模式
<div id="cnblogs_post_body" h1理论知识/h1h2操纵系统配景知识/h2pstrong顾名思义,历程即正在执行的一个过程。历程是对正在运行程序的一个抽象。/strong/ppstrong历程的概念起源于操纵系统,是操纵系统最核心的概念,也是操纵系统提供的最古老也是最紧张的抽象概念之一。操纵系统的其他所有内容都是围绕历程的概念展开的。/strong/ppstrong所以想要真正相识历程,必须事先相识操纵系统,a href="http://www.cnblogs.com/Eva-J/articles/8253521.html" target="_blank"点击进入/a    /strong/ppPS:即使可以利用的cpu只有一个(早期的盘算机确实云云),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个假造的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有历程的抽象,现代盘算机将不复存在。/ppstrong必备的理论基础:/strong/pa  title="复制代码"
img src="https://common.cnblogs.com/images/copycode.gif"
/a
  1. #一 操纵系统的作用:    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口    2:管理、调度历程,并且将多个历程对硬件的竞争变得有序#二 多道技术:    1.产生配景:针对单核,实现并发    ps:    现在的主机一般是多核,那么每个核都会利用多道技术    有4个cpu,运行于cpu1的某个程序碰到io阻塞,会等到io竣事再重新调度,会被调度到4个    cpu中的任意一个,详细由操纵系统调度算法决定。        2.空间上的复用:如内存中同时有多道程序    3.时间上的复用:复用一个cpu的时间片       夸大:碰到io切,占用cpu时间过长也切,核心在于切之前将历程的状态保存下来,如许            才气保证下次切换回来时,能基于上次切走的位置继续运行
复制代码
a  title="复制代码"
img src="https://common.cnblogs.com/images/copycode.gif"
/ah2什么是历程/h2p历程(Process)是盘算机中的程序关于某数据聚集上的一次运行运动,是系统进行资源分配和调度的基本单元,是a href="https://baike.baidu.com/item/%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F" target="_blank"操纵系统/a布局的基础。在早期面向历程计划的盘算机布局中,历程是程序的基本执行实体;在今世面向线程计划的盘算机布局中,历程是线程的容器。程序是指令、数据及其组织形式的描述,历程是程序的实体。/p狭义定义:历程是正在运行的程序的实例(an instance of a computer program that is being executed)。广义定义:历程是一个具有一定独建功能的程序关于某个数据聚集的一次运行运动。它是a href="https://baike.baidu.com/item/%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F/192" target="_blank" data-lemmaid="192"操纵系统/a动态执行的a href="https://baike.baidu.com/item/%E5%9F%BA%E6%9C%AC%E5%8D%95%E5%85%83" target="_blank"基本单元/a,在传统的a href="https://baike.baidu.com/item/%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F" target="_blank"操纵系统/a中,历程既是基本的a href="https://baike.baidu.com/item/%E5%88%86%E9%85%8D%E5%8D%95%E5%85%83" target="_blank"分配单元/a,也是基本的执行单元。
img id="code_img_closed_764034dc-2bee-4e80-b05b-c1120edeb455"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_764034dc-2bee-4e80-b05b-c1120edeb455"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 第一,历程是一个实体。每一个历程都有它自己的地点空间,一般情况下,包罗文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理惩罚器执行的代码;数据区域存储变量和历程执行期间使用的动态分配的内存;堆栈区域存储着运动过程调用的指令和当地变量。第二,历程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理惩罚器赋予程序生命时(操纵系统执行之),它才气成为一个运动的实体,我们称其为历程。[3] 历程是操纵系统中最基本、紧张的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的运动规律引进的一个概念,所有多道程序计划操纵系统都创建在历程的基础上。
复制代码
历程的概念
img id="code_img_closed_95dedc7a-44d5-4dde-8a2b-f49db47e59af"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_95dedc7a-44d5-4dde-8a2b-f49db47e59af"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 从理论角度看,是对正在运行的程序过程的抽象;从实现角度看,是一种数据布局,目的在于清晰地刻画动态系统的内在规律,有效管理和调度进入盘算机系统主存储器运行的程序。
复制代码
操纵系统引入历程的概念的缘故原由
img id="code_img_closed_95ff71bd-bbe9-4972-a9ec-c7db6ae53893"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_95ff71bd-bbe9-4972-a9ec-c7db6ae53893"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 动态性:历程的实质是程序在多道程序系统中的一次执行过程,历程是动态产生,动态消亡的。并发性:任何历程都可以同其他历程一起并发执行独立性:历程是一个能独立运行的基本单元,同时也是系统分配资源和调度的独立单元;异步性:由于历程间的相互制约,使历程具有执行的间断性,即历程按各自独立的、不可预知的速率向前推进布局特征:历程由程序、数据和历程控制块三部分组成。多个不同的历程可以包罗相同的程序:一个程序在不同的数据集里就构成不同的历程,能得到不同的结果;但是执行过程中,程序不能发生改变。
复制代码
历程的特征
img id="code_img_closed_415fb02a-730a-48cb-9bcf-62d8b286ac77"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_415fb02a-730a-48cb-9bcf-62d8b286ac77"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 程序是指令和数据的有序聚集,其本身没有任何运行的含义,是一个静态的概念。而历程是程序在处理惩罚机上的一次执行过程,它是一个动态的概念。程序可以作为一种软件资料长期存在,而历程是有一定生命期的。程序是永久的,历程是暂时的。
复制代码
历程与程序中的区别pstrong注意:同一个程序执行两次,就会在操纵系统中出现两个历程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。/strong/ph2历程调度/h2p要想多个历程交替运行,操纵系统必须对这些历程进行调度,这个调度也不是随即进行的,而是须要遵循一定的法则,由此就有了历程的调度算法。/p
img id="code_img_closed_2f08d698-a87d-4629-9287-44976e66da81"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_2f08d698-a87d-4629-9287-44976e66da81"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于作业调度,也可用于历程调度。FCFS算法比较有利于长作业(历程),而不利于短作业(历程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(历程)。
复制代码
先来先服务调度算法
img id="code_img_closed_6aff43dd-e7ad-46ba-9d6f-f31c6a4f309f"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_6aff43dd-e7ad-46ba-9d6f-f31c6a4f309f"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 短作业(历程)优先调度算法(SJ/PF)是指对短作业或短历程优先调度的算法,该算法既可用于作业调度,也可用于历程调度。但其对长作业不利;不能保证紧急性作业(历程)被及时处理惩罚;作业的是非只是被估算出来的。
复制代码
短作业优先调度算法
img id="code_img_closed_f94f0589-8d79-4d27-8e88-78019a8dadc2"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_f94f0589-8d79-4d27-8e88-78019a8dadc2"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1.       时间片轮转(Round Robin,RR)法的基本思路是让每个历程在就绪队列中的等候时间与享受服务的时间成比例。在时间片轮转法中,须要将CPU的处理惩罚时间分成固定巨细的时间片,比方,几十毫秒至几百毫秒。假如一个历程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占据的CPU而排到就绪队列的末尾,等候下一次调度。同时,历程调度程序又去调度当前就绪队列中的第一个历程。      显然,轮转法只能用来调度分配一些可以抢占的资源。这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的历程。CPU是可抢占资源的一种。但打印机等资源是不可抢占的。由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包罗有不可抢占资源,所以作业调度不使用轮转法。在轮转法中,时间片长度的选取非常紧张。首先,时间片长度的选择会直接影响到系统的开销和响应时间。假如时间片长度过短,则调度程序抢占处理惩罚机的次数增多。这将使历程上下文切换次数也大大增加,从而加重系统开销。反过来,假如时间片长度选择过长,比方,一个时间片能保证就绪队列中所需执行时间最长的历程能执行完毕,则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的历程数来确定的。      在轮转法中,参加到就绪队列的历程有3种情况:      一种是分给它的时间片用完,但历程还未完成,回到就绪队列的末尾等候下次调度去继续执行。      另一种情况是分给该历程的时间片并未用完,只是因为请求I/O或由于历程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。      第三种情况就是新创建历程进入就绪队列。      假如对这些历程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和服从。比方,我们可把就绪队列按照历程到达就绪队列的范例和历程被阻塞时的阻塞缘故原由分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的历程享有不同的优先级,但同一队列内优先级相同。如许,当一个历程在执行完它的时间片之后,或从就寝中被唤醒以及被创建之后,将进入不同的就绪队列。  
复制代码
时间片轮转法
img id="code_img_closed_a11cb25e-0039-41f4-a5a7-f8fdbb205b04"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_a11cb25e-0039-41f4-a5a7-f8fdbb205b04"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 前面先容的各种用作历程调度的算法都有一定的范围性。如短历程优先的调度算法,仅照顾了短历程而忽略了长历程,而且假如并未指明历程的长度,则短历程优先和基于历程长度的抢占式调度算法都将无法使用。而多级反馈队列调度算法则不必事先知道各种历程所需的执行时间,而且还可以满足各种范例历程的须要,因而它是目前被公认的一种较好的历程调度算法。在采用多级反馈队列调度算法的系统中,调度算法的实行过程如下所述。(1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个低落。该算法赋予各个队列中历程执行时间片的巨细也各不相同,在优先权愈高的队列中,为每个历程所规定的执行时间片就愈小。比方,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。(2) 当一个新历程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等候调度。当轮到该历程执行时,如它能在该时间片内完成,便可准备撤离系统;假如它在一个时间片竣事时尚未完成,调度程序便将该历程转入第二队列的末尾,再同样地按FCFS原则等候调度执行;假如它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,云云下去,当一个长作业(历程)从第一队列依次降到第n队列后,在第n 队列便采取按时间片轮转的方式运行。(3) 仅当第一队列空闲时,调度程序才调度第二队列中的历程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的历程运行。假如处理惩罚机正在第i队列中为某历程服务时,又有新历程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新历程将抢占正在运行历程的处理惩罚机,即由调度程序把正在运行的历程放回到第i队列的末尾,把处理惩罚机分配给新到的高优先权历程。
复制代码
多级反馈队列h2 历程的并行与并发/h2pstrong并行/strong : 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )/ppstrong并发/strong : 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高服从。/ppstrong区别/strong:/pp strong并行/strong是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理惩罚器。brstrong并发/strong是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理惩罚多个session。/ph2同步异步阻塞非阻塞/h2h3状态先容/h3p
img src="https://images2017.cnblogs.com/blog/827651/201801/827651-20180110201327535-1120359184.png"
/pp  在相识其他概念之前,我们首先要相识历程的几个状态。在程序运行的过程中,由于被操纵系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。/pp  (1)就绪(Ready)状态/pp  当历程已分配到除CPU以外的所有须要的资源,只要获得处理惩罚机便可立刻执行,这时的历程状态称为就绪状态。/pp  (2)执行/运行(Running)状态当历程已获得处理惩罚机,其程序正在处理惩罚机上执行,此时的历程状态称为执行状态。/pp  (3)阻塞(Blocked)状态正在执行的历程,由于等候某个事件发生而无法执行时,便放弃处理惩罚机而处于阻塞状态。引起历程阻塞的事件可有多种,比方,等候I/O完成、申请缓冲区不能满足、等候信件(信号)等。/pp      
img src="https://images2017.cnblogs.com/blog/827651/201801/827651-20180110204322676-135915799.png"  width="410" height="229"
/ph3同步和异步/h3p      code所谓同步就是一个任务的完成须要依赖别的一个任务时,只有等候被依赖的任务完成后,依赖的任务才气算完成,这是一种可靠的任务序列/code。要么成功都成功,失败都失败,两个任务的状态可以保持一致。/ppcode  所谓异步是不须要等候被依赖的任务完成,只是关照被依赖的任务要完成什么工作,依赖的任务也立刻执行,只要自己完成了整个任务就算完成了/code。至于被依赖的任务终极是否真正完成,依赖它的任务无法确定,code所以它是不可靠的任务序列/code。/p
img id="code_img_closed_1d0d0fbc-9f49-469b-83cb-2eb03f589544"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_1d0d0fbc-9f49-469b-83cb-2eb03f589544"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 比如我去银行办理业务,可能会有两种方式:第一种 :选择排队等候;第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人关照我轮到我去办理业务了;第一种:前者(排队等候)就是同步等候消息关照,也就是我要不停在等候银行办理业务情况;第二种:后者(等候别人关照)就是异步等候消息关照。在异步消息处理惩罚中,等候消息关照者(在这个例子中就是等候办理业务的人)往往注册一个回调机制,在所等候的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等候该事件的人。
复制代码
例子h3阻塞与非阻塞/h3p      code阻塞和非阻塞这两个概念与程序(线程)等候消息关照(无所谓同步大概异步)时的状态有关。也就是说阻塞与非阻塞重要是程序(线程)等候消息关照时的状态角度来说的/code/p
img id="code_img_closed_a5654f9b-c056-439c-a48f-97176cc93174"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_a5654f9b-c056-439c-a48f-97176cc93174"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 继续上面的那个例子,岂论是排队还是使用号码等候关照,假如在这个等候的过程中,等候者除了等候消息关照之外不能做别的的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序不停阻塞在该函数调用处不能继续往下执行。相反,有的人喜欢在银行办理这些业务的时候一边打打电话发短信一边等候,如许的状态就是非阻塞的,因为他(等候者)没有阻塞在这个消息关照上,而是一边做自己的事情一边等候。注意:同步非阻塞形式实际上是服从低下的,想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有。假如把打电话和观察排队的位置看成是程序的两个操纵的话,这个程序须要在这两种不同的行为之间来回的切换,服从可想而知是低下的;而异步非阻塞形式却没有如许的问题,因为打电话是你(等候者)的事情,而关照你则是柜台(消息触发机制)的事情,程序没有在两种不同的操纵中来回切换。
复制代码
例子h3同步/异步与阻塞/非阻塞/h3ollistrong同步阻塞形式/strong/li/olp  服从最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。/pol start="2"listrong异步阻塞形式/strong/li/olp  假如在银行等候办理业务的人code采用的是异步的方式去等候消息被触发(关照)/code,也就是领了一张小纸条,假如在这段时间里他不能离开银行做别的的事情,那么很显然,这个人被阻塞在了这个等候的操纵上面;/ppstrongcode  异步操纵是可以被阻塞住的,只不过它不是在处理惩罚消息时阻塞,而是在等候消息关照时被阻塞。/code/strong/pol start="3"listrong同步非阻塞形式/strong/li/olp  实际上是服从低下的。/pp  想象一下你一边打着电话一边还须要抬头看到底队伍排到你了没有,假如把打电话和观察排队的位置看成是程序的两个操纵的话,code这个程序须要在这两种不同的行为之间来回的切换/code,服从可想而知是低下的。/pol start="4"listrong异步非阻塞形式/strong/li/olp  服从更高,/pp  因为打电话是你(等候者)的事情,而关照你则是柜台(消息触发机制)的事情,code程序没有在两种不同的操纵中来回切换/code。/pp  比如说,这个人突然发觉自己烟瘾犯了,须要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面关照我一下,那么他就没有被阻塞在这个等候的操纵上面,自然这个就是异步+非阻塞的方式了。/pp  /pp很多人会把同步和阻塞肴杂,是code因为很多时候同步操纵会以阻塞的形式表现出来/code,同样的,很多人也会把异步和非阻塞肴杂,code因为异步操纵一般都不会在真正的IO操纵处被阻塞/code。/ph2历程的创建与竣事/h2h3历程的创建/h3p  但凡是硬件,都须要有操纵系统去管理,只要有操纵系统,就有历程的概念,就须要有创建历程的方式,一些操纵系统只为一个应用程序计划,比如微波炉中的控制器,一旦启动微波炉,所有的历程都已经存在。/pp  而对于通用系统(跑很多应用程序),须要有系统运行过程中创建或撤销历程的能力,重要分为4中形式创建新的历程:/pp  1. 系统初始化(查看历程linux中用ps命令,Windows中用任务管理器,前台历程负责与用户交互,后台运行的历程与用户无关,运行在后台并且只在须要时才唤醒的历程,称为守护历程,如电子邮件、web页面、消息、打印)/pp  2. 一个历程在运行过程中开启了子历程(如nginx开启多历程,os.fork,subprocess.Popen等)/pp  3. 用户的交互式请求,而创建一个新历程(如用户双击暴风影音)/pp  4. 一个批处理惩罚作业的初始化(只在大型机的批处理惩罚系统中应用)/pp  无论哪一种,新历程的创建都是由一个已经存在的历程执行了一个用于创建历程的系统调用而创建的。  /p
img id="code_img_closed_139413bd-2e81-485f-8568-725b8687b518"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_139413bd-2e81-485f-8568-725b8687b518"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 1. 在UNIX中该系统调用是:fork,fork会创建一个与父历程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器历程中,执行一个命令就会创建一个子历程)  2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理惩罚历程的创建,也负责把正确的程序装入新历程。  关于创建子历程,UNIX和windows  1.相同的是:历程创建后,父历程和子历程有各自不同的地点空间(多道技术要求物理层面实现历程之间内存的隔离),任何一个历程的在其地点空间中的修改都不会影响到别的一个历程。  2.不同的是:在UNIX中,子历程的初始地点空间是父历程的一个副本,提示:子历程和父历程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父历程与子历程的地点空间就是不同的。
复制代码
创建历程h3历程的竣事/h3p  1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)/pp  2. 出错退出(自愿,python a.py中a.py不存在)/pp  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉非常,try...except...)/pp  4. 被其他历程杀死(非自愿,如kill -9)/ph1在python程序中的历程操纵/h1p  之前我们已经相识了很多历程相关的理论知识,相识历程是什么应该不再困难了,刚刚我们已经相识了,运行中的程序就是一个历程。所有的历程都是通过它的父历程来创建的。因此,运行起来的python程序也是一个历程,那么我们也可以在程序中再创建历程。多个历程可以实现并发效果,也就是说,当我们的程序中存在多个历程的时候,在某些时候,就会让程序的执行速率变快。以我们之前所学的知识,并不能实现创建历程这个功能,所以我们就须要借助python中强大的模块。/ph2multiprocess模块/h2p      仔细说来,multiprocess不是一个模块而是python中一个操纵、管理历程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包罗了和历程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大抵分为四个部分:创建历程部分,历程同步部分,历程池部分,历程之间数据共享。/ph2multiprocess.process模块/h2h3process模块先容/h3pprocess模块是一个创建历程的模块,借助这个模块,就可以完成历程的创建。/pa  title="复制代码"
img src="https://common.cnblogs.com/images/copycode.gif"
/a
  1. Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子历程中的任务(尚未启动)夸大:1. 须要使用关键字的方式来指定参数2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号参数先容:1 group参数未使用,值始终为None2 target表示调用对象,即子历程要执行的任务3 args表示调用对象的位置参数元组,args=(1,2,'egon',)4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}5 name为子历程的名称
复制代码
a  title="复制代码"
img src="https://common.cnblogs.com/images/copycode.gif"
/a
img id="code_img_closed_1cb4b589-a5e3-43f2-9311-d867d7d74ce4"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_1cb4b589-a5e3-43f2-9311-d867d7d74ce4"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 1 p.start():启动历程,并调用该子历程中的p.run() 2 p.run():历程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  3 p.terminate():强制终止历程p,不会进行任何清算操纵,假如p创建了子历程,该子历程就成了僵尸历程,使用该方法须要特别警惕这种情况。假如p还保存了一个锁那么也将不会被释放,进而导致死锁4 p.is_alive():假如p仍旧运行,返回True5 p.join([timeout]):主线程等候p终止(夸大:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要夸大的是,p.join只能join住start开启的历程,而不能join住run开启的历程  
复制代码
方法先容
img id="code_img_closed_4dfdd10a-bc1e-4077-a501-ad810f62d163"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_4dfdd10a-bc1e-4077-a501-ad810f62d163"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 1 p.daemon:默认值为false,假如设为True,代表p为后台运行的守护历程,当p的父历程终止时,p也随之终止,并且设定为True后,p不能创建自己的新历程,必须在p.start()之前设置2 p.name:历程的名称3 p.pid:历程的pid4 p.exitcode:历程在运行时为None、假如为–N,表示被信号N竣事(相识即可)5 p.authkey:历程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络毗连的底层历程间通信提供安全性,这类毗连只有在具有相同的身份验证键时才气成功(相识即可)
复制代码
属性先容
img id="code_img_closed_6b179262-caf0-4156-98b1-6bd9d53c4f46"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_6b179262-caf0-4156-98b1-6bd9d53c4f46"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 在Windows操纵系统中由于没有fork(linux操纵系统中创建历程的机制),在创建子历程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此假如将process()直接写在文件中就会无限递归创建子历程报错。所以必须把创建子历程的部分使用if __name__ ==‘__main__’ 判定掩护起来,import 的时候  ,就不会递归运行了。
复制代码
在windows中使用process模块的注意事项p /ph3使用process模块创建历程/h3p在一个python历程中开启子历程,start方法和并发效果。/p
img id="code_img_closed_9d9aeb36-0c6f-4044-8a0a-c3287928b129"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_9d9aeb36-0c6f-4044-8a0a-c3287928b129"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
复制代码
在python中启动的第一个子历程
img id="code_img_closed_6179182c-aec0-40e7-a081-78044e91e0fb"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_6179182c-aec0-40e7-a081-78044e91e0fb"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import timefrom multiprocessing import Processdef f(name):    print('hello', name)    time.sleep(1)    print('我是子历程')if __name__ == '__main__':    p = Process(target=f, args=('bob',))    p.start()    #p.join()    print('我是父历程')
复制代码
join方法
img id="code_img_closed_d03a49d1-0113-46da-b855-61d09f8abd63"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_d03a49d1-0113-46da-b855-61d09f8abd63"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import osfrom multiprocessing import Processdef f(x):    print('子历程id :',os.getpid(),'父历程id :',os.getppid())    return x*xif __name__ == '__main__':    print('主历程id :', os.getpid())    p_lst = []    for i in range(5):        p = Process(target=f, args=(i,))        p.start()
复制代码
查看主历程和子历程的历程号p /pp进阶,多个历程同时运行(注意,子历程的执行顺序不是根据启动顺序决定的)/p
img id="code_img_closed_4c4766c2-561b-4bef-bbdf-e5c06d7819da"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_4c4766c2-561b-4bef-bbdf-e5c06d7819da"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import timefrom multiprocessing import Processdef f(name):    print('hello', name)    time.sleep(1)if __name__ == '__main__':    p_lst = []    for i in range(5):        p = Process(target=f, args=('bob',))        p.start()        p_lst.append(p)
复制代码
多个历程同时运行
img id="code_img_closed_b062396c-c589-47b2-9a6a-041834d599f9"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_b062396c-c589-47b2-9a6a-041834d599f9"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import timefrom multiprocessing import Processdef f(name):    print('hello', name)    time.sleep(1)if __name__ == '__main__':    p_lst = []    for i in range(5):        p = Process(target=f, args=('bob',))        p.start()        p_lst.append(p)        p.join()    # [p.join() for p in p_lst]    print('父历程在执行')
复制代码
多个历程同时运行,再谈join方法(1)
img id="code_img_closed_e990f7bb-ab2a-4301-87a2-a5f8d49f5b9d"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_e990f7bb-ab2a-4301-87a2-a5f8d49f5b9d"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import timefrom multiprocessing import Processdef f(name):    print('hello', name)    time.sleep(1)if __name__ == '__main__':    p_lst = []    for i in range(5):        p = Process(target=f, args=('bob',))        p.start()        p_lst.append(p)    # [p.join() for p in p_lst]    print('父历程在执行')
复制代码
多个历程同时运行,再谈join方法(2)p /pp除了上面这些开启历程的方法,还有一种以继承Process类的形式开启历程的方式/p
img id="code_img_closed_d0febf1e-8874-42b8-be5f-c0b297fe7861"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_d0febf1e-8874-42b8-be5f-c0b297fe7861"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import osfrom multiprocessing import Processclass MyProcess(Process):    def __init__(self,name):        super().__init__()        self.name=name    def run(self):        print(os.getpid())        print('%s 正在和女主播谈天' %self.name)p1=MyProcess('wupeiqi')p2=MyProcess('yuanhao')p3=MyProcess('nezha')p1.start() #start会自动调用runp2.start()# p2.run()p3.start()p1.join()p2.join()p3.join()print('主线程')
复制代码
通过继承Process类开启历程p /pp历程之间的数据隔离问题/p
img id="code_img_closed_dd766d96-d4ad-4195-8d31-958c63ed90ae"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_dd766d96-d4ad-4195-8d31-958c63ed90ae"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. from multiprocessing import Processdef work():    global n    n=0    print('子历程内: ',n)if __name__ == '__main__':    n = 100    p=Process(target=work)    p.start()    print('主历程内: ',n)
复制代码
历程之间的数据隔离问题h3守护历程/h3p会随着主历程的竣事而竣事。/pp主历程创建守护历程/pp  其一:守护历程会在主历程代码执行竣事后就终止/pp  其二:守护历程内无法再开启子历程,否则抛出非常:AssertionError: daemonic processes are not allowed to have children/pp注意:历程之间是互相独立的,主历程代码运行竣事,守护历程随即终止/p
img id="code_img_closed_a6512635-cccd-4ca5-b461-fe335292a840"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_a6512635-cccd-4ca5-b461-fe335292a840"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import osimport timefrom multiprocessing import Processclass Myprocess(Process):    def __init__(self,person):        super().__init__()        self.person = person    def run(self):        print(os.getpid(),self.name)        print('%s正在和女主播谈天' %self.person)p=Myprocess('哪吒')p.daemon=True #一定要在p.start()前设置,设置p为守护历程,禁止p创建子历程,并且父历程代码执行竣事,p即终止运行p.start()time.sleep(10) # 在sleep时查看历程id对应的历程ps -ef|grep idprint('主')
复制代码
守护历程的启动
img id="code_img_closed_383206cd-0374-4f0d-804c-ebb4b50e46b8"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_383206cd-0374-4f0d-804c-ebb4b50e46b8"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. from multiprocessing import Processdef foo():    print(123)    time.sleep(1)    print("end123")def bar():    print(456)    time.sleep(3)    print("end456")p1=Process(target=foo)p2=Process(target=bar)p1.daemon=Truep1.start()p2.start()time.sleep(0.1)print("main-------")#打印该行则主历程代码竣事,则守护历程p1应该被终止.#可能会有p1任务执行的打印信息123,因为主历程打印main----时,p1也执行了,但是随即被终止.
复制代码
主历程代码执行竣事守护历程立刻竣事h3socket谈天并发实例/h3
img id="code_img_closed_7b2b323a-30bc-40d9-9cc4-0ebce3f3080a"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_7b2b323a-30bc-40d9-9cc4-0ebce3f3080a"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. from socket import *from multiprocessing import Processserver=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)server.bind(('127.0.0.1',8080))server.listen(5)def talk(conn,client_addr):    while True:        try:            msg=conn.recv(1024)            if not msg:break            conn.send(msg.upper())        except Exception:            breakif __name__ == '__main__': #windows下start历程一定要写到这下面    while True:        conn,client_addr=server.accept()        p=Process(target=talk,args=(conn,client_addr))        p.start()
复制代码
使用多历程实现socket谈天并发-server
img id="code_img_closed_29fc8276-e584-4407-9382-04a9eb528339"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_29fc8276-e584-4407-9382-04a9eb528339"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8080))while True:    msg=input(': ').strip()    if not msg:continue    client.send(msg.encode('utf-8'))    msg=client.recv(1024)    print(msg.decode('utf-8'))
复制代码
client端h3多历程中的其他方法/h3
img id="code_img_closed_3b66125a-1dcd-4632-afec-4208d71c9ad1"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_3b66125a-1dcd-4632-afec-4208d71c9ad1"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. from multiprocessing import Processimport timeimport randomclass Myprocess(Process):    def __init__(self,person):        self.name=person        super().__init__()    def run(self):        print('%s正在和网红脸谈天' %self.name)        time.sleep(random.randrange(1,5))        print('%s还在和网红脸谈天' %self.name)p1=Myprocess('哪吒')p1.start()p1.terminate()#关闭历程,不会立刻关闭,所以is_alive立刻查看的结果可能还是存活print(p1.is_alive()) #结果为Trueprint('开始')print(p1.is_alive()) #结果为False
复制代码
历程对象的其他方法:terminate,is_alive
img id="code_img_closed_21d2cec6-55ec-4996-8f2f-a0798fb43928"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_21d2cec6-55ec-4996-8f2f-a0798fb43928"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. 1 class Myprocess(Process): 2     def __init__(self,person): 3         self.name=person   # name属性是Process中的属性,标示历程的名字 4         super().__init__() # 执行父类的初始化方法会覆盖name属性 5         #self.name = person # 在这里设置就可以修改历程名字了 6         #self.person = person #假如不想覆盖历程名,就修改属性名称就可以了 7     def run(self): 8         print('%s正在和网红脸谈天' %self.name) 9         # print('%s正在和网红脸谈天' %self.person)10         time.sleep(random.randrange(1,5))11         print('%s正在和网红脸谈天' %self.name)12         # print('%s正在和网红脸谈天' %self.person)13 14 15 p1=Myprocess('哪吒')16 p1.start()17 print(p1.pid)    #可以查看子历程的历程id
复制代码
历程对象的其他属性:pid和nameh2历程同步(multiprocess.Lock)/h2h3锁 —— multiprocess.Lock/h3p      通过刚刚的学习,我们费尽心机实现了程序的异步,让多个任务可以同时在几个历程中并发处理惩罚,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。/pp  当多个历程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。/p
img id="code_img_closed_332245f6-a6cb-4d52-bb3d-433992a7eac6"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_332245f6-a6cb-4d52-bb3d-433992a7eac6"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. import osimport timeimport randomfrom multiprocessing import Processdef work(n):    print('%s: %s is running' %(n,os.getpid()))    time.sleep(random.random())    print('%s:%s is done' %(n,os.getpid()))if __name__ == '__main__':    for i in range(3):        p=Process(target=work,args=(i,))        p.start()
复制代码
多历程抢占输出资源
img id="code_img_closed_153a1e7f-636c-431a-b3b8-28bee02569f1"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_153a1e7f-636c-431a-b3b8-28bee02569f1"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. # 由并发变成了串行,牺牲了运行服从,但避免了竞争import osimport timeimport randomfrom multiprocessing import Process,Lockdef work(lock,n):    lock.acquire()    print('%s: %s is running' % (n, os.getpid()))    time.sleep(random.random())    print('%s: %s is done' % (n, os.getpid()))    lock.release()if __name__ == '__main__':    lock=Lock()    for i in range(3):        p=Process(target=work,args=(lock,i))        p.start()
复制代码
使用锁维护执行顺序p  上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,如许确实会浪费了时间,却保证了数据的安全。/pp  接下来,我们以模拟抢票为例,来看看数据安全的紧张性。 /p
img id="code_img_closed_38bbbf7b-b4e4-472c-b35c-50ebe68fa5db"  src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif"
img id="code_img_opened_38bbbf7b-b4e4-472c-b35c-50ebe68fa5db"  style="display: none;"  src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif"
  1. #文件db的内容为:{"count":1}#注意一定要用双引号,不然json无法辨认#并发运行,服从高,但竞争写同一文件,数据写入庞杂from multiprocessing import Process,Lockimport time,json,randomdef search():    dic=json.load(open('db'))    print('\033[43m剩余票数%s\033[0m' %dic['count'])def get():    dic=json.load(open('db'))    time.sleep(0.1) #模拟读数据的网络延迟    if dic['count'] >0:        dic['count']-=1        time.sleep(0.2) #模拟写数据的网络延迟        json.dump(dic,open('db','w'))        print('\033[43m购票成功\033[0m')def task():    search()    get()if __name__ == '__main__':    for i in range(100): #模拟并发100个客户端抢票        p=Process(target=task)        p.start()
复制代码
多历程同时抢购余票
  1. #文件db的内容为:{"count":5}#注意一定要用双引号,不然json无法辨认#并发运行,服从高,但竞争写同一文件,数据写入庞杂from multiprocessing import Process,Lockimport time,json,randomdef search():    dic=json.load(open('db'))    print('\033[43m剩余票数%s\033[0m' %dic['count'])def get():    dic=json.load(open('db'))    time.sleep(random.random()) #模拟读数据的网络延迟    if dic['count'] >0:        dic['count']-=1        time.sleep(random.random()) #模拟写数据的网络延迟        json.dump(dic,open('db','w'))        print('\033[32m购票成功\033[0m')    else:        print('\033[31m购票失败\033[0m')def task(lock):    search()    lock.acquire()    get()    lock.release()if __name__ == '__main__':    lock = Lock()    for i in range(100): #模拟并发100个客户端抢票        p=Process(target=task,args=(lock,))        p.start()
复制代码
使用锁来保证数据安全
  1. #加锁可以保证多个历程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速率是慢了,但牺牲了速率却保证了数据安全。虽然可以用文件共享数据实现历程间通信,但问题是:1.服从低(共享数据基于文件,而文件是硬盘上的数据)2.须要自己加锁处理惩罚#因此我们最好找寻一种解决方案能够兼顾:1、服从高(多个历程共享一块内存的数据)2、帮我们处理惩罚好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。队列和管道都是将数据存放于内存中队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理惩罚复杂的同步和锁问题,而且在历程数量增多时,往往可以获得更好的可获展性。
复制代码
历程间通信——队列(multiprocess.Queue)

历程间通信

IPC(Inter-Process Communication)
队列

概念先容

创建共享的历程队列,Queue是多历程安全的队列,可以使用Queue实现多历程之间的数据传递。
  1. Queue([maxsize]) 创建共享的历程队列。参数 :maxsize是队列中允许的最大项数。假如省略此参数,则无巨细限制。底层队列使用管道和锁定实现。
复制代码
  1. Queue([maxsize]) 创建共享的历程队列。maxsize是队列中允许的最大项数。假如省略此参数,则无巨细限制。底层队列使用管道和锁定实现。别的,还须要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具有以下方法:q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。假如q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 假如设置为False,将引发Queue.Empty非常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。假如在制定的时间间隔内没有项目变为可用,将引发Queue.Empty非常。q.get_nowait( ) 同q.get(False)方法。q.put(item [, block [,timeout ] ] ) 将item放入队列。假如队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。假如设置为False,将引发Queue.Empty非常(定义在Queue库模块中)。timeout指定在阻塞模式中等候可用空间的时间是非。超时后将引发Queue.Full非常。q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError非常。q.empty() 假如调用此方法时 q为空,返回True。假如其他历程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经参加新的项目。q.full() 假如q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
复制代码
方法先容
  1. q.close() 关闭队列,防止队列中参加更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。假如q被垃圾网络,将自动调用此方法。关闭队列不会在队列使用者中生成任何范例的数据竣事信号或非常。比方,假如某个使用者正被阻塞在get()操纵上,关闭生产者中的队列不会导致get()方法返回错误。q.cancel_join_thread() 不会再历程退出时自动毗连后台线程。这可以防止join_thread()方法阻塞。q.join_thread() 毗连队列的后台线程。此方法用于在调用q.close()方法后,等候所有队列项被斲丧。默认情况下,此方法由不是q的原始创建者的所有历程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
复制代码
其他方法(相识)代码实例

  1. '''multiprocessing模块支持历程间通信的两种重要形式:管道和队列都是基于消息传递实现的,但是队列接口'''from multiprocessing import Queueq=Queue(3)#put ,get ,put_nowait,get_nowait,full,emptyq.put(3)q.put(3)q.put(3)# q.put(3)   # 假如队列已经满了,程序就会停在这里,等候数据被别人取走,再将数据放入队列。           # 假如队列中的数据不停不被取走,程序就会永久停在这里。try:    q.put_nowait(3) # 可以使用put_nowait,假如队列满了不会阻塞,但是会因为队列满了而报错。except: # 因此我们可以用一个try语句来处理惩罚这个错误。如许程序不会不停阻塞下去,但是会丢掉这个消息。    print('队列已经满了')# 因此,我们再放入数据之前,可以先看一下队列的状态,假如已经满了,就不继续put了。print(q.full()) #满了print(q.get())print(q.get())print(q.get())# print(q.get()) # 同put方法一样,假如队列已经空了,那么继续取就会出现阻塞。try:    q.get_nowait(3) # 可以使用get_nowait,假如队列满了不会阻塞,但是会因为没取到值而报错。except: # 因此我们可以用一个try语句来处理惩罚这个错误。如许程序不会不停阻塞下去。    print('队列已经空了')print(q.empty()) #空了
复制代码
单看队列用法上面这个例子还没有参加历程通信,只是先来看看队列为我们提供的方法,以及这些方法的使用和征象。
  1. import timefrom multiprocessing import Process, Queuedef f(q):    q.put([time.asctime(), 'from Eva', 'hello'])  #调用主函数中p历程传递过来的历程参数 put函数为向队列中添加一条数据。if __name__ == '__main__':    q = Queue() #创建一个Queue对象    p = Process(target=f, args=(q,)) #创建一个历程    p.start()    print(q.get())    p.join()
复制代码
子历程发送数据给父历程上面是一个queue的简单应用,使用队列q对象调用get函数来取得队列中最先进入的数据。 接下来看一个稍微复杂一些的例子:
  1. import osimport timeimport multiprocessing# 向queue中输入数据的函数def inputQ(queue):    info = str(os.getpid()) + '(put):' + str(time.asctime())    queue.put(info)# 向queue中输出数据的函数def outputQ(queue):    info = queue.get()    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))# Mainif __name__ == '__main__':    multiprocessing.freeze_support()    record1 = []   # store input processes    record2 = []   # store output processes    queue = multiprocessing.Queue(3)    # 输入历程    for i in range(10):        process = multiprocessing.Process(target=inputQ,args=(queue,))        process.start()        record1.append(process)    # 输出历程    for i in range(10):        process = multiprocessing.Process(target=outputQ,args=(queue,))        process.start()        record2.append(process)    for p in record1:        p.join()    for p in record2:        p.join()
复制代码
批量生产数据放入队列再批量获取结果 x生产者消费者模型

在并发编程中使用生产者和消费者模式能够解断交大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理惩罚数据的速率。
为什么要使用生产者和消费者模式

在线程天下里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,假如生产者处理惩罚速率很快,而消费者处理惩罚速率很慢,那么生产者就必须等候消费者处理惩罚完,才气继续生产数据。同样的道理,假如消费者的处理惩罚能力大于生产者,那么消费者就必须等候生产者。为相识决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等候消费者处理惩罚,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理惩罚能力。
基于队列实现生产者消费者模型

  1. from multiprocessing import Process,Queueimport time,random,osdef consumer(q):    while True:        res=q.get()        time.sleep(random.randint(1,3))        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):    for i in range(10):        time.sleep(random.randint(1,3))        res='包子%s' %i        q.put(res)        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':    q=Queue()    #生产者们:即厨师们    p1=Process(target=producer,args=(q,))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    #开始    p1.start()    c1.start()    print('主')
复制代码
基于队列实现生产者消费者模型此时的问题是主历程永久不会竣事,缘故原由是:生产者p在生产完后就竣事了,但是消费者c在取空了q之后,则不停处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个竣事信号,如许消费者在吸收到竣事信号后就可以break出死循环。
  1. from multiprocessing import Process,Queueimport time,random,osdef consumer(q):    while True:        res=q.get()        if res is None:break #收到竣事信号则竣事        time.sleep(random.randint(1,3))        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):    for i in range(10):        time.sleep(random.randint(1,3))        res='包子%s' %i        q.put(res)        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))    q.put(None) #发送竣事信号if __name__ == '__main__':    q=Queue()    #生产者们:即厨师们    p1=Process(target=producer,args=(q,))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    #开始    p1.start()    c1.start()    print('主')
复制代码
改良版——生产者消费者模型注意:竣事信号None,不一定要由生产者发,主历程里同样可以发,但主历程须要等生产者竣事后才应该发送该信号
  1. from multiprocessing import Process,Queueimport time,random,osdef consumer(q):    while True:        res=q.get()        if res is None:break #收到竣事信号则竣事        time.sleep(random.randint(1,3))        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):    for i in range(2):        time.sleep(random.randint(1,3))        res='包子%s' %i        q.put(res)        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':    q=Queue()    #生产者们:即厨师们    p1=Process(target=producer,args=(q,))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    #开始    p1.start()    c1.start()    p1.join()    q.put(None) #发送竣事信号    print('主')
复制代码
主历程在生产者生产完毕后发送竣事信号None但上述解决方式,在有多个生产者和多个消费者时,我们则须要用一个很low的方式去解决
  1. from multiprocessing import Process,Queueimport time,random,osdef consumer(q):    while True:        res=q.get()        if res is None:break #收到竣事信号则竣事        time.sleep(random.randint(1,3))        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):    for i in range(2):        time.sleep(random.randint(1,3))        res='%s%s' %(name,i)        q.put(res)        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':    q=Queue()    #生产者们:即厨师们    p1=Process(target=producer,args=('包子',q))    p2=Process(target=producer,args=('骨头',q))    p3=Process(target=producer,args=('泔水',q))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    c2=Process(target=consumer,args=(q,))    #开始    p1.start()    p2.start()    p3.start()    c1.start()    p1.join() #必须保证生产者全部生产完毕,才应该发送竣事信号    p2.join()    p3.join()    q.put(None) #有几个消费者就应该发送频频竣事信号None    q.put(None) #发送竣事信号    print('主')
复制代码
多个消费者的例子:有几个消费者就须要发送频频竣事信号
JoinableQueue([maxsize])
创建可毗连的共享历程队列。这就像是一个Queue对象,但队列允许项目的使用者关照生产者项目已经被成功处理惩罚。关照历程是使用共享的信号和条件变量来实现的。
  1. JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:q.task_done() 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理惩罚。假如调用此方法的次数大于从队列中删除的项目数量,将引发ValueError非常。q.join() 生产者将使用此方法进行阻塞,直到队列中所有项目均被处理惩罚。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 下面的例子阐明如何创建永久运行的历程,使用和处理惩罚队列上的项目。生产者将项目放入队列,并等候它们被处理惩罚。
复制代码
方法先容
  1. from multiprocessing import Process,JoinableQueueimport time,random,osdef consumer(q):    while True:        res=q.get()        time.sleep(random.randint(1,3))        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))        q.task_done() #向q.join()发送一次信号,证实一个数据已经被取走了def producer(name,q):    for i in range(10):        time.sleep(random.randint(1,3))        res='%s%s' %(name,i)        q.put(res)        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))    q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理惩罚。if __name__ == '__main__':    q=JoinableQueue()    #生产者们:即厨师们    p1=Process(target=producer,args=('包子',q))    p2=Process(target=producer,args=('骨头',q))    p3=Process(target=producer,args=('泔水',q))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    c2=Process(target=consumer,args=(q,))    c1.daemon=True    c2.daemon=True    #开始    p_l=[p1,p2,p3,c1,c2]    for p in p_l:        p.start()    p1.join()    p2.join()    p3.join()    print('主')         #主历程等--->p1,p2,p3等---->c1,c2    #p1,p2,p3竣事了,证实c1,c2肯定全都收完了p1,p2,p3发到队列的数据    #因而c1,c2也没有存在的价值了,不须要继续阻塞在历程中影响主历程了。应该随着主历程的竣事而竣事,所以设置成守护历程就可以了。
复制代码
JoinableQueue队列实现消费之生产者模型历程之间的数据共享

展望将来,基于消息传递的并发编程是大势所趋
即便是使用线程,推荐做法也是将程序计划为大量独立的线程聚集,通过消息队列互换数据。
如许极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。
但历程间应该尽量避免通信,即便须要通信,也应该选择历程安全的工具来避免加锁带来的问题。
以后我们会尝试使用数据库来解决现在历程之间的数据共享问题。
  1. 历程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的虽然历程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
复制代码
Manager模块先容
  1. from multiprocessing import Manager,Process,Lockdef work(d,lock):    with lock: #不加锁而操纵共享的数据,肯定会出现数据庞杂        d['count']-=1if __name__ == '__main__':    lock=Lock()    with Manager() as m:        dic=m.dict({'count':100})        p_l=[]        for i in range(100):            p=Process(target=work,args=(dic,lock))            p_l.append(p)            p.start()        for p in p_l:            p.join()        print(dic)
复制代码
Manager例子

历程池和multiprocess.Pool模块

历程池

为什么要有历程池?历程池的概念。
在程序实际处理惩罚问题过程中,忙时会有成千上万的任务须要被执行,闲时可能只有零散任务。那么在成千上万个任务须要被执行的时候,我们就须要去创建成千上万个历程么?首先,创建历程须要斲丧时间,烧毁历程也须要斲丧时间。第二即便开启了成千上万的历程,操纵系统也不能让他们同时执行,如许反而会影响程序的服从。因此我们不能无限制的根据任务开启大概竣事历程。那么我们要怎么做呢?
在这里,要给大家先容一个历程池的概念,定义一个池子,在里面放上固定数量的历程,有需求来了,就拿一个池中的历程来处理惩罚任务,等到处理惩罚完毕,历程并不关闭,而是将历程再放回历程池中继续等候任务。假如有很多任务须要执行,池中的历程数量不敷,任务就要等候之前的历程执行任务完毕归来,拿到空闲历程才气继续执行。也就是说,池中历程的数量是固定的,那么同一时间最多有固定数量的历程在运行。如许不会增加操纵系统的调度难度,还节流了开闭历程的时间,也一定程度上能够实现并发效果。
multiprocess.Pool模块

概念先容
  1. Pool([numprocess  [,initializer [, initargs]]]):创建历程池
复制代码
  1. 1 numprocess:要创建的历程数,假如省略,将默认使用cpu_count()的值2 initializer:是每个工作历程启动时要执行的可调用对象,默认为None3 initargs:是要传给initializer的参数组
复制代码
参数先容
  1. 1 p.apply(func [, args [, kwargs]]):在一个池工作历程中执行func(*args,**kwargs),然后返回结果。2 '''须要夸大的是:此操纵并不会在所有池工作历程中并执行func函数。假如要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数大概使用p.apply_async()'''3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作历程中执行func(*args,**kwargs),然后返回结果。5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,吸收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操纵,否则将吸收其他异步操纵中的结果。'''6    7 p.close():关闭历程池,防止进一步操纵。假如所有操纵持续挂起,它们将在工作历程终止前完成8 9 P.jion():等候所有工作历程退出。此方法只能在close()或teminate()之后调用
复制代码
重要方法
  1. 1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法2 obj.get():返回结果,假如有须要则等候结果到达。timeout是可选的。假如在指定时间内还没有到达,将引发一场。假如长途操纵中引发了非常,它将在调用此方法时再次被引发。3 obj.ready():假如调用完成,返回True4 obj.successful():假如调用完成且没有引发非常,返回True,假如在结果就绪之前调用此方法,引发非常5 obj.wait([timeout]):等候结果变为可用。6 obj.terminate():立刻终止所有工作历程,同时不执行任何清算或竣事任何挂起工作。假如p被垃圾接纳,将自动调用此函数
复制代码
其他方法(相识)代码实例

历程池和多历程服从对比

p.map历程池和历程服从测试同步和异步

  1. import os,timefrom multiprocessing import Pooldef work(n):    print('%s run' %os.getpid())    time.sleep(3)    return n**2if __name__ == '__main__':    p=Pool(3) #历程池中从无到有创建三个历程,以后不停是这三个历程在执行任务    res_l=[]    for i in range(10):        res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等候任务work执行的过程中可能有阻塞也可能没有阻塞                                    # 但不管该任务是否存在阻塞,同步调用都会在原地等着    print(res_l)
复制代码
历程池的同步调用
  1. import osimport timeimport randomfrom multiprocessing import Pooldef work(n):    print('%s run' %os.getpid())    time.sleep(random.random())    return n**2if __name__ == '__main__':    p=Pool(3) #历程池中从无到有创建三个历程,以后不停是这三个历程在执行任务    res_l=[]    for i in range(10):        res=p.apply_async(work,args=(i,)) # 异步运行,根据历程池中有的历程数,每次最多3个子历程在异步执行                                          # 返回结果之后,将结果放入列表,归还历程,之后再执行新的任务                                          # 须要注意的是,历程池中的三个历程不会同时开启大概同时竣事                                          # 而是执行完一个就释放一个历程,这个历程就去吸收新的任务。          res_l.append(res)    # 异步apply_async用法:假如使用异步提交的任务,主历程须要使用jion,等候历程池内任务都处理惩罚完,然后可以用get网络结果    # 否则,主历程竣事,历程池可能还没来得及执行,也就跟着一起竣事了    p.close()    p.join()    for res in res_l:        print(res.get()) #使用get来获取apply_aync的结果,假如是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
复制代码
历程池的异步调用练习

  1. #Pool内的历程数默认是cpu核数,假设为4(查看方法os.cpu_count())#开启6个客户端,会发现2个客户端处于等候状态#在每个历程内查看pid,会发现pid使用为4个,即多个客户端公用4个历程from socket import *from multiprocessing import Poolimport osserver=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)server.bind(('127.0.0.1',8080))server.listen(5)def talk(conn):    print('历程pid: %s' %os.getpid())    while True:        try:            msg=conn.recv(1024)            if not msg:break            conn.send(msg.upper())        except Exception:            breakif __name__ == '__main__':    p=Pool(4)    while True:        conn,*_=server.accept()        p.apply_async(talk,args=(conn,))        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
复制代码
server:历程池版socket并发谈天
  1. from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8080))while True:    msg=input('>>: ').strip()    if not msg:continue    client.send(msg.encode('utf-8'))    msg=client.recv(1024)    print(msg.decode('utf-8'))
复制代码
client发现:并发开启多个客户端,服务端同一时间只有4个不同的pid,只能竣事一个客户端,别的一个客户端才会进来.
回调函数
  1. 须要回调函数的场景:历程池中任何一个任务一旦处理惩罚完了,就立刻告知主历程:我好了额,你可以处理惩罚我的结果了。主历程则调用一个函数去处理惩罚该结果,该函数即回调函数我们可以把耗时间(阻塞)的任务放到历程池中,然后指定回调函数(主历程负责执行),如许主历程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
复制代码

  1. from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url):    print(' get %s' %(os.getpid(),url))    respone=requests.get(url)    if respone.status_code == 200:        return {'url':url,'text':respone.text}def pasrse_page(res):    print(' parse %s' %(os.getpid(),res['url']))    parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))    with open('db.txt','a') as f:        f.write(parse_res)if __name__ == '__main__':    urls=[        'https://www.baidu.com',        'https://www.python.org',        'https://www.openstack.org',        'https://help.github.com/',        'http://www.sina.com.cn/'    ]    p=Pool(3)    res_l=[]    for url in urls:        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)        res_l.append(res)    p.close()    p.join()    print([res.get() for res in res_l]) #拿到的是get_page的结果,实在完全没须要拿该结果,该结果已经传给回调函数处理惩罚了'''打印结果: get https://www.baidu.com get https://www.python.org get https://www.openstack.org get https://help.github.com/ parse https://www.baidu.com get http://www.sina.com.cn/ parse https://www.python.org parse https://help.github.com/ parse http://www.sina.com.cn/ parse https://www.openstack.org[{'url': 'https://www.baidu.com', 'text': '\r\n...',...}]'''
复制代码
使用多历程请求多个url来减少网络等候浪费的时间
[code]import refrom urllib.request import urlopenfrom multiprocessing import Pooldef get_page(url,pattern):    response=urlopen(url).read().decode('utf-8')    return pattern,responsedef parse_page(info):    pattern,page_content=info    res=re.findall(pattern,page_content)    for item in res:        dic={            'index':item[0].strip(),            'title':item[1].strip(),            'actor':item[2].strip(),            'time':item[3].strip(),        }        print(dic)if __name__ == '__main__':    regex = r'.*?

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?用户注册

x

天涯海角也要找到Ni:Python_进程

中发现Ni: Python_进程
中发现Ni: Python_进程
中发现Ni: Python_进程
中发现Ni: Python_进程
中发现Ni: Python_进程
中发现Ni: Python_进程
相关技术服务需求,请联系管理员和客服QQ:2753533861或QQ:619920289
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

帖子推荐:
客服咨询

QQ:2753533861

服务时间 9:00-22:00

快速回复 返回顶部 返回列表