目录

OpenClaw 本地部署与即时通讯集成实战

我将提供一个完整的保姆级教程,让你在10分钟内搭建起OpenClaw本地系统,并接入微信/飞书/钉钉/QQ等平台。

📦 一、快速部署(5分钟)

步骤1:一键部署脚本

# 创建项目目录
mkdir openclaw-bot && cd openclaw-bot

# 创建一键部署脚本
cat > deploy.sh << 'EOF'
#!/bin/bash

echo "🚀 OpenClaw 一键部署开始..."

# 1. 创建Python虚拟环境
python3.9 -m venv venv
source venv/bin/activate

# 2. 安装核心依赖
pip install --upgrade pip
pip install open-interpreter
pip install python-dotenv
pip install playwright
pip install selenium
pip install requests
playwright install

# 3. 创建项目结构
mkdir -p config logs plugins tasks screenshots

# 4. 创建配置文件
cat > .env << 'ENV'
# OpenClaw配置
OPENCLAW_MODEL=gpt-4o-mini
OPENCLAW_API_KEY=${your_openai_api_key}

# 各平台配置
WECHAT_AUTO_REPLY=true
DINGTALK_WEBHOOK=
FEISHU_WEBHOOK=
QQ_BOT_ID=

# 自动化配置
SCREENSHOT_PATH=./screenshots
LOG_LEVEL=INFO
ENV

echo "✅ 基础环境部署完成!"
EOF

# 赋予执行权限并运行
chmod +x deploy.sh
./deploy.sh

步骤2:快速配置

# 创建核心配置文件
cat > config.py << 'EOF'
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    # OpenClaw配置
    MODEL = os.getenv('OPENCLAW_MODEL', 'gpt-4o-mini')
    API_KEY = os.getenv('OPENCLAW_API_KEY')
    
    # 各平台Webhook
    DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '')
    FEISHU_WEBHOOK = os.getenv('FEISHU_WEBHOOK', '')
    
    # 微信配置(使用itchat-uos)
    WECHAT_ENABLED = os.getenv('WECHAT_AUTO_REPLY', 'true').lower() == 'true'
    
    # QQ配置(使用Mirai)
    QQ_BOT_ID = os.getenv('QQ_BOT_ID', '')
    
    # 路径配置
    SCREENSHOT_DIR = os.getenv('SCREENSHOT_PATH', './screenshots')
    LOG_DIR = './logs'
    
    @staticmethod
    def validate():
        if not Config.API_KEY:
            raise ValueError("请设置OPENCLAW_API_KEY环境变量")
EOF

💬 二、微信集成(2分钟)

使用 itchat-uos(最简方案)

# wechat_bot.py
import itchat
import asyncio
from openclaw_core import OpenClawCore
import json

class WeChatBot:
    def __init__(self):
        self.claw = OpenClawCore()
        
    def start(self):
        """启动微信机器人"""
        @itchat.msg_register(itchat.content.TEXT)
        def text_reply(msg):
            print(f"收到消息: {msg['Text']} from {msg['FromUserName']}")
            
            # 异步处理消息
            asyncio.create_task(self.handle_message(msg))
            return "正在处理您的请求..."
        
        # 登录
        itchat.auto_login(hotReload=True, enableCmdQR=2)
        print("✅ 微信登录成功!")
        itchat.run()

    async def handle_message(self, msg):
        """处理消息"""
        try:
            user_msg = msg['Text']
            user_id = msg['FromUserName']
            
            # 调用OpenClaw处理
            response = await self.claw.process_command(user_msg)
            
            # 回复用户
            itchat.send(response, toUserName=user_id)
            
        except Exception as e:
            error_msg = f"处理失败: {str(e)}"
            itchat.send(error_msg, toUserName=msg['FromUserName'])

# 运行微信机器人
if __name__ == "__main__":
    bot = WeChatBot()
    bot.start()

快速启动微信机器人

# 安装itchat-uos
pip install itchat-uos==1.5.0.dev0

# 创建快速启动脚本
cat > start_wechat.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
echo "📱 启动微信机器人..."
echo "请使用手机微信扫描二维码登录"
python wechat_bot.py
EOF

chmod +x start_wechat.sh

📱 三、飞书集成(1分钟)

# feishu_bot.py
from flask import Flask, request, jsonify
import requests
import json
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

# 飞书验证令牌
VERIFICATION_TOKEN = "your_verification_token"

@app.route('/feishu', methods=['POST'])
def feishu_webhook():
    """飞书Webhook接口"""
    data = request.json
    
    # 验证请求
    if data.get("token") != VERIFICATION_TOKEN:
        return jsonify({"error": "Invalid token"}), 403
    
    # 处理消息
    if data.get("type") == "url_verification":
        return jsonify({"challenge": data.get("challenge")})
    
    elif data.get("type") == "event_callback":
        event = data.get("event")
        if event.get("message_type") == "text":
            text = event.get("text", "")
            user_id = event.get("sender", {}).get("user_id")
            
            # 异步处理
            asyncio.create_task(process_feishu_message(text, user_id))
            
    return jsonify({"success": True})

async def process_feishu_message(text, user_id):
    """处理飞书消息"""
    try:
        # 调用OpenClaw
        result = await claw.process_command(text)
        
        # 回复到飞书(需要飞书机器人API)
        await reply_to_feishu(user_id, result)
        
    except Exception as e:
        print(f"飞书处理失败: {e}")

async def reply_to_feishu(user_id, message):
    """回复飞书消息"""
    # 这里需要飞书机器人的API密钥
    webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/your_token"
    
    payload = {
        "msg_type": "text",
        "content": {
            "text": message
        }
    }
    
    requests.post(webhook_url, json=payload)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

飞书快速配置脚本

# feishu_setup.sh
#!/bin/bash
echo "🔧 配置飞书机器人:"
echo "1. 访问 https://open.feishu.cn/app"
echo "2. 创建新应用"
echo "3. 启用机器人能力"
echo "4. 复制Webhook地址到 .env 文件的 FEISHU_WEBHOOK"
echo ""
echo "📌 获取Webhook URL后运行:"
echo "export FEISHU_WEBHOOK='你的Webhook地址'"
echo "python feishu_bot.py"

📌 四、钉钉集成(1分钟)

# dingtalk_bot.py
from flask import Flask, request, jsonify
import requests
import hashlib
import base64
import hmac
import time
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

