influxdb是一个时序数据库,应用于物联网领域,用于存储时序数据。
InfluxDB Time Series Data Platform | InfluxData

docker安装influxdb
docker run -d --name influxdb -p 8086:8086 -v D:/dockerData/influxdb2711:/var/lib/influxdb2 influxdb:2.7.11

启动后浏览器输入http://localhost:8086, 进入管理界面,第一次会让注册用户名及密码

数据结构

influxDB 的数据格式主要采用 行协议(Line Protocol),这是一种高效的文本格式,专为时间序列数据设计
行协议(Line Protocol)基本结构

<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
  • Measurement(测量名称)
    描述数据的类别(如 cpu、temperature)
    要求:不可包含空格或逗号
  • Tags(标签)
    键值对形式的元数据,用于高效索引
    特点:
    可选,多个标签用逗号分隔
    键和值都必须是字符串
    会被索引,适合高频查询条件
    示例:cpu,host=server01,region=zh value=444,status="running",其中host=server01,region=zh是标签
  • Fields(字段)
    实际存储的数值数据
    特点:
    至少需要一个字段
    支持类型:float、integer、string、boolean
    不被索引,适合低频查询但高频变化的数据
    示例: cpu,host=server01,region=zh value=444,status="running,其中value=444,status="running"是字段
  • Timestamp(时间戳)
    可选,默认使用服务器时间
    精度:支持纳秒(默认)、微秒、毫秒、秒
java依赖
    <dependency>
        <groupId>com.influxdb</groupId>
        <artifactId>influxdb-client-java</artifactId>
        <version>6.6.0</version>
    </dependency>


influxdb:
  url: http://localhost:8086
  token: ${INFLUX_TOKEN:Wv22oHXcPBh0WNAdivZFtBJflow8M-sEfOgEq52UbHM-_OdAhIr7ebcf0XjRJWNDJ3gJ-yFot2ayx0HSJWVfig==}  # 优先从环境变量读取
  org: wwe
  bucket: default
写入数据
// 定 义InfluxDBClient 和 WriteApi,
//   url: http://localhost:8086, org组织: cxzn, bucket目录桶: default

   @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
    }


    @Bean
    public WriteApi writeApi(InfluxDBClient influxDBClient) {
        return influxDBClient.getWriteApi();
    }


    // 数据准备 String testLineProtocol = "cpu,host=server01,region=zh value=444 " + System.currentTimeMillis() * 1000000;
    private final WriteApi writeApi; 注入

    private final WriteApi writeApi;

    // 代码片段
    public void writeData(String lineProtocolData) {
        try {
            // writeApi.writeRecord(WritePrecision.NS,lineProtocolData); 单个保存
            writeApi.writeRecords(WritePrecision.NS,new ArrayList<String>(){{add(lineProtocolData);}}); //批量保存
            writeApi.flush();
        } catch (Exception e) {
            throw new RuntimeException("写入InfluxDB失败");
        }
    }

    // 更安全的关闭方式
    @PreDestroy
    public void close() {
        writeApi.close();
    }
读取数据
private final InfluxDBClient influxDBClient; // 注入

/**
     * 查询InfluxDB数据
     * @param measurement 测量名称
     * @param field 字段名
     * @param start 开始时间(相对于现在,例如 "-1h" 表示1小时前)
     * @param stop 结束时间(相对于现在,例如 "now()" 表示现在)
     * @return 查询结果列表
     */
    public List<Map<String, Object>> queryData(String measurement, String field, String start, String stop) {
        QueryApi queryApi = influxDBClient.getQueryApi();

        // 构建Flux查询语句
        String flux = String.format(
                "from(bucket:\"default\")" +
                " |> range(start: %s, stop: %s)" +
                " |> filter(fn: (r) => r._measurement == \"%s\")" +
                (field != null ? " |> filter(fn: (r) => r._field == \"%s\")" : "") +
                " |> yield()",
                start, stop, measurement, field);

        System.out.println("执行查询: " + flux);

        // 执行查询
        List<FluxTable> tables = queryApi.query(flux);

        // 处理查询结果
        List<Map<String, Object>> resultList = new java.util.ArrayList<>();
        for (FluxTable table : tables) {
            for (FluxRecord record : table.getRecords()) {
                Map<String, Object> row = new HashMap<>();
                row.put("time", record.getTime());
                row.put("measurement", record.getMeasurement());
                row.put("field", record.getField());
                row.put("value", record.getValue());

                // 添加所有标签
                record.getValues().forEach((key, value) -> {
                    if (!key.startsWith("_")) {  // 排除内部字段
                        row.put(key, value);
                    }
                });

                resultList.add(row);
                System.out.println("查询结果: " + row);
            }
        }

        return resultList;
    }

    /**
     * 简化版查询方法,使用默认时间范围(过去1小时到现在)
     * @param measurement 测量名称
     * @return 查询结果列表
     */
    public List<Map<String, Object>> queryData(String measurement) {
        return queryData(measurement, null, "-1h", "now()");
    }

    // 更安全的关闭方式
    @PreDestroy
    public void close() {
        writeApi.close();
    }

执行结果示例

执行查询: from(bucket:"default") |> range(start: -4h, stop: now()) |> filter(fn: (r) => r._measurement == "cpu") |> filter(fn: (r) => r._field == "value") |> yield()
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T06:59:02.377Z, region=us-west, value=0.64, measurement=cpu, table=0}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T07:03:16.196Z, region=zh, value=0.77, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T08:26:04.425Z, region=zh, value=911.0, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T08:26:42.829Z, region=zh, value=333.0, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T08:36:37.167Z, region=zh, value=444.0, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server02, time=2025-03-27T07:08:01.890Z, region=zh, value=110.0, measurement=cpu, table=2}

在浏览器控制台查看数据

实际使用可以混合使用(MySQL + InfluxDB),用InfluxDB存储与用户行为相关的时间序列数据(如登录日志、操作记录)。

// 用户登录日志写入 InfluxDB
@PostMapping("/login")
public void login(@RequestBody User user) {
    // 1. 验证用户信息(从 MySQL 查询)
    SysUser dbUser = userMapper.selectByUserName(user.getUserName());
    
    // 2. 记录登录时间(写入 InfluxDB)
    Point point = Point.measurement("user_login")
        .tag("user_id", dbUser.getUserId().toString())
        .addField("ip", user.getIp())
        .time(System.currentTimeMillis(), WritePrecision.MS);
    influxDBClient.write(point);
}

高可用配置

参考chengshiwen的influx-proxy,使用的是influxdb2.7版本
架构图来源https://github.com/chengshiwen/influx-proxy/wiki

生产环境可以使用:两个 influx proxy、两个 circle 、每个 circle 三个 influxdb
如下是一个 influx proxy、两个 circle 、每个 circle 三个 influxdb

