可穿戴设备能够实时采集用户的健康数据(如心率、体温、步数等),并通过无线网络传输到云端进行存储和分析。然而,在现实生活中,网络不稳定或设备故障可能导致数据丢失,影响健康数据分析的准确性。例如:针对心率监测手环在地铁隧道等弱网环境下的数据积压问题,我们就利用了IoTDB的Write Ahead Log机制实现断点续传。

推荐IoTDB 的理由

  • 高效的数据压缩: IoTDB内置了多种高效的压缩算法,可以显著减少存储空间占用。

  • 分布式架构: IoTDB支持集群部署,可以水平扩展以应对大规模数据和高并发请求。

  • 高可用性和容错性: 提供主从复制机制,确保数据在节点故障时仍然可用,并且可以通过WAL(Write-Ahead Logging)机制实现断点续传,保证数据一致性。

  • 开源免费: 作为Apache软件基金会的顶级项目,IoTDB是开源且免费的,降低了项目的初期投入成本。

  • 快速的数据查询: 支持复杂的时序查询操作,如聚合、降采样等,满足数据分析的需求。

代码实操

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Apache IoTDB Client -->
    <dependency>
        <groupId>org.apache.iotdb</groupId>
        <artifactId>iotdb-session</artifactId>
        <version>1.0.0</version>
    </dependency>

application.properties

iotdb.host=localhost
iotdb.port=6667
iotdb.username=root
iotdb.password=root

启用了WAL机制

确保在IoTDB的配置文件(conf/iotdb-engine.properties)中启用了WAL机制。

enable_wal=true
wal_dir=data/wal

配置类

package com.example.iotdbspringbootdemo;

import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class IotdbConfig {

    @Value("${iotdb.host}")
    private String host; // IoTDB服务器主机地址

    @Value("${iotdb.port}")
    private int port; // IoTDB服务器端口号

    @Value("${iotdb.username}")
    private String username; // 连接IoTDB的用户名

    @Value("${iotdb.password}")
    private String password; // 连接IoTDB的密码

    @Bean
    public Session getSession() throws IoTDBConnectionException {
        // 创建一个Session对象来连接IoTDB
        Session session = new Session(host, port, username, password);
        session.open(false); // 禁用自动获取模式
        return session;
    }
}

服务类

package com.example.iotdbspringbootdemo;

import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TsDataType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Service
class HealthDataService {

    @Autowired
    private Session session;

    /**
     * 将健康数据插入到IoTDB中
     *
     * @param deviceId 设备ID
     * @param measurements 测量值及其对应的数据类型
     * @param values 测量值的实际数据
     * @throws StatementExecutionException SQL语句执行异常
     * @throws IOException IO异常
     */
    public void insertHealthData(String deviceId, Map<String, TsDataType> measurements, Map<String, Object> values)
            throws StatementExecutionException, IOException {
        long time = System.currentTimeMillis(); // 获取当前时间戳作为记录的时间戳
        List<String> measurementList = new ArrayList<>(measurements.keySet()); // 提取测量值名称列表
        List<TsDataType> typeList = new ArrayList<>(measurements.values()); // 提取测量值类型列表
        Object[] valueList = values.values().toArray(); // 提取测量值数组

        try {
            // 插入一条记录到IoTDB
            session.insertRecord(deviceId, time, measurementList, typeList, valueList);
        } catch (StatementExecutionException | IOException e) {
            // 捕获并抛出异常
            throw new RuntimeException("Failed to insert health data", e);
        }
    }
}

Controller

package com.example.iotdbspringbootdemo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/health")
class HealthController {

    @Autowired
    private HealthDataService healthDataService;

    /**
     * 接收来自可穿戴设备的健康数据
     *
     * @param deviceId 设备ID
     * @param healthData 健康数据Map,键为测量值名称,值为测量值
     * @return HTTP响应实体
     */
    @PostMapping("/data")
    public ResponseEntity<?> receiveHealthData(@RequestParam String deviceId,
                                             @RequestBody Map<String, Object> healthData) {
        Map<String, TsDataType> measurements = new HashMap<>(); // 存储测量值及其对应的数据类型

        // 根据传入的健康数据确定其数据类型
        for (String key : healthData.keySet()) {
            if (key.equals("heartRate")) {
                measurements.put(key, TsDataType.INT32); // 心率是整数类型
            } elseif (key.equals("temperature")) {
                measurements.put(key, TsDataType.FLOAT); // 体温是浮点类型
            } elseif (key.equals("steps")) {
                measurements.put(key, TsDataType.INT32); // 步数是整数类型
            }
        }

        try {
            // 调用服务方法插入健康数据
            healthDataService.insertHealthData(deviceId, measurements, healthData);
            return ResponseEntity.ok("Data inserted successfully"); // 返回成功响应
        } catch (Exception e) {
            // 捕获并返回错误响应
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
        }
    }
}

Application

package com.example.iotdbspringbootdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class IotdbSpringbootDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(IotdbSpringbootDemoApplication.class, args);
    }
}

测试

curl -X POST http://localhost:8080/health/data?deviceId=device_1 \
-H "Content-Type: application/json" \
-d '{"heartRate": 75, "temperature": 36.8, "steps": 1200}'

Respons

{
    "timestamp": "2025-06-16T09:45:30.123+00:00",
    "status": 200,
    "error": null,
    "message": "Data inserted successfully",
    "path": "/health/data"
}

关注我,送Java福利

/**
 * 这段代码只有Java开发者才能看得懂!
 * 关注我微信公众号之后,
 * 发送:"666",
 * 即可获得一本由Java大神一手面试经验诚意出品
 * 《Java开发者面试百宝书》Pdf电子书
 * 福利截止日期为2025年06月28日止
 * 手快有手慢没!!!
*/
System.out.println("请关注我的微信公众号:");
System.out.println("Java知识日历");
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