影刀RPA实战:企业微信自动化消息推送与群机器人开发指南
·
影刀RPA实战:企业微信自动化消息推送与群机器人开发指南
作者:林焱 | 影刀RPA技术专栏
标签: 影刀RPA / 企业微信 / 消息推送 / 群机器人 / 自动化通知 / Webhook
适合读者: 需要定时发送报告、异常告警、业务通知的RPA开发者
前言:为什么企业微信是RPA最佳通知渠道?
RPA流程跑完了,结果怎么通知人?
- 发邮件:不及时,容易被忽略
- 钉钉:需要单独集成
- 短信:成本高,不适合高频

企业微信群机器人 是最佳选择:
- ✅ 免费,无限发送次数(合理使用)
- ✅ 支持Markdown格式,可发富文本消息
- ✅ 支持@指定成员,保证重要消息被看到
- ✅ 支持图片、文件、图文卡片等多种消息类型
- ✅ 手机端/PC端实时推送
本文将带你从零开始,用影刀RPA搭建一套完整的企业微信自动化通知体系,覆盖:日报推送、异常告警、数据汇报、审批提醒等常用场景。
第一章:企业微信群机器人基础
1.1 创建群机器人
步骤:
-
打开企业微信,进入目标群聊
-
点击右上角「…」→「添加群机器人」
-
点击「新创建一个机器人」
-
输入机器人名称(如:RPA自动通知机器人)
-

-
复制生成的 Webhook URL(非常重要,保存好)
Webhook URL 格式:
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
1.2 消息类型总览
店群矩阵自动化突破运营极限!
| 消息类型 | 适用场景 | 支持格式 |
|---|---|---|
| text | 纯文字通知,可@成员 | 普通文本 |
| markdown | 富文本报告,支持标题/表格/加粗 | Markdown |
| image | 发送图片(Base64或URL) | JPG/PNG |
| news | 图文卡片,适合链接分享 | 带链接的卡片 |
| file | 发送文件(需先上传获取media_id) | 各种文件 |
第二章:影刀RPA发送企业微信消息
2.1 基础文本消息

