在高性能计算、自动驾驶与机器人领域,C++ 依然是无可撼动的王者。然而,传统的通信中间件(如 DDS、MQTT C 库)往往在性能开销、配置复杂度或实时性上难以两全。Zenoh(Zero Overhead Network Protocol)作为下一代数据分发协议,以其“零开销”、“多模态通信”和“自适应路由”特性,正在重塑 C++ 生态的通信标准。本文将深入剖析 Zenoh 协议的核心机制,重点推荐官方维护的zenoh-cpp库,并通过完整的 C++17/20 代码示例,展示如何构建一个具备发布/订阅、查询/响应及时间序列存储能力的工业级分布式系统。


一、为什么 C++ 开发者需要关注 Zenoh?

在 C++ 高性能系统中,通信层往往是隐形的瓶颈。

  • DDS (Data Distribution Service):功能强大但极其笨重,XML 配置繁琐,内存占用高,难以嵌入资源受限的边缘设备。
  • MQTT (C/C++ 库):轻量但功能单一,缺乏原生的请求/响应机制,且严重依赖中心化 Broker,难以实现 P2P 直连。
  • ZeroMQ:灵活但需要开发者手动管理拓扑和路由逻辑,缺乏统一的数据模型。

Zenoh 的出现解决了这些痛点:

  1. 极致性能:基于 Rust 内核(通过 C API 暴露),利用零拷贝技术,在 C++ 端实现微秒级延迟。
  2. 统一抽象:一套 API 同时支持 Pub/Sub、Query/Reply(RPC)、Time-Series(历史数据)。
  3. 协议无关:自动在 UDP、TCP、WebSocket、共享内存(SHM)之间切换。对于同一台机器上的进程通信,Zenoh 可自动使用共享内存,完全绕过内核网络栈。
  4. 零配置组网:无需编写复杂的 XML 配置,节点启动即自动发现。

二、核心库推荐:zenoh-cpp

对于 C++ 开发者,我们强烈推荐使用官方维护的zenoh-cpp库。

推荐理由

  • 原生 C++ 风格:基于现代的 zenoh-c (C API) 封装,提供符合 C++17/20 标准的 RAII 资源管理、智能指针、Lambda 表达式支持和异步编程模型 (std::future / asio)。
  • 类型安全:相比纯 C API,C++ 封装提供了更强的类型检查和键表达式(KeyExpr)编译时检查辅助。
  • 活跃维护:由 ZettaScale Technology 直接维护,与 Rust 版 Zenoh 保持同步更新。
  • 跨平台:完美支持 Linux (x86/ARM), Windows, macOS, QNX, VxWorks。

获取方式

  • GitHub

    eclipse-zenoh/zenoh-cpp

  • 包管理:
    • Conanconan install "zenoh-cpp/1.0.0" (版本随时间演进,请查阅最新)
    • vcpkgvcpkg install zenoh-cpp
    • 源码编译: 需先安装 zenoh-c 依赖。

三、Zenoh 协议核心概念速览

