学网站开发要什么基础,瓯海建设网站,把网站做成手机版,自己如何做网站优化原地址#xff1a;https://flyingbyte.cc/post/async_io/
python异步IO完全指南 做为一种并行编程的範式#xff0c;异步IO在Python中非常受重视#xff0c;从Python3.4到3.7快速演进。 我们已经有多线程#xff0c;多进程#xff0c;并发#xff08;concurrency#x…原地址https://flyingbyte.cc/post/async_io/
python异步IO完全指南 做为一种并行编程的範式异步IO在Python中非常受重视从Python3.4到3.7快速演进。 我们已经有多线程多进程并发concurrency并行这么多技术来处理并行编程异步IO和这些技术相比有哪些新的特性会带来哪些好处呢 本文将解答这些疑问。通过阅读本文您将了解
异步IO: 一种语言无关的编程範式已经为多种编程语言所支持async/await: 两个Python新引入的定义coroutine的关键词asyncio: 支持运行管理coroutine的Python库
Coroutines, 是Python生成器generator的一种它是Python支持异步IO的核心和基础我们在本文中会深入了解它。 术语表: 在本文中我将使用如下术语 异步IO 语言无关的异步IO范式 asyncio 特定的Python库 并发 concurrency,一种并行编程的技术 并行 parallelism 生成器 generator 迭代器 iterator 解析式 comprehensions 上下文管理器 context manager 迭代 iteration
事件循环 event loop,处理所有事件的循环 在开始我们的学习之旅前首先要建立一个独立的Python环境安装本文中所需要的Python库。
建立Python环境
本文中的Python示例代码要求Python 3.7以上的版本以及aiohttp和aiofiles两个库。
python3.7 -m venv ./py37async
source ./py37async/bin/activate
pip install --upgrade pip aiohttp aiofiles安装完成了么现在开始Python异步IO之旅let’s rock!
异步IO概述
Async IO is a bit lesser known than its tried-and-true cousins, multiprocessing and threading. This section will give you a fuller picture of what async IO is and how it fits into its surrounding landscape. 和其他并行编程技术如多线程、多进程相比人们对异步IO的了解要少得多。本小节希望能让你对它有一个全面的了解。
异步IO的背景
并发和并行是非常庞大的题目涉及非常多的内容经常使人感到迷惑。虽然本文主要聚焦在异步IO和它在Python中的实现但为了让读者能有对相关知识有更全面的认识花一些时间来比较异步IO与其他技术的异同。
并行是指在同一时间执行多个操作。多进程是实现并行的一种方式计算机的中央处理器负责同时处理多个任务。多进程适合CPU密集型任务比如数学计算等。
并发的概念比并行要更宽泛它是指多个任务能够以一种重叠的方式来运行请注意这里的差别并发不等于并行。
线程属于并发执行模式多个线程被依次调度执行。几个进程可以包含多个线程。因为GIL的存在Python与线程的关系很复杂本文就不赘述了有兴趣的读者可以参考https://realpython.com/python-gil/
线程模式适用于IO密集型的任务。CPU密集型任务的特点是计算机的CPU执行长期的很少被打断的任务而IO密集型任务会包含大量的等待IO完成。
总结来说并发既包括多进程适用于计算密集型任务也包括多线程适用于IO密集型任务。多进程是并行模型的一种并行模型是并发模型的一个子集。Python的标准库对这两种模型都支持。
现在我们要为这个家族增加一个新的成员了。在过去的几年中一种新的并发范式异步IO,通过标准库asyncio和新的关键词async/await被引入了Python。异步IO并不是新发明的概念在被引入Python以前它就存在于其他一些语言中了比如Go,C#和Scala。
Python的文档将asyncio定义为实现并发代码的库。然而异步IO并不是多线程也不是多进程。它和这两者都没有任何关系。
实际上异步IO是单进程单线程的实现它使用了协作多任务模式如果你现在还不熟悉这个名词也没有关系等读完本文你就会对这以术语有更深刻的认识了。也就是说异步IO在单线程单进程的环境下给人一种并发处理多任务的感觉。coroutine(协同程序异步IO的核心)可以被并发调度但是它们本质上并不是并行执行的。
也就是说异步IO是一种并发编程的范式多任务可以重叠执行单并不是并行的多任务同时执行。和多进程相比它表现的更像是多线程但实际上和二者都完全不同。多进程多线程和异步IO是并发编程的三种不同方式。
你在其他地方应该也听到过“异步”这个词为了能更好的让你理解这个概念的含义这里给出它的两个性质
1. 异步的程序在等待某长时间执行的调用返回结果时可以暂停自己以便CPU可以利用这段时间运行其他的程序。
2. 通过上述机制异步代码可以并发执行。也就是说异步代码可以有并发执行的效果。下面这张图完整呈现了它们之间的关系。 Here’s a diagram to put it all together. The white terms represent concepts, and the green terms represent ways in which they are implemented or effected:
并发与并行
我对并行编程模型的讨论就停止在这里。本文的主旨是关注在异步 IO如何使用它以及相关的快速变化的 API。如果你希望对线程多进程以及异步 IO 三种模型之间的不同可以阅读 Jim Anderson 关于 Python 中并发的详细解释。
异步 IO 的解释 异步 IO 看上既去反直觉还自相矛盾。并发的代码怎么能在单线程和一个 CPU 上运行Miguel Gringerg 在 2017 PyCon 上清晰明膫的解释了一切在这里我会引用他的部分发言
象棋大师 Judit Polgár 举行一场一对多的比赛。她有两种方式来进行这场比赛同步的和异步的。假设
有24名对手
Judit 每下一步棋需要5秒钟
每个对手需要55秒钟下一步棋
每一局棋平均需要60步每人下30步同步的版本Judit 一次只和一个人下棋直到这局比赛结束。每局比赛需要的事件是 (55 5) * 3 1800 秒也就是30分钟。整个比赛持续的事件就是 24 * 30 720 分钟也就是12小时。异步的版本Judit 从一张桌子移动到另一张桌子每次下一步棋。每下完一步她就离开桌子让她的对手在等待她回到桌子前的这段事件思考并下完自己那一步。Judit 完成一个循环需要的事件是 25 * 5 120 秒也就是2分钟。这段时间足够每个对手完成自己的棋步。整个比赛占用的时间被压缩到了 120 * 30 3600秒也就是一个小时。只有一个 Judit Polár她只有两双手每次只能下一步棋。但是使用异步的方法能够将正常比赛的时间从12小时减少到1个小时。所以协同工作的多任务是一个很奇妙的方法通过这种方法一个程序的 时间循环和多个任务相互通信保证每一个任务能在恰当的时间以优化的时间运行。
所有的 IO 调用都会占用很长时间异步 IO 和同步 IO 的差别是如果一个函数调用了同步 IO, 那么在等待 IO 返回这段时间函数不会返回其他的函数被阻塞不能运行而调用异步 IO 的函数会 立刻返回允许其他的函数运行当 IO 返回时调用异步 IO 的函数会从刚才中断的地方继续运行。
异步 IO 不是件容易的事 我听到有人说“尽量使用异步 IO; 只有在必要时使用线程。”确实写需要长期运行的多线程代码很困难而且很容易出错而使用异步 IO 可以避免一些多线程架构容易犯的错误。
但这并不是说在 Python 中写异步 IO 程序是件容易的事。小心当你了解到异步 IO 内部的机制时就会发现异步 IO 编程也可能会件困难的事。Python 的异步模型建立在回调函数事件 传输transport协议以及 futures 等概念之上光是这些概念就够吓人的了。而且异步 IO 库的 API 不停变化更增添了困难。
幸运的是asyncio 库已经比较成熟大部分的特性已经不再是实验性的而且相关的文档经过了大量的整修同时现在已经有了一些比较高质量的资料来帮助学习理解它。
asyncio库与关键字async/await
现在你已经对异步IO得概念有了一定的了解我们开始探究Python是如何支持异步IO的。Python 3.4 中引入的asyncio库以及两个新的关键字async/await提供了不同的 功能它们合在一起帮助你声明定义执行和管理异步代码。
async/await 语法以及原生的Coroutines 请注意你从互联网上独到的关于Python异步IO的内容因为Python的异步IO API从3.4到3.7经历了剧烈的变动。一些旧的范式不再使用而一些一开始不支持的功能随着新特性 被引入。包括本文在内大部分你能在网上找到的内容都会很快过时。
异步IO的核心是coroutines。coroutine是一种特别的Python生成器函数。我们从最基础的定义开始我们的学习 coroutine是一种在最后返回前可以暂停其执行过程的函数它 可以间接的把CPU让渡给其它coruotine运行。
我们后面会深入了解传统的生成器函数是如何演进成coroutine。现在让我们先通过一些例子来看看coroutine是如何工作的。
第一个例子是一个异步IO的程序虽然很短但已经展示了异步IO的核心功能
#!/usr/bin/env python3
# countasync.pyimport asyncioasync def count():print(One)await asyncio.sleep(1)print(Two)async def main():await asyncio.gather(count(), count(), count())if __name__ __main__:import times time.perf_counter()asyncio.run(main())elapsed time.perf_counter() - sprint(f{__file__} executed in {elapsed:0.2f} seconds.)当执行这段程序时请注意它的输出与使用’def’以及’time.sleep()‘的同步函数比有何不同
$ python3 countasync.py
One
One
One
Two
Two
Two
countasync.py executed in 1.01 seconds.异步IO的核心是代码的执行顺序。在上面的例子里函数count()被一个单独的事件循环(event loop,或者称为coordinator)所驱动。当每个任务运行到
await asyncio.sleep(1)函数就会暂停自己将执行的权利暂时返还给事件循环同时通知事件循环我需要休眠1秒你可以在这段时间让别的任务执行1秒钟后再调度我。
这段代码的同步版本:
#!/usr/bin/env python3
# countsync.pyimport timedef count():print(One)time.sleep(1)print(Two)def main():for _ in range(3):count()if __name__ __main__:s time.perf_counter()main()elapsed time.perf_counter() - sprint(f{__file__} executed in {elapsed:0.2f} seconds.)执行结果和之前的异步版本比代码的执行顺序有微妙但本质上的差别
$ python3 countsync.py
One
Two
One
Two
One
Two
countsync.py executed in 3.01 seconds.time.sleep() 和 asyncio.sleep() 看上去没有任何意义它们是用来代替任何在时间密集型程序中需要等待的函数调用。最简单的需要等待的调用就是 sleep()只是等待完成什么也不做。两者的差别是time.sleep() 代表了任何阻塞的等待调用而 asyncio.sleep() 代表了也需要一段时间来完成但不阻塞的调用。
你在下一小节会看到awaiting 和 asyncio.sleep() 结合起来使得调用它们的函数能暂时放弃运行而让其它有立刻可以执行的指令的任务被调度执行。相对应的time.sleep() 或其它阻塞调用于 Python 的异步代码不兼容因为它们会在休眠期间阻塞所有其它任务包括事件循环。
异步IO的规则 现在我们可以给出关于async/await和coroutine较为正式的定义。这一小节有一些难以理解但掌握async/await对深入理解异步IO是很重要的你可以先跳过这一小节等需要的时候再回来:
关键词 async def 产生一个原生的coroutine对象或一个异步生成器对象。async for 和 async with 也是合法的表达式后面你会看到它们的使用。coroutine函数里可以包含有关键字await的表达式。当执行到它时会暂停这个coroutine的执行将控制返回给事件循环。比如在g()里包含了await f(),
那么g()必须是一个coroutine, 也就是由async def’来定义的。
当Python执行到这一行时await会通知事件循环暂停对coroutine g() 的执行直到await所等待的函数f()返回。在这段事件可以让其它能立刻执行的任务执行。将这两个定义翻译成代码现在试着理解以下它的内涵。
async def g():# 在这里暂停g()的执行执行其它可被调度的任务等到f()返回后再继续执行g()r await f()return r这里有一些关于什么时候以及应该怎样使用 async/awai 的规则不管你是仍然在熟悉语法还是已经对使用 async/awai 已经有了了解它们对你都会是很有用的。
由 async def 定义的函数执行后返回一个coroutine对象。在coroutine里可以有 await, return或yield, 但它们都不是必须的。async def noop():pass也是合法的定义。
A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid:Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results.It is less common (and only recently legal in Python) to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return.couroutine 里不能使用 yield from。如同在函数体之外使用yield会产生语法错误一样在由async def定义的coroutine之外使用 await 也会产生错误SyntaxError: await outside async function.下面几个代码片段体现了上面的规则
async def f(x):y await z(x) # OK - await 和 return 都可以出现在coroutines里return yasync def g(x):yield x # OK - 使用yield表明这是一个异步生成器async def m(x):yield from gen(x) # No - couroutine里不能使用 yield fromdef m(x):y await z(x) # No - await必须包含在一个coroutine内return y最后当你使用 await f() 时要求 f() 必须是一个 awaitable 的对象。听起来没有什么营养是吧。现在你只需要直到一个 awaitable 对象是以下二者之一
1. 一个 coroutine
2. 一个定义了一个返回值是一个迭代器名字是 .__await()的成员函数的对象。基本上你在写代码时只需要关注第一种。
await 的对象其实是一个 coroutine回想一下除了用 async 来定义 coroutine, 我们还可以用 asyncio.coroutine 修饰一个普通函数的方法来定义一个 coroutine。通过这种方法我们得到 的是一个生成器类型的 coroutine。自从 async/await 语法从 Python 3.5 被引入之后这种方法已经过时了。
这两种方法本质上是等价的都是创建了一个 awaitable 对象但是第一种是基于生成器的而第二种是原生的 coroutine:
import asyncioasyncio.coroutine
def py34_coro():基于生成器的 coroutine, 旧式语法yield from stuff()async def py35_coro():原生 coroutine, 现代语法await stuff()在写代码时请优先使用原生的 coroutine因为它显示的定义了 coroutine而基于生成器的语法是隐式的定义 coroutine并且将被从 Python 3.10 后删除。
在本文的后续章节我们只会因为技术讨论的目的涉及基于生成器的 coroutine。之所以引入 async/await 语法目的是使 coroutine 成为 Python 中单独的一个特性而不要和生成器混淆起来。
不要纠结在基于生成器的 coroutine 问题上这种用法已经过时而且其规则与 async/await 语法不兼容。
在讨论下一个话题前我们再看几个示例程序。
下面这个程序演示了异步 IO 如何减少等待的时间makerandome() 是一个 coroutine, 它会循环的生成从0到10之间的随机整数直到生成的随机数大于每一个阈值每次生成随机数后这个 coroutine 都会休眠一段时间。我们希望能运行多个 coroutine但不必等待一个 coroutine 结束另一个 coroutine 就能运行。这段代码大体上遵循了前面两个程序的模式只是有轻微的改动
#!/usr/bin/env python3
# rand.pyimport asyncio
import random# ANSI colors
c (\033[0m, # End of color\033[36m, # Cyan\033[91m, # Red\033[35m, # Magenta
)async def makerandom(idx: int, threshold: int 6) - int:print(c[idx 1] fInitiated makerandom({idx}).)i random.randint(0, 10)while i threshold:print(c[idx 1] fmakerandom({idx}) {i} too low; retrying.)await asyncio.sleep(idx 1)i random.randint(0, 10)print(c[idx 1] f--- Finished: makerandom({idx}) {i} c[0])return iasync def main():res await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))return resif __name__ __main__:random.seed(444)r1, r2, r3 asyncio.run(main())print()print(fr1: {r1}, r2: {r2}, r3: {r3})用不同颜色区分的输出能比我的描述更清晰的展示各个任务运行的模式
rand.py program execution
rand.py execution这段程序定义了一个主的 coroutine: makerandom()然后用不同的输入参数调用了它三次。这里的模式是非常有代表性的定义多个小的模块化的 coroutine然后用一个主 coroutine 来将这小的 coroutine 串行起来运行。main() 函数通过映射一个迭代器或者 pool, 来多次调用这个主的 coroutine之后在 main() 函数里收集这些 courtine 的执行结果。
在这个小程序里用的 pool 是 range(3)后面我们会看到一个更完整的程序那个程序会遍历一个 URL 的集合在 main() 函数里通过主 coroutine 来实现对这个集合中的 URL 建立连接发送请求 解析回应, 对这些 URL 的处理都是并发的。
回到现在的这个程序我们用 asyncio.sleep() 来模拟一个 IO 密集型的程序如何等待 IO 操作完成比如像是即时通信软件中两个客户端彼此发送并接收信息那么发送或接收的时候函数就必须 要等待对方的回应IO 操作才会返回。
异步IO设计模式 本小节开始介绍异步IO特有的设计模式。
串联调用Coroutines 如我们前介绍的coroutine是一个awaitable对象可以被另一个coroutine通过await调用coroutine - await - coroutine。通过这种方式我们可以把程序分解成多个串联 的小的可管理的能循环调用的调用链。
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# chained.pyimport asyncio
import random
import time# ANSI colors
c {end : \033[0m, # End of color张三 : \033[36m, # Cyan李四 : \033[91m, # Red王五 : \033[35m, # Magenta
}async def part1(n: str) - str:i random.randint(0, 10)print(c[n] f({n}) 开始做{i}个面包.)await asyncio.sleep(i)print(c[n]f({n}) 做好了{i}个面包.)return iasync def part2(n: str, arg: int) - str:i random.randint(0, 10)print(c[n]f({n}) 开始吃面包.)await asyncio.sleep(i)print(c[n]f({n}) 花了{i}分钟吃了{arg}个面包.c[end])return iasync def chain(n: str) - None:start time.perf_counter()p1 await part1(n)p2 await part2(n, p1)end time.perf_counter() - startprint(f--{n}的工作 做面包用了{p1}分钟, 吃面包用了{p2}分钟.)async def main(*args):await asyncio.gather(*(chain(n) for n in args))if __name__ __main__:import sysrandom.seed(12)args [张三, 李四, 王五]start time.perf_counter()asyncio.run(main(*args))end time.perf_counter() - startprint(fProgram finished in {end:0.2f} seconds.)part1()和part()2都是coroutine, 它们都被coroutine chain()串行调用而各自又串行调用了coroutine asyncio.sleep()。part1执行到’await asyncio.sleep(i)‘时 会暂停并从coroutine part1()返回接着串行调用part1()的chain()也返回将控制权交还给事件循环事件循环调度下一个可执行的任务。当休眠结束part1() 从上次暂停的地方被再次调度执行并返回chain() part2()开始执行。
$python3 ./chained.py
(张三) 开始做7个面包.
(李四) 开始做4个面包.
(王五) 开始做10个面包.
(李四) 做好了4个面包.
(李四) 开始吃面包.
(张三) 做好了7个面包.
(张三) 开始吃面包.
(王五) 做好了10个面包.
(王五) 开始吃面包.
(李四) 花了8分钟吃了4个面包.
--李四的工作 做面包用了4分钟, 吃面包用了8分钟.
(王五) 花了5分钟吃了10个面包.
--王五的工作 做面包用了10分钟, 吃面包用了5分钟.
(张三) 花了10分钟吃了7个面包.
--张三的工作 做面包用了7分钟, 吃面包用了10分钟.
Program finished in 17.01 seconds.最后程序运行的总时间等于运行时间最长的那个任务花费的时间。
使用队列 asyncio库提供了一个队列(queue)的数据结构。到目前为止我们的示例程序还没有需要用到这个结构。在chained.py中每个任务都是有一系列串行执行的coroutine组成的。
除了这种方式异步IO还有另一种常用的结构一些互相独立的生产者向队列中放入事务。每个生产者都可能在随机的时间向队列中放入任意多个事项而一旦事项被放入队列就会有一 组消费者马上从队列中读取尽量多的事项来进行处理无需等待任何信号来触发这一动作。
在这种设计模式中消费者和生产者是相互独立的。消费者不知道生产者的数量事先也不知道会有多少事项被生产者放入队列中。
每个生产者和消费者都会占用一定的时间来从对列中放入/取出事务。队列作为通道使生产者与消费者不需要直接接触就可以进行通信。
注意在Python多线程编程中队列因为其线程安全的特性被大量使用但在异步IO中你不需要考虑线程安全性异步IO中我们只有一个线程除非将异步IO与多线程结合起来使用本文 不涉及这部分内容。
本节中的示例程序使用队列来作为生产者和消费者的传输媒介二者互相间不直接接触。
这个程序的同步版本开始去会很糟糕一组生产者依次的将事务放入队列只有等所有的生产者都完成任务队列才被释放接着消费者一个一个的从队列中事务。这种同步的设计带来了大量的 延迟。事务在队列中等待很长时间而不是被立刻取出并处理。
下面的代码 asyncq.py 是利用了asyncio.queue实现的一个异步版本。asyncio.queue中定义了三个coroutine。await queue.put()向队列中放入事务如果队列满了就会返回等待队列可以 有新的空间时再次被调度。await queue.get()从队列中取出事务如果队列为空就会返回等待队列中有新的事务。await queue.join()判断队列中的所有事务是否被全部取出并处理如果是 就会返回。
下面是完成的代码。
#!/usr/bin/env python3
# asyncq.pyimport asyncio
import itertools as it
import os
import random
import time# 辅助函数异步生成随机数
async def makeitem(size: int 5) - str:return os.urandom(size).hex()# 辅助函数 异步休眠随机时间
async def randsleep(a: int 1, b: int 5, callerNone) - None:i random.randint(0, 10)if caller:print(f{caller} sleeping for {i} seconds.)await asyncio.sleep(i)async def produce(name: int, q: asyncio.Queue) - None:n random.randint(0, 10)for _ in it.repeat(None, n): # Synchronous loop for each single producerawait randsleep(callerfProducer {name})i await makeitem()t time.perf_counter()await q.put((i, t))print(fProducer {name} added {i} to queue.)async def consume(name: int, q: asyncio.Queue) - None:while True:await randsleep(callerfConsumer {name})i, t await q.get()now time.perf_counter()print(fConsumer {name} got element {i}f in {now-t:0.5f} seconds.)q.task_done()async def main(nprod: int, ncon: int):q asyncio.Queue()producers [asyncio.create_task(produce(n, q)) for n in range(nprod)]consumers [asyncio.create_task(consume(n, q)) for n in range(ncon)]await asyncio.gather(*producers)await q.join() # Implicitly awaits consumers, toofor c in consumers:c.cancel()if __name__ __main__:import argparserandom.seed(444)parser argparse.ArgumentParser()parser.add_argument(-p, --nprod, typeint, default5)parser.add_argument(-c, --ncon, typeint, default10)ns parser.parse_args()start time.perf_counter()asyncio.run(main(**ns.__dict__))elapsed time.perf_counter() - startprint(fProgram completed in {elapsed:0.5f} seconds.)前面两个函数是辅助函数第一个函数返回随机值做为生产者放入队列中的事务第二个函数随机休眠一段时间。生产者会将10以内随机个数的事务放入队列每个事务是 一个元组(i,t),其中i是辅助函数产生的随机值t是该事务被放入的时间。当消费者将事务从队列中取出后可以根据事务中的时间戳计算该事务在队列中存放了多长时间。
请不要忘记这里的asyncio.sleep()只是用来模拟那些会阻塞的函数。
下面是两个生产者五个消费者时的运行结果。
$ python3 asyncq.py -p 2 -c 5
Producer 0 sleeping for 3 seconds.
Producer 1 sleeping for 3 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 sleeping for 3 seconds.
Consumer 3 sleeping for 5 seconds.
Consumer 4 sleeping for 4 seconds.
Producer 0 added 377b1e8f82 to queue.
Producer 0 sleeping for 5 seconds.
Producer 1 added 413b8802f8 to queue.
Consumer 1 got element 377b1e8f82 in 0.00013 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 got element 413b8802f8 in 0.00009 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added 06c055b3ab to queue.
Producer 0 sleeping for 1 seconds.
Consumer 0 got element 06c055b3ab in 0.00021 seconds.
Consumer 0 sleeping for 4 seconds.
Producer 0 added 17a8613276 to queue.
Consumer 4 got element 17a8613276 in 0.00022 seconds.
Consumer 4 sleeping for 5 seconds.
Program completed in 9.00954 seconds.在这个示例中所有的事务都以大约万分之几秒的速度被处理这些处理事务所需的时间大概来自两个地方
标准的不可避免的运行开销。 所有的消费者都恰好在睡眠状态的情况 考虑第二种情况幸运的是在有海量消费者存在时这是很正常的情况。你可以试一下吧消费者的数量设成1000,看以下会有什么结果。重要的是理论上说你可以有从不同系统上的不同用户 来控制生产者和消费者的行为而队列充当中心的通路就像kafka。
好啦到这里你已经学习了很多关于异步IO的知识了解了如何用async/await来定义coroutine, 还读了三个相关的示例代码。如果你还想继续深入的了解coroutine机制在Python中是如何实现的 请翻开下一章。
Python的异步IO根植于生成器 之前你已经看到了一个关于旧式的生成器类型coroutine的例子虽然这种定义coroutine的方式已经被原生coroutine所取代但为了能让你对coroutine有深入的了解我们还是要研究下这种定义coroutine 的方式。
import asyncioasyncio.coroutine
def py34_coro():生成器类型的coroutine# 请用原生coroutine,但是你需要知道这种使用方式s yield from stuff()return sasync def py35_coro():更现代的语法原生的生成器s await stuff()return sasync def stuff():return 0x10, 0x20, 0x30请试一下如果不用await, 直接调用这两个函数会发生什么 py35_coro()
coroutine object py35_coro at 0x10126dcc8恩是不是和你想得不太一样直接调用一个coroutine函数的返回值是一个coroutine object(实际上await要求后面是一个这样的object, 我们成这样的object 为awaitable)。
提问: 你记不记得Python还有什么函数特性看上去和这个类似当你调用它时不会执行内部的代码而是返回一个object?
如果你的回答时生成器那么恭喜你已经了解了coroutine的本质。coroutine内部是一种增强了的生成器函数。 def gen():
... yield 1
... yield 2
...g gen() #调用生成器函数会返回一个生成器对象g # 什么也不会发生 - 生成器对象是一种迭代对象(iter),需要通过next来遍历它的内部成员.__next__()
generator object gen at 0x1012705e8next(g)
1next(g)
2实际上无论你是用’async dev’声明的原生coroutine,还是使用旧式的’asyncio.coroutine wrappe’ decorator 声明的coroutine, 它们的基础都是生成器函数。从技术上来看‘await’更像是’yiled from’ 而不是’yield’(但请不要忘了‘yield from x()’ 只不过是’i in x(): yield i’ 用法的语法糖)。
生成器函数之所以能称为异步IO的基础是因为它能在某一点上返回然后在这一点上重入的特性。当一个调度器函数执行时遇到’yield’, 它就在这一点上返回之后这个函数就让渡CPU的控制权给它的调用 者知道下一次它被调用时就从这一点继续执行。比如如果你的一个生成器函数中有循环而在循环中会调用’yield’,那么循环就在这个点上暂时停止直到下次函数被调用时会在这个点上继续执行 后续的代码。
我们用一个例子来说明这一点。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-from itertools import cycle# 生成器函数
def endless():Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever# 每次执行到这里会返回一个数值然后交出执行权直到下一次被调用for i in cycle((9, 8, 7, 6)):yield i#yield from cycle((9, 8, 7, 6))def yield_from_loop():# 执行生成器函数返回一个生成器对象# 请和coroutine对比e endless()total 0# 每次生成器执行到yield返回然后下一次迭代从返回的点开始for i in e:if total 6:print(i, end )total ielse:print()# Pause execution. We can resume later.break#打印 9 8 7 6 9 8
yield_from_loop()# 从上次的点继续运行输出(6, 9, 8)
next(e), next(e), next(e)关键字’await’的行为与此类型也是标记一个暂停点然后将执行权利返回(给事件循环)然后事件循环调度其它可以立即执行的coroutine。暂停表示coroutine暂时放弃执行但是并没有退出或结束。请 不要忘记‘yield’,‘yield from’ 和’await’ 都标记了生成器运行中的一个中断点。
这是普通函数与生成器的本质区别普通函数是一种要么不运行要运行就会运行到底直到遇到’return’,然后将函数的返回值返还调用函数。而生成器则不同每次它运行时遇到一个’yield’,不仅会将返回值返回调用函数还会保存当前的运行状态(暂停的位置当前各局部变量的值等等等你下一次调用next()时会从保存的暂停点开始继续运行。
此外generator 还有一个特性帮助实现异步IO。你可以通过一个 generator 对象的 send() 方法向其传送一个值。通过这种方式generator 对象也包括 coroutine 对象可以不被阻塞得调用另一个 对象。这个特性只用于 coroutine 的底层实现你不会直接使用这种方式。
如果你对此敢兴趣可以从 PEP 342: coroutine 的定义 开始学习。 Bretty Cannon 的How the Heck Does Async-Await Work in Python以及 David Beazley 的 “Curious Course on Couroutines and Concurrency” 都是深入学习 coroutine 机制的很好的教材。
这里我这种将以上材料的内容凝练成几句话 coroutine 工作在一种特别的非传统方式下。当调用它们的 send() 方法时会抛出一种异常属性。这之间有一些很曲折的联系但它不会帮助你 实际使用这门语言所以我们不再深入这个话题。
把这些知识点穿在一起现在我们可以总结以下关于做为 coroutine 使用 generator 的几个知识点
1. courtine 是特殊的生成器它利用了生成器的一些特性。
2. 旧式的基于生成器的 coroutine 使用 ‘yield from’ 来等待 coroutine 的结果。现代的 Python 原生语法只是用 await 替换了 ‘yield from’ 来等待一个 coroutine 的结果。 ‘await’ 是对‘yield from’ 的一种模仿认识到这种联系应该会能帮助你理解 coroutine 是如果工作的。
3. ‘await’ 是在它被调用的地方标记了一个断点。coroutine 会在这里暂停临时交出 CPU 使用权直到下次被调度时从这一点开始继续执行。其它特性: 异步生成器(generator)与异步解析式(comprehensions)
除了 async/await 两个关键字以外Python 还引入了 ‘async for’ 来遍历一个异步的迭代器(iterator)。异步迭代器的目的是使每次循环中都能异步的调用代码。
这个概念很自然的可以扩展到异步生成器。回忆我们之前说的在一个原生的 coroutine 中可以使用 await, return, yield 来临时中断或结束 coroutine 的运行。Python 3.6以后通过 PEP 525允许 在 coroutine 中使用 ‘yield’在同一个 coroutine 中调用 ‘await’ 和 ‘yield’ 就会变成一个异步生成器
async def mygen(u: int 10):异步生成2的幂.i 0while i u:yield 2 ** ii 1await asyncio.sleep(0.1)最后很重要的一点Python 允许使用 ‘async for’ 来构造异步数组解析式。就像它的同步版本一样这本质上是一个大大的语法糖:
async def main():# This does *not* introduce concurrent execution# It is meant to show syntax onlyg [i async for i in mygen()]f [j async for j in mygen() if not (j // 3 % 5)]return g, fg, f asyncio.run(main())
print(g)
#[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
print(f)
#[1, 2, 16, 32, 256, 512]有一个微妙但很重要的点无论是异步生成器还是异步解析式都不会使迭代调用并发而是当循环遇到异步的点时暂时放弃 CPU 的所有权好让其它 coroutine 能够运行。
换句话说异步的迭代器和异步生成器都不是用来并发的把函数影射到一个序列或者迭代器上。它们只是设计来让包含它们的 coroutine 能够暂停以允许其它任务运行。 之所以要设计 ‘async for’ 和 ‘async with’ 这时因为如果使用同步的 ‘for’ 和 ‘with’ 会破坏对包含它们的 coroutine 的 ‘await’ 调用。对异步和并发区别的理解是掌握这个不同的关键。
什么是事件循环以及 asyncio.run 事件循环是一个无限循环比如典型的
while True:print(in loop)time.sleep(5)当然在事件循环中做的工作不会是打印输出这么简单实际上事件循环作的任务类似与 kernel 里的调度器监视 所有的 coroutine 的状态当前运行的 coroutine 变成 idle 之后寻找可以被 调度执行的 coroutine. 当一个处于 idle 状态的 coroutine 等待的资源变成可用的时候事件循环可以把它唤醒。
在目前来说事件循环所有的管理工作都在一个函数中完成
asyncio.run(main()) # Python 3.7asyncio.run() 在 Python 3.7 中引入负责创建一个事件循环运行所有的任务直到它们完成然后结束该事件循环。
还有一种相对冗长的方式来处理事件循环使用 get_event_loop()。典型的模式是
loop asyncio.get_event_loop()
try:loop.run_until_complete(main())
finally:loop.close()你肯定在一些过时的示例程序里看到过使用 loop.get_event_loop()但是除非你你有明确的理由要精细的控制事件循环的管理asyncio.run() 应该能满足绝大部分程序的需要。
如果你需要和 Python 程序里的事件循环交互 loop 是那种设计良好的老式 Python 对象支持使用 loop.is_running() 和 loop.is_closed() 来检查内部状态。当你需要时可以操纵操纵它比如 把一个回调作为参数传给循环来调度它。
关于事件循环更重要的是了解一些其内部的机制。下面是关于它的一些值得花时间去学习的知识点
除非被绑定到一个事件循环上否则 coroutine 本身不做太多的事
虽然在我们之前对生成器的解释里已经展示了这一点但仍值得在这里再强调一遍。假如你有一个主 coroutine, 再里面 await 了其他的 coroutine, 那么只是调用这个 coroutine 不会产生效果 import asyncio async def main():
... print(Hello ...)
... await asyncio.sleep(1)
... print(World!) routine main()routine
coroutine object main at 0x1027a6150记住使用 asyncio.run() 来调度 主 coroutine在事件循环中执行它 asyncio.run(routine)
Hello ...
World!(其他的被 await 修饰的子 coroutine 也会被执行。通常不需要单独执行 main() 来生成一个 coroutine 的对象只需要把 main() 作为 aynscio.run() 的参数那么主 coroutine, 包括所有用 await 修饰的被串行调用的子 coroutine, 也都会被事件循环调度执行)
一般情况下一个异步 IO 事件循环跑在一个线程里运行在一个 CPU 上。通常在一个 CPU 上跑单线程的事件循环已经足够。事件循环也可以配置多个 CPU 上。可以参考 John Reese 的相关讲座 本文结尾给出了连接。
事件循环是插件式的。也就是说如果你愿意你也可以写一个自己的事件循环实现。一个很好的例子是 uvloop,它在 Cythyon 中实现了事件循环。 插件式的事件循环意味这你可以使用任何一种事件循环的实现而和 coroutine 的结构无关。实际上 asyncio 本身就包含了两种事件循环的实现缺省的是基于 selector 模块的另一个是给 windows 使用的。
一个完整的例子异步HTTP请求 恭喜你走到现在学习了这么多新鲜的内容现在是时候享受你的学习成果了。在这一小节你将使用 aiohttp 来写一个网页抓取小程序areq.py。aiohttp 是一个非常快的异步HTTP 客户端/服务端框架我们只需要用到客户端。像这样的程序可以用来在把HTTP连接映射到一个集群里的各个主机通过把url组成一个有向图。 You’ve made it this far, and now it’s time for the fun and painless part. In this section, you’ll build a web-scraping URL collector, areq.py, using aiohttp, a blazingly fast async HTTP client/server framework. (We just need the client part.) Such a tool could be used to map connections between a cluster of sites, with the links forming a directed graph.
注意你可能会奇怪为什么Python的request库不兼容异步IO。这是因为Python 的request库是实现在urllib3之上的而urllib3又是利用的Python的http和socket模块。
缺省情况下socket 操作都是阻塞的。这意味着不应该使用 ‘await requests.get(url)’ 因为 ‘.reqeusts.get()’ 不是一个 awaitable 对象。相反aiohttp 库里的几乎所有成员都是 awaitable 对象 比如 session.request() 和 response.text()。所以请不要用 request 库来写异步代码。
这个程序的架构是这样的
从一个本地文件 urls.txt 里读取一系列的 URL 地址
向这些 URL 发送 GET 请求然后解析返回值。如果失败就不再继续处理这个 URL 了。
搜索返回的 HTTP 内容里包含的 href 里的 URL
将这些 URL 写入本地文件 foundurls.txt
尽量用异步和并发来实现上述逻辑。使用 aiohttp 来发送 HTTP 请求使用 aiofiles 来写入文件。这两个库是都是异步IO模式的很好的例子。
下面是 urls.txt 文件的内容。这不是一个很大的文件而且包含了一些不能访问的网站
$ cat urls.txt
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt这里面的第二个 URL 应该返回一个404, 应该要小心处理。如果你想实现一个更有实用性的程序你还需要处理一些更负责的情况比如服务器连接断开或者无限重定向等等。
所有的请求都应该包含在一个 session 里这样就可以复用 session 的内部连接池。
我们先完整的看一遍这个程序然后再一步一步分析
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# areq.py 异步的获取嵌入在 HTML 内的 HTTP 链接 import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parseimport aiofiles
import aiohttp
from aiohttp import ClientSessionlogging.basicConfig(format%(asctime)s %(levelname)s:%(name)s: %(message)s,levellogging.DEBUG,datefmt%H:%M:%S,streamsys.stderr,
)
logger logging.getLogger(areq)
logging.getLogger(chardet.charsetprober).disabled TrueHREF_RE re.compile(rhref(.*?))async def fetch_html(url: str, session: ClientSession, **kwargs) - str: 利用 HTTP GET 请求获取 HTML 页面。kwargs 作为参数传递给 session.request().resp await session.request(methodGET, urlurl, **kwargs)resp.raise_for_status()logger.info(Got response [%s] for URL: %s, resp.status, url)html await resp.text()return htmlasync def parse(url: str, session: ClientSession, **kwargs) - set: 获取 url 链接返回的 html 中内嵌的 href found set()try:html await fetch_html(urlurl, sessionsession, **kwargs)except (aiohttp.ClientError,aiohttp.http_exceptions.HttpProcessingError,) as e:logger.error(aiohttp exception for %s [%s]: %s,url,getattr(e, status, None),getattr(e, message, None),)return foundexcept Exception as e:logger.exception(Non-aiohttp exception occured: %s, getattr(e, __dict__, {}))return foundelse:for link in HREF_RE.findall(html):try:abslink urllib.parse.urljoin(url, link)except (urllib.error.URLError, ValueError):logger.exception(Error parsing URL: %s, link)passelse:found.add(abslink)logger.info(Found %d links for %s, len(found), url)return foundasync def write_one(file: IO, url: str, **kwargs) - None: 爬取一个 url 链接将 html 包含的 href 写入文件 res await parse(urlurl, **kwargs)if not res:return Noneasync with aiofiles.open(file, a) as f:for p in res:await f.write(f{url}\t{p}\n)logger.info(Wrote results for source URL: %s, url)async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) - None: 异步的爬取多个 url 并写入文件 async with ClientSession() as session:tasks []for url in urls:tasks.append(write_one(filefile, urlurl, sessionsession, **kwargs))await asyncio.gather(*tasks)if __name__ __main__:import pathlibimport sysassert sys.version_info (3, 7), Script requires Python 3.7.here pathlib.Path(__file__).parent 打开 urls 文件将 url 读入 set with open(here.joinpath(urls.txt)) as infile:urls set(map(str.strip, infile)) 打开存放提取出的 url 的文件 outpath here.joinpath(foundurls.txt)with open(outpath, w) as outfile:outfile.write(source_url\tparsed_url\n) 运行异步任务 asyncio.run(bulk_crawl_and_write(fileoutpath, urlsurls))这个程序比我们之前所写的那些都更实用也更复杂让我们分解来看。
常量 HREF_RE 是一个正则表达式用来分离出我们最后想要搜索的内容 HTML 里包含的 href 标签。 HREF_RE.search(Go to a hrefhttps://realpython.com/Real Python/a)
re.Match object; span(15, 45), matchhrefhttps://realpython.com/fetch_html() 作为一个 couroutine, 封装了把发送 GET 请求给服务器然后分析返回的 HTML 内容的过程。它发送请求await 服务器的回应如果返回的状态不是 200-OK, 会抛出异常然后处理
resp await session.request(methodGET, urlurl, **kwargs)
resp.raise_for_status()如果状态正常fetch_html() 会以字符串的形式返回 HTML 页面的内容。值得注意的是在这个函数里没有处理异常异常被传递到调用函数并在那得到处理
html await resp.text()我们用 await 来处理 session.request() 和 resp.text() 因为它们的类型都是 awaitable 的 coroutines 对象。如果fetch_html()是同步的这个请求/回应循环会占用很多的时间但是使用了异步IO, 其他的任务可以在这段时间被调用比如解析 html 内容把已经解析出的 URL 链接写入文件。
在 coroutine fetch_html()返回之后, coroutine 调用链继续执行。coroutine parse() 开始将 fetch_html() 返回的 HTML 页面包含的 href 标签提取保证其中的内容是正确的之后将它们转换为绝对 路径格式。
确实 coroutine parse() 的第二部分代码是阻塞的但它包含的是一个快速的正则匹配在我们这个特殊的例子中这部分同步的代码应该执行很快并且不会出问题。但是请记住一个 coroutine 里的每 一行代码都会阻塞其他的 coroutine 的执行直到它调用 yield, await 或者 return 放弃执行权。如果我们例子里的解析是一个更加 CPU 密集的调用就应该考虑调用 loop.run_in_executor() 在一个 单独的线程里执行这部分代码。
接着 coroutine write_one() 的两个参数是 文件对象和一个 URL, 它先 await parse(), 当 parse() 返回了解析出的 URL 之后将这些 URL 和它们的源 URL 一起异步的写入文件。
最后 bulk_crawl_and_write() 是这个程序的 coroutine 调用链的的主入口。它使用了一个 session, 每一个从 urls.txt 中读出来的 URL 是一个单独的 task.
这里有一些需要注意的知识点
缺省情况下 ClientSession 最多支持 100 个连接。可以通过传一个 asyncio.connector.TCPConnector 给 ClientSession 来配置这个行为。你也可以为每个主机单独配置上限。
你可以为 session 或者每个单独的请求设置连接超时。
这个脚本也使用了 async with 来调用一个异步的上下文管理器。Python 中异步的上下文管理器和同步的上下文管理器没什么不同只是将 .exit() 和 .enter() 替换成了 .aenter() 和 .aexit()。 你肯定已经猜到了async with 只能使用在用 async def 定义的 coroutine 中。
请参考本文后面的参考文献部分来继续深入学习这部分知识。
我们尝试运行 areq.py, 来处理9个 URL, 在一秒钟内就能完成所有的获取解析和保存结果的工作。下面是运行的结果
$ python3 areq.py
21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
21:33:23 INFO:areq: Found 23 links for https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/看上去还不错是吧。你可以检查下输出结果有多少行来看看运行是否正确。我测试时的结果是
$ wc -l foundurls.txt626 foundurls.txt$ head -n 3 foundurls.txt
source_url parsed_url
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos下一步做什么你可以把程序变成递归调用使用 aio-redis 来记录哪一个 URL 已经被抓取以避免重复发送请求你还可以使用 networkx 库来发起连接请求。
一定要适度对一个小网站同时发起1000条请求是非常非常不好的行为。有很多方法可以限制一次发送多少请求比如使用 asyncio 提供的 semphaore。如果你不遵守这个警告最后你只会得到大量的超时 错误伤害你自己的程序这对谁都没有好处。
异步IO的背景 现在你已经看到很多异步IO的 Python 代码现在让我们谈一下什么时候应该使用异步IO,什么时候应该选择其它的并发模式以及做出这一选择的原因。
什么时候选择异步IO?
本文并不是正式讨论异步IO,线程模式和多进程模式的正式对比但会讨论何时最好使用异步IO,而不是使用另外两种模型。
异步IO与多进程模式其实并不构成竞争关系。实际上它们可以被同时使用。如果你有多个 CPU 密集型的任务比如 scikit-learn 或者 keras 中使用的网格搜索很明显应该使用多进程模型。
如果函数调用了阻塞的应用, 在这些函数前都加上 async 不是一个好主意这会减慢你的程序。但就像之前提到的有的时候异步IO可以和多进程模型一起使用。
异步IO和线程模型之间的比较要更直接。在本文开始我提到过“线程很难使用”。实际上即使是在线程模型很容易实现的场合使用线程模型仍然会因为竞争条件内存使用等等问题导致很难调试。
此外与异步IO相比线程模型更难扩展。因为线程是一种有限的系统资源。在很多机器上创建几千个线程会失败我也不会推荐你去尝试。但创建几千个异步IO的任务常常是可行的。
异步IO适合使用在包含有多个IO密集型任务并且等待这些阻塞IO会占用这些任务大量的时间的场景。比如
网络IO, 用异步IO做服务器端或客户端 无服务器设计比如端到端多用户的网络聊天室 模仿“发射后不管”风格的读写操作但不想为每个读和写的对象加锁 不使用异步IO的最大原因是它需要代码库特别支持。如果你想要对某一个数据库使用异步的写操作你不仅需要找到这个数据库的 Python 的封装库还需要看它是不是支持 async/await 语法。请时刻记住 包含同步调用的 coroutine 会阻塞其它任务的运行。
本文结尾的附录里包含了支持 async/await 语法的代码库的简单列表。
我该使用那个异步IO的库
虽然本文使用的是异步IO库 asyncio,但你也可以使用其它的封装异步IO的库。关于这些库的情况Nathaniel J. Smith 有一段描述
“在不久的将来asyncio 很可能会沦落到与其它那些已经实际上很少有人使用的 Python 标准库比如 urllib2, 相同的境地。
实际上我想指出的是正式因为 asyncio 如此成功它才会面临这样的境地 当它被设计时使用的时那时最好的方法但从那时以后asyncio 引入的许多优秀特性比如 async/await, 为后来的开发 者提供了灵感而 asyncio 则为它的之前取得的成功所累慢慢变得陈旧。
在其它提供异步IO的库中curio 和 trio 是最为人们看好的两个。我个人认为如果你是构建一个中等规模设计比较直接的项目直接使用 asyncio 已经足够而且能够避免引入除了 Python 标准库 之外的其它依赖。
但不管怎样试着学习使用 curio 和 trio, 你也许会发现它们对你来说更易用更符合直觉。许多 asyncio 的概念也适用于其它的异步IO库。
其它 下面我们会讨论一些关于 asyncio 库和 async/await 语法的杂项这些知识点不容易放到前面的章节中但对于构建和理解一个完整的程序仍然是很重要的。
其它的 asyncio 高层函数 除了 asyncio.run() 之外你也可以看到其它其它一些 asyncio 库的高层函数比如 asyncio.create_task() 和 asyncio.gather()。
你可以用 create_task() 加上 run() 来调度一个 coroutine 对象的执行
import asyncioasync def coro(seq) - list:IO 等待的时间是 seq 里的最大值await asyncio.sleep(max(seq))return list(reversed(seq))async def main():# 因为这个例子里只有一个任务所以可以不用 creae_task# 直接使用 await coro([3, 2, 1])t asyncio.create_task(coro([3, 2, 1])) # Python 3.7 或更新await tprint(ft: type {type(t)})print(ft done: {t.done()})t asyncio.run(main())
# t: type class _asyncio.Task
# t done: True在这个例子里有一个微妙的地方如果在 main() 中不调用 ‘await t’, coroutine coco 有可能在 main() 完成后才完成。因为 asyncio.run(main()) 实际上是调用 loop.run_until_complete(main()) 事件循环只关心什么时候 ‘main’ 结束而不会知道在 main() 里创建了其他任务仍然也就不会知道 coroutine coco 什么时候完结。所有如果 coroutine coco 在 main() 之后才完结那么这时 事件循环已经结束coroutine coco 也就被终止执行了。你可以通过函数 asyncio.Task.all_tasks() 来获得这个事件循环里所有等待被调度的任务的列表。
注意 asyncio.create_task() 是在 Python 3.7 中被引入在 Python 3.6 或更低的版本中可以使用 asyncio.ensure_future()。
另一种方法是使用 asyncio.gather()。它的作用是把多个 coroutine 集成为一个单独的 future 对象然后返回。如果把多个 coroutine 作为参数传给 ‘await asyncio.gather()就可以等到他们都 执行完成。这有一点想我们之前的例子 queue.jone() 。gather() 的返回值是它所有输入的 coroutine 的执行结果组成的链表。 import timeasync def main():
... t asyncio.create_task(coro([3, 2, 1]))
... t2 asyncio.create_task(coro([10, 5, 0])) # Python 3.7
... print(Start:, time.strftime(%X))
... a await asyncio.gather(t, t2)
... print(End:, time.strftime(%X)) # Should be 10 seconds
... print(fBoth tasks done: {all((t.done(), t2.done()))})
... return a
...a asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: Truea
[[1, 2, 3], [0, 5, 10]]有可能已经注意到了 gatyer() 会等待参数中所有的 coroutine 的执行结果。你也可以遍历 asyncio.as_complted()按照完成的顺序来得到那些以完成的任务的输出。和 gather() 一样这个函数的 输入参数也是所有的 coroutine, 而输出是一个当这些任务结束时 yield 执行结果的迭代器。下面的例子里coroutine coco([3,2,1]) 的输出结果会在 coco([10,5,0]) 之前得到而不像 gather() 一样同时得到。 async def main():
... t asyncio.create_task(coro([3, 2, 1]))
... t2 asyncio.create_task(coro([10, 5, 0]))
... print(Start:, time.strftime(%X))
... for res in asyncio.as_completed((t, t2)):
... compl await res
... print(fres: {compl} completed at {time.strftime(%X)})
... print(End:, time.strftime(%X))
... print(fBoth tasks done: {all((t.done(), t2.done()))})
...a asyncio.run(main())
Start: 09:49:07
res: [1, 2, 3] completed at 09:49:10
res: [0, 5, 10] completed at 09:49:17
End: 09:49:17
Both tasks done: True最后你也会看到 asyncio.ensure_future()。但你应该会很少用到它因为它是一个底层的 API其大部分的功能已经被前面我们提到过的 create_task() 取代了。
await 的优先级 虽然行为类型但关键词 await 的优先级要比 yield 高得多。这也就意味着如果使用了await 替代 yield from,很多场景里没必要使用括号了。更多信息请参考 PEP 492 里使用 await 表达式的例子。
总结## 标题 现在你已经学会使用 async/await 以及建立在它们之上的库了。这里是你应该已经学习到的知识点
1. 异步 IO 是一种语言无关的模型以及一种通过允许 coroutine 相互之间间接通讯来实现并发的方法。
2. Python 引入新的关键字 async/await用来标记和定义 coroutine
3. asyncio, 是一个提供运行和管理 coroutine 的 Python 包相关资源
Python 版本规范 Async IO 是一个还在快速演进的部分不停有新的东西出现。这里我们列出与 asyncio 相关的 Python 版本演进
3.3: 引入 yield from 表达式允许生成器expression allows for generator delegation.
3.4: asyncio 被引入 Python 标准库, 此时它的 API 被标示为尚未稳定。
3.5: async 和 await 称为 Python 语法的一部分用来定义和等待 coroutine但还没有变成关键字也就是说你还可以定义名字是 async 和 await 的函数或变量。
3.6: 引入异步生成器和异步解析式。asyncio 的 API 被标示为稳定版本。
3.7: async 和 await 称为保留的关键字。建议不再使用旧式的 asyncio.routine() 来定义 coroutine。引入了 asyncio.run() 和其他一些特性。
如果你希望能安全的使用 asyncio并且想要使用 asyncio.run()那么建议升级到 Python 3.7 以上的版本。
文献
这里是一些相关的扩展资源 Real Python: Speed up your Python Program with Concurrency Real Python: What is the Python Global Interpreter Lock? CPython: The asyncio package source Python docs: Data model Coroutines TalkPython: Async Techniques and Examples in Python Brett Cannon: How the Heck Does Async-Await Work in Python 3.5? PYMOTW: asyncio A. Jesse Jiryu Davis and Guido van Rossum: A Web Crawler With asyncio Coroutines Andy Pearce: The State of Python Coroutines: yield from Nathaniel J. Smith: Some Thoughts on Asynchronous API Design in a Post-async/await World Armin Ronacher: I don’t understand Python’s Asyncio Andy Balaam: series on asyncio (4 posts) Stack Overflow: Python asyncio.semaphore in async-await function Yeray Diaz: AsyncIO for the Working Python Developer Asyncio Coroutine Patterns: Beyond await Python 的一些 What’s New 中详细解释了语言变化背后的动机 What’s New in Python 3.3 (yield from and PEP 380) What’s New in Python 3.6 (PEP 525 530) David Beazley 的相关文章: Generator: Tricks for Systems Programmers A Curious Course on Coroutines and Concurrency Generators: The Final Frontier YouTube 上的视频资源 John Reese - Thinking Outside the GIL with AsyncIO and Multiprocessing - PyCon 2018 Keynote David Beazley - Topics of Interest (Python Asyncio) David Beazley - Python Concurrency From the Ground Up: LIVE! - PyCon 2015 Raymond Hettinger, Keynote on Concurrency, PyBay 2017 Thinking about Concurrency, Raymond Hettinger, Python core developer Miguel Grinberg Asynchronous Python for the Complete Beginner PyCon 2017 Yury Selivanov asyncawait and asyncio in Python 3 6 and beyond PyCon 2017 Fear and Awaiting in Async: A Savage Journey to the Heart of the Coroutine Dream What Is Async, How Does It Work, and When Should I Use It? (PyCon APAC 2014)
兼容 async/await 的库
aio-libs:
- aiohttp: Asynchronous HTTP client/server framework
- aioredis: Async IO Redis support
- aiopg: Async IO PostgreSQL support
- aiomcache: Async IO memcached client
- aiokafka: Async IO Kafka client
- aiozmq: Async IO ZeroMQ support
- aiojobs: Jobs scheduler for managing background tasks
- async_lru: Simple LRU cache for async IOmagicstack: - uvloop: Ultra fast async IO event loop- asyncpg: (Also very fast) async IO PostgreSQL support其它
- trio: Friendlier asyncio intended to showcase a radically simpler design
- aiofiles: Async file IO
- asks: Async requests-like http library
- asyncio-redis: Async IO Redis support
- aioprocessing: Integrates multiprocessing module with asyncio
- umongo: Async IO MongoDB client
- unsync: Unsynchronize asyncio
- aiostream: Like itertools, but async本文现在在 Real Python 有了相关的视频教程你可以观看并完成对应的编程训练来加深理解: Hands-on Python 3 Concurrency With the asyncio Module 文章转载自: http://www.morning.gllhx.cn.gov.cn.gllhx.cn http://www.morning.wjdgx.cn.gov.cn.wjdgx.cn http://www.morning.jpmcb.cn.gov.cn.jpmcb.cn http://www.morning.kpgms.cn.gov.cn.kpgms.cn http://www.morning.bpkqd.cn.gov.cn.bpkqd.cn http://www.morning.smpmn.cn.gov.cn.smpmn.cn http://www.morning.zqcgt.cn.gov.cn.zqcgt.cn http://www.morning.sbrjj.cn.gov.cn.sbrjj.cn http://www.morning.diuchai.com.gov.cn.diuchai.com http://www.morning.fzwf.cn.gov.cn.fzwf.cn http://www.morning.rqwwm.cn.gov.cn.rqwwm.cn http://www.morning.qcdtzk.cn.gov.cn.qcdtzk.cn http://www.morning.pmlgr.cn.gov.cn.pmlgr.cn http://www.morning.ljdtn.cn.gov.cn.ljdtn.cn http://www.morning.lhgqc.cn.gov.cn.lhgqc.cn http://www.morning.mtbth.cn.gov.cn.mtbth.cn http://www.morning.kzqpn.cn.gov.cn.kzqpn.cn http://www.morning.mnlk.cn.gov.cn.mnlk.cn http://www.morning.lpzqd.cn.gov.cn.lpzqd.cn http://www.morning.kycxb.cn.gov.cn.kycxb.cn http://www.morning.zqwqy.cn.gov.cn.zqwqy.cn http://www.morning.knlyl.cn.gov.cn.knlyl.cn http://www.morning.nrmyj.cn.gov.cn.nrmyj.cn http://www.morning.rxhsm.cn.gov.cn.rxhsm.cn http://www.morning.drcnf.cn.gov.cn.drcnf.cn http://www.morning.mrfr.cn.gov.cn.mrfr.cn http://www.morning.dgckn.cn.gov.cn.dgckn.cn http://www.morning.lchtb.cn.gov.cn.lchtb.cn http://www.morning.nxdqz.cn.gov.cn.nxdqz.cn http://www.morning.ryxyz.cn.gov.cn.ryxyz.cn http://www.morning.qfwzm.cn.gov.cn.qfwzm.cn http://www.morning.zknjy.cn.gov.cn.zknjy.cn http://www.morning.zbpqq.cn.gov.cn.zbpqq.cn http://www.morning.gnlyq.cn.gov.cn.gnlyq.cn http://www.morning.lmfmd.cn.gov.cn.lmfmd.cn http://www.morning.wdskl.cn.gov.cn.wdskl.cn http://www.morning.pqnkg.cn.gov.cn.pqnkg.cn http://www.morning.qqhersx.com.gov.cn.qqhersx.com http://www.morning.fbpyd.cn.gov.cn.fbpyd.cn http://www.morning.rmjxp.cn.gov.cn.rmjxp.cn http://www.morning.gthwr.cn.gov.cn.gthwr.cn http://www.morning.rjrh.cn.gov.cn.rjrh.cn http://www.morning.jfymz.cn.gov.cn.jfymz.cn http://www.morning.fbzdn.cn.gov.cn.fbzdn.cn http://www.morning.lfdrq.cn.gov.cn.lfdrq.cn http://www.morning.mrbmc.cn.gov.cn.mrbmc.cn http://www.morning.hfbtt.cn.gov.cn.hfbtt.cn http://www.morning.khclr.cn.gov.cn.khclr.cn http://www.morning.drbwh.cn.gov.cn.drbwh.cn http://www.morning.xlclj.cn.gov.cn.xlclj.cn http://www.morning.wbqt.cn.gov.cn.wbqt.cn http://www.morning.hkchp.cn.gov.cn.hkchp.cn http://www.morning.mxnfh.cn.gov.cn.mxnfh.cn http://www.morning.mwjwy.cn.gov.cn.mwjwy.cn http://www.morning.lhygbh.com.gov.cn.lhygbh.com http://www.morning.jyzqn.cn.gov.cn.jyzqn.cn http://www.morning.dktyc.cn.gov.cn.dktyc.cn http://www.morning.bxrlt.cn.gov.cn.bxrlt.cn http://www.morning.lzdbb.cn.gov.cn.lzdbb.cn http://www.morning.mqffm.cn.gov.cn.mqffm.cn http://www.morning.wyzby.cn.gov.cn.wyzby.cn http://www.morning.rhchr.cn.gov.cn.rhchr.cn http://www.morning.cgtfl.cn.gov.cn.cgtfl.cn http://www.morning.mcjxq.cn.gov.cn.mcjxq.cn http://www.morning.cfpq.cn.gov.cn.cfpq.cn http://www.morning.wcyr.cn.gov.cn.wcyr.cn http://www.morning.jjnry.cn.gov.cn.jjnry.cn http://www.morning.pbsqr.cn.gov.cn.pbsqr.cn http://www.morning.rymb.cn.gov.cn.rymb.cn http://www.morning.sqhtg.cn.gov.cn.sqhtg.cn http://www.morning.rzrbw.cn.gov.cn.rzrbw.cn http://www.morning.xjkr.cn.gov.cn.xjkr.cn http://www.morning.mqxzh.cn.gov.cn.mqxzh.cn http://www.morning.mlpch.cn.gov.cn.mlpch.cn http://www.morning.xrpjr.cn.gov.cn.xrpjr.cn http://www.morning.pgxjl.cn.gov.cn.pgxjl.cn http://www.morning.wgzzj.cn.gov.cn.wgzzj.cn http://www.morning.xpqdf.cn.gov.cn.xpqdf.cn http://www.morning.pbmkh.cn.gov.cn.pbmkh.cn http://www.morning.zrdhd.cn.gov.cn.zrdhd.cn