Python 线程&进程与协程

Python 线程&进程与协程,第1张

概述Python 与线程 线程是进程的执行单元,对于大多数程序来说,可能只有一个主线程,但是为了能够提高效率,有些程序会采用多线程,在系统中所有的线程看起来都是同时执行的,例如,现在的多线程网络下载程序中 Python 与线程

线程是进程的执行单元,对于大多数程序来说,可能只有一个主线程,但是为了能够提高效率,有些程序会采用多线程,在系统中所有的线程看起来都是同时执行的,例如,现在的多线程网络下载程序中,就使用了这种线程并发的特性,程序将欲下载的文件分成多个部分,然后同时进行下载,从而加快速度.虽然线程并不是一个容易掌握和使用的概念,但是如果运用得当,还是可以获得很不错的性能的.

◆创建使用线程◆

在 Python 中创建线程需要用到一个类,threading类,其类的实现方法是底层调用了C语言的原生函数来实现的创建线程,创建线程有两种方式,一种是直接使用函数创建线程,另一种则是使用类创建线程,两种创建方式效果是相同的,但是需要注意一点,在使用类的方式创建线程的时候,默认执行run(self)方法,且此函数名称必须是run不能修改,接下来看3个小例子吧.

使用函数创建线程: 通过线程模块创建线程,并传递参数即可实现直接对指定函数实现多线程.

import osimport threadingimport timedef MyThread(x,y):                         #定义每个线程要执行的函数体    print("传递的数据:%s,%s"%(x,y))        #其中有两个参数,我们动态传入    time.sleep(5)                         #睡眠5秒钟for x in range(10):                       #创建10个线程并发执行函数    thread = threading.Thread(target=MyThread,args=(x,x+1,)) #args是函数的参数,元组最后一个必须要逗号.    thread.start()                                           #启动线程

使用类创建线程: 通过定义类,传递给类中一些参数,然后启动多线程,这种方式不常用.

import osimport threadingimport timeclass MyThread(threading.Thread):         #继承threading.Thread类    def __init__(self,x,y):               #重写构造函数        super(MyThread,self).__init__()   #先执行父类的构造方法        self.x = x        self.y = y    def run(self):     #run()方法,是cpu调度线程会使用的方法,名称必须是run        print("运行线程,X=%s Y=%s"%(self.x,self.y))for i in range(10):    #创建10个线程    obj = MyThread(i,i+10)    obj.start()
import paramiko,datetime,threadingclass MyThread(threading.Thread):    def __init__(self,address,username,password,port,command):        super(MyThread,self).__init__()        self.address = address        self.username = username        self.password = password        self.port = port        self.command = command    def run(self):        ssh = paramiko.SSHClIEnt()        ssh.set_missing_host_key_policy(paramiko.autoAddPolicy())        try:            ssh.connect(self.address,port=self.port,username=self.username,password=self.password,timeout=1)            stdin,stdout,stderr = ssh.exec_command(self.command)            result = stdout.read()            if not result:                self.result = stderr.read()            ssh.close()            self.result = result.decode()        except Exception:            self.result = "0"    def get_result(self):        try:            return self.result        except Exception:            return NoneThreadPool = []   # 定义线程池starttime = datetime.datetime.Now()for item in range(5):    obj = MyThread("192.168.1.20","root","123","22","ifconfig")    ThreadPool.append(obj)for item in ThreadPool:    item.start()                # 启动线程    item.join()for item in ThreadPool:    ret = item.get_result()    # 获取返回结果    print(ret)endtime = datetime.datetime.Now()print("程序开始运行:{} 结束:{}".format(starttime,endtime))

接收线程返回结果: 我们可以使用join方法,等待线程执行完毕后的返回结果.

