本文是协程系列的收尾篇。看完你能回答:
事件循环底层怎么工作的?
asyncio.Queue / Semaphore / wait_for 怎么用?
协程之间共享数据要加锁吗?
协程怎么调试?
一、事件循环内部 1.1 事件循环 = 调度器 事件循环本质上就是一个 while True:
1 2 3 4 5 6 7 8 9 def event_loop (tasks ): ready = tasks while ready: for task in ready: if task.can_run(): task.step() if task.done(): ready.remove(task)
每次遇到 await,协程挂起,控制权回到事件循环。事件循环去跑别的 Task,等这个协程等待的条件满足了再把它加回就绪队列。
1.2 await 的本质 await 不是”等待”,而是”让出控制权”:
1 2 3 4 5 6 7 8 result = await task
1.3 time.sleep 为什么是毒药 1 2 3 4 5 6 7 async def bad (): time.sleep(5 ) await do_something()async def good (): await asyncio.sleep(5 ) await do_something()
time.sleep 让 OS 挂起整个线程 ,事件循环也在这个线程里——所以它也动不了。
asyncio.sleep 注册一个定时器回调,协程挂起,事件循环去干别的事。5 秒后定时器到期,事件循环把协程加回就绪队列。
1.4 await asyncio.sleep(0) 不是”等 0 秒” 它是主动让出 ——告诉事件循环”我这一轮做完了,你去看看别人要不要跑”:
1 2 3 4 5 6 7 8 async def worker (name, steps ): for i in range (steps): print (f"[{name} ] 第 {i+1 } 步" ) await asyncio.sleep(0 )
没有 await 的协程就是普通函数,一口气跑到完——事件循环切不进来。
1.5 epoll/kqueue/select 与事件循环的关系 这三个是 OS 提供的 I/O 多路复用 API——事件循环的”闹钟”:
1 2 3 4 5 6 7 事件循环 操作系统 │ │ ├─ epoll: "帮我盯着这 1000 个 socket" │ │ │ (去跑别的协程了) │ │ │ │← "socket #42 可读了!" ────────┤
API
平台
性能
select
所有平台
O(n)
epoll
Linux
O(1)
kqueue
macOS/BSD
O(1)
Python 的 selectors 标准库封装了它们,asyncio 底层就用的它。开发时不用管,但知道存在很重要。
二、协程实战模式 2.1 生产者消费者 —— asyncio.Queue asyncio.Queue 是线程安全队列的协程版——放和取都支持 await:
1 2 3 4 5 6 7 8 9 10 11 queue = asyncio.Queue(maxsize=3 )await queue.put(item) await queue.put(None ) item = await queue.get() queue.task_done() await queue.join()
和线程版 Queue 的区别:线程版满了会抛异常或用 timeout,协程版直接 await 挂起,更自然。
结束信号的坑 :
1 2 3 4 5 for _ in range (3 ): await queue.put(None )
2.2 信号量 —— 限流 限制同时执行的协程数量,防止大并发打满资源:
1 2 3 4 5 6 7 8 9 10 sem = asyncio.Semaphore(2 ) async with sem: await do_work()await sem.acquire()await do_work() sem.release()
应用场景 :限制数据库连接数、限制 API 并发请求数。
2.3 超时控制 —— wait_for 给任意协程加超时:
1 2 3 4 5 6 try : result = await asyncio.wait_for(slow_task(), timeout=3 )except asyncio.TimeoutError: print ("超时了" )
多个协程的超时 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 try : results = await asyncio.wait_for( asyncio.gather(task1(), task2()), timeout=3 )except asyncio.TimeoutError: print ("整体超时" ) tasks = [ asyncio.wait_for(task1(), timeout=2 ), asyncio.wait_for(task2(), timeout=2 ), ] results = await asyncio.gather(*tasks, return_exceptions=True )
2.4 协程 vs 线程 vs 进程
协程
线程
进程
调度者
事件循环(用户态)
OS 内核
OS 内核
切换开销
~0.1μs
~1μs
~1ms
并发数
几十万
几千
几百
适合场景
I/O 密集型
I/O 密集型
CPU 密集型
一句话选择 :
1 2 3 CPU 密集 → 多进程 I/O 密集 → 协程(大部分 Web 服务/爬虫/API 调用都是这个) 既要又要 → 多进程 + 每个进程内协程
三、协程陷阱 3.1 共享数据需要加锁 协程是单线程的,但 读 → await → 写 这种三段式操作会有竞争:
1 2 3 4 5 6 7 8 9 10 counter = 0 async def bad (): global counter for _ in range (1000 ): temp = counter await asyncio.sleep(0 ) counter = temp + 1
加 asyncio.Lock :
1 2 3 4 5 6 7 8 9 10 11 lock = asyncio.Lock()async def safe (): global counter for _ in range (1000 ): async with lock: temp = counter await asyncio.sleep(0 ) counter = temp + 1
不需要锁的情况 :单条语句,如 list.append(item)、dict[key] = value——这些是原子的。
3.2 异常被吞 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 try : await will_fail()except ValueError: print ("捕获到了" ) task = asyncio.create_task(will_fail())await asyncio.sleep(0.1 ) task.result() results = await asyncio.gather(ok(), bad(), return_exceptions=True )for r in results: if isinstance (r, Exception): print (f"出错了: {r} " )
3.3 asyncio.run 只能调一次 一个事件循环不能嵌套另一个:
1 2 3 4 async def main (): asyncio.run(other()) asyncio.run(main())
解决方案:协程之间一律用 await,asyncio.run 只放在程序入口。
3.11+ 可以用 asyncio.Runner 解决,但日常用 await 就够了。
3.4 同步库阻塞事件循环 1 2 3 4 5 6 7 data = requests.get("https://api.example.com" ).json()async with aiohttp.ClientSession() as session: async with session.get("https://api.example.com" ) as resp: data = await resp.json()
如果必须用同步库,用 asyncio.to_thread() 扔到线程池:
1 data = await asyncio.to_thread(requests.get, "https://api.example.com" )
四、协程调试 4.1 日志大法(最实用) 1 2 3 4 5 6 7 8 import logging logging.basicConfig(level=logging.DEBUG) logging.info("开始并发请求" ) results = await asyncio.gather(*(task() for _ in range (3 )), return_exceptions=True )for i, r in enumerate (results): status = "✅ 成功" if not isinstance (r, Exception) else f"❌ 失败: {r} " logging.info(f"结果[{i} ]: {status} " )
4.2 给 Task 起名 1 2 3 4 5 task = asyncio.create_task(work(), name="任务-A" ) current = asyncio.current_task()print (current.get_name())
没有命名的话 Task 默认叫 Task-1、Task-2……出错了分不清是谁。
4.3 查看所有活跃的 Task 1 2 for t in asyncio.all_tasks(): print (f"活跃 Task: {t.get_name()} ,完成: {t.done()} " )
常见场景 ——程序卡住了,抓肇事者:
1 2 3 4 unfinished = [t for t in asyncio.all_tasks() if not t.done()]for t in unfinished: print (f"卡住的 Task: {t.get_name()} " ) t.cancel()
4.4 Task 状态检查 1 2 3 4 5 6 7 task = asyncio.create_task(will_fail())await asyncio.sleep(0.1 ) task.done() task.cancelled() task.exception() task.result()
4.5 总结:协程调试三板斧 1 2 3 1. 日志大法 → 看清执行顺序 2. Task 命名 + all_tasks() → 找到肇事者 3. 降并发到 1 → 确定是不是并发引起的问题
五、总结 从 async/await 到并发工具、从生产者消费者到信号量、从事件循环内部到调试技巧——协程的完整知识链到这里就走完了。
1 2 3 4 5 6 7 async def → await → create_task ↓ gather / wait / Queue / Semaphore ↓ 事件循环 + epoll 调度 ↓ 调试:日志 + Task 命名
协程的核心就是让等待不阻塞 。等你真正用协程写过几个实际项目后,这些概念会变成肌肉记忆,就像现在你写 for 循环一样自然。