我来帮你设计一个具备数据保留功能的 OpenClaw(机械爪)类,以下是一个完整的实现方案:

import json
import pickle
import time
from datetime import datetime
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Optional, Any
from enum import Enum
class ClawState(Enum):
"""机械爪状态枚举"""
OPEN = "open"
CLOSED = "closed"
HALF_OPEN = "half_open"
ERROR = "error"
@dataclass
class OperationRecord:
"""操作记录数据类"""
timestamp: float
datetime: str
action: str
position: tuple
force: float
state: ClawState
success: bool
metadata: Dict[str, Any] = field(default_factory=dict)
class OpenClaw:
"""
具备数据保留功能的智能机械爪
特点:
1. 自动记录所有操作历史
2. 支持多种数据持久化方式
3. 可配置的数据保留策略
4. 实时状态监控
"""
def __init__(self,
claw_id: str = "default_claw",
max_history: int = 1000,
auto_save: bool = True,
save_format: str = "json"):
"""
初始化机械爪
Args:
claw_id: 机械爪唯一标识
max_history: 最大历史记录数(0表示无限制)
auto_save: 是否自动保存
save_format: 保存格式(json/pickle)
"""
self.claw_id = claw_id
self.max_history = max_history
self.auto_save = auto_save
self.save_format = save_format
# 当前状态
self.state = ClawState.CLOSED
self.position = (0.0, 0.0, 0.0) # (x, y, z)
self.current_force = 0.0
self.is_active = False
# 数据存储
self._operation_history: List[OperationRecord] = []
self._config = {
"max_force": 50.0, # 最大抓取力(N)
"min_force": 0.5, # 最小抓取力(N)
"speed": 1.0, # 移动速度
"sensitivity": 0.1, # 灵敏度
}
# 统计数据
self._stats = {
"total_operations": 0,
"successful_operations": 0,
"failed_operations": 0,
"total_force_applied": 0.0,
"last_maintenance": time.time(),
}
# 加载历史数据(如果存在)
self._load_history()
def open(self, force: float = 5.0) -> bool:
"""打开机械爪"""
return self._execute_operation("open", force)
def close(self, force: float = 10.0) -> bool:
"""关闭机械爪"""
return self._execute_operation("close", force)
def move_to(self, x: float, y: float, z: float) -> bool:
"""移动到指定位置"""
old_position = self.position
self.position = (x, y, z)
record = OperationRecord(
timestamp=time.time(),
datetime=datetime.now().isoformat(),
action="move",
position=self.position,
force=0.0,
state=self.state,
success=True,
metadata={
"from_position": old_position,
"to_position": self.position,
"distance": self._calculate_distance(old_position, self.position)
}
)
self._add_record(record)
return True
def grip(self, target_force: float, object_type: str = "unknown") -> bool:
"""
抓取物体
Args:
target_force: 目标抓取力
object_type: 物体类型
"""
if target_force > self._config["max_force"]:
print(f"警告:目标抓取力 {target_force}N 超过最大限制 {self._config['max_force']}N")
return False
self.state = ClawState.CLOSED
self.current_force = target_force
record = OperationRecord(
timestamp=time.time(),
datetime=datetime.now().isoformat(),
action="grip",
position=self.position,
force=target_force,
state=self.state,
success=True,
metadata={
"object_type": object_type,
"force_applied": target_force,
"config_used": self._config.copy()
}
)
self._add_record(record)
self._stats["total_force_applied"] += target_force
return True
def release(self) -> bool:
"""释放物体"""
self.state = ClawState.OPEN
self.current_force = 0.0
record = OperationRecord(
timestamp=time.time(),
datetime=datetime.now().isoformat(),
action="release",
position=self.position,
force=0.0,
state=self.state,
success=True,
metadata={"force_before_release": self.current_force}
)
self._add_record(record)
return True
def _execute_operation(self, action: str, force: float) -> bool:
"""执行基础操作"""
success = False
try:
if action == "open":
self.state = ClawState.OPEN
self.current_force = 0.0
success = True
elif action == "close":
self.state = ClawState.CLOSED
self.current_force = force
success = True
record = OperationRecord(
timestamp=time.time(),
datetime=datetime.now().isoformat(),
action=action,
position=self.position,
force=force,
state=self.state,
success=success,
metadata={"target_force": force}
)
self._add_record(record)
if success:
self._stats["successful_operations"] += 1
else:
self._stats["failed_operations"] += 1
self._stats["total_operations"] += 1
except Exception as e:
record = OperationRecord(
timestamp=time.time(),
datetime=datetime.now().isoformat(),
action=action,
position=self.position,
force=force,
state=ClawState.ERROR,
success=False,
metadata={"error": str(e)}
)
self._add_record(record)
self._stats["failed_operations"] += 1
self._stats["total_operations"] += 1
return success
def _add_record(self, record: OperationRecord):
"""添加操作记录"""
self._operation_history.append(record)
# 限制历史记录数量
if self.max_history > 0 and len(self._operation_history) > self.max_history:
self._operation_history.pop(0)
# 自动保存
if self.auto_save:
self.save_history()
def save_history(self, filename: Optional[str] = None):
"""
保存历史记录到文件
Args:
filename: 文件名(默认使用claw_id)
"""
if filename is None:
filename = f"{self.claw_id}_history"
data = {
"claw_id": self.claw_id,
"config": self._config,
"stats": self._stats,
"current_state": {
"position": self.position,
"force": self.current_force,
"state": self.state.value
},
"history": [asdict(record) for record in self._operation_history]
}
if self.save_format == "json":
with open(f"{filename}.json", "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
elif self.save_format == "pickle":
with open(f"{filename}.pkl", "wb") as f:
pickle.dump(data, f)
def _load_history(self):
"""加载历史记录"""
filename = f"{self.claw_id}_history"
try:
if self.save_format == "json":
with open(f"{filename}.json", "r", encoding="utf-8") as f:
data = json.load(f)
elif self.save_format == "pickle":
with open(f"{filename}.pkl", "rb") as f:
data = pickle.load(f)
else:
return
# 更新配置和统计数据
self._config = data.get("config", self._config)
self._stats = data.get("stats", self._stats)
# 加载历史记录
history_data = data.get("history", [])
for record_data in history_data:
record_data["state"] = ClawState(record_data["state"])
record = OperationRecord(**record_data)
self._operation_history.append(record)
except FileNotFoundError:
print("未找到历史记录文件,创建新记录")
except Exception as e:
print(f"加载历史记录时出错: {e}")
def get_history(self,
start_time: Optional[float] = None,
end_time: Optional[float] = None,
action_filter: Optional[List[str]] = None,
min_force: Optional[float] = None) -> List[OperationRecord]:
"""
获取历史记录
Args:
start_time: 开始时间戳
end_time: 结束时间戳
action_filter: 动作过滤器
min_force: 最小力度
Returns:
过滤后的历史记录
"""
filtered = self._operation_history
if start_time:
filtered = [r for r in filtered if r.timestamp >= start_time]
if end_time:
filtered = [r for r in filtered if r.timestamp <= end_time]
if action_filter:
filtered = [r for r in filtered if r.action in action_filter]
if min_force is not None:
filtered = [r for r in filtered if r.force >= min_force]
return filtered
def get_stats(self) -> Dict[str, Any]:
"""获取统计数据"""
stats = self._stats.copy()
stats.update({
"history_size": len(self._operation_history),
"uptime": time.time() - stats["last_maintenance"],
"current_position": self.position,
"current_state": self.state.value,
"current_force": self.current_force,
})
return stats
def clear_history(self, confirm: bool = False) -> bool:
"""清空历史记录"""
if not confirm:
print("请确认清空历史记录,调用时设置 confirm=True")
return False
self._operation_history.clear()
print("历史记录已清空")
return True
def export_report(self, filename: str = "claw_report"):
"""导出详细报告"""
stats = self.get_stats()
recent_operations = self.get_history()[-10:] # 最近10次操作
report = {
"report_generated": datetime.now().isoformat(),
"claw_id": self.claw_id,
"summary": {
"total_operations": stats["total_operations"],
"success_rate": (stats["successful_operations"] / stats["total_operations"] * 100
if stats["total_operations"] > 0 else 0),
"average_force": (stats["total_force_applied"] / stats["successful_operations"]
if stats["successful_operations"] > 0 else 0),
"uptime_hours": stats["uptime"] / 3600,
},
"config": self._config,
"recent_operations": [asdict(op) for op in recent_operations],
"performance_metrics": {
"operations_per_hour": stats["total_operations"] / (stats["uptime"] / 3600)
if stats["uptime"] > 0 else 0,
"error_rate": (stats["failed_operations"] / stats["total_operations"] * 100
if stats["total_operations"] > 0 else 0),
}
}
with open(f"{filename}.json", "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
return report
@staticmethod
def _calculate_distance(pos1: tuple, pos2: tuple) -> float:
"""计算两点间距离"""
return ((pos1[0] - pos2[0])**2 +
(pos1[1] - pos2[1])**2 +
(pos1[2] - pos2[2])**2) ** 0.5
def __str__(self) -> str:
"""字符串表示"""
return (f"OpenClaw(id={self.claw_id}, "
f"state={self.state.value}, "
f"position={self.position}, "
f"force={self.current_force}N, "
f"operations={self._stats['total_operations']})")
def __enter__(self):
"""上下文管理器入口"""
self.is_active = True
print(f"{self.claw_id} 已激活")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器退出"""
self.is_active = False
self.save_history()
print(f"{self.claw_id} 已停用,数据已保存")
def demo_openclaw():
"""演示如何使用 OpenClaw"""
# 创建机械爪实例
claw = OpenClaw(
claw_id="robot_arm_01",
max_history=500,
auto_save=True,
save_format="json"
)
# 使用上下文管理器确保资源正确释放
with claw:
# 执行一系列操作
claw.move_to(10.5, 5.2, 3.1)
claw.grip(8.5, object_type="cube")
claw.move_to(12.0, 6.0, 2.5)
claw.release()
# 获取统计数据
stats = claw.get_stats()
print("当前统计数据:", stats)
# 获取最近5次操作
recent_ops = claw.get_history()[-5:]
print(f"len(recent_ops)}次操作:")
for op in recent_ops:
print(f" - {op.datetime}: {op.action} at {op.position}")
# 导出报告
report = claw.export_report("demo_report")
print(f"报告已生成,包含 {len(report['recent_operations'])} 条操作记录")
# 保存最终状态
claw.save_history()
if __name__ == "__main__":
demo_openclaw()
主要功能特性:
数据保留功能
- 自动记录所有操作的时间戳、位置、力度、状态
- 支持操作元数据存储
- 可配置的历史记录上限
多种持久化方式
- JSON格式(人类可读)
- Pickle格式(Python对象)
- 自动加载上次保存的数据
智能查询功能
- 按时间范围查询
- 按操作类型过滤
- 按力度阈值筛选
统计分析
- 成功率统计
- 平均力度计算
- 运行时长统计
- 性能指标分析
报告生成
- 详细操作报告
- 性能分析报告
- 可导出为JSON文件
使用方法:
# 创建机械爪
claw = OpenClaw(claw_id="my_claw")
# 执行操作
claw.open()
claw.move_to(10, 20, 30)
claw.grip(15.0, object_type="bottle")
claw.release()
# 查询历史
history = claw.get_history(
start_time=time.time() - 3600, # 过去1小时
action_filter=["grip", "release"]
)
# 获取统计
stats = claw.get_stats()
# 导出报告
claw.export_report()
# 手动保存
claw.save_history("backup_data")
这个设计具有很好的扩展性,可以根据具体需求添加更多传感器数据、故障记录、机器学习预测等功能。
版权声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。