一、介绍

IoTDB 数据同步功能可以将 IoTDB 的数据传输到另一个数据平台,我们将一个数据同步任务称为 Pipe。

一个 Pipe 包含三个子任务(插件):

  • 抽取(Extract)
  • 处理(Process)
  • 发送(Connect)

Pipe 允许用户自定义三个子任务的处理逻辑,通过类似 UDF 的方式处理数据。 在一个 Pipe 中,上述的子任务分别由三种插件执行实现,数据会依次经过这三个插件进行处理:Pipe Extractor 用于抽取数据,Pipe Processor 用于处理数据,Pipe Connector 用于发送数据,最终数据将被发至外部系统。

二、应用场景

Pipe 任务的模型如下:
在这里插入图片描述
描述一个数据同步任务,本质就是描述 Pipe Extractor、Pipe Processor 和 Pipe Connector 插件的属性。用户可以通过 SQL 语句声明式地配置三个子任务的具体属性,通过组合不同的属性,实现灵活的数据 ETL 能力。

利用数据同步功能,可以搭建完整的数据链路来满足端边云同步、异地灾备、读写负载分库等需求。

三、IoTDB 数据库搭建

IoTDB 数据库搭建直通车

四、数据同步

实现 IoTDB A -> IoTDB B 的全量数据同步

启动两个 IoTDB,A(datanode -> 127.0.0.1:6667) B(datanode -> 192.168.178.141:6667)

创建 A -> B 的 Pipe,在 A 上执行

create pipe hui 
with connector (
'connector'='iotdb-thrift-connector',
'connector.ip'='192.168.178.141',
'connector.port'='6667' 
)

CREATE PIPE hui: 创建了一个名为 “hui” 的管道。管道是用于在 IoTDB 中进行数据传输的一个通道。

WITH CONNECTOR (…): 指定管道的连接器配置。连接器指定了管道与数据源或目标的连接方式和属性。

‘connector’ = ‘iotdb-thrift-connector’: 指定了连接器的类型为 “iotdb-thrift-connector”。这意味着管道将使用 IoTDB 的 Thrift 连接器进行连接。

‘connector.ip’ = ‘192.168.178.141’: 指定了连接器的 IP 地址为 “192.168.178.141”。这是目标或源的 IP 地址,表示管道将与该 IP 地址上的数据源或目标进行连接。

‘connector.port’ = ‘6667’: 指定了连接器的端口为 “6667”。这是目标或源的端口号,表示管道将通过该端口与目标或源进行通信。

启动 A -> B 的 Pipe,在 A 上执行
start pipe hui
在这里插入图片描述

向 A 写入数据
INSERT INTO root.db.d(time, m) values (1, 1)

在这里插入图片描述

在 B 检查由 A 同步过来的数据
SELECT ** FROM root
在这里插入图片描述
持续更新中关注不迷糊

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