跳转至

Worker API

GatewayWorker

class GatewayWorker:
    """Abstract base class for Gateway Workers."""

    async def process_command(self, command, context: AgentContext) -> Any:
        """Process incoming command. Must be implemented by subclass."""
        raise NotImplementedError

    def get_agent_types(self) -> List[str]:
        """Return list of agent types this worker can handle."""
        raise NotImplementedError

    async def get_capabilities(self) -> List[WorkerCapability]:
        """Return list of worker capabilities."""
        return []

run_worker

def run_worker(
    worker_class: Type[GatewayWorker],
    worker_id: str = "worker-1",
    redis_host: str = "localhost",
    redis_port: int = 6379,
    redis_db: int = 0,
    redis_password: Optional[str] = None,
    redis_username: Optional[str] = None,
    workspace_dir: str = "/tmp/gateway-workspace",
    consumer_group: str = "agent_engines",
    max_concurrency: int = 50,
    fetch_count: int = 10,
    redis_max_connections: Optional[int] = None,
    plugin_list: Optional[List[Plugin]] = None,
    plugin_configurator: Optional[Callable] = None,
    plugin_hook_timeout_seconds: Optional[float] = None,
    plugin_log_hook_stats_on_shutdown: bool = True,
    plugin_dir: Optional[str] = None,
) -> None: