乔拓云建站平台,没有公司可以注册网站吗,网站怎么做文本跳出来,wordpress下载的主题如何安装目录
一.进程
二.线程
三.Python标准库中并行处理的相关模块
Threading模块
#xff08;1#xff09;使用Thread对象创建线程
#xff08;2#xff09;自定义派生于Thread的对象
#xff08;3#xff09;线程加入join() (4)用户线程和daemon线程
(5)Timer线程
线…目录
一.进程
二.线程
三.Python标准库中并行处理的相关模块
Threading模块
1使用Thread对象创建线程
2自定义派生于Thread的对象
3线程加入join() (4)用户线程和daemon线程
(5)Timer线程
线程同步
Lock/RLock对象
Condition对象
基于queue模块中队列的同步
基于Event的同步和通信:
基于进程的并行计算: 一.进程
进程是操作系统中正在执行的应用程序的一个实例每一个进程有自己的地址空间一般包括文本区域、数据区域和堆栈区域。
•文本区域处理器执行的代码
•数据区域变量和进程执行期间使用的动态分配的内存
•堆栈区域活动过程调用的指令和本地变量 二.线程
线程是进程中的一个实体是CPU独立调度和分派处理器时间的基本单位。
线程由线程ID、当前指令指针、寄存器集合和堆栈组成线程与同属一个进程的其他线程共享进程所拥有的全部资源。
线程使程序能够执行并发处理例如
•一个线程执行后台计算任务另一线程监视输入
•高优先级线程管理关键任务
•服务器创建线程池处理并发客户端请求
一个进程包含多个线程。多个线程相对独立线程有自己的上下文切换受系统控制。 三.Python标准库中并行处理的相关模块
- _thread和_dummy_thread模块底层低级线程API。
- threading模块线程及其同步处理。
- multiprocessing模块多进程处理和进程池。
- concurrent.futures模块启动并行任务。
- queue模块线程安全队列用于线程或进程间信息传递。
- asyncio模块异步IO、事件循环和任务处理。
Threading模块
这里重点看threading模块threading模块包括创建线程启动线程和线程同步
- threading.Thread对象实例化可创建线程
- Thread对象的start()可启动线程。
- 创建Thread的派生类重写run()其对象实例也可创建线程。
- 设置线程对象的daemon属性线程可设为用户线程或daemon线程
1使用Thread对象创建线程
threading模块封装了_thread模块虽然可使用_thread模块中的start_new_thread()函数创建线程但一般建议使用threading模块 Thread(targetNone, nameNone,args(),kwargs{}) Thread是线程运行函数 name是线程的名称 args和kwargs是传递给target的参数元组和命名参数字典。 t1 threading.Thread(targettimer, args(5,))
t1.start() #启动线程
t1.is_alive() #判断线程是否活动
t1.name #属性线程名
t1.id #线程标识符
threading.get_ident() #返回当前线程的标识符
threading.current_thread() #返回当前线程
threading.active_count() #返回活动的线程数目
threading.enumerate() #返回线程的列表
#创建和启动新线程
import threading, time, random
def timer(interval):for i in range(3):time.sleep(random.choice(range(interval))) #随机睡眠interval秒thread_id threading.get_ident() #获取当前线程标识符print(Thread:{0} Time:{1}.format(thread_id, time.ctime()))
if __name____main__:t1 threading.Thread(targettimer, args(5,)) #创建线程t2 threading.Thread(targettimer, args(3,)) #创建线程t1.start(); t2.start() #启动线程2自定义派生于Thread的对象
声明Thread的派生类重写对象的run方法创建对象实例通过对象的start方法可启动线程并自动执行对象的run方法例如如下操作
import threading, time, random
class MyThread(threading.Thread): #继承threading.Threaddef __init__(self, interval): #构造函数threading.Thread.__init__(self) #调用父类构造函数self.interval interval #对象属性def run(self): #定义run方法for i in range(3):time.sleep(random.choice(range(self.interval))) #随机睡眠interval秒thread_id threading.get_ident() #获取当前线程标识符print(fThread:{thread_id}, time:{time.ctime()})if __name__ __main__:t1 MyThread(5) #创建对象t2 MyThread(3)t1.start() #启动线程t2.start()3线程加入join()
所谓线程加入t.join()即让包含t.join()的线程tc即当前线程“加入”到另外一个线程t的尾部。在线程t执行完毕之前线程tc不能执行 join(timeoutNone) timeout是超时参数单位为秒。如果指定了超时则线程t执行完毕或者超时都可能使当前线程继续此时可通过t.is_alive()来判断线程是否终止。 注意线程不能加入自己否则导致RuntimeError,因为这样将导致死锁。线程也不能加入未启动的线程否则将导致RuntimeError。 import threading, time, random
class MyThread(threading.Thread): #继承threading.Threaddef __init__(self): #构造函数threading.Thread.__init__(self) #调用父类构造函数def run(self): #定义run方法for i in range(5):time.sleep(1) #睡眠1秒t threading.current_thread() #获取当前线程print(f{t.name} at {time.ctime()})#打印线程名、当前时间print(线程t1结束)def test():t1 MyThread() #创建线程对象t1.name t1 #设置线程名称t1.start() #启动线程print(主线程开始等待线程(t1));t1.join(2) #主线程阻塞等待t1结束或超时print(主线程等待线程(t1)2s结束)print(主线程开始等待线程结束);t1.join() #主线程阻塞等待t1结束print(主线程结束)
if __name____main__:test()(4)用户线程和daemon线程 线程可以分为用户线程和daemon线程: - 用户线程非daemon线程是通常意义的线程应用程序运行即为主线程在主线程中可以创建和启动新线程默认为用户线程。只有当所有的非daemon的用户线程包括主线程结束后应用程序终止。 - daemon线程又称守护线程其优先级最低一般为其它的线程提供服务。通常daemon线程是一个无限循环的程序。所有非daemon线程都结束了则daemon线程自动终止。 - t.daemon True import threading, time
class MyThread(threading.Thread): #继承threading.Threaddef __init__(self, interval): #构造函数threading.Thread.__init__(self) #调用父类构造函数self.interval interval #对象属性def run(self): #定义run方法t threading.current_thread() #获取当前线程print(线程 t.name 开始)time.sleep(self.interval) #延迟self.interval秒print(线程 t.name 结束)
class MyThreadDaemon(threading.Thread): #继承threading.Threaddef __init__(self, interval): #构造函数threading.Thread.__init__(self) #调用父类构造函数self.interval interval #对象属性def run(self): #定义run方法t threading.current_thread() #获取当前线程print(线程 t.name 开始)while True:time.sleep(self.interval) #延迟self.interval秒print(daemon线程 t.name 正在运行)print(线程 t.name 结束)
def test():print(主线程开始) t1 MyThread(5) #创建线程对象t2 MyThreadDaemon(1) #创建线程对象t1.name t1; t2.name t2 #设置线程名称t2.daemon True #设置为daemont1.start() #启动线程t2.start()print(主线程结束)
if __name____main__:test()(5)Timer线程
在实际应用中经常使用定时器触发一些事件,而 threading中的Timer线程Thread的子类可以很方便实现定时器功能。 Timer(interval,function,argsNone,kwargsNone) #构造函数在指定时间interval后执行函数function start() #启动线程即启动计时器 cancel() #取消计时器 import threading
def f():print(Hello Timer!) #创建定时器1秒后运行timer threading.Timer(1, f)timer.start()
timer threading.Timer(1, f) #创建定时器1秒后运行
timer.start()
线程同步
当多个线程调用单个对象的属性和方法时一个线程可能会中断另一个线程正在执行的任务使该对象处于一种无效状态(被占用)因此必须针对这些调用进行同步处理
多种线程同步处理解决方案Lock/RLock对象、Condition对象、Semaphore对象、Event对象
Lock/RLock对象
- threading模块的Lock对象锁可实现线程简单同步。
- threading模块的RLock是可重入的同步锁。如果一个线程在持有锁的情况下再次调用acquire()方法那么它将会被阻塞因为锁已经被这个线程所持有导致死锁的发生。可重入锁的特点是允许同一个线程多次调用acquire()来获取锁而不会发生死锁。
注每次调用acquire()都必须对应一次release()来释放锁否则其他线程将无法再次获取该锁。 acquire() #获取锁锁会被系统变为blocked状态 release() #释放锁锁进入unlocked状态blocked状态的线程会受到一个通知并有权利获得锁。多个线程处于blocked状态系统会选择一个线程来获得锁具体哪个线程获得与实现有关 import threading
lock threading.Lock()
lock.acquire()
#do something...
lock.release()Lock对象支持with语句
lock threading.Lock()
with lock:#do something...示例
创建工作线程模拟银行现金帐户取款。多个线程同时执行取款操作时如果不使用同步处理会造成账户余额混乱;尝试使用同步锁对象Lock以保证多个线程同时执行取款操作时银行现金帐户取款的有效和一致
import threading, time, random
class Account(threading.Thread): #继承threading.Threadlock threading.Lock() #创建锁def __init__(self, amount): #构造函数threading.Thread.__init__(self) #调用父类构造函数Account.amount amount #账户金额类变量def run(self): #定义run方法self.withdraw() #取款def withdraw(self):Account.lock.acquire() #获取锁。注释不使用同步处理t threading.current_thread()a random.choice(range(50,101))if Account.amount a:print(f{t.name}交易失败。取款前余额{Account.amount}取款额{a})Account.lock.release() #释放锁return 0 #拒绝交易time.sleep(random.choice(range(5))) #随机睡眠[0-5)秒prev Account.amountAccount.amount - a #取款print(f{t.name}取款前余额{prev}, 取款额{a}, 取款后额{Account.amount})Account.lock.release() #释放锁。注释不使用同步处理
def test():for i in range(5): #创建5个线程对象并启动Account(200).start()
if __name____main__:test()
Condition对象
线程A获得同步锁lock,在执行同步代码时需要某资源而该资源由线程B提供。线程B无法获取线程A占用的同步锁lock,故无法提供线程A资源从而导致死锁。
- 解决死锁问题可使用基于条件变量(condition)的线程间通信的通信机制
1.线程A获得同步锁lock,在执行同步代码时需要等待线程B占用的资源则可以调用wait()/wait(毫秒数)方法阻塞当前线程运行并释放其占用的同步锁lock;
2.notify()方法通知等待同步锁lock的线程B线程B获取同步锁lock后执行资源生产或者释放资源操作然后释放同步锁并调用notify()通知线程A
3.线程A获取同步锁lock继续执行 cv threading.Condition(lockNone) #条件变量对象关联一个锁lock。创建时可传入参数lock没有传入则默认自动创建一个。 支持with语句等价于cv.acquire(和cv.release() with cv: 同步操作 wait()/wait(timeout) #释放锁并阻塞当前线程直到其他线程使用notify()和notifyAll()唤醒后重新获取锁 notify() #唤醒一个等待线程 notifyAll() #唤醒所有线程 import threading, time, random
class Container1(): #基于同步和通信def __init__(self): #构造函数self.contents 0 #容器内容self.available False #容器内容self.cv threading.Condition() #条件变量def put(self, value): #生产函数with self.cv: #使用条件变量同步if self.available: #如果已经生产过则等待self.cv.wait() #等待self.contents value #生产设置内容t threading.current_thread()print(f{t.name}生产{self.contents})self.available True #设置容器状态已生产self.cv.notify() #通知等待的消费者def get(self): #消费函数with self.cv: #使用条件变量同步if not self.available: #如果未生产则等待self.cv.wait() #等待t threading.current_thread()print(f{t.name}消费{self.contents})self.available False #设置容器状态未生产self.cv.notify() #通知等待的生产者
class Container2(): #无同步和通信def __init__(self): #构造函数self.contents 0 #容器内容self.available False #容器内容def put(self, value): #生产函数if self.available: #如果已经生产passelse:self.contents value #生产设置内容t threading.current_thread()print(f{t.name}生产{self.contents})self.available True #设置容器状态已生产def get(self): # 消费函数if not self.available: # 如果未生产不操作passelse:t threading.current_thread()print(f{t.name}消费{self.contents})self.available False # 设置容器状态未生产class Producer(threading.Thread): # 生产者类def __init__(self, container): # 构造函数threading.Thread.__init__(self) # 调用父类构造函数self.container container # 容器def run(self): # 定义run方法for i in range(1, 6):time.sleep(random.choice(range(2))) # 随机睡眠[0-5)秒self.container.put(i) # 生产class Consumer(threading.Thread): #消费者类def __init__(self, container): #构造函数threading.Thread.__init__(self) #调用父类构造函数self.container container #容器def run(self): #定义run方法for i in range(1,6):time.sleep(random.choice(range(2)))#随机睡眠[0-5)秒self.container.get() #消费def test1():print(基本同步和通信的生产者消费者模型)container Container1() #创建容器Producer(container).start() #创建消费者线程并启动Consumer(container).start() #创建消费者线程并启动
def test2():print(无同步和通信的生产者消费者模型)container Container2() #创建容器Producer (container).start() #创建消费者线程并启动Consumer(container).start() #创建消费者线程并启动
if __name____main__:test1()基于queue模块中队列的同步
- 模块queue提供了适用于多线程编程的先进先出的数据结构即队列用来在生产者和消费者线程之间的信息传递。使用queue模块中的线程安全的队列可以快捷实现生产者和消费者模型
- Queue模块中包含三种线程安全的队列Queue、LifoQueue和PriorityQueue。以Queue为例其主要方法包括 1Queue(maxsize0)构造函数构造指定大小的队列。默认不限定大小 2put(item, blockTrue, timeoutNone)向队列中添加一个项。默认阻塞即队列满的时候程序阻塞等待 3get(blockTrue, timeoutNone)从队列中拿出一个项。默认阻塞即队列为空的时候程序阻塞等待 #基于queue.Queue的生产者和消费者案例
import time
import queue
import threading
q queue.Queue(10) #创建一个大小为10的队列
def productor(i):while True:time.sleep(1) #休眠1秒钟即每秒钟做一个包子q.put(厨师{}做的包子.format(i)) #如果队列满则等待
def consumer(j):while True:print(顾客{}吃了一个{}\n.format(j, q.get()),end’’) #如果队列空则等待time.sleep(1) #休眠1秒钟即每秒钟吃一个包子
for i in range(3): #3个厨师不停做包子t threading.Thread(targetproductor, args(i,))t.start()
for k in range(10): #10个顾客等待吃包子v threading.Thread(targetconsumer, args(k,))v.start()基于Event的同步和通信:
- Event相当于红绿灯信号可用于主线程控制其他线程的执行。当flag为False时其他的线程调用e.wait()阻塞等待这个信号当设置flag为True时等待的线程解除阻塞继续执行。
- Event对象主要包括下列方法 1wait([timeout])阻塞等待直到Event对象的flag为True或超时 2set()将flag设置为True 3clear()将flag设置为False 4isSet()判断flag是否为True import threading
import random
def f(i, e):e.wait() #检测Event的标志如果是False则阻塞print(f线程{i}的随机结果为{random.randrange(1,100)})
if __name__ __main__:event threading.Event() #创建事件对象默认标志为Falsefor i in range(3): #创建3个线程并运行默认阻塞等待Eventt threading.Thread(targetf, args(i, event))t.start()ready input(请输入1开始继续执行阻塞的线程)if ready 1:event.set() #设置Event的flag为True基于进程的并行计算:
- multiprocessing模块提供了与进程相关的操作创建进程、启动进程、进程同步等
- 模块multiprocessing还提供进程池和线程池
- multiprocessing模块的API与threading.Thread类似大大减少其使用复杂度
注意windows平台中所有与进程相关的代码必须放置在
if __name__ ‘__main__’:之中
创建进程有两种方法即创建Process的实例对象创建Process的子类 Process(targetNone, nameNone,args(),kwargs{}) target是进程运行的函数 name是进程的名称 args和kwargs是传递给target的参数元组和命名参数字典 t.start() #启动线程 t.is_alive() #判断线程是否活动 t.join() #进程加入 terminate() #终止进程 t.name #进程名 t.pid #进程id t.daemon #设置进程为用户进程(False)或Daemon进程(True) cpu_count() #可用的CPU核数 current_process() #返回当前进程 active_children() #活动的子进程 log_to_stderr() #设置输出日志信息到标准错误输出(控制台) #使用Process对象创建和启动新进程
import time, random
import multiprocessing as mp
def timer(interval):for i in range(3):time.sleep(random.choice(range(interval))) #随机睡眠interval秒pid mp.current_process().pid #获取当前进程IDprint(Process:{0} Time:{1}.format(pid, time.ctime()))
if __name____main__:p1 mp.Process(targettimer, args(5,)) #创建进程p2 mp.Process(targettimer, args(5,)) #创建进程p1.start() #启动线程p2.start()p1.join()p2.join()
模块multiprocessing为进程间提供了两种通信方法Queue和Pipe
- 模块multiprocessing中的Queue类似于queue.Queue为进程间通信提供了一个线程和进程安全的队列
- 模块multiprocessing中的Pipe()返回一个管道包括两个连接对象两个进程可以分别连接到不同的端的连接对象然后通过其send()方法发送数据或者通过recv()方法接收数据。
基于Queue队列的生产者和消费者模型
import time
import multiprocessing as mp
def productor(i, q):while True:time.sleep(1) #休眠1秒钟即每秒钟做一个包子q.put(厨师{}做的包子.format(i)) #如果队列满则等待
def consumer(j, q):while True:print(顾客{}吃了一个{} .format(j, q.get())) #如果队列空则等待time.sleep(1) #休眠1秒钟即每秒钟吃一个包子
if __name____main__:q mp.Queue(10) #创建一个大小为10的队列for i in range(3): #3个厨师不停做包子p mp.Process(targetproductor, args(i,q))p.start()for k in range(10): #10个顾客等待吃包子p mp.Process(targetconsumer, args(k,q))p.start()基于Pipe的进程间通信
import multiprocessing as mp
import time, random, itertools
def consumer(conn):#从管道读取数据while True:try:item conn.recv()time.sleep(random.randrange(2)) #随机休眠代表处理过程print(consume:{}.format(item))except EOFError:break
def producer(conn):#生产项目并将其发送到连接的管道上for i in itertools.count(1): #从1开始无限循环time.sleep(random.randrange(2)) #随机休眠,代表处理过程conn.send(i)print(produce:{}.format(i))
if __name____main__:# 创建管道返回两个连接对象的元组conn_out, conn_in mp.Pipe()# 创建并启动生产者进程传入参数管道一端的连接对象p_producer mp.Process(targetproducer, args(conn_out,))p_producer.start()# 创建并启动消费者进程传入参数管道另一端的连接对象p_consumer mp.Process(targetconsumer, args(conn_in,))p_consumer.start()#加入进程等待完成p_producer.join(); p_consumer.join()