影刀RPA实战:企业微信自动化消息推送与群机器人开发指南

作者:林焱 | 影刀RPA技术专栏

标签: 影刀RPA / 企业微信 / 消息推送 / 群机器人 / 自动化通知 / Webhook

适合读者: 需要定时发送报告、异常告警、业务通知的RPA开发者


前言:为什么企业微信是RPA最佳通知渠道?

RPA流程跑完了,结果怎么通知人?

  • 发邮件:不及时,容易被忽略
  • 钉钉:需要单独集成
  • 短信:成本高,不适合高频
  • 在这里插入图片描述

企业微信群机器人 是最佳选择:

  • ✅ 免费,无限发送次数(合理使用)
  • ✅ 支持Markdown格式,可发富文本消息
  • ✅ 支持@指定成员,保证重要消息被看到
  • ✅ 支持图片、文件、图文卡片等多种消息类型
  • ✅ 手机端/PC端实时推送

本文将带你从零开始,用影刀RPA搭建一套完整的企业微信自动化通知体系,覆盖:日报推送、异常告警、数据汇报、审批提醒等常用场景。


第一章:企业微信群机器人基础

1.1 创建群机器人

步骤:

  1. 打开企业微信,进入目标群聊

  2. 点击右上角「…」→「添加群机器人」

  3. 点击「新创建一个机器人」

  4. 输入机器人名称(如:RPA自动通知机器人)

  5. 在这里插入图片描述

  6. 复制生成的 Webhook URL(非常重要,保存好)

Webhook URL 格式:

https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

1.2 消息类型总览

店群矩阵自动化突破运营极限!

消息类型 适用场景 支持格式
text 纯文字通知,可@成员 普通文本
markdown 富文本报告,支持标题/表格/加粗 Markdown
image 发送图片(Base64或URL) JPG/PNG
news 图文卡片,适合链接分享 带链接的卡片
file 发送文件(需先上传获取media_id) 各种文件

第二章:影刀RPA发送企业微信消息

2.1 基础文本消息

在这里插入图片描述

import urllib.request
import json

def send_wecom_text(webhook_url: str, content: str, 
                     mentioned_list: list = None,
                     mentioned_mobile_list: list = None) -> dict:
    """
    发送文本消息到企业微信群
    
    参数:
    - webhook_url: 群机器人的Webhook地址
    - content: 消息内容(最大2048字节)
    - mentioned_list: @的成员列表(userid或@all)
    - mentioned_mobile_list: @的手机号列表
    
    返回:API响应字典
    """
    data = {
        "msgtype": "text",
        "text": {
            "content": content,
            "mentioned_list": mentioned_list or [],
            "mentioned_mobile_list": mentioned_mobile_list or []
        }
    }
    
    req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
    
    req = urllib.request.Request(
        webhook_url,
        data=req_data,
        headers={"Content-Type": "application/json"},
        method="POST"
    )
    
    with urllib.request.urlopen(req, timeout=10) as resp:
        result = json.loads(resp.read().decode("utf-8"))
    
    if result.get("errcode") == 0:
        print("✅ 消息发送成功")
    else:
        print(f"❌ 发送失败:{result.get('errmsg')}")
    
    return result


# 使用示例
WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的key"

# 普通文本消息
send_wecom_text(WEBHOOK, "RPA任务执行完成,共处理200条订单数据。")

# @所有人
send_wecom_text(WEBHOOK, "⚠️ 系统异常!请立即处理!", mentioned_list=["@all"])

# @指定人(用userid)
send_wecom_text(WEBHOOK, "你好,请查看今日报表", mentioned_list=["zhangsan", "lisi"])

2.2 Markdown格式消息(推荐)

Markdown消息是企业微信群机器人中最强大的消息类型,支持:

  • 标题(最多3级)
  • 加粗删除线
  • 代码块
  • 引用

  • 有序/无序列表
  • 超链接
  • 颜色标签(标签)
