本文完整代码:github.com/SumengQAQ/pythonic-design-patterns
一、故事:从一把锁开始 想象你开了一个社区网站。用户注册后需要:
发一封欢迎邮件(耗时 2 秒)
发一张新人优惠券(耗时 1 秒)
记录注册行为到分析系统(耗时 0.5 秒)
最粗暴的写法:
1 2 3 4 5 def register_user (name, email ): save_to_db(name, email) send_email(name, email) issue_coupon(name) track_analytics(name)
一个注册请求卡 3.5 秒——用户等得想摔键盘。
二、第一次进化:异步 + 并发 用协程让三个操作同时跑:
1 2 3 4 5 6 7 async def register_user (name, email ): user = await save_to_db(name, email) await asyncio.gather( send_email(user), issue_coupon(user), track_analytics(user), )
好了一些,但这把业务逻辑硬编码在注册流程里了。每加一个新功能就要改 register_user 核心函数。
所以我们需要事件驱动架构 。
三、事件驱动的核心思想
“发生了某事”和”怎么处理它”分开。
注册成功只管发一个 UserRegisteredEvent,不关心谁在处理。加功能就加新处理器——不碰原来的代码。
4.1 设计思路 一个中心化的 EventBus 管理事件和处理器之间的映射。
1 2 3 发布者 → EventBus(中心调度器) → 处理器 A → 处理器 B → 处理器 C
4.2 定义领域对象 1 2 3 4 5 @dataclass class User : id : str name: str email: str
4.3 设计 BaseEvent 与 init_subclass 处理器需要限流——同一事件不能无限制并发。但限流是事件级别的,不是处理器级别的。三个处理器应该共享同一个事件类型的信号量。
最自然的做法:让每个事件子类在定义时自动获得一个独立的信号量。
1 2 3 4 5 6 @dataclass class BaseEvent : _semaphore: ClassVar[asyncio.Semaphore] def __init_subclass__ (cls, semaphore_num: int = 3 ) -> None : cls._semaphore = asyncio.Semaphore(semaphore_num)
__init_subclass__ 是 Python 3.6(PEP 487)引入的特性——当一个类继承了 BaseEvent 时,__init_subclass__ 自动调用。这里的作用是:每个事件子类被定义时自动创建自己的信号量。
1 2 3 @dataclass class UserRegisteredEvent (BaseEvent, semaphore_num=3 ): user: User
4.4 装饰器注册到 EventBus 1 2 3 4 5 6 7 8 @classmethod def register (cls, func: Callable ) -> Callable : @functools.wraps(func ) async def wrapper (*args, **kwargs ): async with cls._semaphore: await func(*args, **kwargs) EventBus.register(cls, wrapper) return wrapper
register 做了两件事:
用 cls._semaphore 包装原函数——执行时获取信号量
把包装后的函数注册到 EventBus
1 2 3 4 @UserRegisteredEvent.register async def email_notifier (event: UserRegisteredEvent ): await asyncio.sleep(2 ) print (f"📧 发送邮件给 {event.user.name} ({event.user.email} )" )
4.5 EventBus 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class EventBus : _events: dict [Type [BaseEvent], list [Callable ]] = defaultdict(list ) @classmethod def register (cls, event, callback ): cls._events[event].append(callback) @classmethod async def emit (cls, event ): tasks = [asyncio.create_task(callback(event)) for callback in cls._events[type (event)]] results = await asyncio.gather(*tasks, return_exceptions=True ) exceptions = [r for r in results if r] if exceptions: await cls.emit(ErrorEvent(exceptions))
emit 用 gather 并发执行所有处理器,return_exceptions=True 捕获异常——如果有处理器挂了,触发新的 ErrorEvent。
4.6 错误也是事件 1 2 3 4 5 6 7 8 9 @dataclass class ErrorEvent (BaseEvent, semaphore_num=10 ): exceptions: list [BaseException]@ErrorEvent.register async def error_handler (event: ErrorEvent ): for exception in event.exceptions: await asyncio.sleep(0.5 ) print (f"❌ {type (exception).__name__} :{exception} " )
错误本身也是一个事件。EventBus 发现 gather 返回了异常时,自动 emit ErrorEvent。错误处理和其他业务逻辑在架构级别是对等的。
4.7 运行
三个处理器并发执行,总耗时 ≈ 最慢的那个(2 秒)。
4.8 中介拓扑的局限 EventBus 直接调处理器,没有缓冲区。瞬间来 10000 个事件就创建 10000×3 个 Task。
五、版本二:代理拓扑(Broker Topology) 5.1 设计思路 在发布者和处理器之间加一层消息队列解耦。
1 2 发布者 → 队列 → Worker 1 → 处理器 A,B,C → Worker 2 → 处理器 A,B,C
5.2 注册目标变了 中介拓扑里,装饰器注册到 EventBus。代理拓扑里,装饰器注册到全局字典 EVENT_CALLBACK:
1 EVENT_CALLBACK[cls].append(wrapper)
因为代理拓扑中没有中心 EventBus 了。事件不经过 EventBus 调度,直接入队列。但”声明处理器”的机制应该保留——所以把注册目标从 EventBus 改为一个纯粹的注册表,Worker 再去注册表里读取处理器列表。
5.3 Worker 与处理器注入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 class UserRegisteredWorker : handlers: list [Callable ] = [] @classmethod def set_handlers (cls, handlers ): cls.handlers = handlers return cls @classmethod async def worker (cls, queue: asyncio.Queue ): while True : event = await queue.get() if event is None : break await asyncio.gather(*[h(event) for h in cls.handlers])
Worker 只做两件事:从队列拿事件,然后对 handlers 里的每个函数并发执行。
handlers 列表在 main() 里通过 set_handlers 注入——控制反转。
5.4 结束信号的坑 1 2 3 4 @classmethod async def close (cls, event_type, worker_num ): for _ in range (worker_num): await cls.event_queues[event_type].put(None )
结束信号的数量必须等于 Worker 数——少放一个就有一个 Worker 永远在等 queue.get(),程序卡死。
5.5 运行
六、对比
维度
中介拓扑
代理拓扑
中心组件
EventBus
消息队列
注册目标
EventBus._events
EVENT_CALLBACK 字典
限流机制
__init_subclass__ + 信号量
同左
削峰能力
无
有(队列缓冲)
容错性
ErrorEvent 兜底
Worker 独立,互不影响
代码量
~50 行核心
~80 行核心
一句话:不需要削峰时中介拓扑够了。需要高可靠性和解耦时上代理拓扑。
七、协程是基础 发邮件等网络、发券查数据库、埋点写日志——都涉及 I/O 等待。
协程的 await 在等待时让出控制权,事件循环去跑别的处理器。1000 个并发 = 1000 个协程 = 1 个线程,没有线程切换开销。
八、别以为很简单 但实际上这只是最最最核心的骨架。生产级的事件驱动系统还要补一堆东西:
说几个让人崩溃的场景:
处理器挂了怎么办?要不要重试?
重试还失败怎么办?要不要死信队列?
服务重启了,队列里的事件丢了怎么办?
同一个事件被重复投递,幂等性怎么保证?
邮件发出去了但优惠券发放失败了,怎么回滚?
跨三个服务追踪一个事件的完整链路,出问题了怎么查?
但没有一步到位的架构。先跑通核心流程,量大了再加削峰,出问题了再加重试,搞崩了再加回滚——架构从来不是设计出来的,是事故堆出来的 (´;ω;`)
九、完整源代码 完整代码见 GitHub 仓库 ,包含中介拓扑和代理拓扑两个版本的完整实现。
下一篇预告:从事件驱动到 CQRS——读写分离的事件溯源架构