本文完整代码:github.com/SumengQAQ/pythonic-design-patterns


一、故事:从一把锁开始

想象你开了一个社区网站。用户注册后需要:

  1. 发一封欢迎邮件(耗时 2 秒)
  2. 发一张新人优惠券(耗时 1 秒)
  3. 记录注册行为到分析系统(耗时 0.5 秒)

最粗暴的写法:

1
2
3
4
5
def register_user(name, email):
save_to_db(name, email)
send_email(name, email)
issue_coupon(name)
track_analytics(name)

一个注册请求卡 3.5 秒——用户等得想摔键盘。

二、第一次进化:异步 + 并发

用协程让三个操作同时跑:

1
2
3
4
5
6
7
async def register_user(name, email):
user = await save_to_db(name, email)
await asyncio.gather(
send_email(user),
issue_coupon(user),
track_analytics(user),
)

好了一些,但这把业务逻辑硬编码在注册流程里了。每加一个新功能就要改 register_user 核心函数。

所以我们需要事件驱动架构

三、事件驱动的核心思想

“发生了某事”和”怎么处理它”分开。

注册成功只管发一个 UserRegisteredEvent,不关心谁在处理。加功能就加新处理器——不碰原来的代码。

四、版本一:中介拓扑(Mediator Topology)

4.1 设计思路

一个中心化的 EventBus 管理事件和处理器之间的映射。

1
2
3
发布者 → EventBus(中心调度器) → 处理器 A
→ 处理器 B
→ 处理器 C

4.2 定义领域对象

1
2
3
4
5
@dataclass
class User:
id: str
name: str
email: str

4.3 设计 BaseEvent 与 init_subclass

处理器需要限流——同一事件不能无限制并发。但限流是事件级别的,不是处理器级别的。三个处理器应该共享同一个事件类型的信号量。

最自然的做法:让每个事件子类在定义时自动获得一个独立的信号量。

1
2
3
4
5
6
@dataclass
class BaseEvent:
_semaphore: ClassVar[asyncio.Semaphore]

def __init_subclass__(cls, semaphore_num: int = 3) -> None:
cls._semaphore = asyncio.Semaphore(semaphore_num)

__init_subclass__ 是 Python 3.6(PEP 487)引入的特性——当一个类继承了 BaseEvent 时,__init_subclass__ 自动调用。这里的作用是:每个事件子类被定义时自动创建自己的信号量。

1
2
3
@dataclass
class UserRegisteredEvent(BaseEvent, semaphore_num=3):
user: User

4.4 装饰器注册到 EventBus

