单聊机器人实现

开发 Stream 模式推送服务端(推荐)

什么是Stream模式

Stream 模式是钉钉开放平台提供的一种集成方式,它可以监听机器人回调、事件订阅回调和注册卡片回调。使用 Stream 模式接入,钉钉开放平台将通过 Websocket 连接与应用程序通讯,Stream 模式将极大降低接入门槛和资源依赖,不需要公网服务器、IP、域名等资源,只需集成钉钉开放平台 SDK 即可。

Stream模式原理

在 Stream 模式下,开发者的应用程序通过集成 SDK 的方式与钉钉开放平台建立一条 WebSocket 连接,建立连接过程中开放平台将对连接进行鉴权。当有卡片回调发生时,开放平台将通过 WebSocket 连接将数据通知到开发者的应用程序。开发者的应用程序可以接收到这些数据并进行相应处理,从而实现与钉钉开放平台的实时通信,参考下图所示

在这里插入图片描述

Stream模式优势

在钉钉开放平台向应用程序发送请求的场景中,大部分都是采用 Webhook (注册公网 HTTPS 服务)的方式,包括卡片回调,使用 Webhook 方式开发过程中会遇到较多的问题,包括

  • 申请公网域名和TLS证书
  • 申请公网IP并部署接入网关
  • 部署应用防火墙并配置白名单
  • 独立处理请求的鉴权,以及加解密处理
  • 搭建内网穿透环境进行本地开发调试

针对以上问题,Stream 模式将为开发者提供"五零"接入体验,将1~2周的接入开发周期降低到5分钟,包括

  • 零公网IP

    不需要依赖公网IP或域名,也不需要暴露公网IP,减少了公网暴露服务的安全风险并降低了开发门槛。

  • 零加解密/签名/TLS证书管理

    使用应用身份对连接进行鉴权,通过反向连接的方式与钉钉开放平台建立TLS加密连接,提供了快速、安全的通信体验。

  • 零防火墙白名单

    Stream 模式下开发者无需向公网开放提供任何服务端口,无需部署防火墙和配置白名单。

  • 零网关部署

    通过反向连接的方式建立通道,开发者只需保证运行环境具备公网访问能力即可,无需部署网关。

  • 零内网穿透

    开发者无需在本地搭建内网穿透工具,通过 Stream 模式在本地开发环境中即可接收卡片回调。

接入方式
接入限制
  1. 应用程序所部署环境具备访问公网的能力。
  2. 仅适用于企业内部开发和第三方企业应用。
  3. 每个客户端实例默认启用一条 WebSocket 连接,一个应用默认最多建立50条连接。
协议接入步骤
介绍

钉钉 Stream 协议接入主要包括两个步骤:

  1. 注册连接凭证:通过 HTTP POST 方法,获取 WebSocket 通道的 endpoint(协议域名和Path信息) 和 ticket(URL 中的 Ticket 参数);
  2. 建立 WebSocket 连接:通过步骤一中获取的 endpoint 和 ticket 信息,建立 WebSocket 通道,开始订阅;
步骤一:注册连接凭证

调用以下接口注册 Stream 连接凭证:

请求方法(HTTP)示例:

POST /v1.0/gateway/connections/open HTTP/1.1
Host: api.dingtalk.com
Content-Type:application/json
Accept: application/json

{
    "clientId": "${ClientID}",
    "clientSecret": "${ClientSecret}",
    "localIp": "10.34.22.11",
    "subscriptions": [
        {
            "topic": "*",
            "type": "EVENT"
        },
        {
            "topic": "/v1.0/im/bot/messages/get",
            "type": "CALLBACK"
        }
    ],
    "ua": "dingtalk-sdk-java/1.0.2"
}
步骤二:建立 WebSocket 连接

注册长连接信息成功后,客户端将获取长连的身份标识ticket和钉钉开放平台的地址信息,通过此信息客户端和钉钉服务端建立一条WebSocket连接,握手请求的路径和参数信息如下所示

GET /connect?ticket=${ticket} HTTP/1.1
Host: wss-open-connection.dingtalk.com
Upgrade: websocket

以上示例中 Host、Path 信息仅用于示例展示,请用步骤一中返回的 endpoint 信息作为 Host 和 Path 构建 WebSocket 请求。

钉钉服务端收到 WebSocket 握手信息后会通过 ticket 校验客户端身份信息,校验成功后会返回正确的握手信息。

至此,已经完成了推送订阅通道建立,可以实时接收到订阅的消息列表。下一章节将介绍各种类型的消息,以及如何响应(用于通知钉钉服务端已经成功接收,请勿重复推送)。
Java
  • 运行环境

    JDK1.8及以上。

  • 安装Java SDK

    添加依赖项到工程的pom.xml文件或下载对应的jar包,最新的 SDK 版本可以在这里查看和下载。

<dependency>
  <groupId>com.dingtalk.open</groupId>
  <artifactId>dingtalk-stream</artifactId>
  <version>{sdk-version}</version>
