OpenClaw 插件扩展系统设计

openclaw AI小龙虾攻略 1

OpenClaw 是一个可扩展的机器人控制/自动化框架,插件系统是其核心特性,允许用户扩展功能而无需修改核心代码。

OpenClaw 插件扩展系统设计-第1张图片-官方获取 | OpenClaw下载 - openclaw官网

插件架构设计

插件基础结构

# openclaw/plugin/base.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
import inspect
import importlib
import pkgutil
from pathlib import Path
class OpenClawPlugin(ABC):
    """Abstract base class that every OpenClaw plugin extends.

    Subclasses must provide ``plugin_id`` and ``initialize``; every other
    hook has a do-nothing default so plugins only override what they need.
    """

    def __init__(self):
        """Populate default metadata; subclasses may overwrite any field."""
        self.name = self.__class__.__name__
        self.version = "1.0.0"
        self.author = "Unknown"
        self.description = ""
        self.enabled = True

    @property
    @abstractmethod
    def plugin_id(self) -> str:
        """Return the unique identifier of this plugin."""

    @abstractmethod
    def initialize(self, context: 'PluginContext') -> bool:
        """Prepare the plugin for use.

        Args:
            context: Runtime environment supplied by the plugin manager.

        Returns:
            True when the plugin is ready, False to decline loading.
        """

    def on_enable(self):
        """Hook called when the plugin is enabled."""

    def on_disable(self):
        """Hook called when the plugin is disabled."""

    def get_config_schema(self) -> Dict[str, Any]:
        """Return the plugin's configuration schema (empty by default)."""
        return {}

    def get_config(self) -> Dict[str, Any]:
        """Return the plugin's current configuration values.

        The plugin manager calls this when persisting plugin settings, so
        the base class supplies an empty default instead of leaving the
        method undefined.
        """
        return {}

    def cleanup(self):
        """Release any resources held by the plugin."""

插件管理器

# openclaw/plugin/manager.py
import threading
import json
import yaml
from dataclasses import dataclass
from enum import Enum
class PluginLoadStatus(Enum):
    """Load state recorded on a PluginInfo entry."""
    # Set when a plugin class was instantiated and its initialize() succeeded.
    LOADED = "loaded"
    # Set when loading raised an exception; the text goes to error_message.
    FAILED = "failed"
    # Default value of PluginInfo.load_status.
    DISABLED = "disabled"
@dataclass
class PluginInfo:
    """Registry record the plugin manager keeps for each plugin it tried to load."""
    # The plugin's ``plugin_id`` on success; the requested module name on failure.
    id: str
    # Display name (the plugin instance's ``name``; the module name on failure).
    name: str
    # Version string ("0.0.0" is used for failed loads).
    version: str
    # Author string ("Unknown" is used for failed loads).
    author: str
    # Free-form description (empty for failed loads).
    description: str
    # Whether the plugin is recorded as enabled.
    enabled: bool
    # Live plugin object; None when loading failed.
    instance: Optional[OpenClawPlugin] = None
    # Outcome of the load attempt.
    load_status: PluginLoadStatus = PluginLoadStatus.DISABLED
    # str() of the exception raised while loading, if any.
    error_message: str = ""
