钉钉单聊机器人实现
单聊机器人实现
开发 Stream 模式推送服务端(推荐)
什么是Stream模式
Stream 模式是钉钉开放平台提供的一种集成方式,它可以监听机器人回调、事件订阅回调和注册卡片回调。使用 Stream 模式接入,钉钉开放平台将通过 Websocket 连接与应用程序通讯,Stream 模式将极大降低接入门槛和资源依赖,不需要公网服务器、IP、域名等资源,只需集成钉钉开放平台 SDK 即可。
Stream模式原理
在 Stream 模式下,开发者的应用程序通过集成 SDK 的方式与钉钉开放平台建立一条 WebSocket 连接,建立连接过程中开放平台将对连接进行鉴权。当有卡片回调发生时,开放平台将通过 WebSocket 连接将数据通知到开发者的应用程序。开发者的应用程序可以接收到这些数据并进行相应处理,从而实现与钉钉开放平台的实时通信,参考下图所示

Stream模式优势
在钉钉开放平台向应用程序发送请求的场景中,大部分都是采用 Webhook (注册公网 HTTPS 服务)的方式,包括卡片回调,使用 Webhook 方式开发过程中会遇到较多的问题,包括
- 申请公网域名和TLS证书
- 申请公网IP并部署接入网关
- 部署应用防火墙并配置白名单
- 独立处理请求的鉴权,以及加解密处理
- 搭建内网穿透环境进行本地开发调试
针对以上问题,Stream 模式将为开发者提供"五零"接入体验,将1~2周的接入开发周期降低到5分钟,包括
-
零公网IP
不需要依赖公网IP或域名,也不需要暴露公网IP,减少了公网暴露服务的安全风险并降低了开发门槛。
-
零加解密/签名/TLS证书管理
使用应用身份对连接进行鉴权,通过反向连接的方式与钉钉开放平台建立TLS加密连接,提供了快速、安全的通信体验。
-
零防火墙白名单
Stream 模式下开发者无需向公网开放提供任何服务端口,无需部署防火墙和配置白名单。
-
零网关部署
通过反向连接的方式建立通道,开发者只需保证运行环境具备公网访问能力即可,无需部署网关。
-
零内网穿透
开发者无需在本地搭建内网穿透工具,通过 Stream 模式在本地开发环境中即可接收卡片回调。
接入方式
接入限制
- 应用程序所部署环境具备访问公网的能力。
- 仅适用于企业内部开发和第三方企业应用。
- 每个客户端实例默认启用一条 WebSocket 连接,一个应用默认最多建立50条连接。
协议接入步骤
介绍
钉钉 Stream 协议接入主要包括两个步骤:
- 注册连接凭证:通过 HTTP POST 方法,获取 WebSocket 通道的 endpoint(协议域名和Path信息) 和 ticket(URL 中的 Ticket 参数);
- 建立 WebSocket 连接:通过步骤一中获取的 endpoint 和 ticket 信息,建立 WebSocket 通道,开始订阅;
步骤一:注册连接凭证
调用以下接口注册 Stream 连接凭证:
请求方法(HTTP)示例:
POST /v1.0/gateway/connections/open HTTP/1.1
Host: api.dingtalk.com
Content-Type:application/json
Accept: application/json
{
"clientId": "${ClientID}",
"clientSecret": "${ClientSecret}",
"localIp": "10.34.22.11",
"subscriptions": [
{
"topic": "*",
"type": "EVENT"
},
{
"topic": "/v1.0/im/bot/messages/get",
"type": "CALLBACK"
}
],
"ua": "dingtalk-sdk-java/1.0.2"
}
步骤二:建立 WebSocket 连接
注册长连接信息成功后,客户端将获取长连的身份标识ticket和钉钉开放平台的地址信息,通过此信息客户端和钉钉服务端建立一条WebSocket连接,握手请求的路径和参数信息如下所示
GET /connect?ticket=${ticket} HTTP/1.1
Host: wss-open-connection.dingtalk.com
Upgrade: websocket
以上示例中 Host、Path 信息仅用于示例展示,请用步骤一中返回的 endpoint 信息作为 Host 和 Path 构建 WebSocket 请求。
钉钉服务端收到 WebSocket 握手信息后会通过 ticket 校验客户端身份信息,校验成功后会返回正确的握手信息。
至此,已经完成了推送订阅通道建立,可以实时接收到订阅的消息列表。下一章节将介绍各种类型的消息,以及如何响应(用于通知钉钉服务端已经成功接收,请勿重复推送)。
Java
-
运行环境
JDK1.8及以上。
-
安装Java SDK
添加依赖项到工程的
pom.xml文件或下载对应的jar包,最新的 SDK 版本可以在这里查看和下载。
<dependency>
<groupId>com.dingtalk.open</groupId>
<artifactId>dingtalk-stream</artifactId>
<version>{sdk-version}</version>
</dependency>
钉钉Stream流的构建
创建 IM 消息的监听
@Configuration
public class DingTalkStreamClientConfiguration {
@Value("${app.appKey}")
private String clientId;
@Value("${app.appSecret}")
private String clientSecret;
/**
* 配置OpenDingTalkClient客户端并配置初始化方法(start)
*
* @param chatBotCallbackListener
* @param aiGraphPluginCallbackListener
* @return
* @throws Exception
*/
@Bean(initMethod = "start")
public OpenDingTalkClient configureStreamClient(@Autowired ChatBotCallbackListener chatBotCallbackListener) throws Exception {
// init stream client
return OpenDingTalkStreamClientBuilder.custom()
//配置应用的身份信息, 企业内部应用分别为appKey和appSecret, 三方应用为suiteKey和suiteSecret
.credential(new AuthClientCredential(clientId, clientSecret))
//注册机器人回调
.registerCallbackListener(DingTalkStreamTopics.BOT_MESSAGE_TOPIC, chatBotCallbackListener)
.build();
}
}
机器人消息回调
/**
* 机器人消息回调
*
* @author zeymo
*/
@Slf4j
@Component
public class ChatBotCallbackListener implements OpenDingTalkCallbackListener<ChatbotMessage, JSONObject> {
private RobotPrivateMessagesService robotPrivateMessagesService;
@Autowired
public RobotMsgCallbackConsumer(RobotPrivateMessagesService robotPrivateMessagesService) {
this.robotPrivateMessagesService = robotPrivateMessagesService;
}
/**
* https://open.dingtalk.com/document/orgapp/the-application-robot-in-the-enterprise-sends-group-chat-messages
*
* @param message
* @return
*/
@Override
public JSONObject execute(ChatbotMessage message) {
try {
MessageContent text = message.getText();
if (text != null) {
String msg = text.getContent();
log.info("receive bot message from user={}, msg={}", message.getSenderId(), msg);
String openConversationId = message.getConversationId();
try {
//发送机器人消息
robotPrivateMessagesService.send(openConversationId, "hello");
} catch (Exception e) {
log.error("send group message by robot error:" + e.getMessage(), e);
}
}
} catch (Exception e) {
log.error("receive group message by robot error:" + e.getMessage(), e);
}
return new JSONObject();
}
}
以上代码实现了这些能力:
- 通过命令行参数读取 Client ID 和 Client Secret 选项
- 通过 Client ID 和 Client Secret 创建 Stream Client
- 在 Stream Client 中注册机器人消息回调方法,实现消息接收能力
- 在消息回调方法中,简单 echo 机器人消息回去,实现消息发送(回复)能力
在 IDE 中运行 BotEchoMarkdownApplication.java 中 main 函数,当看到这样的日志输出时候表示运行成功 [DingTalk] connection is established, connectionId=...
配置Stream配送
前提条件
- 拥有所在钉钉组织开发者后台的开发者权限。
- 拥有所在钉钉组织的企业内部应用。
- 已经完成开发 Stream 模式推送服务端(推荐)流程。
操作步骤
- 登录开发者后台,单击目标应用,进入应用详情页。
- 单击开发配置 > **事件订阅,**选择 Stream 模式推送。
- 服务端开发完成后,单击已完成接入,验证连接通道。
- 单击保存。保存完成后,事件订阅列表才会展示。
记录bug
1.重新发送
原因是网络延迟不可控,如果因为互联网的正常抖动导致推送延迟,触发超时重新推送的话,就会出现重复事件。因此无论是否正确的ACK了,都需要考虑到收到重复事件的可能性。
备注:机器人消息当前是fire-forgot模式,不会因为网络超时而重复推送。
2.多个实例,监听同一个机器人。只会有一个消费。确保只能被服务器消费。
解决方案:使用conditional控制IP地址来阻止Stream流连接
ConditionalOnIp.java
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Conditional(RobotContion.class)
public @interface ConditionalOnIp {
/**
* 允许的IP地址列表,支持通配符和CIDR表示法
* 例如: "192.168.1.*", "10.0.0.0/24", "127.0.0.1"
*/
String[] allowed() default {};
/**
* 禁止的IP地址列表
*/
String[] denied() default {};
/**
* 配置属性名称,从配置文件中读取IP列表
*/
String value() default "";
/**
* 当无法获取IP时是否匹配(默认false)
*/
boolean matchIfMissing() default false;
}
RobotContion.java
public class RobotContion implements Condition {
private static final String LOCAL_IP = getLocalIp();
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Map<String, Object> annotationAttributes = metadata.getAnnotationAttributes(ConditionalOnIp.class.getName());
if(annotationAttributes == null){
return false;
}
String[] allowedIPs = (String[]) annotationAttributes.get("allowed");
String[] deniedIPs = (String[]) annotationAttributes.get("denied");
String configProperty = (String) annotationAttributes.get("value");
boolean matchIfMissing = (boolean) annotationAttributes.get("matchIfMissing");
if(StringUtils.hasText(configProperty)){
Environment env = context.getEnvironment();
String configValue = env.getProperty(configProperty);
if (configValue != null) {
allowedIPs = configValue.split(",");
} else {
return matchIfMissing;
}
}
String currentIP = LOCAL_IP;
if (currentIP == null) {
return matchIfMissing;
}
// 检查禁止列表
if (deniedIPs.length > 0 && isIPInList(currentIP, deniedIPs)) {
return false;
}
// 检查允许列表
if (allowedIPs.length > 0) {
return isIPInList(currentIP, allowedIPs);
}
// 如果没有设置允许列表,且不在禁止
return true;
}
/**
* 判断IP是否在列表中
*/
private boolean isIPInList(String ip, String[] ipList) {
for (String ipPattern : ipList) {
if (matchesIP(ip, ipPattern.trim())) {
return true;
}
}
return false;
}
/**
* IP匹配逻辑,支持通配符和CIDR
*/
private boolean matchesIP(String ip, String pattern) {
// 精确匹配
if (ip.equals(pattern)) {
return true;
}
// 通配符匹配: 192.168.1.*
if (pattern.contains("*")) {
String regex = pattern.replace(".", "\\.").replace("*", ".*");
return Pattern.matches(regex, ip);
}
// CIDR表示法匹配: 192.168.1.0/24
if (pattern.contains("/")) {
try {
return isInCIDRRange(ip, pattern);
} catch (Exception e) {
return false;
}
}
// IP范围匹配: 192.168.1.1-192.168.1.100
if (pattern.contains("-")) {
try {
return isInIPRange(ip, pattern);
} catch (Exception e) {
return false;
}
}
return false;
}
/**
* CIDR范围匹配
*/
private boolean isInCIDRRange(String ip, String cidr) {
String[] parts = cidr.split("/");
String network = parts[0];
int prefixLength = Integer.parseInt(parts[1]);
long ipLong = ipToLong(ip);
long networkLong = ipToLong(network);
long mask = (0xFFFFFFFFL) << (32 - prefixLength);
return (ipLong & mask) == (networkLong & mask);
}
/**
* IP范围匹配
*/
private boolean isInIPRange(String ip, String range) {
String[] ips = range.split("-");
long start = ipToLong(ips[0].trim());
long end = ipToLong(ips[1].trim());
long ipLong = ipToLong(ip);
return ipLong >= start && ipLong <= end;
}
/**
* IP转long
*/
private long ipToLong(String ip) {
String[] octets = ip.split("\\.");
long result = 0;
for (int i = 0; i < 4; i++) {
result
|= Long.parseLong(octets[i]) << (24 - (8 * i));
}
return result;
}
private static String getLocalIp() {
//优先获取回环地址
try {
Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
while (netInterfaces.hasMoreElements()) {
NetworkInterface iface = netInterfaces.nextElement();
if (iface.isLoopback() || !iface.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = iface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (!addr.isLoopbackAddress() && iface.getDisplayName().indexOf("Virtual") == -1) {
return addr.getHostAddress();
}
}
}
// System.out.println(InetAddress.getLocalHost().getHostAddress());
//如果没有找到,返回会回环地址
return InetAddress.getLocalHost().getHostAddress();
} catch (SocketException e) {
return "127.0.0.1";
} catch (UnknownHostException e) {
return "127.0.0.1";
}
}
}
接下来就可以设置配置文件或者设置allowed属性来控制IP了
3.响应消息处理是否支持负载均衡方式处理?
补充问题:启动多个程序订阅相同事件进行数据处理,担心将来推送数据量大时可能出现无法及时响应的情况;可能需要考虑方案提供给客户端侧进行参考
支持的。如果事件量较大的话,可以采用多进程,或者单进程下多Stream Client实例方式,建立多个 Stream 通道,也即多个 WebSocket 长连接。钉钉服务端每次推送消息时候,通过随机策略选取一个Stream通道推送。如果需要支持更多的负载均衡策略,可以通过技术支持提交反馈。
4.注册连接凭证中的 localIp 是否可以标记多个IP?
补充问题:或增加自定义的客户端标识参数【可选】
可以支持多个IP,采用英文逗号分隔。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)