class DingTalkBot:
    def __init__(self, app_secret, access_token):
        self.app_secret = app_secret
        self.access_token = access_token
        
    def sign(self, timestamp):
        """生成签名"""
        string_to_sign = f'{timestamp}\n{self.app_secret}'
        hmac_code = hmac.new(
            self.app_secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        
        return base64.b64encode(hmac_code).decode('utf-8')
    
    async def send_message(self, user_id, message):
        """发送钉钉消息"""
        timestamp = str(round(time.time() * 1000))
        sign = self.sign(timestamp)
        
        url = f'https://oapi.dingtalk.com/robot/send?access_token={self.access_token}&timestamp={timestamp}&sign={sign}'
        
        payload = {
            "msgtype": "text",
            "text": {
                "content": message
            },
            "at": {
                "atUserIds": [user_id]
            }
        }
        
        response = requests.post(url, json=payload)
        return response.json()

@app.route('/dingtalk', methods=['POST'])
def dingtalk_webhook():
    """钉钉Webhook接口"""
    data = request.json
    
    if data.get('msgtype') == 'text':
        text = data.get('text', {}).get('content', '')
        sender_id = data.get('senderStaffId', '')
        
        # 异步处理
        asyncio.create_task(process_dingtalk_message(text, sender_id))
    
    return jsonify({"msgtype": "text", "text": {"content": "处理中..."}})

async def process_dingtalk_message(text, user_id):
    """处理钉钉消息"""
    try:
        # 配置钉钉机器人
        bot = DingTalkBot(
            app_secret=os.getenv('DINGTALK_APP_SECRET'),
            access_token=os.getenv('DINGTALK_ACCESS_TOKEN')
        )
        
        # 调用OpenClaw
        result = await claw.process_command(text)
        
        # 回复消息
        await bot.send_message(user_id, result)
        
    except Exception as e:
        print(f"钉钉处理失败: {e}")

if __name__ == '__main__':
    app.run(port=5001)

🎮 五、QQ集成(1分钟)

使用 go-cqhttp(推荐)

# 下载 go-cqhttp
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v1.0.0-rc3/go-cqhttp_linux_amd64.tar.gz
tar -zxvf go-cqhttp_linux_amd64.tar.gz
cd go-cqhttp

# 生成配置文件
./go-cqhttp
# 选择 3 (HTTP通信) 和 0 (正向WebSocket)

QQ机器人服务端

# qq_bot.py
from flask import Flask, request, jsonify
import websocket
import json
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

class QQBot:
    def __init__(self, ws_url="ws://127.0.0.1:6700"):
        self.ws_url = ws_url
        self.ws = None
        
    def connect(self):
        """连接WebSocket"""
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws.on_open = self.on_open
        self.ws.run_forever()
    
    def on_message(self, ws, message):
        """处理收到的消息"""
        data = json.loads(message)
        
        if data.get('post_type') == 'message':
            # 异步处理消息
            asyncio.create_task(self.handle_qq_message(data))
    
    async def handle_qq_message(self, data):
        """处理QQ消息"""
        try:
            message = data.get('message', '')
            user_id = data.get('user_id')
            group_id = data.get('group_id')
            
            # 调用OpenClaw
            result = await claw.process_command(message)
            
            # 回复消息
            if group_id:
                self.send_group_message(group_id, result)
            else:
                self.send_private_message(user_id, result)
                
        except Exception as e:
            print(f"QQ消息处理失败: {e}")
    
    def send_group_message(self, group_id, message):
        """发送群消息"""
        payload = {
            "action": "send_group_msg",
            "params": {
                "group_id": group_id,
                "message": message
            }
        }
        self.ws.send(json.dumps(payload))
    
    def send_private_message(self, user_id, message):
        """发送私聊消息"""
        payload = {
            "action": "send_private_msg",
            "params": {
                "user_id": user_id,
                "message": message
            }
        }
        self.ws.send(json.dumps(payload))
    
    def on_error(self, ws, error):
        print(f"WebSocket错误: {error}")
    
    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket连接关闭")
    
    def on_open(self, ws):
        print("✅ QQ机器人已连接")

@app.route('/qq', methods=['POST'])
def qq_http():
    """HTTP模式下的QQ消息接口"""
    data = request.json
    
    if data.get('post_type') == 'message':
        message = data.get('message', '')
        asyncio.create_task(claw.process_command(message))
    
    return jsonify({"status": "ok"})

if __name__ == '__main__':
    # 启动WebSocket连接
    bot = QQBot()
    bot.connect()
    
    # 启动HTTP服务器
    app.run(port=5002)

🧠 六、OpenClaw核心实现

# openclaw_core.py
import os
import asyncio
import base64
from typing import Optional
import openai
from config import Config
import pyautogui
from PIL import Image
import io

class OpenClawCore:
    """OpenClaw核心引擎"""
    
    def __init__(self):
        self.client = openai.OpenAI(api_key=Config.API_KEY)
        self.model = Config.MODEL
        
        # 初始化自动化工具
        pyautogui.FAILSAFE = True
        
    async def process_command(self, command: str) -> str:
        """处理用户命令"""
        try:
            print(f"处理命令: {command}")
            
            # 1. 分析命令类型
            command_type = self.analyze_command_type(command)
            
            # 2. 根据类型执行
            if command_type == "screen_operation":
                return await self.handle_screen_operation(command)
            elif command_type == "browser_operation":
                return await self.handle_browser_operation(command)
            elif command_type == "file_operation":
                return await self.handle_file_operation(command)
            else:
                return await self.handle_general_query(command)
                
        except Exception as e:
            return f"❌ 执行失败: {str(e)}"
    
    def analyze_command_type(self, command: str) -> str:
        """分析命令类型"""
        command = command.lower()
        
        if any(word in command for word in ['点击', '打开', '关闭', '截图', '输入', '查找']):
            return "screen_operation"
        elif any(word in command for word in ['网页', '网站', '浏览器', '搜索']):
            return "browser_operation"
        elif any(word in command for word in ['文件', '文档', '保存', '读取']):
            return "file_operation"
        else:
            return "general_query"
    
    async def handle_screen_operation(self, command: str) -> str:
        """处理屏幕操作"""
        try:
            # 截图当前屏幕
            screenshot = pyautogui.screenshot()
            
            # 保存截图
            timestamp = asyncio.get_event_loop().time()
            screenshot_path = f"{Config.SCREENSHOT_DIR}/screen_{timestamp}.png"
            screenshot.save(screenshot_path)
            
            # 转换为base64
            buffered = io.BytesIO()
            screenshot.save(buffered, format="PNG")
            img_base64 = base64.b64encode(buffered.getvalue()).decode()
            
            # 调用GPT分析截图并执行操作
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": """你是一个屏幕操作助手。
                    分析用户指令和当前屏幕截图,返回要执行的操作步骤。
                    操作格式:click(x,y) 或 type(text) 或 press(key)"""},
                    {"role": "user", "content": [
                        {"type": "text", "text": f"指令: {command}"},
                        {"type": "image_url", "image_url": {
                            "url": f"data:image/png;base64,{img_base64}"
                        }}
                    ]}
                ]
            )
            
            # 解析并执行操作
            action_text = response.choices[0].message.content
            return await self.execute_action(action_text)
            
        except Exception as e:
            return f"屏幕操作失败: {e}"
    
    async def execute_action(self, action_text: str) -> str:
        """执行操作指令"""
        try:
            # 解析操作(简化版)
            if action_text.startswith("click("):
                # 提取坐标
                coords = action_text[6:-1].split(",")
                x, y = int(coords[0]), int(coords[1])
                pyautogui.click(x, y)
                return f"✅ 已点击位置 ({x}, {y})"
                
            elif action_text.startswith("type("):
                text = action_text[5:-1]
                pyautogui.write(text)
                return f"✅ 已输入: {text}"
                
            elif action_text.startswith("press("):
                key = action_text[6:-1]
                pyautogui.press(key)
                return f"✅ 已按下按键: {key}"
                
            else:
                return f"✅ 操作完成: {action_text}"
                
        except Exception as e:
            return f"❌ 操作执行失败: {e}"
    
    async def handle_browser_operation(self, command: str) -> str:
        """处理浏览器操作"""
        from selenium import webdriver
        from selenium.webdriver.common.by import By
        from selenium.webdriver.common.keys import Keys
        
        driver = None
        try:
            # 启动浏览器
            driver = webdriver.Chrome()
            
            # 根据指令执行操作
            if "打开" in command and "http" in command:
                # 提取URL
                import re
                urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', command)
                if urls:
                    driver.get(urls[0])
                    return f"✅ 已打开: {urls[0]}"
            
            elif "搜索" in command:
                # 提取搜索关键词
                search_term = command.split("搜索")[-1].strip()
                driver.get(f"https://www.google.com/search?q={search_term}")
                return f"✅ 已搜索: {search_term}"
            
            return "✅ 浏览器操作完成"
            
        except Exception as e:
            return f"浏览器操作失败: {e}"
        finally:
            if driver:
                driver.quit()
    
    async def handle_general_query(self, query: str) -> str:
        """处理一般查询"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "你是一个智能助手,帮助用户完成各种任务。"},
                {"role": "user", "content": query}
            ]
        )
        
        return response.choices[0].message.content

🚀 七、一键启动脚本

# start_all.sh
#!/bin/bash

echo "🤖 启动所有机器人服务..."
echo "========================"

# 1. 检查环境
if [ ! -d "venv" ]; then
    echo "❌ 请先运行 ./deploy.sh 安装环境"
    exit 1
fi

source venv/bin/activate

# 2. 创建日志目录
mkdir -p logs

# 3. 启动各服务(使用screen/tmux)
echo "1. 启动微信机器人..."
screen -dmS wechat_bot python wechat_bot.py

echo "2. 启动飞书机器人..."
screen -dmS feishu_bot python feishu_bot.py

echo "3. 启动钉钉机器人..."
screen -dmS dingtalk_bot python dingtalk_bot.py

echo "4. 启动QQ机器人..."
screen -dmS qq_bot python qq_bot.py

echo ""
echo "✅ 所有服务已启动!"
echo ""
echo "📋 运行状态:"
echo "微信机器人: screen -r wechat_bot"
echo "飞书机器人: screen -r feishu_bot"
echo "钉钉机器人: screen -r dingtalk_bot"
echo "QQ机器人:   screen -r qq_bot"
echo ""
echo "📝 查看日志:"
echo "tail -f logs/*.log"

📊 八、监控和管理面板

# dashboard.py
from flask import Flask, render_template_string
import psutil
import json
from datetime import datetime

app = Flask(__name__)

HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>OpenClaw 监控面板</title>
    <style>
        body { font-family: Arial; margin: 20px; }
        .card { 
            border: 1px solid #ddd; 
            padding: 20px; 
            margin: 10px; 
            border-radius: 5px;
            display: inline-block;
            width: 200px;
        }
        .status-online { color: green; }
        .status-offline { color: red; }
    </style>
</head>
<body>
    <h1>🤖 OpenClaw 监控面板</h1>
    
    <div class="card">
        <h3>系统状态</h3>
        <p>CPU: {{ cpu_percent }}%</p>
        <p>内存: {{ memory_percent }}%</p>
        <p>运行时间: {{ uptime }}</p>
    </div>
    
    <div class="card">
        <h3>微信机器人</h3>
        <p class="status-{{ wechat_status }}">{{ wechat_status|upper }}</p>
        <p>消息数: {{ wechat_stats.count }}</p>
    </div>
    
    <div class="card">
        <h3>飞书机器人</h3>
        <p class="status-{{ feishu_status }}">{{ feishu_status|upper }}</p>
        <p>消息数: {{ feishu_stats.count }}</p>
    </div>
    
    <div class="card">
        <h3>最近任务</h3>
        <ul>
            {% for task in recent_tasks %}
            <li>{{ task.time }}: {{ task.command }}</li>
            {% endfor %}
        </ul>
    </div>
    
    <div>
        <h3>快速命令</h3>
        <input type="text" id="command" placeholder="输入命令...">
        <button onclick="sendCommand()">执行</button>
    </div>
    
    <script>
        function sendCommand() {
            var command = document.getElementById('command').value;
            fetch('/api/command', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({command: command})
            });
        }
        
        // 自动刷新
        setInterval(() => location.reload(), 30000);
    </script>