def send_wecom_markdown(webhook_url: str, content: str) -> dict:
    """
    发送Markdown格式消息
    
    注意:企业微信Markdown是其特有子集,
    不支持所有标准Markdown语法
    """
    data = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    
    req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        webhook_url,
        data=req_data,
        headers={"Content-Type": "application/json"},
        method="POST"
    )
    
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Markdown消息模板示例
def build_daily_report_markdown(date: str, stats: dict) -> str:
    """构建每日数据报告Markdown消息"""
    
    success_rate = (stats["success"] / stats["total"] * 100) if stats["total"] > 0 else 0
    status_icon = "✅" if success_rate >= 95 else "⚠️" if success_rate >= 80 else "❌"
    
    return f"""## {status_icon} RPA日报 - {date}

**任务执行统计**

> 执行时间:{stats.get("start_time", "N/A")} ~ {stats.get("end_time", "N/A")}

| 指标 | 数值 |
|------|------|
| 总任务数 | **{stats["total"]}** |
| 成功 | <font color="info">{stats["success"]}</font> |
| 失败 | <font color="warning">{stats.get("failed", 0)}</font> |
| 跳过 | {stats.get("skipped", 0)} |
| 成功率 | **{success_rate:.1f}%** |

**处理详情**
- 采集商品数据:{stats.get("scraped_items", 0)} 条
- 导出Excel报表:{stats.get("excel_files", 0)} 个
- 发送邮件:{stats.get("sent_emails", 0)}{f'**⚠️ 异常信息**{chr(10)}> {stats["error_summary"]}' if stats.get("error_summary") else ''}

[查看详细日志](http://rpa-monitor.company.com/logs/{date})
"""

# 发送日报示例
stats_data = {
    "total": 200,
    "success": 195,
    "failed": 3,
    "skipped": 2,
    "scraped_items": 1580,
    "excel_files": 5,
    "sent_emails": 12,
    "start_time": "08:30:00",
    "end_time": "09:15:23"
}

markdown_content = build_daily_report_markdown("2024-01-15", stats_data)
send_wecom_markdown(WEBHOOK, markdown_content)

2.3 图片消息

import base64
import hashlib

def send_wecom_image(webhook_url: str, image_path: str) -> dict:
    """
    发送图片消息(图片转Base64)
    
    限制:图片大小不超过2MB,仅支持JPG/PNG
    """
    with open(image_path, "rb") as f:
        image_data = f.read()
    
    image_base64 = base64.b64encode(image_data).decode("utf-8")
    image_md5 = hashlib.md5(image_data).hexdigest()
    
    data = {
        "msgtype": "image",
        "image": {
            "base64": image_base64,
            ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6cc0dae393884a078e36bbbaa9d178ee.png#pic_center)

            "md5": image_md5
        }
    }
    
    req_data = json.dumps(data).encode("utf-8")
    req = urllib.request.Request(
        webhook_url,
        data=req_data,
        headers={"Content-Type": "application/json"},
        method="POST"
    )
    
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))

# 发送图表截图
send_wecom_image(WEBHOOK, "C:/RPA/output/charts/daily_trend.png")

2.4 图文卡片消息

def send_wecom_news(webhook_url: str, articles: list) -> dict:
    """
    发送图文卡片消息(最多8篇文章)
    
    articles格式:[{
        "title": "标题",
        "description": "描述",  # 可选
        "url": "点击跳转URL",
        "picurl": "图片URL"     # 可选
    }]
    """
    data = {
        "msgtype": "news",
        "news": {
            "articles": articles[:8]  # 最多8篇
        }
    }
    
    req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        webhook_url,
        data=req_data,
        headers={"Content-Type": "application/json"},
        method="POST"
    )
    
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


