前言


ROS(Robot Operating System)作为机器人领域最流行的开源软件框架,其核心通信机制是学习ROS的必经之路。 话题(Topic)通信 是ROS中最基础也是最常用的通信方式,采用经典的 发布-订阅(Publish-Subscribe)模式 ,实现了节点间的松耦合异步通信。

本文将通过一个完整的实战项目,深入讲解ROS话题通信的原理、实现方法和最佳实践,包含 标准消息类型 和 自定义消息类型 的完整代码实现。

 一、ROS话题通信原理


1.1 核心概念

1.2 工作机制

通信流程 :

1. 发布者通过 ros::advertise() 向ROS Master注册话题
2. 订阅者通过 ros::subscribe() 向ROS Master注册订阅
3. ROS Master建立发布者和订阅者之间的连接
4. 发布者通过 publish() 方法发布消息
5. 订阅者通过回调函数接收并处理消息


1.3 特点优势


- 异步通信 :发布者和订阅者无需同步等待
- 松耦合 :发布者和订阅者互相独立,无需知道对方存在
- 多对多 :一个话题可由多个节点发布,也可被多个节点订阅
- 实时性 :适合周期性数据传输,如传感器数据

二、项目环境搭建


 2.1 系统环境

2.2 创建工作空间

# 创建目录结构
mkdir -p ~/catkin_ws/src
cd ~/catkin_ws/src

# 初始化工作空间
catkin_init_workspace

# 编译工作空间
cd ~/catkin_ws
catkin_make

# 配置环境变量
source devel/setup.bash

2.3 创建功能包

cd ~/catkin_ws/src
catkin_create_pkg topic_demo std_msgs rospy roscpp message_generation
cd ~/catkin_ws
catkin_make
source devel/setup.bash

功能包依赖说明 :

- std_msgs :标准消息类型库
- rospy :Python ROS客户端库
- roscpp :C++ ROS客户端库
- message_generation :自定义消息生成工具

三、自定义消息类型


3.1 创建消息文件


在 topic_demo/msg/ 目录下创建自定义消息:
 

3.2 配置package.xml

<?xml version="1.0"?>
<package format="2">
  <name>topic_demo</name>
  <version>1.0.0</version>
  <description>ROS Topic Communication Demonstration</description>
  <maintainer email="student@robotics.edu">24级人工智能2班</maintainer>
  <license>BSD</license>
  
  <buildtool_depend>catkin</buildtool_depend>
  <build_depend>roscpp</build_depend>
  <build_depend>rospy</build_depend>
  <build_depend>std_msgs</build_depend>
  <build_depend>message_generation</build_depend>
  
  <exec_depend>roscpp</exec_depend>
  <exec_depend>rospy</exec_depend>
  <exec_depend>std_msgs</exec_depend>
  <exec_depend>message_runtime</exec_depend>
</package>

3.3 配置CMakeLists.txt

cmake_minimum_required(VERSION 3.0.2)
project(topic_demo)

# 查找依赖包
find_package(catkin REQUIRED COMPONENTS
  roscpp
  rospy
  std_msgs
  message_generation
)

# 添加自定义消息文件
add_message_files(
  FILES
  Person.msg
  RobotState.msg
)

# 生成消息
generate_messages(
  DEPENDENCIES
  std_msgs
)

# 配置catkin包
catkin_package(
  CATKIN_DEPENDS roscpp rospy std_msgs message_runtime
)

# 包含目录
include_directories(
  ${catkin_INCLUDE_DIRS}
)

# 编译C++节点
add_executable(talker src/talker.cpp)
target_link_libraries(talker ${catkin_LIBRARIES})
add_dependencies(talker ${${PROJECT_NAME}_EXPORTED_TARGETS})

add_executable(listener src/listener.cpp)
target_link_libraries(listener ${catkin_LIBRARIES})
add_dependencies(listener ${${PROJECT_NAME}_EXPORTED_TARGETS})

3.4 编译消息

cd ~/catkin_ws
catkin_make
source devel/setup.bash

编译成功后,系统会自动生成消息的头文件和Python模块:

- C++头文件: devel/include/topic_demo/Person.h 、 devel/include/topic_demo/RobotState.h
- Python模块: devel/lib/python3/dist-packages/topic_demo/msg/__init__.py

