基础示例¶
简单的 Echo Worker¶
Worker 实现¶
import os
from by_framework import run_worker
class EchoWorker:
def get_agent_types(self):
return ["echo_agent"]
async def process_command(self, command, context):
user_input = (
command.content
if isinstance(command.content, str)
else str(command.content)
)
await context.emit_chunk(f"收到: {user_input}")
return {"status": "success", "echo": user_input}
if __name__ == "__main__":
run_worker(
EchoWorker,
worker_id="echo-worker-1",
redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
redis_port=int(os.getenv("BYAI_REDIS_PORT", 6379)),
)
import com.iwhaleai.byai.framework.core.protocol.AskAgentCommand;
import com.iwhaleai.byai.framework.core.protocol.GatewayCommand;
import com.iwhaleai.byai.framework.worker.AgentContext;
import com.iwhaleai.byai.framework.worker.GatewayWorker;
import com.iwhaleai.byai.framework.worker.WorkerRunner;
import java.util.List;
public class EchoWorker extends GatewayWorker {
public EchoWorker(String workerId) {
super(workerId);
}
@Override
public List<String> getAgentTypes() {
return List.of("echo_agent");
}
@Override
public Object processCommand(GatewayCommand command, AgentContext context) {
AskAgentCommand askCommand = (AskAgentCommand) command;
String userInput = String.valueOf(askCommand.content());
context.emitChunk("收到: " + userInput);
return "success";
}
public static void main(String[] args) {
EchoWorker worker = new EchoWorker("echo-worker-1");
WorkerRunner runner = new WorkerRunner(worker);
runner.start();
Runtime.getRuntime().addShutdownHook(new Thread(runner::stop));
}
}
import {
GatewayWorker, AgentContext, GatewayCommand,
AskAgentCommand, runWorker
} from '@byclaw/by-framework';
class EchoWorker extends GatewayWorker {
getAgentTypes(): string[] {
return ["echo_agent"];
}
async processCommand(command: GatewayCommand, context: AgentContext) {
const askCommand = command as AskAgentCommand;
const userInput = String(askCommand.content);
await context.emitChunk(`收到: ${userInput}`);
return { status: "success", echo: userInput };
}
}
runWorker(EchoWorker, {
workerId: "echo-worker-1",
redisHost: process.env.BYAI_REDIS_HOST || "127.0.0.1",
redisPort: Number(process.env.BYAI_REDIS_PORT || 6379),
});
发送任务客户端¶
import asyncio
import os
from by_framework import ByaiGatewayClient, WorkerRegistry, init_redis, close_redis
async def send():
redis = init_redis(
host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
port=int(os.getenv("BYAI_REDIS_PORT", 6379)),
)
registry = WorkerRegistry(redis_client=redis)
client = ByaiGatewayClient(redis_client=redis, registry=registry)
response = await client.send_message(
target_agent_type="echo_agent",
session_id="session-001",
content="Hello, World!",
)
print(f"Success: {response.success}")
if response.message_id:
print(f"Message ID: {response.message_id}")
await close_redis()
asyncio.run(send())
import com.iwhaleai.byai.framework.client.ByaiGatewayClient;
import com.iwhaleai.byai.framework.client.GatewayClient;
import com.iwhaleai.byai.framework.common.RedisClient;
public class SendTask {
public static void main(String[] args) {
RedisClient redisClient = RedisClient.getInstance();
ByaiGatewayClient client = new ByaiGatewayClient(redisClient);
GatewayClient.SendResponse response = client.sendMessage(
"echo_agent", "session-001", "Hello, World!"
);
System.out.println("Success: " + response.isSuccess());
if (response.getMessageId() != null) {
System.out.println("Message ID: " + response.getMessageId());
}
redisClient.close();
}
}
import { ByaiGatewayClient, initRedis, closeRedis } from '@byclaw/by-framework';
async function send() {
const redis = initRedis({
host: process.env.BYAI_REDIS_HOST || "127.0.0.1",
port: Number(process.env.BYAI_REDIS_PORT || 6379),
});
const client = new ByaiGatewayClient({ redisClient: redis });
const response = await client.sendMessage({
targetAgentType: "echo_agent",
sessionId: "session-001",
content: "Hello, World!",
});
console.log(`Success: ${response.success}`);
if (response.message_id) {
console.log(`Message ID: ${response.message_id}`);
}
await closeRedis();
}
send();
带状态的 Worker¶
import asyncio
from by_framework import run_worker
class StatefulWorker:
def get_agent_types(self):
return ["stateful_agent"]
async def process_command(self, command, context):
# 发送开始状态
await context.emit_state("idle")
await context.emit_chunk("开始处理...\n")
# 处理中
await context.emit_state("processing")
await asyncio.sleep(1)
# 完成
await context.emit_state("completed")
await context.emit_chunk("处理完成!")
return {"status": "success"}
public class StatefulWorker extends GatewayWorker {
public StatefulWorker(String workerId) { super(workerId); }
@Override
public List<String> getAgentTypes() {
return List.of("stateful_agent");
}
@Override
public Object processCommand(GatewayCommand command, AgentContext context) {
context.emitState("idle");
context.emitChunk("开始处理...\n");
context.emitState("processing");
try { Thread.sleep(1000); } catch (InterruptedException e) { return "cancelled"; }
context.emitState("completed");
context.emitChunk("处理完成!");
return "success";
}
}
class StatefulWorker extends GatewayWorker {
getAgentTypes(): string[] {
return ["stateful_agent"];
}
async processCommand(command: GatewayCommand, context: AgentContext) {
await context.emitState({ state: "idle" });
await context.emitChunk("开始处理...\n");
await context.emitState({ state: "processing" });
await new Promise(resolve => setTimeout(resolve, 1000));
await context.emitState({ state: "completed" });
await context.emitChunk("处理完成!");
return { status: "success" };
}
}
启动多个 Worker¶
在不同的终端启动多个 Worker 实例,实现负载均衡:
多个 Worker 连接同一个 Redis Stream,Redis 会自动进行负载分配。