[目录]

  1. TDengine 订阅概述
    1.1 什么是 TDengine 订阅?
    1.2 TDengine 订阅架构
    1.3 TDengine 订阅与其他消息队列的对比
  2. TDengine 订阅实践
    2.1 环境准备
    2.2 创建和管理订阅
    2.3 数据发布和消费
  3. TDengine 订阅高级应用
    3.1 订阅过滤
    3.2 订阅安全
    3.3 订阅性能优化
  4. 常见问题
  5. 参考资料

3.1 订阅过滤

3.1.1 基于 SQL 语句过滤数据

TDengine 的 TMQ 支持在创建消费者时通过 SQL 语句对数据进行过滤。这种方式可以灵活地筛选出符合条件的数据,减少不必要的数据传输和处理。

基于 SQL 语句过滤数据的语法:

CREATE TOPIC [IF NOT EXISTS] topic_name as subquery;

该 SQL 通过 SELECT 语句订阅(包括 SELECT *,或 SELECT ts, c1 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:

  • 该类型 TOPIC 一旦创建则订阅数据的结构确定。
  • 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、修改(ALTER table MODIFY)。
  • 若发生表结构变更,新增的列不出现在结果中。
  • 对于 select *,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)

示例:

  1. 基于标签值进行过滤
create topic tp_test3 as select ts,v1 from test.stb1 where t1 =3;
  1. 基于数据列进行过滤
create topic tp_test4 as select ts,v1 from test.stb1 where v1 > 3;
  1. 基于数据列进行过滤并赋值
create topic tp_test5 as select ts,case when v1>100 then 100 else v1 end as v1 from test1.stb1;
  1. 对数据列进行预处理
create topic tp_test6 as select ts,v1*100 as v1 from test1.stb1;
3.1.2 订阅过滤注意事项
  • 性能影响: 订阅过滤会增加数据处理的复杂度,可能会影响订阅性能。
  • 过滤条件: 过滤条件需要根据实际需求进行设计,避免过于复杂或无效的过滤条件。
  • 数据一致性: 订阅过滤可能会导致数据丢失或不一致,需要根据实际场景进行评估。

3.2 订阅安全

3.2.1 认证和授权机制

TDengine 提供了完善的认证和授权机制,保障订阅功能的安全性。

  1. 认证
    TDengine Websocket 支持用户名和密码认证,确保只有授权用户可以访问数据库和订阅功能。

  2. 授权

TDengine 支持细粒度的权限控制,可以为不同用户分配不同的权限。

  • 任意用户都可以在自己拥有读权限的数据库上创建 topic。
  • 超级用户 root 可以在任意数据库上创建 topic。
  • 每个 topic 的订阅权限都可以被独立授权给任何用户,不管该用户是否拥有该数据库的访问权限。
  • 删除 topic 只能由 root 用户或者该 topic 的创建者进行。
  • topic 只能由超级用户、topic的创建者或者被显式授予 subscribe 权限的用户订阅。
  • 授权语法
    GRANT SUBSCRIBE ON topic_name TO user_name;
    
    REVOKE SUBSCRIBE ON topic_name FROM user_name;
    
  1. 白名单
    TDengine 自持 IP 白名单,通过创建可信的 IP 地址列表,将它们作为唯一标识符分配给用户,并且只允许这些 IP 地址访问数据库集群。

增加 IP 白名单的 SQL 如下。

create user test pass password [sysinfo value] [host host_name1[,host_name2]]; 
alter user test add host host_name1;

查询 IP 白名单的 SQL 如下。

SELECT TEST, ALLOWED_HOST FROM INS_USERS;
SHOW USERS;

删除 IP 白名单的命令如下。

ALTER USER TEST DROP HOST HOST_NAME1

注意:要启用白名单,需要配置集群参数 enableWhiteList = 1,配置方法详见企业用户手册。

3.2.2 数据加密传输

TDengine 支持 SSL/TLS 加密传输,保障数据在传输过程中的安全性。

  • 配置 SSL/TLS: 需要在 TDengine taosAdapter 组件配置SSL证书。配置方法见官方文档:taosAdapter 参考手册
  • 启用加密传输: Python Websocket 订阅数据使用 HTTPS 仅需要修改 td.connect.websocket.scheme
"td.connect.websocket.scheme": "wss",
3.2.3 安全配置建议
  • 使用强密码: 为 TDengine 数据库设置强密码,并定期更换密码, Websocket 使用 token 进行认证。
  • 限制访问权限: 只授予用户必要的权限,避免权限过大。
  • 启用加密传输: 在生产环境中,建议启用 SSL/TLS 加密传输。
  • 定期安全审计: 定期进行安全审计,发现和修复安全漏洞。

3.3 订阅性能优化

3.3.1 多线程并发

在 TDengine 中,vnode 扮演着至关重要的角色,它作为数据存储、查询以及备份的基本单元,使得 TDengine 订阅主题天然具备分片属性。具体而言,一个主题的分片数量,与该主题所对应数据库(DB)的 vgroups 数量完全一致。
举例来说,在 2.1.4 章节所创建的数据库 test1,其 vgroups 数量设定为 4。那么,基于此数据库所创建的主题 tp_test2,其数据自然就会被划分为 4 个分片。
若期望提升数据的消费速度,一种有效的方式是在代码层面采用多线程机制进行消费。通过合理分配线程资源,能够并行处理更多的数据,从而显著提高消费效率。

