跳转至

基础示例

简单的 Echo Worker

Worker 实现

import os
from by_framework import run_worker

class EchoWorker:
    """Minimal worker that echoes each command's content back to the caller."""

    def get_agent_types(self):
        """Return the list of agent type names for this worker."""
        agent_types = ["echo_agent"]
        return agent_types

    async def process_command(self, command, context):
        """Emit the command content as a chunk and return it in the result.

        Content that is not already a string is converted with ``str()``
        before it is echoed.
        """
        echoed = command.content
        if not isinstance(echoed, str):
            echoed = str(echoed)

        await context.emit_chunk(f"收到: {echoed}")
        return {"status": "success", "echo": echoed}

if __name__ == "__main__":
    # Imported locally so this entry-point block is self-contained.
    import argparse

    # Accept --worker-id so several instances of this script can run side by
    # side under distinct IDs; the default keeps the single-worker behaviour.
    parser = argparse.ArgumentParser(description="Run the echo worker.")
    parser.add_argument(
        "--worker-id",
        default="echo-worker-1",
        help="unique ID for this worker instance (default: %(default)s)",
    )
    # parse_known_args() ignores unrelated flags instead of exiting on them.
    args, _unknown = parser.parse_known_args()

    run_worker(
        EchoWorker,
        worker_id=args.worker_id,
        redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
        # String default: int() then receives a str whether or not the
        # environment variable is set.
        redis_port=int(os.getenv("BYAI_REDIS_PORT", "6379")),
    )

发送任务客户端

import asyncio
import os
from by_framework import ByaiGatewayClient, WorkerRegistry, init_redis, close_redis

async def send():
    """Send one message to ``echo_agent`` and print the gateway's response.

    Redis connection settings are read from ``BYAI_REDIS_HOST`` and
    ``BYAI_REDIS_PORT`` (defaults ``127.0.0.1`` and ``6379``).
    """
    redis = init_redis(
        host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
        # String default: int() then receives a str whether or not the
        # environment variable is set.
        port=int(os.getenv("BYAI_REDIS_PORT", "6379")),
    )
    try:
        registry = WorkerRegistry(redis_client=redis)
        client = ByaiGatewayClient(redis_client=redis, registry=registry)

        response = await client.send_message(
            target_agent_type="echo_agent",
            session_id="session-001",
            content="Hello, World!",
        )

        print(f"Success: {response.success}")
        if response.message_id:
            print(f"Message ID: {response.message_id}")
    finally:
        # Always call close_redis(), even if building the client or sending
        # the message raised, so the example never exits without cleanup.
        await close_redis()

if __name__ == "__main__":
    # Guarded so that importing this module does not send a message.
    asyncio.run(send())

带状态的 Worker

import asyncio
from by_framework import run_worker

class StatefulWorker:
    """Worker that reports state changes while it handles a command."""

    def get_agent_types(self):
        """Return the list of agent type names for this worker."""
        agent_types = ["stateful_agent"]
        return agent_types

    async def process_command(self, command, context):
        """Emit state and chunk updates around a one-second pause.

        Emitted in order: state ``idle``, a start chunk, state
        ``processing``, a one-second sleep, state ``completed``, and a final
        chunk. Returns ``{"status": "success"}``.
        """
        # Start: report the initial state and announce that work has begun.
        await context.emit_state("idle")
        await context.emit_chunk("开始处理...\n")

        # In progress: the sleep is presumably a placeholder for real work.
        await context.emit_state("processing")
        await asyncio.sleep(1)

        # Done: report completion before returning the result.
        await context.emit_state("completed")
        await context.emit_chunk("处理完成!")

        outcome = {"status": "success"}
        return outcome

启动多个 Worker

在不同的终端中分别启动多个 Worker 实例(每个实例使用不同的 worker-id),即可实现负载均衡:

# Terminal 1: start the echo worker with worker ID "echo-worker-1"
uv run python echo_worker.py --worker-id echo-worker-1

# Terminal 2: start a second instance of the same script as "echo-worker-2"
uv run python echo_worker.py --worker-id echo-worker-2

多个 Worker 连接同一个 Redis Stream 时,Redis 会自动在这些 Worker 之间分配任务,从而实现负载均衡。