快速入门¶
环境要求¶
- Python 3.12+
- Redis 7.0+
安装¶
启动 Redis¶
创建第一个 Worker¶
创建 my_agent.py:
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class MyAssistant(GatewayWorker):
def get_agent_types(self):
# 声明此 Worker 能够处理的 Agent 类型
return ["weather_agent", "chat_agent"]
async def process_command(self, command, context: AgentContext):
# 发送流式文本片段
await context.emit_chunk("正在处理您的请求...\n")
# 模拟耗时操作
await asyncio.sleep(0.5)
# 更新任务状态
await context.emit_state("thinking")
# 从 command 的 content 中读取请求内容
user_input = (
command.content if isinstance(command.content, str) else str(command.content)
)
# 发送思考过程
await context.emit_chunk(f"我收到了: {user_input}\n")
await asyncio.sleep(0.3)
# 发送最终结果
await context.emit_chunk("这是我的回复!")
return {
"status": "success",
"message": "任务完成",
"data": {"answer": "今天天气晴朗"}
}
if __name__ == "__main__":
run_worker(
worker_class=MyAssistant,
worker_id="worker-01",
redis_host="localhost",
redis_port=6379,
)
启动 Worker:
发送测试任务¶
创建 send_task.py:
import asyncio
from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis
async def send_task():
redis = init_redis(host="localhost", port=6379)
registry = WorkerRegistry(redis_client=redis)
client = ByaiGatewayClient(redis_client=redis, registry=registry)
response = await client.send_message(
target_agent_type="weather_agent",
session_id="session-001",
content="今天北京天气怎么样?",
)
if response.success:
print(f"任务已发送,消息 ID: {response.message_id}")
else:
print(f"发送失败: {response.error}")
await close_redis()
asyncio.run(send_task())
运行: