跳转至

常见问题

快速问题

Q: 如何保证任务不丢失?

A: Redis Streams 提供持久化机制。Worker 使用 XACK 确认消息处理完成,未确认的消息会被重新投递。

Q: 如何实现 Worker 负载均衡?

A: 多个 Worker 连接同一个 Redis Stream,Redis 会自动在消费者组内进行负载分配。

Q: 如何横向扩展 Worker?

A: 启动多个不同 worker_id 的 Worker 进程,共享同一个 Redis 实例和相同的 target_agent_type stream。

Q: 消息处理失败后会自动重试吗?

A: 是的,未 ACK 的消息会在重新读取时被重新投递。但建议在 process_command 中实现重试逻辑。

高级问题

Q: 如何实现定向消息发送?

A: 传入 target_worker_id 参数,消息会写入 worker:{worker_id} stream:

response = await client.send_message(
    target_agent_type="my_agent",
    session_id="sess_123",
    content="hello",
    target_worker_id="specific-worker-01",  # 定向发送
)

Q: 如何处理人机交互场景?

A: 使用 context.ask_user() 挂起任务,等待用户回复:

result = await context.ask_user(
    AskUserEvent(prompt="请确认")
)
# 任务会挂起,用户回复后以 ResumeCommand 形式恢复

Q: 如何监控 Worker 状态?

A: 使用 WorkerRegistry.get_online_workers()

workers = await registry.get_online_workers(agent_type="my_agent")
for worker in workers:
    print(f"Worker: {worker['worker_id']}, Last seen: {worker['last_heartbeat']}")

Q: 如何配置插件超时?

run_worker(
    worker_class=MyWorker,
    plugin_hook_timeout_seconds=30,  # 插件钩子超时 30 秒
)

Q: 如何使用服务发现?

from by_framework.core.discovery import ServiceRegistry

registry = ServiceRegistry(redis_client=redis)
await registry.register("my-service", "http://192.168.1.100:8080")
await registry.heartbeat("my-service", "http://192.168.1.100:8080")