1st 创建自定义网络
docker network create influx_net
2nd docker-compose.yml
version: "3.5"
services:
  influx-proxy:
    image: chengshiwen/influx-proxy:3.0.0-preview
    container_name: influx-proxy
    ports:
      - 7076:7076
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - ./proxy.json:/etc/influx-proxy/proxy.json
    restart: unless-stopped
    networks:
      - influx_net

  influxdb-1:
    image: influxdb:2.7
    container_name: influxdb-1
    restart: unless-stopped
    volumes:
      - D:/dockerData/influxdb-cluster/influxdb-1:/var/lib/influxdb2  
    ports:
      - "192.168.31.62:8087:8086"  
    networks:
      - influx_net

  influxdb-2:
    image: influxdb:2.7
    container_name: influxdb-2
    restart: unless-stopped
    volumes:
      - D:/dockerData/influxdb-cluster/influxdb-2:/var/lib/influxdb2  
    ports:
      - "192.168.31.62:8088:8086"
    networks:
      - influx_net

  influxdb-3:
    image: influxdb:2.7
    container_name: influxdb-3
    restart: unless-stopped
    volumes:
      - D:/dockerData/influxdb-cluster/influxdb-3:/var/lib/influxdb2  
    ports:
      - "192.168.31.62:8089:8086"
    networks:
      - influx_net

  influxdb-4:
    image: influxdb:2.7
    container_name: influxdb-4
    restart: unless-stopped
    volumes:
      - D:/dockerData/influxdb-cluster/influxdb-4:/var/lib/influxdb2  
    ports:
      - "192.168.31.62:8090:8086"
    networks:
      - influx_net

  influxdb-5:
    image: influxdb:2.7
    container_name: influxdb-5
    restart: unless-stopped
    volumes:
      - D:/dockerData/influxdb-cluster/influxdb-5:/var/lib/influxdb2  
    ports:
      - "192.168.31.62:8091:8086"
    networks:
      - influx_net

  influxdb-6:
    image: influxdb:2.7
    container_name: influxdb-6
    restart: unless-stopped
    volumes:
      - D:/dockerData/influxdb-cluster/influxdb-6:/var/lib/influxdb2  
    ports:
      - "192.168.31.62:8092:8086"
    networks:
      - influx_net

networks:
  influx_net:
3rd proxy.json

如下内容中的backends中的token内容可以随便填写,后续会从docker中获取各influxdb实例并回填
java客户端调用时使用的客户端需要的token,来自于和circles平行的token

{
    "circles": [
        {
            "name": "circle-1",
            "backends": [
                {
                    "name": "influxdb-1-1",
                    "url": "http://192.168.31.62:8087",
                    "token": "SBYEiwtJxVG9EK9XOB92UVqsWRj_RCo-bOjBuAv7-dd1kKBAj1s-tWKWbbNYT5B5U8K4Ggu537M1HxorsmYU1w=="
                },
                {
                    "name": "influxdb-1-2",
                    "url": "http://192.168.31.62:8088",
                    "token": "X3oX29YRf0EA0d8YCr_kODgnvHDnOEM2-YSVFEVsKt-J-IG5H_A0IKoEbGHbYu8x0-w_Sctwqb8cYci6y16cCg=="
                },
                { 
                    "name": "influxdb-1-3",
                    "url": "http://192.168.31.62:8089",
                    "token": "rBi_fFLhmuzqxByK9Su0GwyuXjv5Gvr9vJMB4kcVFO7gT3eho9R9JgZLORMYLYPks0dhhQTb63_xH5yIO2OIyg=="
                }
            ]
        },
        {
            "name": "circle-2",
            "backends": [
                {
                    "name": "influxdb-2-1",
                    "url": "http://192.168.31.62:8090",
                    "token": "bdmBye0SvSaC6nIYG-WtF62V-aDNXUDzkVyNVjX3ckoJvcqFjOVmDAWRy7xhKyveX0LWUFZOmGnev__3h-HfVQ=="
                },
                {
                    "name": "influxdb-2-2",
                    "url": "http://192.168.31.62:8091",
                    "token": "5bt3buqFV6SXJj-BRnSFuctzwqHFaszfjvdC6il2iNlK4hV8ov7nmdjjcf7PllLdVK_TzLeEaRTyyHhZMPs34g=="
                },
                {  
                    "name": "influxdb-2-3",
                    "url": "http://192.168.31.62:8092",
                    "token": "eomM1z_q6yuk5LSI2YDmZJ7WKpBgGTC0_laq-G02m_LnmQhvGqJqjRENhV7b7R7qzVBS7JTOLCSLp7s3OBAIEQ=="
                }
            ]
        }
    ], 
    "dbrp": {
        "separator": "/",
        "mapping": {"mydb": "myorg/mybucket", "mydb/myrp": "myorg/mybucket"}
    },
    "listen_addr": ":7076",
    "data_dir": "data",
    "hash_key": "%circle",
    "shard_key": "%org,%bk,%mm",
    "flush_size": 10000,
    "flush_time": 1,
    "check_interval": 1,
    "rewrite_interval": 10,
    "rewrite_threads": 5,
    "conn_pool_size": 20,
    "write_timeout": 10,
    "idle_timeout": 10,
    "token": "Wv22oHXcPBh0WNAdivZFtBJflow8M-sEfOgEq52UbHM-_OdAhIr7ebcf0XjRJWNDJ3gJ-yFot2ayx0HSJWVfig==",
    "ping_auth_enabled": false,
    "write_tracing": false,
    "query_tracing": false,
    "pprof_enabled": false,
    "https_enabled": false,
    "https_cert": "",
    "https_key": "",
    "tls": {
        "ciphers": [],
        "min_version": "",
        "max_version": ""
    }
}
4th setup.sh
#!/bin/bash