# 发送每周工作报告卡片
send_wecom_news(WEBHOOK, [
    {
        "title": "📊 本周RPA运行报告(2024-W03)",
        "description": "本周共处理8500条数据,节省工时28小时",
        "url": "http://intranet.company.com/rpa/weekly/2024W03",
        "picurl": "https://example.com/images/report_cover.png"
    },
    {
        "title": "🚀 新上线:财务报表自动化流程",
        "description": "月末对账时间从4小时缩短至15分钟",
        "url": "http://intranet.company.com/rpa/flows/finance",
    }
])

第三章:实战场景一——RPA异常告警系统

3.1 设计思路

异常告警的核心要求:

  1. 及时性:异常发生后30秒内通知到人
  2. 可操作性:消息要包含足够的上下文,让值班人员知道该怎么处理
  3. 不打扰:同类异常不要重复发送(去重)
  4. 分级:区分紧急/一般/可忽略三个等级

3.2 告警系统完整实现

在这里插入图片描述
在这里插入图片描述

import json
import datetime
import hashlib
import urllib.request
from collections import defaultdict

class WeComAlertSystem:
    """企业微信RPA告警系统"""
    
    ALERT_WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的key"
    
    # 告警等级配置
    LEVELS = {
        "critical": {
            "icon": "🔴",
            "prefix": "【紧急告警】",
            "mention": ["@all"],
            "dedupe_seconds": 300  # 5分钟内不重复
        },
        "warning": {
            "icon": "🟡",
            "prefix": "【警告】",
            "mention": ["zhangsan"],  # 只通知负责人
            "dedupe_seconds": 1800   # 30分钟内不重复
        },
        "info": {
            "icon": "🔵",
            "prefix": "【提示】",
            "mention": [],
            "dedupe_seconds": 3600   # 1小时内不重复
        }
    }
    
    def __init__(self):
        self._sent_alerts = {}  # {alert_hash: last_sent_time}
    
    def _get_alert_hash(self, level: str, task_name: str, error_type: str) -> str:
        """计算告警去重哈希"""
        key = f"{level}:{task_name}:{error_type}"
        return hashlib.md5(key.encode()).hexdigest()[:8]
    
    def _is_duplicate(self, alert_hash: str, dedupe_seconds: int) -> bool:
        """检查是否是重复告警"""
        if alert_hash not in self._sent_alerts:
            return False
        
        last_sent = self._sent_alerts[alert_hash]
        elapsed = (datetime.datetime.now() - last_sent).seconds
        return elapsed < dedupe_seconds
    
    def alert(self, level: str, task_name: str, error_type: str,
              error_detail: str, suggestion: str = "", context: dict = None) -> bool:
        """
        发送告警消息
        
        参数:
        - level: 告警级别 (critical/warning/info)
        - task_name: 发生问题的任务名
        - error_type: 错误类型简短描述
        - error_detail: 详细错误信息
        - suggestion: 建议处理方式
        - context: 额外上下文信息(字典)
        
        返回:是否成功发送(重复告警返回False)
        """
        level_config = self.LEVELS.get(level, self.LEVELS["info"])
        dedupe_seconds = level_config["dedupe_seconds"]
        
        alert_hash = self._get_alert_hash(level, task_name, error_type)
        
        # 去重检查
        if self._is_duplicate(alert_hash, dedupe_seconds):
            print(f"ℹ️ 告警已去重({dedupe_seconds}s内不重复发送):{error_type}")
            return False
        
        # 构建Markdown消息
        icon = level_config["icon"]
        prefix = level_config["prefix"]
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        
        content = f"""{icon} {prefix} {task_name}

**错误类型:** {error_type}
**发生时间:** {now}
**错误详情:**
> {error_detail[:500]}  

"""
        
        if context:
            content += "**上下文信息:**\n"
            for k, v in context.items():
                content += f"- {k}:`{v}`\n"
            content += "\n"
        
        if suggestion:
            content += f"**处理建议:**\n> {suggestion}\n\n"
        
        content += f"*告警ID:{alert_hash}*"
        
        # 发送消息
        data = {
            "msgtype": "markdown",
            "markdown": {"content": content}
        }
        
        mention_list = level_config["mention"]
        if mention_list:
            # 如果有@信息,先发text消息@人,再发markdown内容
            self._send_text_mention(mention_list, task_name, prefix)
        
        result = self._send_request(data)
        
        if result.get("errcode") == 0:
            self._sent_alerts[alert_hash] = datetime.datetime.now()
            return True
        
        return False
    
    def _send_text_mention(self, mention_list: list, task_name: str, prefix: str):
        """发送@消息"""
        data = {
            "msgtype": "text",
            "text": {
                "content": f"{prefix} {task_name} 需要处理",
                "mentioned_list": mention_list
            }
        }
        self._send_request(data)
    
    def _send_request(self, data: dict) -> dict:
        """发送HTTP请求到企业微信"""
        req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
        req = urllib.request.Request(
            self.ALERT_WEBHOOK,
            data=req_data,
            headers={"Content-Type": "application/json"},
            method="POST"
        )
        try:
            with urllib.request.urlopen(req, timeout=10) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except Exception as e:
            print(f"发送消息失败:{e}")
            return {"errcode": -1, "errmsg": str(e)}