import osimport threadingimport timedef MyThread(x,我们动态传入    time.sleep(5)                         #睡眠5秒钟    return "ok"temp=[]for x in range(10):                                          #创建10个线程并发执行函数    thread = threading.Thread(target=MyThread,元组最后一个必须要逗号.    thread.start()                                           #启动线程    temp.append(thread)                                      #将线程结果添加到列表for y in temp:                                               #遍历这个线程列表    #此处一定要join,不然主线程比子线程跑的快,会拿不到结果,程序就退出执行了.    y.join()                                                 #等待线程执行完毕,返回结果    print("线程: %s"%y)
◆线程锁与信号◆

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以就出现了线程锁的概念,即在同一时刻只允许一个线程执行 *** 作,在这里我们选择使用Rlock,而不使用Lock,因为Lock如果多次获取锁的时候会出错,而RLock允许在同一线程中被多次acquire,但是需要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放之前,其他线程只有等待线程结束后在进行 *** 作.

全局锁(Lock): 添加本全局锁以后,能够保证在同一时间内保证只有一个线程具有权限.

import timeimport threadingnum = 0                  #定义全局共享变量thread_List = []         #线程列表lock = threading.Lock()  #生成全局锁def SumNumber():    global num          #在每个线程中获取这个全局变量    time.sleep(2)    lock.acquire()      #修改数据前给数据加锁    num += 1            #每次进行递增 *** 作    lock.release()      #执行完毕以后,解除锁定for x in range(50):     #指定生成线程数    thread = threading.Thread(target=SumNumber)    thread.start()              #启动线程    thread_List.append(thread)  #将结果列表加入到变量中for y in thread_List:           #等待执行完毕.    y.join()print("计算结果: ",num)

递归锁(RLock): 递归锁和全局锁差不多,递归锁就是在大锁中还要添加个小锁,递归锁是常用的锁.

import threadingimport timenum = 0                          #初始化全局变量lock = threading.RLock()         #设置递归锁def fun1():    lock.acquire()              #添加递归锁    global num    num += 1    lock.release()              #关闭递归锁    return numdef fun2():    lock.acquire()              #添加递归锁    res = fun1()    print("计算结果: ",res)    lock.release()              #关闭递归锁if __name__ == "__main__":    for x in range(10):         #生成10个线程        thread = threading.Thread(target=fun2)        thread.start()while threading.active_count() != 1:   #等待所有线程执行完成    print(threading.active_count())else:    print("所有线程运行完成...")    print(num)

互斥锁(Semaphore): 同时允许一定数量的线程更改数据,也就是限制每次允许执行的线程数.

import threading,timenum = 0                                           #初始化变量semaphore = threading.BoundedSemaphore(5)         #最多允许5个线程同时运行def run(n):    semaphore.acquire()                           #添加信号    time.sleep(1)    print("运行这个线程中: %s"%n)    semaphore.release()                           #关闭信号if __name__ == '__main__':    for i in range(20):                           #同时执行20个线程        t = threading.Thread(target=run,args=(i,))        t.start()while threading.active_count() != 1:              #等待所有线程执行完毕    passelse:    print('----所有线程执行完毕了---')    print(num)
◆线程驱动事件◆

事件驱动(Event): 线程事件用于主线程控制其他线程的执行,事件主要提供了三个方法set、wait、clear、is_set,分别用于设置检测和清除标志.

事件处理机制定义:全局定义了一个"Flag",如果"Flag"值为False,那么当程序执行event.wait 方法时就会阻塞,如果"Flag"值为True,那么在执行event.wait 方法时便不再阻塞,变成可执行模式,总体来说需要了解以下四个方法.

clear:将"Flag"设置为False
set:将"Flag"设置为True
wait:检测当前"Flag",如果"Flag"值为 False,那么当线程执行 event.wait 方法时就会阻塞,那么event.wait 方法时便不再阻塞
is_set:检测当前的状态,是否阻塞

import threadingevent = threading.Event()def func(x,event):    print("函数被执行了: %s 次.." %x)    event.wait()         #检测标志位状态,如果为True=继续执行以下代码,反之等待.    print("加载执行结果: %s" %x)for i in range(10):      #创建10个线程    thread = threading.Thread(target=func,event,))    thread.start()print("当前状态: %s" %event.is_set())       #检测当前状态,这里为Falseevent.clear()                              #将标志位设置为False,默认为Falsetemp=input("输入yes: ")                    #输入yes手动设置为Trueif temp == "yes":    event.set()                            #设置成True    print("当前状态: %s" %event.is_set())   #检测当前状态,这里为True

定时器(Timer): 指定定时器,作用是让进程或者是指定函数,在n秒后执行相应的 *** 作.

import threadingimport timedef func():    print("hello python")for i in range(5):                    #指定5个线程    thread = threading.Timer(5,func)  #在5秒钟以后运行func函数    thread.start()

## Python 与进程

直观地说,进程就是正在执行的程序,进程是多任务 *** 作系统中执行任务的基本单元,是包含了程序指令和相关资源的集合,线程的上一级就是进程,进程可包含很多线程,进程和线程的区别是进程间的数据不共享,多进程也可以用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心数保持一致. 

进程与线程的区别,有以下几种解释:

● 新创建一个线程很容易,新创建一个进程需要复制父进程
● 线程共享创建它的进程的地址空间,进程有自己的地址空间
● 主线程可以控制相当大的线程在同一进程中,进程只能控制子进程
● 线程是直接可以访问线程之间的数据,进程需要复制父进程的数据才能访问
● 主线程变更可能会影响进程的其他线程的行为,父进程的变化不会影响子进程
● 线程可以直接与其他线程的通信过程,进程必须使用进程间通信和同胞交流过程

◆创建一个进程◆

通常情况下,创建一个进程需要使用multiprocessing 模块,具体的创建方法和上面的线程创建方法相同,唯一的不同是关键字的变化,但需要注意的是,由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销,其他使用方法和线程threading.Thread是一样的,如下介绍两个创建进程例子.  

创建进程(1): 通过使用multiprocessing库,循环创建5个进程,并使用join等待进程执行完毕.

import multiprocessingimport timedef func(name):    time.sleep(2)    print("hello",name)if __name__ == "__main__":    for i in range(5):        proc = multiprocessing.Process(target=func,args=("lyshark",))        proc.start()        proc.join()

创建进程(2): 创建5个进程,并在每个进程里启动1个线程,线程打印出线程的ID号.

import multiprocessingimport threadingimport timedef thread_run():    print("子线程->子线程ID: %s" %threading.get_IDent())def func(num):    time.sleep(2)    print("-------------------------------->>>  主线程->主线程ID %s" %num)    for i in range(5):                                  #在主线程里开辟5个子线程        thread = threading.Thread(target=thread_run,)   #嵌套一个子线程        thread.start()                                  #执行子线程if __name__ == "__main__":    for i in range(5):                                 #启动5个主线程        proc = multiprocessing.Process(target=func,))        proc.start()        #proc.join()
◆进程数据共享◆

一般当我们创建两个进程后,进程各自持有一份数据,默认无法共享数据,如果我们想要共享数据必须通过一个中间件来实现数据的交换,来帮你把数据进行一个投递,要实现进程之间的数据共享,其主要有以下几个方法来实现进程间数据的共享,queues,Array,Manager.dict,pipe这些方法都能实现数据共享,下面将举几个小例子进行说明.

共享队列(Queue): 这个Queue主要实现进程与进程之间的数据共享,与线程中的Queue不同.

from multiprocessing import Processfrom multiprocessing import queuesimport multiprocessing def foo(i,arg):    arg.put(i)    print('say hi',i,arg.qsize()) li = queues.Queue(20,ctx=multiprocessing) for i in range(10):    p = Process(target=foo,li,))    p.start()

