多 Agent 协作示例
架构说明
┌─────────────────────┐ ┌─────────────────────┐
│ Orchestrator │ │ Sub-Worker │
│ (协调层进程) │ │ (执行层进程) │
│ │ │ │
│ - LLM 决策 │────▶│ - 文本反转 │
│ - 调用子 Agent │◀────│ - 计算任务 │
└─────────────────────┘ └─────────────────────┘
│ │
└────────── Redis ─────────┘
Orchestrator Agent
import os
from typing import Annotated, Any, List
from by_framework.worker import (
ByaiAgentContext,
ByaiAskAgentCommand,
ByaiResumeCommand,
ByaiWorker,
run_worker,
)
from langchain_core.tools import tool, InjectedToolCallId
from langgraph.types import interrupt
class OrchestratorWorker(ByaiWorker):
"""诗歌创作协调员:通过远程工具调用调度诗人、翻译、评论专家"""
def get_agent_types(self) -> List[str]:
return ["orchestrator-agent"]
def _make_remote_tool(self, context, tool_name: str, target_agent_type: str, description: str):
"""工厂方法:生成远程调用工具,核心模式 dispatch + interrupt"""
@tool(tool_name, description=description)
async def remote_tool(topic: str, tool_call_id: Annotated[str, InjectedToolCallId]):
# Redis 幂等防抖:checkpoint 恢复时不重复派发
redis_key = f"dispatched_task:{context.session_id}:{tool_call_id}"
is_dispatched = await context.redis.exists(redis_key)
if not is_dispatched:
await context.emit_chunk(f"🎨 已调度专家 {target_agent_type}...")
await context.call_agent(
target_agent_type=target_agent_type,
content=topic,
)
await context.redis.set(redis_key, 1, ex=86400)
# 正统 LangGraph 中断:挂起等待 ResumeCommand 唤醒
result = interrupt(f"Waiting for {target_agent_type}")
return f"专家回复:\n{result}"
return remote_tool
async def process_command(
self,
command: ByaiAskAgentCommand | ByaiResumeCommand,
context: ByaiAgentContext,
) -> Any:
from langgraph.types import Command
if isinstance(command, ByaiAskAgentCommand):
await context.emit_chunk("✍️ 开始处理您的需求...")
# ... 构建 LangGraph 并执行
return "Tasks dispatched"
if isinstance(command, ByaiResumeCommand):
# 获取子 Agent 返回结果,继续执行
resume_data = str(command.reply_data) if hasattr(command, "reply_data") else ""
# 携带结果唤醒 LangGraph
final = await graph.ainvoke(Command(resume=resume_data), config=config)
return final["messages"][-1].content
raise TypeError(f"Unsupported command type: {type(command)!r}")
Sub-Worker (执行节点)
import os
from by_framework.worker import ByaiWorker, ByaiAskAgentCommand, ByaiResumeCommand, run_worker
class SubWorker(ByaiWorker):
"""纯粹的计算节点,负责执行具体任务"""
def get_agent_types(self) -> List[str]:
return ["poet-agent", "translator-agent", "critic-agent"]
async def process_command(self, command, context):
text = str(command.content)
# 根据 agent_type 执行不同任务
if "poet" in context.current_agent_id:
result = self._generate_poem(text)
elif "translator" in context.current_agent_id:
result = self._translate(text)
else:
result = f"处理完成: {text}"
await context.emit_chunk(result)
return {"status": "success", "result": result}
def _generate_poem(self, topic: str) -> str:
return f"诗篇:关于 {topic} 的美丽诗行"
def _translate(self, text: str) -> str:
return f"[EN] {text}"
启动方式
# Terminal 1: 启动执行层
uv run python sub_worker.py
# Terminal 2: 启动协调层
uv run python orchestrator.py
观察点
- Orchestrator 终端:显示
🎨 [Orchestrator] 已调度专家
- SubWorker 终端:显示
🔨 接收到计算任务
- SubWorker 完成后,Orchestrator 收到 ResumeCommand 并输出最终结果