python flask 企业微信机器人发送群信息 中间件
·
经手的项目有这样的需求:将业务系统内提交的数据及时发送到企业微信内部群。然而,由于平台接口存在限定,无法直接发送标准的 markdown 格式的数据。因此,需要先将数据发送到中间件进行转换处理。这里简单地使用 Flask 框架来搭建一个用于接收和处理数据的程序,此仅为个人记录用途。
平台中的代码
string f0000036Value = this.Request.BizObject["F0000036"] + string.Empty;
decimal f0000036Parsed;
bool isParsedSuccessfully = decimal.TryParse(f0000036Value, out f0000036Parsed);
if(actionName == "Submit" && this.Request.ActivityCode == "Activity2" && isParsedSuccessfully && f0000036Parsed > 99)
{
string id = this.Request.BizObjectId; //单据ID
//新增加
string webhook_key = this.Request.BizObject["F0000085"] + string.Empty; //大区企业微信机器人地址
string ksrwb = this.Request.BizObject["F0000089"] + string.Empty; //考试人文本
string xsqwb = this.Request.BizObject["F0000087"] + string.Empty; // 销售区文本
string kssj = this.Request.BizObject["F0000022"] + string.Empty; //考试时间
string sjmc = this.Request.BizObject["F0000090"] + string.Empty; //试卷名称
string djcks = this.Request.BizObject["F0000048"] + string.Empty; //第几次考试
string dxtdf = this.Request.BizObject["F0000023"] + string.Empty; //单选题得分
string ddxtdf = this.Request.BizObject["F0000033"] + string.Empty; //多选题得分
string tktdf = this.Request.BizObject["F0000060"] + string.Empty; //填空题得分
string pdtdf = this.Request.BizObject["F0000034"] + string.Empty; //判断题得分
string zf = this.Request.BizObject["F0000036"] + string.Empty; //总分
string content = string.Format("{{\"msgtype\":\"markdown\",\"markdown\":{{\"content\":\"{0}:\\n\\n>考试人:{1}\\n\\n>试卷名称:{2}\\n>考试时间:{3}\\n>第{4}次考试\\n\\n>单选题得分:{5}\\n>多选题得分:{6}\\n>填空题得分:{7}\\n>判断题得分:{8}\\n\\n>总得分:{9}\"}}}}", xsqwb, ksrwb, sjmc, kssj, djcks, dxtdf, ddxtdf, tktdf, pdtdf, zf);
string token = ""; //GetToken(response);
if(id != "")
{
H3.BizBus.BizStructureSchema msgSchema = new H3.BizBus.BizStructureSchema();
msgSchema.Add(new H3.BizBus.ItemSchema("ret", "状态", H3.Data.BizDataType.String, null));
msgSchema.Add(new H3.BizBus.ItemSchema("msg", "返回信息", H3.Data.BizDataType.String, null));
Dictionary < string, string > headers=new Dictionary<string, string>();
Dictionary < string, object > bodys=new Dictionary<string, object>();
Dictionary < string, string > querys=new Dictionary<string, string>();
headers.Add("Authorization", token);//Authorization
bodys.Add("webhook_key", webhook_key);
bodys.Add("content", content);
//调用Invoke接口,系统底层访问第三方WebService接口的Invoke方法
H3.BizBus.InvokeResult InResult = this.Engine.BizBus.InvokeApi(H3.Organization.User.SystemUserId, H3.BizBus.AccessPointType.ThirdConnection, "qywxqjqr", "POST", "application/x-www-form-urlencoded;charset=UTF-8", headers, querys, bodys, msgSchema);
if(InResult != null)
{
int Code = InResult.Code; //调用是否成功
if(Code == 0)
{
//获取返回数据
response.ReturnData = new Dictionary<string, object>();
H3.BizBus.BizStructure Obj = InResult.Data;
string Ecode = Obj["ret"] as string;
string msg = Obj["msg"] as string;
if(Ecode == "0")
{
}
}
}
}
}
中间件python的代码:
特别注意的小地方,在部署到云服务器的时候如果有多个IP时,flask中最好增加下面的代码,要不然就可能会造成访问不成功:
# 运行 Flask 应用,监听 0.0.0.0 地址,开启调试模式,端口为 5082
app.run(host='0.0.0.0', debug=True, port='5082')
from flask import Flask, request, jsonify
import http.client
import json
import urllib.parse
from datetime import datetime
# 创建 Flask 应用实例
app = Flask(__name__)
# 根据当前日期创建 log.txt 日志文件
current_date = datetime.now().strftime("%Y-%m-%d")
log_file_name = f"{current_date}_log.txt"
log_file = open(log_file_name, "a")
# 定义路由,接收 POST 请求,路径为 /send_message
@app.route('/send_message', methods=['POST'])
def send_message():
# 记录请求开始时间
request_start_time = datetime.now()
# 解析 POST 请求中的数据,转换为字典格式
post_data = request.form.to_dict()
# 打印并保存接收到的数据到 log.txt 日志文件
log_file.write(f"{request_start_time} - Received data: {post_data}\n")
log_file.flush() # 确保数据立即写入文件
# 从 POST 数据中提取 webhook_key 和 content
webhook_key = post_data.get('webhook_key')
raw_content = post_data.get('content')
# 将 content 从字符串转换为字典,假设 content 是以 JSON 格式编码的字符串
try:
content_dict = json.loads(raw_content)
except json.JSONDecodeError:
# 如果 JSON 解析失败,返回错误响应并记录到日志
response = jsonify({"error": "Invalid content format, expecting JSON."}), 400
log_file.write(f"{datetime.now()} - Response: {response}\n")
log_file.flush()
return response
# 确保 content 字典中存在 msgtype 和 markdown 字段
if 'msgtype' not in content_dict or 'markdown' not in content_dict:
# 如果缺少必要字段,返回错误响应并记录到日志
response = jsonify({"error": "Missing 'msgtype' or 'markdown' in the content."}), 400
log_file.write(f"{datetime.now()} - Response: {response}\n")
log_file.flush()
return response
# 提取 markdown 的内容
markdown_content = content_dict['markdown']['content']
# 构建发送给企业微信的 payload
payload = {
"msgtype": "markdown",
"markdown": {
"content": markdown_content
}
}
try:
# 发送请求到企业微信
conn = http.client.HTTPSConnection("qyapi.weixin.qq.com")
headers = {'Content-Type': "application/json"}
# 发送 POST 请求,包含 payload 和 headers
conn.request("POST", f"/cgi-bin/webhook/send?key={webhook_key}", json.dumps(payload), headers)
res = conn.getresponse()
data = res.read()
# 关闭连接
conn.close()
# 根据响应状态码判断是否成功,并返回结果
if res.status == 200:
response = jsonify({"message": "Message sent successfully", "response": data.decode("utf-8")})
else:
response = jsonify({"error": "Failed to send message", "details": data.decode("utf-8")}), res.status
# 记录响应结束时间
response_end_time = datetime.now()
# 将完整的请求和响应过程记录到日志文件中
log_file.write(f"{request_start_time} - Request: {post_data}\n")
log_file.write(f"{response_end_time} - Response: {response}\n")
log_file.flush()
return response
except Exception as e:
# 如果出现异常,返回错误响应并记录到日志
response = jsonify({"error": str(e)}), 500
log_file.write(f"{datetime.now()} - Response: {response}\n")
log_file.flush()
return response
if __name__ == '__main__':
# 运行 Flask 应用,监听 0.0.0.0 地址,开启调试模式,端口为 5082
app.run(host='0.0.0.0', debug=True, port='5082')
# 关闭日志文件
log_file.close()
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)