</body>
</html>
"""

@app.route('/')
def dashboard():
    """监控面板"""
    return render_template_string(HTML_TEMPLATE, 
        cpu_percent=psutil.cpu_percent(),
        memory_percent=psutil.virtual_memory().percent,
        uptime=str(datetime.now() - datetime.fromtimestamp(psutil.boot_time())),
        wechat_status="online",
        feishu_status="online",
        wechat_stats={"count": 123},
        feishu_stats={"count": 456},
        recent_tasks=[
            {"time": "10:30", "command": "打开浏览器"},
            {"time": "10:31", "command": "搜索信息"},
            {"time": "10:32", "command": "保存文件"}
        ]
    )

@app.route('/api/command', methods=['POST'])
def api_command():
    """API命令接口"""
    import json
    data = request.json
    # 这里可以调用OpenClaw处理命令
    return jsonify({"success": True, "message": "命令已接收"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=True)

📝 九、完整部署检查清单

# checklist.sh
#!/bin/bash

echo "🔍 部署检查清单"
echo "================"

check() {
    if command -v $1 &> /dev/null; then
        echo "✅ $1: 已安装"
    else
        echo "❌ $1: 未安装"
    fi
}

echo ""
echo "1. 基础环境检查:"
check python3.9
check pip
check git

echo ""
echo "2. Python包检查:"
source venv/bin/activate
for pkg in "openai" "flask" "itchat-uos" "selenium" "playwright"; do
    if python -c "import $pkg" 2>/dev/null; then
        echo "✅ $pkg: 已安装"
    else
        echo "❌ $pkg: 未安装"
    fi
done

echo ""
echo "3. 配置文件检查:"
if [ -f ".env" ]; then
    echo "✅ .env: 存在"
    if grep -q "OPENCLAW_API_KEY" .env; then
        echo "✅ API Key: 已配置"
    else
        echo "❌ API Key: 未配置"
    fi
else
    echo "❌ .env: 不存在"
fi

echo ""
echo "4. 目录结构检查:"
for dir in "logs" "screenshots" "plugins" "tasks"; do
    if [ -d "$dir" ]; then
        echo "✅ $dir/: 存在"
    else
        echo "❌ $dir/: 不存在"
    fi
done

echo ""
echo "📌 下一步:"
echo "1. 获取OpenAI API Key: https://platform.openai.com/api-keys"
echo "2. 编辑 .env 文件填入API_KEY"
echo "3. 运行 ./start_all.sh 启动所有服务"

🎯 十、实用命令示例

# 1. 查看所有运行的服务
screen -ls

# 2. 查看特定服务日志
tail -f logs/wechat.log

# 3. 重启服务
screen -X -S wechat_bot quit
screen -dmS wechat_bot python wechat_bot.py

# 4. 测试OpenClaw功能
echo "测试命令:"
echo "1. '截图当前屏幕'"
echo "2. '打开浏览器访问百度'"
echo "3. '搜索人工智能最新进展'"
echo "4. '点击屏幕左上角'"
echo "5. '输入Hello World'"

# 5. 更新所有依赖
pip install --upgrade -r requirements.txt

🚨 注意事项

  1. 安全性

    • 不要将API密钥上传到GitHub
    • 使用防火墙限制访问端口
    • 定期更新依赖包
  2. 稳定性

    • 使用screen/tmux保持服务运行
    • 设置日志轮转防止日志过大
    • 添加异常重试机制
  3. 性能优化

    • 截图压缩减少API调用大小
    • 使用模型缓存减少重复计算
    • 异步处理避免阻塞
  4. 平台限制

    • 微信网页版可能被封号(建议用小号测试)
    • 钉钉/飞书需要企业认证
    • QQ机器人需要服务器部署

这个保姆级教程包含了从零开始部署OpenClaw并集成到各大即时通讯平台的完整流程。按照步骤操作,10分钟内即可搭建起可用的自动化系统!

OpenClaw 本地部署与即时通讯集成实战

我将提供一个完整的保姆级教程,让你在10分钟内搭建起OpenClaw本地系统,并接入微信/飞书/钉钉/QQ等平台。

📦 一、快速部署(5分钟)

步骤1:一键部署脚本

# 创建项目目录
mkdir openclaw-bot && cd openclaw-bot

# 创建一键部署脚本
cat > deploy.sh << 'EOF'
#!/bin/bash

echo "🚀 OpenClaw 一键部署开始..."

# 1. 创建Python虚拟环境
python3.9 -m venv venv
source venv/bin/activate

# 2. 安装核心依赖
pip install --upgrade pip
pip install open-interpreter
pip install python-dotenv
pip install playwright
pip install selenium
pip install requests
playwright install

# 3. 创建项目结构
mkdir -p config logs plugins tasks screenshots

# 4. 创建配置文件
cat > .env << 'ENV'
# OpenClaw配置
OPENCLAW_MODEL=gpt-4o-mini
OPENCLAW_API_KEY=${your_openai_api_key}

# 各平台配置
WECHAT_AUTO_REPLY=true
DINGTALK_WEBHOOK=
FEISHU_WEBHOOK=
QQ_BOT_ID=

# 自动化配置
SCREENSHOT_PATH=./screenshots
LOG_LEVEL=INFO
ENV

echo "✅ 基础环境部署完成!"
EOF

# 赋予执行权限并运行
chmod +x deploy.sh
./deploy.sh

步骤2:快速配置

# OpenClaw 本地部署与即时通讯集成实战

我将提供一个完整的保姆级教程,让你在10分钟内搭建起OpenClaw本地系统,并接入微信/飞书/钉钉/QQ等平台。

## 📦 一、快速部署(5分钟)

### 步骤1:一键部署脚本

```bash
# 创建项目目录
mkdir openclaw-bot && cd openclaw-bot

# 创建一键部署脚本
cat > deploy.sh << 'EOF'
#!/bin/bash

echo "🚀 OpenClaw 一键部署开始..."

# 1. 创建Python虚拟环境
python3.9 -m venv venv
source venv/bin/activate

# 2. 安装核心依赖
pip install --upgrade pip
pip install open-interpreter
pip install python-dotenv
pip install playwright
pip install selenium
pip install requests
playwright install

# 3. 创建项目结构
mkdir -p config logs plugins tasks screenshots

# 4. 创建配置文件
cat > .env << 'ENV'
# OpenClaw配置
OPENCLAW_MODEL=gpt-4o-mini
OPENCLAW_API_KEY=${your_openai_api_key}

# 各平台配置
WECHAT_AUTO_REPLY=true
DINGTALK_WEBHOOK=
FEISHU_WEBHOOK=
QQ_BOT_ID=

# 自动化配置
SCREENSHOT_PATH=./screenshots
LOG_LEVEL=INFO
ENV

echo "✅ 基础环境部署完成!"
EOF

# 赋予执行权限并运行
chmod +x deploy.sh
./deploy.sh

步骤2:快速配置

# 创建核心配置文件
cat > config.py << 'EOF'
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    # OpenClaw配置
    MODEL = os.getenv('OPENCLAW_MODEL', 'gpt-4o-mini')
    API_KEY = os.getenv('OPENCLAW_API_KEY')
    
    # 各平台Webhook
    DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '')
    FEISHU_WEBHOOK = os.getenv('FEISHU_WEBHOOK', '')
    
    # 微信配置(使用itchat-uos)
    WECHAT_ENABLED = os.getenv('WECHAT_AUTO_REPLY', 'true').lower() == 'true'
    
    # QQ配置(使用Mirai)
    QQ_BOT_ID = os.getenv('QQ_BOT_ID', '')
    
    # 路径配置
    SCREENSHOT_DIR = os.getenv('SCREENSHOT_PATH', './screenshots')
    LOG_DIR = './logs'
    
    @staticmethod
    def validate():
        if not Config.API_KEY:
            raise ValueError("请设置OPENCLAW_API_KEY环境变量")
EOF

💬 二、微信集成(2分钟)

使用 itchat-uos(最简方案)

# wechat_bot.py
import itchat
import asyncio
from openclaw_core import OpenClawCore
import json

class WeChatBot:
    def __init__(self):
        self.claw = OpenClawCore()
        
    def start(self):
        """启动微信机器人"""
        @itchat.msg_register(itchat.content.TEXT)
        def text_reply(msg):
            print(f"收到消息: {msg['Text']} from {msg['FromUserName']}")
            
            # 异步处理消息
            asyncio.create_task(self.handle_message(msg))
            return "正在处理您的请求..."
        
        # 登录
        itchat.auto_login(hotReload=True, enableCmdQR=2)
        print("✅ 微信登录成功!")
        itchat.run()

    async def handle_message(self, msg):
        """处理消息"""
        try:
            user_msg = msg['Text']
            user_id = msg['FromUserName']
            
            # 调用OpenClaw处理
            response = await self.claw.process_command(user_msg)
            
            # 回复用户
            itchat.send(response, toUserName=user_id)
            
        except Exception as e:
            error_msg = f"处理失败: {str(e)}"
            itchat.send(error_msg, toUserName=msg['FromUserName'])

# 运行微信机器人
if __name__ == "__main__":
    bot = WeChatBot()
    bot.start()

快速启动微信机器人

# 安装itchat-uos
pip install itchat-uos==1.5.0.dev0

# 创建快速启动脚本
cat > start_wechat.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
echo "📱 启动微信机器人..."
echo "请使用手机微信扫描二维码登录"
python wechat_bot.py
EOF

chmod +x start_wechat.sh

📱 三、飞书集成(1分钟)

# feishu_bot.py
from flask import Flask, request, jsonify
import requests
import json
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

# 飞书验证令牌
VERIFICATION_TOKEN = "your_verification_token"

@app.route('/feishu', methods=['POST'])
def feishu_webhook():
    """飞书Webhook接口"""
    data = request.json
    
    # 验证请求
    if data.get("token") != VERIFICATION_TOKEN:
        return jsonify({"error": "Invalid token"}), 403
    
    # 处理消息
    if data.get("type") == "url_verification":
        return jsonify({"challenge": data.get("challenge")})
    
    elif data.get("type") == "event_callback":
        event = data.get("event")
        if event.get("message_type") == "text":
            text = event.get("text", "")
            user_id = event.get("sender", {}).get("user_id")
            
            # 异步处理
            asyncio.create_task(process_feishu_message(text, user_id))
            
    return jsonify({"success": True})

async def process_feishu_message(text, user_id):
    """处理飞书消息"""
    try:
        # 调用OpenClaw
        result = await claw.process_command(text)
        
        # 回复到飞书(需要飞书机器人API)
        await reply_to_feishu(user_id, result)
        
    except Exception as e:
        print(f"飞书处理失败: {e}")

async def reply_to_feishu(user_id, message):
    """回复飞书消息"""
    # 这里需要飞书机器人的API密钥
    webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/your_token"
    
    payload = {
        "msg_type": "text",
        "content": {
            "text": message
        }
    }
    
    requests.post(webhook_url, json=payload)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

飞书快速配置脚本

# feishu_setup.sh
#!/bin/bash
echo "🔧 配置飞书机器人:"
echo "1. 访问 https://open.feishu.cn/app"
echo "2. 创建新应用"
echo "3. 启用机器人能力"
echo "4. 复制Webhook地址到 .env 文件的 FEISHU_WEBHOOK"
echo ""
echo "📌 获取Webhook URL后运行:"
echo "export FEISHU_WEBHOOK='你的Webhook地址'"
echo "python feishu_bot.py"

📌 四、钉钉集成(1分钟)

# dingtalk_bot.py
from flask import Flask, request, jsonify
import requests
import hashlib
import base64
import hmac
import time
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

class DingTalkBot:
    def __init__(self, app_secret, access_token):
        self.app_secret = app_secret
        self.access_token = access_token
        
    def sign(self, timestamp):
        """生成签名"""
        string_to_sign = f'{timestamp}\n{self.app_secret}'
        hmac_code = hmac.new(
            self.app_secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        
        return base64.b64encode(hmac_code).decode('utf-8')
    
    async def send_message(self, user_id, message):
        """发送钉钉消息"""
        timestamp = str(round(time.time() * 1000))
        sign = self.sign(timestamp)
        
        url = f'https://oapi.dingtalk.com/robot/send?access_token={self.access_token}&timestamp={timestamp}&sign={sign}'
        
        payload = {
            "msgtype": "text",
            "text": {
                "content": message
            },
            "at": {
                "atUserIds": [user_id]
            }
        }
        
        response = requests.post(url, json=payload)
        return response.json()

@app.route('/dingtalk', methods=['POST'])
def dingtalk_webhook():
    """钉钉Webhook接口"""
    data = request.json
    
    if data.get('msgtype') == 'text':
        text = data.get('text', {}).get('content', '')
        sender_id = data.get('senderStaffId', '')
        
        # 异步处理
        asyncio.create_task(process_dingtalk_message(text, sender_id))
    
    return jsonify({"msgtype": "text", "text": {"content": "处理中..."}})

async def process_dingtalk_message(text, user_id):
    """处理钉钉消息"""
    try:
        # 配置钉钉机器人
        bot = DingTalkBot(
            app_secret=os.getenv('DINGTALK_APP_SECRET'),
            access_token=os.getenv('DINGTALK_ACCESS_TOKEN')
        )
        
        # 调用OpenClaw
        result = await claw.process_command(text)
        
        # 回复消息
        await bot.send_message(user_id, result)
        
    except Exception as e:
        print(f"钉钉处理失败: {e}")

if __name__ == '__main__':
    app.run(port=5001)

🎮 五、QQ集成(1分钟)

使用 go-cqhttp(推荐)

# 下载 go-cqhttp
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v1.0.0-rc3/go-cqhttp_linux_amd64.tar.gz
tar -zxvf go-cqhttp_linux_amd64.tar.gz
cd go-cqhttp

# 生成配置文件
./go-cqhttp
# 选择 3 (HTTP通信) 和 0 (正向WebSocket)

QQ机器人服务端

# qq_bot.py
from flask import Flask, request, jsonify
import websocket
import json
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

class QQBot:
    def __init__(self, ws_url="ws://127.0.0.1:6700"):
        self.ws_url = ws_url
        self.ws = None
        
    def connect(self):
        """连接WebSocket"""
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws.on_open = self.on_open
        self.ws.run_forever()
    
    def on_message(self, ws, message):
        """处理收到的消息"""
        data = json.loads(message)
        
        if data.get('post_type') == 'message':
            # 异步处理消息
            asyncio.create_task(self.handle_qq_message(data))
    
    async def handle_qq_message(self, data):
        """处理QQ消息"""
        try:
            message = data.get('message', '')
            user_id = data.get('user_id')
            group_id = data.get('group_id')
            
            # 调用OpenClaw
            result = await claw.process_command(message)
            
            # 回复消息
            if group_id:
                self.send_group_message(group_id, result)
            else:
                self.send_private_message(user_id, result)
                
        except Exception as e:
            print(f"QQ消息处理失败: {e}")
    
    def send_group_message(self, group_id, message):
        """发送群消息"""
        payload = {
            "action": "send_group_msg",
            "params": {
                "group_id": group_id,
                "message": message
            }
        }
        self.ws.send(json.dumps(payload))
    
    def send_private_message(self, user_id, message):
        """发送私聊消息"""
        payload = {
            "action": "send_private_msg",
            "params": {
                "user_id": user_id,
                "message": message
            }
        }
        self.ws.send(json.dumps(payload))
    
    def on_error(self, ws, error):
        print(f"WebSocket错误: {error}")
    
    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket连接关闭")
    
    def on_open(self, ws):
        print("✅ QQ机器人已连接")

@app.route('/qq', methods=['POST'])
def qq_http():
    """HTTP模式下的QQ消息接口"""
    data = request.json
    
    if data.get('post_type') == 'message':
        message = data.get('message', '')
        asyncio.create_task(claw.process_command(message))
    
    return jsonify({"status": "ok"})

if __name__ == '__main__':
    # 启动WebSocket连接
    bot = QQBot()
    bot.connect()
    
    # 启动HTTP服务器
    app.run(port=5002)

🧠 六、OpenClaw核心实现

# openclaw_core.py
import os
import asyncio
import base64
from typing import Optional
import openai
from config import Config
import pyautogui
from PIL import Image
import io

class OpenClawCore:
    """OpenClaw核心引擎"""
    
    def __init__(self):
        self.client = openai.OpenAI(api_key=Config.API_KEY)
        self.model = Config.MODEL
        
        # 初始化自动化工具
        pyautogui.FAILSAFE = True
        
    async def process_command(self, command: str) -> str:
        """处理用户命令"""
        try:
            print(f"处理命令: {command}")
            
            # 1. 分析命令类型
            command_type = self.analyze_command_type(command)
            
            # 2. 根据类型执行
            if command_type == "screen_operation":
                return await self.handle_screen_operation(command)
            elif command_type == "browser_operation":
                return await self.handle_browser_operation(command)
            elif command_type == "file_operation":
                return await self.handle_file_operation(command)
            else:
                return await self.handle_general_query(command)
                
        except Exception as e:
            return f"❌ 执行失败: {str(e)}"
    
    def analyze_command_type(self, command: str) -> str:
        """分析命令类型"""
        command = command.lower()
        
        if any(word in command for word in ['点击', '打开', '关闭', '截图', '输入', '查找']):
            return "screen_operation"
        elif any(word in command for word in ['网页', '网站', '浏览器', '搜索']):
            return "browser_operation"
        elif any(word in command for word in ['文件', '文档', '保存', '读取']):
            return "file_operation"
        else:
            return "general_query"
    
    async def handle_screen_operation(self, command: str) -> str:
        """处理屏幕操作"""
        try:
            # 截图当前屏幕
            screenshot = pyautogui.screenshot()
            
            # 保存截图
            timestamp = asyncio.get_event_loop().time()
            screenshot_path = f"{Config.SCREENSHOT_DIR}/screen_{timestamp}.png"
            screenshot.save(screenshot_path)
            
            # 转换为base64
            buffered = io.BytesIO()
            screenshot.save(buffered, format="PNG")
            img_base64 = base64.b64encode(buffered.getvalue()).decode()
            
            # 调用GPT分析截图并执行操作
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": """你是一个屏幕操作助手。
                    分析用户指令和当前屏幕截图,返回要执行的操作步骤。
                    操作格式:click(x,y) 或 type(text) 或 press(key)"""},
                    {"role": "user", "content": [
                        {"type": "text", "text": f"指令: {command}"},
                        {"type": "image_url", "image_url": {
                            "url": f"data:image/png;base64,{img_base64}"
                        }}
                    ]}
                ]
            )
            
            # 解析并执行操作
            action_text = response.choices[0].message.content
            return await self.execute_action(action_text)
            
        except Exception as e:
            return f"屏幕操作失败: {e}"
    
    async def execute_action(self, action_text: str) -> str:
        """执行操作指令"""
        try:
            # 解析操作(简化版)
            if action_text.startswith("click("):
                # 提取坐标
                coords = action_text[6:-1].split(",")
                x, y = int(coords[0]), int(coords[1])
                pyautogui.click(x, y)
                return f"✅ 已点击位置 ({x}, {y})"
                
            elif action_text.startswith("type("):
                text = action_text[5:-1]
                pyautogui.write(text)
                return f"✅ 已输入: {text}"
                
            elif action_text.startswith("press("):
                key = action_text[6:-1]
                pyautogui.press(key)
                return f"✅ 已按下按键: {key}"
                
            else:
                return f"✅ 操作完成: {action_text}"
                
        except Exception as e:
            return f"❌ 操作执行失败: {e}"
    
    async def handle_browser_operation(self, command: str) -> str:
        """处理浏览器操作"""
        from selenium import webdriver
        from selenium.webdriver.common.by import By
        from selenium.webdriver.common.keys import Keys
        
        driver = None
        try:
            # 启动浏览器
            driver = webdriver.Chrome()
            
            # 根据指令执行操作
            if "打开" in command and "http" in command:
                # 提取URL
                import re
                urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', command)
                if urls:
                    driver.get(urls[0])
                    return f"✅ 已打开: {urls[0]}"
            
            elif "搜索" in command:
                # 提取搜索关键词
                search_term = command.split("搜索")[-1].strip()
                driver.get(f"https://www.google.com/search?q={search_term}")
                return f"✅ 已搜索: {search_term}"
            
            return "✅ 浏览器操作完成"
            
        except Exception as e:
            return f"浏览器操作失败: {e}"
        finally:
            if driver:
                driver.quit()
    
    async def handle_general_query(self, query: str) -> str:
        """处理一般查询"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "你是一个智能助手,帮助用户完成各种任务。"},
                {"role": "user", "content": query}
            ]
        )
        
        return response.choices[0].message.content

🚀 七、一键启动脚本

# start_all.sh
#!/bin/bash

echo "🤖 启动所有机器人服务..."
echo "========================"

# 1. 检查环境
if [ ! -d "venv" ]; then
    echo "❌ 请先运行 ./deploy.sh 安装环境"
    exit 1
fi

source venv/bin/activate

# 2. 创建日志目录
mkdir -p logs

# 3. 启动各服务(使用screen/tmux)
echo "1. 启动微信机器人..."
screen -dmS wechat_bot python wechat_bot.py

echo "2. 启动飞书机器人..."
screen -dmS feishu_bot python feishu_bot.py

echo "3. 启动钉钉机器人..."
screen -dmS dingtalk_bot python dingtalk_bot.py

echo "4. 启动QQ机器人..."
screen -dmS qq_bot python qq_bot.py

echo ""
echo "✅ 所有服务已启动!"
echo ""
echo "📋 运行状态:"
echo "微信机器人: screen -r wechat_bot"
echo "飞书机器人: screen -r feishu_bot"
echo "钉钉机器人: screen -r dingtalk_bot"
echo "QQ机器人:   screen -r qq_bot"
echo ""
echo "📝 查看日志:"
echo "tail -f logs/*.log"

📊 八、监控和管理面板

# dashboard.py
from flask import Flask, render_template_string
import psutil
import json
from datetime import datetime

app = Flask(__name__)

HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>OpenClaw 监控面板</title>
    <style>
        body { font-family: Arial; margin: 20px; }
        .card { 
            border: 1px solid #ddd; 
            padding: 20px; 
            margin: 10px; 
            border-radius: 5px;
            display: inline-block;
            width: 200px;
        }
        .status-online { color: green; }
        .status-offline { color: red; }
    </style>
</head>
<body>
    <h1>🤖 OpenClaw 监控面板</h1>
    
    <div class="card">
        <h3>系统状态</h3>
        <p>CPU: {{ cpu_percent }}%</p>
        <p>内存: {{ memory_percent }}%</p>
        <p>运行时间: {{ uptime }}</p>
    </div>
    
    <div class="card">
        <h3>微信机器人</h3>
        <p class="status-{{ wechat_status }}">{{ wechat_status|upper }}</p>
        <p>消息数: {{ wechat_stats.count }}</p>
    </div>
    
    <div class="card">
        <h3>飞书机器人</h3>
        <p class="status-{{ feishu_status }}">{{ feishu_status|upper }}</p>
        <p>消息数: {{ feishu_stats.count }}</p>
    </div>
    
    <div class="card">
        <h3>最近任务</h3>
        <ul>
            {% for task in recent_tasks %}
            <li>{{ task.time }}: {{ task.command }}</li>
            {% endfor %}
        </ul>
    </div>
    
    <div>
        <h3>快速命令</h3>
        <input type="text" id="command" placeholder="输入命令...">
        <button onclick="sendCommand()">执行</button>
    </div>
    
    <script>
        function sendCommand() {
            var command = document.getElementById('command').value;
            fetch('/api/command', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({command: command})
            });
        }
        
        // 自动刷新
        setInterval(() => location.reload(), 30000);
    </script>
