Worker 开发指南¶
GatewayWorker 基类¶
GatewayWorker 是所有自定义 Worker 的基类,你需要实现以下方法:
| 方法 | 是否必须 | 描述 |
|---|---|---|
get_agent_types() |
是 | 返回此 Worker 能处理的 Agent 类型列表 |
process_command(command, context) |
是 | 处理具体的业务逻辑 |
AgentContext 上下文¶
AgentContext 提供了与运行环境交互的能力:
from by_framework import AgentContext, ArtifactEvent
async def process_command(self, command, context: AgentContext):
# 1. 发送流式输出
await context.emit_chunk("正在处理...")
# 2. 发送产物/结构化数据
await context.emit_artifact(ArtifactEvent(url="https://example.com/result.json"))
# 3. 获取消息 ID 和会话 ID
msg_id = context.message_id
session_id = context.session_id
# 4. 调用其他 Agent (支持挂起当前任务等待返回)
result = await context.call_agent(
target_agent_type="translator_agent",
content="Hello",
wait_for_reply=True
)
AgentContext API¶
| 方法 | 描述 |
|---|---|
emit_chunk() |
发送流式文本片段 |
emit_state() |
发送状态更新事件 |
emit_artifact() |
发送产物/附件事件 |
ask_user() |
向用户发送等待输入请求 |
call_agent() |
调用其他 Agent |
dispatch_group() |
分发任务组 |
get_active_workers() |
获取集群中所有活跃的 worker |
完整示例¶
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class StreamingAgent(GatewayWorker):
def get_agent_types(self):
return ["streaming_demo"]
async def process_command(self, command, context: AgentContext):
text = "这是一段流式输出的示例文本。"
for char in text:
await context.emit_chunk(char)
await asyncio.sleep(0.05)
return {"status": "done"}
if __name__ == "__main__":
run_worker(
worker_class=StreamingAgent,
worker_id="streaming-worker-01",
redis_host="localhost",
redis_port=6379,
)
进阶能力¶
人机交互型流程¶
Worker 可以通过 context.ask_user(...) 挂起执行并等待用户输入。用户回复回来后,会以 ResumeCommand 的形式重新进入同一个 Worker。
from by_framework import AgentContext, AskUserEvent, GatewayWorker, ResumeCommand
class ApprovalAgent(GatewayWorker):
def get_agent_types(self):
return ["approval_agent"]
async def process_command(self, command, context: AgentContext):
if isinstance(command, ResumeCommand):
await context.emit_chunk(f"用户回复: {command.content}")
return {"status": "completed"}
return await context.ask_user(
AskUserEvent(prompt="请确认部署窗口。")
)
Scatter-Gather 分发¶
dispatch_group(...) 可以一次分发多个子任务: