Python Coroutine
协程 coroutine 不知道是从什么时候开始的,感觉我第一次看到是 lua 里面支持 yield 。后面看到就是 javascript 里面的 Promise,async 和 await。
以前写 Javascript 的时候容易会遇到 callback hell,似乎 Promise 就是出来解决这个问题的,让你可以用同步的方式写异步程序。例如你有三个异步请求可以同时发出去,而后面的结果又需要这三个的结果都回来才能继续,那就可以用类似下面的伪代码,整体执行时间是最长的那个。
1res1 = await test1
2res2 = await test2
3console.log(res1, res2)
Python 里面似乎也类似。我目前理解主要就是让程序可以「同步」执行,但是又避免了需要维护锁的问题,没有锁就不会有死锁了吧。。。
解释下同步,主要是针对对于 cpu 资源的占用。对于计算型的程序,实际上每时每刻都在利用 cpu 做计算,这样就算把计算拆分成了多个计算程序,让他们同时运行,那同一时刻还是只有一个程序在利用 cpu 资源执行,这样并行实际并不能提升效率。所以对于纯计算型任务,可以通过多进程利用多个 cpu。
但是实际我们的程序执行的时候,并不全是 cpu 计算,有时候会需要等网络 io,文件 io 等,做这些事情的时候实际上 cpu 是空闲的。协程就是让这些程序在等待的时候,把控制权交出来,让其他程序运行。那个 yield
关键字就是做这个事情的, yield
很像 return
,遇到的时候就会返回,暂停程序的执行,等到适当的时候又可以从暂停的地方继续执行。
以前是使用 @asyncio.coroutine
和 yield from
来创建协程,似乎 3.10 之后那个装饰器就要被废弃了,替代使用 async/await
来创建,直接替换就可以。
1@asyncio.coroutine
2def old_style_coroutine():
3 yield from asyncio.sleep(1)
4
5async def main():
6 await old_style_coroutine()
Python 的协程文档提供了一些例子,摘取一些。
1import asyncio
2
3async def test():
4 asynio.sleep(1)
5
6asyncio.run(test())
定义这样的 test
函数叫做 协程函数
, test()
返回的叫做 协程对象
,和普通函数不一样, test()
并不会执行这个函数,需要使用协程相关的命令才行,例如 asyncio.run(test())
。
1import asyncio
2
3async def nested():
4 print('started')
5 return 42
6
7async def main():
8 # Schedule nested() to run soon concurrently
9 # with "main()".
10 task = asyncio.create_task(nested())
11
12 # "task" can now be used to cancel "nested()", or
13 # can simply be awaited to wait until it is complete:
14 await asyncio.sleep(5)
15 print('after sleep')
16 await task
17asyncio.run(main())
类似这个例子, nested
这个函数实际是在 task 赋值的时候就开始执行了。但是在等到了 await task
才获取他的结果。当然如果那个 nested
执行时间比较长,那等到了 await 语句的时候,会等他执行完毕才继续。
这个和那个 javascript 的简单例子就比较像了,整体执行时间就是长的那个,就是并行的感觉。 asyncio
还提供了 asyncio.gather()
,提供了方便的方法把并行的任务绑定在一起,
使用 asyncio 的话,协程里面执行的东西也需要是协程友好的,如果执行原始的阻塞的代码,那这部分代码无法并行。比如把上面的代码修改一下。
1import asyncio
2from time import time, sleep
3
4async def nested():
5 print('started')
6 sleep(1)
7 print('after 1s')
8 await asyncio.sleep(2)
9 return 42
10
11async def main():
12 # Schedule nested() to run soon concurrently
13 # with "main()".
14 task = asyncio.create_task(nested())
15
16 # "task" can now be used to cancel "nested()", or
17 # can simply be awaited to wait until it is complete:
18 await asyncio.gather(asyncio.sleep(5), task)
19
20start = time()
21asyncio.run(main())
22print(time() - start)
23
24# started
25# after 1s
26# 6.005960941314697
可以看到输出,是 6 秒,本身按照前面说的,最长的协程时间应该是 5 秒的,但是整体执行了 6 秒,因为其中 sleep(1)
这一秒执行的时候其他协程无法执行。摘抄文档里面的说明。
1不应该直接调用阻塞( CPU 绑定)代码。例如,如果一个函数执行1秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟1秒。
2可以使用执行器在不同的线程甚至不同的进程中运行任务,以避免使用事件循环阻塞OS线程。有关详细信息,请参见 loop.run_in_executor() 方法。
所以就有一堆的 asyncio.xx
出现了,比如 asyncio.open_connection
, asyncio.Lock
, asyncio.create_subprocess_shell
, asyncio.Queue
等。下面这个是一个 Queue
的例子。
1import asyncio
2import random
3import time
4
5
6async def worker(name, queue):
7 while True:
8 # Get a "work item" out of the queue.
9 sleep_for = await queue.get()
10
11 # Sleep for the "sleep_for" seconds.
12 await asyncio.sleep(sleep_for)
13
14 # Notify the queue that the "work item" has been processed.
15 queue.task_done()
16
17 print(f'{name} has slept for {sleep_for:.2f} seconds')
18
19
20async def main():
21 # Create a queue that we will use to store our "workload".
22 queue = asyncio.Queue()
23
24 # Generate random timings and put them into the queue.
25 total_sleep_time = 0
26 for _ in range(20):
27 sleep_for = random.uniform(0.05, 1.0)
28 total_sleep_time += sleep_for
29 queue.put_nowait(sleep_for)
30
31 # Create three worker tasks to process the queue concurrently.
32 tasks = []
33 for i in range(3):
34 task = asyncio.create_task(worker(f'worker-{i}', queue))
35 tasks.append(task)
36
37 # Wait until the queue is fully processed.
38 started_at = time.monotonic()
39 await queue.join()
40 total_slept_for = time.monotonic() - started_at
41
42 # Cancel our worker tasks.
43 for task in tasks:
44 task.cancel()
45 # Wait until all worker tasks are cancelled.
46 await asyncio.gather(*tasks, return_exceptions=True)
47
48 print('====')
49 print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
50 print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
51
52
53asyncio.run(main())
这里面需要注意的是 task
赋值的时候,实际 worker 就开始运行了。接下来会程序会在 await queue.join()
等待队列都标记为 queue.task_done()
,这个标记动作完全是你的程序自己控制的,如果消耗了一个队列元素,但是标记了 2 次,那会发现队列没有执行完毕就会令 await queue.join()
执行完毕。在已有的 queue 都消耗完毕之后,所有的 worker 都会等待在 sleep_for = await queue.get()
,所以 queue.join
之后还需要执行 task.cancel
取消任务的继续执行。要注意 task.cancel
语句执行也是异步的,也并不是执行完毕任务就会结束了,相当于只是请求任务取消,还需要去用 await task
来确认是真的取消了。
asyncio 的核心是事件循环 loop,可以通过 asyncio.get_event_loop()
来获取当前 loop,然后通过 loop.run_until_complete(future)
这样的命令来执行协程。以及还有一些 loop.xxx
方法来创建任务和管理协程。这些似乎是 low level api,一般不用。
low level api 还有一个 Future,说是 Task 的父类,更底层。感觉 Future 可以用来联系多个协程用的。对于 await future
语句,只有 future.set_result()
被调用的时候才会继续执行。这样比如有多个协程要执行,其中有些又有依赖,那就可以创建一个 feture,然后在被依赖的协程里面计算完毕的时候,执行 future.set_result() ,然后依赖他的协程里面 await future 就会获取到执行结果,也就可以继续执行了。
1async def set_after(fut, delay, value):
2 # Sleep for *delay* seconds.
3 await asyncio.sleep(delay)
4
5 # Set *value* as a result of *fut* Future.
6 fut.set_result(value)
7
8async def main():
9 # Get the current event loop.
10 loop = asyncio.get_running_loop()
11
12 # Create a new Future object.
13 fut = loop.create_future()
14
15 # Run "set_after()" coroutine in a parallel Task.
16 # We are using the low-level "loop.create_task()" API here because
17 # we already have a reference to the event loop at hand.
18 # Otherwise we could have just used "asyncio.create_task()".
19 task = loop.create_task(
20 set_after(fut, 1, '... world'))
21
22 print('hello ...')
23 await task
24
25 # Wait until *fut* has a result (1 second) and print it.
26 print(await fut)
上面这个例子是把文档里面改了一下,那个 task 里面会执行 fut.set_result,如果去掉这句,await task 是可以执行完毕的,但是 await fut 就会一直等待了。