流式输出示例
字符级流式输出
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class StreamingAgent(GatewayWorker):
def get_agent_types(self):
return ["streaming_demo"]
async def process_command(self, command, context: AgentContext):
text = "这是一段流式输出的示例文本,每个字符会逐个发送。"
for char in text:
await context.emit_chunk(char)
await asyncio.sleep(0.05) # 模拟打字效果
return {"status": "done"}
逐词流式输出
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class WordStreamingAgent(GatewayWorker):
def get_agent_types(self):
return ["word_streaming"]
async def process_command(self, command, context: AgentContext):
words = ["这是", "一段", "逐词", "流式", "输出", "示例"]
for word in words:
await context.emit_chunk(word + " ")
await asyncio.sleep(0.2)
return {"status": "done"}
带思考过程的流式输出
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class ThinkingAgent(GatewayWorker):
def get_agent_types(self):
return ["thinking_agent"]
async def process_command(self, command, context: AgentContext):
user_input = str(command.content)
# 思考中状态
await context.emit_state("thinking")
await context.emit_chunk("让我想想...\n")
await asyncio.sleep(1)
# 推理过程
await context.emit_state("reasoning")
await context.emit_chunk("首先,我需要分析这个问题...\n")
await asyncio.sleep(0.5)
# 生成回答
await context.emit_state("generating")
await context.emit_chunk(f"关于 '{user_input}' 的回答是:\n")
await asyncio.sleep(0.5)
# 最终答案
await context.emit_chunk("这是最终的流式回答!")
return {"status": "success"}