Python-asyncio-协程

本文最后更新于:2020年12月9日 上午

信息

asyncio是用来编写 并发 代码的库,使用 async/await 语法
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等

asyncio 往往是构建 IO密集型 和高层级 结构化 网络代码的最佳选择

文档

asyncio官网:https://docs.python.org/zh-cn/3/library/asyncio.html

asyncio提供一组 高层级API 用于:
并发地 运行 Python 协程 并对其执行过程实现完全控制;

  • 执行 网络 IO 和 IPC;
  • 控制 子进程;
  • 通过 队列 实现分布式任务;
  • 同步 并发代码;

此外,还有一些 低层级 API 以支持 库和框架的开发者 实现:

  • 创建和管理 事件循环,以提供异步 API 用于 网络化, 运行 子进程,处理 OS 信号 等等;
  • 使用 transports 实现高效率协议;
  • 通过 async/await 语法 桥接 基于回调的库和代码

协程与任务

协程

协程 通过 async/await 语法进行声明,是编写 asyncio 应用的推荐方式

协程基础

1
2
3
4
5
6
7
8
import asyncio

async def main():
print('hello')
await asyncio.sleep(1)
print('world')

asyncio.run(main())

结果

1
2
hello
world

注意:简单地调用一个协程并不会将其加入执行日程

概念

  • 协程函数: 定义形式为 async def 的函数
  • 协程对象: 调用 协程函数 所返回的对象

运行

要真正运行一个协程,asyncio 提供了三种主要机制:

  • asyncio.run() 函数直接运行
    asyncio.run()是用来运行协程的最高层级的入口,案例见上
  • 串行运行
    没有特别设置的情况下,连续 await 一个或多个协程能让它串行运行,案例
  • 并发运行
    asyncio.create_task() 函数用来作为 asyncio 任务 的多个协程,案例

可等待对象

如果一个对象可以在 await 语句中使用,那么它就是 可等待对象
许多 asyncio API 都被设计为接受可等待对象

可等待 对象有三种主要类型:协程、任务、Future

可等待对象 - 协程

Python 协程属于 可等待 对象,因此可以在其他协程中被等待

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def nested():
return 42

async def main():
# 如果一个协程创建出来,但是没有被 await ,那么它根本就不会被执行
nested()
# await 以后才会被执行
print(await nested()) # 将会输出42

asyncio.run(main())

可等待对象 - Task 任务

Task 被用来设置日程以便 并发 执行协程。

当一个协程通过 asyncio.create_task() 等函数被打包为一个 Task,该协程将自动排入日程准备立即运行:

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def nested():
return 42

async def main():
# 利用 协程函数 nested() 创建了一个 Task
# 这个 Task 会被 协程 main() 立即执行
task = asyncio.create_task(nested())
await task

asyncio.run(main())

名字 与 计算结果

  • 名字
    可以为Task对象设置一个名字,用来辨识不同的对象
    名字可以在创建的时候通过参数设置, 也可以通过set_name()方法进行设置
  • 获取计算结果
    可以通过result()方法来获取Task运行完毕以后得到的计算结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def nested():
return 42

async def main():
task = asyncio.create_task(nested())
# 设置 Task 的名字。其实也可以在创建的时候传参来设置
task.set_name('My_name_is_Task')
await task
print('Task 的名字为:', task.get_name())
print('Task 的计算结果为:', task.result())

asyncio.run(main())

运行完毕 与 取消

  • 是否运行完毕
    查看 Task 是否运行完毕可以用done()方法
    如果 Task 对象 已完成 则返回 True
    Task 所封包的协程返回一个值、引发一个异常或 Task 本身被取消时,会被认为 已完成
  • 取消
    要取消一个正在运行的 Task 对象可使用 cancel() 方法
    调用此方法将使该 Task 对象抛出一个 CancelledError 异常给打包的协程
  • 查看是否被取消
    使用cancelled()方法可以查看Task是否已经被取消运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
from random import randint


async def nested():
a = 0
for _ in range(4):
await asyncio.sleep(1)
a += randint(1,10)
print('nested -> {}'.format(a))
return a

async def using_cancel():
_task = asyncio.create_task(nested())
await asyncio.sleep(2) # 等待,以验证真的是运行以后再取消
_task.cancel()
try:
await _task
except asyncio.CancelledError:
if _task.cancelled(): # cancelled() 可被用来检测 Task 对象是否被取消。
print("task 已经被取消了")

if _task.done(): # done()方法 返回 Task 是否已经完成
print('task 已经完成')

可等待对象 - Future 对象

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕

asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码

通常情况下 没有必要 在应用层级的代码中创建 Future 对象

运行 asyncio 程序

asyncio.run(coro, *, debug=False)
执行 coroutine coro 并返回结果。

此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。

当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

如果 debugTrue,事件循环将以调试模式运行。

此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

1
2
3
4
5
async def main():
await asyncio.sleep(1)
print('hello')

asyncio.run(main())

创建任务

asyncio.create_task(coro, *, name=None)
协程函数coro 打包为一个 Task 排入日程准备执行,函数执行后,会返回一个 Task 对象

如果 参数name 不为 None,它将使用 Task.set_name() 来设为任务的名称

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

1
2
3
4
5
6
7
8
9
async def coro():
...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())

休眠

coroutine asyncio.sleep(delay, result=None, *)
阻塞 delay 指定的秒数
如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。

以下协程示例运行 5 秒,每秒显示一次当前日期:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
import datetime

async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)

asyncio.run(display_date())

并发运行

awaitable asyncio.gather(*aws, return_exceptions=False)
并发 运行aws 序列中的 可等待对象

如果 aws 中的某个可等待对象为协程,它将自动作为一个Task加入日程

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致

  • return_exceptions
    • 如果 return_exceptionsFalse (默认),所引发的首个异常会立即传播给等待 gather() 的任务
      aws 序列中的其他可等待对象 不会被取消 并将继续运行
    • 如果 return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

如果 aws 序列中的任一 TaskFuture 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio


async def factorial(name, number):
'''
计算阶乘
'''
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: {number}的阶乘 = {f}")


async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)

asyncio.run(main())

屏蔽取消操作

awaitable asyncio.shield(aw, *, loop=None)
保护一个 可等待对象 防止其被 取消
如果 aw 是一个协程,它将自动作为任务加入日程

以下语句:
res = await shield(something())
相当于:
res = await something()
不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消
something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 “await” 表达式仍然会引发 CancelledError

如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消。

如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段

超时

coroutine asyncio.wait_for(aw, timeout, *, loop=None)
等待 aw 可等待对象 完成,指定 timeout 秒数后超时
如果 aw 是一个协程,它将自动作为任务加入日程
timeout 可以为 None,也可以为 floatint 型数值表示的等待秒数。如果 timeoutNone,则等待直到完成
如果发生超时,任务将取消并引发 asyncio.TimeoutError
要避免任务 取消,可以加上 shield()

此函数将等待直到 Future 确实被取消,所以总等待时间可能超过 timeout。 如果在取消期间发生了异常,异常将会被传播

如果等待被取消,则 aw 指定的对象也会被取消

1
2
3
4
5
6
7
8
9
10
11
12
13
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())

简单等待

wait

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
运行 aws 的可等待对象。阻塞直到触发return_when设置的条件
返回两个 Task/Future 集合: (done, pending)

1
done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数

请注意此函数不会引发 asyncio.TimeoutError
当超时发生时,未完成的 FutureTask 将在指定秒数后被返回

return_when 指定此函数应在何时返回。它必须为以下常数之一

常数 描述
FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回
FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回

wait_for() 不同,wait() 在超时发生时不会取消可等待对象

as_completed

asyncio.as_completed(aws, *, loop=None, timeout=None)
运行 aws 的可等待对象。返回可迭代的协程
如果在所有 Future 对象完成前发生超时则将引发 asyncio.TimeoutError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
from random import randint


async def foo():
return randint(1,100)

