Python asyncio异步编程深度解析:事件循环、协程与await关键字
从一个线上事故说起
上周五晚上十一点,一个爬虫服务在高峰期突然卡死——所有请求排队等待,CPU占用却只有5%。重启后恢复正常,但第二天同一时间又复现。排查了数据库连接池、线程锁、内存泄漏,最后用strace跟踪系统调用才发现:某个第三方API的HTTP请求用了同步库requests,在协程里阻塞了事件循环。
这个真实案例(据《asyncio 异步编程(一):事件循环、协程对象、await 关键字深入》https://blog.csdn.net/zzfmlxs/article/details/162516157)揭示了一个普遍问题:很多人对asyncio的理解还停留在"会用async/await写个demo"的层面,一旦遇到生产环境的并发问题就束手无策。
事件循环:那个看不见的调度器
事件循环是asyncio的心脏。简单说,它就是一个死循环,不断从任务队列里取出协程对象执行,遇到await就挂起当前任务,切到下一个。
Python 3.7之后,asyncio.run()帮你创建和关闭事件循环,但底层原理必须吃透:
import asyncio
async def main():
print("开始")
await asyncio.sleep(1) # 遇到await,交出控制权
print("1秒后继续")
# asyncio.run() 做了三件事:
# 1. 创建事件循环
# 2. 将main()协程包装成Task放入队列
# 3. 持续循环直到所有Task完成
asyncio.run(main())事件循环的执行流程可以用伪代码描述:
while tasks_not_empty:
task = get_next_ready_task()
if task.is_completed:
handle_result(task)
elif task.is_waiting:
check_if_ready(task) # IO完成?定时器到期?
else:
run_until_await(task) # 执行到下一个await点协程对象:不是函数调用
这是asyncio最容易被误解的概念。调用一个async函数不会执行它,只会返回一个协程对象:
async def hello():
print("hello")
# 这不会打印任何东西!
coro = hello()
print(type(coro)) # <class 'coroutine'>
# 必须通过await或asyncio.run来执行
asyncio.run(hello()) # 这才会打印 "hello"协程对象的三种运行方式:
| 方式 | 场景 | 示例 |
|---|---|---|
asyncio.run() | 程序入口 | asyncio.run(main()) |
await | 协程内调用协程 | await hello() |
asyncio.create_task() | 并发执行 | asyncio.create_task(hello()) |
一个常见错误是在协程内直接调用async函数而不await:
async def bad_example():
hello() # ❌ 不会执行,还会触发RuntimeWarning
await hello() # ✅ 正确
async def good_example():
task = asyncio.create_task(hello()) # ✅ 并发调度
# 继续做其他事情...
await task # 等待结果await关键字:交出控制权的时机
await的本质是告诉事件循环:"我在等某个操作完成,先去执行别的任务吧"。它只能用在async函数内部,后面必须跟一个可等待对象(Awaitable)。
Python中三种可等待对象:
- 协程(Coroutine):async def定义的函数返回的对象
- 任务(Task):对协程的封装,用于并发调度
- 未来对象(Future):低层API,表示未来某个时刻会有结果
import asyncio
async def fetch_data(url):
# 模拟网络请求
await asyncio.sleep(2)
return f"Data from {url}"
async def main():
# ❌ 串行:总共4秒
data1 = await fetch_data("api1.com")
data2 = await fetch_data("api2.com")
# ✅ 并发:总共2秒
task1 = asyncio.create_task(fetch_data("api1.com"))
task2 = asyncio.create_task(fetch_data("api2.com"))
data1 = await task1
data2 = await task2事件循环的陷阱:同步阻塞
回到开头的线上事故——在协程中使用同步阻塞操作会怎样?
import asyncio
import requests # 同步HTTP库
import time
async def bad_fetch():
# ❌ 这会阻塞整个事件循环!
resp = requests.get("https://slow-api.example.com")
return resp.json()
async def good_fetch():
# ✅ 使用异步HTTP库
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("https://slow-api.example.com") as resp:
return await resp.json()
async def workaround_fetch():
# ⚠️ 不得已时的workaround:用线程池执行同步代码
loop = asyncio.get_event_loop()
resp = await loop.run_in_executor(
None, # 默认线程池
lambda: requests.get("https://slow-api.example.com")
)
return resp.json()三种方案对比:
| 方案 | 是否阻塞事件循环 | 并发能力 | 推荐程度 |
|---|---|---|---|
| 同步库直接调用 | 是 | 完全丧失 | ❌ 绝对禁止 |
| 异步库替代 | 否 | 完整保留 | ✅ 首选 |
| run_in_executor | 否 | 受线程池限制 | ⚠️ 过渡方案 |
asyncio.run()的底层机制
很多人只知道asyncio.run(main())能跑起来,但不理解它做了什么。我们拆解一下:
# asyncio.run() 的简化实现逻辑
def run(coro):
# 1. 创建新的事件循环
loop = asyncio.new_event_loop()
try:
# 2. 设置为当前线程的事件循环
asyncio.set_event_loop(loop)
# 3. 将协程包装成Task并运行
return loop.run_until_complete(coro)
finally:
# 4. 清理:取消所有未完成任务
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
# 5. 关闭事件循环
asyncio.set_event_loop(None)
loop.close()关键点:asyncio.run()每次调用都会创建新的事件循环,所以不应该在已有事件循环中嵌套调用。这也是为什么在Jupyter Notebook中直接asyncio.run()会报错——Notebook本身已经运行了一个事件循环。
实战:构建高并发爬虫
将以上知识综合运用,构建一个真正的高并发爬虫:
import asyncio
import aiohttp
from asyncio import Semaphore
class AsyncCrawler:
def __init__(self, concurrency=10):
self.semaphore = Semaphore(concurrency) # 控制并发数
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
async def fetch(self, url, retry=3):
async with self.semaphore: # 信号量控制并发
for attempt in range(retry):
try:
async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
return await resp.text()
elif resp.status == 429:
# 限流:指数退避
await asyncio.sleep(2 ** attempt)
else:
return None
except asyncio.TimeoutError:
if attempt == retry - 1:
return None
await asyncio.sleep(1)
return None
async def crawl(self, urls):
tasks = [self.fetch(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# 使用示例
async def main():
urls = [f"https://httpbin.org/delay/{i%3}" for i in range(50)]
async with AsyncCrawler(concurrency=5) as crawler:
results = await crawler.crawl(urls)
success = sum(1 for r in results if r and not isinstance(r, Exception))
print(f"成功: {success}/{len(urls)}")
asyncio.run(main())这个爬虫实现了三个关键能力:
- 并发控制:Semaphore限制同时进行的请求数,防止过载
- 重试与退避:指数退避策略应对429限流和超时
- 资源管理:异步上下文管理器确保Session正确关闭
常见asyncio反模式
总结了生产环境中最常见的五个反模式:
# 反模式1:忘记await
async def bug1():
coro = some_async_function() # ❌ 协程未执行
result = await some_async_function() # ✅
# 反模式2:在循环中串行await
async def bug2(urls):
results = []
for url in urls:
result = await fetch(url) # ❌ 串行,慢
results.append(result)
# ✅ 用gather并发
results = await asyncio.gather(*[fetch(url) for url in urls])
# 反模式3:创建Task后不保存引用
async def bug3():
asyncio.create_task(background_work()) # ❌ Task可能被GC回收
# ✅ 保存引用
task = asyncio.create_task(background_work())
# 或在不需要结果时用background_tasks集合管理
# 反模式4:无限创建Task
async def bug4(urls):
# ❌ 10万个URL同时发起,内存爆炸
tasks = [asyncio.create_task(fetch(url)) for url in urls]
# ✅ 使用Semaphore或asyncio.as_completed控制并发
sem = asyncio.Semaphore(100)
async def limited_fetch(url):
async with sem:
return await fetch(url)
tasks = [asyncio.create_task(limited_fetch(url)) for url in urls]
# 反模式5:在async函数中使用time.sleep()
async def bug5():
time.sleep(5) # ❌ 阻塞整个事件循环
await asyncio.sleep(5) # ✅ 非阻塞等待Python 3.12+的asyncio新特性
Python持续改进asyncio生态,3.12和3.13版本带来了重要更新:
- 3.12:TaskGroup更稳定的异常处理,改进的TaskGroup上下文传播
- 3.13:改进的asyncio.run()实现,更好的调试信息
- 3.14(开发中):free-threaded模式下asyncio的性能优化
# Python 3.11+ 推荐使用TaskGroup替代gather
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("api1.com"))
task2 = tg.create_task(fetch("api2.com"))
task3 = tg.create_task(fetch("api3.com"))
# TaskGroup退出时自动等待所有任务完成
# 任一任务异常时自动取消其余任务
print(task1.result(), task2.result(), task3.result())总结
asyncio不是银弹,但在IO密集型场景下,它是Python最强大的并发武器。理解事件循环的调度机制、协程对象的本质、await的交出控制权语义,是从"会用"到"用好"asyncio的关键。
记住一句话:在协程里,永远不要做任何同步阻塞操作。如果必须使用同步库,用run_in_executor作为过渡,但最终一定要替换为异步原生方案。
评论列表COMMENT
- 暂时还没有人发表评论。
发表评论
文明上网,从我做起!