四、C++实现


4.1 发布者节点(talker.cpp)

#include "ros/ros.h"
#include "std_msgs/String.h"
#include "std_msgs/Int32.h"
#include "std_msgs/Float64.h"
#include "topic_demo/Person.h"
#include "topic_demo/RobotState.h"
#include <sstream>

int main(int argc, char **argv)
{
    // 初始化节点,节点名为"talker"
    ros::init(argc, argv, "talker");
    
    // 创建节点句柄
    ros::NodeHandle n;

    // 创建发布者,向5个话题发布消息
    ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);
    ros::Publisher counter_pub = n.advertise<std_msgs::Int32>("counter", 1000);
    ros::Publisher temperature_pub = n.advertise<std_msgs::Float64>("temperature", 1000);
    ros::Publisher person_pub = n.advertise<topic_demo::Person>("person_info", 1000);
    ros::Publisher robot_state_pub = n.advertise<topic_demo::RobotState>("robot_state", 1000);

    // 设置发布频率为10Hz
    ros::Rate loop_rate(10);
    
    int count = 0;

    // 主循环,持续发布消息
    while (ros::ok())
    {
        // 1. 发布字符串消息
        std_msgs::String str_msg;
        std::stringstream ss;
        ss << "Hello ROS! Message #" << count;
        str_msg.data = ss.str();
        ROS_INFO("[发布] chatter: %s", str_msg.data.c_str());
        chatter_pub.publish(str_msg);

        // 2. 发布整数计数器
        std_msgs::Int32 int_msg;
        int_msg.data = count;
        ROS_INFO("[发布] counter: %d", int_msg.data);
        counter_pub.publish(int_msg);

        // 3. 发布模拟温度数据(正弦波动)
        std_msgs::Float64 float_msg;
        float_msg.data = 25.0 + sin(count * 0.5) * 5.0;
        ROS_INFO("[发布] temperature: %.2f", float_msg.data);
        temperature_pub.publish(float_msg);

        // 4. 发布自定义Person消息
        topic_demo::Person person_msg;
        person_msg.name = "Student_" + std::to_string(count % 10);
        person_msg.age = 18 + (count % 5);
        person_msg.height = 1.70 + (count % 10) * 0.02;
        person_msg.address = "Room_" + std::to_string(count % 100);
        person_msg.gender = count % 2;
        person_msg.is_student = true;
        ROS_INFO("[发布] person: name=%s, age=%d, height=%.2f", 
                 person_msg.name.c_str(), person_msg.age, person_msg.height);
        person_pub.publish(person_msg);

        // 5. 发布自定义RobotState消息
        topic_demo::RobotState robot_msg;
        robot_msg.x = count * 0.1;
        robot_msg.y = count * 0.05;
        robot_msg.theta = count * 0.1;
        robot_msg.linear_velocity = 0.5;
        robot_msg.angular_velocity = 0.2;
        robot_msg.is_moving = true;
        robot_msg.battery_level = "85%";
        ROS_INFO("[发布] robot: x=%.2f, y=%.2f, theta=%.2f", 
                 robot_msg.x, robot_msg.y, robot_msg.theta);
        robot_state_pub.publish(robot_msg);

        // 处理回调
        ros::spinOnce();
        
        // 按频率休眠
        loop_rate.sleep();
        
        ++count;
    }

    return 0;
}

代码解析 :

- ros::init() :初始化ROS节点
- ros::NodeHandle :创建节点句柄,用于管理节点资源
- ros::Publisher :创建发布者对象,参数为话题名和队列大小
- ros::Rate :控制发布频率
- publish() :发布消息
- ros::spinOnce() :处理一次回调队列

4.2 订阅者节点(listener.cpp)

#include "ros/ros.h"
#include "std_msgs/String.h"
#include "std_msgs/Int32.h"
#include "std_msgs/Float64.h"
#include "topic_demo/Person.h"
#include "topic_demo/RobotState.h"

// 字符串消息回调函数
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
    ROS_INFO("[订阅] chatter: %s", msg->data.c_str());
}

// 整数消息回调函数
void counterCallback(const std_msgs::Int32::ConstPtr& msg)
{
    ROS_INFO("[订阅] counter: %d", msg->data);
}