在编写代码前,必须理解 Zenoh 的三个核心支柱:

  1. Key Expressions (键表达式)

    • 类似 MQTT 的 Topic,但更强大。支持通配符层级匹配(如 sensor/**)和集合运算。
    • 示例:demo/example/test
  2. Session (会话)

    • 通信的入口点。一个 Session 代表一个节点加入 Zenoh 网络。
    • 它负责自动发现邻居、建立连接、协商传输协议(TCP/UDP/SHM)。
  3. 三大交互模式

    • Pub/Sub (发布/订阅):单向数据流,适用于传感器数据广播。
    • Queryable/Get (查询/响应):双向 RPC 模式,适用于配置读取、状态查询。
    • Liveliness (存活检测):自动感知节点的上线/下线,适用于服务发现。


四、C++ 实战:构建高性能分布式系统

以下示例基于C++17标准,展示 Zenoh 最核心的三大功能。

环境准备

假设已通过 Conan 或 vcpkg 安装好 zenoh-cpp。

# CMakeLists.txt 片段示例find_package(zenohcpp REQUIRED)target_link_libraries(my_app PRIVATE zenohcpp::zenohcpp)

场景 1:高性能发布/订阅 (Pub/Sub)

目标:模拟一个激光雷达节点发布点云数据,另一个节点实时接收。

发布者 (publisher.cpp)

#include <zenoh.hxx>#include <iostream>#include <thread>#include <chrono>#include <vector>#include <cstring>
using namespace std::chrono_literals;
int main() {    // 1. 初始化日志 (可选)    zenoh::init_logger();        std::cout << "[Publisher] 正在打开 Zenoh 会话..." << std::endl;        // 2. 打开会话 (Config 默认会自动探测局域网路由器或尝试 P2P)    // 如果需要在特定端口监听或连接特定路由器,可在此配置 JSON    auto config = zenoh::Config::default();    auto session = zenoh::open(std::move(config)).value();        // 3. 声明键表达式 (KeyExpr)    // "demo/lidar/pointcloud" 是数据的唯一标识    auto keyexpr = zenoh::KeyExpr("demo/lidar/pointcloud");
    // 4. 声明发布者    auto publisher = session.declare_publisher(keyexpr).value();    std::cout << "[Publisher] 已发布主题: " << keyexpr.as_string_view() << std::endl;    std::cout << "[Publisher] 开始发送模拟点云数据 (按 Ctrl+C 停止)..." << std::endl;        uint64_t sequence_id = 0;    while (true) {                // 模拟生成一些二进制数据 (例如 1KB 的点云片段)        std::vector<uint8_t> payload(1024);        std::memset(payload.data(), static_cast<int>(sequence_id % 256), payload.size());
        // 构造带有时间戳和序列号的样本        zenoh::Sample sample(keyexpr, zenoh::Bytes(std::move(payload)));                // 可以在附件中添加元数据,如传感器ID        sample.set_attachment(zenoh::Bytes::from_json(R"({"sensor_id": "lidar_01"})"));                // 5. 发布数据 (put)        // Zenoh 底层会自动选择最优传输路径 (UDP/TCP/SHM)        publisher.put(std::move(sample)).value();        std::cout << "\r[Publisher] 发送帧 #" << sequence_id << std::flush;
        sequence_id++;        std::this_thread::sleep_for(10ms); // 100Hz 频率    }    return 0;}

订阅者 (subscriber.cpp)

#include <zenoh.hxx>#include <iostream>#include <thread>#include <atomic>
std::atomic<bool> running{true};void signal_handler(int) {    running = false;}
int main() {    zenoh::init_logger();        std::cout << "[Subscriber] 正在打开会话并订阅数据..." << std::endl;        auto session = zenoh::open(zenoh::Config::default()).value();    auto keyexpr = zenoh::KeyExpr("demo/lidar/pointcloud");        // 定义数据回调    auto on_sample = [](const zenoh::Sample& sample) {                // 解析数据        const auto& payload = sample.get_payload();        size_t size = payload.get_len();
        // 解析附件 (Metadata)        std::string attachment_str = sample.get_attachment().as_string_view();
        std::cout << "\n[Subscriber] 收到数据!"                   << " 大小: " << size << " bytes"                  << " 附件: " << attachment_str                  << " 时间戳: " << sample.get_timestamp().to_string()                  << std::endl;
        // 在这里处理点云数据...        // 注意:回调在 Zenoh 内部线程执行,耗时操作请移至工作线程    };        // 声明订阅者    auto subscriber = session.declare_subscriber(keyexpr, on_sample).value();        std::cout << "[Subscriber] 等待数据中... (按 Ctrl+C 退出)" << std::endl;        // 主循环保持运行    while (running) {        std::this_thread::sleep_for(100ms);    }        std::cout << "[Subscriber] 正在关闭..." << std::endl;    return 0;}

场景 2:请求/响应模式 (Queryable & Get)

目标:实现一个配置服务。客户端发送“获取配置”请求,服务端返回当前的 JSON 配置。这是 MQTT 难以优雅实现的场景。

服务端 (Queryable - 配置服务器)

#include <zenoh.hxx>#include <iostream>#include <string>
int main() {    zenoh::init_logger();    auto session = zenoh::open(zenoh::Config::default()).value();
    // 声明一个 Queryable (可被查询的资源)    auto keyexpr = zenoh::KeyExpr("demo/config/device_01");
    std::string current_config = R"({"mode": "high_performance", "fps": 60})";        auto on_query = [&](const zenoh::Query& query) {        std::cout << "[ConfigServer] 收到查询: " << query.get_selector() << std::endl;
        // 业务逻辑:这里可以动态生成配置或读取文件        // 构造回复        zenoh::Sample reply_sample(keyexpr, zenoh::Bytes::from(current_config));
        // 发送回复        query.reply(keyexpr, std::move(reply_sample)).value();    };        auto queryable = session.declare_queryable(keyexpr, on_query).value();
    std::cout << "[ConfigServer] 服务已启动,监听: " << keyexpr.as_string_view() << std::endl;    std::cout << "等待查询请求..." << std::endl;       while (true) {        std::this_thread::sleep_for(1s);    }}

客户端 (Get - 配置请求者)

#include <zenoh.hxx>#include <iostream>
int main() {    zenoh::init_logger();    auto session = zenoh::open(zenoh::Config::default()).value();
    auto keyexpr = zenoh::KeyExpr("demo/config/device_01");
    std::cout << "[Client] 正在请求配置..." << std::endl;        // 发起 GET 请求    // Zenoh 会向所有匹配该 KeyExpr 的 Queryable 发送请求,并收集回复    auto replies = session.get(keyexpr).value();    int count = 0;    for (const auto& reply : replies) {        if (reply.is_ok()) {            const auto& sample = reply.get_ok();            std::string config_json = sample.get_payload().as_string_view();            std::cout << "[Client] 收到配置 #" << ++count << ": " << config_json << std::endl;        } else {            std::cerr << "[Client] 收到错误: " << reply.get_err().get_message() << std::endl;        }    }        if (count == 0) {        std::cout << "[Client] 未找到任何提供该配置的服务节点。" << std::endl;    }    return 0;}

场景 3:高级特性——共享内存 (Shared Memory, SHM)

这是 Zenoh 的杀手锏。当发布者和订阅者在同一台机器上运行时,Zenoh 可以自动检测到,并直接使用共享内存传递大数据(如图像、点云),零拷贝且不经过内核网络栈。

无需修改上述代码逻辑,只需在 Config 中启用 SHM 插件(通常在 zenoh-cpp 中默认开启或简单配置即可):

// 启用 SHM 的配置示例auto config = zenoh::Config::default();
// 插入 JSON 配置启用 shmconfig.insert_json5("transport/shared_memory/enabled", "true");
auto session = zenoh::open(std::move(config)).value();// 后续 declare_publisher/subscriber 用法完全一致// Zenoh 运行时会自动判断:如果是本地通信,走 SHM;如果是远程,走 TCP/UDP。

五、深度解析:Zenoh 如何在 C++ 中实现“零开销”

RAII 与生命周期管理

zenoh-cpp 充分利用 C++ 的移动语义(Move Semantics)。在上面的 publisher.put(std::move(sample)) 中,大数据负载的所有权被直接转移给 Zenoh 内部缓冲区,避免了不必要的内存复制。

异步与非阻塞

Zenoh 的 API 设计是非阻塞的。declare_publisher 和 put 操作立即返回。底层的网络 IO 由专用的后台线程池处理,确保用户线程(通常是机器人的控制循环)不会被网络抖动阻塞。

多路径传输 (Multipath)

如果在双网卡环境下,Zenoh 可以自动利用多条链路聚合带宽或进行冗余传输,这在 C++ 层面是完全透明的,无需开发者手动实现 socket 绑定逻辑。

动态发现 (Liveliness)

除了数据,Zenoh 还能监控节点的生命周期。你可以订阅 @/*/status 这样的特殊 KeyExpr,实时获知网络中哪些 C++ 节点上线或宕机,从而实现高可用的服务治理。


六、性能调优建议

在实际工业部署中,针对 C++ 应用可进行以下调优:

  1. 调整缓冲区大小:在 Config 中增加 transport/tx/queue_size,以应对突发流量,防止丢包。
  2. 压缩策略:对于文本型配置数据,开启 compression;对于已经压缩的二进制数据(如 JPEG 图像),关闭压缩以节省 CPU。
  3. 优先级队列:Zenoh 支持消息优先级。将控制指令(如“急停”)设置为 RealTime 优先级,将日志数据设置为 Background 优先级,确保关键任务不被阻塞。
zenoh::Encoding encoding = zenoh::Encoding::APPLICATION_JSON;zenoh::Priority priority = zenoh::Priority::REAL_TIME; publisher.put(payload, zenoh::PutOptions::default().encoding(encoding).priority(priority));

七、总结与展望

Zenoh 不仅仅是一个通信库,它是为现代 C++ 分布式系统打造的数据神经系统。

  • 对于自动驾驶,它提供了微秒级的传感器数据同步。
  • 对于工业机器人,它实现了控制器与执行器间的硬实时指令下发。
  • 对于边缘计算网关,它简化了云边端的数据路由,无需编写复杂的转发代码。

随着 zenoh-cpp 的日益成熟和 ROS 2 社区的全面接纳,掌握 Zenoh 已成为 C++ 系统架构师的高阶技能。不要再让过时的通信协议拖累你的算法性能,现在就开始用 Zenoh 重构你的分布式架构吧。

参考资源

  • 官方文档

    zenoh.io/docs

  • C++ GitHub 仓库

    github.com/eclipse-zenoh/zenoh-cpp

更多精彩推荐:

Android开发集

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选从 AIDL 到 HIDL:跨语言 Binder 通信的自动化桥接与零拷贝回调优化全栈指南

C/C++编程精选

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选宏之双刃剑:C/C++ 预处理器宏的威力、陷阱与现代化演进全解

开源工场与工具集

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选nlohmann/json:现代 C++ 开发者的 JSON 神器

MCU内核工坊

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选STM32:嵌入式世界的“瑞士军刀”——深度解析意法半导体32位MCU的架构演进、生态优势与全场景应用

拾光札记簿

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选周末遛娃好去处!黄河之巅畅享亲子欢乐时光

数智星河集

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选被算法盯上的岗位:人工智能优先取代的十大职业深度解析与人类突围路径

Docker 容器

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选Docker 原理及使用注意事项(精要版)

linux开发集

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选零拷贝之王:Linux splice() 全面深度解析与高性能实战指南

青衣染霜华

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选脑机接口:从瘫痪患者的“意念行走”到人类智能的下一次跃迁

QT开发记录-专栏

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选Qt 样式表(QSS)终极指南:打造媲美 Web 的精美原生界面

Web/webassembly技术情报局

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选WebAssembly 全栈透视:从应用开发到底层执行的完整技术链路与核心原理深度解析

数据库开发

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选ARM Linux 下 SQLite3 数据库使用全方位指南

鸿蒙开发全系列教程

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选掌握鸿蒙生态开发利器:ohpm 命令全解析与高效开发实战指南

具身智能时代:机器人开发全栈技术图谱与实战指南(2026版)

青衣霜华渡白鸽,公众号:清荷雅集-墨染优选具身智能时代:机器人开发全栈技术图谱与实战指南(2026版)

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