mqtt ssl 上传数据
需求:有多个系统需要将时序数据上传到他们系统中,其中一个是ssl通信方式。设计: 使用springboot 创建多个mqtt客户端的bean。代码:pom中增加:<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</a
·
需求:需要将时序数据分别上传到多个外部系统,其中一个系统要求使用 SSL 通信方式。
设计:使用 Spring Boot 为每个目标系统各创建一个 MQTT 客户端 bean。
代码:
pom中增加:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>${bcprov-jdk15on.version}</version>
</dependency>
mqtt bean:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLSocketFactory;
/**
 * Wrapper around a single Eclipse Paho {@link MqttClient} connection.
 *
 * <p>The bean is configured through its constructor, connected with {@link #start()} and
 * released with {@link #closeConnect()}. When an {@link SSLSocketFactory} is supplied it is
 * installed on the connect options; otherwise the client uses Paho's default socket factory.
 */
public class MqttClientBean {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientBean.class);

    /** Underlying Paho client; remains {@code null} until {@link #start()} has created it. */
    public MqttClient mqttClient;
    private MemoryPersistence memoryPersistence;
    private MqttConnectOptions mqttConnectOptions;
    private final SSLSocketFactory sslSocketFactory;
    private final String clientId;
    private final String mqUrl;
    private final String userName;
    private final String password;
    private final String keepAlive;

    /**
     * @param clientId         MQTT client identifier; when {@code null} no client is created
     * @param mqUrl            broker URI passed to the Paho client
     * @param userName         user name for the CONNECT packet
     * @param password         password for the CONNECT packet, may be {@code null}
     * @param keepAlive        keep-alive interval in seconds, as a decimal string
     * @param sslSocketFactory socket factory for TLS connections, or {@code null} for none
     */
    public MqttClientBean(String clientId, String mqUrl, String userName, String password,
                          String keepAlive, SSLSocketFactory sslSocketFactory) {
        this.clientId = clientId;
        this.mqUrl = mqUrl;
        this.userName = userName;
        this.password = password;
        this.keepAlive = keepAlive;
        this.sslSocketFactory = sslSocketFactory;
    }

    /**
     * Builds the connect options, creates the client if it does not exist yet and connects it.
     * Connection problems are logged rather than thrown.
     *
     * @throws NumberFormatException if the configured keep-alive is not a valid integer
     */
    public void start() {
        mqttConnectOptions = buildConnectOptions();

        if (clientId == null) {
            logger.warn("mqttClient为空");
            return;
        }
        if (mqttClient == null) {
            memoryPersistence = new MemoryPersistence();
            try {
                // Exactly one client per bean: a second instance would never be closed.
                mqttClient = new MqttClient(mqUrl, clientId, memoryPersistence);
            } catch (MqttException e) {
                logger.error("Failed to create MQTT client {} for {}", clientId, mqUrl, e);
                return;
            }
        }
        if (!mqttClient.isConnected()) {
            mqttClient.setCallback(new MqttClientRecieveCallback());
            try {
                logger.info("创建连接");
                mqttClient.connect(mqttConnectOptions);
                logger.info("mqtt连接成功");
            } catch (MqttException e) {
                logger.error("MQTT client {} failed to connect to {}", clientId, mqUrl, e);
            }
        }
        logger.info(String.valueOf(mqttClient.isConnected()));
    }

    /** Creates the connect options from the constructor arguments. */
    private MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Clean session: broker and client discard session state on disconnect,
        // so in-memory persistence is sufficient.
        options.setCleanSession(true);
        options.setConnectionTimeout(90);
        // Paho expects the keep-alive interval in seconds, so the value is passed through unscaled.
        options.setKeepAliveInterval(Integer.parseInt(keepAlive.trim()));
        options.setUserName(userName);
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setAutomaticReconnect(true);
        if (sslSocketFactory != null) {
            options.setSocketFactory(sslSocketFactory);
        }
        return options;
    }

    /** Returns whether a client exists and currently reports itself as connected. */
    private boolean isConnected() {
        return mqttClient != null && mqttClient.isConnected();
    }

    /**
     * Disconnects (when connected) and closes the client, then closes the persistence store.
     * Each step is attempted independently so that one failure does not skip the others.
     */
    public void closeConnect() {
        if (mqttClient != null) {
            if (mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                } catch (MqttException e) {
                    logger.error("Failed to disconnect MQTT client {}", clientId, e);
                }
            } else {
                logger.error("mqttClient is not connect");
            }
            try {
                // Close even when not connected so the client's resources are released.
                mqttClient.close();
            } catch (MqttException e) {
                logger.error("Failed to close MQTT client {}", clientId, e);
            }
        } else {
            logger.error("mqttClient is null");
        }

        // The store is closed last, after the client has stopped using it.
        if (memoryPersistence != null) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                logger.error("Failed to close MQTT persistence of client {}", clientId, e);
            }
        } else {
            logger.warn("memoryPersistence is null");
        }
    }

    /**
     * Publishes {@code message} (UTF-8 encoded) to {@code pubTopic}. If the client is not
     * connected, one reconnect is attempted first; when that fails the message is dropped
     * and a warning is logged.
     *
     * @param pubTopic topic to publish to
     * @param message  payload text
     * @param qos      MQTT quality of service for the message
     */
    public void publishMessage(String pubTopic, String message, int qos) {
        if (!isConnected()) {
            reConnect();
            if (!isConnected()) {
                logger.warn("MQTT client {} is not connected; message for topic {} was not published",
                        clientId, pubTopic);
                return;
            }
        }
        logger.info("发布消息 " + mqttClient.isConnected());
        logger.info("id:" + mqttClient.getClientId());

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        // Explicit charset so the bytes on the wire do not depend on the JVM default.
        mqttMessage.setPayload(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttTopic topic = mqttClient.getTopic(pubTopic);
        if (topic == null) {
            return;
        }
        try {
            MqttDeliveryToken publish = topic.publish(mqttMessage);
            // NOTE(review): this logs only while delivery is still pending; confirm whether
            // the condition was meant to be isComplete().
            if (!publish.isComplete()) {
                logger.info("消息发布成功");
            }
        } catch (MqttException e) {
            logger.error("Failed to publish to topic {}", pubTopic, e);
        }
    }

    /**
     * Connects an existing, disconnected client again; when no client exists yet,
     * falls back to {@link #start()}.
     */
    public void reConnect() {
        if (mqttClient == null) {
            start();
            return;
        }
        if (mqttClient.isConnected()) {
            logger.error("mqttClient is null or connect");
            return;
        }
        if (mqttConnectOptions == null) {
            logger.error("mqttConnectOptions is null");
            return;
        }
        try {
            mqttClient.connect(mqttConnectOptions);
            logger.info("mqtt连接成功");
        } catch (MqttException e) {
            logger.error("MQTT client {} failed to reconnect to {}", clientId, mqUrl, e);
        }
    }

    /**
     * Subscribes to {@code topic} with QoS 1. Requires a connected client.
     *
     * @param topic topic filter to subscribe to
     */
    public void subTopic(String topic) {
        if (!isConnected()) {
            logger.error("mqttClient is error");
            return;
        }
        try {
            mqttClient.subscribe(topic, 1);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to topic {}", topic, e);
        }
    }

    /**
     * Unsubscribes from {@code topic}. Requires a connected client, since the
     * unsubscribe request has to be sent to the broker.
     *
     * @param topic topic filter to unsubscribe from
     */
    public void cleanTopic(String topic) {
        if (!isConnected()) {
            logger.error("mqttClient is error");
            return;
        }
        try {
            mqttClient.unsubscribe(topic);
        } catch (MqttException e) {
            logger.error("Failed to unsubscribe from topic {}", topic, e);
        }
    }
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * {@link MqttCallback} that prints connection loss and every received message to standard output.
 */
public class MqttClientRecieveCallback implements MqttCallback {

    /**
     * Reports the lost connection instead of discarding the cause silently.
     *
     * @param cause reason for the connection loss as reported by the client
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Client connection lost : " + cause);
    }

    /**
     * Prints topic, QoS and payload of a received message.
     *
     * @param topic   topic the message arrived on
     * @param message received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode explicitly as UTF-8 so the text does not depend on the JVM default charset.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        System.out.println("Client 接收消息内容 : " + payload);
    }

    /** No action is taken when a delivery completes. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // intentionally empty
    }
}
使用Configuration创建2个bean:
import com.crc.project.pi.domain.HsePointBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.ResourceUtils;
import javax.net.ssl.SSLSocketFactory;
import java.io.*;
import java.util.HashMap;
/**
 * Spring configuration that registers two {@link MqttClientBean} instances, one configured from
 * the {@code mqtt.crsems.*} properties and one from the {@code mqtt.dianli.*} properties.
 * Both beans use {@code start} as init method and {@code closeConnect} as destroy method.
 */
@Configuration
public class MqttConfiguration {

    // --- settings of the "crsems" client (receives the injected SSL socket factory) ---
    @Value("${mqtt.crsems.clientId}")
    private String crsemsClientId;

    @Value("${mqtt.crsems.mqUrl}")
    private String crsemsMqUrl;

    @Value("${mqtt.crsems.userName}")
    private String crsemsUserName;

    @Value("${mqtt.crsems.password}")
    private String crsemsPassword;

    @Value("${mqtt.crsems.keepAlive}")
    private String crsemsKeepAlive;

    // --- settings of the "dianli" client (created without a socket factory) ---
    @Value("${mqtt.dianli.clientId}")
    private String dianliClientId;

    @Value("${mqtt.dianli.mqUrl}")
    private String dianliMqUrl;

    @Value("${mqtt.dianli.userName}")
    private String dianliUserName;

    @Value("${mqtt.dianli.password}")
    private String dianliPassword;

    @Value("${mqtt.dianli.keepAlive}")
    private String dianliKeepAlive;

    /** Socket factory bean named {@code crsemsSSLSocketFactory}, handed to the crsems client. */
    @Autowired
    @Qualifier("crsemsSSLSocketFactory")
    private SSLSocketFactory sslSocketFactory;

    /**
     * @return the client bean {@code crsemsMqttClient}, built with the injected socket factory
     */
    @Bean(name = "crsemsMqttClient", initMethod = "start", destroyMethod = "closeConnect")
    public MqttClientBean createCrsemsMqttClient() {
        MqttClientBean crsemsClient = new MqttClientBean(
                crsemsClientId,
                crsemsMqUrl,
                crsemsUserName,
                crsemsPassword,
                crsemsKeepAlive,
                sslSocketFactory);
        return crsemsClient;
    }

    /**
     * @return the client bean {@code dianliEnergyMqttClient}, built without a socket factory
     */
    @Bean(name = "dianliEnergyMqttClient", initMethod = "start", destroyMethod = "closeConnect")
    public MqttClientBean createDianliMqttClient() {
        MqttClientBean dianliClient = new MqttClientBean(
                dianliClientId,
                dianliMqUrl,
                dianliUserName,
                dianliPassword,
                dianliKeepAlive,
                null);
        return dianliClient;
    }
}
创建SSLSocketFactoryBean:
package com.crc.project.pi.bean;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.lang3.StringUtils;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
/**
 * Spring configuration that builds the {@code crsemsSSLSocketFactory} bean: a TLSv1.2
 * {@link SSLSocketFactory} initialised with a client keystore (key managers) and a trusted
 * keystore (trust managers), both loaded from the classpath.
 */
@Configuration
public class ConfigCrsemsSSLSocketFactoryBean {
    private static final Logger logger = LoggerFactory.getLogger(ConfigCrsemsSSLSocketFactoryBean.class);

    /** Factory cached after the first successful build. */
    private static SSLSocketFactory selfSignKeystoreSocketFactory = null;

    @Value("${mqtt.crsems.ssl-client-keystore-file}")
    private String clientCertPath;
    @Value("${mqtt.crsems.ssl-client-keystore-pwd}")
    private String clientCertPassword;
    @Value("${mqtt.crsems.ssl-client-key-pair-alias}")
    private String clientPairAlias;
    @Value("${mqtt.crsems.ssl-client-key-pair-pwd}")
    private String clientPairPwd;
    @Value("${mqtt.crsems.ssl-trusted-keystore-file}")
    private String caPath;
    @Value("${mqtt.crsems.ssl-trusted-keystore-alias}")
    private String caAlias;
    @Value("${mqtt.crsems.ssl-trusted-keystore-pwd}")
    private String caPassword;

    @Autowired
    ResourceLoader resourceLoader;

    /**
     * Builds (once) and returns the socket factory.
     *
     * @return the socket factory, or {@code null} when a required setting is empty or when a
     *         keystore or the SSL context cannot be initialised (the reason is logged)
     */
    @Bean(name = "crsemsSSLSocketFactory")
    public SSLSocketFactory getSelfSignKeystoreSocketFactory() {
        if (selfSignKeystoreSocketFactory != null) {
            return selfSignKeystoreSocketFactory;
        }

        Security.addProvider(new BouncyCastleProvider());
        logger.info(caPath);

        if (StringUtils.isEmpty(caPath)) {
            logger.error("=== Trusted keystore file is empty!");
            return null;
        }
        if (StringUtils.isEmpty(caPassword)) {
            logger.error("=== Trusted keystore password is empty!");
            return null;
        }
        if (StringUtils.isEmpty(clientCertPath)) {
            logger.error("=== Client keystore file is empty!");
            return null;
        }
        // Must check the client keystore password here, not the trusted keystore password.
        if (StringUtils.isEmpty(clientCertPassword)) {
            logger.error("=== Client keystore password is empty!");
            return null;
        }
        if (StringUtils.isEmpty(clientPairPwd)) {
            logger.error("=== Client keypair password is empty!");
            return null;
        }

        TrustManagerFactory tmf = createTrustManagerFactory();
        if (tmf == null) {
            return null;
        }
        KeyManagerFactory kmf = createKeyManagerFactory();
        if (kmf == null) {
            return null;
        }

        try {
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            selfSignKeystoreSocketFactory = context.getSocketFactory();
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            logger.error("Failed to initialise the TLSv1.2 SSL context", e);
            return null;
        }
        return selfSignKeystoreSocketFactory;
    }

    /** Loads the trusted JKS keystore and wraps it in an X509 trust manager factory; {@code null} on failure. */
    private TrustManagerFactory createTrustManagerFactory() {
        try {
            KeyStore trustedStore = loadKeyStore("JKS", caPath, caPassword);
            TrustManagerFactory factory = TrustManagerFactory.getInstance("X509");
            factory.init(trustedStore);
            return factory;
        } catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException e) {
            logger.error("Failed to load trusted keystore {}", caPath, e);
            return null;
        }
    }

    /** Loads the client keystore and wraps it in a key manager factory; {@code null} on failure. */
    private KeyManagerFactory createKeyManagerFactory() {
        try {
            KeyStore clientStore = loadKeyStore(KeyStore.getDefaultType(), clientCertPath, clientCertPassword);
            KeyManagerFactory factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            factory.init(clientStore, clientPairPwd.toCharArray());
            return factory;
        } catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException
                | UnrecoverableKeyException e) {
            logger.error("Failed to load client keystore {}", clientCertPath, e);
            return null;
        }
    }

    /** Reads a keystore of the given type from the classpath, closing the stream afterwards. */
    private KeyStore loadKeyStore(String type, String classpathLocation, String storePassword)
            throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
        KeyStore keyStore = KeyStore.getInstance(type);
        Resource resource = resourceLoader.getResource("classpath:" + classpathLocation);
        try (InputStream in = resource.getInputStream()) {
            keyStore.load(in, storePassword.toCharArray());
        }
        return keyStore;
    }
}

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)