DDS(Data Distribution Service)是一种面向实时系统的数据分发中间件标准,它采用发布-订阅模式实现高效、可靠的数据通信。以下是DDS的核心原理和工作机制的详细解析:

一、DDS架构概述

DDS采用去中心化的分布式架构,主要包含以下核心组件:

  1. Domain(域)

    • 通信的基本隔离单元,不同域的参与者无法直接通信

    • 通过Domain ID区分(通常为0-232范围内的整数)

  2. DomainParticipant(域参与者)

    • 应用程序接入DDS网络的入口点

    • 每个参与者可以包含多个发布者和订阅者

  3. Topic(主题)

    • 数据分类的基本单位,由名称和数据类型定义

    • 例如:"TemperatureSensorData"主题可能包含温度值和时间戳

  4. Publisher(发布者)/Subscriber(订阅者)

    • 发布者负责发送数据,订阅者接收感兴趣的数据

    • 支持一对多、多对多的通信模式

  5. DataWriter(数据写入器)/DataReader(数据读取器)

    • 实际执行数据读写的端点

    • 一个发布者可以包含多个DataWriter,一个订阅者可以包含多个DataReader

二、核心通信机制

1. 发现协议(Discovery Protocol)

DDS采用自动发现机制,无需中央服务器:

  • 参与者发现:新加入的DomainParticipant会广播自身信息

  • 端点发现:DataWriter和DataReader相互发现匹配的通信对端

  • 基于UDP多播:默认使用多播实现高效发现(也可配置为单播)

发现过程示例:

2. 数据分发模型

DDS提供丰富的QoS(服务质量)策略控制数据传输:

QoS策略 说明 典型配置
可靠性(Reliability) BEST_EFFORT(尽力而为)或RELIABLE(可靠) 关键数据用RELIABLE
持久性(Durability) VOLATILE(易失)/TRANSIENT_LOCAL(临时本地)/PERSISTENT(持久) 新订阅者获取历史数据用TRANSIENT_LOCAL
截止时间(Deadline) 数据发布的周期约束 设置预期更新频率
生命周期(Liveliness) 检测参与者活跃状态 AUTOMATIC(自动)/MANUAL_BY_PARTICIPANT(手动)
历史记录(History) 控制缓存的数据量 KEEP_LAST(保留最新N个)/KEEP_ALL(保留全部)

3. 数据流处理流程

  1. 发布端

    • 应用调用DataWriter.write()

    • DDS序列化数据并放入发送队列

    • 根据QoS策略选择传输方式(UDP/TCP/共享内存等)  

    • 执行流量控制和拥塞避免

  2. 接收端

    • 网络层接收数据包

    • 反序列化并验证数据完整性

    • 根据订阅条件和QoS过滤数据

    • 将有效数据放入接收队列

    • 通知应用程序通过DataReader.read()获取数据

 传输方式性能对比
传输方式 延迟(μs) 吞吐量(Gbps) 可靠性 适用场景
共享内存 0.1-1 10+ 可靠 同主机IPC
UDP多播 10-100 1-10 可选 局域网广播
UDP单播 50-200 0.1-1 可选 点对点实时
TCP 1000+ 0.1-0.5 强制可靠 广域网通信
DTLS 200-500 0.05-0.2 可靠加密 安全传输

  传输选择决策树

 

如果DDS是在同一台系统/设备内使用的话,最优先选择的传输方式就是共享内存(Shared Memory Transport)

为什么系统内部推荐用共享内存?

  • 延迟最低

    • 内存级访问速度,几微秒(μs)级别延迟。

    • 比走Loopback TCP/IP(127.0.0.1)快很多倍。

  • 吞吐量最高

    • 直接在物理内存里搬数据。

    • 不需要真正的socket堆栈、不需要拷贝网络包。

  • 资源占用更少

    • 不走协议栈,不要占用TCP/IP的系统buffer。

    • CPU开销也比socket要小很多。

  • 内存复制最小化(Zero Copy)(不同DDS厂商实现细节不同,但趋势一致):

    • 有些DDS实现能做到发布者写一次订阅者直接读(或者通过小拷贝+指针切换实现)。

各大主流DDS实现都怎么做?

DDS实现 系统内部默认行为 说明
RTI Connext DDS 自动优先用共享内存,fallback到UDP/TCP 共享内存叫"shmem transport"
Fast DDS (eProsima) 有Shared Memory Transport模块(默认配置要启用) 需要配置开启
Cyclone DDS 默认开启共享内存支持 自动内部选择
OpenDDS 支持共享内存传输 需要编译启用SharedMemory transport


 

三、关键技术特性

1. 实时数据分发

  • 零拷贝架构:通过共享内存减少数据复制

  • 低延迟传输:典型延迟在微秒级

  • 确定性传输:支持时间触发通信模式

2. 动态发现与匹配

  • 基于内容过滤:使用SQL-like语法订阅特定数据

// 示例:只接收温度>30度的数据
ContentFilteredTopic cft = subscriber.create_contentfilteredtopic(
    "HighTemp", temperatureTopic, "value > 30");
  • 主题别名:允许动态重定向数据流 

3. 容错机制

  • 心跳检测:通过Liveliness监控参与者状态

  • 冗余网络:支持多网卡冗余传输

  • 故障切换:快速检测和恢复机制

四、DDS与ROS2的集成

ROS2采用DDS作为底层通信中间件,关键集成点:

  1. RMW层(ROS MiddleWare Interface)

    • 抽象层,支持多种DDS实现(Fast DDS、Cyclone DDS、RTI Connext等)

    • 提供DDS到ROS消息的转换

  2. Topic映射规则

    • ROS Topic → DDS Topic

    • ROS Node → DDS DomainParticipant

    • ROS Publisher → DDS DataWriter

    • ROS Subscription → DDS DataReader

  3. QoS配置
    ROS2提供预定义的QoS策略集:

# ROS2 QoS配置示例
from rclpy.qos import QoSProfile, QoSReliabilityPolicy

qos = QoSProfile(
    reliability=QoSReliabilityPolicy.RELIABLE,
    depth=10
)
publisher = node.create_publisher(Image, "camera_image", qos)

五、典型DDS实现比较

实现方案 特点 适用场景
RTI Connext DDS 商业版,功能最全,认证齐全 航空、医疗等安全关键领域
Eclipse Cyclone DDS 开源,轻量级,符合DDSI-RTPS标准 嵌入式设备,资源受限系统
eProsima Fast DDS 开源,性能优异,与ROS2深度集成 机器人,科研项目
OpenDDS 开源,基于ACE/TAO框架 传统企业系统

六、性能优化技巧

  • 选择合适的QoS

    • 实时数据使用BEST_EFFORT + 小历史缓存

    • 关键指令使用RELIABLE + 确认机制

  • 调整网络参数

<!-- Fast DDS配置示例 -->
<transport_descriptors>
  <udp transport="udp">
    <non_blocking_send>true</non_blocking_send>
    <maxMessageSize>65536</maxMessageSize>
  </udp>
</transport_descriptors>
  • 利用共享内存

    • 同一主机上的通信优先使用共享内存传输

    • 减少序列化/反序列化开销

DDS的这些特性使其特别适合分布式实时系统,如自动驾驶、工业控制、机器人等对通信质量和实时性要求高的领域。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