共享整数(int): 整数之间的共享,只需要使用multiprocessing.Value方法,即可实现.

import multiprocessingdef func(num):    num.value = 1024                              #虽然赋值了,但是子进程改变了这个数值    print("函数中的数值: %s"%num.value)if __name__ == "__main__":    num = multiprocessing.Value("d",10.0)         #主进程与子进程共享这个value    print("这个共享数值: %s"%num.value)    for i in range(5):        num = multiprocessing.Value("d",i)      #声明进程,并传递1,2,3,4这几个数        proc = multiprocessing.Process(target=func,args=(num,))        proc.start()                             #启动进程        #proc.join()        print("最后打印数值: %s"%num.value)

共享数组(Array): 数组之间的共享,只需要使用multiprocessing.Array方法,即可实现.

import multiprocessingdef func(ary):       #子进程改变数组,主进程跟着改变    ary[0]=100    ary[1]=200    ary[2]=300''' i所对应的类型是ctypes.c_int,其他类型如下参考:    'c': ctypes.c_char,'u': ctypes.c_wchar,'b': ctypes.c_byte,'B': ctypes.c_ubyte,'h': ctypes.c_short,'H': ctypes.c_ushort,'i': ctypes.c_int,'I': ctypes.c_uint,'l': ctypes.c_long,'L': ctypes.c_ulong,'f': ctypes.c_float,'d': ctypes.c_double'''if __name__ == "__main__":    ary = multiprocessing.Array("i",[1,3])   #主进程与子进程共享这个数组    for i in range(5):        proc = multiprocessing.Process(target=func,args=(ary,))        print(ary[:])        proc.start()