// 浮点消息回调函数
void temperatureCallback(const std_msgs::Float64::ConstPtr& msg)
{
    ROS_INFO("[订阅] temperature: %.2f", msg->data);
}

// Person消息回调函数
void personCallback(const topic_demo::Person::ConstPtr& msg)
{
    ROS_INFO("[订阅] Person:");
    ROS_INFO("  姓名: %s", msg->name.c_str());
    ROS_INFO("  年龄: %d", msg->age);
    ROS_INFO("  身高: %.2f m", msg->height);
    ROS_INFO("  地址: %s", msg->address.c_str());
    ROS_INFO("  性别: %s", msg->gender == 0 ? "男" : "女");
    ROS_INFO("  学生: %s", msg->is_student ? "是" : "否");
}

// RobotState消息回调函数
void robotStateCallback(const topic_demo::RobotState::ConstPtr& msg)
{
    ROS_INFO("[订阅] RobotState:");
    ROS_INFO("  位置: (%.2f, %.2f)", msg->x, msg->y);
    ROS_INFO("  航向角: %.2f rad", msg->theta);
    ROS_INFO("  线速度: %.2f m/s", msg->linear_velocity);
    ROS_INFO("  角速度: %.2f rad/s", msg->angular_velocity);
    ROS_INFO("  移动中: %s", msg->is_moving ? "是" : "否");
    ROS_INFO("  电池电量: %s", msg->battery_level.c_str());
}

int main(int argc, char **argv)
{
    // 初始化节点,节点名为"listener"
    ros::init(argc, argv, "listener");
    
    // 创建节点句柄
    ros::NodeHandle n;

    // 创建订阅者,订阅5个话题
    ros::Subscriber sub_chatter = n.subscribe("chatter", 1000, chatterCallback);
    ros::Subscriber sub_counter = n.subscribe("counter", 1000, counterCallback);
    ros::Subscriber sub_temp = n.subscribe("temperature", 1000, temperatureCallback);
    ros::Subscriber sub_person = n.subscribe("person_info", 1000, personCallback);
    ros::Subscriber sub_robot = n.subscribe("robot_state", 1000, robotStateCallback);

    ROS_INFO("订阅者节点已启动,等待接收消息...");

    // 进入事件循环,持续处理回调
    ros::spin();

    return 0;
}

代码解析 :

- ros::Subscriber :创建订阅者对象,参数为话题名、队列大小和回调函数
- 回调函数:当收到消息时自动调用,参数为消息的智能指针
- ros::spin() :进入事件循环,持续处理回调,直到节点关闭

五、Python实现


 5.1 发布者节点(talker.py)

#!/usr/bin/env python3
import rospy
import math
from std_msgs.msg import String, Int32, Float64
from topic_demo.msg import Person, RobotState

def talker():
    # 初始化节点
    rospy.init_node('talker', anonymous=True)
    
    # 创建发布者
    pub_chatter = rospy.Publisher('chatter', String, queue_size=1000)
    pub_counter = rospy.Publisher('counter', Int32, queue_size=1000)
    pub_temp = rospy.Publisher('temperature', Float64, queue_size=1000)
    pub_person = rospy.Publisher('person_info', Person, queue_size=1000)
    pub_robot = rospy.Publisher('robot_state', RobotState, queue_size=1000)
    
    # 设置发布频率
    rate = rospy.Rate(10)
    count = 0
    
    rospy.loginfo("Python发布者节点已启动")
    
    while not rospy.is_shutdown():
        # 发布字符串消息
        pub_chatter.publish(String(data=f"Hello ROS! Message #{count}"))
        
        # 发布整数消息
        pub_counter.publish(Int32(data=count))
        
        # 发布浮点消息
        pub_temp.publish(Float64(data=25.0 + math.sin(count * 0.5) * 5.0))
        
        # 发布Person消息
        person = Person()
        person.name = f"Student_{count % 10}"
        person.age = 18 + (count % 5)
        person.height = 1.70 + (count % 10) * 0.02
        person.address = f"Room_{count % 100}"
        person.gender = count % 2
        person.is_student = True
        pub_person.publish(person)
        
        # 发布RobotState消息
        robot = RobotState()
        robot.x = count * 0.1
        robot.y = count * 0.05
        robot.theta = count * 0.1
        robot.linear_velocity = 0.5
        robot.angular_velocity = 0.2
        robot.is_moving = True
        robot.battery_level = "85%"
        pub_robot.publish(robot)
        
        rospy.loginfo(f"已发布第 {count} 组消息")
        rate.sleep()
        count += 1

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        rospy.loginfo("发布者节点被中断")

