将 AI小龙虾OPENCLAW 与 MongoDB 集成,可以通过以下步骤实现数据存储、检索和管理,以下是具体方法和代码示例:

环境准备
安装依赖
pip install pymongo
连接 MongoDB
基本连接配置
# Connect to a local MongoDB instance (default port 27017).
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
# Remote MongoDB with authentication:
# client = MongoClient('mongodb://username:password@host:port/')
# Select the database (created lazily on first write).
db = client['openclaw_db']
# Select the collection (roughly analogous to a table in an RDBMS).
collection = db['claw_data']
数据模型设计
根据 OPENCLAW 的业务需求设计文档结构,示例如下:
# Sample OPENCLAW task document: crawl input, crawled output, and run metadata.
claw_document = {
    "task_id": "12345",
    "timestamp": "2023-10-01T12:00:00Z",
    "input_data": {"url": "https://example.com", "depth": 2},
    "output_data": {
        "crawled_pages": [
            {"url": "https://example.com/page1", "content": "..."},
            {"url": "https://example.com/page2", "content": "..."},
        ],
        "metadata": {"page_count": 10, "status": "completed"},
    },
    "status": "completed",
    "processing_time": 5.2,
}
核心操作示例
插入数据
# Insert one document; MongoDB adds an _id field automatically when missing.
result = collection.insert_one(claw_document)
print(f"插入ID: {result.inserted_id}")
# Bulk insert: several documents in one round trip.
# NOTE(review): doc1/doc2/doc3 are placeholders, not defined in this snippet.
documents = [doc1, doc2, doc3]
result = collection.insert_many(documents)
查询数据
# Iterate over every document in the collection (find() with no filter).
for doc in collection.find():
    print(doc)
# Filter query: only documents whose status is "completed".
query = {"status": "completed"}
results = collection.find(query)
# Projection: return only task_id and timestamp (plus _id, included by default).
results = collection.find({}, {"task_id": 1, "timestamp": 1})
# Fetch a single matching document, or None when nothing matches.
doc = collection.find_one({"task_id": "12345"})
更新数据
# Update the first matching document; $set changes only the listed fields.
collection.update_one(
{"task_id": "12345"},
{"$set": {"status": "failed", "error": "timeout"}}
)
# Update every document matching the filter in one server-side pass.
collection.update_many(
{"status": "pending"},
{"$set": {"status": "processing"}}
)
删除数据
# Remove the first document matching the filter.
collection.delete_one({"task_id": "12345"})
# Remove every document matching the filter.
collection.delete_many({"status": "failed"})
聚合查询
# Aggregation: $match filters to completed tasks, then $group (with _id None,
# i.e. one bucket over all matches) averages their processing_time.
pipeline = [
{"$match": {"status": "completed"}},
{"$group": {"_id": None, "avg_time": {"$avg": "$processing_time"}}}
]
result = list(collection.aggregate(pipeline))
高级集成方案
1 异步支持(使用 Motor)
# Async driver (Motor). Install it first:
# pip install motor
import motor.motor_asyncio
import asyncio

async def async_example():
    """Insert one document and stream completed ones without blocking the event loop."""
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.openclaw_db
    collection = db.claw_data
    # Every Motor operation returns an awaitable.
    await collection.insert_one(claw_document)
    # Motor cursors are consumed with `async for`.
    cursor = collection.find({"status": "completed"})
    async for doc in cursor:
        print(doc)
2 数据索引优化
# Create indexes to speed up the queries used above.
collection.create_index("task_id", unique=True)  # unique index: also rejects duplicate task_ids
collection.create_index([("timestamp", -1)])  # descending index for "most recent first" queries
collection.create_index([("status", 1), ("timestamp", -1)])  # compound index: filter by status, sort by time
3 错误处理与重连
from pymongo.errors import ConnectionFailure, OperationFailure

def safe_operation(document=None):
    """Insert *document* with a connection pre-check and targeted error handling.

    Args:
        document: dict to insert; defaults to an empty dict so the original
            no-argument call shape ``safe_operation()`` keeps working.
            (The original snippet inserted an undefined name ``data``.)
    """
    document = {} if document is None else document
    try:
        client = MongoClient('mongodb://localhost:27017/',
                             serverSelectionTimeoutMS=5000)
        client.server_info()  # forces server selection, surfacing connection errors early
        # Insert through a collection bound to THIS client — the original used
        # the module-level collection, which belongs to a different client.
        client['openclaw_db']['claw_data'].insert_one(document)
    except ConnectionFailure as e:
        print(f"连接失败: {e}")
        # TODO(review): add retry/backoff logic here
    except OperationFailure as e:
        print(f"操作失败: {e}")
4 数据备份与恢复
# 备份指定集合
mongodump --db openclaw_db --collection claw_data --out ./backup/
# 恢复数据
mongorestore --db openclaw_db ./backup/openclaw_db/
完整示例:OPENCLAW 数据处理器
class MongoDBStorage:
    """MongoDB-backed store for OPENCLAW crawl results."""

    def __init__(self, uri='mongodb://localhost:27017/'):
        self.client = MongoClient(uri)
        self.db = self.client['openclaw_db']
        self.collection = self.db['crawl_results']

    def save_crawl_result(self, task_id, url, content, metadata=None):
        """Persist one crawled page; returns pymongo's InsertOneResult."""
        # Local import: the original snippet used datetime without importing it.
        from datetime import datetime
        document = {
            "task_id": task_id,
            "url": url,
            "content": content,
            "metadata": metadata or {},
            # Naive local time, matching the original; prefer timezone-aware
            # datetime.now(timezone.utc) in production.
            "crawled_at": datetime.now(),
            "processed": False,
        }
        return self.collection.insert_one(document)

    def get_pending_tasks(self, limit=100):
        """Return up to *limit* unprocessed documents as a list."""
        return list(self.collection.find(
            {"processed": False},
            limit=limit,
        ))

    def mark_as_processed(self, task_id):
        """Flag the first document with *task_id* as processed."""
        self.collection.update_one(
            {"task_id": task_id},
            {"$set": {"processed": True}},
        )

    def get_stats(self):
        """Aggregate totals: document count, processed count, avg content length.

        Returns a list (zero or one result docs, since _id is None) rather than
        the original one-shot cursor, so callers can inspect it more than once.
        """
        pipeline = [
            {"$group": {
                "_id": None,
                "total": {"$sum": 1},
                "processed": {"$sum": {"$cond": ["$processed", 1, 0]}},
                "avg_length": {"$avg": {"$strLenCP": "$content"}},
            }}
        ]
        return list(self.collection.aggregate(pipeline))
# Usage example: store one page, then read back pending work and stats.
storage = MongoDBStorage()
storage.save_crawl_result("task_001", "https://example.com", "<html>...")
pending = storage.get_pending_tasks()
stats = storage.get_stats()
最佳实践建议
- 连接池管理:复用 MongoDB 连接,避免频繁创建连接
- 数据分片:对于大数据量,考虑分片存储
- 读写分离:利用 MongoDB 的副本集实现读写分离
- 数据清理:设置 TTL 索引自动清理旧数据
collection.create_index("timestamp", expireAfterSeconds=604800)  # 7天过期(注意:TTL 索引要求字段为 BSON 日期类型;本文示例中 timestamp 存的是字符串,不会被自动清理)
- 监控告警:监控集合大小、查询性能等指标
故障排查
# Collection statistics (storage size, document count, index sizes).
print(db.command("collstats", "claw_data"))
# In-progress server operations. Database.current_op() was removed in
# PyMongo 4.x; run the currentOp admin command instead.
print(db.client.admin.command("currentOp"))
# Indexes defined on the collection.
print(collection.index_information())
按照以上步骤,你可以将 OPENCLAW 与 MongoDB 高效集成,实现灵活的数据存储和检索功能,根据实际业务需求调整数据模型和查询逻辑。
版权声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。