</body>
</html>
"""

@app.route('/')
def dashboard():
    """监控面板"""
    return render_template_string(HTML_TEMPLATE, 
        cpu_percent=psutil.cpu_percent(),
        memory_percent=psutil.virtual_memory().percent,
        uptime=str(datetime.now() - datetime.fromtimestamp(psutil.boot_time())),
        wechat_status="online",
        feishu_status="online",
        wechat_stats={"count": 123},
        feishu_stats={"count": 456},
        recent_tasks=[
            {"time": "10:30", "command": "打开浏览器"},
            {"time": "10:31", "command": "搜索信息"},
            {"time": "10:32", "command": "保存文件"}
        ]
    )

@app.route('/api/command', methods=['POST'])
def api_command():
    """API命令接口"""
    import json
    data = request.json
    # 这里可以调用OpenClaw处理命令
    return jsonify({"success": True, "message": "命令已接收"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=True)

📝 九、完整部署检查清单

# checklist.sh
#!/bin/bash

echo "🔍 部署检查清单"
echo "================"

check() {
    if command -v $1 &> /dev/null; then
        echo "✅ $1: 已安装"
    else
        echo "❌ $1: 未安装"
    fi
}

echo ""
echo "1. 基础环境检查:"
check python3.9
check pip
check git

echo ""
echo "2. Python包检查:"
source venv/bin/activate
for pkg in "openai" "flask" "itchat-uos" "selenium" "playwright"; do
    if python -c "import $pkg" 2>/dev/null; then
        echo "✅ $pkg: 已安装"
    else
        echo "❌ $pkg: 未安装"
    fi
done

echo ""
echo "3. 配置文件检查:"
if [ -f ".env" ]; then
    echo "✅ .env: 存在"
    if grep -q "OPENCLAW_API_KEY" .env; then
        echo "✅ API Key: 已配置"
    else
        echo "❌ API Key: 未配置"
    fi
else
    echo "❌ .env: 不存在"
fi

echo ""
echo "4. 目录结构检查:"
for dir in "logs" "screenshots" "plugins" "tasks"; do
    if [ -d "$dir" ]; then
        echo "✅ $dir/: 存在"
    else
        echo "❌ $dir/: 不存在"
    fi
done

echo ""
echo "📌 下一步:"
echo "1. 获取OpenAI API Key: https://platform.openai.com/api-keys"
echo "2. 编辑 .env 文件填入API_KEY"
echo "3. 运行 ./start_all.sh 启动所有服务"

🎯 十、实用命令示例

# 1. 查看所有运行的服务
screen -ls

# 2. 查看特定服务日志
tail -f logs/wechat.log

# 3. 重启服务
screen -X -S wechat_bot quit
screen -dmS wechat_bot python wechat_bot.py

# 4. 测试OpenClaw功能
echo "测试命令:"
echo "1. '截图当前屏幕'"
echo "2. '打开浏览器访问百度'"
echo "3. '搜索人工智能最新进展'"
echo "4. '点击屏幕左上角'"
echo "5. '输入Hello World'"

# 5. 更新所有依赖
pip install --upgrade -r requirements.txt

🚨 注意事项

  1. 安全性

    • 不要将API密钥上传到GitHub
    • 使用防火墙限制访问端口
    • 定期更新依赖包
  2. 稳定性

    • 使用screen/tmux保持服务运行
    • 设置日志轮转防止日志过大
    • 添加异常重试机制
  3. 性能优化

    • 截图压缩减少API调用大小
    • 使用模型缓存减少重复计算
    • 异步处理避免阻塞
  4. 平台限制

    • 微信网页版可能被封号(建议用小号测试)
    • 钉钉/飞书需要企业认证
    • QQ机器人需要服务器部署

这个保姆级教程包含了从零开始部署OpenClaw并集成到各大即时通讯平台的完整流程。按照步骤操作,10分钟内即可搭建起可用的自动化系统!# 创建核心配置文件
cat > config.py << ‘EOF’
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
# OpenClaw配置
MODEL = os.getenv(‘OPENCLAW_MODEL’, ‘gpt-4o-mini’)
API_KEY = os.getenv(‘OPENCLAW_API_KEY’)

# 各平台Webhook
DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK', '')
FEISHU_WEBHOOK = os.getenv('FEISHU_WEBHOOK', '')

# 微信配置(使用itchat-uos)
WECHAT_ENABLED = os.getenv('WECHAT_AUTO_REPLY', 'true').lower() == 'true'

# QQ配置(使用Mirai)
QQ_BOT_ID = os.getenv('QQ_BOT_ID', '')

# 路径配置
SCREENSHOT_DIR = os.getenv('SCREENSHOT_PATH', './screenshots')
LOG_DIR = './logs'

@staticmethod
def validate():
    if not Config.API_KEY:
        raise ValueError("请设置OPENCLAW_API_KEY环境变量")

EOF


## 💬 二、微信集成(2分钟)

### 使用 itchat-uos(最简方案)

```python
# wechat_bot.py
import itchat
import asyncio
from openclaw_core import OpenClawCore
import json

class WeChatBot:
    def __init__(self):
        self.claw = OpenClawCore()
        
    def start(self):
        """启动微信机器人"""
        @itchat.msg_register(itchat.content.TEXT)
        def text_reply(msg):
            print(f"收到消息: {msg['Text']} from {msg['FromUserName']}")
            
            # 异步处理消息
            asyncio.create_task(self.handle_message(msg))
            return "正在处理您的请求..."
        
        # 登录
        itchat.auto_login(hotReload=True, enableCmdQR=2)
        print("✅ 微信登录成功!")
        itchat.run()

    async def handle_message(self, msg):
        """处理消息"""
        try:
            user_msg = msg['Text']
            user_id = msg['FromUserName']
            
            # 调用OpenClaw处理
            response = await self.claw.process_command(user_msg)
            
            # 回复用户
            itchat.send(response, toUserName=user_id)
            
        except Exception as e:
            error_msg = f"处理失败: {str(e)}"
            itchat.send(error_msg, toUserName=msg['FromUserName'])

# 运行微信机器人
if __name__ == "__main__":
    bot = WeChatBot()
    bot.start()

快速启动微信机器人

# 安装itchat-uos
pip install itchat-uos==1.5.0.dev0

# 创建快速启动脚本
cat > start_wechat.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
echo "📱 启动微信机器人..."
echo "请使用手机微信扫描二维码登录"
python wechat_bot.py
EOF

chmod +x start_wechat.sh

📱 三、飞书集成(1分钟)

# feishu_bot.py
from flask import Flask, request, jsonify
import requests
import json
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

# 飞书验证令牌
VERIFICATION_TOKEN = "your_verification_token"

@app.route('/feishu', methods=['POST'])
def feishu_webhook():
    """飞书Webhook接口"""
    data = request.json
    
    # 验证请求
    if data.get("token") != VERIFICATION_TOKEN:
        return jsonify({"error": "Invalid token"}), 403
    
    # 处理消息
    if data.get("type") == "url_verification":
        return jsonify({"challenge": data.get("challenge")})
    
    elif data.get("type") == "event_callback":
        event = data.get("event")
        if event.get("message_type") == "text":
            text = event.get("text", "")
            user_id = event.get("sender", {}).get("user_id")
            
            # 异步处理
            asyncio.create_task(process_feishu_message(text, user_id))
            
    return jsonify({"success": True})

async def process_feishu_message(text, user_id):
    """处理飞书消息"""
    try:
        # 调用OpenClaw
        result = await claw.process_command(text)
        
        # 回复到飞书(需要飞书机器人API)
        await reply_to_feishu(user_id, result)
        
    except Exception as e:
        print(f"飞书处理失败: {e}")

async def reply_to_feishu(user_id, message):
    """回复飞书消息"""
    # 这里需要飞书机器人的API密钥
    webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/your_token"
    
    payload = {
        "msg_type": "text",
        "content": {
            "text": message
        }
    }
    
    requests.post(webhook_url, json=payload)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

飞书快速配置脚本

# feishu_setup.sh
#!/bin/bash
echo "🔧 配置飞书机器人:"
echo "1. 访问 https://open.feishu.cn/app"
echo "2. 创建新应用"
echo "3. 启用机器人能力"
echo "4. 复制Webhook地址到 .env 文件的 FEISHU_WEBHOOK"
echo ""
echo "📌 获取Webhook URL后运行:"
echo "export FEISHU_WEBHOOK='你的Webhook地址'"
echo "python feishu_bot.py"

📌 四、钉钉集成(1分钟)

# dingtalk_bot.py
from flask import Flask, request, jsonify
import requests
import hashlib
import base64
import hmac
import time
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

class DingTalkBot:
    def __init__(self, app_secret, access_token):
        self.app_secret = app_secret
        self.access_token = access_token
        
    def sign(self, timestamp):
        """生成签名"""
        string_to_sign = f'{timestamp}\n{self.app_secret}'
        hmac_code = hmac.new(
            self.app_secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        
        return base64.b64encode(hmac_code).decode('utf-8')
    
    async def send_message(self, user_id, message):
        """发送钉钉消息"""
        timestamp = str(round(time.time() * 1000))
        sign = self.sign(timestamp)
        
        url = f'https://oapi.dingtalk.com/robot/send?access_token={self.access_token}&timestamp={timestamp}&sign={sign}'
        
        payload = {
            "msgtype": "text",
            "text": {
                "content": message
            },
            "at": {
                "atUserIds": [user_id]
            }
        }
        
        response = requests.post(url, json=payload)
        return response.json()

@app.route('/dingtalk', methods=['POST'])
def dingtalk_webhook():
    """钉钉Webhook接口"""
    data = request.json
    
    if data.get('msgtype') == 'text':
        text = data.get('text', {}).get('content', '')
        sender_id = data.get('senderStaffId', '')
        
        # 异步处理
        asyncio.create_task(process_dingtalk_message(text, sender_id))
    
    return jsonify({"msgtype": "text", "text": {"content": "处理中..."}})

async def process_dingtalk_message(text, user_id):
    """处理钉钉消息"""
    try:
        # 配置钉钉机器人
        bot = DingTalkBot(
            app_secret=os.getenv('DINGTALK_APP_SECRET'),
            access_token=os.getenv('DINGTALK_ACCESS_TOKEN')
        )
        
        # 调用OpenClaw
        result = await claw.process_command(text)
        
        # 回复消息
        await bot.send_message(user_id, result)
        
    except Exception as e:
        print(f"钉钉处理失败: {e}")

if __name__ == '__main__':
    app.run(port=5001)

🎮 五、QQ集成(1分钟)

使用 go-cqhttp(推荐)

# 下载 go-cqhttp
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v1.0.0-rc3/go-cqhttp_linux_amd64.tar.gz
tar -zxvf go-cqhttp_linux_amd64.tar.gz
cd go-cqhttp

# 生成配置文件
./go-cqhttp
# 选择 3 (HTTP通信) 和 0 (正向WebSocket)

QQ机器人服务端

# qq_bot.py
from flask import Flask, request, jsonify
import websocket
import json
import asyncio
from openclaw_core import OpenClawCore

app = Flask(__name__)
claw = OpenClawCore()

class QQBot:
    def __init__(self, ws_url="ws://127.0.0.1:6700"):
        self.ws_url = ws_url
        self.ws = None
        
    def connect(self):
        """连接WebSocket"""
        self.ws = websocket.WebSocketApp(
            self.ws_url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws.on_open = self.on_open
        self.ws.run_forever()
    
    def on_message(self, ws, message):
        """处理收到的消息"""
        data = json.loads(message)
        
        if data.get('post_type') == 'message':
            # 异步处理消息
            asyncio.create_task(self.handle_qq_message(data))
    
    async def handle_qq_message(self, data):
        """处理QQ消息"""
        try:
            message = data.get('message', '')
            user_id = data.get('user_id')
            group_id = data.get('group_id')
            
            # 调用OpenClaw
            result = await claw.process_command(message)
            
            # 回复消息
            if group_id:
                self.send_group_message(group_id, result)
            else:
                self.send_private_message(user_id, result)
                
        except Exception as e:
            print(f"QQ消息处理失败: {e}")
    
    def send_group_message(self, group_id, message):
        """发送群消息"""
        payload = {
            "action": "send_group_msg",
            "params": {
                "group_id": group_id,
                "message": message
            }
        }
        self.ws.send(json.dumps(payload))
    
    def send_private_message(self, user_id, message):
        """发送私聊消息"""
        payload = {
            "action": "send_private_msg",
            "params": {
                "user_id": user_id,
                "message": message
            }
        }
        self.ws.send(json.dumps(payload))
    
    def on_error(self, ws, error):
        print(f"WebSocket错误: {error}")
    
    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket连接关闭")
    
    def on_open(self, ws):
        print("✅ QQ机器人已连接")

@app.route('/qq', methods=['POST'])
def qq_http():
    """HTTP模式下的QQ消息接口"""
    data = request.json
    
    if data.get('post_type') == 'message':
        message = data.get('message', '')
        asyncio.create_task(claw.process_command(message))
    
    return jsonify({"status": "ok"})

if __name__ == '__main__':
    # 启动WebSocket连接
    bot = QQBot()
    bot.connect()
    
    # 启动HTTP服务器
    app.run(port=5002)

🧠 六、OpenClaw核心实现

# openclaw_core.py
import os
import asyncio
import base64
from typing import Optional
import openai
from config import Config
import pyautogui
from PIL import Image
import io

class OpenClawCore:
    """OpenClaw核心引擎"""
    
    def __init__(self):
        self.client = openai.OpenAI(api_key=Config.API_KEY)
        self.model = Config.MODEL
        
        # 初始化自动化工具
        pyautogui.FAILSAFE = True
        
    async def process_command(self, command: str) -> str:
        """处理用户命令"""
        try:
            print(f"处理命令: {command}")
            
            # 1. 分析命令类型
            command_type = self.analyze_command_type(command)
            
            # 2. 根据类型执行
            if command_type == "screen_operation":
                return await self.handle_screen_operation(command)
            elif command_type == "browser_operation":
                return await self.handle_browser_operation(command)
            elif command_type == "file_operation":
                return await self.handle_file_operation(command)
            else:
                return await self.handle_general_query(command)
                
        except Exception as e:
            return f"❌ 执行失败: {str(e)}"
    
    def analyze_command_type(self, command: str) -> str:
        """分析命令类型"""
        command = command.lower()
        
        if any(word in command for word in ['点击', '打开', '关闭', '截图', '输入', '查找']):
            return "screen_operation"
        elif any(word in command for word in ['网页', '网站', '浏览器', '搜索']):
            return "browser_operation"
        elif any(word in command for word in ['文件', '文档', '保存', '读取']):
            return "file_operation"
        else:
            return "general_query"
    
    async def handle_screen_operation(self, command: str) -> str:
        """处理屏幕操作"""
        try:
            # 截图当前屏幕
            screenshot = pyautogui.screenshot()
            
            # 保存截图
            timestamp = asyncio.get_event_loop().time()
            screenshot_path = f"{Config.SCREENSHOT_DIR}/screen_{timestamp}.png"
            screenshot.save(screenshot_path)
            
            # 转换为base64
            buffered = io.BytesIO()
            screenshot.save(buffered, format="PNG")
            img_base64 = base64.b64encode(buffered.getvalue()).decode()
            
            # 调用GPT分析截图并执行操作
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": """你是一个屏幕操作助手。
                    分析用户指令和当前屏幕截图,返回要执行的操作步骤。
                    操作格式:click(x,y) 或 type(text) 或 press(key)"""},
                    {"role": "user", "content": [
                        {"type": "text", "text": f"指令: {command}"},
                        {"type": "image_url", "image_url": {
                            "url": f"data:image/png;base64,{img_base64}"
                        }}
                    ]}
                ]
            )
            
            # 解析并执行操作
            action_text = response.choices[0].message.content
            return await self.execute_action(action_text)
            
        except Exception as e:
            return f"屏幕操作失败: {e}"
    
    async def execute_action(self, action_text: str) -> str:
        """执行操作指令"""
        try:
            # 解析操作(简化版)
            if action_text.startswith("click("):
                # 提取坐标
                coords = action_text[6:-1].split(",")
                x, y = int(coords[0]), int(coords[1])
                pyautogui.click(x, y)
                return f"✅ 已点击位置 ({x}, {y})"
                
            elif action_text.startswith("type("):
                text = action_text[5:-1]
                pyautogui.write(text)
                return f"✅ 已输入: {text}"
                
            elif action_text.startswith("press("):
                key = action_text[6:-1]
                pyautogui.press(key)
                return f"✅ 已按下按键: {key}"
                
            else:
                return f"✅ 操作完成: {action_text}"
                
        except Exception as e:
            return f"❌ 操作执行失败: {e}"
    
    async def handle_browser_operation(self, command: str) -> str:
        """处理浏览器操作"""
        from selenium import webdriver
        from selenium.webdriver.common.by import By
        from selenium.webdriver.common.keys import Keys
        
        driver = None
        try:
            # 启动浏览器
            driver = webdriver.Chrome()
            
            # 根据指令执行操作
            if "打开" in command and "http" in command:
                # 提取URL
                import re
                urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', command)
                if urls:
                    driver.get(urls[0])
                    return f"✅ 已打开: {urls[0]}"
            
            elif "搜索" in command:
                # 提取搜索关键词
                search_term = command.split("搜索")[-1].strip()
                driver.get(f"https://www.google.com/search?q={search_term}")
                return f"✅ 已搜索: {search_term}"
            
            return "✅ 浏览器操作完成"
            
        except Exception as e:
            return f"浏览器操作失败: {e}"
        finally:
            if driver:
                driver.quit()
    
    async def handle_general_query(self, query: str) -> str:
        """处理一般查询"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "你是一个智能助手,帮助用户完成各种任务。"},
                {"role": "user", "content": query}
            ]
        )
        
        return response.choices[0].message.content