class PluginManager:
    """Discover, load, enable/disable and persist OpenClaw plugins.

    All access to the ``plugins`` registry goes through a re-entrant lock so
    the manager can be used from several threads (for example from a web
    request handler and from the main thread).
    """

    def __init__(self, config_path: str = "config/plugins.yaml"):
        """Create an empty manager.

        Args:
            config_path: YAML file that ``save_config`` writes to.
        """
        self.plugins: Dict[str, PluginInfo] = {}
        self.plugin_dirs: List[str] = []
        self.config_path = config_path
        self._lock = threading.RLock()
        # Assigned by the host application before plugins are loaded; it is
        # handed to every plugin's initialize().
        self._context = None

    def add_plugin_dir(self, directory: str):
        """Register a directory to be scanned by ``discover_plugins``."""
        self.plugin_dirs.append(directory)

    def discover_plugins(self) -> List[str]:
        """Return the module names found in the registered plugin directories.

        Names are returned in discovery order with duplicates removed, so a
        directory registered twice does not cause a plugin to load twice.
        """
        discovered: Dict[str, None] = {}
        for plugin_dir in self.plugin_dirs:
            for module_info in pkgutil.iter_modules([plugin_dir]):
                discovered.setdefault(module_info.name, None)
        return list(discovered)

    @staticmethod
    def _find_plugin_classes(module) -> List[type]:
        """Return the concrete plugin classes that ``module`` itself defines."""
        prefix = module.__name__ + "."
        candidates = []
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if obj is OpenClawPlugin or not issubclass(obj, OpenClawPlugin):
                continue
            # Abstract intermediates cannot be instantiated.
            if inspect.isabstract(obj):
                continue
            # Skip plugin classes that were merely imported into this module
            # (e.g. a base plugin being extended); classes defined in the
            # module or in its sub-modules are accepted.
            owner = getattr(obj, "__module__", "")
            if owner == module.__name__ or owner.startswith(prefix):
                candidates.append(obj)
        return candidates

    def _record_failure(self, plugin_id: str, message: str) -> None:
        """Report a failed load and store a FAILED entry under ``plugin_id``."""
        print(f"加载插件 {plugin_id} 失败: {message}")
        self.plugins[plugin_id] = PluginInfo(
            id=plugin_id,
            name=plugin_id,
            version="0.0.0",
            author="Unknown",
            description="",
            enabled=False,
            load_status=PluginLoadStatus.FAILED,
            error_message=message,
        )

    def load_plugin(self, plugin_id: str) -> bool:
        """Import ``plugins.<plugin_id>`` and activate its first usable plugin.

        Each candidate class is instantiated and initialized in turn; the
        first one whose ``initialize`` returns True is enabled and registered
        under its own ``plugin_id``.

        Returns:
            True when a plugin was registered. False otherwise; in that case
            a FAILED entry describing the problem is stored under the
            requested ``plugin_id`` (this also covers the cases where no
            plugin class exists or every ``initialize`` declined).
        """
        with self._lock:
            try:
                module = importlib.import_module(f"plugins.{plugin_id}")
                for plugin_class in self._find_plugin_classes(module):
                    plugin_instance = plugin_class()
                    if not plugin_instance.initialize(self._context):
                        continue
                    plugin_instance.enabled = True
                    # Run the hook before registering: if it raises, the
                    # registry ends up with a single FAILED entry rather
                    # than a LOADED entry plus a FAILED one.
                    plugin_instance.on_enable()
                    self.plugins[plugin_instance.plugin_id] = PluginInfo(
                        id=plugin_instance.plugin_id,
                        name=plugin_instance.name,
                        version=plugin_instance.version,
                        author=plugin_instance.author,
                        description=plugin_instance.description,
                        enabled=True,
                        instance=plugin_instance,
                        load_status=PluginLoadStatus.LOADED,
                    )
                    return True
            except Exception as e:
                # Boundary around third-party plugin code: any failure is
                # reported and recorded instead of crashing the host.
                self._record_failure(plugin_id, str(e))
                return False
            self._record_failure(plugin_id, "未找到可成功初始化的插件类")
            return False

    def load_all_plugins(self):
        """Load every plugin returned by ``discover_plugins``."""
        for plugin_id in self.discover_plugins():
            self.load_plugin(plugin_id)

    def enable_plugin(self, plugin_id: str) -> bool:
        """Enable a loaded plugin.

        The ``on_enable`` hook only fires on a disabled -> enabled
        transition, so enabling a plugin that ``load_plugin`` already
        enabled does not start it a second time.

        Returns:
            True if the plugin is loaded (and now enabled), else False.
        """
        with self._lock:
            plugin = self.plugins.get(plugin_id)
            if plugin is None or plugin.instance is None:
                return False
            if not plugin.instance.enabled:
                plugin.instance.enabled = True
                plugin.instance.on_enable()
            # Keep the registry record in step with the instance so that
            # list_plugins() and save_config() report the real state.
            plugin.enabled = True
            return True

    def disable_plugin(self, plugin_id: str) -> bool:
        """Disable a loaded plugin.

        The ``on_disable`` hook only fires on an enabled -> disabled
        transition.

        Returns:
            True if the plugin is loaded (and now disabled), else False.
        """
        with self._lock:
            plugin = self.plugins.get(plugin_id)
            if plugin is None or plugin.instance is None:
                return False
            if plugin.instance.enabled:
                plugin.instance.enabled = False
                plugin.instance.on_disable()
            plugin.enabled = False
            return True

    def get_plugin(self, plugin_id: str) -> Optional[OpenClawPlugin]:
        """Return the plugin instance for ``plugin_id``, or None if unknown."""
        with self._lock:
            plugin_info = self.plugins.get(plugin_id)
            return plugin_info.instance if plugin_info else None

    def list_plugins(self) -> List[PluginInfo]:
        """Return a snapshot list of all registry entries."""
        with self._lock:
            return list(self.plugins.values())

    def save_config(self):
        """Write each plugin's enabled flag and settings to ``config_path``.

        Plugins that do not implement ``get_config`` (and failed entries
        without an instance) are saved with an empty ``config`` mapping.
        """
        import os

        with self._lock:
            config = {}
            for plugin_id, plugin_info in self.plugins.items():
                getter = getattr(plugin_info.instance, 'get_config', None)
                config[plugin_id] = {
                    'enabled': plugin_info.enabled,
                    'config': getter() if callable(getter) else {},
                }
        # The default path lives in a "config/" directory that may not exist.
        parent = os.path.dirname(self.config_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(self.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(config, f)

插件上下文

# openclaw/plugin/context.py
class PluginContext:
    """Runtime environment handed to each plugin's ``initialize``.

    Bundles the host services a plugin may use: configuration, the event
    bus, a logger, the plugin manager, named services, registered commands
    and a scratch dictionary shared between plugins.
    """

    def __init__(self, config_manager, event_bus, logger, plugin_manager=None):
        """Store the host services.

        Args:
            config_manager: Object used by plugins to read configuration.
            event_bus: Object exposing ``emit(event_type, data)``.
            logger: Logger used by plugins.
            plugin_manager: Optional plugin manager; plugins such as the web
                interface read ``context.plugin_manager``, so the attribute
                always exists (None when not supplied).
        """
        self.config_manager = config_manager
        self.event_bus = event_bus
        self.logger = logger
        self.plugin_manager = plugin_manager
        self.shared_data = {}
        # Named services and command handlers registered at runtime.
        self.services = {}
        self.commands = {}

    def register_service(self, service_name: str, service):
        """Make ``service`` retrievable through ``get_service``."""
        self.services[service_name] = service

    def get_service(self, service_name: str):
        """Return the service registered under ``service_name``, or None."""
        return self.services.get(service_name)

    def emit_event(self, event_type: str, data: Dict[str, Any]):
        """Forward an event to the event bus."""
        self.event_bus.emit(event_type, data)

    def register_command(self, command: str, handler):
        """Record ``handler`` as the handler for ``command``.

        A later registration for the same command replaces the earlier one.
        """
        self.commands[command] = handler

插件示例:Web界面插件

# plugins/web_interface.py
from openclaw.plugin.base import OpenClawPlugin
from flask import Flask, jsonify, request
import threading
class WebInterfacePlugin(OpenClawPlugin):
    """Plugin exposing a small Flask HTTP API for inspecting and enabling plugins.

    Routes:
        GET  /api/plugins                       id/name/enabled/version list
        POST /api/plugins/<plugin_id>/enable    enable one plugin
        GET  /api/status                        static status document
    """

    @property
    def plugin_id(self) -> str:
        """Return the identifier ``"web_interface"``."""
        return "web_interface"

    def initialize(self, context) -> bool:
        """Create the Flask app, read the port and register the routes.

        The port is read from ``web_interface.port`` and defaults to 8080,
        also when the context carries no configuration manager.
        """
        self.context = context
        self.app = Flask(__name__)
        config_manager = getattr(context, 'config_manager', None)
        if config_manager is not None:
            self.port = config_manager.get('web_interface.port', 8080)
        else:
            self.port = 8080
        self.running = False
        self.thread = None
        self.setup_routes()
        return True

    def setup_routes(self):
        """Register the request gate and the API routes on the Flask app."""

        @self.app.before_request
        def reject_when_disabled():
            # The server thread cannot be stopped from outside, so a
            # disabled plugin answers every request with 503 instead.
            if not self.running:
                return jsonify({'error': 'web interface disabled'}), 503

        @self.app.route('/api/plugins')
        def list_plugins():
            manager = getattr(self.context, 'plugin_manager', None)
            if manager is None:
                return jsonify({'error': 'plugin manager unavailable'}), 503
            return jsonify([{
                'id': p.id,
                'name': p.name,
                'enabled': p.enabled,
                'version': p.version
            } for p in manager.list_plugins()])

        @self.app.route('/api/plugins/<plugin_id>/enable', methods=['POST'])
        def enable_plugin(plugin_id):
            manager = getattr(self.context, 'plugin_manager', None)
            if manager is None:
                return jsonify({'error': 'plugin manager unavailable'}), 503
            success = manager.enable_plugin(plugin_id)
            return jsonify({'success': success})

        @self.app.route('/api/status')
        def get_status():
            # Placeholder status document.
            return jsonify({'status': 'running'})

    def on_enable(self):
        """Start serving requests.

        The server thread is started at most once: it stays alive across a
        disable/enable cycle, so re-enabling only reopens the request gate
        instead of trying to bind the same port a second time.
        """
        if self.running:
            return
        if self.thread is None or not self.thread.is_alive():
            # NOTE(review): binds on all interfaces while the schema default
            # for 'auth_enabled' is False -- confirm this exposure is intended.
            self.thread = threading.Thread(
                target=self.app.run,
                kwargs={'host': '0.0.0.0', 'port': self.port, 'debug': False},
                daemon=True
            )
            self.thread.start()
        self.running = True
        self.context.logger.info(f"Web界面已启动,端口: {self.port}")

    def on_disable(self):
        """Stop serving requests.

        The Flask development server has no stop API when it runs in a
        background thread, so the thread is left alive and the
        ``before_request`` gate rejects traffic while ``running`` is False.
        """
        self.running = False
        self.context.logger.info("Web界面已停止")

    def get_config_schema(self) -> Dict[str, Any]:
        """Describe the ``port`` and ``auth_enabled`` settings."""
        return {
            'port': {
                'type': 'number',
                'default': 8080,
                'description': 'Web服务器端口'
            },
            'auth_enabled': {
                'type': 'boolean',
                'default': False,
                'description': '启用身份验证'
            }
        }

插件示例:任务调度插件

# plugins/task_scheduler.py
from openclaw.plugin.base import OpenClawPlugin
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import json
class TaskSchedulerPlugin(OpenClawPlugin):
    """Plugin that schedules jobs on an APScheduler ``BackgroundScheduler``.

    The trigger type and trigger arguments of each task are persisted to
    ``TASKS_FILE``. The job callables are not persisted, so jobs are not
    recreated automatically after a restart.
    """

    # File holding the persisted trigger configuration of scheduled tasks.
    TASKS_FILE = 'data/scheduled_tasks.json'

    @property
    def plugin_id(self) -> str:
        """Return the identifier ``"task_scheduler"``."""
        return "task_scheduler"

    def initialize(self, context) -> bool:
        """Create the scheduler and read the previously saved task file."""
        self.context = context
        self.scheduler = BackgroundScheduler()
        self.tasks = {}
        # Trigger configuration read from disk, kept so the host can
        # re-register the matching callables.
        self.saved_tasks = {}
        self.load_tasks()
        return True

    def schedule_task(self, task_id: str, func, trigger: str, **trigger_args):
        """Add a job to the scheduler and persist its trigger configuration.

        Args:
            task_id: Job id; also the key in ``self.tasks``.
            func: Callable the scheduler runs.
            trigger: Trigger name passed to ``add_job``.
            **trigger_args: Extra keyword arguments passed to ``add_job``.
        """
        job = self.scheduler.add_job(
            func,
            trigger,
            id=task_id,
            **trigger_args
        )
        self.tasks[task_id] = {
            'job': job,
            'trigger': trigger,
            'args': trigger_args
        }
        self.save_tasks()

    def cancel_task(self, task_id: str):
        """Remove a task; unknown task ids are ignored."""
        # Local import keeps the change inside this block; the exception
        # class ships with the apscheduler package already used here.
        from apscheduler.jobstores.base import JobLookupError

        if task_id not in self.tasks:
            return
        try:
            self.scheduler.remove_job(task_id)
        except JobLookupError:
            # The scheduler no longer knows the job (for example a one-shot
            # job that already ran); the bookkeeping entry is still dropped.
            pass
        del self.tasks[task_id]
        self.save_tasks()

    def load_tasks(self):
        """Read the saved task configuration into ``self.saved_tasks``.

        A missing file is not an error. A corrupt file is reported and
        ignored so that it cannot prevent the plugin from initializing.
        """
        try:
            with open(self.TASKS_FILE, 'r', encoding='utf-8') as f:
                self.saved_tasks = json.load(f)
        except FileNotFoundError:
            pass
        except json.JSONDecodeError as e:
            self.context.logger.warning(f"任务配置文件无效,已忽略: {e}")

    def save_tasks(self):
        """Persist the trigger configuration of all current tasks."""
        import os

        tasks_config = {
            task_id: {
                'trigger': task_info['trigger'],
                'args': task_info['args']
            }
            for task_id, task_info in self.tasks.items()
        }
        # Serialize before opening the file so a serialization error cannot
        # leave a truncated file behind. ``default=str`` is deliberate:
        # trigger arguments commonly hold datetime values, which are stored
        # in their string form.
        payload = json.dumps(tasks_config, indent=2, default=str)
        directory = os.path.dirname(self.TASKS_FILE)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.TASKS_FILE, 'w', encoding='utf-8') as f:
            f.write(payload)

    def on_enable(self):
        """Start the scheduler unless it is already running."""
        if not self.scheduler.running:
            self.scheduler.start()
        self.context.logger.info("任务调度器已启动")

    def on_disable(self):
        """Shut the scheduler down (if running) and persist the tasks."""
        if self.scheduler.running:
            self.scheduler.shutdown()
        self.save_tasks()
        self.context.logger.info("任务调度器已停止")

插件配置管理

# plugins/configuration_manager.py
class ConfigurationManagerPlugin(OpenClawPlugin):
    """Plugin that stores per-plugin settings as JSON files.

    Each plugin's settings live in ``config/plugins/<plugin_id>.json`` and
    are cached in ``self.configs``.
    """

    @property
    def plugin_id(self) -> str:
        """Return the identifier ``"config_manager"``."""
        return "config_manager"

    def initialize(self, context) -> bool:
        """Remember the context and load every stored configuration file."""
        self.context = context
        self.configs = {}
        self.load_all_configs()
        return True

    def get_plugin_config(self, plugin_id: str, key: str = None, default=None):
        """Return a plugin's settings.

        Args:
            plugin_id: Plugin whose settings are requested.
            key: Single setting to return; None returns the whole mapping.
            default: Value returned when ``key`` is not set.
        """
        config = self.configs.get(plugin_id, {})
        if key is None:
            return config
        return config.get(key, default)

    def set_plugin_config(self, plugin_id: str, key: str, value):
        """Store one setting, persist it and emit ``config_changed``."""
        self.configs.setdefault(plugin_id, {})[key] = value
        self.save_config(plugin_id)
        # Notify listeners after the value has been written to disk.
        self.context.emit_event('config_changed', {
            'plugin_id': plugin_id,
            'key': key,
            'value': value
        })

    def load_all_configs(self):
        """Load every ``*.json`` file in ``config/plugins`` into the cache.

        Files that are not valid JSON, or whose top level is not an object,
        are reported and skipped so a single bad file cannot prevent the
        plugin from initializing.
        """
        config_dir = Path('config/plugins')
        config_dir.mkdir(parents=True, exist_ok=True)
        for config_file in config_dir.glob('*.json'):
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except json.JSONDecodeError as e:
                self.context.logger.warning(
                    f"忽略无效的插件配置文件 {config_file}: {e}")
                continue
            if not isinstance(loaded, dict):
                self.context.logger.warning(
                    f"忽略无效的插件配置文件 {config_file}: 顶层不是对象")
                continue
            self.configs[config_file.stem] = loaded

    def save_config(self, plugin_id: str):
        """Write one plugin's cached settings to its JSON file."""
        if plugin_id not in self.configs:
            return
        config_file = Path(f'config/plugins/{plugin_id}.json')
        # The directory is created at load time but may have been removed.
        config_file.parent.mkdir(parents=True, exist_ok=True)
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(self.configs[plugin_id], f, indent=2)

事件系统集成

# openclaw/plugin/events.py
import threading
from typing import Any, Callable, Dict, List, Optional
class EventBus:
    """Thread-safe publish/subscribe hub keyed by event type name."""

    def __init__(self):
        """Create a bus with no subscriptions."""
        self.handlers: Dict[str, List[Callable]] = {}
        self._lock = threading.RLock()

    def subscribe(self, event_type: str, handler: Callable):
        """Register ``handler`` to be called for every ``event_type`` event."""
        with self._lock:
            self.handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: str, handler: Callable):
        """Remove one registration of ``handler``.

        Unknown event types and handlers that were never subscribed are
        ignored rather than raising.
        """
        with self._lock:
            registered = self.handlers.get(event_type)
            if not registered:
                return
            try:
                registered.remove(handler)
            except ValueError:
                pass

    def emit(self, event_type: str, data: Optional[Dict[str, Any]] = None):
        """Call every handler subscribed to ``event_type`` with ``data``.

        Handlers run on a snapshot taken under the lock and are invoked
        outside it, so a handler may subscribe or unsubscribe (itself
        included) without causing other handlers to be skipped, and a slow
        handler does not block other threads using the bus. An exception in
        one handler is reported and does not stop the remaining handlers.
        """
        with self._lock:
            snapshot = list(self.handlers.get(event_type, ()))
        for handler in snapshot:
            try:
                handler(data)
            except Exception as e:
                # Boundary around subscriber code: report and continue.
                print(f"事件处理器出错: {e}")