# ===== 在RPA流程中使用 =====
alert_system = WeComAlertSystem()

def run_order_scrape():
    """订单采集主流程(带完整告警)"""
    task_name = "京东店铺订单采集"
    
    try:
        # 检查环境
        if not check_vpn_connected():
            alert_system.alert(
                level="critical",
                task_name=task_name,
                error_type="网络环境异常",
                error_detail="VPN连接断开,无法访问内网系统",
                suggestion="请检查VPN连接状态,重新连接后手动重启任务",
                context={"服务器地址": "10.0.1.100", "端口": "443"}
            )
            return
        
        # 登录
        try:
            login_jd_shop()
        except LoginFailedException as e:
            alert_system.alert(
                level="critical",
                task_name=task_name,
                error_type="登录失败",
                error_detail=str(e),
                suggestion="可能密码已更改或账号被锁定,请手动登录检查",
                context={"账号": "shop_admin", "错误码": getattr(e, "code", "N/A")}
            )
            return
        
        # 采集过程
        total = 0
        errors = 0
        
        for page_num in range(1, 100):
            try:
                items = scrape_order_page(page_num)
                if not items:
                    break
                save_orders_to_excel(items)
                total += len(items)
                
            except PageLoadTimeoutError:
                errors += 1
                alert_system.alert(
                    level="warning",
                    task_name=task_name,
                    error_type="页面加载超时",
                    error_detail=f"第{page_num}页超时,已跳过",
                    suggestion="网络较慢,已自动跳过该页,数据可能不完整",
                    context={"页码": page_num, "当前采集量": total}
                )
                continue
        
        # 完成通知
        send_wecom_markdown(
            WeComAlertSystem.ALERT_WEBHOOK,
            f"""✅ **{task_name}完成**

- 采集订单:{total} 条
- 跳过页面:{errors} 页
- 完成时间:{datetime.datetime.now().strftime("%H:%M:%S")}
"""
        )
        
    except Exception as e:
        alert_system.alert(
            level="critical",
            task_name=task_name,
            error_type="未知异常导致任务中断",
            error_detail=str(e)[:300],
            suggestion="请查看详细日志,必要时手动执行任务"
        )

第四章:实战场景二——定时数据播报

4.1 早报系统(每日8:30)

import datetime
import json