🚀 七、一键启动脚本

# start_all.sh
#!/bin/bash

echo "🤖 启动所有机器人服务..."
echo "========================"

# 1. 检查环境
if [ ! -d "venv" ]; then
    echo "❌ 请先运行 ./deploy.sh 安装环境"
    exit 1
fi

source venv/bin/activate

# 2. 创建日志目录
mkdir -p logs

# 3. 启动各服务(使用screen/tmux)
echo "1. 启动微信机器人..."
screen -dmS wechat_bot python wechat_bot.py

echo "2. 启动飞书机器人..."
screen -dmS feishu_bot python feishu_bot.py

echo "3. 启动钉钉机器人..."
screen -dmS dingtalk_bot python dingtalk_bot.py

echo "4. 启动QQ机器人..."
screen -dmS qq_bot python qq_bot.py

echo ""
echo "✅ 所有服务已启动!"
echo ""
echo "📋 运行状态:"
echo "微信机器人: screen -r wechat_bot"
echo "飞书机器人: screen -r feishu_bot"
echo "钉钉机器人: screen -r dingtalk_bot"
echo "QQ机器人:   screen -r qq_bot"
echo ""
echo "📝 查看日志:"
echo "tail -f logs/*.log"

📊 八、监控和管理面板

# dashboard.py
from flask import Flask, render_template_string
import psutil
import json
from datetime import datetime