# Predefined event type names
class EventTypes:
    """String constants naming event types for use with the event bus.

    In this file, "config_changed" is emitted by the configuration manager
    plugin and "plugin_loaded" is subscribed to in ``main``; emitters for
    the other names are not visible here.
    """
    PLUGIN_LOADED = "plugin_loaded"
    PLUGIN_UNLOADED = "plugin_unloaded"
    PLUGIN_ENABLED = "plugin_enabled"
    PLUGIN_DISABLED = "plugin_disabled"
    CONFIG_CHANGED = "config_changed"
    TASK_COMPLETED = "task_completed"
    ERROR_OCCURRED = "error_occurred"

使用示例

# main.py
from openclaw.plugin.manager import PluginManager
from openclaw.plugin.context import PluginContext
from openclaw.plugin.events import EventBus
def main():
    """Wire up the event bus, plugin manager and context, then run until Ctrl+C.

    On shutdown every loaded plugin is disabled and cleaned up; a failure in
    one plugin is logged and does not prevent the others from shutting down.
    """
    import logging
    import time

    # Plugins call ``context.logger.info(...)``, so a real logger is needed;
    # the bare ``print`` function has no such methods.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("openclaw")

    # Create the event bus and subscribe before loading, so events emitted
    # while plugins load are not missed.
    event_bus = EventBus()

    def on_plugin_loaded(data):
        # ``emit`` may be called without a payload.
        print(f"插件已加载: {(data or {}).get('plugin_id')}")

    event_bus.subscribe("plugin_loaded", on_plugin_loaded)

    # Create the plugin manager.
    plugin_manager = PluginManager()
    plugin_manager.add_plugin_dir("./plugins")

    # Create the plugin context. A plain dict stands in for the
    # configuration manager: it supports the ``get(key, default)`` lookups
    # plugins perform, whereas ``None`` makes them fail during initialize.
    # A real application should pass its configuration manager here.
    context = PluginContext(
        config_manager={},
        event_bus=event_bus,
        logger=logger,
    )
    # The web interface plugin reaches the manager through the context.
    context.plugin_manager = plugin_manager
    plugin_manager._context = context

    # Load all plugins, then make sure the web interface is enabled.
    plugin_manager.load_all_plugins()
    plugin_manager.enable_plugin("web_interface")

    # Keep the main thread alive until interrupted.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("正在关闭...")
    finally:
        # Stop and clean up every loaded plugin.
        for plugin_info in list(plugin_manager.plugins.values()):
            if plugin_info.instance is None:
                continue
            try:
                plugin_manager.disable_plugin(plugin_info.id)
                plugin_info.instance.cleanup()
            except Exception:
                logger.exception("plugin %s failed to shut down", plugin_info.id)


if __name__ == "__main__":
    main()

插件配置文件示例

# config/plugins.yaml
web_interface:
  enabled: true
  config:
    port: 8080
    auth_enabled: false
task_scheduler:
  enabled: true
  config:
    timezone: "Asia/Shanghai"
config_manager:
  enabled: true

扩展建议

  1. 插件依赖管理:添加插件间依赖关系支持
  2. 插件隔离:使用进程或容器隔离插件,提高稳定性
  3. 插件签名验证:确保插件来源可信
  4. 热重载:支持插件热重载而不重启主程序
  5. 插件市场:在线插件仓库和安装功能
  6. 插件沙箱:限制插件权限,增强安全性

这个插件系统为 OpenClaw 提供了强大的扩展能力,用户可以根据需要开发各种功能插件,同时保持系统的稳定性和可维护性。

标签: OpenClaw 插件扩展

抱歉,评论功能暂时关闭!