跳转至

流式输出示例

字符级流式输出

import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class StreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["streaming_demo"]

    async def process_command(self, command, context: AgentContext):
        text = "这是一段流式输出的示例文本,每个字符会逐个发送。"

        for char in text:
            await context.emit_chunk(char)
            await asyncio.sleep(0.05)  # 模拟打字效果

        return {"status": "done"}

逐词流式输出

import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class WordStreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["word_streaming"]

    async def process_command(self, command, context: AgentContext):
        words = ["这是", "一段", "逐词", "流式", "输出", "示例"]

        for word in words:
            await context.emit_chunk(word + " ")
            await asyncio.sleep(0.2)

        return {"status": "done"}

带思考过程的流式输出

import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class ThinkingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["thinking_agent"]

    async def process_command(self, command, context: AgentContext):
        user_input = str(command.content)

        # 思考中状态
        await context.emit_state("thinking")
        await context.emit_chunk("让我想想...\n")
        await asyncio.sleep(1)

        # 推理过程
        await context.emit_state("reasoning")
        await context.emit_chunk("首先,我需要分析这个问题...\n")
        await asyncio.sleep(0.5)

        # 生成回答
        await context.emit_state("generating")
        await context.emit_chunk(f"关于 '{user_input}' 的回答是:\n")
        await asyncio.sleep(0.5)

        # 最终答案
        await context.emit_chunk("这是最终的流式回答!")

        return {"status": "success"}