5.2 订阅者节点(listener.py)

#!/usr/bin/env python3
import rospy
from std_msgs.msg import String, Int32, Float64
from topic_demo.msg import Person, RobotState

def chatter_callback(msg):
    rospy.loginfo(f"[订阅] chatter: {msg.data}")

def counter_callback(msg):
    rospy.loginfo(f"[订阅] counter: {msg.data}")

def temperature_callback(msg):
    rospy.loginfo(f"[订阅] temperature: {msg.data:.2f}")

def person_callback(msg):
    rospy.loginfo("[订阅] Person:")
    rospy.loginfo(f"  姓名: {msg.name}")
    rospy.loginfo(f"  年龄: {msg.age}")
    rospy.loginfo(f"  身高: {msg.height:.2f} m")
    rospy.loginfo(f"  地址: {msg.address}")
    rospy.loginfo(f"  性别: {'男' if msg.gender == 0 else '女'}")
    rospy.loginfo(f"  学生: {'是' if msg.is_student else '否'}")

def robot_state_callback(msg):
    rospy.loginfo("[订阅] RobotState:")
    rospy.loginfo(f"  位置: ({msg.x:.2f}, {msg.y:.2f})")
    rospy.loginfo(f"  航向角: {msg.theta:.2f} rad")
    rospy.loginfo(f"  线速度: {msg.linear_velocity:.2f} m/s")
    rospy.loginfo(f"  角速度: {msg.angular_velocity:.2f} rad/s")
    rospy.loginfo(f"  移动中: {'是' if msg.is_moving else '否'}")
    rospy.loginfo(f"  电池电量: {msg.battery_level}")

def listener():
    rospy.init_node('listener', anonymous=True)
    
    rospy.Subscriber('chatter', String, chatter_callback)
    rospy.Subscriber('counter', Int32, counter_callback)
    rospy.Subscriber('temperature', Float64, temperature_callback)
    rospy.Subscriber('person_info', Person, person_callback)
    rospy.Subscriber('robot_state', RobotState, robot_state_callback)
    
    rospy.loginfo("Python订阅者节点已启动,等待接收消息...")
    rospy.spin()

if __name__ == '__main__':
    try:
        listener()
    except rospy.ROSInterruptException:
        rospy.loginfo("订阅者节点被中断")

六、运行与测试


6.1 编译项目

cd ~/catkin_ws
catkin_make
source devel/setup.bash

6.2 方式一:使用launch文件启动


创建 launch/topic_demo.launch :

<launch>
  <node name="talker" pkg="topic_demo" type="talker" output="screen"/>
  <node name="listener" pkg="topic_demo" type="listener" output="screen"/>
</launch>

启动命令:

roslaunch topic_demo topic_demo.launch

6.3 方式二:分步运行


终端1 - 启动ROS核心 :

roscore

终端2 - 运行C++发布者 :

rosrun topic_demo talker

终端3 - 运行C++订阅者 :

rosrun topic_demo listener

终端4 - 运行Python发布者 :

rosrun topic_demo talker.py

终端5 - 运行Python订阅者 :

rosrun topic_demo listener.py

6.4 运行结果


发布者终端输出:

[ INFO] [1620000000.000000]: [发布] chatter: Hello ROS! Message #0
[ INFO] [1620000000.000001]: [发布] counter: 0
[ INFO] [1620000000.000002]: [发布] temperature: 25.00
[ INFO] [1620000000.000003]: [发布] person: name=Student_0, age=18, height=1.70
[ INFO] [1620000000.000004]: [发布] robot: x=0.00, y=0.00, theta=0.00
[ INFO] [1620000000.100000]: [发布] chatter: Hello ROS! Message #1
[ INFO] [1620000000.100001]: [发布] counter: 1
[ INFO] [1620000000.100002]: [发布] temperature: 27.48
[ INFO] [1620000000.100003]: [发布] person: name=Student_1, age=19, height=1.72
[ INFO] [1620000000.100004]: [发布] robot: x=0.10, y=0.05, theta=0.10
...

