最近在做一个物联网项目,需求是将传感器数据在无网络的环境下收集到服务器。思前想后,决定走DTU通信。

前提:

1.DTU相关配置请自行百度,本项目使用的是原子云4G DTU,需要准备4G物联网卡。

       相关配置参考:原子云初体验——ESP8266连接原子云 - it610.com

2.需要下载RXTXcomm.jar,网址:http://fizzed.com/oss/rxtx-for-java,拷贝动态库到对应的jdk目录下

·Windows平台

  拷贝  rxtxSerial.dll ---> <JAVA_HOME>\jre\bin
  拷贝  rxtxParallel.dll ---> <JAVA_HOME>\jre\bin

·Linux平台

  拷贝  librxtxSerial.so ---> <JAVA_HOME>/jre/lib/i386/
  拷贝  librxtxParallel.so ---> <JAVA_HOME>/jre/lib/i386/

3.)在 pom.xml 中引入本地 jar 包依赖

<dependency>
     <groupId>org.bidib.jbidib.org.qbang.rxtx</groupId>
     <artifactId>rxtxcomm</artifactId>
     <version>2.2</version>
</dependency>

代码:

package com.example.goose.util;

import java.io.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.example.goose.redis.RedisUtil;
import com.example.goose.webSocket.MyWebSocket;
import gnu.io.*;
import javax.annotation.Resource;

public class ContinueRead extends Thread implements SerialPortEventListener {
    // SerialPortEventListener
    // 监听器,独立线程监听串口数据
    static CommPortIdentifier portId; // 串口通信管理类
    static Enumeration<?> portList; // 有效连接上的端口的枚举
    InputStream inputStream; // 从串口来的输入流
    public static OutputStream outputStream;// 向串口输出的流
    public static SerialPort serialPort; // 串口的引用

    
    @Override
    /**
     * SerialPort EventListene 的方法,持续监听端口上是否有数据流
     **/
    public void serialEvent(SerialPortEvent event) {

        switch (event.getEventType()) {
            case SerialPortEvent.BI:
            case SerialPortEvent.OE:
            case SerialPortEvent.FE:
            case SerialPortEvent.PE:
            case SerialPortEvent.CD:
            case SerialPortEvent.CTS:
            case SerialPortEvent.DSR:
            case SerialPortEvent.RI:
            case SerialPortEvent.OUTPUT_BUFFER_EMPTY:break;
            case SerialPortEvent.DATA_AVAILABLE:// 当有可用数据时读取数据

                //创建100byte缓冲池
                byte[] readBuffer = new byte[100];

          //频繁插入数据,为了数据安全上悲观锁,防止读脏数据
          //也可以用堵塞队列 
          //private BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>();

                synchronized(this){
                try {
                     //循环读取数据流,DTU会发送心跳包,注意过滤字段(可以在原子云设置)
                     while (inputStream.available() > 0) {
                        //你的业务代码
                        inputStream.read(readBuffer);
                        //根据*切分成数组 如数据为 kabit*123
                        String[] split = new String(readBuffer).split("\\*");

                        System.out.println("此数据为名称和ID");
                        
                        //将数据插入数据库
                        activeInsert(split);

                        //WebSocket通知页面更新数据
                        MyWebSocket.broadcast("名称");

                        // 重新构造缓冲对象,否则有可能会影响接下来接收的数据
                        readBuffer = new byte[100];
                     }
                    
                    }
                } catch (IOException e) {
                } 
                break;
            }
        }
    }

    /**
     *
     * 通过程序打开COM串口,设置监听器以及相关的参数
     *
     * @return 返回1 表示端口打开成功,返回 0表示端口打开失败
     */
	public int startComPort() {

        // 通过串口通信管理类获得当前连接上的串口列表
        portList = CommPortIdentifier.getPortIdentifiers();
 
        while (portList.hasMoreElements()) {

            // 获取相应串口对象
            portId = (CommPortIdentifier) portList.nextElement();

            System.out.println("设备类型:->" + portId.getPortType());
            System.out.println("设备名称:->" + portId.getName());
            // 判断端口类型是否为串口
            if (portId.getPortType() == CommPortIdentifier.PORT_SERIAL) {
                // 判断如果COM10串口存在,就打开该串口
                if (portId.getName().equals("COM4")) {
                    try {
                        // 打开串口名字为COM10(名字任意),延迟为2毫秒
                        serialPort = (SerialPort) portId.open("COM4", 2000);

                    } catch (PortInUseException e) {
                        e.printStackTrace();
                        return 0;
                    }
                    // 设置当前串口的输入输出流
                    try {
                        inputStream = serialPort.getInputStream();
                        outputStream = serialPort.getOutputStream();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0;
                    }
                    // 给当前串口添加一个监听器
                    try {
                        serialPort.addEventListener(this);
                    } catch (TooManyListenersException e) {
                        e.printStackTrace();
                        return 0;
                    }
                    // 设置监听器生效,即:当有数据时通知
                    serialPort.notifyOnDataAvailable(true);

                    // 设置串口的一些读写参数
                    try {
                        // 比特率、数据位、停止位、奇偶校验位
                        serialPort.setSerialPortParams(115200,
                        SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
                        SerialPort.PARITY_NONE);
                    } catch (UnsupportedCommOperationException e) {
                        e.printStackTrace();
                        return 0;
                    }
                    return 1;
                }
            }
        }
        return 0;
    }

    public static void closeSerialPort() {
        if(serialPort != null) {
            serialPort.close();
            System.out.println("关闭了串口:"+serialPort.getName());
            serialPort = null;
        }
    }

 public synchronized static void activeInsert(String [] sourceArray) throws MalformedURLException {

        //调用自身Http接口
        String s1="http://localhost/setInfo?name=sourceArray[0]&id=sourceArray[1]";
        URL url=new URL(s1);
        System.out.println(url);
        URLConnection urlcon;
        try {
            urlcon = url.openConnection();
            InputStream is = urlcon.getInputStream();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        //通知Redis更新数据
        String s2="http://localhost/RedisSetNameList?key=nameUpdate";
        URL url2=new URL(s2);
        System.out.println(url2);
        URLConnection urlcon1;
        try {
            urlcon1 = url2.openConnection();
            InputStream is1 = urlcon1.getInputStream();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }




}

控制层:

package com.example.goose.controller;

import com.example.goose.util.ContinueRead;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@CrossOrigin
public class RXTXController {
    //RXTX通信DTU
    @RequestMapping(path = "/openRxtx", method = {RequestMethod.GET})
    public void openEquipment() throws Exception {
            ContinueRead cRead = new ContinueRead();
            cRead.closeSerialPort();
            int i = cRead.startComPort();
            if (i == 1) {
                // 启动线程来处理收到的数据
                cRead.start();
            } else { return; }
      }
    @RequestMapping(path = "/closeRxtx", method = {RequestMethod.GET})
    public void closeRxtx() throws Exception {
        ContinueRead cRead = new ContinueRead();
        cRead.closeSerialPort();
    }
}


其他业务代码就不放了,有问题欢迎在留言区留言。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