TSDB时序数据库-OpenTSDB
TSDB时序数据库,openTSDB
·
背景
需要及时获取短信发送消息队列的堆积情况;pass将相关信息及时的存入了opentsdb时序数据库中,现需要读取时序数据库将堆积情况以短信形式通知
一:概念
时间序列数据库:主要用来存储时间序列数据,常常用来做监控预警数据的存储
OpenTSDB是基于HBase存储时间序列数据的一个开源数据库
特点:
- 顺序写:实时数据写入,多以追加的方式
- 毫无遗漏的接收并存储大量的时间序列数据
- 主要用做监控系统;譬如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储,查询
存储:
- 无需转换,写的是什么数据存的就是什么数据
- 时序数据以毫秒的精度保存
- 永久保留原始数据
读能力
- 直接通过内置的GUI来生成图表
- 还可以通过HTTP API查询数据
- 另外还可以使用开源的前端与其交互
中文文档:
https://www.docs4dev.com/amp/docs/zh/opentsdb/2.3/reference/installation.html#runtime-requirements
二:使用引入
-
maven依赖:
<dependency> <groupId>com.github.eulery</groupId> <artifactId>opentsdb-java-sdk</artifactId> <version>1.1.6</version> </dependency>
-
工具类:
package com.lmy.util; import org.apache.http.nio.reactor.IOReactorException; import org.opentsdb.client.OpenTSDBClient; import org.opentsdb.client.OpenTSDBClientFactory; import org.opentsdb.client.OpenTSDBConfig; import org.opentsdb.client.bean.request.Point; import org.opentsdb.client.bean.response.DetailResult; import org.opentsdb.client.http.callback.BatchPutHttpResponseCallback; import java.io.IOException; import java.util.List; import java.util.Map; /** * OpenTsDbUtil类 * @author 86152 */ public class OpenTsDbUtil { private static final String OPENTSDB_HOST = "http://127.0.0.1"; private static final int OPENTSDB_PORT = 8080; private static final OpenTSDBConfig config; static { config = OpenTSDBConfig // OpenTsDb数据库地址和端口号 .address(OPENTSDB_HOST, OPENTSDB_PORT) // http连接池大小,默认100 .httpConnectionPool(100) // http请求超时时间,默认100s .httpConnectTimeout(100) // 异步写入数据时,每次http提交的数据条数,默认50 .batchPutSize(50) // 异步写入数据中,内部有一个队列,默认队列大小20000 .batchPutBufferSize(20000) // 异步写入等待时间,如果距离上一次请求超多300ms,且有数据,则直接提交 .batchPutTimeLimit(300) // 当确认这个client只用于查询时设置,可不创建内部队列从而提高效率 .readonly() // 每批数据提交完成后回调 .batchPutCallBack(new BatchPutHttpResponseCallback.BatchPutCallBack() { @Override public void response(List<Point> points, DetailResult result) { // 在请求完成并且response code成功时回调 } @Override public void responseError(List<Point> points, DetailResult result) { // 在response code失败时回调 } @Override public void failed(List<Point> points, Exception e) { // 在发生错误是回调 } }).config(); } /** * 获取客户端 */ public static OpenTSDBClient getClient() { OpenTSDBClient client = null; try { client = OpenTSDBClientFactory.connect(config); } catch (IOReactorException e) { e.printStackTrace(); } return client; } /** * 插入单个tag数据 */ public static void insertOne(OpenTSDBClient client, String metric, String tagName, String tagValue, Number value) { //获取当前秒 Long timestamp = System.currentTimeMillis() / 1000; //创建数据对象 Point point = Point.metric(metric).tag(tagName, tagValue).value(timestamp, value).build(); //将对象插入数据库 client.put(point); } /** * 插入多个tag数据 */ public static void insertMap(OpenTSDBClient client, String metric, Map<String, String> tags, Number value) { //获取当前秒 Long timestamp = System.currentTimeMillis() / 1000; //创建数据对象 Point point = Point.metric(metric).tag(tags).value(timestamp, value).build(); //将对象插入数据库 client.put(point); } /** * 优雅关闭连接,会等待所有异步操作完成 * * @param client 需要关闭的客户端 */ public static void close(OpenTSDBClient client) { if (client != null) { try { client.gracefulClose(); } catch (IOException e) { e.printStackTrace(); } } } }
-
查询
//获取客户端 OpenTSDBClient client = OpenTsDbUtil.getClient(); //查询条件集合 List<SubQuery.Filter> filterList = new ArrayList<>(); //查询条件 SubQuery.Filter filter = new SubQuery.Filter(); //设置成true, 不设置或设置成false会导致读超时 filter.setGroupBy(Boolean.TRUE); //设置过滤类型 //LiteralOr:等于查询,或查询,类似 SQL 里的 IN 查询; //NotLiteralOr:等于查询,或查询,类似 SQL 里的 NOT IN 查询; //Wildcard:模糊匹配,类似 SQL 里的 like 查询; //Regexp:正则匹配; filter.setType(SubQuery.Filter.FilterType.LITERAL_OR); //设置tag,即查询的条件对象 filter.setTagk("metric_code"); //要查询的tag filter.setFilter("MQ_001|" + "MQ_002"); filterList.add(filter); //查询的时间范围,3m-ago:3秒前到当前数据 Query query = Query.begin("3m-ago") //要查询的库 .sub(SubQuery.metric("3066688880001") //查询的聚合类型 .aggregator(SubQuery.Aggregator.NONE) .filter(filterList) .build()) .build(); // 同步查询 List<QueryResult> resultList = client.query(query);
// 异步查询 client.query(query, new QueryHttpResponseCallback.QueryCallback() { @Override public void response(Query query, List<QueryResult> queryResults) { // 在请求完成并且response code成功时回调 } @Override public void responseError(Query query, HttpException e) { // 在response code失败时回调 } @Override public void failed(Query query, Exception e) { // 在发生错误是回调 } })
-
插入数据
//获取客户端 OpenTSDBClient client = OpenTsDbUtil.getClient(); //获取当前秒 Long timestamp = System.currentTimeMillis() / 1000; //创建数据对象 Point point = Point.metric("point").tag("testTag", "test").value(timestamp, 1.0).build(); //将对象插入数据库 client.put(point); //关闭资源 OpenTsDbUtil.close(client);
-
删除
Query query = Query.begin("7d-ago") .sub(SubQuery.metric("metric.test") .aggregator(SubQuery.Aggregator.NONE) .build()) .build(); client.delete(query);

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)