订阅者终端输出 :

[ INFO] [1620000000.000000]: 订阅者节点已启动,等待接收消息...
[ INFO] [1620000000.000001]: [订阅] chatter: Hello ROS! Message #0
[ INFO] [1620000000.000002]: [订阅] counter: 0
[ INFO] [1620000000.000003]: [订阅] temperature: 25.00
[ INFO] [1620000000.000004]: [订阅] Person:
[ INFO] [1620000000.000005]:   姓名: Student_0
[ INFO] [1620000000.000006]:   年龄: 18
[ INFO] [1620000000.000007]:   身高: 1.70 m
[ INFO] [1620000000.000008]:   地址: Room_0
[ INFO] [1620000000.000009]:   性别: 男
[ INFO] [1620000000.000010]:   学生: 是
[ INFO] [1620000000.000011]: [订阅] RobotState:
[ INFO] [1620000000.000012]:   位置: (0.00, 0.00)
[ INFO] [1620000000.000013]:   航向角: 0.00 rad
[ INFO] [1620000000.000014]:   线速度: 0.50 m/s
[ INFO] [1620000000.000015]:   角速度: 0.20 rad/s
[ INFO] [1620000000.000016]:   移动中: 是
[ INFO] [1620000000.000017]:   电池电量: 85%
...

七、话题管理工具


7.1 查看所有话题

rostopic list

输出:

/chatter
/counter
/temperature
/person_info
/robot_state
/rosout
/rosout_agg

7.2 查看话题详情

rostopic info /person_info

输出:

Type: topic_demo/Person

Publishers: 
 * /talker (http://localhost:11311/)

Subscribers: 
 * /listener (http://localhost:11311/)

7.3 查看消息内容

rostopic echo /chatter

7.4 查看消息类型定义

rosmsg show topic_demo/Person

输出:

string name
uint8 age
float32 height
string address
uint8 gender
bool is_student

7.5 查看话题发布频率

rostopic hz /chatter

输出:

subscribed to [/chatter]
average rate: 10.001
	min: 0.099s max: 0.101s std dev: 0.00033s window: 10

八、技术要点总结


8.1 C++与Python对比

8.2 最佳实践


1. 消息设计原则 :
   
   - 字段命名清晰,语义明确
   - 避免过深的嵌套结构
   - 优先使用标准消息类型
2. 队列大小设置 :
   
   - 根据发布频率和处理能力设置
   - 过小可能导致消息丢失
   - 过大可能增加内存占用
3. 命名规范 :
   
   - 话题名使用小写字母和下划线
   - 使用命名空间区分功能模块
4. 错误处理 :
   
   - 检查 ros::ok() 状态
   - 添加日志输出便于调试
   - 处理异常情况


8.3 常见问题

九、总结


通过本文的实战项目,我们系统学习了ROS话题通信的完整流程:

1. 原理理解 :掌握了发布-订阅模式的工作机制
2. 消息定义 :学会了自定义消息类型的创建和配置
3. 代码实现 :掌握了C++和Python两种语言的发布者/订阅者编写
4. 调试技巧 :熟悉了rostopic、rosmsg等工具的使用
5. 最佳实践 :了解了话题通信的设计原则和注意事项
话题通信是ROS中最基础也是最重要的通信方式,广泛应用于传感器数据传输、机器人状态发布、控制指令下发等场景。掌握好这一知识点,对于后续学习服务通信、动作通信等高级内容至关重要。

附录:项目文件结构

catkin_ws/
├── src/
│   └── topic_demo/
│       ├── CMakeLists.txt
│       ├── package.xml
│       ├── launch/
│       │   └── topic_demo.launch
│       ├── msg/
│       │   ├── Person.msg
│       │   └── RobotState.msg
│       ├── scripts/
│       │   ├── talker.py
│       │   └── listener.py
│       └── src/
│           ├── talker.cpp
│           └── listener.cpp
├── build/
└── devel/

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