TDengine 数据订阅:新手指南
1. 简介
TDengine 数据订阅基本概念,可参考之前写的文章 数据订阅, 本文重点从实操方面介绍数据订阅的使用。
2. TDengine 订阅实践
2.1 环境准备
2.1.1 TDengine 集群搭建
TDengine 订阅功能依赖于 TDengine 数据库,因此首先需要搭建 TDengine 集群。 以下是搭建 TDengine 集群的基本步骤:
- 准备环境:
- 准备一台服务器(生产环境需要至少三台),并确保服务器之间网络互通。
- 在服务器上安装主流 Linux 操作系统(建议安装Ubuntu 20.04+/RHEL 8+/麒麟V10)。
- 每台服务器配置 hostname 及地址解析。
- 下载 TDengine:
- 从 TDengine 官网下载最新版本的 TDengine 安装包。[以下所有操作均在基于TDengine 3.3.5.2版本]
- 安装 TDengine:
- 解压安装包到每台服务器的指定目录,运行目录下
install.sh脚本进行安装。 - 修改配置文件
taos.cfg,配置集群节点信息、数据存储路径等参数。 关键参数(详见官方文档)
- 解压安装包到每台服务器的指定目录,运行目录下
firstEp node1:6030
secondEp node2:6030
fqdn node1
logDir /data/taos/log
dataDir /data/taos/data
tempDir /data/taos/tmp
timezone UTC-8
locale en_US.UTF-8
charset UTF-8
- 启动 TDengine:
- 在每台服务器上执行启动命令
/usr/loca/taos/bin/start-all.sh,启动 TDengine 服务所有组件。
- 在每台服务器上执行启动命令
- 验证集群:
- 使用 TDengine 客户端工具连接到集群,并执行 SQL 语句验证集群是否正常运行。
[root@ ~]# taos
taos> show dnodes;
- 直接执行
taos会使用 root 账号和默认密码(taosdata)登录。 - 使用指定用户和密码登录使用如下命令:
taos -u USERNAME -p'PASSWORD'
- 注意:
-p参数和密码直接没有空格。
具体安装步骤见官方集群部署文档。
2.1.2 客户端工具安装
为了方便操作 TDengine 数据库和订阅功能,建议安装以下客户端工具:
- TDengine CLI: TDengine 命令行工具,用于执行 SQL 语句和管理数据库。
- TDengine 连接器: TDengine 提供各种编程语言的连接器来连接 TDengine 数据库。
2.1.3 Python 连接器安装
Python 连接器需要 Python 3.6.2 以上版本。
- pip 安装: 如果存在旧版本的 Python 连接器,需要先进行卸载。
pip3 uninstall taos taospy
pip3 uninstall taos taos-ws-py
安装最新版本的 Python 连接器。
pip3 install taospy
pip3 install taos-ws-py
- 注意: 如果安装的不是最新版本的 TDengine,那么需要安装对应版本的连接器。版本对应关系见 官方文档-Python连接器。
安装验证,在 Python 交互模式执行:
import taosws
2.1.4 测试数据准备
使用 TDengine CLI 或 WEB 工具连接到 TDengine 数据库。
- 客户端登录需要配置服务端 FQDN 解析和
taos.cfg(配置参数除 fqnd 外其他可与服务端一致)。 - 使用 WEB 工具,直接访问服务端 6060 端口,例:http://IP:6060
- 创建数据库
创建测试用数据库用于数据的存储,下面是示例语句:
CREATE DATABASE `test1` VGROUPS 4 WAL_RETENTION_PERIOD 3600 WAL_RETENTION_SIZE 0 ;
在上述语句中,VGROUPS、WAL_RETENTION_PERIOD、WAL_RETENTION_SIZE 3个参数与订阅息息相关。
- VGROUPS:数据库中初始 vgroup 的数目,订阅时一个 topic 中的一个 vgroup 只能对应一个消费者。
- WAL_RETENTION_PERIOD:为了数据订阅消费,需要 WAL 日志文件额外保留的最大时长策略。WAL 日志清理,不受订阅客户端消费状态影响。单位为 s,默认为 3600,表示在 WAL 保留最近 3600 秒的数据,用户可以根据数据订阅的需要修改这个参数为适当值。
- WAL_RETENTION_SIZE:为了数据订阅消费,需要 WAL 日志文件额外保留的最大累计大小策略。单位为 KB,默认为 0,表示累计大小无上限。
- 创建超级表 创建一个超级表 stb1。
USE test1;
CREATE STABLE `stb1` (`ts` TIMESTAMP, `v1` INT ') TAGS (`t1` INT);
- 创建子表 在超级表 stb1 下创建 5 个子表 t1~t5。
CREATE TABLE `t1` USING `stb1` (`t1`) TAGS (1);
CREATE TABLE `t2` USING `stb1` (`t2`) TAGS (2);
CREATE TABLE `t3` USING `stb1` (`t3`) TAGS (3);
CREATE TABLE `t4` USING `stb1` (`t4`) TAGS (4);
CREATE TABLE `t5` USING `stb1` (`t5`) TAGS (5);
- 写入数据 为了达到测试效果,可以编写一个简单脚本进行持续写入。
vi insert.sh
#!/bin/sh
i=0
while true
do
echo "insert into test1.t1 values(now,$i);" >i.sql
echo "insert into test1.t2 values(now,$i);" >>i.sql
echo "insert into test1.t3 values(now,$i);" >>i.sql
echo "insert into test1.t4 values(now,$i);" >>i.sql
echo "insert into test1.t5 values(now,$i);" >>i.sql
taos -f i.sql
sleep 1
i=$(($i+1))
done
- 创建测试用户 创建一个单独的用户 test, 用于数据订阅,并允许用户从任何 IP 地址登录。
create user test pass 'Tbase_125#' host '0.0.0.0/0';
2.1.5 环境准备注意事项
- 版本兼容性: 确保 TDengine 数据库、客户端工具和 Websocket 客户端库的版本兼容。
- 网络配置: 确保服务器之间网络互通,并开放必要的端口。
- 资源分配: 根据数据量和访问量合理分配服务器资源。
- 安全配置: TDengine 数据库集群内部建议关闭防火墙,如需开启,需要开放相关端口(6030、6041、6043、6060)。
2.2 创建和管理订阅
2.2.1 主题 (Topic)类型
TDengine 支持 3 种主题类型:
- 查询主题 订阅一条 SQL 查询的结果,本质上是连续查询,每次查询仅返回最新值(以下均为此类型主题)。
- 超级表主题 订阅一个超级表所有数据,仅用于 taosx 数据同步和备份。
- 数据库主题 订阅一个DB所有数据,仅用于 taosx 数据同步和备份。
2.2.1 创建查询主题 (Topic)
创建查询主题的步骤如下:
- 连接到 TDengine 数据库:
使用 TDengine CLI 或 WEB 工具连接到 TDengine 数据库。
- 创建主题:
使用 SQL 语句 CREATE TOPIC 创建主题,例如:
CREATE TOPIC tp_test2 as select ts,v1 from test1.stb1;
该语句创建了一个名为 tp_test2 的主题。
- 注意:
- 超级用户 root 可以在任意数据库上创建和订阅 topic。
- 任意用户都可以在自己拥有读权限的数据库上创建 topic。
- 普通用户默认可以订阅自己创建的 topic。
- 订阅授权
为测试用户 test 授予主题 tp_test2 订阅权限。
GRANT SUBSCRIBE ON tp_test2 TO test;
2.2.2 删除主题 (Topic)
如果不再需要订阅数据,可以删除 topic,需要注意正在订阅中的 topic 无法被删除。
DROP TOPIC IF EXISTS tp_test2;
2.2.3 查询主题 (Topic)
查询当前集群存在的主题:
show topics;
查询主题详细信息:
select * from information_schema.ins_topics;
ins_topics 表结构说明:
| # | 列名 | 数据类型 | 说明 |
|---|---|---|---|
| 1 | topic_name | VARCHAR(192) | topic 名称 |
| 2 | db_name | VARCHAR(64) | topic 相关的 DB |
| 3 | create_time | TIMESTAMP | topic 的 创建时间 |
| 4 | sql | VARCHAR(1024) | 创建该 topic 时所用的 SQL 语句 |
示例:
taos> select * from information_schema.ins_topics\G;
*************************** 1.row ***************************
topic_name: tp_test2
db_name: test1
create_time: 2025-02-13 21:24:54.704
sql: create topic tp_test2 as select ts,v1 from test1.stb1;
schema: [{"name":"ts","type":"TIMESTAMP","length":8},{"name":"v1","type":"INT","length":4}]
meta: no
type: column
2.2.4 创建消费者
消费者的创建只能通过 TDengine 客户端驱动或者连接器所提供的 API 创建,以 Python为例:
consumer = taosws.Consumer(conf={
"td.connect.websocket.scheme": "ws",
"group.id": "test_ws_group01",
"client.id": "test_ws_client01",
"td.connect.user": "test",
"td.connect.pass": "Tbase_125#",
"td.connect.ip": "10.7.7.14",
"td.connect.port": "6041",
"enable.auto.commit": "true", # 自动提交偏移量
}
)
参数说明:
| 参数名称 | 类型 | 参数说明 | 备注 |
|---|---|---|---|
| td.connect.ip | string | 服务端的 FQDN | 可以是ip或者host name |
| td.connect.user | string | 用户名 | |
| td.connect.pass | string | 密码 | |
| td.connect.port | integer | 服务端的端口号 | |
| group.id | string | 消费组 ID,同一消费组共享消费进度 | 必填项。最大长度:192,超长将截断。每个topic最多可建立 100 个 consumer group |
| client.id | string | 客户端 ID | 最大长度:255,超长将截断。 |
| auto.offset.reset | enum | 消费组订阅的初始位置 | earliest: default(version < 3.2.0.0);从头开始订阅;latest: default(version >= 3.2.0.0);仅从最新数据开始订阅;none: 没有提交的 offset 无法订阅 |
| enable.auto.commit | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
| auto.commit.interval.ms | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| msg.with.table.name | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) | 默认关闭 |
| enable.replay | boolean | 是否开启数据回放功能 | 默认关闭 |
| session.timeout.ms | integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从3.3.3.0版本开始支持) | 默认值为 12000,取值范围 [6000, 1800000] |
| max.poll.interval.ms | integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发rebalance 逻辑,成功后该 consumer 会被删除(从3.3.3.0版本开始支持) | 默认值为 300000,[1000,INT32_MAX] |
2.2.4 查看消费者
查询当前数据库下所有消费者的信息,会显示消费者的状态,创建时间等信息。
SHOW CONSUMERS;
select * from performance_schema.perf_consumers
以上两个命令输出的信息一致。
| # | 列名 | 数据类型 | 说明 |
|---|---|---|---|
| 1 | consumer_id | BIGINT | 消费者的唯一 ID |
| 2 | consumer_group | BINARY(192) | 消费者组 |
| 3 | client_id | BINARY(192) | 用户自定义字符串,通过创建 consumer 时指定 client_id 来展示 |
| 4 | status | BINARY(20) | 消费者当前状态。消费者状态包括:ready(正常可用)、 lost(连接已丢失)、 rebalancing(消费者所属 vgroup 正在分配中)、unknown(未知状 态) |
| 5 | topics | BINARY(204) | 被订阅的 topic。若订阅多个 topic,则展示为多行 |
| 6 | up_time | TIMESTAMP | 第一次连接 taosd 的时间 |
| 7 | subscribe_time | TIMESTAMP | 上一次发起订阅的时间 |
| 8 | rebalance_time | TIMESTAMP | 上一次触发 rebalance 的时间 |
示例:
taos> select * from performance_schema.perf_consumers\G;
*************************** 1.row ***************************
consumer_id: 0x157f10288388e37d
consumer_group: test_ws_group01
client_id: test_ws_client01
user: test
fqdn: test4
status: ready
topics: tp_test2
up_time: 2025-02-14 11:37:16.738
subscribe_time: 2025-02-14 11:37:16.738
rebalance_time: 2025-02-14 11:37:18.620
parameters: tbname:1,commit:0,interval:5000ms,reset:latest
2.2.5 删除消费组
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是消费者组在组内没有消费者时可以通过下面语句删除:
DROP CONSUMER GROUP IF EXISTS test_ws_group01 ON tp_test2;
注意 正在消费的消费组无法被删除,而停止消费的消费组无法使用上述语句查看,仅可通过2.2.5中查询数据订阅查询到。
2.2.5 查询数据订阅
通过以下命令可以查询 topic 在不同 vgroup 上的消费信息,可用于查看消费进度。
SHOW SUBSCRIPTIONS;
select * from information_schema.ins_subscriptions
以上两个命令输出的信息相同,表结构描述如下:
| # | 列名 | 数据类型 | 说明 |
|---|---|---|---|
| 1 | topic_name | VARCHAR(204) | 被订阅的 topic |
| 2 | consumer_group | VARCHAR(193) | 订阅者的消费者组 |
| 3 | vgroup_id | INT | 消费者被分配的 vgroup id |
| 4 | consumer_id | BIGINT | 消费组的唯一 id |
| 5 | user | VARCHAR(24) | 消费者的登录的用户名 |
| 6 | fqdn | VARCHAR(128) | 消费者的所在机器的 fqdn |
| 7 | offset | VARCHAR(64) | 消费者的消费进度 |
| 8 | rows | BIGINT | 消费者的消费的数据条数 |
示例:
taos> select * from information_schema.ins_subscriptions where topic_name='tp_test2' and consumer_id is not null\G;
*************************** 1.row ***************************
topic_name: tp_test2
consumer_group: test_ws_group01
vgroup_id: 31
consumer_id: 0x157f102d329911e7
user: test
fqdn: test4
offset: wal:11831/11831
rows: 84
*************************** 2.row ***************************
topic_name: tp_test2
consumer_group: test_ws_group01
vgroup_id: 32
consumer_id: 0x157f102d329911e7
user: test
fqdn: test4
offset: wal:6939/6939
rows: 42
*************************** 3.row ***************************
topic_name: tp_test2
consumer_group: test_ws_group01
vgroup_id: 33
consumer_id: 0x157f102d329911e7
user: test
fqdn: test4
offset: wal:7099/7099
rows: 42
*************************** 4.row ***************************
topic_name: tp_test2
consumer_group: test_ws_group01
vgroup_id: 34
consumer_id: 0x157f102d329911e7
user: test
fqdn: test4
offset: wal:6991/6991
rows: 42
2.3 数据发布和消费
2.3.1 数据发布
TDengine 订阅功能的数据发布是通过预写日志 WAL (Write-Ahead Logging) 文件实现的。当数据写入 TDengine 数据库时,会同时写入 WAL 文件。订阅功能会从 WAL 文件中读取数据,并将其推送给订阅端。
在我们的测试环境中,insert.sh 即为数据发布端。
运行脚本:
sh insert.sh
2.3.2 数据消费
TDengine 支持多种语言和连接方式订阅数据,订阅功能支持列表如下:
| 连接方式 | Java | Python | Go | C# | Node.js | Rust | C/C++ |
|---|---|---|---|---|---|---|---|
| 原生方式 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| Websocket | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
以下为Python 消费示例:
import time
import logging
import taosws
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def main():
try:
consumer = taosws.Consumer(conf={
"td.connect.websocket.scheme": "ws",
"group.id": "test_ws_group01",
"client.id": "test_ws_client01",
"td.connect.user": "test",
"td.connect.pass": "Tbase_125#",
"td.connect.ip": "10.7.7.14",
"td.connect.port": "6041",
"enable.auto.commit": "true", # 自动提交偏移量
}
)
# 订阅主题
topics = ["tp_test2"]
consumer.subscribe(topics)
logging.info(f"Subscribed to topics: {topics}")
while True:
try:
# 拉取数据
res = consumer.poll(1) # 超时时间 1 秒
if not res:
continue # 没有数据,继续轮询
# 处理数据
for block in res:
for row in block:
logging.info(f"Fetched data: {row}")
except Exception as e:
logging.error(f"Error while consuming messages: {e}")
time.sleep(5) # 出现错误后暂停一段时间重试
except KeyboardInterrupt:
logging.info("Consumer interrupted by user. Exiting...")
consumer.unsubscribe()
print("Consumer unsubscribed successfully.");
except Exception as e:
logging.critical(f"Failed to initialize consumer: {e}")
finally:
if consumer:
consumer.close()
print("Consumer closed successfully.");
logging.info("Shutting down consumer.")
if __name__ == "__main__":
main()
代码说明:
poll每次调用获取一个消息,一个消息中可能包含多个记录。records包含了多个 block 块, 每个块中可能包含多个记录。- 当消费者读取并处理完消息后,它可以提交 Offset,这表示消费者已经成功处理到这个 Offset 的消息。Offset 提交可以是自动的(根据配置定期提交)或手动的(应用程序控制何时提交)。当创建消费者时,属性 enable.auto.commit 为 false 时,可以手动提交 offset。
- 消费者可以取消对主题的订阅,停止接收消息。当消费者不再需要时,应该关闭消费者实例,以释放资源和断开与 TDengine 服务器的连接。
运行示例(Ctrl+C 中止):
# python3 test_sub_ws.py
2025-02-14 13:45:33,623 - INFO - Subscribed to topics: ['tp_test2']
2025-02-14 13:45:43,649 - INFO - Fetched data: (datetime.datetime(2025, 2, 14, 13, 45, 43, 164000), 0)
2025-02-14 13:45:43,651 - INFO - Fetched data: (datetime.datetime(2025, 2, 14, 13, 45, 43, 166000), 0)
2025-02-14 13:45:43,653 - INFO - Fetched data: (datetime.datetime(2025, 2, 14, 13, 45, 43, 166000), 0)
2025-02-14 13:45:43,655 - INFO - Fetched data: (datetime.datetime(2025, 2, 14, 13, 45, 43, 167000), 0)
2025-02-14 13:45:43,657 - INFO - Fetched data: (datetime.datetime(2025, 2, 14, 13, 45, 43, 165000), 0)
^C
2025-02-14 13:45:47,665 - INFO - Consumer interrupted by user. Exiting...
Consumer unsubscribed successfully.
Consumer closed successfully.
2025-02-14 13:45:47,666 - INFO - Shutting down consumer.
2.3.3 注意事项
- 数据顺序: TDengine 订阅功能保证数据的顺序性,即订阅端接收到的数据顺序与写入顺序一致。
- 数据丢失: TDengine 订阅功能提供至少一次 (at least once) 的数据传输保证,即订阅端可能会收到重复的数据,但不会丢失数据。
- 数据延迟: TDengine 订阅功能的数据延迟取决于网络环境和数据量。
访问官网
更多内容欢迎访问 TDengine 官网
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)