InfluxDB时序数据库
influxdb是一个时序数据库,应用于物联网领域,用于存储时序数据。InfluxDB Time Series Data Platform | InfluxData启动后浏览器输入http://localhost:8086, 进入管理界面,第一次会让注册用户名及密码influxDB 的数据格式主要采用 行协议(Line Protocol),这是一种高效的文本格式,专为时间序列数据设计行协议(Lin
influxdb是一个时序数据库,应用于物联网领域,用于存储时序数据。
InfluxDB Time Series Data Platform | InfluxData
docker安装influxdb
docker run -d --name influxdb -p 8086:8086 -v D:/dockerData/influxdb2711:/var/lib/influxdb2 influxdb:2.7.11
启动后浏览器输入http://localhost:8086, 进入管理界面,第一次会让注册用户名及密码
数据结构
influxDB 的数据格式主要采用 行协议(Line Protocol),这是一种高效的文本格式,专为时间序列数据设计
行协议(Line Protocol)基本结构
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
- Measurement(测量名称)
描述数据的类别(如 cpu、temperature)
要求:不可包含空格或逗号 - Tags(标签)
键值对形式的元数据,用于高效索引
特点:
可选,多个标签用逗号分隔
键和值都必须是字符串
会被索引,适合高频查询条件
示例:cpu,host=server01,region=zh value=444,status="running",其中host=server01,region=zh是标签 - Fields(字段)
实际存储的数值数据
特点:
至少需要一个字段
支持类型:float、integer、string、boolean
不被索引,适合低频查询但高频变化的数据
示例: cpu,host=server01,region=zh value=444,status="running,其中value=444,status="running"是字段 - Timestamp(时间戳)
可选,默认使用服务器时间
精度:支持纳秒(默认)、微秒、毫秒、秒
java依赖
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.6.0</version>
</dependency>
influxdb:
url: http://localhost:8086
token: ${INFLUX_TOKEN:Wv22oHXcPBh0WNAdivZFtBJflow8M-sEfOgEq52UbHM-_OdAhIr7ebcf0XjRJWNDJ3gJ-yFot2ayx0HSJWVfig==} # 优先从环境变量读取
org: wwe
bucket: default
写入数据
// 定 义InfluxDBClient 和 WriteApi,
// url: http://localhost:8086, org组织: cxzn, bucket目录桶: default
@Bean
public InfluxDBClient influxDBClient() {
return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
}
@Bean
public WriteApi writeApi(InfluxDBClient influxDBClient) {
return influxDBClient.getWriteApi();
}
// 数据准备 String testLineProtocol = "cpu,host=server01,region=zh value=444 " + System.currentTimeMillis() * 1000000;
private final WriteApi writeApi; 注入
private final WriteApi writeApi;
// 代码片段
public void writeData(String lineProtocolData) {
try {
// writeApi.writeRecord(WritePrecision.NS,lineProtocolData); 单个保存
writeApi.writeRecords(WritePrecision.NS,new ArrayList<String>(){{add(lineProtocolData);}}); //批量保存
writeApi.flush();
} catch (Exception e) {
throw new RuntimeException("写入InfluxDB失败");
}
}
// 更安全的关闭方式
@PreDestroy
public void close() {
writeApi.close();
}
读取数据
private final InfluxDBClient influxDBClient; // 注入
/**
* 查询InfluxDB数据
* @param measurement 测量名称
* @param field 字段名
* @param start 开始时间(相对于现在,例如 "-1h" 表示1小时前)
* @param stop 结束时间(相对于现在,例如 "now()" 表示现在)
* @return 查询结果列表
*/
public List<Map<String, Object>> queryData(String measurement, String field, String start, String stop) {
QueryApi queryApi = influxDBClient.getQueryApi();
// 构建Flux查询语句
String flux = String.format(
"from(bucket:\"default\")" +
" |> range(start: %s, stop: %s)" +
" |> filter(fn: (r) => r._measurement == \"%s\")" +
(field != null ? " |> filter(fn: (r) => r._field == \"%s\")" : "") +
" |> yield()",
start, stop, measurement, field);
System.out.println("执行查询: " + flux);
// 执行查询
List<FluxTable> tables = queryApi.query(flux);
// 处理查询结果
List<Map<String, Object>> resultList = new java.util.ArrayList<>();
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
Map<String, Object> row = new HashMap<>();
row.put("time", record.getTime());
row.put("measurement", record.getMeasurement());
row.put("field", record.getField());
row.put("value", record.getValue());
// 添加所有标签
record.getValues().forEach((key, value) -> {
if (!key.startsWith("_")) { // 排除内部字段
row.put(key, value);
}
});
resultList.add(row);
System.out.println("查询结果: " + row);
}
}
return resultList;
}
/**
* 简化版查询方法,使用默认时间范围(过去1小时到现在)
* @param measurement 测量名称
* @return 查询结果列表
*/
public List<Map<String, Object>> queryData(String measurement) {
return queryData(measurement, null, "-1h", "now()");
}
// 更安全的关闭方式
@PreDestroy
public void close() {
writeApi.close();
}
执行结果示例
执行查询: from(bucket:"default") |> range(start: -4h, stop: now()) |> filter(fn: (r) => r._measurement == "cpu") |> filter(fn: (r) => r._field == "value") |> yield()
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T06:59:02.377Z, region=us-west, value=0.64, measurement=cpu, table=0}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T07:03:16.196Z, region=zh, value=0.77, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T08:26:04.425Z, region=zh, value=911.0, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T08:26:42.829Z, region=zh, value=333.0, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server01, time=2025-03-27T08:36:37.167Z, region=zh, value=444.0, measurement=cpu, table=1}
查询结果: {result=_result, field=value, host=server02, time=2025-03-27T07:08:01.890Z, region=zh, value=110.0, measurement=cpu, table=2}
在浏览器控制台查看数据
实际使用可以混合使用(MySQL + InfluxDB),用InfluxDB存储与用户行为相关的时间序列数据(如登录日志、操作记录)。
// 用户登录日志写入 InfluxDB
@PostMapping("/login")
public void login(@RequestBody User user) {
// 1. 验证用户信息(从 MySQL 查询)
SysUser dbUser = userMapper.selectByUserName(user.getUserName());
// 2. 记录登录时间(写入 InfluxDB)
Point point = Point.measurement("user_login")
.tag("user_id", dbUser.getUserId().toString())
.addField("ip", user.getIp())
.time(System.currentTimeMillis(), WritePrecision.MS);
influxDBClient.write(point);
}
高可用配置
参考chengshiwen的influx-proxy,使用的是influxdb2.7版本
架构图来源https://github.com/chengshiwen/influx-proxy/wiki
生产环境可以使用:两个 influx proxy、两个 circle 、每个 circle 三个 influxdb
如下是一个 influx proxy、两个 circle 、每个 circle 三个 influxdb
1st 创建自定义网络
docker network create influx_net
2nd docker-compose.yml
version: "3.5"
services:
influx-proxy:
image: chengshiwen/influx-proxy:3.0.0-preview
container_name: influx-proxy
ports:
- 7076:7076
environment:
- TZ=Asia/Shanghai
volumes:
- ./proxy.json:/etc/influx-proxy/proxy.json
restart: unless-stopped
networks:
- influx_net
influxdb-1:
image: influxdb:2.7
container_name: influxdb-1
restart: unless-stopped
volumes:
- D:/dockerData/influxdb-cluster/influxdb-1:/var/lib/influxdb2
ports:
- "192.168.31.62:8087:8086"
networks:
- influx_net
influxdb-2:
image: influxdb:2.7
container_name: influxdb-2
restart: unless-stopped
volumes:
- D:/dockerData/influxdb-cluster/influxdb-2:/var/lib/influxdb2
ports:
- "192.168.31.62:8088:8086"
networks:
- influx_net
influxdb-3:
image: influxdb:2.7
container_name: influxdb-3
restart: unless-stopped
volumes:
- D:/dockerData/influxdb-cluster/influxdb-3:/var/lib/influxdb2
ports:
- "192.168.31.62:8089:8086"
networks:
- influx_net
influxdb-4:
image: influxdb:2.7
container_name: influxdb-4
restart: unless-stopped
volumes:
- D:/dockerData/influxdb-cluster/influxdb-4:/var/lib/influxdb2
ports:
- "192.168.31.62:8090:8086"
networks:
- influx_net
influxdb-5:
image: influxdb:2.7
container_name: influxdb-5
restart: unless-stopped
volumes:
- D:/dockerData/influxdb-cluster/influxdb-5:/var/lib/influxdb2
ports:
- "192.168.31.62:8091:8086"
networks:
- influx_net
influxdb-6:
image: influxdb:2.7
container_name: influxdb-6
restart: unless-stopped
volumes:
- D:/dockerData/influxdb-cluster/influxdb-6:/var/lib/influxdb2
ports:
- "192.168.31.62:8092:8086"
networks:
- influx_net
networks:
influx_net:
3rd proxy.json
如下内容中的backends中的token内容可以随便填写,后续会从docker中获取各influxdb实例并回填
java客户端调用时使用的客户端需要的token,来自于和circles平行的token
{
"circles": [
{
"name": "circle-1",
"backends": [
{
"name": "influxdb-1-1",
"url": "http://192.168.31.62:8087",
"token": "SBYEiwtJxVG9EK9XOB92UVqsWRj_RCo-bOjBuAv7-dd1kKBAj1s-tWKWbbNYT5B5U8K4Ggu537M1HxorsmYU1w=="
},
{
"name": "influxdb-1-2",
"url": "http://192.168.31.62:8088",
"token": "X3oX29YRf0EA0d8YCr_kODgnvHDnOEM2-YSVFEVsKt-J-IG5H_A0IKoEbGHbYu8x0-w_Sctwqb8cYci6y16cCg=="
},
{
"name": "influxdb-1-3",
"url": "http://192.168.31.62:8089",
"token": "rBi_fFLhmuzqxByK9Su0GwyuXjv5Gvr9vJMB4kcVFO7gT3eho9R9JgZLORMYLYPks0dhhQTb63_xH5yIO2OIyg=="
}
]
},
{
"name": "circle-2",
"backends": [
{
"name": "influxdb-2-1",
"url": "http://192.168.31.62:8090",
"token": "bdmBye0SvSaC6nIYG-WtF62V-aDNXUDzkVyNVjX3ckoJvcqFjOVmDAWRy7xhKyveX0LWUFZOmGnev__3h-HfVQ=="
},
{
"name": "influxdb-2-2",
"url": "http://192.168.31.62:8091",
"token": "5bt3buqFV6SXJj-BRnSFuctzwqHFaszfjvdC6il2iNlK4hV8ov7nmdjjcf7PllLdVK_TzLeEaRTyyHhZMPs34g=="
},
{
"name": "influxdb-2-3",
"url": "http://192.168.31.62:8092",
"token": "eomM1z_q6yuk5LSI2YDmZJ7WKpBgGTC0_laq-G02m_LnmQhvGqJqjRENhV7b7R7qzVBS7JTOLCSLp7s3OBAIEQ=="
}
]
}
],
"dbrp": {
"separator": "/",
"mapping": {"mydb": "myorg/mybucket", "mydb/myrp": "myorg/mybucket"}
},
"listen_addr": ":7076",
"data_dir": "data",
"hash_key": "%circle",
"shard_key": "%org,%bk,%mm",
"flush_size": 10000,
"flush_time": 1,
"check_interval": 1,
"rewrite_interval": 10,
"rewrite_threads": 5,
"conn_pool_size": 20,
"write_timeout": 10,
"idle_timeout": 10,
"token": "Wv22oHXcPBh0WNAdivZFtBJflow8M-sEfOgEq52UbHM-_OdAhIr7ebcf0XjRJWNDJ3gJ-yFot2ayx0HSJWVfig==",
"ping_auth_enabled": false,
"write_tracing": false,
"query_tracing": false,
"pprof_enabled": false,
"https_enabled": false,
"https_cert": "",
"https_key": "",
"tls": {
"ciphers": [],
"min_version": "",
"max_version": ""
}
}
4th setup.sh
#!/bin/bash
BASEDIR=$(cd $(dirname $0); pwd)
function sedx() {
if [[ -n $(sed --version 2> /dev/null | grep "GNU sed") ]]; then
sed -i "$@"
else
sed -i '' "$@"
fi
}
function setup() {
until docker logs influxdb-$1 2>&1 | grep ':8086' &>/dev/null; do
counter=$((counter+1))
if [ $counter -eq 30 ]; then
echo "error: influxdb is not ready"
exit 1
fi
sleep 0.5
done
# setup
docker exec -it influxdb-$1 influx setup -u influxdb -p influxdb -o myorg -b mybucket -f &> /dev/null
# dbrp mapping
BUCKET_ID=$(docker exec -it influxdb-$1 influx bucket list -n mybucket --hide-headers | cut -f 1)
docker exec -it influxdb-$1 influx v1 dbrp create --db mydb --rp myrp --bucket-id ${BUCKET_ID} --default &> /dev/null
# set token
INFLUX_TOKEN=$(docker exec -it influxdb-$1 influx auth list -u influxdb --hide-headers | cut -f 3)
sedx "$2s#\"token\": \".*\"#\"token\": \"${INFLUX_TOKEN}\"#" ${BASEDIR}/proxy.json
}
sedx '36s#"mapping": {.*}#"mapping": {"mydb": "myorg/mybucket", "mydb/myrp": "myorg/mybucket"}#' ${BASEDIR}/proxy.json
# 初始化所有6个容器,并替换对应行号的 Token
setup 1 9
setup 2 14
setup 3 19
setup 4 29
setup 5 34
setup 6 39
5th 启动
依次执行如下命令
docker-compose up -d --scale influx-proxy=0
./setup.sh
docker-compose up -d
通过http://localhost:7076 打开是404,但通过最上面的JAVA测试,使用如上定义的token,从influxdb可以写入和读取到
setup.sh内容说明
#!/bin/bash:声明使用 Bash 解释器执行脚本。
BASEDIR:获取脚本所在目录的绝对路径(用于后续文件操作)。
GNU sed:-i 直接修改文件,无需备份。
BSD sed:-i '' 表示不备份文件
# 初始化 InfluxDB,这里指定了默认用户名和密码
docker exec -it influxdb-$1 influx setup -u influxdb -p influxdb -o myorg -b mybucket -f &> /dev/null
# 更新 proxy.json 中的 token
INFLUX_TOKEN=$(docker exec -it influxdb-$1 influx auth list -u influxdb --hide-headers | cut -f 3)
$1:传入的 InfluxDB 容器编号(对应 docker-compose.yml 中的 influxdb-1 到 influxdb-4)。
$2:proxy.json 中需要替换 token 的行号(例如 9、14、19、29、34、39)

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)