跳转至

Worker 开发指南

GatewayWorker 基类

GatewayWorker 是所有自定义 Worker 的基类,你需要实现以下方法:

方法 是否必须 描述
get_agent_types() 返回此 Worker 能处理的 Agent 类型列表
process_command(command, context) 处理具体的业务逻辑

AgentContext 上下文

AgentContext 提供了与运行环境交互的能力:

from by_framework import AgentContext, ArtifactEvent

async def process_command(self, command, context: AgentContext):
    # 1. 发送流式输出
    await context.emit_chunk("正在处理...")

    # 2. 发送产物/结构化数据
    await context.emit_artifact(ArtifactEvent(url="https://example.com/result.json"))

    # 3. 获取消息 ID 和会话 ID
    msg_id = context.message_id
    session_id = context.session_id

    # 4. 调用其他 Agent (支持挂起当前任务等待返回)
    result = await context.call_agent(
        target_agent_type="translator_agent",
        content="Hello",
        wait_for_reply=True
    )

AgentContext API

方法 描述
emit_chunk() 发送流式文本片段
emit_state() 发送状态更新事件
emit_artifact() 发送产物/附件事件
ask_user() 向用户发送等待输入请求
call_agent() 调用其他 Agent
dispatch_group() 分发任务组
get_active_workers() 获取集群中所有活跃的 worker

完整示例

import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class StreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["streaming_demo"]

    async def process_command(self, command, context: AgentContext):
        text = "这是一段流式输出的示例文本。"

        for char in text:
            await context.emit_chunk(char)
            await asyncio.sleep(0.05)

        return {"status": "done"}

if __name__ == "__main__":
    run_worker(
        worker_class=StreamingAgent,
        worker_id="streaming-worker-01",
        redis_host="localhost",
        redis_port=6379,
    )

进阶能力

人机交互型流程

Worker 可以通过 context.ask_user(...) 挂起执行并等待用户输入。用户回复回来后,会以 ResumeCommand 的形式重新进入同一个 Worker。

from by_framework import AgentContext, AskUserEvent, GatewayWorker, ResumeCommand

class ApprovalAgent(GatewayWorker):
    def get_agent_types(self):
        return ["approval_agent"]

    async def process_command(self, command, context: AgentContext):
        if isinstance(command, ResumeCommand):
            await context.emit_chunk(f"用户回复: {command.content}")
            return {"status": "completed"}

        return await context.ask_user(
            AskUserEvent(prompt="请确认部署窗口。")
        )

Scatter-Gather 分发

dispatch_group(...) 可以一次分发多个子任务:

tasks = [
    {"target_agent_type": "researcher", "content": "收集参考资料"},
    {"target_agent_type": "writer", "content": "起草摘要"},
]

group = await context.dispatch_group(tasks, wait_for_reply=True)
results = await context.collect_group_results(group["task_group_id"])