跳转至

快速入门

环境要求

  • Python 3.12+
  • Redis 7.0+

安装

# 克隆项目
cd by-framework-python

# 使用 uv 安装(推荐)
uv sync

# 或使用 pip
pip install by-framework

启动 Redis

docker run -d -p 6379:6379 redis:7-alpine

创建第一个 Worker

创建 my_agent.py:

import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class MyAssistant(GatewayWorker):
    def get_agent_types(self):
        # 声明此 Worker 能够处理的 Agent 类型
        return ["weather_agent", "chat_agent"]

    async def process_command(self, command, context: AgentContext):
        # 发送流式文本片段
        await context.emit_chunk("正在处理您的请求...\n")

        # 模拟耗时操作
        await asyncio.sleep(0.5)

        # 更新任务状态
        await context.emit_state("thinking")

        # 从 command 的 content 中读取请求内容
        user_input = (
            command.content if isinstance(command.content, str) else str(command.content)
        )

        # 发送思考过程
        await context.emit_chunk(f"我收到了: {user_input}\n")
        await asyncio.sleep(0.3)

        # 发送最终结果
        await context.emit_chunk("这是我的回复!")

        return {
            "status": "success",
            "message": "任务完成",
            "data": {"answer": "今天天气晴朗"}
        }

if __name__ == "__main__":
    run_worker(
        worker_class=MyAssistant,
        worker_id="worker-01",
        redis_host="localhost",
        redis_port=6379,
    )

启动 Worker:

uv run python my_agent.py

发送测试任务

创建 send_task.py:

import asyncio
from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis

async def send_task():
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session-001",
        content="今天北京天气怎么样?",
    )

    if response.success:
        print(f"任务已发送,消息 ID: {response.message_id}")
    else:
        print(f"发送失败: {response.error}")

    await close_redis()

asyncio.run(send_task())

运行:

uv run python send_task.py

下一步