Worker 模块¶
核心文件¶
src/by_framework/worker/worker.py- GatewayWorker 基类src/by_framework/worker/runner.py- Worker 运行循环src/by_framework/worker/processor.py- 任务处理器src/by_framework/worker/app.py- 启动入口
GatewayWorker¶
class GatewayWorker:
"""Abstract base class for Gateway Workers."""
async def process_command(self, command, context: AgentContext) -> Any:
"""Process incoming command. Must be implemented by subclass."""
raise NotImplementedError
def get_agent_types(self) -> List[str]:
"""Return list of agent types this worker can handle."""
raise NotImplementedError
async def get_capabilities(self) -> List[WorkerCapability]:
"""Return list of worker capabilities."""
return []
Runner 双循环架构¶
WorkerRunner 采用双循环设计:
_control_loop¶
负责: - Worker 注册/注销 - 心跳维护 - 优雅退出信号处理 - 任务状态同步
_run_once¶
负责: - 从 Redis Stream 批量拉取消息 - 并发处理任务 - 错误重试与死信处理