Redis Streams 深度剖析¶
为什么选择 Redis Streams¶
- 持久化: 消息持久化到磁盘,不丢失
- 消费组: 原生支持 consumer group,实现竞争消费
- 范围查询: 支持按 ID 范围读取历史消息
- ACK 机制: 支持消息确认,未 ACK 消息会自动重投
Redis Key 模式¶
| Key 模式 | 用途 |
|---|---|
byai_gateway:ctrl:agent_type:{agent_type} |
Agent type 控制流 |
byai_gateway:ctrl:worker:{worker_id} |
Worker 定向控制流 |
queue:data:stream |
会话数据流 |
worker:active:{worker_id} |
Worker 心跳注册 |
Consumer Group 语义¶
生产路径 (Agent Type Stream)¶
Client → byai_gateway:ctrl:agent_type:{agent_type}
↓
同一 agent type 的多个 Worker
通过 Redis consumer group 竞争消费
- 发送前只检查"这个 agent type 是否至少存在一个 online Worker"
- 不会在发送前预先挑选某个具体 Worker
调试路径 (Worker Stream)¶
- 当显式提供
target_worker_id时使用此路径 - 用于 debug、定向下发或 worker 级控制命令
消息确认机制¶
Worker 使用 XACK 确认消息处理完成:
- 成功处理 →
XACK→ 消息从 PEL 中移除 - 处理失败 → 不 ACK → 消息自动重投(由
XREADGROUP的BLOCK参数控制)