import urllib.request
import json
def send_wecom_text(webhook_url: str, content: str,
mentioned_list: list = None,
mentioned_mobile_list: list = None) -> dict:
"""
发送文本消息到企业微信群
参数:
- webhook_url: 群机器人的Webhook地址
- content: 消息内容(最大2048字节)
- mentioned_list: @的成员列表(userid或@all)
- mentioned_mobile_list: @的手机号列表
返回:API响应字典
"""
data = {
"msgtype": "text",
"text": {
"content": content,
"mentioned_list": mentioned_list or [],
"mentioned_mobile_list": mentioned_mobile_list or []
}
}
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
webhook_url,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode("utf-8"))
if result.get("errcode") == 0:
print("✅ 消息发送成功")
else:
print(f"❌ 发送失败:{result.get('errmsg')}")
return result
# 使用示例
WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的key"
# 普通文本消息
send_wecom_text(WEBHOOK, "RPA任务执行完成,共处理200条订单数据。")
# @所有人
send_wecom_text(WEBHOOK, "⚠️ 系统异常!请立即处理!", mentioned_list=["@all"])
# @指定人(用userid)
send_wecom_text(WEBHOOK, "你好,请查看今日报表", mentioned_list=["zhangsan", "lisi"])
2.2 Markdown格式消息(推荐)
Markdown消息是企业微信群机器人中最强大的消息类型,支持:
- 标题(最多3级)
- 加粗、
删除线 代码块-
引用
- 有序/无序列表
- 超链接
- 颜色标签(标签)
def send_wecom_markdown(webhook_url: str, content: str) -> dict:
"""
发送Markdown格式消息
注意:企业微信Markdown是其特有子集,
不支持所有标准Markdown语法
"""
data = {
"msgtype": "markdown",
"markdown": {
"content": content
}
}
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
webhook_url,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
# Markdown消息模板示例
def build_daily_report_markdown(date: str, stats: dict) -> str:
"""构建每日数据报告Markdown消息"""
success_rate = (stats["success"] / stats["total"] * 100) if stats["total"] > 0 else 0
status_icon = "✅" if success_rate >= 95 else "⚠️" if success_rate >= 80 else "❌"
return f"""## {status_icon} RPA日报 - {date}
**任务执行统计**
> 执行时间:{stats.get("start_time", "N/A")} ~ {stats.get("end_time", "N/A")}
| 指标 | 数值 |
|------|------|
| 总任务数 | **{stats["total"]}** |
| 成功 | <font color="info">{stats["success"]}</font> |
| 失败 | <font color="warning">{stats.get("failed", 0)}</font> |
| 跳过 | {stats.get("skipped", 0)} |
| 成功率 | **{success_rate:.1f}%** |
**处理详情**
- 采集商品数据:{stats.get("scraped_items", 0)} 条
- 导出Excel报表:{stats.get("excel_files", 0)} 个
- 发送邮件:{stats.get("sent_emails", 0)} 封
{f'**⚠️ 异常信息**{chr(10)}> {stats["error_summary"]}' if stats.get("error_summary") else ''}
[查看详细日志](http://rpa-monitor.company.com/logs/{date})
"""
# 发送日报示例
stats_data = {
"total": 200,
"success": 195,
"failed": 3,
"skipped": 2,
"scraped_items": 1580,
"excel_files": 5,
"sent_emails": 12,
"start_time": "08:30:00",
"end_time": "09:15:23"
}
markdown_content = build_daily_report_markdown("2024-01-15", stats_data)
send_wecom_markdown(WEBHOOK, markdown_content)
2.3 图片消息
import base64
import hashlib
def send_wecom_image(webhook_url: str, image_path: str) -> dict:
"""
发送图片消息(图片转Base64)
限制:图片大小不超过2MB,仅支持JPG/PNG
"""
with open(image_path, "rb") as f:
image_data = f.read()
image_base64 = base64.b64encode(image_data).decode("utf-8")
image_md5 = hashlib.md5(image_data).hexdigest()
data = {
"msgtype": "image",
"image": {
"base64": image_base64,

"md5": image_md5
}
}
req_data = json.dumps(data).encode("utf-8")
req = urllib.request.Request(
webhook_url,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
# 发送图表截图
send_wecom_image(WEBHOOK, "C:/RPA/output/charts/daily_trend.png")
2.4 图文卡片消息
def send_wecom_news(webhook_url: str, articles: list) -> dict:
"""
发送图文卡片消息(最多8篇文章)
articles格式:[{
"title": "标题",
"description": "描述", # 可选
"url": "点击跳转URL",
"picurl": "图片URL" # 可选
}]
"""
data = {
"msgtype": "news",
"news": {
"articles": articles[:8] # 最多8篇
}
}
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
webhook_url,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
# 发送每周工作报告卡片
send_wecom_news(WEBHOOK, [
{
"title": "📊 本周RPA运行报告(2024-W03)",
"description": "本周共处理8500条数据,节省工时28小时",
"url": "http://intranet.company.com/rpa/weekly/2024W03",
"picurl": "https://example.com/images/report_cover.png"
},
{
"title": "🚀 新上线:财务报表自动化流程",
"description": "月末对账时间从4小时缩短至15分钟",
"url": "http://intranet.company.com/rpa/flows/finance",
}
])
第三章:实战场景一——RPA异常告警系统
3.1 设计思路
异常告警的核心要求:
- 及时性:异常发生后30秒内通知到人
- 可操作性:消息要包含足够的上下文,让值班人员知道该怎么处理
- 不打扰:同类异常不要重复发送(去重)
- 分级:区分紧急/一般/可忽略三个等级
3.2 告警系统完整实现


import json
import datetime
import hashlib
import urllib.request
from collections import defaultdict
class WeComAlertSystem:
"""企业微信RPA告警系统"""
ALERT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的key"
# 告警等级配置
LEVELS = {
"critical": {
"icon": "🔴",
"prefix": "【紧急告警】",
"mention": ["@all"],
"dedupe_seconds": 300 # 5分钟内不重复
},
"warning": {
"icon": "🟡",
"prefix": "【警告】",
"mention": ["zhangsan"], # 只通知负责人
"dedupe_seconds": 1800 # 30分钟内不重复
},
"info": {
"icon": "🔵",
"prefix": "【提示】",
"mention": [],
"dedupe_seconds": 3600 # 1小时内不重复
}
}
def __init__(self):
self._sent_alerts = {} # {alert_hash: last_sent_time}
def _get_alert_hash(self, level: str, task_name: str, error_type: str) -> str:
"""计算告警去重哈希"""
key = f"{level}:{task_name}:{error_type}"
return hashlib.md5(key.encode()).hexdigest()[:8]
def _is_duplicate(self, alert_hash: str, dedupe_seconds: int) -> bool:
"""检查是否是重复告警"""
if alert_hash not in self._sent_alerts:
return False
last_sent = self._sent_alerts[alert_hash]
elapsed = (datetime.datetime.now() - last_sent).seconds
return elapsed < dedupe_seconds
def alert(self, level: str, task_name: str, error_type: str,
error_detail: str, suggestion: str = "", context: dict = None) -> bool:
"""
发送告警消息
参数:
- level: 告警级别 (critical/warning/info)
- task_name: 发生问题的任务名
- error_type: 错误类型简短描述
- error_detail: 详细错误信息
- suggestion: 建议处理方式
- context: 额外上下文信息(字典)
返回:是否成功发送(重复告警返回False)
"""
level_config = self.LEVELS.get(level, self.LEVELS["info"])
dedupe_seconds = level_config["dedupe_seconds"]
alert_hash = self._get_alert_hash(level, task_name, error_type)
# 去重检查
if self._is_duplicate(alert_hash, dedupe_seconds):
print(f"ℹ️ 告警已去重({dedupe_seconds}s内不重复发送):{error_type}")
return False
# 构建Markdown消息
icon = level_config["icon"]
prefix = level_config["prefix"]
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"""{icon} {prefix} {task_name}
**错误类型:** {error_type}
**发生时间:** {now}
**错误详情:**
> {error_detail[:500]}
"""
if context:
content += "**上下文信息:**\n"
for k, v in context.items():
content += f"- {k}:`{v}`\n"
content += "\n"
if suggestion:
content += f"**处理建议:**\n> {suggestion}\n\n"
content += f"*告警ID:{alert_hash}*"
# 发送消息
data = {
"msgtype": "markdown",
"markdown": {"content": content}
}
mention_list = level_config["mention"]
if mention_list:
# 如果有@信息,先发text消息@人,再发markdown内容
self._send_text_mention(mention_list, task_name, prefix)
result = self._send_request(data)
if result.get("errcode") == 0:
self._sent_alerts[alert_hash] = datetime.datetime.now()
return True
return False
def _send_text_mention(self, mention_list: list, task_name: str, prefix: str):
"""发送@消息"""
data = {
"msgtype": "text",
"text": {
"content": f"{prefix} {task_name} 需要处理",
"mentioned_list": mention_list
}
}
self._send_request(data)
def _send_request(self, data: dict) -> dict:
"""发送HTTP请求到企业微信"""
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
self.ALERT_WEBHOOK,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(f"发送消息失败:{e}")
return {"errcode": -1, "errmsg": str(e)}
# ===== 在RPA流程中使用 =====
alert_system = WeComAlertSystem()
def run_order_scrape():
"""订单采集主流程(带完整告警)"""
task_name = "京东店铺订单采集"
try:
# 检查环境
if not check_vpn_connected():
alert_system.alert(
level="critical",
task_name=task_name,
error_type="网络环境异常",
error_detail="VPN连接断开,无法访问内网系统",
suggestion="请检查VPN连接状态,重新连接后手动重启任务",
context={"服务器地址": "10.0.1.100", "端口": "443"}
)
return
# 登录
try:
login_jd_shop()
except LoginFailedException as e:
alert_system.alert(
level="critical",
task_name=task_name,
error_type="登录失败",
error_detail=str(e),
suggestion="可能密码已更改或账号被锁定,请手动登录检查",
context={"账号": "shop_admin", "错误码": getattr(e, "code", "N/A")}
)
return
# 采集过程
total = 0
errors = 0
for page_num in range(1, 100):
try:
items = scrape_order_page(page_num)
if not items:
break
save_orders_to_excel(items)
total += len(items)
except PageLoadTimeoutError:
errors += 1
alert_system.alert(
level="warning",
task_name=task_name,
error_type="页面加载超时",
error_detail=f"第{page_num}页超时,已跳过",
suggestion="网络较慢,已自动跳过该页,数据可能不完整",
context={"页码": page_num, "当前采集量": total}
)
continue
# 完成通知
send_wecom_markdown(
WeComAlertSystem.ALERT_WEBHOOK,
f"""✅ **{task_name}完成**
- 采集订单:{total} 条
- 跳过页面:{errors} 页
- 完成时间:{datetime.datetime.now().strftime("%H:%M:%S")}
"""
)
except Exception as e:
alert_system.alert(
level="critical",
task_name=task_name,
error_type="未知异常导致任务中断",
error_detail=str(e)[:300],
suggestion="请查看详细日志,必要时手动执行任务"
)
第四章:实战场景二——定时数据播报
4.1 早报系统(每日8:30)
import datetime
import json
class DailyBriefingBot:
"""每日数据早报机器人"""
def __init__(self, webhook_url: str):
self.webhook = webhook_url
def generate_morning_report(self) -> str:
"""生成早报内容"""
today = datetime.date.today()
weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
weekday = weekday_names[today.weekday()]
# 获取各项数据(实际从数据库/Excel读取)
data = self._collect_daily_data()
# 今日任务待办
tasks = self._get_pending_tasks()
task_list = "\n".join([f"- [ ] {t}" for t in tasks[:5]])
return f"""# 📅 早报 {today.strftime("%Y年%m月%d日")} {weekday}
## 📊 昨日数据汇总
| 业务线 | 订单量 | 销售额 | 环比 |
|--------|--------|--------|------|
| 电商主站 | {data["ec_orders"]} | ¥{data["ec_revenue"]:,.0f} | {data["ec_growth"]:+.1f}% |
| 批发渠道 | {data["wholesale_orders"]} | ¥{data["wholesale_revenue"]:,.0f} | {data["wholesale_growth"]:+.1f}% |
| 海外仓 | {data["oversea_orders"]} | ¥{data["oversea_revenue"]:,.0f} | {data["oversea_growth"]:+.1f}% |
## 🤖 RPA今日任务
{task_list}
## ⚠️ 待处理事项
{self._get_pending_issues()}
---
*由RPA自动播报 · {datetime.datetime.now().strftime("%H:%M")}*
"""
def _collect_daily_data(self) -> dict:
"""从各数据源收集昨日数据"""
# 实际实现:读取Excel/数据库/API
return {
"ec_orders": 1258, "ec_revenue": 158000, "ec_growth": 12.5,
"wholesale_orders": 86, "wholesale_revenue": 285000, "wholesale_growth": -3.2,
"oversea_orders": 234, "oversea_revenue": 96000, "oversea_growth": 28.1
}
def _get_pending_tasks(self) -> list:
"""获取今日待执行的RPA任务"""
return [
"09:00 - 电商平台订单同步",
"10:00 - 发货单打印(约320单)",
"14:00 - 库存盘点数据采集",
"16:00 - 日报Excel生成",
"17:30 - 日报邮件发送"
]
def _get_pending_issues(self) -> str:
"""获取待处理异常"""
return """> ⚠️ 京东店铺3个订单待人工审核(金额超过5000元)
> ⚠️ 仓库系统API昨日18:00-19:30响应缓慢,相关数据已标记待核实"""
def send_morning_report(self):
"""发送早报"""
content = self.generate_morning_report()
data = {
"msgtype": "markdown",
"markdown": {"content": content}
}
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
self.webhook,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
# 在影刀定时任务中调用
bot = DailyBriefingBot(WEBHOOK)
bot.send_morning_report()
4.2 实时进度播报(大批量任务)
class ProgressReporter:
"""大批量任务进度实时播报"""
def __init__(self, webhook_url: str, task_name: str, total_items: int,
report_interval: int = 100):
"""
参数:
- report_interval: 每处理多少条汇报一次
"""
self.webhook = webhook_url
self.task_name = task_name
self.total = total_items
self.report_interval = report_interval
self.current = 0
self.errors = 0
self.start_time = datetime.datetime.now()
def update(self, success: bool = True):
"""更新进度(每处理一条调用一次)"""
self.current += 1
if not success:
self.errors += 1
# 每隔N条或完成时汇报
if self.current % self.report_interval == 0 or self.current == self.total:
self._send_progress()
def _send_progress(self):
"""发送进度消息"""
elapsed = (datetime.datetime.now() - self.start_time).seconds
progress_pct = self.current / self.total * 100
# 估算剩余时间
if self.current > 0:
speed = self.current / max(elapsed, 1) # 条/秒
remaining_items = self.total - self.current
eta_seconds = int(remaining_items / speed)
eta_str = f"{eta_seconds // 60}分{eta_seconds % 60}秒"
else:
eta_str = "计算中..."
# 进度条(20个字符)
filled = int(progress_pct / 5)
bar = "▓" * filled + "░" * (20 - filled)
is_done = self.current >= self.total
status = "✅ **已完成**" if is_done else "⏳ **进行中**"
content = f"""{status} {self.task_name}
`{bar}` {progress_pct:.0f}%
- 已处理:**{self.current}** / {self.total}
- 成功:{self.current - self.errors} | 失败:{self.errors}
- 耗时:{elapsed // 60}分{elapsed % 60}秒
{f'- 预计剩余:{eta_str}' if not is_done else f'- 总耗时:{elapsed // 60}分{elapsed % 60}秒'}
"""
data = {
"msgtype": "markdown",
"markdown": {"content": content}
}
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
self.webhook,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
pass
except Exception:
pass # 进度播报失败不影响主任务
# 使用示例
reporter = ProgressReporter(WEBHOOK, "全量商品数据同步", total_items=5000,
report_interval=500)
for item in all_products:
try:
process_product(item)
reporter.update(success=True)
except Exception:
reporter.update(success=False)
第五章:实战场景三——审批提醒与交互
5.1 审批待处理提醒
def send_approval_reminder(webhook_url: str, approvals: list):
"""
发送审批待处理提醒
approvals: [{
"id": "AP-2024-001",
"type": "采购申请",
"applicant": "张三",
"amount": 15000,
"submitted_at": "2024-01-15 14:30",
"approver_userid": "lisi"
}]
"""
if not approvals:
return
approval_list = ""
for ap in approvals[:10]: # 最多显示10条
amount_str = f"¥{ap['amount']:,.0f}" if ap.get("amount") else ""
approval_list += f"- **{ap['id']}** | {ap['type']} | {ap['applicant']} | {amount_str}\n"
approver_mentions = list(set([f"<@{ap['approver_userid']}>" for ap in approvals]))
mentions_str = " ".join(approver_mentions[:3]) # 最多@3人
content = f"""⏰ **审批提醒** {mentions_str}
您有 **{len(approvals)}** 个审批待处理:
{approval_list}
{'> ...' if len(approvals) > 10 else ''}
[👉 前往审批系统处理](http://oa.company.com/approval)
*最后更新:{datetime.datetime.now().strftime("%H:%M")}*
"""
data = {
"msgtype": "markdown",
"markdown": {"content": content}
}
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
webhook_url,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
第六章:多Webhook管理与路由
6.1 消息路由器
temu店群自动化报活动案例

实际企业中,不同类型的消息需要发送到不同的群组:
class WeComMessageRouter:
"""企业微信消息路由器(多群管理)"""
WEBHOOKS = {
"ops_team": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=运维群key",
"sales_team": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=销售群key",
"finance_team": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=财务群key",
"all_staff": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=全员群key",
"rpa_monitor": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=RPA监控群key",
}
# 消息类型到群组的映射规则
ROUTING_RULES = {
"system_alert": ["ops_team", "rpa_monitor"], # 系统告警→运维+RPA监控
"daily_sales": ["sales_team", "finance_team"], # 日销数据→销售+财务
"monthly_report": ["all_staff"], # 月报→全员
"rpa_status": ["rpa_monitor"], # RPA状态→监控群
"approval_reminder": ["ops_team"], # 审批提醒→运维(看实际配置)
}
def route_message(self, message_type: str, content: str,
msg_format: str = "markdown") -> list:
"""
根据消息类型路由到对应群组
返回:各群发送结果列表
"""
target_groups = self.ROUTING_RULES.get(message_type, ["rpa_monitor"])
results = []
for group_name in target_groups:
webhook = self.WEBHOOKS.get(group_name)
if not webhook:
print(f"⚠️ 未找到群组配置:{group_name}")
continue
data = {
"msgtype": msg_format,
msg_format: {"content": content}
}
try:
req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
webhook,
data=req_data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode("utf-8"))
results.append({"group": group_name, "success": result["errcode"] == 0})
print(f"{'✅' if result['errcode'] == 0 else '❌'} 发送到 {group_name}:{result.get('errmsg')}")
except Exception as e:
results.append({"group": group_name, "success": False, "error": str(e)})
return results
# 使用示例
router = WeComMessageRouter()
# 系统异常同时发到运维群和监控群
router.route_message(
"system_alert",
"🔴 **紧急:数据库连接超时!** 请立即检查DB服务器状态"
)
# 日销数据发到销售+财务群
router.route_message(
"daily_sales",
"📊 **昨日销售数据** 总额:¥1,258,000,同比增长8.5%"
)
第七章:最佳实践与注意事项
7.1 频率限制
企业微信群机器人有频率限制:
- 同一群机器人:每分钟最多发送20条
- 超出限制会返回
errcode: 45009 
import time
from collections import deque
class RateLimiter:
"""发送频率限制器"""
def __init__(self, max_per_minute: int = 18): # 留2条余量
self.max_per_minute = max_per_minute
self.sent_times = deque()
def wait_if_needed(self):
"""在发送前调用,必要时等待"""
now = time.time()
# 清除1分钟前的记录
while self.sent_times and now - self.sent_times[0] > 60:
self.sent_times.popleft()
# 如果达到限制,等待
if len(self.sent_times) >= self.max_per_minute:
wait_until = self.sent_times[0] + 61 # 等到最早记录过期
wait_seconds = max(0, wait_until - now)
if wait_seconds > 0:
print(f"⏳ 发送频率限制,等待 {wait_seconds:.1f}s")
time.sleep(wait_seconds)
self.sent_times.append(time.time())
rate_limiter = RateLimiter()
def send_message_safely(webhook: str, data: dict):
rate_limiter.wait_if_needed()
# ... 发送逻辑
7.2 消息长度限制
| 消息类型 | 内容长度限制 |
|---|---|
| text | 2048字节 |
| markdown | 4096字节 |
| news标题 | 128字节 |
| news描述 | 512字节 |
7.3 安全建议
# ❌ 不要这样做:Webhook URL硬编码
WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=abc123"
# ✅ 正确做法:从环境变量或加密配置文件读取
import os
WEBHOOK = os.environ.get("WECOM_ALERT_WEBHOOK")
# 或者从加密配置文件读取
import json
with open("config/webhooks.json.enc") as f:
webhooks = decrypt_config(f.read())
WEBHOOK = webhooks["alert_group"]
总结
通过本文,你已经掌握了:

| 能力 | 实现方式 |
|---|---|
| 文本/Markdown/图片/卡片消息 | 四种消息类型完整API |
| 异常告警系统 | 分级告警 + 去重 + @指定人 |
| 定时数据播报 | 早报模板 + 进度实时更新 |
| 审批提醒 | 结构化审批列表消息 |
| 多群路由管理 | 消息类型到群组的映射路由 |
| 频率限制处理 | 令牌桶算法避免被封 |
企业微信群机器人配合影刀RPA,能让你的自动化流程"会说话"——执行结果不再沉默,所有人随时知道任务状态。
作者:林焱 | 影刀RPA技术专栏 | 欢迎点赞收藏,持续更新中!
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)