本文是协程系列的收尾篇。看完你能回答:

  1. 事件循环底层怎么工作的?
  2. asyncio.Queue / Semaphore / wait_for 怎么用?
  3. 协程之间共享数据要加锁吗?
  4. 协程怎么调试?

一、事件循环内部

1.1 事件循环 = 调度器

事件循环本质上就是一个 while True

1
2
3
4
5
6
7
8
9
# 伪代码——事件循环就这么简单
def event_loop(tasks):
ready = tasks # 就绪队列
while ready:
for task in ready:
if task.can_run():
task.step() # 执行一步,直到遇到 await
if task.done():
ready.remove(task)

每次遇到 await,协程挂起,控制权回到事件循环。事件循环去跑别的 Task,等这个协程等待的条件满足了再把它加回就绪队列。

1.2 await 的本质

await 不是”等待”,而是”让出控制权”:

1
2
3
4
5
6
7
8
result = await task

# 等价于:
# 1. 注册一个回调("我完事了叫醒我")
# 2. 挂起当前协程
# 3. 事件循环调度别的 Task
# 4. task 完成后 → 调回调 → 把协程放回就绪队列
# 5. 事件循环下次轮到你时,从 await 后面继续

1.3 time.sleep 为什么是毒药

1
2
3
4
5
6
7
async def bad():
time.sleep(5) # ← 整个事件循环卡死!
await do_something()

async def good():
await asyncio.sleep(5) # ← 挂起,事件循环去跑别的 Task
await do_something()

time.sleep 让 OS 挂起整个线程,事件循环也在这个线程里——所以它也动不了。

asyncio.sleep 注册一个定时器回调,协程挂起,事件循环去干别的事。5 秒后定时器到期,事件循环把协程加回就绪队列。

1.4 await asyncio.sleep(0) 不是”等 0 秒”

它是主动让出——告诉事件循环”我这一轮做完了,你去看看别人要不要跑”:

1
2
3
4
5
6
7
8
async def worker(name, steps):
for i in range(steps):
print(f"[{name}] 第 {i+1} 步")
await asyncio.sleep(0) # 主动让出

# 三个 worker 同时注册:
# [A] 第 1 步 → [B] 第 1 步 → [C] 第 1 步 → [A] 第 2 步 → ...
# 事件循环轮着调度三个 Task

没有 await 的协程就是普通函数,一口气跑到完——事件循环切不进来。

1.5 epoll/kqueue/select 与事件循环的关系

这三个是 OS 提供的 I/O 多路复用 API——事件循环的”闹钟”:

1
2
3
4
5
6
7
事件循环                         操作系统
│ │
├─ epoll: "帮我盯着这 1000 个 socket"
│ │
│ (去跑别的协程了) │
│ │
│← "socket #42 可读了!" ────────┤
API 平台 性能
select 所有平台 O(n)
epoll Linux O(1)
kqueue macOS/BSD O(1)

Python 的 selectors 标准库封装了它们,asyncio 底层就用的它。开发时不用管,但知道存在很重要。


二、协程实战模式

2.1 生产者消费者 —— asyncio.Queue

asyncio.Queue 是线程安全队列的协程版——放和取都支持 await

1
2
3
4
5
6
7
8
9
10
11
queue = asyncio.Queue(maxsize=3)

# 生产者
await queue.put(item) # 满了就等
await queue.put(None) # 放结束信号

# 消费者
item = await queue.get() # 空了就等
queue.task_done() # 告诉队列处理完了

await queue.join() # 等所有 task_done

和线程版 Queue 的区别:线程版满了会抛异常或用 timeout,协程版直接 await 挂起,更自然。

结束信号的坑

1
2
3
4
5
# 如果有 3 个 Worker:
for _ in range(3):
await queue.put(None) # 每个 Worker 各需要一个 None

# 少放一个 → 有一个 Worker 永远拿不到 None → 永远不退出 → 程序卡死

2.2 信号量 —— 限流

限制同时执行的协程数量,防止大并发打满资源:

1
2
3
4
5
6
7
8
9
10
sem = asyncio.Semaphore(2)     # 最多 2 个同时跑

# 推荐写法
async with sem:
await do_work()

# 手动写法
await sem.acquire()
await do_work()
sem.release()

应用场景:限制数据库连接数、限制 API 并发请求数。

2.3 超时控制 —— wait_for

给任意协程加超时:

1
2
3
4
5
6
try:
result = await asyncio.wait_for(slow_task(), timeout=3)
except asyncio.TimeoutError:
print("超时了")

# 超时时 Task 会被自动取消,不会后台泄漏

多个协程的超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 整体超时——一起不超过 3 秒
try:
results = await asyncio.wait_for(
asyncio.gather(task1(), task2()),
timeout=3
)
except asyncio.TimeoutError:
print("整体超时")

# 各自超时——每个不超过 2 秒
tasks = [
asyncio.wait_for(task1(), timeout=2),
asyncio.wait_for(task2(), timeout=2),
]
results = await asyncio.gather(*tasks, return_exceptions=True)