async def main():
the_futures = []
the_futures.append(asyncio.ensure_future(foo()))
the_futures.append(asyncio.ensure_future(foo()))
the_futures.append(asyncio.ensure_future(foo()))
the_futures.append(asyncio.ensure_future(foo()))

for coro in asyncio.as_completed(the_futures):
earliest_result = await coro
print(earliest_result)
asyncio.run(main())

同步原语

章节官方文档: https://docs.python.org/zh-cn/3/library/asyncio-sync.html

原语
所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可被中断
在操作系统中,某些被进程调用的操作,如队列操作、对信号量的操作、检查启动外设操作等,一旦开始执行,就不能被中断,否则就会出现操作错误,造成系统混乱

asyncio 同步原语被设计为与 threading 模块的类似,但有两个关键注意事项:

  • asyncio 原语不是线程安全的,因此它们不应被用于 OS 线程同步 (而应当使用 threading);
  • 这些同步原语的方法不接受 timeout 参数;请使用 asyncio.wait_for() 函数来执行带有超时的操作

Lock 锁

class asyncio.Lock(*)
实现一个用于 asyncio 任务的互斥锁。 非线程安全
asyncio 锁可被用来保证对共享资源的独占访问
推荐通过 async with 语句来使用 Lock

方法 信息 特殊
coroutine acquire() 获取锁
此方法会等待直至锁为 unlocked,将其设为 locked 并返回 True
当有一个以上的协程在 acquire() 中被阻塞则会等待解锁,最终只有一个协程会被执行
公平的的获取:
等待的协程服从先等先服务原则
release() 释放锁
当锁为 locked 时,将其设为 unlocked 并返回
如果锁为 unlocked,则会引发 RuntimeError
locked() 如果锁为 locked 则返回 True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio


async def using_toilet(toilet_lock, name:str):
print('{} 尝试获取厕所使用权'.format(name))
async with toilet_lock:
print('{} 正在使用厕所'.format(name))
await asyncio.sleep(4)
print('{} 厕所使用完毕'.format(name))

async def using_toilet_without_lock(name:str):
print('{} 尝试获取厕所使用权'.format(name))
print('{} 正在使用厕所'.format(name))
await asyncio.sleep(4)
print('{} 厕所使用完毕'.format(name))

async def main():
print('\n并行无锁')
futures = asyncio.wait([
using_toilet_without_lock(name)
for name in ['DIO', 'JOJO', 'XISA']
])
await asyncio.ensure_future(futures)

print('\n并行有锁')
toilet_lock = asyncio.Lock()
futures = asyncio.wait([
using_toilet(toilet_lock, name)
for name in ['DIO', 'JOJO', 'XISA']
])
await asyncio.ensure_future(futures)


asyncio.run(main())

Event 事件

class asyncio.Event(*)
事件对象。 该对象不是线程安全的。

asyncio 事件可被用来通知多个 asyncio 任务已经有事件发生。

Event 对象会管理一个内部旗标,该旗标初始时将被设为 false

  • set() 方法将其设为 true
  • clear() 方法将其设为 false
  • wait() 方法会阻塞直至该旗标被设为 true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio


async def waiter(event):
print('意大利炮已上膛,等待发射命令中......')
await event.wait()
print('开炮!!!开炮!!!开炮!!!')

async def main():
event = asyncio.Event() # 创建一个Event对象
# create_task 生成一个 Task,这个 Task 立即执行
# 然后碰到event.wait()会一直等待,直到 Event 被 set
waiter_task = asyncio.create_task(waiter(event))

# 等待3秒后,set Event
await asyncio.sleep(3)
event.set()

# 虽然说在Event set了以后,Task会运行进行直到运行完毕
# 但加个等待以确保100% 完成是个好习惯
await waiter_task

asyncio.run(main())

Condition 条件

asyncio 条件原语可被任务用于等待某个事件发生,然后获取对共享资源的独占访问

在本质上,Condition 对象合并了 EventLock 的功能
多个 Condition 对象有可能共享一个 Lock,这允许关注于共享资源的特定状态的不同任务实现对共享资源的协同独占访问

可选的 lock 参数必须为 Lock 对象或 None。 在后一种情况下会自动创建一个新的 Lock 对象。