共享字典(dict): 通过使用Manager()方法,实现两个进程中的,字典与列表的数据共享.

import multiprocessingdef func(mydict,myList):    mydict["字典1"] = "值1"    mydict["字典2"] = "值2"    myList.append(1)    myList.append(2)    myList.append(3)if __name__ == "__main__":    mydict = multiprocessing.Manager().dict()        #主进程与子进程共享字典    myList = multiprocessing.Manager().List()        #主进程与子进程共享列表    proc = multiprocessing.Process(target=func,args=(mydict,myList))    proc.start()    proc.join()    print("列表中的元素: %s" %myList)    print("字典中的元素: %s" %mydict)

管道共享(Pipe): 通过Pipe管道的方式在两个进程之间共享数据,类似于Socket套接字.

import multiprocessingdef func(conn):    conn.send("你好我是子进程.")                      #发送消息给父进程    print("父进程传来了:",conn.recv())                #接收父进程传来的消息    conn.close()if __name__ == "__main__":    parent_conn,child_conn = multiprocessing.Pipe()  #管道创建两个端口,一收一发送    proc = multiprocessing.Process(target=func,args=(child_conn,))    proc.start()    print("子进程传来了:",parent_conn.recv())         #接收子进程传来的数据    parent_conn.send("我是父进程,收到消息了..")        #父进程发送消息给子进程

进程锁(Lock): 进程中也有锁,可以实现进程之间数据的一致性,也就是进程数据的同步,保证数据不混乱.

import multiprocessingdef func(loc,num):    loc.acquire()                        #添加进程锁    print("hello ---> %s" %num)    loc.release()                        #关闭进程锁if __name__ == "__main__":    lock = multiprocessing.Lock()        #生成进程锁    for number in range(10):        proc = multiprocessing.Process(target=func,args=(lock,number,))        proc.start()
◆进程的进程池◆

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止,进程池有两个方法:apply(),apply_async(),下面将介绍几个常用的小技巧.

进程池(apply): 同步执行,每次执行一个进程,直到所有进程执行完毕,其实也就是串行执行.

import multiprocessingimport timedef foo(num):    time.sleep(2)    print("进程执行-->: %s"%num)if __name__ == "__main__":    pool = multiprocessing.Pool(processes=5)    #允许进程池同时放入5个进程    for i in range(10):        pool.apply(func=foo,))          #并行执行每次执行一个    print("ends ...")    pool.close()    pool.join()

进程池(apply_async): 异步执行进程,每次执行5个进程,直到执行完10次循环位置,并行执行.

import multiprocessingimport timedef foo(num):    time.sleep(2)    print("进程执行-->: %s"%num)def bar(arg):    print("call back 函数执行..")if __name__ == "__main__":    pool = multiprocessing.Pool(processes=5)               #允许进程池同时放入5个进程    for i in range(10):        pool.apply_async(func=foo,),callback=bar)  #每次执行进程结束,自动执行callback指定的函数    print("ends ...")        pool.close()    pool.join()

## Python 与协程