</dependency>
钉钉Stream流的构建

创建 IM 消息的监听

@Configuration
public class DingTalkStreamClientConfiguration {

    @Value("${app.appKey}")
    private String clientId;
    @Value("${app.appSecret}")
    private String clientSecret;

    /**
     * 配置OpenDingTalkClient客户端并配置初始化方法(start)
     *
     * @param chatBotCallbackListener
     * @param aiGraphPluginCallbackListener
     * @return
     * @throws Exception
     */
    @Bean(initMethod = "start")
    public OpenDingTalkClient configureStreamClient(@Autowired ChatBotCallbackListener chatBotCallbackListener) throws Exception {
        // init stream client
        return OpenDingTalkStreamClientBuilder.custom()
                //配置应用的身份信息, 企业内部应用分别为appKey和appSecret, 三方应用为suiteKey和suiteSecret
                .credential(new AuthClientCredential(clientId, clientSecret))
                //注册机器人回调
                .registerCallbackListener(DingTalkStreamTopics.BOT_MESSAGE_TOPIC, chatBotCallbackListener)
                .build();
    }
}

机器人消息回调

/**
 * 机器人消息回调
 *
 * @author zeymo
 */
@Slf4j
@Component
public class ChatBotCallbackListener implements OpenDingTalkCallbackListener<ChatbotMessage, JSONObject> {
     private RobotPrivateMessagesService robotPrivateMessagesService;

    @Autowired
    public RobotMsgCallbackConsumer(RobotPrivateMessagesService robotPrivateMessagesService) {
        this.robotPrivateMessagesService = robotPrivateMessagesService;
    }

    /**
     * https://open.dingtalk.com/document/orgapp/the-application-robot-in-the-enterprise-sends-group-chat-messages
     *
     * @param message
     * @return
     */
    @Override
    public JSONObject execute(ChatbotMessage message) {
        try {
            MessageContent text = message.getText();
            if (text != null) {
                String msg = text.getContent();
                log.info("receive bot message from user={}, msg={}", message.getSenderId(), msg);
                String openConversationId = message.getConversationId();
                try {
                    //发送机器人消息
                    robotPrivateMessagesService.send(openConversationId, "hello");
                } catch (Exception e) {
                    log.error("send group message by robot error:" + e.getMessage(), e);
                }
            }
        } catch (Exception e) {
            log.error("receive group message by robot error:" + e.getMessage(), e);
        }
        return new JSONObject();
    }
}

以上代码实现了这些能力:

  1. 通过命令行参数读取 Client ID 和 Client Secret 选项
  2. 通过 Client ID 和 Client Secret 创建 Stream Client
  3. 在 Stream Client 中注册机器人消息回调方法,实现消息接收能力
  4. 在消息回调方法中,简单 echo 机器人消息回去,实现消息发送(回复)能力

在 IDE 中运行 BotEchoMarkdownApplication.java 中 main 函数,当看到这样的日志输出时候表示运行成功 [DingTalk] connection is established, connectionId=...

配置Stream配送
前提条件
  1. 拥有所在钉钉组织开发者后台的开发者权限
  2. 拥有所在钉钉组织的企业内部应用
  3. 已经完成开发 Stream 模式推送服务端(推荐)流程。
操作步骤
  1. 登录开发者后台,单击目标应用,进入应用详情页。
  2. 单击开发配置 > **事件订阅,**选择 Stream 模式推送
  3. 服务端开发完成后,单击已完成接入,验证连接通道
  4. 单击保存。保存完成后,事件订阅列表才会展示。

记录bug

1.重新发送
原因是网络延迟不可控,如果因为互联网的正常抖动导致推送延迟,触发超时重新推送的话,就会出现重复事件。因此无论是否正确的ACK了,都需要考虑到收到重复事件的可能性。
备注:机器人消息当前是fire-forgot模式,不会因为网络超时而重复推送。

2.多个实例,监听同一个机器人。只会有一个消费。确保只能被服务器消费。

解决方案:使用conditional控制IP地址来阻止Stream流连接

ConditionalOnIp.java

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Conditional(RobotContion.class)
public @interface ConditionalOnIp {
    /**
     * 允许的IP地址列表,支持通配符和CIDR表示法
     * 例如: "192.168.1.*", "10.0.0.0/24", "127.0.0.1"
     */
    String[] allowed() default {};

    /**
     * 禁止的IP地址列表
     */
    String[] denied() default {};

    /**
     * 配置属性名称,从配置文件中读取IP列表
     */
    String value() default "";

    /**
     * 当无法获取IP时是否匹配(默认false)
     */
    boolean matchIfMissing() default false;
}

