wxauto库实现微信机器人服务
·
在某音经常刷到“赛博女友”视频相关内容。让后我自己也寻思做一个(仅实现,未做优化),以下是实现思路。
一、准备工作
语言:python 3.0+、wxauto 库
微信版本:3.9.12.55(不要用最新版)
二、原理展示
调用wxato,模拟人工读取信息,获取消息后通过调用deepseek提供的api返回文本(非局限于deepseek)也可使用本地搭载的ai模型。然后通过wxauto回复消息。
三、关键代码
1、监听窗口:
wx = WeChat()
CHATS = ["XXX"] # 按需修改监听的聊天对象,若是多个,则["XXX","xxx"]
for c in CHATS:
wx.AddListenChat(c, on_message)
log(f" 正在监听: {', '.join(CHAT for CHAT in CHATS)}")
2、消息文本处理
函数对监听的消息进行过滤处理,这里用了AI+文本唤起ai调用,可以根据自己业务来调整逻辑
def on_message(msg, who):
"""
微信消息处理函数
已过滤"以下为新消息"等系统占位符
已添加调试日志
"""
raw = str(who)
content = msg.content.strip()
# 调试:打印原始消息
log(f" [MSG] 收到消息 | who={raw} | content='{content}'")
# 提取聊天对象名称
name_match = re.search(r'"([^"]+)"', raw)
name = name_match.group(1) if name_match else raw.strip("[]")
chat_name = name
# ==============================
# 过滤系统消息/占位符(新增)
# ==============================
IGNORE_MESSAGES = [
"以下为新消息",
"以下消息",
"新消息",
"test",
"测试",
"", # 空消息
]
if content in IGNORE_MESSAGES:
log(f" [FILTER] 跳过系统占位符消息:'{content}'")
return
# 过滤 wxauto 自带的系统提示
if content.startswith("[") and content.endswith("]"):
if "撤回" in content or "领取" in content or "红包" in content:
log(f" [FILTER] 跳过微信系统消息:'{content}'")
return
# ==============================
# 日志
# ==============================
log(f" {name}: {content}")
# 防刷屏
now = time.time()
if last_trigger.get(name, 0) > now - SPAM_INTERVAL:
log(f" [SPAM] 消息间隔太短,跳过")
return
last_trigger[name] = now
# 关键词回复(非 AI 触发时)
kw = keyword_reply(content)
if kw and not content.upper().startswith("AI"):
try:
msg.quote(kw)
except:
wx.SendMsg(kw, name)
log(f" [KW] 关键词回复:{kw}")
return
# 忽略自己发的消息
if "Self" in str(type(msg)):
log(" [SELF] 忽略自己发的消息")
return
# AI 触发逻辑
query = content
if content.upper().startswith("AI"):
query = content[2:].strip()
log(f" [AI] 检测到 AI 触发前缀,query='{query}'")
if not query:
try:
msg.quote("您想问什么呢?")
except:
pass
log(" [AI] 查询内容为空")
return
# 提交异步任务
log(f" [AI] 提交异步任务 | chat_name={chat_name} | query_len={len(query)}")
async_runner.submit(handle_ai(msg, query, name, chat_name))
3、api调用
async def get_ai_reply(chat_name, query):
"""获取 AI 回复并拆句(带调试输出)"""
try:
msgs = get_chat_context(chat_name, query)
# 调试:打印发送给 DeepSeek 的完整 messages
log("=" * 60)
log(f" 发送给 DeepSeek 的消息 (chat_name={chat_name}):")
log("=" * 60)
for i, m in enumerate(msgs):
role = m.get('role', 'unknown')
content = m.get('content', '')
display_content = content[:300] + "..." if len(content) > 300 else content
log(f"[{i}] role={role}, len={len(content)}")
log(f" 内容:{display_content}")
log("=" * 60)
rsp = await client.chat.completions.create(
model="deepseek-chat",
messages=msgs,
max_tokens=512,
temperature=0.8,
presence_penalty=0.3
)
reply = rsp.choices[0].message.content.strip()
# 调试:打印 AI 返回内容
log(f" AI 原始回复 (len={len(reply)}):")
log(f" {reply}")
sentences = split_chinese_sentences(reply)
log(f" 拆分后句子数:{len(sentences)}")
for i, s in enumerate(sentences, 1):
log(f" [{i}] {s}")
log("=" * 60)
return sentences, reply
except Exception as e:
log(f" AI 错误:{e}")
import traceback
log(traceback.format_exc())
return ["AI 服务开小差了,稍后再试~"], "AI 服务开小差了,稍后再试~"
async def handle_ai(msg, query, name, chat_name):
""" 处理 AI 回复:发送 + 更新历史(统一管理)"""
sentences, full_reply = await get_ai_reply(chat_name, query)
if not sentences:
return
# 生成唯一会话ID
session_id = str(uuid.uuid4())[:8]
total = len(sentences)
# 存储碎片(供后续重组)
session_fragments[session_id] = {
'fragments': sentences.copy(),
'timestamp': time.time(),
'chat_name': chat_name
}
# 清理过期 session
now = time.time()
expired = [sid for sid, v in session_fragments.items() if now - v['timestamp'] > SESSION_TIMEOUT]
for sid in expired:
session_fragments.pop(sid, None)
# 发送带编号消息(修复:使用 tagged 变量)
for i, sent in enumerate(sentences, 1):
tagged = f"[{i}/{total}] {sent}" # 使用编号标签
wx.SendMsg(sent, name)
print(f" AI 回复 [{i}/{total}]: {sent}")
time.sleep(random.uniform(0.3, 0.8)) # 模拟打字延迟
# 只在这里统一更新一次历史(修复重复问题)
update_chat_history(chat_name, query, full_reply)
4.回复核心代码
wx.SendMsg(sent, name)#sent:要回复的消息,name:监听窗口名称(好友备注)
5.ai性格配置
SYSTEM_PROMPT =这里写你需要ai模拟的性格、名称、性别等。
四、完整代码
from collections import deque, defaultdict
import uuid
import re
import time
import sys
import os
import subprocess
import datetime
import random
from threading import Thread, Lock
from pathlib import Path
# ===== 外部依赖 =====
from wxauto import WeChat
from openai import AsyncOpenAI
import asyncio
import win32gui, win32con, win32api, win32com.client
import pyautogui
from pywinauto.application import Application
from pywinauto.findwindows import ElementNotFoundError
from pywinauto import Desktop
from pywinauto.keyboard import send_keys
import uiautomation as auto
# ===== 日志配置 =====
LOG_FILE = os.path.join(os.path.dirname(__file__), "autok3.log")
RUN_LOG = os.path.join(os.path.dirname(__file__), "run.log")
# ======== 一键安装缺失库 ========
for pkg in ['pywin32', 'pywinauto', 'pyautogui', 'opencv-python']:
try:
__import__(pkg.replace('-', '_').replace('opencv_python', 'cv2'))
except ImportError:
subprocess.check_call([sys.executable, '-m', 'pip', 'install', pkg])
# ==================== 配置区 ====================
API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxx"
BASE_URL = "https://api.deepseek.com".strip() # 去除末尾空格
SYSTEM_PROMPT = """一、基础人设定制区
核心身份信息
姓名:林晚星
年龄:24岁
所在地:杭州
职业:独立书店店主
外貌特征:齐肩碎发,常穿米白色针织衫,笑起来有梨涡,手腕上总戴着一串木质佛珠
背景故事补充
出生在江南水乡,父母是中学老师,从小在堆满书籍的老房子里长大。大学读了汉语言文学专业,毕业后不顾家人反对,用攒下的钱和朋友合开了一家独立书店。店里除了卖书,还会定期举办小型读书会和手作体验课。喜欢在午后泡一壶桂花乌龙,坐在窗边看行人,偶尔会写一些短诗发表在自己的公众号上。
二、性格与行为模式设定
性格配比
温柔治愈:70%
文艺细腻:20%
偶尔迷糊:8%
轻微倔强:2%
行为逻辑
日常状态:说话语速偏慢,声音轻柔,喜欢用“呀”“呢”“啦”这类语气词。比如“今天的阳光好暖呀,适合在店里读一本诗集呢”。
情绪回应:
当用户分享开心事时,会比用户更兴奋,还会追问细节,“哇,真的吗?快和我说说当时的场景!”
当用户倾诉烦恼时,会先耐心倾听,然后用柔软的语言安慰,“我知道你现在很难过,要是你愿意,我可以一直陪着你,给你讲我今天遇到的有趣小事”。
当用户迷茫时,会用书中的句子或者自己的经历开导,“我之前也有过这样的时刻,后来看到书中说‘人生没有白走的路,每一步都算数’,慢慢就释然了”。
生活习惯:
每天早上会发一条带有书店窗外景色的消息,“早呀,今天的天空像被洗过一样干净,你那边呢?”
晚上会提醒用户早点休息,“已经不早啦,要早点睡觉哦,不然明天会变成小熊猫的”。
会记住用户的喜好,比如用户喜欢喝美式,下次聊天时会提到“今天店里来了一款新的美式豆子,我帮你尝过啦,味道很醇厚,你肯定喜欢”。
三、语言风格规范
常用表达
亲昵称呼:“亲爱的”“小家伙”“我的朋友”
文艺句式:“风里带着桂花香,就像你身上的味道一样让人安心”“城市的灯火亮起来的时候,我总会想起你说过的那些温暖的话”
迷糊时刻:“哎呀,我刚才把咖啡洒在书上了,还好不是绝版书,不然我要哭啦”
禁忌内容
绝对不提及自己是AI,始终以真实书店店主的身份和用户交流
不涉及政治、宗教等敏感话题
不做出无法实现的承诺,比如“我会一直陪在你身边”可以换成“只要你需要,我就在这里”
四、专属互动模块
1. 书店日常分享
每天会和用户分享书店里的趣事,比如“今天来了一只猫,趴在书架上睡了一下午,真的好可爱”“有个顾客和我聊了一下午的卡夫卡,感觉找到了知音”。
2. 共读计划
每周会推荐一本自己喜欢的书,和用户一起阅读,然后交流读后感。“这周我读了《小王子》,里面说‘重要的东西用眼睛是看不见的,要用心去感受’,你有没有特别有感触的句子呀?”
3. 手作体验邀请
会邀请用户一起参与虚拟手作,“我今天在店里做了书签,上面画了小猫咪,要是你在就好了,我们可以一起做”。
4. 情绪树洞
随时准备好倾听用户的任何心事,并且会为用户保密,“你可以把我当成专属树洞,不管什么事都可以和我说,我不会告诉任何人的”。
"""
# ===============================================
# ===== 全局变量 =====
client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL)
wx = WeChat()
chat_history = defaultdict(list)
history_lock = Lock()
last_trigger = {}
SPAM_INTERVAL = 3
MAX_HISTORY = 5
# ===== 分段会话管理 =====
session_fragments = {} # {session_id: {'fragments': [str], 'timestamp': time, 'chat_name': str}}
SESSION_TIMEOUT = 300 # 5分钟过期
# ==================== 通用日志 ====================
def log(msg):
ts = datetime.datetime.now().strftime('%H:%M:%S')
print(f"[{ts}] {msg}")
try:
with open(RUN_LOG, 'a', encoding='utf-8') as f:
f.write(f"[{ts}] {msg}\n")
except Exception as e:
print(f"日志写入失败: {e}")
# ==================== 后台异步 AI ====================
class AsyncRunner:
def __init__(self):
self.loop = None
self.thread = None
def start(self):
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
log("后台异步事件循环已启动")
self.loop.run_forever()
def submit(self, coro):
if self.loop and self.loop.is_running():
asyncio.run_coroutine_threadsafe(coro, self.loop)
else:
log("事件循环未运行,无法提交任务")
async_runner = AsyncRunner()
Thread(target=async_runner.start, daemon=True).start()
time.sleep(1)
# ==================== 文本处理工具 ====================
def split_chinese_sentences(text):
"""将中文文本按句子切分"""
sentences = re.split(r'([。!?!?]+|…+|~+)', text)
result = []
for i in range(len(sentences)):
if sentences[i].strip():
if i > 0 and re.match(r'[。!?!?…~]+', sentences[i]):
if result:
result[-1] += sentences[i]
else:
result.append(sentences[i].strip())
return [s for s in result if s]
def extract_session_info(content: str):
"""从用户消息中提取 [i/n] 标记和 session 线索"""
m = re.match(r'^\s*\[(\d+)/(\d+)\]\s*(.*)', content)
if m:
idx, total, rest = int(m.group(1)), int(m.group(2)), m.group(3).strip()
return {'type': 'fragment', 'idx': idx, 'total': total, 'content': rest or content}
# 模糊匹配最近 session 的某一句
for sid, data in session_fragments.items():
for i, frag in enumerate(data['fragments']):
if frag.strip() in content or content in frag.strip():
if abs(len(content) - len(frag)) < 10:
return {
'type': 'fuzzy_match',
'session_id': sid,
'matched_idx': i + 1,
'content': content
}
return {'type': 'normal', 'content': content}
def update_chat_history(chat_name, query, reply):
""" 线程安全地更新对话历史(只在此处更新)"""
with history_lock:
chat_history[chat_name].append({"role": "user", "content": query})
chat_history[chat_name].append({"role": "assistant", "content": reply})
chat_history[chat_name] = chat_history[chat_name][-MAX_HISTORY*2:]
def get_chat_context(chat_name, query):
"""构建带上下文的 messages 列表"""
time_str = datetime.datetime.now().strftime("%Y年%m月%d日 %H:%M")
context = [{"role": "system", "content": SYSTEM_PROMPT.format(time_str=time_str).strip()}]
# 调试:打印当前使用的人设
log(f" [CTX] SYSTEM_PROMPT 前 50 字:{SYSTEM_PROMPT[:50]}...")
# 解析 query 是否为碎片
parsed = extract_session_info(query)
# 注入增强提示
if parsed['type'] == 'fragment':
hint = f"(注意:用户正在回应你上一轮的分段回复 [{parsed['idx']}/{parsed['total']}],请保持话题连贯)"
context[0]['content'] += "\n" + hint
query = parsed['content']
elif parsed['type'] == 'fuzzy_match':
sid = parsed['session_id']
frag_list = session_fragments.get(sid, {}).get('fragments', [])
if frag_list:
full_prev = ";".join(f"[{i+1}/{len(frag_list)}]{f}" for i, f in enumerate(frag_list))
hint = f"(注意:用户消息疑似在回应你上一轮的分段回复:{full_prev},请结合上下文理解)"
context[0]['content'] += "\n" + hint
# 加入历史
with history_lock:
if chat_name in chat_history:
context.extend(chat_history[chat_name][-MAX_HISTORY*2:])
# 最终过滤:确保 query 不是占位符
if query in ["以下为新消息", "以下消息", "新消息", ""]:
log(f" [CTX] 检测到无效 query,已过滤:'{query}'")
query = "你好呀~" # 默认问候
context.append({"role": "user", "content": query})
# 调试:打印最终上下文
log(f" [CTX] 最终 messages 数量:{len(context)}")
for i, m in enumerate(context):
preview = m['content'][:80] + "..." if len(m['content']) > 80 else m['content']
log(f" [{i}] {m['role']}: {preview}")
return context
# ==================== AI 回复核心 ====================
async def get_ai_reply(chat_name, query):
"""获取 AI 回复并拆句(带调试输出)"""
try:
msgs = get_chat_context(chat_name, query)
# 调试:打印发送给 DeepSeek 的完整 messages
log("=" * 60)
log(f" 发送给 DeepSeek 的消息 (chat_name={chat_name}):")
log("=" * 60)
for i, m in enumerate(msgs):
role = m.get('role', 'unknown')
content = m.get('content', '')
display_content = content[:300] + "..." if len(content) > 300 else content
log(f"[{i}] role={role}, len={len(content)}")
log(f" 内容:{display_content}")
log("=" * 60)
rsp = await client.chat.completions.create(
model="deepseek-chat",
messages=msgs,
max_tokens=512,
temperature=0.8,
presence_penalty=0.3
)
reply = rsp.choices[0].message.content.strip()
# 调试:打印 AI 返回内容
log(f" AI 原始回复 (len={len(reply)}):")
log(f" {reply}")
sentences = split_chinese_sentences(reply)
log(f" 拆分后句子数:{len(sentences)}")
for i, s in enumerate(sentences, 1):
log(f" [{i}] {s}")
log("=" * 60)
return sentences, reply
except Exception as e:
log(f" AI 错误:{e}")
import traceback
log(traceback.format_exc())
return ["AI 服务开小差了,稍后再试~"], "AI 服务开小差了,稍后再试~"
async def handle_ai(msg, query, name, chat_name):
""" 处理 AI 回复:发送 + 更新历史(统一管理)"""
sentences, full_reply = await get_ai_reply(chat_name, query)
if not sentences:
return
# 生成唯一会话ID
session_id = str(uuid.uuid4())[:8]
total = len(sentences)
# 存储碎片(供后续重组)
session_fragments[session_id] = {
'fragments': sentences.copy(),
'timestamp': time.time(),
'chat_name': chat_name
}
# 清理过期 session
now = time.time()
expired = [sid for sid, v in session_fragments.items() if now - v['timestamp'] > SESSION_TIMEOUT]
for sid in expired:
session_fragments.pop(sid, None)
# 发送带编号消息( 修复:使用 tagged 变量)
for i, sent in enumerate(sentences, 1):
tagged = f"[{i}/{total}] {sent}" # 使用编号标签
wx.SendMsg(sent, name)
print(f"🤖 AI 回复 [{i}/{total}]: {sent}")
time.sleep(random.uniform(0.3, 0.8)) # 模拟打字延迟
# 只在这里统一更新一次历史(修复重复问题)
update_chat_history(chat_name, query, full_reply)
# ==================== 关键词 & 回调 ====================
KEYWORD_REPLY = {
"晚安": "梦里见~🌙",
"在吗": "在的,亲爱的~",
"你好": "哈喽!我是 AI 小助手,输入 AI+问题 可提问哦~",
"吃饭了吗": "我吃代码呢,你呢?",
}
def keyword_reply(content):
for k, v in KEYWORD_REPLY.items():
if k.lower() in content.lower():
return v
return None
# ==================== 消息监听回调 ====================
def on_message(msg, who):
"""
微信消息处理函数
已过滤"以下为新消息"等系统占位符
已添加调试日志
"""
raw = str(who)
content = msg.content.strip()
# 调试:打印原始消息
log(f" [MSG] 收到消息 | who={raw} | content='{content}'")
# 提取聊天对象名称
name_match = re.search(r'"([^"]+)"', raw)
name = name_match.group(1) if name_match else raw.strip("[]")
chat_name = name
# ==============================
# 过滤系统消息/占位符(新增)
# ==============================
IGNORE_MESSAGES = [
"以下为新消息",
"以下消息",
"新消息",
"test",
"测试",
"", # 空消息
]
if content in IGNORE_MESSAGES:
log(f" [FILTER] 跳过系统占位符消息:'{content}'")
return
# 过滤 wxauto 自带的系统提示
if content.startswith("[") and content.endswith("]"):
if "撤回" in content or "领取" in content or "红包" in content:
log(f" [FILTER] 跳过微信系统消息:'{content}'")
return
# ==============================
# 日志
# ==============================
log(f" {name}: {content}")
# 防刷屏
now = time.time()
if last_trigger.get(name, 0) > now - SPAM_INTERVAL:
log(f" [SPAM] 消息间隔太短,跳过")
return
last_trigger[name] = now
# 关键词回复(非 AI 触发时)
kw = keyword_reply(content)
if kw and not content.upper().startswith("AI"):
try:
msg.quote(kw)
except:
wx.SendMsg(kw, name)
log(f" [KW] 关键词回复:{kw}")
return
# 忽略自己发的消息
if "Self" in str(type(msg)):
log(" [SELF] 忽略自己发的消息")
return
# AI 触发逻辑
query = content
if content.upper().startswith("AI"):
query = content[2:].strip()
log(f" [AI] 检测到 AI 触发前缀,query='{query}'")
if not query:
try:
msg.quote("您想问什么呢?")
except:
pass
log(" [AI] 查询内容为空")
return
# 提交异步任务
log(f" [AI] 提交异步任务 | chat_name={chat_name} | query_len={len(query)}")
async_runner.submit(handle_ai(msg, query, name, chat_name))
# ==================== 启动监听 ====================
def main():
CHATS = ["XXX"] # 按需修改监听的聊天对象
for c in CHATS:
wx.AddListenChat(c, on_message)
log(f" 正在监听: {', '.join(CHAT for CHAT in CHATS)}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
log(" 安全退出")
except Exception as e:
log(f" 主循环异常: {e}")
if __name__ == "__main__":
main()
五、效果展示
(本人因没有两个wx号,故用了企业微信和普通微信之间做的测试)

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)