经手的项目有这样的需求:将业务系统内提交的数据及时发送到企业微信内部群。然而,由于平台接口存在限定,无法直接发送标准的 markdown 格式的数据。因此,需要先将数据发送到中间件进行转换处理。这里简单地使用 Flask 框架来搭建一个用于接收和处理数据的程序,此仅为个人记录用途。

平台中的代码 


        string f0000036Value = this.Request.BizObject["F0000036"] + string.Empty;

       
        decimal f0000036Parsed;

        
        bool isParsedSuccessfully = decimal.TryParse(f0000036Value, out f0000036Parsed);

       
        if(actionName == "Submit" && this.Request.ActivityCode == "Activity2" && isParsedSuccessfully && f0000036Parsed > 99)
{

            string id = this.Request.BizObjectId; //单据ID

            //新增加
            string webhook_key = this.Request.BizObject["F0000085"] + string.Empty; //大区企业微信机器人地址
            string ksrwb = this.Request.BizObject["F0000089"] + string.Empty; //考试人文本
            string xsqwb = this.Request.BizObject["F0000087"] + string.Empty; // 销售区文本
            string kssj = this.Request.BizObject["F0000022"] + string.Empty; //考试时间
            string sjmc = this.Request.BizObject["F0000090"] + string.Empty; //试卷名称
            string djcks = this.Request.BizObject["F0000048"] + string.Empty; //第几次考试
            string dxtdf = this.Request.BizObject["F0000023"] + string.Empty; //单选题得分
            string ddxtdf = this.Request.BizObject["F0000033"] + string.Empty; //多选题得分
            string tktdf = this.Request.BizObject["F0000060"] + string.Empty; //填空题得分
            string pdtdf = this.Request.BizObject["F0000034"] + string.Empty; //判断题得分
            string zf = this.Request.BizObject["F0000036"] + string.Empty; //总分
            string content = string.Format("{{\"msgtype\":\"markdown\",\"markdown\":{{\"content\":\"{0}:\\n\\n>考试人:{1}\\n\\n>试卷名称:{2}\\n>考试时间:{3}\\n>第{4}次考试\\n\\n>单选题得分:{5}\\n>多选题得分:{6}\\n>填空题得分:{7}\\n>判断题得分:{8}\\n\\n>总得分:{9}\"}}}}", xsqwb, ksrwb, sjmc, kssj, djcks, dxtdf, ddxtdf, tktdf, pdtdf, zf);
            string token = ""; //GetToken(response);
            if(id != "") 
            {
                H3.BizBus.BizStructureSchema msgSchema = new H3.BizBus.BizStructureSchema();
                msgSchema.Add(new H3.BizBus.ItemSchema("ret", "状态", H3.Data.BizDataType.String, null));
                msgSchema.Add(new H3.BizBus.ItemSchema("msg", "返回信息", H3.Data.BizDataType.String, null));
                Dictionary < string, string > headers=new Dictionary<string, string>();
                Dictionary < string, object > bodys=new Dictionary<string, object>();
                Dictionary < string, string > querys=new Dictionary<string, string>();
                headers.Add("Authorization", token);//Authorization 
                bodys.Add("webhook_key", webhook_key);
                bodys.Add("content", content);
                //调用Invoke接口,系统底层访问第三方WebService接口的Invoke方法
                H3.BizBus.InvokeResult InResult = this.Engine.BizBus.InvokeApi(H3.Organization.User.SystemUserId, H3.BizBus.AccessPointType.ThirdConnection, "qywxqjqr", "POST", "application/x-www-form-urlencoded;charset=UTF-8", headers, querys, bodys, msgSchema);
                if(InResult != null)
                {
                    int Code = InResult.Code; //调用是否成功
                    if(Code == 0)
                    {
                        //获取返回数据
                        response.ReturnData = new Dictionary<string, object>();
                        H3.BizBus.BizStructure Obj = InResult.Data;
                        string Ecode = Obj["ret"] as string;
                        string msg = Obj["msg"] as string;
                        if(Ecode == "0") 
                        {

                        }
                    }
                }
            }

        }

中间件python的代码:

特别注意的小地方,在部署到云服务器的时候如果有多个IP时,flask中最好增加下面的代码,要不然就可能会造成访问不成功:

# 运行 Flask 应用,监听 0.0.0.0 地址,开启调试模式,端口为 5082
    app.run(host='0.0.0.0', debug=True, port='5082')
from flask import Flask, request, jsonify
import http.client
import json
import urllib.parse
from datetime import datetime

# 创建 Flask 应用实例
app = Flask(__name__)

# 根据当前日期创建 log.txt 日志文件
current_date = datetime.now().strftime("%Y-%m-%d")
log_file_name = f"{current_date}_log.txt"
log_file = open(log_file_name, "a")

# 定义路由,接收 POST 请求,路径为 /send_message
@app.route('/send_message', methods=['POST'])
def send_message():
    # 记录请求开始时间
    request_start_time = datetime.now()

    # 解析 POST 请求中的数据,转换为字典格式
    post_data = request.form.to_dict()

    # 打印并保存接收到的数据到 log.txt 日志文件
    log_file.write(f"{request_start_time} - Received data: {post_data}\n")
    log_file.flush()  # 确保数据立即写入文件

    # 从 POST 数据中提取 webhook_key 和 content
    webhook_key = post_data.get('webhook_key')
    raw_content = post_data.get('content')

    # 将 content 从字符串转换为字典,假设 content 是以 JSON 格式编码的字符串
    try:
        content_dict = json.loads(raw_content)
    except json.JSONDecodeError:
        # 如果 JSON 解析失败,返回错误响应并记录到日志
        response = jsonify({"error": "Invalid content format, expecting JSON."}), 400
        log_file.write(f"{datetime.now()} - Response: {response}\n")
        log_file.flush()
        return response

    # 确保 content 字典中存在 msgtype 和 markdown 字段
    if 'msgtype' not in content_dict or 'markdown' not in content_dict:
        # 如果缺少必要字段,返回错误响应并记录到日志
        response = jsonify({"error": "Missing 'msgtype' or 'markdown' in the content."}), 400
        log_file.write(f"{datetime.now()} - Response: {response}\n")
        log_file.flush()
        return response

    # 提取 markdown 的内容
    markdown_content = content_dict['markdown']['content']

    # 构建发送给企业微信的 payload
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": markdown_content
        }
    }

    try:
        # 发送请求到企业微信
        conn = http.client.HTTPSConnection("qyapi.weixin.qq.com")
        headers = {'Content-Type': "application/json"}
        # 发送 POST 请求,包含 payload 和 headers
        conn.request("POST", f"/cgi-bin/webhook/send?key={webhook_key}", json.dumps(payload), headers)
        res = conn.getresponse()
        data = res.read()

        # 关闭连接
        conn.close()

        # 根据响应状态码判断是否成功,并返回结果
        if res.status == 200:
            response = jsonify({"message": "Message sent successfully", "response": data.decode("utf-8")})
        else:
            response = jsonify({"error": "Failed to send message", "details": data.decode("utf-8")}), res.status

        # 记录响应结束时间
        response_end_time = datetime.now()

        # 将完整的请求和响应过程记录到日志文件中
        log_file.write(f"{request_start_time} - Request: {post_data}\n")
        log_file.write(f"{response_end_time} - Response: {response}\n")
        log_file.flush()

        return response

    except Exception as e:
        # 如果出现异常,返回错误响应并记录到日志
        response = jsonify({"error": str(e)}), 500
        log_file.write(f"{datetime.now()} - Response: {response}\n")
        log_file.flush()
        return response

if __name__ == '__main__':
    # 运行 Flask 应用,监听 0.0.0.0 地址,开启调试模式,端口为 5082
    app.run(host='0.0.0.0', debug=True, port='5082')

# 关闭日志文件
log_file.close()

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