BASEDIR=$(cd $(dirname $0); pwd)

function sedx() {
    if [[ -n $(sed --version 2> /dev/null | grep "GNU sed") ]]; then
        sed -i "$@"
    else
        sed -i '' "$@"
    fi
}

function setup() {
    until docker logs influxdb-$1 2>&1 | grep ':8086' &>/dev/null; do
        counter=$((counter+1))
        if [ $counter -eq 30 ]; then
            echo "error: influxdb is not ready"
            exit 1
        fi
        sleep 0.5
    done
    # setup
    docker exec -it influxdb-$1 influx setup -u influxdb -p influxdb -o myorg -b mybucket -f &> /dev/null
    # dbrp mapping
    BUCKET_ID=$(docker exec -it influxdb-$1 influx bucket list -n mybucket --hide-headers | cut -f 1)
    docker exec -it influxdb-$1 influx v1 dbrp create --db mydb --rp myrp --bucket-id ${BUCKET_ID} --default &> /dev/null
    # set token
    INFLUX_TOKEN=$(docker exec -it influxdb-$1 influx auth list -u influxdb --hide-headers | cut -f 3)
    sedx "$2s#\"token\": \".*\"#\"token\": \"${INFLUX_TOKEN}\"#" ${BASEDIR}/proxy.json
}

sedx '36s#"mapping": {.*}#"mapping": {"mydb": "myorg/mybucket", "mydb/myrp": "myorg/mybucket"}#' ${BASEDIR}/proxy.json

# 初始化所有6个容器,并替换对应行号的 Token
setup 1 9
setup 2 14
setup 3 19
setup 4 29
setup 5 34
setup 6 39
5th 启动

依次执行如下命令

docker-compose up -d --scale influx-proxy=0

./setup.sh

docker-compose up -d

通过http://localhost:7076 打开是404,但通过最上面的JAVA测试,使用如上定义的token,从influxdb可以写入和读取到

setup.sh内容说明
    #!/bin/bash:声明使用 Bash 解释器执行脚本。

    BASEDIR:获取脚本所在目录的绝对路径(用于后续文件操作)。

GNU sed:-i 直接修改文件,无需备份。

BSD sed:-i '' 表示不备份文件

# 初始化 InfluxDB,这里指定了默认用户名和密码
    docker exec -it influxdb-$1 influx setup -u influxdb -p influxdb -o myorg -b mybucket -f &> /dev/null

# 更新 proxy.json 中的 token
    INFLUX_TOKEN=$(docker exec -it influxdb-$1 influx auth list -u influxdb --hide-headers | cut -f 3)

$1:传入的 InfluxDB 容器编号(对应 docker-compose.yml 中的 influxdb-1 到 influxdb-4)。

$2:proxy.json 中需要替换 token 的行号(例如 9、14、19、29、34、39)
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