class DailyBriefingBot:
    """每日数据早报机器人"""
    
    def __init__(self, webhook_url: str):
        self.webhook = webhook_url
    
    def generate_morning_report(self) -> str:
        """生成早报内容"""
        today = datetime.date.today()
        weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
        weekday = weekday_names[today.weekday()]
        
        # 获取各项数据(实际从数据库/Excel读取)
        data = self._collect_daily_data()
        
        # 今日任务待办
        tasks = self._get_pending_tasks()
        task_list = "\n".join([f"- [ ] {t}" for t in tasks[:5]])
        
        return f"""# 📅 早报 {today.strftime("%Y年%m月%d日")} {weekday}

## 📊 昨日数据汇总

| 业务线 | 订单量 | 销售额 | 环比 |
|--------|--------|--------|------|
| 电商主站 | {data["ec_orders"]} | ¥{data["ec_revenue"]:,.0f} | {data["ec_growth"]:+.1f}% |
| 批发渠道 | {data["wholesale_orders"]} | ¥{data["wholesale_revenue"]:,.0f} | {data["wholesale_growth"]:+.1f}% |
| 海外仓 | {data["oversea_orders"]} | ¥{data["oversea_revenue"]:,.0f} | {data["oversea_growth"]:+.1f}% |

## 🤖 RPA今日任务

{task_list}

## ⚠️ 待处理事项

{self._get_pending_issues()}

---
*由RPA自动播报 · {datetime.datetime.now().strftime("%H:%M")}*
"""
    
    def _collect_daily_data(self) -> dict:
        """从各数据源收集昨日数据"""
        # 实际实现:读取Excel/数据库/API
        return {
            "ec_orders": 1258, "ec_revenue": 158000, "ec_growth": 12.5,
            "wholesale_orders": 86, "wholesale_revenue": 285000, "wholesale_growth": -3.2,
            "oversea_orders": 234, "oversea_revenue": 96000, "oversea_growth": 28.1
        }
    
    def _get_pending_tasks(self) -> list:
        """获取今日待执行的RPA任务"""
        return [
            "09:00 - 电商平台订单同步",
            "10:00 - 发货单打印(约320单)",
            "14:00 - 库存盘点数据采集",
            "16:00 - 日报Excel生成",
            "17:30 - 日报邮件发送"
        ]
    
    def _get_pending_issues(self) -> str:
        """获取待处理异常"""
        return """> ⚠️ 京东店铺3个订单待人工审核(金额超过5000元)
> ⚠️ 仓库系统API昨日18:00-19:30响应缓慢,相关数据已标记待核实"""
    
    def send_morning_report(self):
        """发送早报"""
        content = self.generate_morning_report()
        data = {
            "msgtype": "markdown",
            "markdown": {"content": content}
        }
        req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
        req = urllib.request.Request(
            self.webhook,
            data=req_data,
            headers={"Content-Type": "application/json"},
            method="POST"
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read().decode("utf-8"))


# 在影刀定时任务中调用
bot = DailyBriefingBot(WEBHOOK)
bot.send_morning_report()

4.2 实时进度播报(大批量任务)

class ProgressReporter:
    """大批量任务进度实时播报"""
    
    def __init__(self, webhook_url: str, task_name: str, total_items: int,
                 report_interval: int = 100):
        """
        参数:
        - report_interval: 每处理多少条汇报一次
        """
        self.webhook = webhook_url
        self.task_name = task_name
        self.total = total_items
        self.report_interval = report_interval
        self.current = 0
        self.errors = 0
        self.start_time = datetime.datetime.now()
    
    def update(self, success: bool = True):
        """更新进度(每处理一条调用一次)"""
        self.current += 1
        if not success:
            self.errors += 1
        
        # 每隔N条或完成时汇报
        if self.current % self.report_interval == 0 or self.current == self.total:
            self._send_progress()
    
    def _send_progress(self):
        """发送进度消息"""
        elapsed = (datetime.datetime.now() - self.start_time).seconds
        progress_pct = self.current / self.total * 100
        
        # 估算剩余时间
        if self.current > 0:
            speed = self.current / max(elapsed, 1)  # 条/秒
            remaining_items = self.total - self.current
            eta_seconds = int(remaining_items / speed)
            eta_str = f"{eta_seconds // 60}{eta_seconds % 60}秒"
        else:
            eta_str = "计算中..."
        
        # 进度条(20个字符)
        filled = int(progress_pct / 5)
        bar = "▓" * filled + "░" * (20 - filled)
        
        is_done = self.current >= self.total
        status = "✅ **已完成**" if is_done else "⏳ **进行中**"
        
        content = f"""{status} {self.task_name}

`{bar}` {progress_pct:.0f}%

- 已处理:**{self.current}** / {self.total}
- 成功:{self.current - self.errors} | 失败:{self.errors}
- 耗时:{elapsed // 60}{elapsed % 60}{f'- 预计剩余:{eta_str}' if not is_done else f'- 总耗时:{elapsed // 60}{elapsed % 60}秒'}
"""
        
        data = {
            "msgtype": "markdown",
            "markdown": {"content": content}
        }
        req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
        req = urllib.request.Request(
            self.webhook,
            data=req_data,
            headers={"Content-Type": "application/json"},
            method="POST"
        )
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                pass
        except Exception:
            pass  # 进度播报失败不影响主任务