推荐通过 async with 语句来使用 Condition

|方法|信息|特殊|
|–|–|
|coroutine acquire()|获取下层的锁
此方法会等待直至下层的锁为 unlocked,将其设为 locked 并返回 True|
|notify(n=1)|唤醒最多 n 个正在等待此条件的任务(默认为 1 个)
如果没有任务正在等待则此方法为空操作。
锁必须在此方法被调用前被获取并在随后被快速释放
|如果通过一个 unlocked 锁调用则会引发 RuntimeError|
|locked()|如果下层的锁已被获取则返回 True||
|notify_all()|唤醒所有正在等待此条件的任务。此方法的行为类似于 notify(),但会唤醒所有正在等待的任务。锁必须在此方法被调用前被获取并在随后被快速释放| 如果通过一个 unlocked 锁调用则会引发 RuntimeError|
|release()|释放下层的锁|当在未锁定的锁上发起调用时,会引发 RuntimeError|
|coroutine wait()|等待直至收到通知
这个方法会释放下层的锁,然后保持阻塞直到被 notify()notify_all() 调用所唤醒。
一旦被唤醒,Condition 会重新获取它的锁并且此方法将返回 True|当此方法被调用时如果调用方任务未获得锁,则会引发 RuntimeError|
|coroutine wait_for(predicate)|等待直到目标值变为 true
目标必须为一个可调用对象,其结果将被解读为一个布尔值。 最终的值将为返回值。|

Semaphore 信号量

信号量会管理一个内部计数器,该计数器会随每次 acquire() 调用递减并随每次 release() 调用递增。计数器的值永远不会降到零以下
acquire() 发现其值为零时,它将保持阻塞直到有某个任务调用了 release()

可选的 value 参数用来为内部计数器赋初始值 (默认值为 1)。 如果给定的值小于 0 则会引发 ValueError

方法 信息
coroutine acquire() 获取一个信号量。
如果内部计数器的值大于零,则将其减一并立即返回 True
如果其值为零,则会等待直到 release() 并调用并返回 True
locked() 如果信号量对象无法被立即获取则返回 True
release() 释放一个信号量对象,将内部计数器的值加1
可以唤醒一个正在等待获取信号量对象的任务。
不同于 BoundedSemaphoreSemaphore 允许执行的 release() 调用多于 acquire() 调用

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1, *)
绑定的信号量对象
BoundedSemaphore 是特殊版本的 Semaphore,如果在 release() 中内部计数器值增加到初始 value 以上它将引发一个 ValueError

未完成

位置:https://docs.python.org/zh-cn/3/library/asyncio-subprocess.html


案例

串行等待运行

以下代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")
# 直接运行并等待执行完毕(简单来说就是串行执行的协程)
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
>预期的输出:  
1
2
3
4
started at 17:13:52
hello
world
finished at 17:13:55
[Back](#BackExampleJustAwait)

asyncio.create_task 基础并行运行

等待到两个协程都执行完毕,因为是并行的,预计会用两秒钟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
task1 = asyncio.create_task(say_after(2, 'world'))
task2 = asyncio.create_task(say_after(1, 'hello'))
print(f"started at {time.strftime('%X')}")

# 等待到两个协程都执行完毕,因为是并行的,预计会用两秒钟
await task1
await task2
print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
[Back](#BackExampleCreateTaskMostBasing)

async with 运行Lock

通过async with来运行Lock能够能够让代码更加美观

1
2
3
4
5
lock = asyncio.Lock()

# ... later
async with lock:
# access shared state

这等价于:

1
2
3
4
5
6
7
8
9
10
lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
# access shared state
finally:
lock.release()

lock = asyncio.Lock()
[Back](#AsyncWithRunLock)

async with 运行Condition

通过async with来运行Lock能够能够让代码更加美观

1
2
3
4
5
cond = asyncio.Condition()

# ... later
async with cond:
await cond.wait()

这等价于:

1
2
3
4
5
6
7
8
cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
await cond.wait()
finally:
cond.release()
[Back](#AsyncWithRunCondition)