插件示例
基础插件示例
LoggingPlugin - 带生命周期的插件
import time
from typing import Any
from by_framework.core.extensions import (
AgentConfig,
Plugin,
PluginBuildContext,
PluginManifest,
)
from by_framework.worker import AgentContext
from langchain_core.tools import tool
class LoggingPlugin(Plugin):
    """Native By-Framework plugin example that logs the task lifecycle.

    The plugin injects one diagnostic agent config during registration and
    prints a message from the task-start, task-complete and worker-startup
    hooks. Task durations are measured with wall-clock timestamps keyed by
    the message id of the task context.
    """

    def __init__(self):
        manifest = PluginManifest(
            plugin_id="logging-plugin",
            version="1.0.0",
        )
        super().__init__(manifest)
        # Maps message_id -> time.time() value recorded in on_task_start.
        self._start_times = dict()

    async def register_agent_configs(
        self, build_context: PluginBuildContext
    ) -> list[AgentConfig]:
        """Registration phase: return the preset agent configs to inject.

        Args:
            build_context: Build context supplied by the framework (unused).

        Returns:
            A single-element list holding the "debug-agent" config, which
            exposes the ``get_worker_stats`` tool.
        """

        # NOTE: the docstring of the tool function is kept verbatim on
        # purpose; langchain's @tool decorator uses it as the tool
        # description, so it is runtime text rather than a plain comment.
        @tool
        async def get_worker_stats(metrics: list = None) -> str:
            """实时获取当前 Worker 的资源指标"""
            return f"[插件执行] 成功获取指标 {metrics or 'all'}: CPU 12.4%, 内存 4.2GB"

        # Provenance metadata attached to the injected agent.
        origin_info = {
            "plugin_origin": self.plugin_id,
            "version": self.manifest.version,
            "injected_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        debug_agent = AgentConfig(
            agent_id="debug-agent",
            name="By-Framework 诊断专家",
            description="由 LoggingPlugin 动态注入的专家级智能体",
            tools={"get_worker_stats": get_worker_stats},
            extra=origin_info,
        )
        return [debug_agent]

    async def on_task_start(self, context: AgentContext) -> None:
        """Task-start hook: remember when the task began and announce it."""
        self._start_times[context.message_id] = time.time()
        task_id = context.message_id
        print(f"[插件钩子] 任务 {task_id} 开始执行")

    async def on_task_complete(self, context: AgentContext, result: Any) -> None:
        """Task-complete hook: compute the elapsed time and print it.

        If no start time was recorded for this message id, the current time
        is used as the start, so the reported duration is close to zero.
        """
        fallback_start = time.time()
        began_at = self._start_times.pop(context.message_id, fallback_start)
        elapsed = time.time() - began_at
        print(f"[插件钩子] 任务 {context.message_id} 完成,耗时 {elapsed:.4f}s")

    async def on_worker_startup(self, worker: Any) -> None:
        """Called when a worker starts; prints its ``worker_id`` if present."""
        worker_label = getattr(worker, 'worker_id', 'unknown')
        print(f"[插件钩子] Worker {worker_label} 已启动")
使用插件
from by_framework import run_worker
# Plugins are handed to the worker through the ``plugin_list`` argument.
# NOTE(review): EchoWorker is not defined in this snippet; it is assumed to
# be a worker class defined or imported elsewhere — confirm.
worker_plugins = [LoggingPlugin()]
run_worker(
    EchoWorker,
    worker_id="echo-worker-1",
    redis_host="127.0.0.1",
    redis_port=6379,
    plugin_list=worker_plugins,
)
热更新插件示例
HotReloadPlugin
import json
from pathlib import Path
from by_framework.core.extensions import (
AgentConfig,
Plugin,
PluginBuildContext,
PluginManifest,
PluginReloadContext,
)
class HotReloadPlugin(Plugin):
    """Plugin that supports incremental hot reload.

    The agent config it contributes is derived from a small JSON state file.
    On reload the state file is read again and only this plugin's own agent
    config is swapped out; configs owned by other plugins pass through
    untouched.
    """

    # Single source of truth for the id of the agent this plugin owns; used
    # both when building the config and when locating it during reload.
    _AGENT_ID = "hot-reload-agent"

    def __init__(self, state_file: Path = Path("hot_reload_state.json")):
        """Create the plugin.

        Args:
            state_file: Path of the JSON file holding the plugin state.
        """
        super().__init__(PluginManifest(
            plugin_id="hot-reload-demo",
            version="1.0.0",
            priority=10,
        ))
        self.state_file = state_file

    async def register_agent_configs(
        self, build_context: PluginBuildContext,
    ) -> list[AgentConfig]:
        """Return the agent config built from the current state file."""
        return [self._build_config(self._load_state())]

    async def reload(
        self, context: PluginReloadContext,
    ) -> list[AgentConfig]:
        """Replace this plugin's own config during a hot reload.

        Args:
            context: Reload context; ``current_agent_configs`` is iterated to
                produce the new list.

        Returns:
            The current configs in their original order, with every entry
            whose ``agent_id`` matches this plugin's agent replaced by a
            freshly built config. If no entry matches, the fresh config is
            not added and the list is returned unchanged.
        """
        next_config = self._build_config(self._load_state())
        # Keep the configs that belong to other plugins as they are.
        return [
            next_config if config.agent_id == self._AGENT_ID else config
            for config in context.current_agent_configs
        ]

    def _load_state(self) -> dict:
        """Load the plugin state, falling back to defaults if the file is absent.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            # Read and handle absence in one step instead of exists() + read,
            # which could race with the file being removed in between. JSON
            # is decoded as UTF-8 explicitly rather than with the locale
            # default encoding.
            raw = self.state_file.read_text(encoding="utf-8")
        except FileNotFoundError:
            return {"version": 1, "message": "hello from plugin"}
        return json.loads(raw)

    def _build_config(self, state: dict) -> AgentConfig:
        """Build the agent config for ``state``.

        Raises:
            KeyError: If ``state`` has no ``"version"`` key.
        """
        return AgentConfig(
            agent_id=self._AGENT_ID,
            name=f"Hot Reload Demo v{state['version']}",
            extra={"hot_reload_version": state["version"]},
        )
触发热更新
# hot_reload_client.py
import json
# 修改状态文件
state = json.load(open("hot_reload_state.json"))
state["version"] += 1
json.dump(state, open("hot_reload_state.json", "w"))
# 通知所有 worker 热更新
await client.reload_plugins_for_agent_type("hot-reload-agent")