常见问题¶
快速问题¶
Q: 如何保证任务不丢失?¶
A: Redis Streams 提供持久化机制。Worker 使用 XACK 确认消息处理完成,未确认的消息会被重新投递。
Q: 如何实现 Worker 负载均衡?¶
A: 多个 Worker 连接同一个 Redis Stream,Redis 会自动在消费者组内进行负载分配。
Q: 如何横向扩展 Worker?¶
A: 启动多个不同 worker_id 的 Worker 进程,共享同一个 Redis 实例和相同的 target_agent_type stream。
Q: 消息处理失败后会自动重试吗?¶
A: 是的,未 ACK 的消息会在重新读取时被重新投递。但建议在 process_command 中实现重试逻辑。
高级问题¶
Q: 如何实现定向消息发送?¶
A: 传入 target_worker_id 参数,消息会写入 worker:{worker_id} stream:
response = await client.send_message(
target_agent_type="my_agent",
session_id="sess_123",
content="hello",
target_worker_id="specific-worker-01", # 定向发送
)
Q: 如何处理人机交互场景?¶
A: 使用 context.ask_user() 挂起任务,等待用户回复:
Q: 如何监控 Worker 状态?¶
A: 使用 WorkerRegistry.get_online_workers():
workers = await registry.get_online_workers(agent_type="my_agent")
for worker in workers:
print(f"Worker: {worker['worker_id']}, Last seen: {worker['last_heartbeat']}")