跳转至

插件示例

基础插件示例

LoggingPlugin - 带生命周期的插件

import time
from typing import Any
from by_framework.core.extensions import (
    AgentConfig,
    Plugin,
    PluginBuildContext,
    PluginManifest,
)
from by_framework.worker import AgentContext
from langchain_core.tools import tool

class LoggingPlugin(Plugin):
    """原生 By-Framework 插件示例:自动化记录任务执行生命周期"""

    def __init__(self):
        super().__init__(PluginManifest(
            plugin_id="logging-plugin",
            version="1.0.0",
        ))
        self._start_times = {}

    async def register_agent_configs(
        self, build_context: PluginBuildContext
    ) -> list[AgentConfig]:
        """插件注册阶段:向系统中注入预置的 Agent 配置"""

        @tool
        async def get_worker_stats(metrics: list = None) -> str:
            """实时获取当前 Worker 的资源指标"""
            return f"[插件执行] 成功获取指标 {metrics or 'all'}: CPU 12.4%, 内存 4.2GB"

        return [
            AgentConfig(
                agent_id="debug-agent",
                name="By-Framework 诊断专家",
                description="由 LoggingPlugin 动态注入的专家级智能体",
                tools={"get_worker_stats": get_worker_stats},
                extra={
                    "plugin_origin": self.plugin_id,
                    "version": self.manifest.version,
                    "injected_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                }
            )
        ]

    async def on_task_start(self, context: AgentContext) -> None:
        """任务开始钩子:记录开始时间"""
        self._start_times[context.message_id] = time.time()
        print(f"[插件钩子] 任务 {context.message_id} 开始执行")

    async def on_task_complete(self, context: AgentContext, result: Any) -> None:
        """任务完成钩子:计算耗时并输出"""
        start_time = self._start_times.pop(context.message_id, time.time())
        duration = time.time() - start_time
        print(f"[插件钩子] 任务 {context.message_id} 完成,耗时 {duration:.4f}s")

    async def on_worker_startup(self, worker: Any) -> None:
        """Worker 启动时调用"""
        print(f"[插件钩子] Worker {getattr(worker, 'worker_id', 'unknown')} 已启动")

使用插件

from by_framework import run_worker

run_worker(
    EchoWorker,
    worker_id="echo-worker-1",
    redis_host="127.0.0.1",
    redis_port=6379,
    plugin_list=[LoggingPlugin()],  # 传入插件列表
)

热更新插件示例

HotReloadPlugin

import json
from pathlib import Path
from by_framework.core.extensions import (
    AgentConfig,
    Plugin,
    PluginBuildContext,
    PluginManifest,
    PluginReloadContext,
)

class HotReloadPlugin(Plugin):
    """支持增量热更新的插件"""

    def __init__(self, state_file: Path = Path("hot_reload_state.json")):
        super().__init__(PluginManifest(
            plugin_id="hot-reload-demo",
            version="1.0.0",
            priority=10,
        ))
        self.state_file = state_file

    async def register_agent_configs(
        self, build_context: PluginBuildContext,
    ) -> list[AgentConfig]:
        state = self._load_state()
        return [self._build_config(state)]

    async def reload(
        self, context: PluginReloadContext,
    ) -> list[AgentConfig]:
        """热更新时替换自己的配置"""
        state = self._load_state()
        next_config = self._build_config(state)

        # 保留其他插件的配置
        next_configs = []
        for config in context.current_agent_configs:
            if config.agent_id == "hot-reload-agent":
                next_configs.append(next_config)
            else:
                next_configs.append(config)

        return next_configs

    def _load_state(self) -> dict:
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"version": 1, "message": "hello from plugin"}

    def _build_config(self, state: dict) -> AgentConfig:
        return AgentConfig(
            agent_id="hot-reload-agent",
            name=f"Hot Reload Demo v{state['version']}",
            extra={"hot_reload_version": state["version"]},
        )

触发热更新

# hot_reload_client.py
import json

# 修改状态文件
state = json.load(open("hot_reload_state.json"))
state["version"] += 1
json.dump(state, open("hot_reload_state.json", "w"))

# 通知所有 worker 热更新
await client.reload_plugins_for_agent_type("hot-reload-agent")