Python asyncio异步编程深度解析:事件循环、协程与await关键字

作者:Python编程 日期:2026-07-03 09:48:37   阅读:286 次   
从一个线上事故说起 上周五晚上十一点,一个爬虫服务在高峰期突然卡死——所有请求排队等待,CPU占用却只有5%。重启后恢复正常,但第二天同一时间又复现。排查了数据库连接池、线程锁、内存泄漏,最后用strace跟踪系统调用才发现:某个第三方API的HTTP请求用了同步库requests,在协程里阻塞了事件循环。 这个真实案例(据《asyncio 异步编程(一):事件循环、协程对象、await 关键字深

从一个线上事故说起

上周五晚上十一点,一个爬虫服务在高峰期突然卡死——所有请求排队等待,CPU占用却只有5%。重启后恢复正常,但第二天同一时间又复现。排查了数据库连接池、线程锁、内存泄漏,最后用strace跟踪系统调用才发现:某个第三方API的HTTP请求用了同步库requests,在协程里阻塞了事件循环

这个真实案例(据《asyncio 异步编程(一):事件循环、协程对象、await 关键字深入》https://blog.csdn.net/zzfmlxs/article/details/162516157)揭示了一个普遍问题:很多人对asyncio的理解还停留在"会用async/await写个demo"的层面,一旦遇到生产环境的并发问题就束手无策。

事件循环:那个看不见的调度器

事件循环是asyncio的心脏。简单说,它就是一个死循环,不断从任务队列里取出协程对象执行,遇到await就挂起当前任务,切到下一个。

Python 3.7之后,asyncio.run()帮你创建和关闭事件循环,但底层原理必须吃透:

import asyncio

async def main():
    print("开始")
    await asyncio.sleep(1)  # 遇到await,交出控制权
    print("1秒后继续")

# asyncio.run() 做了三件事:
# 1. 创建事件循环
# 2. 将main()协程包装成Task放入队列
# 3. 持续循环直到所有Task完成
asyncio.run(main())

事件循环的执行流程可以用伪代码描述:

while tasks_not_empty:
    task = get_next_ready_task()
    if task.is_completed:
        handle_result(task)
    elif task.is_waiting:
        check_if_ready(task)  # IO完成?定时器到期?
    else:
        run_until_await(task)  # 执行到下一个await点

协程对象:不是函数调用

这是asyncio最容易被误解的概念。调用一个async函数不会执行它,只会返回一个协程对象:

async def hello():
    print("hello")

# 这不会打印任何东西!
coro = hello()
print(type(coro))  # <class 'coroutine'>

# 必须通过await或asyncio.run来执行
asyncio.run(hello())  # 这才会打印 "hello"

协程对象的三种运行方式:

方式场景示例
asyncio.run()程序入口asyncio.run(main())
await协程内调用协程await hello()
asyncio.create_task()并发执行asyncio.create_task(hello())

一个常见错误是在协程内直接调用async函数而不await:

async def bad_example():
    hello()  # ❌ 不会执行,还会触发RuntimeWarning
    await hello()  # ✅ 正确

async def good_example():
    task = asyncio.create_task(hello())  # ✅ 并发调度
    # 继续做其他事情...
    await task  # 等待结果

await关键字:交出控制权的时机

await的本质是告诉事件循环:"我在等某个操作完成,先去执行别的任务吧"。它只能用在async函数内部,后面必须跟一个可等待对象(Awaitable)。

Python中三种可等待对象:

  1. 协程(Coroutine):async def定义的函数返回的对象
  2. 任务(Task):对协程的封装,用于并发调度
  3. 未来对象(Future):低层API,表示未来某个时刻会有结果
import asyncio

async def fetch_data(url):
    # 模拟网络请求
    await asyncio.sleep(2)
    return f"Data from {url}"

async def main():
    # ❌ 串行:总共4秒
    data1 = await fetch_data("api1.com")
    data2 = await fetch_data("api2.com")

    # ✅ 并发:总共2秒
    task1 = asyncio.create_task(fetch_data("api1.com"))
    task2 = asyncio.create_task(fetch_data("api2.com"))
    data1 = await task1
    data2 = await task2

事件循环的陷阱:同步阻塞

回到开头的线上事故——在协程中使用同步阻塞操作会怎样?

import asyncio
import requests  # 同步HTTP库
import time

async def bad_fetch():
    # ❌ 这会阻塞整个事件循环!
    resp = requests.get("https://slow-api.example.com")
    return resp.json()

async def good_fetch():
    # ✅ 使用异步HTTP库
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://slow-api.example.com") as resp:
            return await resp.json()

async def workaround_fetch():
    # ⚠️ 不得已时的workaround:用线程池执行同步代码
    loop = asyncio.get_event_loop()
    resp = await loop.run_in_executor(
        None,  # 默认线程池
        lambda: requests.get("https://slow-api.example.com")
    )
    return resp.json()

三种方案对比:

方案是否阻塞事件循环并发能力推荐程度
同步库直接调用完全丧失❌ 绝对禁止
异步库替代完整保留✅ 首选
run_in_executor受线程池限制⚠️ 过渡方案

asyncio.run()的底层机制

很多人只知道asyncio.run(main())能跑起来,但不理解它做了什么。我们拆解一下:

# asyncio.run() 的简化实现逻辑
def run(coro):
    # 1. 创建新的事件循环
    loop = asyncio.new_event_loop()
    
    try:
        # 2. 设置为当前线程的事件循环
        asyncio.set_event_loop(loop)
        
        # 3. 将协程包装成Task并运行
        return loop.run_until_complete(coro)
    finally:
        # 4. 清理:取消所有未完成任务
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            # 5. 关闭事件循环
            asyncio.set_event_loop(None)
            loop.close()

关键点:asyncio.run()每次调用都会创建新的事件循环,所以不应该在已有事件循环中嵌套调用。这也是为什么在Jupyter Notebook中直接asyncio.run()会报错——Notebook本身已经运行了一个事件循环。

实战:构建高并发爬虫

将以上知识综合运用,构建一个真正的高并发爬虫:

import asyncio
import aiohttp
from asyncio import Semaphore

class AsyncCrawler:
    def __init__(self, concurrency=10):
        self.semaphore = Semaphore(concurrency)  # 控制并发数
        self.session = None
        
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, *args):
        await self.session.close()
    
    async def fetch(self, url, retry=3):
        async with self.semaphore:  # 信号量控制并发
            for attempt in range(retry):
                try:
                    async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                        if resp.status == 200:
                            return await resp.text()
                        elif resp.status == 429:
                            # 限流:指数退避
                            await asyncio.sleep(2 ** attempt)
                        else:
                            return None
                except asyncio.TimeoutError:
                    if attempt == retry - 1:
                        return None
                    await asyncio.sleep(1)
        return None
    
    async def crawl(self, urls):
        tasks = [self.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# 使用示例
async def main():
    urls = [f"https://httpbin.org/delay/{i%3}" for i in range(50)]
    async with AsyncCrawler(concurrency=5) as crawler:
        results = await crawler.crawl(urls)
        success = sum(1 for r in results if r and not isinstance(r, Exception))
        print(f"成功: {success}/{len(urls)}")

asyncio.run(main())

这个爬虫实现了三个关键能力:

  1. 并发控制:Semaphore限制同时进行的请求数,防止过载
  2. 重试与退避:指数退避策略应对429限流和超时
  3. 资源管理:异步上下文管理器确保Session正确关闭

常见asyncio反模式

总结了生产环境中最常见的五个反模式:

# 反模式1:忘记await
async def bug1():
    coro = some_async_function()  # ❌ 协程未执行
    result = await some_async_function()  # ✅

# 反模式2:在循环中串行await
async def bug2(urls):
    results = []
    for url in urls:
        result = await fetch(url)  # ❌ 串行,慢
        results.append(result)
    
    # ✅ 用gather并发
    results = await asyncio.gather(*[fetch(url) for url in urls])

# 反模式3:创建Task后不保存引用
async def bug3():
    asyncio.create_task(background_work())  # ❌ Task可能被GC回收
    # ✅ 保存引用
    task = asyncio.create_task(background_work())
    # 或在不需要结果时用background_tasks集合管理

# 反模式4:无限创建Task
async def bug4(urls):
    # ❌ 10万个URL同时发起,内存爆炸
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    
    # ✅ 使用Semaphore或asyncio.as_completed控制并发
    sem = asyncio.Semaphore(100)
    async def limited_fetch(url):
        async with sem:
            return await fetch(url)
    tasks = [asyncio.create_task(limited_fetch(url)) for url in urls]

# 反模式5:在async函数中使用time.sleep()
async def bug5():
    time.sleep(5)   # ❌ 阻塞整个事件循环
    await asyncio.sleep(5)  # ✅ 非阻塞等待

Python 3.12+的asyncio新特性

Python持续改进asyncio生态,3.12和3.13版本带来了重要更新:

  • 3.12:TaskGroup更稳定的异常处理,改进的TaskGroup上下文传播
  • 3.13:改进的asyncio.run()实现,更好的调试信息
  • 3.14(开发中):free-threaded模式下asyncio的性能优化
# Python 3.11+ 推荐使用TaskGroup替代gather
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("api1.com"))
        task2 = tg.create_task(fetch("api2.com"))
        task3 = tg.create_task(fetch("api3.com"))
    # TaskGroup退出时自动等待所有任务完成
    # 任一任务异常时自动取消其余任务
    print(task1.result(), task2.result(), task3.result())

总结

asyncio不是银弹,但在IO密集型场景下,它是Python最强大的并发武器。理解事件循环的调度机制、协程对象的本质、await的交出控制权语义,是从"会用"到"用好"asyncio的关键。

记住一句话:在协程里,永远不要做任何同步阻塞操作。如果必须使用同步库,用run_in_executor作为过渡,但最终一定要替换为异步原生方案。Python asyncio异步编程代码示例

发表评论

文明上网,从我做起!

评论列表COMMENT

  • 暂时还没有人发表评论。