跳转至

多 Agent 协作示例

架构说明

┌─────────────────────┐     ┌─────────────────────┐
│  Orchestrator       │     │   Sub-Worker        │
│  (协调层进程)        │     │   (执行层进程)       │
│                     │     │                     │
│  - LLM 决策         │────▶│  - 文本反转         │
│  - 调用子 Agent     │◀────│  - 计算任务         │
└─────────────────────┘     └─────────────────────┘
           │                          │
           └────────── Redis ─────────┘

Orchestrator Agent

import os
from typing import Annotated, Any, List
from by_framework.worker import (
    ByaiAgentContext,
    ByaiAskAgentCommand,
    ByaiResumeCommand,
    ByaiWorker,
    run_worker,
)
from langchain_core.tools import tool, InjectedToolCallId
from langgraph.types import interrupt

class OrchestratorWorker(ByaiWorker):
    """诗歌创作协调员:通过远程工具调用调度诗人、翻译、评论专家"""

    def get_agent_types(self) -> List[str]:
        return ["orchestrator-agent"]

    def _make_remote_tool(self, context, tool_name: str, target_agent_type: str, description: str):
        """工厂方法:生成远程调用工具,核心模式 dispatch + interrupt"""

        @tool(tool_name, description=description)
        async def remote_tool(topic: str, tool_call_id: Annotated[str, InjectedToolCallId]):
            # Redis 幂等防抖:checkpoint 恢复时不重复派发
            redis_key = f"dispatched_task:{context.session_id}:{tool_call_id}"
            is_dispatched = await context.redis.exists(redis_key)

            if not is_dispatched:
                await context.emit_chunk(f"🎨 已调度专家 {target_agent_type}...")
                await context.call_agent(
                    target_agent_type=target_agent_type,
                    content=topic,
                )
                await context.redis.set(redis_key, 1, ex=86400)

            # 正统 LangGraph 中断:挂起等待 ResumeCommand 唤醒
            result = interrupt(f"Waiting for {target_agent_type}")
            return f"专家回复:\n{result}"

        return remote_tool

    async def process_command(
        self,
        command: ByaiAskAgentCommand | ByaiResumeCommand,
        context: ByaiAgentContext,
    ) -> Any:
        from langgraph.types import Command

        if isinstance(command, ByaiAskAgentCommand):
            await context.emit_chunk("✍️ 开始处理您的需求...")
            # ... 构建 LangGraph 并执行
            return "Tasks dispatched"

        if isinstance(command, ByaiResumeCommand):
            # 获取子 Agent 返回结果,继续执行
            resume_data = str(command.reply_data) if hasattr(command, "reply_data") else ""
            # 携带结果唤醒 LangGraph
            final = await graph.ainvoke(Command(resume=resume_data), config=config)
            return final["messages"][-1].content

        raise TypeError(f"Unsupported command type: {type(command)!r}")

Sub-Worker (执行节点)

import os
from by_framework.worker import ByaiWorker, ByaiAskAgentCommand, ByaiResumeCommand, run_worker

class SubWorker(ByaiWorker):
    """纯粹的计算节点,负责执行具体任务"""

    def get_agent_types(self) -> List[str]:
        return ["poet-agent", "translator-agent", "critic-agent"]

    async def process_command(self, command, context):
        text = str(command.content)

        # 根据 agent_type 执行不同任务
        if "poet" in context.current_agent_id:
            result = self._generate_poem(text)
        elif "translator" in context.current_agent_id:
            result = self._translate(text)
        else:
            result = f"处理完成: {text}"

        await context.emit_chunk(result)
        return {"status": "success", "result": result}

    def _generate_poem(self, topic: str) -> str:
        return f"诗篇:关于 {topic} 的美丽诗行"

    def _translate(self, text: str) -> str:
        return f"[EN] {text}"

启动方式

# Terminal 1: 启动执行层
uv run python sub_worker.py

# Terminal 2: 启动协调层
uv run python orchestrator.py

观察点

  • Orchestrator 终端:显示 🎨 [Orchestrator] 已调度专家
  • SubWorker 终端:显示 🔨 接收到计算任务
  • SubWorker 完成后,Orchestrator 收到 ResumeCommand 并输出最终结果