# 使用示例
reporter = ProgressReporter(WEBHOOK, "全量商品数据同步", total_items=5000, 
                              report_interval=500)

for item in all_products:
    try:
        process_product(item)
        reporter.update(success=True)
    except Exception:
        reporter.update(success=False)

第五章:实战场景三——审批提醒与交互

5.1 审批待处理提醒

def send_approval_reminder(webhook_url: str, approvals: list):
    """
    发送审批待处理提醒
    
    approvals: [{
        "id": "AP-2024-001",
        "type": "采购申请",
        "applicant": "张三",
        "amount": 15000,
        "submitted_at": "2024-01-15 14:30",
        "approver_userid": "lisi"
    }]
    """
    if not approvals:
        return
    
    approval_list = ""
    for ap in approvals[:10]:  # 最多显示10条
        amount_str = f"¥{ap['amount']:,.0f}" if ap.get("amount") else ""
        approval_list += f"- **{ap['id']}** | {ap['type']} | {ap['applicant']} | {amount_str}\n"
    
    approver_mentions = list(set([f"<@{ap['approver_userid']}>" for ap in approvals]))
    mentions_str = " ".join(approver_mentions[:3])  # 最多@3人
    
    content = f"""⏰ **审批提醒** {mentions_str}

您有 **{len(approvals)}** 个审批待处理:

{approval_list}
{'> ...' if len(approvals) > 10 else ''}

[👉 前往审批系统处理](http://oa.company.com/approval)

*最后更新:{datetime.datetime.now().strftime("%H:%M")}*
"""
    
    data = {
        "msgtype": "markdown",
        "markdown": {"content": content}
    }
    req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        webhook_url,
        data=req_data,
        headers={"Content-Type": "application/json"},
        method="POST"
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))

第六章:多Webhook管理与路由

6.1 消息路由器

temu店群自动化报活动案例

在这里插入图片描述

实际企业中,不同类型的消息需要发送到不同的群组:

class WeComMessageRouter:
    """企业微信消息路由器(多群管理)"""
    
    WEBHOOKS = {
        "ops_team": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=运维群key",
        "sales_team": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=销售群key",
        "finance_team": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=财务群key",
        "all_staff": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=全员群key",
        "rpa_monitor": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=RPA监控群key",
    }
    
    # 消息类型到群组的映射规则
    ROUTING_RULES = {
        "system_alert": ["ops_team", "rpa_monitor"],       # 系统告警→运维+RPA监控
        "daily_sales": ["sales_team", "finance_team"],     # 日销数据→销售+财务
        "monthly_report": ["all_staff"],                    # 月报→全员
        "rpa_status": ["rpa_monitor"],                     # RPA状态→监控群
        "approval_reminder": ["ops_team"],                  # 审批提醒→运维(看实际配置)
    }
    
    def route_message(self, message_type: str, content: str, 
                       msg_format: str = "markdown") -> list:
        """
        根据消息类型路由到对应群组
        
        返回:各群发送结果列表
        """
        target_groups = self.ROUTING_RULES.get(message_type, ["rpa_monitor"])
        results = []
        
        for group_name in target_groups:
            webhook = self.WEBHOOKS.get(group_name)
            if not webhook:
                print(f"⚠️ 未找到群组配置:{group_name}")
                continue
            
            data = {
                "msgtype": msg_format,
                msg_format: {"content": content}
            }
            
            try:
                req_data = json.dumps(data, ensure_ascii=False).encode("utf-8")
                req = urllib.request.Request(
                    webhook,
                    data=req_data,
                    headers={"Content-Type": "application/json"},
                    method="POST"
                )
                with urllib.request.urlopen(req, timeout=10) as resp:
                    result = json.loads(resp.read().decode("utf-8"))
                    results.append({"group": group_name, "success": result["errcode"] == 0})
                    print(f"{'✅' if result['errcode'] == 0 else '❌'} 发送到 {group_name}{result.get('errmsg')}")
            except Exception as e:
                results.append({"group": group_name, "success": False, "error": str(e)})
        
        return results


