跳转至

Redis Streams 深度剖析

为什么选择 Redis Streams

  • 持久化: 消息持久化到磁盘,不丢失
  • 消费组: 原生支持 consumer group,实现竞争消费
  • 范围查询: 支持按 ID 范围读取历史消息
  • ACK 机制: 支持消息确认,未 ACK 消息会自动重投

Redis Key 模式

Key 模式 用途
byai_gateway:ctrl:agent_type:{agent_type} Agent type 控制流
byai_gateway:ctrl:worker:{worker_id} Worker 定向控制流
queue:data:stream 会话数据流
worker:active:{worker_id} Worker 心跳注册

Consumer Group 语义

生产路径 (Agent Type Stream)

Client → byai_gateway:ctrl:agent_type:{agent_type}
                    同一 agent type 的多个 Worker
                    通过 Redis consumer group 竞争消费
  • 发送前只检查"这个 agent type 是否至少存在一个 online Worker"
  • 不会在发送前预先挑选某个具体 Worker

调试路径 (Worker Stream)

Client → byai_gateway:ctrl:worker:{worker_id}
              只发往指定的 Worker
  • 当显式提供 target_worker_id 时使用此路径
  • 用于 debug、定向下发或 worker 级控制命令

消息确认机制

Worker 使用 XACK 确认消息处理完成:

  • 成功处理 → XACK → 消息从 PEL 中移除
  • 处理失败 → 不 ACK → 消息自动重投(由 XREADGROUPBLOCK 参数控制)

消费者组配置

run_worker(
    consumer_group="agent_engines",  # 消费者组名
    max_concurrency=50,              # 最大并发
    fetch_count=10,                  # 批量获取数量
)