Python-asyncio-协程
本文最后更新于:2020年12月9日 上午
信息
asyncio
是用来编写 并发
代码的库,使用 async/await
语法asyncio
被用作多个提供高性能 Python
异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等
asyncio
往往是构建IO密集型
和高层级结构化
网络代码的最佳选择
文档
asyncio
官网:https://docs.python.org/zh-cn/3/library/asyncio.html
asyncio
提供一组 高层级
API 用于:
并发地 运行 Python 协程
并对其执行过程实现完全控制;
- 执行
网络 IO 和 IPC
; - 控制
子进程
; - 通过
队列
实现分布式任务; 同步
并发代码;
此外,还有一些 低层级
API 以支持 库和框架的开发者 实现:
- 创建和管理
事件循环
,以提供异步 API 用于网络化
, 运行子进程
,处理OS 信号
等等; - 使用
transports
实现高效率协议; - 通过 async/await 语法
桥接
基于回调的库和代码
协程与任务
协程
协程
通过 async/await
语法进行声明,是编写 asyncio
应用的推荐方式
协程基础
1 |
|
结果
1 |
|
注意:简单地调用一个协程并不会将其加入执行日程
概念
- 协程函数: 定义形式为
async def
的函数 - 协程对象: 调用
协程函数
所返回的对象
运行
要真正运行一个协程,asyncio
提供了三种主要机制:
asyncio.run()
函数直接运行asyncio.run()
是用来运行协程的最高层级的入口,案例见上- 串行运行
没有特别设置的情况下,连续await
一个或多个协程能让它串行运行,案例 - 并发运行
asyncio.create_task()
函数用来作为asyncio
任务 的多个协程,案例
可等待对象
如果一个对象可以在 await
语句中使用,那么它就是 可等待对象
许多 asyncio API
都被设计为接受可等待对象
可等待 对象有三种主要类型:协程、任务、Future
可等待对象 - 协程
Python
协程
属于 可等待
对象,因此可以在其他协程中被等待
1 |
|
可等待对象 - Task 任务
Task
被用来设置日程以便 并发
执行协程。
当一个协程通过 asyncio.create_task()
等函数被打包为一个 Task
,该协程将自动排入日程准备立即运行:
1 |
|
名字 与 计算结果
- 名字
可以为Task
对象设置一个名字,用来辨识不同的对象
名字可以在创建的时候通过参数设置, 也可以通过set_name()
方法进行设置 - 获取计算结果
可以通过result()
方法来获取Task
运行完毕以后得到的计算结果
1 |
|
运行完毕 与 取消
- 是否运行完毕
查看Task
是否运行完毕可以用done()
方法
如果Task
对象 已完成 则返回True
当Task
所封包的协程返回一个值、引发一个异常或Task
本身被取消时,会被认为 已完成 - 取消
要取消一个正在运行的Task
对象可使用cancel()
方法
调用此方法将使该Task
对象抛出一个CancelledError
异常给打包的协程 - 查看是否被取消
使用cancelled()
方法可以查看Task
是否已经被取消运行
1 |
|
可等待对象 - Future 对象
Future
是一种特殊的 低层级
可等待对象,表示一个异步操作的 最终结果
当一个 Future 对象
被等待,这意味着协程将保持等待直到该 Future
对象在其他地方操作完毕
在 asyncio
中需要 Future
对象以便允许通过 async/await
使用基于回调的代码
通常情况下 没有必要 在应用层级的代码中创建 Future 对象
运行 asyncio 程序
asyncio.run(coro, *, debug=False)
执行 coroutine
coro 并返回结果。
此函数会运行传入的协程,负责管理 asyncio
事件循环,终结异步生成器,并关闭线程池。
当有其他 asyncio
事件循环在同一线程中运行时,此函数不能被调用。
如果 debug
为 True
,事件循环将以调试模式运行。
此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio
程序的主入口点,理想情况下应当只被调用一次。
1 |
|
创建任务
asyncio.create_task(coro, *, name=None)
将 协程函数coro
打包为一个 Task
排入日程准备执行,函数执行后,会返回一个 Task
对象
如果 参数name
不为 None
,它将使用 Task.set_name()
来设为任务的名称
该任务会在 get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError
此函数 在
Python 3.7
中被加入。在Python 3.7
之前,可以改用低层级的asyncio.ensure_future()
函数。
1 |
|
休眠
coroutine asyncio.sleep(delay, result=None, *)
阻塞 delay
指定的秒数
如果指定了 result
,则当协程完成时将其返回给调用者。sleep()
总是会挂起当前任务,以允许其他任务运行。
以下协程示例运行 5 秒,每秒显示一次当前日期:
1 |
|
并发运行
awaitable asyncio.gather(*aws, return_exceptions=False)
并发 运行aws
序列中的 可等待对象
如果 aws
中的某个可等待对象为协程,它将自动作为一个Task
加入日程
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws
中可等待对象的顺序一致
- return_exceptions
- 如果
return_exceptions
为False
(默认),所引发的首个异常会立即传播给等待gather()
的任务aws
序列中的其他可等待对象 不会被取消 并将继续运行 - 如果
return_exceptions
为True
,异常会和成功的结果一样处理,并聚合至结果列表。
- 如果
如果 gather()
被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。
如果 aws
序列中的任一 Task
或 Future
对象 被取消,它将被当作引发了 CancelledError
一样处理 – 在此情况下 gather()
调用 不会 被取消。这是为了防止一个已提交的 Task/Future
被取消导致其他 Tasks/Future
也被取消
1 |
|
屏蔽取消操作
awaitable asyncio.shield(aw, *, loop=None)
保护一个 可等待对象
防止其被 取消
如果 aw
是一个协程,它将自动作为任务加入日程
以下语句:res = await shield(something())
相当于:res = await something()
不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消
从something()
的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 “await” 表达式仍然会引发 CancelledError
如果通过其他方式取消 something()
(例如在其内部操作) 则 shield()
也会取消。
如果希望完全忽略取消操作 (不推荐) 则 shield()
函数需要配合一个 try/except
代码段
超时
coroutine asyncio.wait_for(aw, timeout, *, loop=None)
等待 aw
可等待对象 完成,指定 timeout
秒数后超时
如果 aw
是一个协程,它将自动作为任务加入日程timeout
可以为 None
,也可以为 float
或 int
型数值表示的等待秒数。如果 timeout
为 None
,则等待直到完成
如果发生超时,任务将取消并引发 asyncio
.TimeoutError
要避免任务 取消,可以加上 shield()
此函数将等待直到 Future
确实被取消,所以总等待时间可能超过 timeout
。 如果在取消期间发生了异常,异常将会被传播
如果等待被取消,则 aw
指定的对象也会被取消
1 |
|
简单等待
wait
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
运行 aws
的可等待对象。阻塞直到触发return_when
设置的条件
返回两个 Task/Future
集合: (done, pending)
1 |
|
如指定 timeout (float 或 int 类型)
则它将被用于控制返回之前等待的最长秒数
请注意此函数不会引发 asyncio.TimeoutError
当超时发生时,未完成的 Future
或 Task
将在指定秒数后被返回
return_when
指定此函数应在何时返回。它必须为以下常数之一
常数 | 描述 |
---|---|
FIRST_COMPLETED | 函数将在任意可等待对象结束或取消时返回 |
FIRST_EXCEPTION | 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED |
ALL_COMPLETED | 函数将在所有可等待对象结束或取消时返回 |
与 wait_for()
不同,wait()
在超时发生时不会取消可等待对象
as_completed
asyncio.as_completed(aws, *, loop=None, timeout=None)
运行 aws
的可等待对象。返回可迭代的协程
如果在所有 Future
对象完成前发生超时则将引发 asyncio.TimeoutError
1 |
|
同步原语
章节官方文档: https://docs.python.org/zh-cn/3/library/asyncio-sync.html
原语
所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可被中断
在操作系统中,某些被进程调用的操作,如队列操作、对信号量的操作、检查启动外设操作等,一旦开始执行,就不能被中断,否则就会出现操作错误,造成系统混乱
asyncio
同步原语被设计为与 threading
模块的类似,但有两个关键注意事项:
asyncio
原语不是线程安全的,因此它们不应被用于OS
线程同步 (而应当使用threading
);- 这些同步原语的方法不接受
timeout
参数;请使用asyncio.wait_for()
函数来执行带有超时的操作
Lock 锁
class asyncio.Lock(*)
实现一个用于 asyncio
任务的互斥锁。 非线程安全asyncio
锁可被用来保证对共享资源的独占访问
推荐通过 async with
语句来使用 Lock
方法 | 信息 | 特殊 |
---|---|---|
coroutine acquire() | 获取锁 此方法会等待直至锁为 unlocked ,将其设为 locked 并返回 True 当有一个以上的协程在 acquire() 中被阻塞则会等待解锁,最终只有一个协程会被执行 |
公平的的获取: 等待的协程服从先等先服务原则 |
release() | 释放锁 当锁为 locked 时,将其设为 unlocked 并返回 |
如果锁为 unlocked ,则会引发 RuntimeError |
locked() | 如果锁为 locked 则返回 True |
1 |
|
Event 事件
class asyncio.Event(*)
事件对象。 该对象不是线程安全的。
asyncio
事件可被用来通知多个 asyncio
任务已经有事件发生。
Event
对象会管理一个内部旗标,该旗标初始时将被设为 false
set()
方法将其设为true
clear()
方法将其设为false
wait()
方法会阻塞直至该旗标被设为true
1 |
|
Condition 条件
asyncio
条件原语可被任务用于等待某个事件发生,然后获取对共享资源的独占访问
在本质上,Condition
对象合并了 Event
和 Lock
的功能
多个 Condition
对象有可能共享一个 Lock
,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访问
可选的 lock
参数必须为 Lock
对象或 None
。 在后一种情况下会自动创建一个新的 Lock
对象。
推荐通过 async with
语句来使用 Condition
|方法|信息|特殊|
|–|–|
|coroutine acquire()|获取下层的锁
此方法会等待直至下层的锁为 unlocked
,将其设为 locked
并返回 True
|
|notify(n=1)|唤醒最多 n
个正在等待此条件的任务(默认为 1 个)
如果没有任务正在等待则此方法为空操作。
锁必须在此方法被调用前被获取并在随后被快速释放
|如果通过一个 unlocked
锁调用则会引发 RuntimeError
|
|locked()|如果下层的锁已被获取则返回 True
||
|notify_all()|唤醒所有正在等待此条件的任务。此方法的行为类似于 notify()
,但会唤醒所有正在等待的任务。锁必须在此方法被调用前被获取并在随后被快速释放| 如果通过一个 unlocked
锁调用则会引发 RuntimeError
|
|release()|释放下层的锁|当在未锁定的锁上发起调用时,会引发 RuntimeError
|
|coroutine wait()|等待直至收到通知
这个方法会释放下层的锁,然后保持阻塞直到被 notify()
或 notify_all()
调用所唤醒。
一旦被唤醒,Condition
会重新获取它的锁并且此方法将返回 True
|当此方法被调用时如果调用方任务未获得锁,则会引发 RuntimeError
|
|coroutine wait_for(predicate)|等待直到目标值变为 true
目标必须为一个可调用对象,其结果将被解读为一个布尔值。 最终的值将为返回值。|
Semaphore 信号量
信号量会管理一个内部计数器,该计数器会随每次 acquire()
调用递减并随每次 release()
调用递增。计数器的值永远不会降到零以下
当 acquire()
发现其值为零时,它将保持阻塞直到有某个任务调用了 release()
可选的 value
参数用来为内部计数器赋初始值 (默认值为 1)。 如果给定的值小于 0
则会引发 ValueError
方法 | 信息 |
---|---|
coroutine acquire() | 获取一个信号量。 如果内部计数器的值大于零,则将其减一并立即返回 True 如果其值为零,则会等待直到 release() 并调用并返回 True |
locked() | 如果信号量对象无法被立即获取则返回 True |
release() | 释放一个信号量对象,将内部计数器的值加1 可以唤醒一个正在等待获取信号量对象的任务。 不同于 BoundedSemaphore ,Semaphore 允许执行的 release() 调用多于 acquire() 调用 |
BoundedSemaphore
class asyncio.BoundedSemaphore(value=1, *)
绑定的信号量对象BoundedSemaphore
是特殊版本的 Semaphore
,如果在 release()
中内部计数器值增加到初始 value
以上它将引发一个 ValueError
未完成
位置:https://docs.python.org/zh-cn/3/library/asyncio-subprocess.html
案例
串行等待运行
以下代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world
1 |
|
>预期的输出:
1 |
|
asyncio.create_task 基础并行运行
等待到两个协程都执行完毕,因为是并行的,预计会用两秒钟
1 |
|
async with
运行Lock
通过async with
来运行Lock
能够能够让代码更加美观
1 |
|
这等价于:
1 |
|
async with
运行Condition
通过async with
来运行Lock
能够能够让代码更加美观
1 |
|
这等价于:
1 |
|
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!