TDengine数据订阅新手入门避坑指南3/3
TDengine 数据订阅,订阅过滤,订阅安全,常见问题
[目录]
- TDengine 订阅概述
1.1 什么是 TDengine 订阅?
1.2 TDengine 订阅架构
1.3 TDengine 订阅与其他消息队列的对比 - TDengine 订阅实践
2.1 环境准备
2.2 创建和管理订阅
2.3 数据发布和消费 - TDengine 订阅高级应用
3.1 订阅过滤
3.2 订阅安全
3.3 订阅性能优化 - 常见问题
- 参考资料
3.1 订阅过滤
3.1.1 基于 SQL 语句过滤数据
TDengine 的 TMQ 支持在创建消费者时通过 SQL 语句对数据进行过滤。这种方式可以灵活地筛选出符合条件的数据,减少不必要的数据传输和处理。
基于 SQL 语句过滤数据的语法:
CREATE TOPIC [IF NOT EXISTS] topic_name as subquery;
该 SQL 通过 SELECT 语句订阅(包括 SELECT *,或 SELECT ts, c1 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、修改(ALTER table MODIFY)。
- 若发生表结构变更,新增的列不出现在结果中。
- 对于 select *,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
示例:
- 基于标签值进行过滤
create topic tp_test3 as select ts,v1 from test.stb1 where t1 =3;
- 基于数据列进行过滤
create topic tp_test4 as select ts,v1 from test.stb1 where v1 > 3;
- 基于数据列进行过滤并赋值
create topic tp_test5 as select ts,case when v1>100 then 100 else v1 end as v1 from test1.stb1;
- 对数据列进行预处理
create topic tp_test6 as select ts,v1*100 as v1 from test1.stb1;
3.1.2 订阅过滤注意事项
- 性能影响: 订阅过滤会增加数据处理的复杂度,可能会影响订阅性能。
- 过滤条件: 过滤条件需要根据实际需求进行设计,避免过于复杂或无效的过滤条件。
- 数据一致性: 订阅过滤可能会导致数据丢失或不一致,需要根据实际场景进行评估。
3.2 订阅安全
3.2.1 认证和授权机制
TDengine 提供了完善的认证和授权机制,保障订阅功能的安全性。
-
认证
TDengine Websocket 支持用户名和密码认证,确保只有授权用户可以访问数据库和订阅功能。 -
授权
TDengine 支持细粒度的权限控制,可以为不同用户分配不同的权限。
- 任意用户都可以在自己拥有读权限的数据库上创建 topic。
- 超级用户 root 可以在任意数据库上创建 topic。
- 每个 topic 的订阅权限都可以被独立授权给任何用户,不管该用户是否拥有该数据库的访问权限。
- 删除 topic 只能由 root 用户或者该 topic 的创建者进行。
- topic 只能由超级用户、topic的创建者或者被显式授予 subscribe 权限的用户订阅。
- 授权语法
GRANT SUBSCRIBE ON topic_name TO user_name; REVOKE SUBSCRIBE ON topic_name FROM user_name;
- 白名单
TDengine 自持 IP 白名单,通过创建可信的 IP 地址列表,将它们作为唯一标识符分配给用户,并且只允许这些 IP 地址访问数据库集群。
增加 IP 白名单的 SQL 如下。
create user test pass password [sysinfo value] [host host_name1[,host_name2]];
alter user test add host host_name1;
查询 IP 白名单的 SQL 如下。
SELECT TEST, ALLOWED_HOST FROM INS_USERS;
SHOW USERS;
删除 IP 白名单的命令如下。
ALTER USER TEST DROP HOST HOST_NAME1
注意:要启用白名单,需要配置集群参数 enableWhiteList = 1
,配置方法详见企业用户手册。
3.2.2 数据加密传输
TDengine 支持 SSL/TLS 加密传输,保障数据在传输过程中的安全性。
- 配置 SSL/TLS: 需要在 TDengine taosAdapter 组件配置SSL证书。配置方法见官方文档:taosAdapter 参考手册
- 启用加密传输: Python Websocket 订阅数据使用 HTTPS 仅需要修改
td.connect.websocket.scheme
"td.connect.websocket.scheme": "wss",
3.2.3 安全配置建议
- 使用强密码: 为 TDengine 数据库设置强密码,并定期更换密码, Websocket 使用
token
进行认证。 - 限制访问权限: 只授予用户必要的权限,避免权限过大。
- 启用加密传输: 在生产环境中,建议启用 SSL/TLS 加密传输。
- 定期安全审计: 定期进行安全审计,发现和修复安全漏洞。
3.3 订阅性能优化
3.3.1 多线程并发
在 TDengine 中,vnode 扮演着至关重要的角色,它作为数据存储、查询以及备份的基本单元,使得 TDengine 订阅主题天然具备分片属性。具体而言,一个主题的分片数量,与该主题所对应数据库(DB)的 vgroups 数量完全一致。
举例来说,在 2.1.4 章节所创建的数据库 test1,其 vgroups 数量设定为 4。那么,基于此数据库所创建的主题 tp_test2,其数据自然就会被划分为 4 个分片。
若期望提升数据的消费速度,一种有效的方式是在代码层面采用多线程机制进行消费。通过合理分配线程资源,能够并行处理更多的数据,从而显著提高消费效率。
Python 示例代码如下:
import time
import logging
import taosws
import multiprocessing
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def consumer_task(consumer_conf, topics, process_id):
try:
consumer_conf["client.id"] = f"client_{process_id}" # 为每个进程设置独立的client.id
consumer = taosws.Consumer(conf=consumer_conf)
# 这里不再等待start_event因为进程的同步机制不同
logging.info(f"Process {process_id}: Subscribed to topics: {topics}")
consumer.subscribe(topics)
logging.info(f"Process {process_id}: Subscribed to topics.")
while True:
try:
# 拉取数据
res = consumer.poll(1) # 超时时间 1 秒
if not res:
continue
for block in res:
for row in block:
logging.info(f"Process {process_id}: Fetched data: {row}")
except Exception as e:
logging.error(f"Process {process_id}: Error while consuming messages: {e}")
time.sleep(5)
except KeyboardInterrupt:
logging.info(f"Process {process_id}: Interrupted by user. Exiting...")
consumer.unsubscribe()
print("Consumer unsubscribed successfully.");
except Exception as e:
logging.critical(f"Process {process_id}: Failed to execute consumer process: {e}")
finally:
if consumer:
consumer.close()
print("Consumer closed successfully.");
logging.info(f"Process {process_id}: Shutting down consumer process.")
def main():
NUM_PROCESSES = 4
consumer_conf = {
"td.connect.websocket.scheme": "ws",
"group.id": "test_ws_group01",
"client.id": "test_ws_client",
"td.connect.user": "test",
"td.connect.pass": "Tbase_125#",
"td.connect.ip": "10.7.7.14",
"td.connect.port": "6041",
"auto.commit.interval.ms": "1000",
"enable.auto.commit": "true",
}
topics = ["tp_test2"]
processes = []
for i in range(NUM_PROCESSES):
process = multiprocessing.Process(target=consumer_task, args=(consumer_conf, topics, i))
processes.append(process)
process.start()
for process in processes:
process.join()
if __name__ == "__main__":
main()
3.3.2 多订阅端
在系统架构设计中,采用多个订阅段对同一个主题进行订阅,并且设定为同一个消费组,这样做具有显著优势。一方面,它能够有效实现负载均衡,让系统资源得到更合理的分配与利用;另一方面,还能极大地增强系统的可用性,确保在部分组件出现故障时,系统依然能够稳定运行。
例如,当我们同时启用两个应用对主题 tp_test2 进行订阅,并且为每个应用都配置两个线程时,该主题数据的 4 个分片就会分别由这两个应用进行订阅。在此情况下,如果其中一个应用出现异常状况,TDengine 会自动触发重新平衡机制,使得剩余的应用能够接管原本由异常应用负责的所有数据,从而维持系统数据处理的连续性。
不过,在实际操作过程中,需要特别留意以下几点:
- 消费组配置:多个应用必须配置为同一个消费组,即确保 group.id 一致。这是保证消息能够在同一消费组内各应用间合理分配的关键设置。
- 客户端标识配置:每个应用都应配置独立的 client.id。这有助于在系统中准确区分不同的应用实例,避免因标识重复而引发的冲突与错误。
- 线程数限制:所有应用的线程数总和务必不能大于主题的分片数量。若超出这一限制,可能会导致部分线程无法获取到有效的数据分片,进而影响系统整体性能与数据处理的准确性。
4. 常见问题
- 订阅报错 “Insufficient privilege for operation”
权限错误,使用的用户无该主题的订阅权限,需要进行授权。
GRANT SUBSCRIBE ON topic_name TO user_name;
查询是否已授权:
select * from information_schema.ins_user_privileges where user_name='USERNAME';
-
注意
-
用户可以在拥有读权限的库上创建主题,并自动拥有该主题的订阅权限,但此权限不会显示授权表中。
-
很遗憾的是,目前无法查询到主题的创建者,因此无法提前判断用户是否能订阅某主题。
- 订阅不到数据,或订阅的数据不全
TDengine 的订阅是基于 WAL(预写日志),WAL 有保存时间(WAL_RETENTION_PERIOD)。 超过保存时间的 WAL 会被删除,这部分数据无法订阅到。
如果数据没有进入 WAL,也无法被订阅到。
如果多个应用端使用同一个 group_id
订阅同一个主题,那么主题数据会按照 vnode 数量进行评分,每个应用端仅会获取到一部分数据。
- 订阅数据有延迟
TDengine 数据订阅延迟通常在毫秒范围。如果发现订阅数据有延迟,可能是由于如下原因:
- 网络带宽不足
- TDengine 服务端负载过高
- 客户端处理速度慢
- 订阅到重复数据
客户端订阅数据进度基于offset
。 如果客户端没有及时提交 offset
,又异常停止,那么下次启动时大概率会消费到一部分重复数据。
如果不想频繁去提交 offset
,可以设置自动提交+自动提交间隔,让客户端自动完成。
- 订阅报错 “Consumer is not safe for multi-threaded access”
通常出现在多线程并发场景,出现原因为订阅跟消费不在同一个线程,建议放到同一个线程中。
- 订阅报错 “failed to complete the task:{} within the specified time : 10000,MILLISECONDS”
在创建连接时,会设置连接超时时间;同样在创建消费者时也会创建消费超时时间。
如果连接超时时间小于消费超时时间,那么会造成消费进程的异常中断。
5. 参考资料
- TDengine 官方文档: https://www.taosdata.com/cn/documentation/
- 集群部署: https://docs.taosdata.com/operation/deployment/
- 开发指南:https://docs.taosdata.com/develop/connect/
- 数据订阅:https://docs.taosdata.com/advanced/subscription/
- 存储引擎:https://docs.taosdata.com/tdinternal/storage
- Python 连接器:https://docs.taosdata.com/reference/connector/python/

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)