Python Coroutine
协程 coroutine 不知道是从什么时候开始的,感觉我第一次看到是 lua 里面支持 yield 。后面看到就是 javascript 里面的 Promise,async 和 await。
以前写 Javascript 的时候容易会遇到 callback hell,似乎 Promise 就是出来解决这个问题的,让你可以用同步的方式写异步程序。例如你有三个异步请求可以同时发出去,而后面的结果又需要这三个的结果都回来才能继续,那就可以用类似下面的伪代码,整体执行时间是最长的那个。
res1 = await test1
res2 = await test2
console.log(res1, res2)
Python 里面似乎也类似。我目前理解主要就是让程序可以「同步」执行,但是又避免了需要维护锁的问题,没有锁就不会有死锁了吧。。。
解释下同步,主要是针对对于 cpu 资源的占用。对于计算型的程序,实际上每时每刻都在利用 cpu 做计算,这样就算把计算拆分成了多个计算程序,让他们同时运行,那同一时刻还是只有一个程序在利用 cpu 资源执行,这样并行实际并不能提升效率。所以对于纯计算型任务,可以通过多进程利用多个 cpu。
但是实际我们的程序执行的时候,并不全是 cpu 计算,有时候会需要等网络 io,文件 io 等,做这些事情的时候实际上 cpu 是空闲的。协程就是让这些程序在等待的时候,把控制权交出来,让其他程序运行。那个 yield
关键字就是做这个事情的, yield
很像 return
,遇到的时候就会返回,暂停程序的执行,等到适当的时候又可以从暂停的地方继续执行。
以前是使用 @asyncio.coroutine
和 yield from
来创建协程,似乎 3.10 之后那个装饰器就要被废弃了,替代使用 async/await
来创建,直接替换就可以。
@asyncio.coroutine
def old_style_coroutine():
yield from asyncio.sleep(1)
async def main():
await old_style_coroutine()
Python 的协程文档提供了一些例子,摘取一些。
import asyncio
async def test():
asynio.sleep(1)
asyncio.run(test())
定义这样的 test
函数叫做 协程函数
, test()
返回的叫做 协程对象
,和普通函数不一样, test()
并不会执行这个函数,需要使用协程相关的命令才行,例如 asyncio.run(test())
。
import asyncio
async def nested():
print('started')
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await asyncio.sleep(5)
print('after sleep')
await task
asyncio.run(main())
类似这个例子, nested
这个函数实际是在 task 赋值的时候就开始执行了。但是在等到了 await task
才获取他的结果。当然如果那个 nested
执行时间比较长,那等到了 await 语句的时候,会等他执行完毕才继续。
这个和那个 javascript 的简单例子就比较像了,整体执行时间就是长的那个,就是并行的感觉。 asyncio
还提供了 asyncio.gather()
,提供了方便的方法把并行的任务绑定在一起,
使用 asyncio 的话,协程里面执行的东西也需要是协程友好的,如果执行原始的阻塞的代码,那这部分代码无法并行。比如把上面的代码修改一下。
import asyncio
from time import time, sleep
async def nested():
print('started')
sleep(1)
print('after 1s')
await asyncio.sleep(2)
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await asyncio.gather(asyncio.sleep(5), task)
start = time()
asyncio.run(main())
print(time() - start)
# started
# after 1s
# 6.005960941314697
可以看到输出,是 6 秒,本身按照前面说的,最长的协程时间应该是 5 秒的,但是整体执行了 6 秒,因为其中 sleep(1)
这一秒执行的时候其他协程无法执行。摘抄文档里面的说明。
不应该直接调用阻塞( CPU 绑定)代码。例如,如果一个函数执行1秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟1秒。
可以使用执行器在不同的线程甚至不同的进程中运行任务,以避免使用事件循环阻塞OS线程。有关详细信息,请参见 loop.run_in_executor() 方法。
所以就有一堆的 asyncio.xx
出现了,比如 asyncio.open_connection
, asyncio.Lock
, asyncio.create_subprocess_shell
, asyncio.Queue
等。下面这个是一个 Queue
的例子。
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
这里面需要注意的是 task
赋值的时候,实际 worker 就开始运行了。接下来会程序会在 await queue.join()
等待队列都标记为 queue.task_done()
,这个标记动作完全是你的程序自己控制的,如果消耗了一个队列元素,但是标记了 2 次,那会发现队列没有执行完毕就会令 await queue.join()
执行完毕。在已有的 queue 都消耗完毕之后,所有的 worker 都会等待在 sleep_for = await queue.get()
,所以 queue.join
之后还需要执行 task.cancel
取消任务的继续执行。要注意 task.cancel
语句执行也是异步的,也并不是执行完毕任务就会结束了,相当于只是请求任务取消,还需要去用 await task
来确认是真的取消了。
asyncio 的核心是事件循环 loop,可以通过 asyncio.get_event_loop()
来获取当前 loop,然后通过 loop.run_until_complete(future)
这样的命令来执行协程。以及还有一些 loop.xxx
方法来创建任务和管理协程。这些似乎是 low level api,一般不用。
low level api 还有一个 Future,说是 Task 的父类,更底层。感觉 Future 可以用来联系多个协程用的。对于 await future
语句,只有 future.set_result()
被调用的时候才会继续执行。这样比如有多个协程要执行,其中有些又有依赖,那就可以创建一个 feture,然后在被依赖的协程里面计算完毕的时候,执行 future.set_result() ,然后依赖他的协程里面 await future 就会获取到执行结果,也就可以继续执行了。
async def set_after(fut, delay, value):
# Sleep for *delay* seconds.
await asyncio.sleep(delay)
# Set *value* as a result of *fut* Future.
fut.set_result(value)
async def main():
# Get the current event loop.
loop = asyncio.get_running_loop()
# Create a new Future object.
fut = loop.create_future()
# Run "set_after()" coroutine in a parallel Task.
# We are using the low-level "loop.create_task()" API here because
# we already have a reference to the event loop at hand.
# Otherwise we could have just used "asyncio.create_task()".
task = loop.create_task(
set_after(fut, 1, '... world'))
print('hello ...')
await task
# Wait until *fut* has a result (1 second) and print it.
print(await fut)
上面这个例子是把文档里面改了一下,那个 task 里面会执行 fut.set_result,如果去掉这句,await task 是可以执行完毕的,但是 await fut 就会一直等待了。