跳转至

Worker 模块

核心文件

  • src/by_framework/worker/worker.py - GatewayWorker 基类
  • src/by_framework/worker/runner.py - Worker 运行循环
  • src/by_framework/worker/processor.py - 任务处理器
  • src/by_framework/worker/app.py - 启动入口

GatewayWorker

class GatewayWorker:
    """Abstract base class for Gateway Workers."""

    async def process_command(self, command, context: AgentContext) -> Any:
        """Process incoming command. Must be implemented by subclass."""
        raise NotImplementedError

    def get_agent_types(self) -> List[str]:
        """Return list of agent types this worker can handle."""
        raise NotImplementedError

    async def get_capabilities(self) -> List[WorkerCapability]:
        """Return list of worker capabilities."""
        return []

Runner 双循环架构

WorkerRunner 采用双循环设计:

_control_loop          # 控制循环 - 管理生命周期、心跳、状态
_run_once              # 单次执行 - 批量拉取并处理消息

_control_loop

负责: - Worker 注册/注销 - 心跳维护 - 优雅退出信号处理 - 任务状态同步

_run_once

负责: - 从 Redis Stream 批量拉取消息 - 并发处理任务 - 错误重试与死信处理

任务处理流程

async def _run_once(self) -> None:
    # 1. 批量拉取消息
    messages = await self._fetch_messages()

    # 2. 并发处理
    tasks = [
        self._process_single(message)
        for message in messages
    ]
    await asyncio.gather(*tasks, return_exceptions=True)

    # 3. ACK 已处理消息
    for message in messages:
        await self._ack_message(message)