流式输出示例¶
字符级流式输出¶
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class StreamingAgent(GatewayWorker):
def get_agent_types(self):
return ["streaming_demo"]
async def process_command(self, command, context: AgentContext):
text = "这是一段流式输出的示例文本,每个字符会逐个发送。"
for char in text:
await context.emit_chunk(char)
await asyncio.sleep(0.05) # 模拟打字效果
return {"status": "done"}
public class StreamingAgent extends GatewayWorker {
public StreamingAgent(String workerId) { super(workerId); }
@Override
public List<String> getAgentTypes() {
return List.of("streaming_demo");
}
@Override
public Object processCommand(GatewayCommand command, AgentContext context) {
String text = "这是一段流式输出的示例文本,每个字符会逐个发送。";
for (char c : text.toCharArray()) {
context.emitChunk(String.valueOf(c));
try { Thread.sleep(50); } catch (InterruptedException e) { break; }
}
return "done";
}
}
class StreamingAgent extends GatewayWorker {
getAgentTypes(): string[] {
return ["streaming_demo"];
}
async processCommand(command: GatewayCommand, context: AgentContext) {
const text = "这是一段流式输出的示例文本,每个字符会逐个发送。";
for (const char of text) {
await context.emitChunk(char);
await new Promise(resolve => setTimeout(resolve, 50));
}
return { status: "done" };
}
}
带思考过程的流式输出¶
import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker
class ThinkingAgent(GatewayWorker):
def get_agent_types(self):
return ["thinking_agent"]
async def process_command(self, command, context: AgentContext):
user_input = str(command.content)
# 思考中状态
await context.emit_state("thinking")
await context.emit_chunk("让我想想...\n")
await asyncio.sleep(1)
# 推理过程
await context.emit_state("reasoning")
await context.emit_chunk("首先,我需要分析这个问题...\n")
await asyncio.sleep(0.5)
# 生成回答
await context.emit_state("generating")
await context.emit_chunk(f"关于 '{user_input}' 的回答是:\n")
await asyncio.sleep(0.5)
# 最终答案
await context.emit_chunk("这是最终的流式回答!")
return {"status": "success"}
public class ThinkingAgent extends GatewayWorker {
public ThinkingAgent(String workerId) { super(workerId); }
@Override
public List<String> getAgentTypes() {
return List.of("thinking_agent");
}
@Override
public Object processCommand(GatewayCommand command, AgentContext context) {
String userInput = String.valueOf(((AskAgentCommand) command).content());
context.emitState("thinking");
context.emitChunk("让我想想...\n");
context.emitState("reasoning");
context.emitChunk("首先,我需要分析这个问题...\n");
context.emitState("generating");
context.emitChunk("关于 '" + userInput + "' 的回答是:\n");
context.emitChunk("这是最终的流式回答!");
return "success";
}
}
class ThinkingAgent extends GatewayWorker {
getAgentTypes(): string[] {
return ["thinking_agent"];
}
async processCommand(command: GatewayCommand, context: AgentContext) {
const userInput = String((command as AskAgentCommand).content);
await context.emitState({ state: "thinking" });
await context.emitChunk("让我想想...\n");
await new Promise(resolve => setTimeout(resolve, 1000));
await context.emitState({ state: "reasoning" });
await context.emitChunk("首先,我需要分析这个问题...\n");
await new Promise(resolve => setTimeout(resolve, 500));
await context.emitState({ state: "generating" });
await context.emitChunk(`关于 '${userInput}' 的回答是:\n`);
await context.emitChunk("这是最终的流式回答!");
return { status: "success" };
}
}