RobotContion.java

    public class RobotContion implements Condition {
    private static final String LOCAL_IP = getLocalIp();
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        Map<String, Object> annotationAttributes = metadata.getAnnotationAttributes(ConditionalOnIp.class.getName());
        if(annotationAttributes == null){
            return false;
        }

        String[] allowedIPs = (String[]) annotationAttributes.get("allowed");
        String[] deniedIPs = (String[]) annotationAttributes.get("denied");
        String configProperty = (String) annotationAttributes.get("value");
        boolean matchIfMissing = (boolean) annotationAttributes.get("matchIfMissing");

        if(StringUtils.hasText(configProperty)){
            Environment env = context.getEnvironment();
            String configValue = env.getProperty(configProperty);
            if (configValue != null) {
                allowedIPs = configValue.split(",");
            } else {
                return matchIfMissing;
            }
        }

        String currentIP = LOCAL_IP;
        if (currentIP == null) {
            return matchIfMissing;
        }
        // 检查禁止列表
        if (deniedIPs.length > 0 && isIPInList(currentIP, deniedIPs)) {
            return false;
        }

        // 检查允许列表
        if (allowedIPs.length > 0) {
            return isIPInList(currentIP, allowedIPs);
        }

        // 如果没有设置允许列表,且不在禁止

        return true;
    }

    /**
     * 判断IP是否在列表中
     */
    private boolean isIPInList(String ip, String[] ipList) {
        for (String ipPattern : ipList) {
            if (matchesIP(ip, ipPattern.trim())) {
                return true;
            }
        }
        return false;
    }

    /**
     * IP匹配逻辑,支持通配符和CIDR
     */
    private boolean matchesIP(String ip, String pattern) {
        // 精确匹配
        if (ip.equals(pattern)) {
            return true;
        }

        // 通配符匹配: 192.168.1.*
        if (pattern.contains("*")) {
            String regex = pattern.replace(".", "\\.").replace("*", ".*");
            return Pattern.matches(regex, ip);
        }

        // CIDR表示法匹配: 192.168.1.0/24
        if (pattern.contains("/")) {
            try {
                return isInCIDRRange(ip, pattern);
            } catch (Exception e) {
                return false;
            }
        }

        // IP范围匹配: 192.168.1.1-192.168.1.100
        if (pattern.contains("-")) {
            try {
                return isInIPRange(ip, pattern);
            } catch (Exception e) {
                return false;
            }
        }

        return false;
    }

    /**
     * CIDR范围匹配
     */
    private boolean isInCIDRRange(String ip, String cidr) {
        String[] parts = cidr.split("/");
        String network = parts[0];
        int prefixLength = Integer.parseInt(parts[1]);

        long ipLong = ipToLong(ip);
        long networkLong = ipToLong(network);
        long mask = (0xFFFFFFFFL) << (32 - prefixLength);

        return (ipLong & mask) == (networkLong & mask);
    }

    /**
     * IP范围匹配
     */
    private boolean isInIPRange(String ip, String range) {
        String[] ips = range.split("-");
        long start = ipToLong(ips[0].trim());
        long end = ipToLong(ips[1].trim());
        long ipLong = ipToLong(ip);

        return ipLong >= start && ipLong <= end;
    }

    /**
     * IP转long
     */
    private long ipToLong(String ip) {
        String[] octets = ip.split("\\.");
        long result = 0;
        for (int i = 0; i < 4; i++) {
            result
                    |= Long.parseLong(octets[i]) << (24 - (8 * i));
        }
        return result;
    }

    private static String getLocalIp() {
        //优先获取回环地址
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface iface = netInterfaces.nextElement();
                if (iface.isLoopback() || !iface.isUp()) {
                    continue;
                }

                Enumeration<InetAddress> addresses = iface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress addr = addresses.nextElement();
                    if (!addr.isLoopbackAddress() && iface.getDisplayName().indexOf("Virtual") == -1) {
                        return addr.getHostAddress();
                    }
                }
            }
//            System.out.println(InetAddress.getLocalHost().getHostAddress());
            //如果没有找到,返回会回环地址
            return InetAddress.getLocalHost().getHostAddress();
        } catch (SocketException e) {
            return "127.0.0.1";
        } catch (UnknownHostException e) {
            return "127.0.0.1";
        }
    }
}

接下来就可以设置配置文件或者设置allowed属性来控制IP了

3.响应消息处理是否支持负载均衡方式处理?
补充问题:启动多个程序订阅相同事件进行数据处理,担心将来推送数据量大时可能出现无法及时响应的情况;可能需要考虑方案提供给客户端侧进行参考
支持的。如果事件量较大的话,可以采用多进程,或者单进程下多Stream Client实例方式,建立多个 Stream 通道,也即多个 WebSocket 长连接。钉钉服务端每次推送消息时候,通过随机策略选取一个Stream通道推送。如果需要支持更多的负载均衡策略,可以通过技术支持提交反馈。

4.注册连接凭证中的 localIp 是否可以标记多个IP?
补充问题:或增加自定义的客户端标识参数【可选】
可以支持多个IP,采用英文逗号分隔。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