app = Flask(__name__)

HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>OpenClaw 监控面板</title>
    <style>
        body { font-family: Arial; margin: 20px; }
        .card { 
            border: 1px solid #ddd; 
            padding: 20px; 
            margin: 10px; 
            border-radius: 5px;
            display: inline-block;
            width: 200px;
        }
        .status-online { color: green; }
        .status-offline { color: red; }
    </style>
</head>
<body>
    <h1>🤖 OpenClaw 监控面板</h1>
    
    <div class="card">
        <h3>系统状态</h3>
        <p>CPU: {{ cpu_percent }}%</p>
        <p>内存: {{ memory_percent }}%</p>
        <p>运行时间: {{ uptime }}</p>
    </div>
    
    <div class="card">
        <h3>微信机器人</h3>
        <p class="status-{{ wechat_status }}">{{ wechat_status|upper }}</p>
        <p>消息数: {{ wechat_stats.count }}</p>
    </div>
    
    <div class="card">
        <h3>飞书机器人</h3>
        <p class="status-{{ feishu_status }}">{{ feishu_status|upper }}</p>
        <p>消息数: {{ feishu_stats.count }}</p>
    </div>
    
    <div class="card">
        <h3>最近任务</h3>
        <ul>
            {% for task in recent_tasks %}
            <li>{{ task.time }}: {{ task.command }}</li>
            {% endfor %}
        </ul>
    </div>
    
    <div>
        <h3>快速命令</h3>
        <input type="text" id="command" placeholder="输入命令...">
        <button onclick="sendCommand()">执行</button>
    </div>
    
    <script>
        function sendCommand() {
            var command = document.getElementById('command').value;
            fetch('/api/command', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({command: command})
            });
        }
        
        // 自动刷新
        setInterval(() => location.reload(), 30000);
    </script>
</body>
</html>
"""

@app.route('/')
def dashboard():
    """监控面板"""
    return render_template_string(HTML_TEMPLATE, 
        cpu_percent=psutil.cpu_percent(),
        memory_percent=psutil.virtual_memory().percent,
        uptime=str(datetime.now() - datetime.fromtimestamp(psutil.boot_time())),
        wechat_status="online",
        feishu_status="online",
        wechat_stats={"count": 123},
        feishu_stats={"count": 456},
        recent_tasks=[
            {"time": "10:30", "command": "打开浏览器"},
            {"time": "10:31", "command": "搜索信息"},
            {"time": "10:32", "command": "保存文件"}
        ]
    )

@app.route('/api/command', methods=['POST'])
def api_command():
    """API命令接口"""
    import json
    data = request.json
    # 这里可以调用OpenClaw处理命令
    return jsonify({"success": True, "message": "命令已接收"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=True)

📝 九、完整部署检查清单

# checklist.sh
#!/bin/bash

echo "🔍 部署检查清单"
echo "================"

check() {
    if command -v $1 &> /dev/null; then
        echo "✅ $1: 已安装"
    else
        echo "❌ $1: 未安装"
    fi
}

echo ""
echo "1. 基础环境检查:"
check python3.9
check pip
check git

echo ""
echo "2. Python包检查:"
source venv/bin/activate
for pkg in "openai" "flask" "itchat-uos" "selenium" "playwright"; do
    if python -c "import $pkg" 2>/dev/null; then
        echo "✅ $pkg: 已安装"
    else
        echo "❌ $pkg: 未安装"
    fi
done

echo ""
echo "3. 配置文件检查:"
if [ -f ".env" ]; then
    echo "✅ .env: 存在"
    if grep -q "OPENCLAW_API_KEY" .env; then
        echo "✅ API Key: 已配置"
    else
        echo "❌ API Key: 未配置"
    fi
else
    echo "❌ .env: 不存在"
fi

echo ""
echo "4. 目录结构检查:"
for dir in "logs" "screenshots" "plugins" "tasks"; do
    if [ -d "$dir" ]; then
        echo "✅ $dir/: 存在"
    else
        echo "❌ $dir/: 不存在"
    fi
done

echo ""
echo "📌 下一步:"
echo "1. 获取OpenAI API Key: https://platform.openai.com/api-keys"
echo "2. 编辑 .env 文件填入API_KEY"
echo "3. 运行 ./start_all.sh 启动所有服务"

🎯 十、实用命令示例

# 1. 查看所有运行的服务
screen -ls

# 2. 查看特定服务日志
tail -f logs/wechat.log

# 3. 重启服务
screen -X -S wechat_bot quit
screen -dmS wechat_bot python wechat_bot.py

# 4. 测试OpenClaw功能
echo "测试命令:"
echo "1. '截图当前屏幕'"
echo "2. '打开浏览器访问百度'"
echo "3. '搜索人工智能最新进展'"
echo "4. '点击屏幕左上角'"
echo "5. '输入Hello World'"

# 5. 更新所有依赖
pip install --upgrade -r requirements.txt

🚨 注意事项

  1. 安全性

    • 不要将API密钥上传到GitHub
    • 使用防火墙限制访问端口
    • 定期更新依赖包
  2. 稳定性

    • 使用screen/tmux保持服务运行
    • 设置日志轮转防止日志过大
    • 添加异常重试机制
  3. 性能优化

    • 截图压缩减少API调用大小
    • 使用模型缓存减少重复计算
    • 异步处理避免阻塞
  4. 平台限制

    • 微信网页版可能被封号(建议用小号测试)
    • 钉钉/飞书需要企业认证
    • QQ机器人需要服务器部署

这个保姆级教程包含了从零开始部署OpenClaw并集成到各大即时通讯平台的完整流程。按照步骤操作,10分钟内即可搭建起可用的自动化系统!

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