协程,又称微线程,是一种用户态的轻量级线程,携程主要实现了在单线程下实现并发,一个线程能够被分割成多个协程,协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,因此协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态.

线程和进程的 *** 作是由程序触发系统接口,最后的执行者是系统,协程的 *** 作则是程序员,协程存在的意义:对于多线程应用,cpu通过切片的方式来切换线程间的执行,线程切换时需要耗时,而协程则只使用一个线程,在一个线程中规定某个代码块执行顺序,协程的适用场景:当程序中存在大量不需要cpu的 *** 作时(IO *** 作),时适用于协程.

协程之(YIEld): 通过使用yIEld方法来模拟实现协程 *** 作的例子,这里只是演示.

import timeimport queuedef consumer(name):    print("--->包子...")    while True:        new_yIEld = yIEld        print("[%s] 在吃包子 %s" % (name,new_yIEld))def producer():    r = con.__next__()    r = con2.__next__()    n = 0    while n < 5:        n += 1        con.send(n)        con2.send(n)        print("3[32;1m[producer]3[0m 生产包子.. %s" % n)if __name__ == '__main__':    con = consumer("admin")    con2 = consumer("lyshark")    p = producer()

协程之(Greenlet): Greenlet协程模块,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator,但是仍然需要手动切换.

from greenlet import greenletdef master():    print("主程序执行...")    green2.switch()            #切换到slaves函数    print("主程序再次执行...")    green2.switch()            #切换到master函数def slaves():    print("子程序执行....")    green1.switch()             #切换到master函数    print("子程序再次执行...")green1 = greenlet(master)           #启动一个协程green2 = greenlet(slaves)           #启动一个协程green1.switch()                     #切换到master函数

协程之(Gevent): Gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,它是以C扩展模块形式接入Python的轻量级协程.

