# 基础示例
## 简单的 Echo Worker
### Worker 实现
import os
from by_framework import run_worker
class EchoWorker:
    """Worker that echoes the content of each received command."""

    def get_agent_types(self):
        """Return the agent types this worker declares: ``["echo_agent"]``."""
        return ["echo_agent"]

    async def process_command(self, command, context):
        """Echo ``command.content`` as a chunk and return it in the result.

        Args:
            command: Object exposing a ``content`` attribute. Non-string
                content is converted with ``str()``.
            context: Object exposing an awaitable ``emit_chunk(text)``.

        Returns:
            A dict with ``"status": "success"`` and the echoed text under
            ``"echo"``.
        """
        user_input = (
            command.content
            if isinstance(command.content, str)
            else str(command.content)
        )
        await context.emit_chunk(f"收到: {user_input}")
        return {"status": "success", "echo": user_input}
if __name__ == "__main__":
    # Local import keeps this entry-point snippet self-contained.
    import argparse

    # The launch commands for this script pass --worker-id, so read it here
    # instead of hard-coding the same id for every instance.
    # NOTE(review): if run_worker already parses --worker-id from argv itself,
    # this is redundant but harmless — confirm against by_framework.
    parser = argparse.ArgumentParser(description="Run the echo worker.")
    parser.add_argument(
        "--worker-id",
        default="echo-worker-1",
        help="Identifier for this worker instance.",
    )
    # parse_known_args tolerates extra flags instead of exiting on them.
    args, _unknown = parser.parse_known_args()

    run_worker(
        EchoWorker,
        worker_id=args.worker_id,
        redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
        # String default: os.getenv returns str when the variable is set.
        redis_port=int(os.getenv("BYAI_REDIS_PORT", "6379")),
    )
### 发送任务客户端
import asyncio
import os
from by_framework import ByaiGatewayClient, WorkerRegistry, init_redis, close_redis
async def send():
    """Send one message to ``echo_agent`` and print the response.

    Reads the Redis host and port from ``BYAI_REDIS_HOST`` and
    ``BYAI_REDIS_PORT`` (defaults ``127.0.0.1`` and ``6379``), prints
    ``response.success`` and, when present, ``response.message_id``.
    """
    redis = init_redis(
        host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
        port=int(os.getenv("BYAI_REDIS_PORT", "6379")),
    )
    try:
        registry = WorkerRegistry(redis_client=redis)
        client = ByaiGatewayClient(redis_client=redis, registry=registry)
        response = await client.send_message(
            target_agent_type="echo_agent",
            session_id="session-001",
            content="Hello, World!",
        )
        print(f"Success: {response.success}")
        if response.message_id:
            print(f"Message ID: {response.message_id}")
    finally:
        # Close Redis even when sending fails, so the connection is not leaked.
        await close_redis()
# Guarded so that importing this module does not send a message.
if __name__ == "__main__":
    asyncio.run(send())
## 带状态的 Worker
import asyncio
from by_framework import run_worker
class StatefulWorker:
    """Worker that reports state transitions while handling a command."""

    # Seconds slept to stand in for real work; override per class or instance.
    simulated_work_seconds = 1

    def get_agent_types(self):
        """Return the agent types this worker declares: ``["stateful_agent"]``."""
        return ["stateful_agent"]

    async def process_command(self, command, context):
        """Emit idle -> processing -> completed states around a simulated task.

        Args:
            command: The incoming command; not read by this example.
            context: Object exposing awaitable ``emit_state(state)`` and
                ``emit_chunk(text)``.

        Returns:
            ``{"status": "success"}``.
        """
        # Announce the starting state.
        await context.emit_state("idle")
        await context.emit_chunk("开始处理...\n")

        # Processing: the sleep is a placeholder for real work.
        await context.emit_state("processing")
        await asyncio.sleep(self.simulated_work_seconds)

        # Done.
        await context.emit_state("completed")
        await context.emit_chunk("处理完成!")
        return {"status": "success"}
## 启动多个 Worker
在不同的终端启动多个 Worker 实例(每个实例使用不同的 worker-id),实现负载均衡:
# Terminal 1: start the first instance with worker id echo-worker-1
uv run python echo_worker.py --worker-id echo-worker-1
# Terminal 2: start a second instance with a different worker id
uv run python echo_worker.py --worker-id echo-worker-2
多个 Worker 连接同一个 Redis Stream,Redis 会自动进行负载分配。