1
2
3
4
5
6
7
8
@classmethod
def register(cls, func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
async with cls._semaphore:
await func(*args, **kwargs)
EventBus.register(cls, wrapper)
return wrapper

register 做了两件事:

  1. cls._semaphore 包装原函数——执行时获取信号量
  2. 把包装后的函数注册到 EventBus
1
2
3
4
@UserRegisteredEvent.register
async def email_notifier(event: UserRegisteredEvent):
await asyncio.sleep(2)
print(f"📧 发送邮件给 {event.user.name}({event.user.email})")

4.5 EventBus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class EventBus:
_events: dict[Type[BaseEvent], list[Callable]] = defaultdict(list)

@classmethod
def register(cls, event, callback):
cls._events[event].append(callback)

@classmethod
async def emit(cls, event):
tasks = [asyncio.create_task(callback(event))
for callback in cls._events[type(event)]]
results = await asyncio.gather(*tasks, return_exceptions=True)
exceptions = [r for r in results if r]
if exceptions:
await cls.emit(ErrorEvent(exceptions))

emitgather 并发执行所有处理器,return_exceptions=True 捕获异常——如果有处理器挂了,触发新的 ErrorEvent

4.6 错误也是事件

1
2
3
4
5
6
7
8
9
@dataclass
class ErrorEvent(BaseEvent, semaphore_num=10):
exceptions: list[BaseException]

@ErrorEvent.register
async def error_handler(event: ErrorEvent):
for exception in event.exceptions:
await asyncio.sleep(0.5)
print(f"❌ {type(exception).__name__}{exception}")

错误本身也是一个事件。EventBus 发现 gather 返回了异常时,自动 emit ErrorEvent。错误处理和其他业务逻辑在架构级别是对等的。

4.7 运行

1
2
3
4
5
# 输出:
# 📊 记录注册事件:塑梦(123456)
# 🎉 发放欢迎优惠券给 塑梦
# 📧 发送邮件给 塑梦([email protected])
# ⏱ 耗时 2.01 秒

三个处理器并发执行,总耗时 ≈ 最慢的那个(2 秒)。

4.8 中介拓扑的局限

EventBus 直接调处理器,没有缓冲区。瞬间来 10000 个事件就创建 10000×3 个 Task。

五、版本二:代理拓扑(Broker Topology)

5.1 设计思路

在发布者和处理器之间加一层消息队列解耦。

1
2
发布者 → 队列 → Worker 1 → 处理器 A,B,C
→ Worker 2 → 处理器 A,B,C

5.2 注册目标变了

中介拓扑里,装饰器注册到 EventBus。代理拓扑里,装饰器注册到全局字典 EVENT_CALLBACK

1
EVENT_CALLBACK[cls].append(wrapper)

因为代理拓扑中没有中心 EventBus 了。事件不经过 EventBus 调度,直接入队列。但”声明处理器”的机制应该保留——所以把注册目标从 EventBus 改为一个纯粹的注册表,Worker 再去注册表里读取处理器列表。

5.3 Worker 与处理器注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class UserRegisteredWorker:
handlers: list[Callable] = []

@classmethod
def set_handlers(cls, handlers):
cls.handlers = handlers
return cls

@classmethod
async def worker(cls, queue: asyncio.Queue):
while True:
event = await queue.get()
if event is None: break
await asyncio.gather(*[h(event) for h in cls.handlers])

Worker 只做两件事:从队列拿事件,然后对 handlers 里的每个函数并发执行。

handlers 列表在 main() 里通过 set_handlers 注入——控制反转。

5.4 结束信号的坑

1
2
3
4
@classmethod
async def close(cls, event_type, worker_num):
for _ in range(worker_num):
await cls.event_queues[event_type].put(None)

结束信号的数量必须等于 Worker 数——少放一个就有一个 Worker 永远在等 queue.get(),程序卡死。

5.5 运行

1
2
3
4
5
6
7
8
# 输出:
# 📊 记录注册事件:塑梦(123456)
# 🎉 发放欢迎优惠券给 塑梦
# 📧 发送邮件给 塑梦([email protected])
# 📊 记录注册事件:虚幻(789012)
# 🎉 发放欢迎优惠券给 虚幻
# 📧 发送邮件给 虚幻([email protected])
# ⏱ 耗时 2.50 秒

六、对比

维度 中介拓扑 代理拓扑
中心组件 EventBus 消息队列
注册目标 EventBus._events EVENT_CALLBACK 字典
限流机制 __init_subclass__ + 信号量 同左
削峰能力 有(队列缓冲)
容错性 ErrorEvent 兜底 Worker 独立,互不影响
代码量 ~50 行核心 ~80 行核心

一句话:不需要削峰时中介拓扑够了。需要高可靠性和解耦时上代理拓扑。

七、协程是基础

发邮件等网络、发券查数据库、埋点写日志——都涉及 I/O 等待。

协程的 await 在等待时让出控制权,事件循环去跑别的处理器。1000 个并发 = 1000 个协程 = 1 个线程,没有线程切换开销。

八、别以为很简单

但实际上这只是最最最核心的骨架。生产级的事件驱动系统还要补一堆东西:

1
2
3
4
# 现在的代码:事件 → 处理 → 完事
# 生产级的:事件 → 反序列化 → 校验 → 幂等检查 →
# 处理 → 重试(3次) → 死信队列 → 报警 →
# 回滚补偿 → 分布式事务 → 链路追踪

说几个让人崩溃的场景:

  • 处理器挂了怎么办?要不要重试?
  • 重试还失败怎么办?要不要死信队列?
  • 服务重启了,队列里的事件丢了怎么办?
  • 同一个事件被重复投递,幂等性怎么保证?
  • 邮件发出去了但优惠券发放失败了,怎么回滚?
  • 跨三个服务追踪一个事件的完整链路,出问题了怎么查?

但没有一步到位的架构。先跑通核心流程,量大了再加削峰,出问题了再加重试,搞崩了再加回滚——架构从来不是设计出来的,是事故堆出来的 (´;ω;`)

九、完整源代码

完整代码见 GitHub 仓库,包含中介拓扑和代理拓扑两个版本的完整实现。


下一篇预告:从事件驱动到 CQRS——读写分离的事件溯源架构