import geventdef func1():    print("函数 func1 开始...")    gevent.sleep(3)    print("函数 func1 结束...")def func2():    print("函数 func2 开始...")    gevent.sleep(1)    print("函数 func2 结束...")def func3():    print("函数 func3 开始...")    gevent.sleep(0)    print("函数 func3 结束...")gevent.joinall([    gevent.spawn(func1),#切换协程    gevent.spawn(func2),gevent.spawn(func3),])

协程实现爬虫: 通过使用Gevent模块,实现批量爬取指定页面并返回页面的大小.

from gevent import monkeymonkey.patch_all()           #把当前程序所有IO *** 作给我单独做上标记,打补丁import geventfrom urllib.request import urlopendef func(url):    print("获取页面: %s" %url)    resp = urlopen(url)    data = resp.read()    print("%s URL大小为= %d bytes" %(url,len(data)))gevent.joinall([    gevent.spawn(func,'https://www.python.org/'),gevent.spawn(func,'https://www.yahoo.com/'),'https://github.com/'),])

并发Socket(服务端): 在单线程下实现多Socket并发,服务端代码如下.

import sysimport socketimport timeimport geventfrom gevent import socket,monkeymonkey.patch_all()def server(port):    s = socket.socket()    s.bind(('0.0.0.0',port))    s.Listen(500)    while True:        cli,addr = s.accept()        gevent.spawn(handle_request,cli)def handle_request(conn):    try:        while True:            data = conn.recv(1024)            print("接收数据:",data)            conn.send(data)            if not data:                conn.shutdown(socket.SHUT_WR)    except Exception as  ex:        print(ex)    finally:        conn.close()if __name__ == '__main__':    server(8001)

并发Socket(客户端): 在单线程下实现多Socket并发,客户端代码如下.

import socketHOST = 'localhost'PORT = 8001s = socket.socket(socket.AF_INET,socket.soCK_STREAM)s.connect((HOST,PORT))while True:    msg = bytes(input("输入发送的数据:"),enCoding="utf8")    s.sendall(msg)    data = s.recv(1024)    print('返回数据',repr(data))s.close()

## Python 与队列

同步队列 Queue 这是一个专门为多线程访问所设计的数据结构,能够有效地实现线程对资源的访问,程序可以通过此结构在线程间安全有效地传递数据 Queue 模块中包含一个 Queue 的类,其构造函数中可以指定一个Maxsize值,当maxszIE值小于或等于0的时候,表示对队列的长度没有限制,当大于0的时候,则指定了队列的长度.当队列到达最大长度而又有新的线程过来的时候,则需要等待 Queue 类中有不少方法,但是最市要的是 put 和 get 方法,Put 方法将需要完成的任务放入队列,而 get 方法相反,从队列中获取任务,需要注意的是,在这些方法中,有些方法由于多线程的原因,返回值并不一定是准确的,例如qsize,empty等函数的统计结果.

先进先出队列: 先来介绍简单的队列例子,以及队列的常用方法的使用,此队列是先进先出模式.

import queueq = queue.Queue(5)                    #默认maxsize=0无限接收,最大支持的个数print(q.empty())                      #查看队列是否为空,如果为空则返回Trueq.put(1)                              #PUT方法是,向队列中添加数据q.put(2)                              #第二个PUT,第二次向队列中添加数据q.put(3,block=False,timeout=2)        #是否阻塞:默认是阻塞block=True,timeout=超时时间print(q.full())                       #查看队列是否已经放满print(q.qsize())                      #队列中有多少个元素print(q.maxsize)                      #队列最大支持的个数print(q.get(block=False,timeout=2))   #GET取数据print(q.get())                        q.task_done()       #join配合task_done,队列中有任务就会阻塞进程,当队列中的任务执行完毕之后,不在阻塞print(q.get())q.task_done()q.join()            #队列中还有元素的话,程序就不会结束程序,只有元素被取完配合task_done执行,程序才会结束

后进先出队列: 这个队列则是,后进先出,也就是最后放入的数据最先d出,类似于堆栈.

>>> import queue>>>>>> q = queue.lifoQueue()>>>>>> q.put("wang")>>> q.put("rui")>>> q.put("ni")>>> q.put("hao")>>>>>> print(q.get())hao>>> print(q.get())ni>>> print(q.get())rui>>> print(q.get())wang>>> print(q.get())

优先级队列: 此类队列,可以指定优先级顺序,默认从高到低排列,以此根据优先级d出数据.

>>> import queue>>>>>> q = queue.PriorityQueue()>>>>>> q.put((1,"python1"))>>> q.put((-1,"python2"))>>> q.put((10,"python3"))>>> q.put((4,"python4"))>>> q.put((98,"python5"))>>>>>> print(q.get())(-1,'python2')>>> print(q.get())(1,'python1')>>> print(q.get())(4,'python4')>>> print(q.get())(10,'python3')>>> print(q.get())(98,'python5')

双向的队列: 双向队列,也就是说可以分别从两边d出数据,没有任何限制.

>>> import queue>>>>>> q = queue.deque()>>>>>> q.append(1)>>> q.append(2)>>> q.append(3)>>> q.append(4)>>> q.append(5)>>>>>> q.appendleft(6)>>>>>> print(q.pop())5>>> print(q.pop())4>>> print(q.popleft())6>>> print(q.popleft())1>>> print(q.popleft())2

生产者消费者模型: 生产者消费者模型,是各种开发场景中最常用的开发模式,以下是模拟的模型.

import queueimport threadingimport timeq = queue.Queue()def productor(arg):    while True:        q.put(str(arg))        print("%s 号窗口有票...."%str(arg))        time.sleep(1)def consumer(arg):    while True:        print("第 %s 人取 %s 号窗口票"%(str(arg),q.get()))        time.sleep(1)for i in range(10):                     #负责生产票数    t = threading.Thread(target=productor,))    t.start()for j in range(5):                      #负责取票,两个用户取票    t = threading.Thread(target=consumer,args=(j,))    t1 = threading.Thread(target=consumer,))    t.start()    t1.start()
总结

以上是内存溢出为你收集整理的Python 线程&进程与协程全部内容,希望文章能够帮你解决Python 线程&进程与协程所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/langs/1158013.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-01
下一篇 2022-06-01

发表评论

登录后才能评论

评论列表(0条)

保存