# 使用示例
router = WeComMessageRouter()

# 系统异常同时发到运维群和监控群
router.route_message(
    "system_alert",
    "🔴 **紧急:数据库连接超时!** 请立即检查DB服务器状态"
)

# 日销数据发到销售+财务群
router.route_message(
    "daily_sales",
    "📊 **昨日销售数据** 总额:¥1,258,000,同比增长8.5%"
)

第七章:最佳实践与注意事项

7.1 频率限制

企业微信群机器人有频率限制:

  • 同一群机器人:每分钟最多发送20条
  • 超出限制会返回 errcode: 45009
  • 在这里插入图片描述
import time
from collections import deque

class RateLimiter:
    """发送频率限制器"""
    
    def __init__(self, max_per_minute: int = 18):  # 留2条余量
        self.max_per_minute = max_per_minute
        self.sent_times = deque()
    
    def wait_if_needed(self):
        """在发送前调用,必要时等待"""
        now = time.time()
        
        # 清除1分钟前的记录
        while self.sent_times and now - self.sent_times[0] > 60:
            self.sent_times.popleft()
        
        # 如果达到限制,等待
        if len(self.sent_times) >= self.max_per_minute:
            wait_until = self.sent_times[0] + 61  # 等到最早记录过期
            wait_seconds = max(0, wait_until - now)
            if wait_seconds > 0:
                print(f"⏳ 发送频率限制,等待 {wait_seconds:.1f}s")
                time.sleep(wait_seconds)
        
        self.sent_times.append(time.time())


rate_limiter = RateLimiter()

def send_message_safely(webhook: str, data: dict):
    rate_limiter.wait_if_needed()
    # ... 发送逻辑

7.2 消息长度限制

消息类型 内容长度限制
text 2048字节
markdown 4096字节
news标题 128字节
news描述 512字节

7.3 安全建议

# ❌ 不要这样做:Webhook URL硬编码
WEBHOOK = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=abc123"

# ✅ 正确做法:从环境变量或加密配置文件读取
import os
WEBHOOK = os.environ.get("WECOM_ALERT_WEBHOOK")

# 或者从加密配置文件读取
import json
with open("config/webhooks.json.enc") as f:
    webhooks = decrypt_config(f.read())
WEBHOOK = webhooks["alert_group"]

总结

通过本文,你已经掌握了:

在这里插入图片描述

能力 实现方式
文本/Markdown/图片/卡片消息 四种消息类型完整API
异常告警系统 分级告警 + 去重 + @指定人
定时数据播报 早报模板 + 进度实时更新
审批提醒 结构化审批列表消息
多群路由管理 消息类型到群组的映射路由
频率限制处理 令牌桶算法避免被封

企业微信群机器人配合影刀RPA,能让你的自动化流程"会说话"——执行结果不再沉默,所有人随时知道任务状态。


作者:林焱 | 影刀RPA技术专栏 | 欢迎点赞收藏,持续更新中!
在这里插入图片描述

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