2.4 协程 vs 线程 vs 进程

协程 线程 进程
调度者 事件循环(用户态) OS 内核 OS 内核
切换开销 ~0.1μs ~1μs ~1ms
并发数 几十万 几千 几百
适合场景 I/O 密集型 I/O 密集型 CPU 密集型

一句话选择

1
2
3
CPU 密集 → 多进程
I/O 密集 → 协程(大部分 Web 服务/爬虫/API 调用都是这个)
既要又要 → 多进程 + 每个进程内协程

三、协程陷阱

3.1 共享数据需要加锁

协程是单线程的,但 读 → await → 写 这种三段式操作会有竞争:

1
2
3
4
5
6
7
8
9
10
counter = 0

async def bad():
global counter
for _ in range(1000):
temp = counter # 读
await asyncio.sleep(0) # 让出——另一个协程改了 counter
counter = temp + 1 # 写——用旧值覆盖

# 两个 bad() 同时跑 → counter 只到 ~1000,不是 2000

asyncio.Lock

1
2
3
4
5
6
7
8
9
10
11
lock = asyncio.Lock()

async def safe():
global counter
for _ in range(1000):
async with lock:
temp = counter
await asyncio.sleep(0) # 锁保护着,安全
counter = temp + 1

# 两个 safe() 同时跑 → counter = 2000 ✅

不需要锁的情况:单条语句,如 list.append(item)dict[key] = value——这些是原子的。

3.2 异常被吞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 直接 await → 安全
try:
await will_fail()
except ValueError:
print("捕获到了") # ✅

# create_task + 不调 result() → 异常被吞!
task = asyncio.create_task(will_fail())
await asyncio.sleep(0.1) # task 跑完了,异常被吞了
# 没有任何报错!
task.result() # 必须调 result() 才能拿到异常

# gather 需要 return_exceptions=True
results = await asyncio.gather(ok(), bad(), return_exceptions=True)
for r in results:
if isinstance(r, Exception):
print(f"出错了: {r}")

3.3 asyncio.run 只能调一次

一个事件循环不能嵌套另一个:

1
2
3
4
async def main():
asyncio.run(other()) # ← RuntimeError!

asyncio.run(main()) # ← 这是第一次

解决方案:协程之间一律用 awaitasyncio.run 只放在程序入口。

3.11+ 可以用 asyncio.Runner 解决,但日常用 await 就够了。

3.4 同步库阻塞事件循环

1
2
3
4
5
6
7
# ❌ 会阻塞
data = requests.get("https://api.example.com").json()

# ✅ 不会阻塞
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com") as resp:
data = await resp.json()

如果必须用同步库,用 asyncio.to_thread() 扔到线程池:

1
data = await asyncio.to_thread(requests.get, "https://api.example.com")

四、协程调试

4.1 日志大法(最实用)

1
2
3
4
5
6
7
8
import logging
logging.basicConfig(level=logging.DEBUG)

logging.info("开始并发请求")
results = await asyncio.gather(*(task() for _ in range(3)), return_exceptions=True)
for i, r in enumerate(results):
status = "✅ 成功" if not isinstance(r, Exception) else f"❌ 失败: {r}"
logging.info(f"结果[{i}]: {status}")

4.2 给 Task 起名

1
2
3
4
5
task = asyncio.create_task(work(), name="任务-A")

# 协程内部查看自己叫什么
current = asyncio.current_task()
print(current.get_name()) # "任务-A"

没有命名的话 Task 默认叫 Task-1Task-2……出错了分不清是谁。

4.3 查看所有活跃的 Task

1
2
for t in asyncio.all_tasks():
print(f"活跃 Task: {t.get_name()},完成: {t.done()}")

常见场景——程序卡住了,抓肇事者:

1
2
3
4
unfinished = [t for t in asyncio.all_tasks() if not t.done()]
for t in unfinished:
print(f"卡住的 Task: {t.get_name()}")
t.cancel()

4.4 Task 状态检查

1
2
3
4
5
6
7
task = asyncio.create_task(will_fail())
await asyncio.sleep(0.1)

task.done() # 是否已完成(正常/异常/取消都算)
task.cancelled() # 是否被取消
task.exception() # 安全获取异常(不抛出)
task.result() # 获取结果(会重新抛出异常)

4.5 总结:协程调试三板斧

1
2
3
1. 日志大法 → 看清执行顺序
2. Task 命名 + all_tasks() → 找到肇事者
3. 降并发到 1 → 确定是不是并发引起的问题

五、总结

async/await 到并发工具、从生产者消费者到信号量、从事件循环内部到调试技巧——协程的完整知识链到这里就走完了。

1
2
3
4
5
6
7
async def → await → create_task

gather / wait / Queue / Semaphore

事件循环 + epoll 调度

调试:日志 + Task 命名

协程的核心就是让等待不阻塞。等你真正用协程写过几个实际项目后,这些概念会变成肌肉记忆,就像现在你写 for 循环一样自然。