Python 示例代码如下:

import time
import logging
import taosws
import multiprocessing

# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def consumer_task(consumer_conf, topics, process_id):
    try:
        consumer_conf["client.id"] = f"client_{process_id}"  # 为每个进程设置独立的client.id
        consumer = taosws.Consumer(conf=consumer_conf)

        # 这里不再等待start_event因为进程的同步机制不同
        logging.info(f"Process {process_id}: Subscribed to topics: {topics}")

        consumer.subscribe(topics)
        logging.info(f"Process {process_id}: Subscribed to topics.")

        while True:
            try:
                # 拉取数据
                res = consumer.poll(1)  # 超时时间 1 秒
                if not res:
                    continue
                for block in res:
                    for row in block:
                        logging.info(f"Process {process_id}: Fetched data: {row}")

            except Exception as e:
                logging.error(f"Process {process_id}: Error while consuming messages: {e}")
                time.sleep(5)
    except KeyboardInterrupt:
        logging.info(f"Process {process_id}: Interrupted by user. Exiting...")
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.");
    except Exception as e:
        logging.critical(f"Process {process_id}: Failed to execute consumer process: {e}")
    finally:
        if consumer:
            consumer.close()
            print("Consumer closed successfully."); 
        logging.info(f"Process {process_id}: Shutting down consumer process.")

def main():
    NUM_PROCESSES = 4

    consumer_conf = {
        "td.connect.websocket.scheme": "ws",
        "group.id": "test_ws_group01",
        "client.id": "test_ws_client",
        "td.connect.user": "test",
        "td.connect.pass": "Tbase_125#",
        "td.connect.ip": "10.7.7.14",
        "td.connect.port": "6041",
        "auto.commit.interval.ms": "1000",
        "enable.auto.commit": "true",
    }

    topics = ["tp_test2"]

    processes = []

    for i in range(NUM_PROCESSES):
        process = multiprocessing.Process(target=consumer_task, args=(consumer_conf, topics, i))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

if __name__ == "__main__":
    main()
3.3.2 多订阅端

在系统架构设计中,采用多个订阅段对同一个主题进行订阅,并且设定为同一个消费组,这样做具有显著优势。一方面,它能够有效实现负载均衡,让系统资源得到更合理的分配与利用;另一方面,还能极大地增强系统的可用性,确保在部分组件出现故障时,系统依然能够稳定运行。

例如,当我们同时启用两个应用对主题 tp_test2 进行订阅,并且为每个应用都配置两个线程时,该主题数据的 4 个分片就会分别由这两个应用进行订阅。在此情况下,如果其中一个应用出现异常状况,TDengine 会自动触发重新平衡机制,使得剩余的应用能够接管原本由异常应用负责的所有数据,从而维持系统数据处理的连续性。

不过,在实际操作过程中,需要特别留意以下几点:

  1. 消费组配置:多个应用必须配置为同一个消费组,即确保 group.id 一致。这是保证消息能够在同一消费组内各应用间合理分配的关键设置。
  2. 客户端标识配置:每个应用都应配置独立的 client.id。这有助于在系统中准确区分不同的应用实例,避免因标识重复而引发的冲突与错误。
  3. 线程数限制:所有应用的线程数总和务必不能大于主题的分片数量。若超出这一限制,可能会导致部分线程无法获取到有效的数据分片,进而影响系统整体性能与数据处理的准确性。

4. 常见问题

  1. 订阅报错 “Insufficient privilege for operation”

权限错误,使用的用户无该主题的订阅权限,需要进行授权。

GRANT SUBSCRIBE ON topic_name TO user_name;

查询是否已授权:

select * from information_schema.ins_user_privileges where user_name='USERNAME';
  • 注意

  • 用户可以在拥有读权限的库上创建主题,并自动拥有该主题的订阅权限,但此权限不会显示授权表中。

  • 很遗憾的是,目前无法查询到主题的创建者,因此无法提前判断用户是否能订阅某主题。

  1. 订阅不到数据,或订阅的数据不全

TDengine 的订阅是基于 WAL(预写日志),WAL 有保存时间(WAL_RETENTION_PERIOD)。 超过保存时间的 WAL 会被删除,这部分数据无法订阅到。

如果数据没有进入 WAL,也无法被订阅到。

如果多个应用端使用同一个 group_id 订阅同一个主题,那么主题数据会按照 vnode 数量进行评分,每个应用端仅会获取到一部分数据。

  1. 订阅数据有延迟

TDengine 数据订阅延迟通常在毫秒范围。如果发现订阅数据有延迟,可能是由于如下原因:

  • 网络带宽不足
  • TDengine 服务端负载过高
  • 客户端处理速度慢
  1. 订阅到重复数据

客户端订阅数据进度基于offset。 如果客户端没有及时提交 offset,又异常停止,那么下次启动时大概率会消费到一部分重复数据。

如果不想频繁去提交 offset,可以设置自动提交+自动提交间隔,让客户端自动完成。

  1. 订阅报错 “Consumer is not safe for multi-threaded access”

通常出现在多线程并发场景,出现原因为订阅跟消费不在同一个线程,建议放到同一个线程中。

  1. 订阅报错 “failed to complete the task:{} within the specified time : 10000,MILLISECONDS”

在创建连接时,会设置连接超时时间;同样在创建消费者时也会创建消费超时时间。

如果连接超时时间小于消费超时时间,那么会造成消费进程的异常中断。

5. 参考资料

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