Worker API
GatewayWorker
class GatewayWorker:
"""Abstract base class for Gateway Workers."""
async def process_command(self, command, context: AgentContext) -> Any:
"""Process incoming command. Must be implemented by subclass."""
raise NotImplementedError
def get_agent_types(self) -> List[str]:
"""Return list of agent types this worker can handle."""
raise NotImplementedError
async def get_capabilities(self) -> List[WorkerCapability]:
"""Return list of worker capabilities."""
return []
run_worker
def run_worker(
worker_class: Type[GatewayWorker],
worker_id: str = "worker-1",
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0,
redis_password: Optional[str] = None,
redis_username: Optional[str] = None,
workspace_dir: str = "/tmp/gateway-workspace",
consumer_group: str = "agent_engines",
max_concurrency: int = 50,
fetch_count: int = 10,
redis_max_connections: Optional[int] = None,
plugin_list: Optional[List[Plugin]] = None,
plugin_configurator: Optional[Callable] = None,
plugin_hook_timeout_seconds: Optional[float] = None,
plugin_log_hook_stats_on_shutdown: bool = True,
plugin_dir: Optional[str] = None,
) -> None: