Worker 模块¶
核心文件¶
src/by_framework/worker/worker.py- GatewayWorker 基类src/by_framework/worker/runner.py- Worker 运行循环src/by_framework/worker/processor.py- 任务处理器src/by_framework/worker/app.py- 启动入口
worker/GatewayWorker.java- GatewayWorker 基类worker/WorkerRunner.java- Worker 运行循环worker/AgentContext.java- 运行时上下文worker/ExecutionTracker.java- 任务追踪器
src/worker.ts- GatewayWorker 基类src/runner.ts- Worker 运行循环src/processor.ts- 任务处理器src/app.ts- 启动入口(runWorker)
GatewayWorker¶
class GatewayWorker:
"""Abstract base class for Gateway Workers."""
async def process_command(self, command, context: AgentContext) -> Any:
"""Process incoming command. Must be implemented by subclass."""
raise NotImplementedError
def get_agent_types(self) -> List[str]:
"""Return list of agent types this worker can handle."""
raise NotImplementedError
async def get_capabilities(self) -> List[WorkerCapability]:
"""Return list of worker capabilities."""
return []
public abstract class GatewayWorker {
protected final String workerId;
public GatewayWorker(String workerId) {
this.workerId = workerId;
}
/** 返回此 Worker 能处理的 Agent 类型列表 */
public abstract List<String> getAgentTypes();
/** 处理传入的命令,必须由子类实现 */
public abstract Object processCommand(GatewayCommand command, AgentContext context);
}
Runner 双循环架构¶
WorkerRunner 采用双循环设计(各语言实现细节略有不同,但核心逻辑一致):
控制循环¶
负责: - Worker 注册/注销 - 心跳维护 - 优雅退出信号处理 - 任务状态同步
消息处理¶
负责: - 从 Redis Stream 批量拉取消息 - 并发处理任务(Python: asyncio.gather / Java: ExecutorService / TypeScript: Promise.all) - 错误重试与死信处理