客户端开发指南¶
ByaiGatewayClient¶
ByaiGatewayClient 是对 GatewayClient 的封装,默认通过共享的 Byai codec 进行消息序列化,支持更高级的消息协议。
import asyncio
from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis
async def main():
redis = init_redis(host="localhost", port=6379)
registry = WorkerRegistry(redis_client=redis)
client = ByaiGatewayClient(redis_client=redis, registry=registry)
response = await client.send_message(
target_agent_type="weather_agent",
session_id="session_123",
user_code="user_123",
content="查询北京今天的天气",
)
if response.success:
print(f"任务已发送,消息 ID: {response.message_id}")
else:
print(f"发送失败: {response.error}")
await close_redis()
asyncio.run(main())
发送路径说明¶
GatewayClient.send_message(...) 有两种模式:
Agent Type 模式(默认)¶
- 根据
target_agent_type写入 agent type stream - 在
require_online_worker=True时验证是否存在在线 worker - 实际由哪个 worker 消费是在消费者真正读到消息时才确定的
Direct Worker 模式¶
- 传入
target_worker_id后直接写入 worker stream - 适合 debug 或定向控制
- 发送前会显式检查该 worker 是否 online
GatewayClient API¶
async def send_message(
self,
target_agent_type: str,
session_id: str,
content: Any,
user_code: str = "",
action_type: str = "ASK_AGENT",
metadata: Optional[dict] = None,
target_worker_id: Optional[str] = None,
require_online_worker: bool = True,
) -> SendMessageResponse:
"""发送消息,返回响应对象"""
async def cancel_task(
self,
message_id: str,
session_id: str,
reason: str = ""
) -> CancelTaskResponse:
"""取消指定的任务"""