OpenClaw 本地部署与即时通讯集成实战
本文提供了OpenClaw本地部署与即时通讯集成的完整教程,包含以下内容: 5分钟快速部署: 提供一键部署脚本创建Python虚拟环境 安装核心依赖和配置项目结构 设置.env配置文件 微信集成方案: 使用itchat-uos库实现微信机器人 包含消息接收和处理逻辑 提供快速启动脚本 飞书集成方案: 基于Flask搭建Webhook接口 实现消息验证和处理逻辑 包含飞书API调用方法 教程特点:
目录
OpenClaw 本地部署与即时通讯集成实战
我将提供一个完整的保姆级教程,让你在10分钟内搭建起OpenClaw本地系统,并接入微信/飞书/钉钉/QQ等平台。
📦 一、快速部署(5分钟)
步骤1:一键部署脚本
# 创建项目目录
mkdir openclaw-bot && cd openclaw-bot
# 创建一键部署脚本
cat > deploy.sh << 'EOF'
#!/bin/bash
echo "🚀 OpenClaw 一键部署开始..."
# 1. 创建Python虚拟环境
python3.9 -m venv venv
source venv/bin/activate
# 2. 安装核心依赖
pip install --upgrade pip
pip install open-interpreter
pip install python-dotenv
pip install playwright
pip install selenium
pip install requests
playwright install
# 3. 创建项目结构
mkdir -p config logs plugins tasks screenshots
# 4. 创建配置文件
cat > .env << 'ENV'
# OpenClaw配置
OPENCLAW_MODEL=gpt-4o-mini
OPENCLAW_API_KEY=${your_openai_api_key}
# 各平台配置
WECHAT_AUTO_REPLY=true
DINGTALK_WEBHOOK=
FEISHU_WEBHOOK=
QQ_BOT_ID=
# 自动化配置
SCREENSHOT_PATH=./screenshots
LOG_LEVEL=INFO
ENV
echo "✅ 基础环境部署完成!"
EOF
# 赋予执行权限并运行
chmod +x deploy.sh
./deploy.sh
步骤2:快速配置
# 创建核心配置文件
cat > config.py << 'EOF'
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# OpenClaw配置
MODEL = os.getenv('OPENCLAW_MODEL', 'gpt-4o-mini')
API_KEY = os.getenv('OPENCLAW_API_KEY')
# 各平台Webhook
DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '')
FEISHU_WEBHOOK = os.getenv('FEISHU_WEBHOOK', '')
# 微信配置(使用itchat-uos)
WECHAT_ENABLED = os.getenv('WECHAT_AUTO_REPLY', 'true').lower() == 'true'
# QQ配置(使用Mirai)
QQ_BOT_ID = os.getenv('QQ_BOT_ID', '')
# 路径配置
SCREENSHOT_DIR = os.getenv('SCREENSHOT_PATH', './screenshots')
LOG_DIR = './logs'
@staticmethod
def validate():
if not Config.API_KEY:
raise ValueError("请设置OPENCLAW_API_KEY环境变量")
EOF
💬 二、微信集成(2分钟)
使用 itchat-uos(最简方案)
# wechat_bot.py
import itchat
import asyncio
from openclaw_core import OpenClawCore
import json
class WeChatBot:
def __init__(self):
self.claw = OpenClawCore()
def start(self):
"""启动微信机器人"""
@itchat.msg_register(itchat.content.TEXT)
def text_reply(msg):
print(f"收到消息: {msg['Text']} from {msg['FromUserName']}")
# 异步处理消息
asyncio.create_task(self.handle_message(msg))
return "正在处理您的请求..."
# 登录
itchat.auto_login(hotReload=True, enableCmdQR=2)
print("✅ 微信登录成功!")
itchat.run()
async def handle_message(self, msg):
"""处理消息"""
try:
user_msg = msg['Text']
user_id = msg['FromUserName']
# 调用OpenClaw处理
response = await self.claw.process_command(user_msg)
# 回复用户
itchat.send(response, toUserName=user_id)
except Exception as e:
error_msg = f"处理失败: {str(e)}"
itchat.send(error_msg, toUserName=msg['FromUserName'])
# 运行微信机器人
if __name__ == "__main__":
bot = WeChatBot()
bot.start()
快速启动微信机器人
# 安装itchat-uos
pip install itchat-uos==1.5.0.dev0
# 创建快速启动脚本
cat > start_wechat.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
echo "📱 启动微信机器人..."
echo "请使用手机微信扫描二维码登录"
python wechat_bot.py
EOF
chmod +x start_wechat.sh
📱 三、飞书集成(1分钟)
# feishu_bot.py
from flask import Flask, request, jsonify
import requests
import json
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
# 飞书验证令牌
VERIFICATION_TOKEN = "your_verification_token"
@app.route('/feishu', methods=['POST'])
def feishu_webhook():
"""飞书Webhook接口"""
data = request.json
# 验证请求
if data.get("token") != VERIFICATION_TOKEN:
return jsonify({"error": "Invalid token"}), 403
# 处理消息
if data.get("type") == "url_verification":
return jsonify({"challenge": data.get("challenge")})
elif data.get("type") == "event_callback":
event = data.get("event")
if event.get("message_type") == "text":
text = event.get("text", "")
user_id = event.get("sender", {}).get("user_id")
# 异步处理
asyncio.create_task(process_feishu_message(text, user_id))
return jsonify({"success": True})
async def process_feishu_message(text, user_id):
"""处理飞书消息"""
try:
# 调用OpenClaw
result = await claw.process_command(text)
# 回复到飞书(需要飞书机器人API)
await reply_to_feishu(user_id, result)
except Exception as e:
print(f"飞书处理失败: {e}")
async def reply_to_feishu(user_id, message):
"""回复飞书消息"""
# 这里需要飞书机器人的API密钥
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/your_token"
payload = {
"msg_type": "text",
"content": {
"text": message
}
}
requests.post(webhook_url, json=payload)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
飞书快速配置脚本
# feishu_setup.sh
#!/bin/bash
echo "🔧 配置飞书机器人:"
echo "1. 访问 https://open.feishu.cn/app"
echo "2. 创建新应用"
echo "3. 启用机器人能力"
echo "4. 复制Webhook地址到 .env 文件的 FEISHU_WEBHOOK"
echo ""
echo "📌 获取Webhook URL后运行:"
echo "export FEISHU_WEBHOOK='你的Webhook地址'"
echo "python feishu_bot.py"
📌 四、钉钉集成(1分钟)
# dingtalk_bot.py
from flask import Flask, request, jsonify
import requests
import hashlib
import base64
import hmac
import time
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
class DingTalkBot:
def __init__(self, app_secret, access_token):
self.app_secret = app_secret
self.access_token = access_token
def sign(self, timestamp):
"""生成签名"""
string_to_sign = f'{timestamp}\n{self.app_secret}'
hmac_code = hmac.new(
self.app_secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(hmac_code).decode('utf-8')
async def send_message(self, user_id, message):
"""发送钉钉消息"""
timestamp = str(round(time.time() * 1000))
sign = self.sign(timestamp)
url = f'https://oapi.dingtalk.com/robot/send?access_token={self.access_token}×tamp={timestamp}&sign={sign}'
payload = {
"msgtype": "text",
"text": {
"content": message
},
"at": {
"atUserIds": [user_id]
}
}
response = requests.post(url, json=payload)
return response.json()
@app.route('/dingtalk', methods=['POST'])
def dingtalk_webhook():
"""钉钉Webhook接口"""
data = request.json
if data.get('msgtype') == 'text':
text = data.get('text', {}).get('content', '')
sender_id = data.get('senderStaffId', '')
# 异步处理
asyncio.create_task(process_dingtalk_message(text, sender_id))
return jsonify({"msgtype": "text", "text": {"content": "处理中..."}})
async def process_dingtalk_message(text, user_id):
"""处理钉钉消息"""
try:
# 配置钉钉机器人
bot = DingTalkBot(
app_secret=os.getenv('DINGTALK_APP_SECRET'),
access_token=os.getenv('DINGTALK_ACCESS_TOKEN')
)
# 调用OpenClaw
result = await claw.process_command(text)
# 回复消息
await bot.send_message(user_id, result)
except Exception as e:
print(f"钉钉处理失败: {e}")
if __name__ == '__main__':
app.run(port=5001)
🎮 五、QQ集成(1分钟)
使用 go-cqhttp(推荐)
# 下载 go-cqhttp
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v1.0.0-rc3/go-cqhttp_linux_amd64.tar.gz
tar -zxvf go-cqhttp_linux_amd64.tar.gz
cd go-cqhttp
# 生成配置文件
./go-cqhttp
# 选择 3 (HTTP通信) 和 0 (正向WebSocket)
QQ机器人服务端
# qq_bot.py
from flask import Flask, request, jsonify
import websocket
import json
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
class QQBot:
def __init__(self, ws_url="ws://127.0.0.1:6700"):
self.ws_url = ws_url
self.ws = None
def connect(self):
"""连接WebSocket"""
self.ws = websocket.WebSocketApp(
self.ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
self.ws.on_open = self.on_open
self.ws.run_forever()
def on_message(self, ws, message):
"""处理收到的消息"""
data = json.loads(message)
if data.get('post_type') == 'message':
# 异步处理消息
asyncio.create_task(self.handle_qq_message(data))
async def handle_qq_message(self, data):
"""处理QQ消息"""
try:
message = data.get('message', '')
user_id = data.get('user_id')
group_id = data.get('group_id')
# 调用OpenClaw
result = await claw.process_command(message)
# 回复消息
if group_id:
self.send_group_message(group_id, result)
else:
self.send_private_message(user_id, result)
except Exception as e:
print(f"QQ消息处理失败: {e}")
def send_group_message(self, group_id, message):
"""发送群消息"""
payload = {
"action": "send_group_msg",
"params": {
"group_id": group_id,
"message": message
}
}
self.ws.send(json.dumps(payload))
def send_private_message(self, user_id, message):
"""发送私聊消息"""
payload = {
"action": "send_private_msg",
"params": {
"user_id": user_id,
"message": message
}
}
self.ws.send(json.dumps(payload))
def on_error(self, ws, error):
print(f"WebSocket错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("WebSocket连接关闭")
def on_open(self, ws):
print("✅ QQ机器人已连接")
@app.route('/qq', methods=['POST'])
def qq_http():
"""HTTP模式下的QQ消息接口"""
data = request.json
if data.get('post_type') == 'message':
message = data.get('message', '')
asyncio.create_task(claw.process_command(message))
return jsonify({"status": "ok"})
if __name__ == '__main__':
# 启动WebSocket连接
bot = QQBot()
bot.connect()
# 启动HTTP服务器
app.run(port=5002)
🧠 六、OpenClaw核心实现
# openclaw_core.py
import os
import asyncio
import base64
from typing import Optional
import openai
from config import Config
import pyautogui
from PIL import Image
import io
class OpenClawCore:
"""OpenClaw核心引擎"""
def __init__(self):
self.client = openai.OpenAI(api_key=Config.API_KEY)
self.model = Config.MODEL
# 初始化自动化工具
pyautogui.FAILSAFE = True
async def process_command(self, command: str) -> str:
"""处理用户命令"""
try:
print(f"处理命令: {command}")
# 1. 分析命令类型
command_type = self.analyze_command_type(command)
# 2. 根据类型执行
if command_type == "screen_operation":
return await self.handle_screen_operation(command)
elif command_type == "browser_operation":
return await self.handle_browser_operation(command)
elif command_type == "file_operation":
return await self.handle_file_operation(command)
else:
return await self.handle_general_query(command)
except Exception as e:
return f"❌ 执行失败: {str(e)}"
def analyze_command_type(self, command: str) -> str:
"""分析命令类型"""
command = command.lower()
if any(word in command for word in ['点击', '打开', '关闭', '截图', '输入', '查找']):
return "screen_operation"
elif any(word in command for word in ['网页', '网站', '浏览器', '搜索']):
return "browser_operation"
elif any(word in command for word in ['文件', '文档', '保存', '读取']):
return "file_operation"
else:
return "general_query"
async def handle_screen_operation(self, command: str) -> str:
"""处理屏幕操作"""
try:
# 截图当前屏幕
screenshot = pyautogui.screenshot()
# 保存截图
timestamp = asyncio.get_event_loop().time()
screenshot_path = f"{Config.SCREENSHOT_DIR}/screen_{timestamp}.png"
screenshot.save(screenshot_path)
# 转换为base64
buffered = io.BytesIO()
screenshot.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
# 调用GPT分析截图并执行操作
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": """你是一个屏幕操作助手。
分析用户指令和当前屏幕截图,返回要执行的操作步骤。
操作格式:click(x,y) 或 type(text) 或 press(key)"""},
{"role": "user", "content": [
{"type": "text", "text": f"指令: {command}"},
{"type": "image_url", "image_url": {
"url": f"data:image/png;base64,{img_base64}"
}}
]}
]
)
# 解析并执行操作
action_text = response.choices[0].message.content
return await self.execute_action(action_text)
except Exception as e:
return f"屏幕操作失败: {e}"
async def execute_action(self, action_text: str) -> str:
"""执行操作指令"""
try:
# 解析操作(简化版)
if action_text.startswith("click("):
# 提取坐标
coords = action_text[6:-1].split(",")
x, y = int(coords[0]), int(coords[1])
pyautogui.click(x, y)
return f"✅ 已点击位置 ({x}, {y})"
elif action_text.startswith("type("):
text = action_text[5:-1]
pyautogui.write(text)
return f"✅ 已输入: {text}"
elif action_text.startswith("press("):
key = action_text[6:-1]
pyautogui.press(key)
return f"✅ 已按下按键: {key}"
else:
return f"✅ 操作完成: {action_text}"
except Exception as e:
return f"❌ 操作执行失败: {e}"
async def handle_browser_operation(self, command: str) -> str:
"""处理浏览器操作"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
driver = None
try:
# 启动浏览器
driver = webdriver.Chrome()
# 根据指令执行操作
if "打开" in command and "http" in command:
# 提取URL
import re
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', command)
if urls:
driver.get(urls[0])
return f"✅ 已打开: {urls[0]}"
elif "搜索" in command:
# 提取搜索关键词
search_term = command.split("搜索")[-1].strip()
driver.get(f"https://www.google.com/search?q={search_term}")
return f"✅ 已搜索: {search_term}"
return "✅ 浏览器操作完成"
except Exception as e:
return f"浏览器操作失败: {e}"
finally:
if driver:
driver.quit()
async def handle_general_query(self, query: str) -> str:
"""处理一般查询"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "你是一个智能助手,帮助用户完成各种任务。"},
{"role": "user", "content": query}
]
)
return response.choices[0].message.content
🚀 七、一键启动脚本
# start_all.sh
#!/bin/bash
echo "🤖 启动所有机器人服务..."
echo "========================"
# 1. 检查环境
if [ ! -d "venv" ]; then
echo "❌ 请先运行 ./deploy.sh 安装环境"
exit 1
fi
source venv/bin/activate
# 2. 创建日志目录
mkdir -p logs
# 3. 启动各服务(使用screen/tmux)
echo "1. 启动微信机器人..."
screen -dmS wechat_bot python wechat_bot.py
echo "2. 启动飞书机器人..."
screen -dmS feishu_bot python feishu_bot.py
echo "3. 启动钉钉机器人..."
screen -dmS dingtalk_bot python dingtalk_bot.py
echo "4. 启动QQ机器人..."
screen -dmS qq_bot python qq_bot.py
echo ""
echo "✅ 所有服务已启动!"
echo ""
echo "📋 运行状态:"
echo "微信机器人: screen -r wechat_bot"
echo "飞书机器人: screen -r feishu_bot"
echo "钉钉机器人: screen -r dingtalk_bot"
echo "QQ机器人: screen -r qq_bot"
echo ""
echo "📝 查看日志:"
echo "tail -f logs/*.log"
📊 八、监控和管理面板
# dashboard.py
from flask import Flask, render_template_string
import psutil
import json
from datetime import datetime
app = Flask(__name__)
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>OpenClaw 监控面板</title>
<style>
body { font-family: Arial; margin: 20px; }
.card {
border: 1px solid #ddd;
padding: 20px;
margin: 10px;
border-radius: 5px;
display: inline-block;
width: 200px;
}
.status-online { color: green; }
.status-offline { color: red; }
</style>
</head>
<body>
<h1>🤖 OpenClaw 监控面板</h1>
<div class="card">
<h3>系统状态</h3>
<p>CPU: {{ cpu_percent }}%</p>
<p>内存: {{ memory_percent }}%</p>
<p>运行时间: {{ uptime }}</p>
</div>
<div class="card">
<h3>微信机器人</h3>
<p class="status-{{ wechat_status }}">{{ wechat_status|upper }}</p>
<p>消息数: {{ wechat_stats.count }}</p>
</div>
<div class="card">
<h3>飞书机器人</h3>
<p class="status-{{ feishu_status }}">{{ feishu_status|upper }}</p>
<p>消息数: {{ feishu_stats.count }}</p>
</div>
<div class="card">
<h3>最近任务</h3>
<ul>
{% for task in recent_tasks %}
<li>{{ task.time }}: {{ task.command }}</li>
{% endfor %}
</ul>
</div>
<div>
<h3>快速命令</h3>
<input type="text" id="command" placeholder="输入命令...">
<button onclick="sendCommand()">执行</button>
</div>
<script>
function sendCommand() {
var command = document.getElementById('command').value;
fetch('/api/command', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({command: command})
});
}
// 自动刷新
setInterval(() => location.reload(), 30000);
</script>
</body>
</html>
"""
@app.route('/')
def dashboard():
"""监控面板"""
return render_template_string(HTML_TEMPLATE,
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
uptime=str(datetime.now() - datetime.fromtimestamp(psutil.boot_time())),
wechat_status="online",
feishu_status="online",
wechat_stats={"count": 123},
feishu_stats={"count": 456},
recent_tasks=[
{"time": "10:30", "command": "打开浏览器"},
{"time": "10:31", "command": "搜索信息"},
{"time": "10:32", "command": "保存文件"}
]
)
@app.route('/api/command', methods=['POST'])
def api_command():
"""API命令接口"""
import json
data = request.json
# 这里可以调用OpenClaw处理命令
return jsonify({"success": True, "message": "命令已接收"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=True)
📝 九、完整部署检查清单
# checklist.sh
#!/bin/bash
echo "🔍 部署检查清单"
echo "================"
check() {
if command -v $1 &> /dev/null; then
echo "✅ $1: 已安装"
else
echo "❌ $1: 未安装"
fi
}
echo ""
echo "1. 基础环境检查:"
check python3.9
check pip
check git
echo ""
echo "2. Python包检查:"
source venv/bin/activate
for pkg in "openai" "flask" "itchat-uos" "selenium" "playwright"; do
if python -c "import $pkg" 2>/dev/null; then
echo "✅ $pkg: 已安装"
else
echo "❌ $pkg: 未安装"
fi
done
echo ""
echo "3. 配置文件检查:"
if [ -f ".env" ]; then
echo "✅ .env: 存在"
if grep -q "OPENCLAW_API_KEY" .env; then
echo "✅ API Key: 已配置"
else
echo "❌ API Key: 未配置"
fi
else
echo "❌ .env: 不存在"
fi
echo ""
echo "4. 目录结构检查:"
for dir in "logs" "screenshots" "plugins" "tasks"; do
if [ -d "$dir" ]; then
echo "✅ $dir/: 存在"
else
echo "❌ $dir/: 不存在"
fi
done
echo ""
echo "📌 下一步:"
echo "1. 获取OpenAI API Key: https://platform.openai.com/api-keys"
echo "2. 编辑 .env 文件填入API_KEY"
echo "3. 运行 ./start_all.sh 启动所有服务"
🎯 十、实用命令示例
# 1. 查看所有运行的服务
screen -ls
# 2. 查看特定服务日志
tail -f logs/wechat.log
# 3. 重启服务
screen -X -S wechat_bot quit
screen -dmS wechat_bot python wechat_bot.py
# 4. 测试OpenClaw功能
echo "测试命令:"
echo "1. '截图当前屏幕'"
echo "2. '打开浏览器访问百度'"
echo "3. '搜索人工智能最新进展'"
echo "4. '点击屏幕左上角'"
echo "5. '输入Hello World'"
# 5. 更新所有依赖
pip install --upgrade -r requirements.txt
🚨 注意事项
-
安全性:
- 不要将API密钥上传到GitHub
- 使用防火墙限制访问端口
- 定期更新依赖包
-
稳定性:
- 使用screen/tmux保持服务运行
- 设置日志轮转防止日志过大
- 添加异常重试机制
-
性能优化:
- 截图压缩减少API调用大小
- 使用模型缓存减少重复计算
- 异步处理避免阻塞
-
平台限制:
- 微信网页版可能被封号(建议用小号测试)
- 钉钉/飞书需要企业认证
- QQ机器人需要服务器部署
这个保姆级教程包含了从零开始部署OpenClaw并集成到各大即时通讯平台的完整流程。按照步骤操作,10分钟内即可搭建起可用的自动化系统!
OpenClaw 本地部署与即时通讯集成实战
我将提供一个完整的保姆级教程,让你在10分钟内搭建起OpenClaw本地系统,并接入微信/飞书/钉钉/QQ等平台。
📦 一、快速部署(5分钟)
步骤1:一键部署脚本
# 创建项目目录
mkdir openclaw-bot && cd openclaw-bot
# 创建一键部署脚本
cat > deploy.sh << 'EOF'
#!/bin/bash
echo "🚀 OpenClaw 一键部署开始..."
# 1. 创建Python虚拟环境
python3.9 -m venv venv
source venv/bin/activate
# 2. 安装核心依赖
pip install --upgrade pip
pip install open-interpreter
pip install python-dotenv
pip install playwright
pip install selenium
pip install requests
playwright install
# 3. 创建项目结构
mkdir -p config logs plugins tasks screenshots
# 4. 创建配置文件
cat > .env << 'ENV'
# OpenClaw配置
OPENCLAW_MODEL=gpt-4o-mini
OPENCLAW_API_KEY=${your_openai_api_key}
# 各平台配置
WECHAT_AUTO_REPLY=true
DINGTALK_WEBHOOK=
FEISHU_WEBHOOK=
QQ_BOT_ID=
# 自动化配置
SCREENSHOT_PATH=./screenshots
LOG_LEVEL=INFO
ENV
echo "✅ 基础环境部署完成!"
EOF
# 赋予执行权限并运行
chmod +x deploy.sh
./deploy.sh
步骤2:快速配置
# OpenClaw 本地部署与即时通讯集成实战
我将提供一个完整的保姆级教程,让你在10分钟内搭建起OpenClaw本地系统,并接入微信/飞书/钉钉/QQ等平台。
## 📦 一、快速部署(5分钟)
### 步骤1:一键部署脚本
```bash
# 创建项目目录
mkdir openclaw-bot && cd openclaw-bot
# 创建一键部署脚本
cat > deploy.sh << 'EOF'
#!/bin/bash
echo "🚀 OpenClaw 一键部署开始..."
# 1. 创建Python虚拟环境
python3.9 -m venv venv
source venv/bin/activate
# 2. 安装核心依赖
pip install --upgrade pip
pip install open-interpreter
pip install python-dotenv
pip install playwright
pip install selenium
pip install requests
playwright install
# 3. 创建项目结构
mkdir -p config logs plugins tasks screenshots
# 4. 创建配置文件
cat > .env << 'ENV'
# OpenClaw配置
OPENCLAW_MODEL=gpt-4o-mini
OPENCLAW_API_KEY=${your_openai_api_key}
# 各平台配置
WECHAT_AUTO_REPLY=true
DINGTALK_WEBHOOK=
FEISHU_WEBHOOK=
QQ_BOT_ID=
# 自动化配置
SCREENSHOT_PATH=./screenshots
LOG_LEVEL=INFO
ENV
echo "✅ 基础环境部署完成!"
EOF
# 赋予执行权限并运行
chmod +x deploy.sh
./deploy.sh
步骤2:快速配置
# 创建核心配置文件
cat > config.py << 'EOF'
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# OpenClaw配置
MODEL = os.getenv('OPENCLAW_MODEL', 'gpt-4o-mini')
API_KEY = os.getenv('OPENCLAW_API_KEY')
# 各平台Webhook
DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '')
FEISHU_WEBHOOK = os.getenv('FEISHU_WEBHOOK', '')
# 微信配置(使用itchat-uos)
WECHAT_ENABLED = os.getenv('WECHAT_AUTO_REPLY', 'true').lower() == 'true'
# QQ配置(使用Mirai)
QQ_BOT_ID = os.getenv('QQ_BOT_ID', '')
# 路径配置
SCREENSHOT_DIR = os.getenv('SCREENSHOT_PATH', './screenshots')
LOG_DIR = './logs'
@staticmethod
def validate():
if not Config.API_KEY:
raise ValueError("请设置OPENCLAW_API_KEY环境变量")
EOF
💬 二、微信集成(2分钟)
使用 itchat-uos(最简方案)
# wechat_bot.py
import itchat
import asyncio
from openclaw_core import OpenClawCore
import json
class WeChatBot:
def __init__(self):
self.claw = OpenClawCore()
def start(self):
"""启动微信机器人"""
@itchat.msg_register(itchat.content.TEXT)
def text_reply(msg):
print(f"收到消息: {msg['Text']} from {msg['FromUserName']}")
# 异步处理消息
asyncio.create_task(self.handle_message(msg))
return "正在处理您的请求..."
# 登录
itchat.auto_login(hotReload=True, enableCmdQR=2)
print("✅ 微信登录成功!")
itchat.run()
async def handle_message(self, msg):
"""处理消息"""
try:
user_msg = msg['Text']
user_id = msg['FromUserName']
# 调用OpenClaw处理
response = await self.claw.process_command(user_msg)
# 回复用户
itchat.send(response, toUserName=user_id)
except Exception as e:
error_msg = f"处理失败: {str(e)}"
itchat.send(error_msg, toUserName=msg['FromUserName'])
# 运行微信机器人
if __name__ == "__main__":
bot = WeChatBot()
bot.start()
快速启动微信机器人
# 安装itchat-uos
pip install itchat-uos==1.5.0.dev0
# 创建快速启动脚本
cat > start_wechat.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
echo "📱 启动微信机器人..."
echo "请使用手机微信扫描二维码登录"
python wechat_bot.py
EOF
chmod +x start_wechat.sh
📱 三、飞书集成(1分钟)
# feishu_bot.py
from flask import Flask, request, jsonify
import requests
import json
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
# 飞书验证令牌
VERIFICATION_TOKEN = "your_verification_token"
@app.route('/feishu', methods=['POST'])
def feishu_webhook():
"""飞书Webhook接口"""
data = request.json
# 验证请求
if data.get("token") != VERIFICATION_TOKEN:
return jsonify({"error": "Invalid token"}), 403
# 处理消息
if data.get("type") == "url_verification":
return jsonify({"challenge": data.get("challenge")})
elif data.get("type") == "event_callback":
event = data.get("event")
if event.get("message_type") == "text":
text = event.get("text", "")
user_id = event.get("sender", {}).get("user_id")
# 异步处理
asyncio.create_task(process_feishu_message(text, user_id))
return jsonify({"success": True})
async def process_feishu_message(text, user_id):
"""处理飞书消息"""
try:
# 调用OpenClaw
result = await claw.process_command(text)
# 回复到飞书(需要飞书机器人API)
await reply_to_feishu(user_id, result)
except Exception as e:
print(f"飞书处理失败: {e}")
async def reply_to_feishu(user_id, message):
"""回复飞书消息"""
# 这里需要飞书机器人的API密钥
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/your_token"
payload = {
"msg_type": "text",
"content": {
"text": message
}
}
requests.post(webhook_url, json=payload)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
飞书快速配置脚本
# feishu_setup.sh
#!/bin/bash
echo "🔧 配置飞书机器人:"
echo "1. 访问 https://open.feishu.cn/app"
echo "2. 创建新应用"
echo "3. 启用机器人能力"
echo "4. 复制Webhook地址到 .env 文件的 FEISHU_WEBHOOK"
echo ""
echo "📌 获取Webhook URL后运行:"
echo "export FEISHU_WEBHOOK='你的Webhook地址'"
echo "python feishu_bot.py"
📌 四、钉钉集成(1分钟)
# dingtalk_bot.py
from flask import Flask, request, jsonify
import requests
import hashlib
import base64
import hmac
import time
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
class DingTalkBot:
def __init__(self, app_secret, access_token):
self.app_secret = app_secret
self.access_token = access_token
def sign(self, timestamp):
"""生成签名"""
string_to_sign = f'{timestamp}\n{self.app_secret}'
hmac_code = hmac.new(
self.app_secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(hmac_code).decode('utf-8')
async def send_message(self, user_id, message):
"""发送钉钉消息"""
timestamp = str(round(time.time() * 1000))
sign = self.sign(timestamp)
url = f'https://oapi.dingtalk.com/robot/send?access_token={self.access_token}×tamp={timestamp}&sign={sign}'
payload = {
"msgtype": "text",
"text": {
"content": message
},
"at": {
"atUserIds": [user_id]
}
}
response = requests.post(url, json=payload)
return response.json()
@app.route('/dingtalk', methods=['POST'])
def dingtalk_webhook():
"""钉钉Webhook接口"""
data = request.json
if data.get('msgtype') == 'text':
text = data.get('text', {}).get('content', '')
sender_id = data.get('senderStaffId', '')
# 异步处理
asyncio.create_task(process_dingtalk_message(text, sender_id))
return jsonify({"msgtype": "text", "text": {"content": "处理中..."}})
async def process_dingtalk_message(text, user_id):
"""处理钉钉消息"""
try:
# 配置钉钉机器人
bot = DingTalkBot(
app_secret=os.getenv('DINGTALK_APP_SECRET'),
access_token=os.getenv('DINGTALK_ACCESS_TOKEN')
)
# 调用OpenClaw
result = await claw.process_command(text)
# 回复消息
await bot.send_message(user_id, result)
except Exception as e:
print(f"钉钉处理失败: {e}")
if __name__ == '__main__':
app.run(port=5001)
🎮 五、QQ集成(1分钟)
使用 go-cqhttp(推荐)
# 下载 go-cqhttp
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v1.0.0-rc3/go-cqhttp_linux_amd64.tar.gz
tar -zxvf go-cqhttp_linux_amd64.tar.gz
cd go-cqhttp
# 生成配置文件
./go-cqhttp
# 选择 3 (HTTP通信) 和 0 (正向WebSocket)
QQ机器人服务端
# qq_bot.py
from flask import Flask, request, jsonify
import websocket
import json
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
class QQBot:
def __init__(self, ws_url="ws://127.0.0.1:6700"):
self.ws_url = ws_url
self.ws = None
def connect(self):
"""连接WebSocket"""
self.ws = websocket.WebSocketApp(
self.ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
self.ws.on_open = self.on_open
self.ws.run_forever()
def on_message(self, ws, message):
"""处理收到的消息"""
data = json.loads(message)
if data.get('post_type') == 'message':
# 异步处理消息
asyncio.create_task(self.handle_qq_message(data))
async def handle_qq_message(self, data):
"""处理QQ消息"""
try:
message = data.get('message', '')
user_id = data.get('user_id')
group_id = data.get('group_id')
# 调用OpenClaw
result = await claw.process_command(message)
# 回复消息
if group_id:
self.send_group_message(group_id, result)
else:
self.send_private_message(user_id, result)
except Exception as e:
print(f"QQ消息处理失败: {e}")
def send_group_message(self, group_id, message):
"""发送群消息"""
payload = {
"action": "send_group_msg",
"params": {
"group_id": group_id,
"message": message
}
}
self.ws.send(json.dumps(payload))
def send_private_message(self, user_id, message):
"""发送私聊消息"""
payload = {
"action": "send_private_msg",
"params": {
"user_id": user_id,
"message": message
}
}
self.ws.send(json.dumps(payload))
def on_error(self, ws, error):
print(f"WebSocket错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("WebSocket连接关闭")
def on_open(self, ws):
print("✅ QQ机器人已连接")
@app.route('/qq', methods=['POST'])
def qq_http():
"""HTTP模式下的QQ消息接口"""
data = request.json
if data.get('post_type') == 'message':
message = data.get('message', '')
asyncio.create_task(claw.process_command(message))
return jsonify({"status": "ok"})
if __name__ == '__main__':
# 启动WebSocket连接
bot = QQBot()
bot.connect()
# 启动HTTP服务器
app.run(port=5002)
🧠 六、OpenClaw核心实现
# openclaw_core.py
import os
import asyncio
import base64
from typing import Optional
import openai
from config import Config
import pyautogui
from PIL import Image
import io
class OpenClawCore:
"""OpenClaw核心引擎"""
def __init__(self):
self.client = openai.OpenAI(api_key=Config.API_KEY)
self.model = Config.MODEL
# 初始化自动化工具
pyautogui.FAILSAFE = True
async def process_command(self, command: str) -> str:
"""处理用户命令"""
try:
print(f"处理命令: {command}")
# 1. 分析命令类型
command_type = self.analyze_command_type(command)
# 2. 根据类型执行
if command_type == "screen_operation":
return await self.handle_screen_operation(command)
elif command_type == "browser_operation":
return await self.handle_browser_operation(command)
elif command_type == "file_operation":
return await self.handle_file_operation(command)
else:
return await self.handle_general_query(command)
except Exception as e:
return f"❌ 执行失败: {str(e)}"
def analyze_command_type(self, command: str) -> str:
"""分析命令类型"""
command = command.lower()
if any(word in command for word in ['点击', '打开', '关闭', '截图', '输入', '查找']):
return "screen_operation"
elif any(word in command for word in ['网页', '网站', '浏览器', '搜索']):
return "browser_operation"
elif any(word in command for word in ['文件', '文档', '保存', '读取']):
return "file_operation"
else:
return "general_query"
async def handle_screen_operation(self, command: str) -> str:
"""处理屏幕操作"""
try:
# 截图当前屏幕
screenshot = pyautogui.screenshot()
# 保存截图
timestamp = asyncio.get_event_loop().time()
screenshot_path = f"{Config.SCREENSHOT_DIR}/screen_{timestamp}.png"
screenshot.save(screenshot_path)
# 转换为base64
buffered = io.BytesIO()
screenshot.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
# 调用GPT分析截图并执行操作
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": """你是一个屏幕操作助手。
分析用户指令和当前屏幕截图,返回要执行的操作步骤。
操作格式:click(x,y) 或 type(text) 或 press(key)"""},
{"role": "user", "content": [
{"type": "text", "text": f"指令: {command}"},
{"type": "image_url", "image_url": {
"url": f"data:image/png;base64,{img_base64}"
}}
]}
]
)
# 解析并执行操作
action_text = response.choices[0].message.content
return await self.execute_action(action_text)
except Exception as e:
return f"屏幕操作失败: {e}"
async def execute_action(self, action_text: str) -> str:
"""执行操作指令"""
try:
# 解析操作(简化版)
if action_text.startswith("click("):
# 提取坐标
coords = action_text[6:-1].split(",")
x, y = int(coords[0]), int(coords[1])
pyautogui.click(x, y)
return f"✅ 已点击位置 ({x}, {y})"
elif action_text.startswith("type("):
text = action_text[5:-1]
pyautogui.write(text)
return f"✅ 已输入: {text}"
elif action_text.startswith("press("):
key = action_text[6:-1]
pyautogui.press(key)
return f"✅ 已按下按键: {key}"
else:
return f"✅ 操作完成: {action_text}"
except Exception as e:
return f"❌ 操作执行失败: {e}"
async def handle_browser_operation(self, command: str) -> str:
"""处理浏览器操作"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
driver = None
try:
# 启动浏览器
driver = webdriver.Chrome()
# 根据指令执行操作
if "打开" in command and "http" in command:
# 提取URL
import re
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', command)
if urls:
driver.get(urls[0])
return f"✅ 已打开: {urls[0]}"
elif "搜索" in command:
# 提取搜索关键词
search_term = command.split("搜索")[-1].strip()
driver.get(f"https://www.google.com/search?q={search_term}")
return f"✅ 已搜索: {search_term}"
return "✅ 浏览器操作完成"
except Exception as e:
return f"浏览器操作失败: {e}"
finally:
if driver:
driver.quit()
async def handle_general_query(self, query: str) -> str:
"""处理一般查询"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "你是一个智能助手,帮助用户完成各种任务。"},
{"role": "user", "content": query}
]
)
return response.choices[0].message.content
🚀 七、一键启动脚本
# start_all.sh
#!/bin/bash
echo "🤖 启动所有机器人服务..."
echo "========================"
# 1. 检查环境
if [ ! -d "venv" ]; then
echo "❌ 请先运行 ./deploy.sh 安装环境"
exit 1
fi
source venv/bin/activate
# 2. 创建日志目录
mkdir -p logs
# 3. 启动各服务(使用screen/tmux)
echo "1. 启动微信机器人..."
screen -dmS wechat_bot python wechat_bot.py
echo "2. 启动飞书机器人..."
screen -dmS feishu_bot python feishu_bot.py
echo "3. 启动钉钉机器人..."
screen -dmS dingtalk_bot python dingtalk_bot.py
echo "4. 启动QQ机器人..."
screen -dmS qq_bot python qq_bot.py
echo ""
echo "✅ 所有服务已启动!"
echo ""
echo "📋 运行状态:"
echo "微信机器人: screen -r wechat_bot"
echo "飞书机器人: screen -r feishu_bot"
echo "钉钉机器人: screen -r dingtalk_bot"
echo "QQ机器人: screen -r qq_bot"
echo ""
echo "📝 查看日志:"
echo "tail -f logs/*.log"
📊 八、监控和管理面板
# dashboard.py
from flask import Flask, render_template_string
import psutil
import json
from datetime import datetime
app = Flask(__name__)
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>OpenClaw 监控面板</title>
<style>
body { font-family: Arial; margin: 20px; }
.card {
border: 1px solid #ddd;
padding: 20px;
margin: 10px;
border-radius: 5px;
display: inline-block;
width: 200px;
}
.status-online { color: green; }
.status-offline { color: red; }
</style>
</head>
<body>
<h1>🤖 OpenClaw 监控面板</h1>
<div class="card">
<h3>系统状态</h3>
<p>CPU: {{ cpu_percent }}%</p>
<p>内存: {{ memory_percent }}%</p>
<p>运行时间: {{ uptime }}</p>
</div>
<div class="card">
<h3>微信机器人</h3>
<p class="status-{{ wechat_status }}">{{ wechat_status|upper }}</p>
<p>消息数: {{ wechat_stats.count }}</p>
</div>
<div class="card">
<h3>飞书机器人</h3>
<p class="status-{{ feishu_status }}">{{ feishu_status|upper }}</p>
<p>消息数: {{ feishu_stats.count }}</p>
</div>
<div class="card">
<h3>最近任务</h3>
<ul>
{% for task in recent_tasks %}
<li>{{ task.time }}: {{ task.command }}</li>
{% endfor %}
</ul>
</div>
<div>
<h3>快速命令</h3>
<input type="text" id="command" placeholder="输入命令...">
<button onclick="sendCommand()">执行</button>
</div>
<script>
function sendCommand() {
var command = document.getElementById('command').value;
fetch('/api/command', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({command: command})
});
}
// 自动刷新
setInterval(() => location.reload(), 30000);
</script>
</body>
</html>
"""
@app.route('/')
def dashboard():
"""监控面板"""
return render_template_string(HTML_TEMPLATE,
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
uptime=str(datetime.now() - datetime.fromtimestamp(psutil.boot_time())),
wechat_status="online",
feishu_status="online",
wechat_stats={"count": 123},
feishu_stats={"count": 456},
recent_tasks=[
{"time": "10:30", "command": "打开浏览器"},
{"time": "10:31", "command": "搜索信息"},
{"time": "10:32", "command": "保存文件"}
]
)
@app.route('/api/command', methods=['POST'])
def api_command():
"""API命令接口"""
import json
data = request.json
# 这里可以调用OpenClaw处理命令
return jsonify({"success": True, "message": "命令已接收"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=True)
📝 九、完整部署检查清单
# checklist.sh
#!/bin/bash
echo "🔍 部署检查清单"
echo "================"
check() {
if command -v $1 &> /dev/null; then
echo "✅ $1: 已安装"
else
echo "❌ $1: 未安装"
fi
}
echo ""
echo "1. 基础环境检查:"
check python3.9
check pip
check git
echo ""
echo "2. Python包检查:"
source venv/bin/activate
for pkg in "openai" "flask" "itchat-uos" "selenium" "playwright"; do
if python -c "import $pkg" 2>/dev/null; then
echo "✅ $pkg: 已安装"
else
echo "❌ $pkg: 未安装"
fi
done
echo ""
echo "3. 配置文件检查:"
if [ -f ".env" ]; then
echo "✅ .env: 存在"
if grep -q "OPENCLAW_API_KEY" .env; then
echo "✅ API Key: 已配置"
else
echo "❌ API Key: 未配置"
fi
else
echo "❌ .env: 不存在"
fi
echo ""
echo "4. 目录结构检查:"
for dir in "logs" "screenshots" "plugins" "tasks"; do
if [ -d "$dir" ]; then
echo "✅ $dir/: 存在"
else
echo "❌ $dir/: 不存在"
fi
done
echo ""
echo "📌 下一步:"
echo "1. 获取OpenAI API Key: https://platform.openai.com/api-keys"
echo "2. 编辑 .env 文件填入API_KEY"
echo "3. 运行 ./start_all.sh 启动所有服务"
🎯 十、实用命令示例
# 1. 查看所有运行的服务
screen -ls
# 2. 查看特定服务日志
tail -f logs/wechat.log
# 3. 重启服务
screen -X -S wechat_bot quit
screen -dmS wechat_bot python wechat_bot.py
# 4. 测试OpenClaw功能
echo "测试命令:"
echo "1. '截图当前屏幕'"
echo "2. '打开浏览器访问百度'"
echo "3. '搜索人工智能最新进展'"
echo "4. '点击屏幕左上角'"
echo "5. '输入Hello World'"
# 5. 更新所有依赖
pip install --upgrade -r requirements.txt
🚨 注意事项
-
安全性:
- 不要将API密钥上传到GitHub
- 使用防火墙限制访问端口
- 定期更新依赖包
-
稳定性:
- 使用screen/tmux保持服务运行
- 设置日志轮转防止日志过大
- 添加异常重试机制
-
性能优化:
- 截图压缩减少API调用大小
- 使用模型缓存减少重复计算
- 异步处理避免阻塞
-
平台限制:
- 微信网页版可能被封号(建议用小号测试)
- 钉钉/飞书需要企业认证
- QQ机器人需要服务器部署
这个保姆级教程包含了从零开始部署OpenClaw并集成到各大即时通讯平台的完整流程。按照步骤操作,10分钟内即可搭建起可用的自动化系统!# 创建核心配置文件
cat > config.py << ‘EOF’
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# OpenClaw配置
MODEL = os.getenv(‘OPENCLAW_MODEL’, ‘gpt-4o-mini’)
API_KEY = os.getenv(‘OPENCLAW_API_KEY’)
# 各平台Webhook
DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '')
FEISHU_WEBHOOK = os.getenv('FEISHU_WEBHOOK', '')
# 微信配置(使用itchat-uos)
WECHAT_ENABLED = os.getenv('WECHAT_AUTO_REPLY', 'true').lower() == 'true'
# QQ配置(使用Mirai)
QQ_BOT_ID = os.getenv('QQ_BOT_ID', '')
# 路径配置
SCREENSHOT_DIR = os.getenv('SCREENSHOT_PATH', './screenshots')
LOG_DIR = './logs'
@staticmethod
def validate():
if not Config.API_KEY:
raise ValueError("请设置OPENCLAW_API_KEY环境变量")
EOF
## 💬 二、微信集成(2分钟)
### 使用 itchat-uos(最简方案)
```python
# wechat_bot.py
import itchat
import asyncio
from openclaw_core import OpenClawCore
import json
class WeChatBot:
def __init__(self):
self.claw = OpenClawCore()
def start(self):
"""启动微信机器人"""
@itchat.msg_register(itchat.content.TEXT)
def text_reply(msg):
print(f"收到消息: {msg['Text']} from {msg['FromUserName']}")
# 异步处理消息
asyncio.create_task(self.handle_message(msg))
return "正在处理您的请求..."
# 登录
itchat.auto_login(hotReload=True, enableCmdQR=2)
print("✅ 微信登录成功!")
itchat.run()
async def handle_message(self, msg):
"""处理消息"""
try:
user_msg = msg['Text']
user_id = msg['FromUserName']
# 调用OpenClaw处理
response = await self.claw.process_command(user_msg)
# 回复用户
itchat.send(response, toUserName=user_id)
except Exception as e:
error_msg = f"处理失败: {str(e)}"
itchat.send(error_msg, toUserName=msg['FromUserName'])
# 运行微信机器人
if __name__ == "__main__":
bot = WeChatBot()
bot.start()
快速启动微信机器人
# 安装itchat-uos
pip install itchat-uos==1.5.0.dev0
# 创建快速启动脚本
cat > start_wechat.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
echo "📱 启动微信机器人..."
echo "请使用手机微信扫描二维码登录"
python wechat_bot.py
EOF
chmod +x start_wechat.sh
📱 三、飞书集成(1分钟)
# feishu_bot.py
from flask import Flask, request, jsonify
import requests
import json
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
# 飞书验证令牌
VERIFICATION_TOKEN = "your_verification_token"
@app.route('/feishu', methods=['POST'])
def feishu_webhook():
"""飞书Webhook接口"""
data = request.json
# 验证请求
if data.get("token") != VERIFICATION_TOKEN:
return jsonify({"error": "Invalid token"}), 403
# 处理消息
if data.get("type") == "url_verification":
return jsonify({"challenge": data.get("challenge")})
elif data.get("type") == "event_callback":
event = data.get("event")
if event.get("message_type") == "text":
text = event.get("text", "")
user_id = event.get("sender", {}).get("user_id")
# 异步处理
asyncio.create_task(process_feishu_message(text, user_id))
return jsonify({"success": True})
async def process_feishu_message(text, user_id):
"""处理飞书消息"""
try:
# 调用OpenClaw
result = await claw.process_command(text)
# 回复到飞书(需要飞书机器人API)
await reply_to_feishu(user_id, result)
except Exception as e:
print(f"飞书处理失败: {e}")
async def reply_to_feishu(user_id, message):
"""回复飞书消息"""
# 这里需要飞书机器人的API密钥
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/your_token"
payload = {
"msg_type": "text",
"content": {
"text": message
}
}
requests.post(webhook_url, json=payload)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
飞书快速配置脚本
# feishu_setup.sh
#!/bin/bash
echo "🔧 配置飞书机器人:"
echo "1. 访问 https://open.feishu.cn/app"
echo "2. 创建新应用"
echo "3. 启用机器人能力"
echo "4. 复制Webhook地址到 .env 文件的 FEISHU_WEBHOOK"
echo ""
echo "📌 获取Webhook URL后运行:"
echo "export FEISHU_WEBHOOK='你的Webhook地址'"
echo "python feishu_bot.py"
📌 四、钉钉集成(1分钟)
# dingtalk_bot.py
from flask import Flask, request, jsonify
import requests
import hashlib
import base64
import hmac
import time
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
class DingTalkBot:
def __init__(self, app_secret, access_token):
self.app_secret = app_secret
self.access_token = access_token
def sign(self, timestamp):
"""生成签名"""
string_to_sign = f'{timestamp}\n{self.app_secret}'
hmac_code = hmac.new(
self.app_secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(hmac_code).decode('utf-8')
async def send_message(self, user_id, message):
"""发送钉钉消息"""
timestamp = str(round(time.time() * 1000))
sign = self.sign(timestamp)
url = f'https://oapi.dingtalk.com/robot/send?access_token={self.access_token}×tamp={timestamp}&sign={sign}'
payload = {
"msgtype": "text",
"text": {
"content": message
},
"at": {
"atUserIds": [user_id]
}
}
response = requests.post(url, json=payload)
return response.json()
@app.route('/dingtalk', methods=['POST'])
def dingtalk_webhook():
"""钉钉Webhook接口"""
data = request.json
if data.get('msgtype') == 'text':
text = data.get('text', {}).get('content', '')
sender_id = data.get('senderStaffId', '')
# 异步处理
asyncio.create_task(process_dingtalk_message(text, sender_id))
return jsonify({"msgtype": "text", "text": {"content": "处理中..."}})
async def process_dingtalk_message(text, user_id):
"""处理钉钉消息"""
try:
# 配置钉钉机器人
bot = DingTalkBot(
app_secret=os.getenv('DINGTALK_APP_SECRET'),
access_token=os.getenv('DINGTALK_ACCESS_TOKEN')
)
# 调用OpenClaw
result = await claw.process_command(text)
# 回复消息
await bot.send_message(user_id, result)
except Exception as e:
print(f"钉钉处理失败: {e}")
if __name__ == '__main__':
app.run(port=5001)
🎮 五、QQ集成(1分钟)
使用 go-cqhttp(推荐)
# 下载 go-cqhttp
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v1.0.0-rc3/go-cqhttp_linux_amd64.tar.gz
tar -zxvf go-cqhttp_linux_amd64.tar.gz
cd go-cqhttp
# 生成配置文件
./go-cqhttp
# 选择 3 (HTTP通信) 和 0 (正向WebSocket)
QQ机器人服务端
# qq_bot.py
from flask import Flask, request, jsonify
import websocket
import json
import asyncio
from openclaw_core import OpenClawCore
app = Flask(__name__)
claw = OpenClawCore()
class QQBot:
def __init__(self, ws_url="ws://127.0.0.1:6700"):
self.ws_url = ws_url
self.ws = None
def connect(self):
"""连接WebSocket"""
self.ws = websocket.WebSocketApp(
self.ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
self.ws.on_open = self.on_open
self.ws.run_forever()
def on_message(self, ws, message):
"""处理收到的消息"""
data = json.loads(message)
if data.get('post_type') == 'message':
# 异步处理消息
asyncio.create_task(self.handle_qq_message(data))
async def handle_qq_message(self, data):
"""处理QQ消息"""
try:
message = data.get('message', '')
user_id = data.get('user_id')
group_id = data.get('group_id')
# 调用OpenClaw
result = await claw.process_command(message)
# 回复消息
if group_id:
self.send_group_message(group_id, result)
else:
self.send_private_message(user_id, result)
except Exception as e:
print(f"QQ消息处理失败: {e}")
def send_group_message(self, group_id, message):
"""发送群消息"""
payload = {
"action": "send_group_msg",
"params": {
"group_id": group_id,
"message": message
}
}
self.ws.send(json.dumps(payload))
def send_private_message(self, user_id, message):
"""发送私聊消息"""
payload = {
"action": "send_private_msg",
"params": {
"user_id": user_id,
"message": message
}
}
self.ws.send(json.dumps(payload))
def on_error(self, ws, error):
print(f"WebSocket错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("WebSocket连接关闭")
def on_open(self, ws):
print("✅ QQ机器人已连接")
@app.route('/qq', methods=['POST'])
def qq_http():
"""HTTP模式下的QQ消息接口"""
data = request.json
if data.get('post_type') == 'message':
message = data.get('message', '')
asyncio.create_task(claw.process_command(message))
return jsonify({"status": "ok"})
if __name__ == '__main__':
# 启动WebSocket连接
bot = QQBot()
bot.connect()
# 启动HTTP服务器
app.run(port=5002)
🧠 六、OpenClaw核心实现
# openclaw_core.py
import os
import asyncio
import base64
from typing import Optional
import openai
from config import Config
import pyautogui
from PIL import Image
import io
class OpenClawCore:
"""OpenClaw核心引擎"""
def __init__(self):
self.client = openai.OpenAI(api_key=Config.API_KEY)
self.model = Config.MODEL
# 初始化自动化工具
pyautogui.FAILSAFE = True
async def process_command(self, command: str) -> str:
"""处理用户命令"""
try:
print(f"处理命令: {command}")
# 1. 分析命令类型
command_type = self.analyze_command_type(command)
# 2. 根据类型执行
if command_type == "screen_operation":
return await self.handle_screen_operation(command)
elif command_type == "browser_operation":
return await self.handle_browser_operation(command)
elif command_type == "file_operation":
return await self.handle_file_operation(command)
else:
return await self.handle_general_query(command)
except Exception as e:
return f"❌ 执行失败: {str(e)}"
def analyze_command_type(self, command: str) -> str:
"""分析命令类型"""
command = command.lower()
if any(word in command for word in ['点击', '打开', '关闭', '截图', '输入', '查找']):
return "screen_operation"
elif any(word in command for word in ['网页', '网站', '浏览器', '搜索']):
return "browser_operation"
elif any(word in command for word in ['文件', '文档', '保存', '读取']):
return "file_operation"
else:
return "general_query"
async def handle_screen_operation(self, command: str) -> str:
"""处理屏幕操作"""
try:
# 截图当前屏幕
screenshot = pyautogui.screenshot()
# 保存截图
timestamp = asyncio.get_event_loop().time()
screenshot_path = f"{Config.SCREENSHOT_DIR}/screen_{timestamp}.png"
screenshot.save(screenshot_path)
# 转换为base64
buffered = io.BytesIO()
screenshot.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
# 调用GPT分析截图并执行操作
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": """你是一个屏幕操作助手。
分析用户指令和当前屏幕截图,返回要执行的操作步骤。
操作格式:click(x,y) 或 type(text) 或 press(key)"""},
{"role": "user", "content": [
{"type": "text", "text": f"指令: {command}"},
{"type": "image_url", "image_url": {
"url": f"data:image/png;base64,{img_base64}"
}}
]}
]
)
# 解析并执行操作
action_text = response.choices[0].message.content
return await self.execute_action(action_text)
except Exception as e:
return f"屏幕操作失败: {e}"
async def execute_action(self, action_text: str) -> str:
"""执行操作指令"""
try:
# 解析操作(简化版)
if action_text.startswith("click("):
# 提取坐标
coords = action_text[6:-1].split(",")
x, y = int(coords[0]), int(coords[1])
pyautogui.click(x, y)
return f"✅ 已点击位置 ({x}, {y})"
elif action_text.startswith("type("):
text = action_text[5:-1]
pyautogui.write(text)
return f"✅ 已输入: {text}"
elif action_text.startswith("press("):
key = action_text[6:-1]
pyautogui.press(key)
return f"✅ 已按下按键: {key}"
else:
return f"✅ 操作完成: {action_text}"
except Exception as e:
return f"❌ 操作执行失败: {e}"
async def handle_browser_operation(self, command: str) -> str:
"""处理浏览器操作"""
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
driver = None
try:
# 启动浏览器
driver = webdriver.Chrome()
# 根据指令执行操作
if "打开" in command and "http" in command:
# 提取URL
import re
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', command)
if urls:
driver.get(urls[0])
return f"✅ 已打开: {urls[0]}"
elif "搜索" in command:
# 提取搜索关键词
search_term = command.split("搜索")[-1].strip()
driver.get(f"https://www.google.com/search?q={search_term}")
return f"✅ 已搜索: {search_term}"
return "✅ 浏览器操作完成"
except Exception as e:
return f"浏览器操作失败: {e}"
finally:
if driver:
driver.quit()
async def handle_general_query(self, query: str) -> str:
"""处理一般查询"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "你是一个智能助手,帮助用户完成各种任务。"},
{"role": "user", "content": query}
]
)
return response.choices[0].message.content
🚀 七、一键启动脚本
# start_all.sh
#!/bin/bash
echo "🤖 启动所有机器人服务..."
echo "========================"
# 1. 检查环境
if [ ! -d "venv" ]; then
echo "❌ 请先运行 ./deploy.sh 安装环境"
exit 1
fi
source venv/bin/activate
# 2. 创建日志目录
mkdir -p logs
# 3. 启动各服务(使用screen/tmux)
echo "1. 启动微信机器人..."
screen -dmS wechat_bot python wechat_bot.py
echo "2. 启动飞书机器人..."
screen -dmS feishu_bot python feishu_bot.py
echo "3. 启动钉钉机器人..."
screen -dmS dingtalk_bot python dingtalk_bot.py
echo "4. 启动QQ机器人..."
screen -dmS qq_bot python qq_bot.py
echo ""
echo "✅ 所有服务已启动!"
echo ""
echo "📋 运行状态:"
echo "微信机器人: screen -r wechat_bot"
echo "飞书机器人: screen -r feishu_bot"
echo "钉钉机器人: screen -r dingtalk_bot"
echo "QQ机器人: screen -r qq_bot"
echo ""
echo "📝 查看日志:"
echo "tail -f logs/*.log"
📊 八、监控和管理面板
# dashboard.py
from flask import Flask, render_template_string
import psutil
import json
from datetime import datetime
app = Flask(__name__)
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>OpenClaw 监控面板</title>
<style>
body { font-family: Arial; margin: 20px; }
.card {
border: 1px solid #ddd;
padding: 20px;
margin: 10px;
border-radius: 5px;
display: inline-block;
width: 200px;
}
.status-online { color: green; }
.status-offline { color: red; }
</style>
</head>
<body>
<h1>🤖 OpenClaw 监控面板</h1>
<div class="card">
<h3>系统状态</h3>
<p>CPU: {{ cpu_percent }}%</p>
<p>内存: {{ memory_percent }}%</p>
<p>运行时间: {{ uptime }}</p>
</div>
<div class="card">
<h3>微信机器人</h3>
<p class="status-{{ wechat_status }}">{{ wechat_status|upper }}</p>
<p>消息数: {{ wechat_stats.count }}</p>
</div>
<div class="card">
<h3>飞书机器人</h3>
<p class="status-{{ feishu_status }}">{{ feishu_status|upper }}</p>
<p>消息数: {{ feishu_stats.count }}</p>
</div>
<div class="card">
<h3>最近任务</h3>
<ul>
{% for task in recent_tasks %}
<li>{{ task.time }}: {{ task.command }}</li>
{% endfor %}
</ul>
</div>
<div>
<h3>快速命令</h3>
<input type="text" id="command" placeholder="输入命令...">
<button onclick="sendCommand()">执行</button>
</div>
<script>
function sendCommand() {
var command = document.getElementById('command').value;
fetch('/api/command', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({command: command})
});
}
// 自动刷新
setInterval(() => location.reload(), 30000);
</script>
</body>
</html>
"""
@app.route('/')
def dashboard():
"""监控面板"""
return render_template_string(HTML_TEMPLATE,
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
uptime=str(datetime.now() - datetime.fromtimestamp(psutil.boot_time())),
wechat_status="online",
feishu_status="online",
wechat_stats={"count": 123},
feishu_stats={"count": 456},
recent_tasks=[
{"time": "10:30", "command": "打开浏览器"},
{"time": "10:31", "command": "搜索信息"},
{"time": "10:32", "command": "保存文件"}
]
)
@app.route('/api/command', methods=['POST'])
def api_command():
"""API命令接口"""
import json
data = request.json
# 这里可以调用OpenClaw处理命令
return jsonify({"success": True, "message": "命令已接收"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=True)
📝 九、完整部署检查清单
# checklist.sh
#!/bin/bash
echo "🔍 部署检查清单"
echo "================"
check() {
if command -v $1 &> /dev/null; then
echo "✅ $1: 已安装"
else
echo "❌ $1: 未安装"
fi
}
echo ""
echo "1. 基础环境检查:"
check python3.9
check pip
check git
echo ""
echo "2. Python包检查:"
source venv/bin/activate
for pkg in "openai" "flask" "itchat-uos" "selenium" "playwright"; do
if python -c "import $pkg" 2>/dev/null; then
echo "✅ $pkg: 已安装"
else
echo "❌ $pkg: 未安装"
fi
done
echo ""
echo "3. 配置文件检查:"
if [ -f ".env" ]; then
echo "✅ .env: 存在"
if grep -q "OPENCLAW_API_KEY" .env; then
echo "✅ API Key: 已配置"
else
echo "❌ API Key: 未配置"
fi
else
echo "❌ .env: 不存在"
fi
echo ""
echo "4. 目录结构检查:"
for dir in "logs" "screenshots" "plugins" "tasks"; do
if [ -d "$dir" ]; then
echo "✅ $dir/: 存在"
else
echo "❌ $dir/: 不存在"
fi
done
echo ""
echo "📌 下一步:"
echo "1. 获取OpenAI API Key: https://platform.openai.com/api-keys"
echo "2. 编辑 .env 文件填入API_KEY"
echo "3. 运行 ./start_all.sh 启动所有服务"
🎯 十、实用命令示例
# 1. 查看所有运行的服务
screen -ls
# 2. 查看特定服务日志
tail -f logs/wechat.log
# 3. 重启服务
screen -X -S wechat_bot quit
screen -dmS wechat_bot python wechat_bot.py
# 4. 测试OpenClaw功能
echo "测试命令:"
echo "1. '截图当前屏幕'"
echo "2. '打开浏览器访问百度'"
echo "3. '搜索人工智能最新进展'"
echo "4. '点击屏幕左上角'"
echo "5. '输入Hello World'"
# 5. 更新所有依赖
pip install --upgrade -r requirements.txt
🚨 注意事项
-
安全性:
- 不要将API密钥上传到GitHub
- 使用防火墙限制访问端口
- 定期更新依赖包
-
稳定性:
- 使用screen/tmux保持服务运行
- 设置日志轮转防止日志过大
- 添加异常重试机制
-
性能优化:
- 截图压缩减少API调用大小
- 使用模型缓存减少重复计算
- 异步处理避免阻塞
-
平台限制:
- 微信网页版可能被封号(建议用小号测试)
- 钉钉/飞书需要企业认证
- QQ机器人需要服务器部署
这个保姆级教程包含了从零开始部署OpenClaw并集成到各大即时通讯平台的完整流程。按照步骤操作,10分钟内即可搭建起可用的自动化系统!
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)