跳转至

Client 模块

核心文件

  • src/by_framework/client/client.py - GatewayClient
  • src/by_framework/client/byai_client.py - ByaiGatewayClient

GatewayClient

向 Redis Streams 发送命令的客户端。

构造方法

class GatewayClient:
    def __init__(
        self,
        redis_client: Redis,
        registry: WorkerRegistry,
    ) -> None:
        self._redis = redis_client
        self._registry = registry

send_message

发送消息:

async def send_message(
    self,
    target_agent_type: str,
    session_id: str,
    content: Any,
    user_code: str = "",
    action_type: str = "ASK_AGENT",
    metadata: Optional[dict] = None,
    target_worker_id: Optional[str] = None,
    require_online_worker: bool = True,
) -> SendMessageResponse:

cancel_task

取消任务:

async def cancel_task(
    self,
    message_id: str,
    session_id: str,
    reason: str = "",
) -> CancelTaskResponse:

ByaiGatewayClient

GatewayClient 的封装,增加了 ByaiMessageInterceptor,支持更高级的消息协议:

class ByaiGatewayClient(GatewayClient):
    def __init__(
        self,
        redis_client: Redis,
        registry: WorkerRegistry,
        message_interceptor: Optional[ByaiMessageInterceptor] = None,
    ) -> None:

SendMessageResponse

@dataclass
class SendMessageResponse:
    success: bool
    message_id: Optional[str] = None
    target_worker_id: Optional[str] = None
    error: Optional[str] = None

CancelTaskResponse

@dataclass
class CancelTaskResponse:
    success: bool
    error: Optional[str] = None