Python Coroutine

协程 coroutine 不知道是从什么时候开始的,感觉我第一次看到是 lua 里面支持 yield 。后面看到就是 javascript 里面的 Promise,async 和 await。

以前写 Javascript 的时候容易会遇到 callback hell,似乎 Promise 就是出来解决这个问题的,让你可以用同步的方式写异步程序。例如你有三个异步请求可以同时发出去,而后面的结果又需要这三个的结果都回来才能继续,那就可以用类似下面的伪代码,整体执行时间是最长的那个。

res1 = await test1
res2 = await test2
console.log(res1, res2)

Python 里面似乎也类似。我目前理解主要就是让程序可以「同步」执行,但是又避免了需要维护锁的问题,没有锁就不会有死锁了吧。。。

解释下同步,主要是针对对于 cpu 资源的占用。对于计算型的程序,实际上每时每刻都在利用 cpu 做计算,这样就算把计算拆分成了多个计算程序,让他们同时运行,那同一时刻还是只有一个程序在利用 cpu 资源执行,这样并行实际并不能提升效率。所以对于纯计算型任务,可以通过多进程利用多个 cpu。

但是实际我们的程序执行的时候,并不全是 cpu 计算,有时候会需要等网络 io,文件 io 等,做这些事情的时候实际上 cpu 是空闲的。协程就是让这些程序在等待的时候,把控制权交出来,让其他程序运行。那个 yield 关键字就是做这个事情的, yield 很像 return ,遇到的时候就会返回,暂停程序的执行,等到适当的时候又可以从暂停的地方继续执行。

以前是使用 @asyncio.coroutineyield from 来创建协程,似乎 3.10 之后那个装饰器就要被废弃了,替代使用 async/await 来创建,直接替换就可以。

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

Python 的协程文档提供了一些例子,摘取一些。

import asyncio

async def test():
    asynio.sleep(1)

asyncio.run(test())

定义这样的 test 函数叫做 协程函数test() 返回的叫做 协程对象 ,和普通函数不一样, test() 并不会执行这个函数,需要使用协程相关的命令才行,例如 asyncio.run(test())

import asyncio

async def nested():
    print('started')
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await asyncio.sleep(5)
    print('after sleep')
    await task
asyncio.run(main())

类似这个例子, nested 这个函数实际是在 task 赋值的时候就开始执行了。但是在等到了 await task 才获取他的结果。当然如果那个 nested 执行时间比较长,那等到了 await 语句的时候,会等他执行完毕才继续。

这个和那个 javascript 的简单例子就比较像了,整体执行时间就是长的那个,就是并行的感觉。 asyncio 还提供了 asyncio.gather() ,提供了方便的方法把并行的任务绑定在一起,

使用 asyncio 的话,协程里面执行的东西也需要是协程友好的,如果执行原始的阻塞的代码,那这部分代码无法并行。比如把上面的代码修改一下。

import asyncio
from time import time, sleep

async def nested():
    print('started')
    sleep(1)
    print('after 1s')
    await asyncio.sleep(2)
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await asyncio.gather(asyncio.sleep(5), task)

start = time()
asyncio.run(main())
print(time() - start)

# started
# after 1s
# 6.005960941314697

可以看到输出,是 6 秒,本身按照前面说的,最长的协程时间应该是 5 秒的,但是整体执行了 6 秒,因为其中 sleep(1) 这一秒执行的时候其他协程无法执行。摘抄文档里面的说明。

不应该直接调用阻塞( CPU 绑定)代码。例如,如果一个函数执行1秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟1秒。
可以使用执行器在不同的线程甚至不同的进程中运行任务,以避免使用事件循环阻塞OS线程。有关详细信息,请参见 loop.run_in_executor() 方法。

所以就有一堆的 asyncio.xx 出现了,比如 asyncio.open_connection, asyncio.Lock, asyncio.create_subprocess_shell, asyncio.Queue 等。下面这个是一个 Queue 的例子。

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

这里面需要注意的是 task 赋值的时候,实际 worker 就开始运行了。接下来会程序会在 await queue.join() 等待队列都标记为 queue.task_done() ,这个标记动作完全是你的程序自己控制的,如果消耗了一个队列元素,但是标记了 2 次,那会发现队列没有执行完毕就会令 await queue.join() 执行完毕。在已有的 queue 都消耗完毕之后,所有的 worker 都会等待在 sleep_for = await queue.get() ,所以 queue.join 之后还需要执行 task.cancel 取消任务的继续执行。要注意 task.cancel 语句执行也是异步的,也并不是执行完毕任务就会结束了,相当于只是请求任务取消,还需要去用 await task 来确认是真的取消了。

asyncio 的核心是事件循环 loop,可以通过 asyncio.get_event_loop() 来获取当前 loop,然后通过 loop.run_until_complete(future) 这样的命令来执行协程。以及还有一些 loop.xxx 方法来创建任务和管理协程。这些似乎是 low level api,一般不用。

low level api 还有一个 Future,说是 Task 的父类,更底层。感觉 Future 可以用来联系多个协程用的。对于 await future 语句,只有 future.set_result() 被调用的时候才会继续执行。这样比如有多个协程要执行,其中有些又有依赖,那就可以创建一个 feture,然后在被依赖的协程里面计算完毕的时候,执行 future.set_result() ,然后依赖他的协程里面 await future 就会获取到执行结果,也就可以继续执行了。

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    task = loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')
    await task

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

上面这个例子是把文档里面改了一下,那个 task 里面会执行 fut.set_result,如果去掉这句,await task 是可以执行完毕的,但是 await fut 就会一直等待了。