ROS话题通信实战:从原理到完整实现
前言
ROS(Robot Operating System)作为机器人领域最流行的开源软件框架,其核心通信机制是学习ROS的必经之路。 话题(Topic)通信 是ROS中最基础也是最常用的通信方式,采用经典的 发布-订阅(Publish-Subscribe)模式 ,实现了节点间的松耦合异步通信。
本文将通过一个完整的实战项目,深入讲解ROS话题通信的原理、实现方法和最佳实践,包含 标准消息类型 和 自定义消息类型 的完整代码实现。
一、ROS话题通信原理
1.1 核心概念

1.2 工作机制

通信流程 :
1. 发布者通过 ros::advertise() 向ROS Master注册话题
2. 订阅者通过 ros::subscribe() 向ROS Master注册订阅
3. ROS Master建立发布者和订阅者之间的连接
4. 发布者通过 publish() 方法发布消息
5. 订阅者通过回调函数接收并处理消息
1.3 特点优势
- 异步通信 :发布者和订阅者无需同步等待
- 松耦合 :发布者和订阅者互相独立,无需知道对方存在
- 多对多 :一个话题可由多个节点发布,也可被多个节点订阅
- 实时性 :适合周期性数据传输,如传感器数据
二、项目环境搭建
2.1 系统环境

2.2 创建工作空间
# 创建目录结构
mkdir -p ~/catkin_ws/src
cd ~/catkin_ws/src
# 初始化工作空间
catkin_init_workspace
# 编译工作空间
cd ~/catkin_ws
catkin_make
# 配置环境变量
source devel/setup.bash
2.3 创建功能包
cd ~/catkin_ws/src
catkin_create_pkg topic_demo std_msgs rospy roscpp message_generation
cd ~/catkin_ws
catkin_make
source devel/setup.bash
功能包依赖说明 :
- std_msgs :标准消息类型库
- rospy :Python ROS客户端库
- roscpp :C++ ROS客户端库
- message_generation :自定义消息生成工具
三、自定义消息类型
3.1 创建消息文件
在 topic_demo/msg/ 目录下创建自定义消息:

3.2 配置package.xml
<?xml version="1.0"?>
<package format="2">
<name>topic_demo</name>
<version>1.0.0</version>
<description>ROS Topic Communication Demonstration</description>
<maintainer email="student@robotics.edu">24级人工智能2班</maintainer>
<license>BSD</license>
<buildtool_depend>catkin</buildtool_depend>
<build_depend>roscpp</build_depend>
<build_depend>rospy</build_depend>
<build_depend>std_msgs</build_depend>
<build_depend>message_generation</build_depend>
<exec_depend>roscpp</exec_depend>
<exec_depend>rospy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>message_runtime</exec_depend>
</package>
3.3 配置CMakeLists.txt
cmake_minimum_required(VERSION 3.0.2)
project(topic_demo)
# 查找依赖包
find_package(catkin REQUIRED COMPONENTS
roscpp
rospy
std_msgs
message_generation
)
# 添加自定义消息文件
add_message_files(
FILES
Person.msg
RobotState.msg
)
# 生成消息
generate_messages(
DEPENDENCIES
std_msgs
)
# 配置catkin包
catkin_package(
CATKIN_DEPENDS roscpp rospy std_msgs message_runtime
)
# 包含目录
include_directories(
${catkin_INCLUDE_DIRS}
)
# 编译C++节点
add_executable(talker src/talker.cpp)
target_link_libraries(talker ${catkin_LIBRARIES})
add_dependencies(talker ${${PROJECT_NAME}_EXPORTED_TARGETS})
add_executable(listener src/listener.cpp)
target_link_libraries(listener ${catkin_LIBRARIES})
add_dependencies(listener ${${PROJECT_NAME}_EXPORTED_TARGETS})
3.4 编译消息
cd ~/catkin_ws
catkin_make
source devel/setup.bash
编译成功后,系统会自动生成消息的头文件和Python模块:
- C++头文件: devel/include/topic_demo/Person.h 、 devel/include/topic_demo/RobotState.h
- Python模块: devel/lib/python3/dist-packages/topic_demo/msg/__init__.py
四、C++实现
4.1 发布者节点(talker.cpp)
#include "ros/ros.h"
#include "std_msgs/String.h"
#include "std_msgs/Int32.h"
#include "std_msgs/Float64.h"
#include "topic_demo/Person.h"
#include "topic_demo/RobotState.h"
#include <sstream>
int main(int argc, char **argv)
{
// 初始化节点,节点名为"talker"
ros::init(argc, argv, "talker");
// 创建节点句柄
ros::NodeHandle n;
// 创建发布者,向5个话题发布消息
ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);
ros::Publisher counter_pub = n.advertise<std_msgs::Int32>("counter", 1000);
ros::Publisher temperature_pub = n.advertise<std_msgs::Float64>("temperature", 1000);
ros::Publisher person_pub = n.advertise<topic_demo::Person>("person_info", 1000);
ros::Publisher robot_state_pub = n.advertise<topic_demo::RobotState>("robot_state", 1000);
// 设置发布频率为10Hz
ros::Rate loop_rate(10);
int count = 0;
// 主循环,持续发布消息
while (ros::ok())
{
// 1. 发布字符串消息
std_msgs::String str_msg;
std::stringstream ss;
ss << "Hello ROS! Message #" << count;
str_msg.data = ss.str();
ROS_INFO("[发布] chatter: %s", str_msg.data.c_str());
chatter_pub.publish(str_msg);
// 2. 发布整数计数器
std_msgs::Int32 int_msg;
int_msg.data = count;
ROS_INFO("[发布] counter: %d", int_msg.data);
counter_pub.publish(int_msg);
// 3. 发布模拟温度数据(正弦波动)
std_msgs::Float64 float_msg;
float_msg.data = 25.0 + sin(count * 0.5) * 5.0;
ROS_INFO("[发布] temperature: %.2f", float_msg.data);
temperature_pub.publish(float_msg);
// 4. 发布自定义Person消息
topic_demo::Person person_msg;
person_msg.name = "Student_" + std::to_string(count % 10);
person_msg.age = 18 + (count % 5);
person_msg.height = 1.70 + (count % 10) * 0.02;
person_msg.address = "Room_" + std::to_string(count % 100);
person_msg.gender = count % 2;
person_msg.is_student = true;
ROS_INFO("[发布] person: name=%s, age=%d, height=%.2f",
person_msg.name.c_str(), person_msg.age, person_msg.height);
person_pub.publish(person_msg);
// 5. 发布自定义RobotState消息
topic_demo::RobotState robot_msg;
robot_msg.x = count * 0.1;
robot_msg.y = count * 0.05;
robot_msg.theta = count * 0.1;
robot_msg.linear_velocity = 0.5;
robot_msg.angular_velocity = 0.2;
robot_msg.is_moving = true;
robot_msg.battery_level = "85%";
ROS_INFO("[发布] robot: x=%.2f, y=%.2f, theta=%.2f",
robot_msg.x, robot_msg.y, robot_msg.theta);
robot_state_pub.publish(robot_msg);
// 处理回调
ros::spinOnce();
// 按频率休眠
loop_rate.sleep();
++count;
}
return 0;
}
代码解析 :
- ros::init() :初始化ROS节点
- ros::NodeHandle :创建节点句柄,用于管理节点资源
- ros::Publisher :创建发布者对象,参数为话题名和队列大小
- ros::Rate :控制发布频率
- publish() :发布消息
- ros::spinOnce() :处理一次回调队列
4.2 订阅者节点(listener.cpp)
#include "ros/ros.h"
#include "std_msgs/String.h"
#include "std_msgs/Int32.h"
#include "std_msgs/Float64.h"
#include "topic_demo/Person.h"
#include "topic_demo/RobotState.h"
// 字符串消息回调函数
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO("[订阅] chatter: %s", msg->data.c_str());
}
// 整数消息回调函数
void counterCallback(const std_msgs::Int32::ConstPtr& msg)
{
ROS_INFO("[订阅] counter: %d", msg->data);
}
// 浮点消息回调函数
void temperatureCallback(const std_msgs::Float64::ConstPtr& msg)
{
ROS_INFO("[订阅] temperature: %.2f", msg->data);
}
// Person消息回调函数
void personCallback(const topic_demo::Person::ConstPtr& msg)
{
ROS_INFO("[订阅] Person:");
ROS_INFO(" 姓名: %s", msg->name.c_str());
ROS_INFO(" 年龄: %d", msg->age);
ROS_INFO(" 身高: %.2f m", msg->height);
ROS_INFO(" 地址: %s", msg->address.c_str());
ROS_INFO(" 性别: %s", msg->gender == 0 ? "男" : "女");
ROS_INFO(" 学生: %s", msg->is_student ? "是" : "否");
}
// RobotState消息回调函数
void robotStateCallback(const topic_demo::RobotState::ConstPtr& msg)
{
ROS_INFO("[订阅] RobotState:");
ROS_INFO(" 位置: (%.2f, %.2f)", msg->x, msg->y);
ROS_INFO(" 航向角: %.2f rad", msg->theta);
ROS_INFO(" 线速度: %.2f m/s", msg->linear_velocity);
ROS_INFO(" 角速度: %.2f rad/s", msg->angular_velocity);
ROS_INFO(" 移动中: %s", msg->is_moving ? "是" : "否");
ROS_INFO(" 电池电量: %s", msg->battery_level.c_str());
}
int main(int argc, char **argv)
{
// 初始化节点,节点名为"listener"
ros::init(argc, argv, "listener");
// 创建节点句柄
ros::NodeHandle n;
// 创建订阅者,订阅5个话题
ros::Subscriber sub_chatter = n.subscribe("chatter", 1000, chatterCallback);
ros::Subscriber sub_counter = n.subscribe("counter", 1000, counterCallback);
ros::Subscriber sub_temp = n.subscribe("temperature", 1000, temperatureCallback);
ros::Subscriber sub_person = n.subscribe("person_info", 1000, personCallback);
ros::Subscriber sub_robot = n.subscribe("robot_state", 1000, robotStateCallback);
ROS_INFO("订阅者节点已启动,等待接收消息...");
// 进入事件循环,持续处理回调
ros::spin();
return 0;
}
代码解析 :
- ros::Subscriber :创建订阅者对象,参数为话题名、队列大小和回调函数
- 回调函数:当收到消息时自动调用,参数为消息的智能指针
- ros::spin() :进入事件循环,持续处理回调,直到节点关闭
五、Python实现
5.1 发布者节点(talker.py)
#!/usr/bin/env python3
import rospy
import math
from std_msgs.msg import String, Int32, Float64
from topic_demo.msg import Person, RobotState
def talker():
# 初始化节点
rospy.init_node('talker', anonymous=True)
# 创建发布者
pub_chatter = rospy.Publisher('chatter', String, queue_size=1000)
pub_counter = rospy.Publisher('counter', Int32, queue_size=1000)
pub_temp = rospy.Publisher('temperature', Float64, queue_size=1000)
pub_person = rospy.Publisher('person_info', Person, queue_size=1000)
pub_robot = rospy.Publisher('robot_state', RobotState, queue_size=1000)
# 设置发布频率
rate = rospy.Rate(10)
count = 0
rospy.loginfo("Python发布者节点已启动")
while not rospy.is_shutdown():
# 发布字符串消息
pub_chatter.publish(String(data=f"Hello ROS! Message #{count}"))
# 发布整数消息
pub_counter.publish(Int32(data=count))
# 发布浮点消息
pub_temp.publish(Float64(data=25.0 + math.sin(count * 0.5) * 5.0))
# 发布Person消息
person = Person()
person.name = f"Student_{count % 10}"
person.age = 18 + (count % 5)
person.height = 1.70 + (count % 10) * 0.02
person.address = f"Room_{count % 100}"
person.gender = count % 2
person.is_student = True
pub_person.publish(person)
# 发布RobotState消息
robot = RobotState()
robot.x = count * 0.1
robot.y = count * 0.05
robot.theta = count * 0.1
robot.linear_velocity = 0.5
robot.angular_velocity = 0.2
robot.is_moving = True
robot.battery_level = "85%"
pub_robot.publish(robot)
rospy.loginfo(f"已发布第 {count} 组消息")
rate.sleep()
count += 1
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
rospy.loginfo("发布者节点被中断")
5.2 订阅者节点(listener.py)
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String, Int32, Float64
from topic_demo.msg import Person, RobotState
def chatter_callback(msg):
rospy.loginfo(f"[订阅] chatter: {msg.data}")
def counter_callback(msg):
rospy.loginfo(f"[订阅] counter: {msg.data}")
def temperature_callback(msg):
rospy.loginfo(f"[订阅] temperature: {msg.data:.2f}")
def person_callback(msg):
rospy.loginfo("[订阅] Person:")
rospy.loginfo(f" 姓名: {msg.name}")
rospy.loginfo(f" 年龄: {msg.age}")
rospy.loginfo(f" 身高: {msg.height:.2f} m")
rospy.loginfo(f" 地址: {msg.address}")
rospy.loginfo(f" 性别: {'男' if msg.gender == 0 else '女'}")
rospy.loginfo(f" 学生: {'是' if msg.is_student else '否'}")
def robot_state_callback(msg):
rospy.loginfo("[订阅] RobotState:")
rospy.loginfo(f" 位置: ({msg.x:.2f}, {msg.y:.2f})")
rospy.loginfo(f" 航向角: {msg.theta:.2f} rad")
rospy.loginfo(f" 线速度: {msg.linear_velocity:.2f} m/s")
rospy.loginfo(f" 角速度: {msg.angular_velocity:.2f} rad/s")
rospy.loginfo(f" 移动中: {'是' if msg.is_moving else '否'}")
rospy.loginfo(f" 电池电量: {msg.battery_level}")
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber('chatter', String, chatter_callback)
rospy.Subscriber('counter', Int32, counter_callback)
rospy.Subscriber('temperature', Float64, temperature_callback)
rospy.Subscriber('person_info', Person, person_callback)
rospy.Subscriber('robot_state', RobotState, robot_state_callback)
rospy.loginfo("Python订阅者节点已启动,等待接收消息...")
rospy.spin()
if __name__ == '__main__':
try:
listener()
except rospy.ROSInterruptException:
rospy.loginfo("订阅者节点被中断")
六、运行与测试
6.1 编译项目
cd ~/catkin_ws
catkin_make
source devel/setup.bash
6.2 方式一:使用launch文件启动
创建 launch/topic_demo.launch :
<launch>
<node name="talker" pkg="topic_demo" type="talker" output="screen"/>
<node name="listener" pkg="topic_demo" type="listener" output="screen"/>
</launch>
启动命令:
roslaunch topic_demo topic_demo.launch
6.3 方式二:分步运行
终端1 - 启动ROS核心 :
roscore
终端2 - 运行C++发布者 :
rosrun topic_demo talker
终端3 - 运行C++订阅者 :
rosrun topic_demo listener
终端4 - 运行Python发布者 :
rosrun topic_demo talker.py
终端5 - 运行Python订阅者 :
rosrun topic_demo listener.py
6.4 运行结果
发布者终端输出:
[ INFO] [1620000000.000000]: [发布] chatter: Hello ROS! Message #0
[ INFO] [1620000000.000001]: [发布] counter: 0
[ INFO] [1620000000.000002]: [发布] temperature: 25.00
[ INFO] [1620000000.000003]: [发布] person: name=Student_0, age=18, height=1.70
[ INFO] [1620000000.000004]: [发布] robot: x=0.00, y=0.00, theta=0.00
[ INFO] [1620000000.100000]: [发布] chatter: Hello ROS! Message #1
[ INFO] [1620000000.100001]: [发布] counter: 1
[ INFO] [1620000000.100002]: [发布] temperature: 27.48
[ INFO] [1620000000.100003]: [发布] person: name=Student_1, age=19, height=1.72
[ INFO] [1620000000.100004]: [发布] robot: x=0.10, y=0.05, theta=0.10
...
订阅者终端输出 :
[ INFO] [1620000000.000000]: 订阅者节点已启动,等待接收消息...
[ INFO] [1620000000.000001]: [订阅] chatter: Hello ROS! Message #0
[ INFO] [1620000000.000002]: [订阅] counter: 0
[ INFO] [1620000000.000003]: [订阅] temperature: 25.00
[ INFO] [1620000000.000004]: [订阅] Person:
[ INFO] [1620000000.000005]: 姓名: Student_0
[ INFO] [1620000000.000006]: 年龄: 18
[ INFO] [1620000000.000007]: 身高: 1.70 m
[ INFO] [1620000000.000008]: 地址: Room_0
[ INFO] [1620000000.000009]: 性别: 男
[ INFO] [1620000000.000010]: 学生: 是
[ INFO] [1620000000.000011]: [订阅] RobotState:
[ INFO] [1620000000.000012]: 位置: (0.00, 0.00)
[ INFO] [1620000000.000013]: 航向角: 0.00 rad
[ INFO] [1620000000.000014]: 线速度: 0.50 m/s
[ INFO] [1620000000.000015]: 角速度: 0.20 rad/s
[ INFO] [1620000000.000016]: 移动中: 是
[ INFO] [1620000000.000017]: 电池电量: 85%
...
七、话题管理工具
7.1 查看所有话题
rostopic list
输出:
/chatter
/counter
/temperature
/person_info
/robot_state
/rosout
/rosout_agg
7.2 查看话题详情
rostopic info /person_info
输出:
Type: topic_demo/Person
Publishers:
* /talker (http://localhost:11311/)
Subscribers:
* /listener (http://localhost:11311/)
7.3 查看消息内容
rostopic echo /chatter
7.4 查看消息类型定义
rosmsg show topic_demo/Person
输出:
string name
uint8 age
float32 height
string address
uint8 gender
bool is_student
7.5 查看话题发布频率
rostopic hz /chatter
输出:
subscribed to [/chatter]
average rate: 10.001
min: 0.099s max: 0.101s std dev: 0.00033s window: 10
八、技术要点总结
8.1 C++与Python对比

8.2 最佳实践
1. 消息设计原则 :
- 字段命名清晰,语义明确
- 避免过深的嵌套结构
- 优先使用标准消息类型
2. 队列大小设置 :
- 根据发布频率和处理能力设置
- 过小可能导致消息丢失
- 过大可能增加内存占用
3. 命名规范 :
- 话题名使用小写字母和下划线
- 使用命名空间区分功能模块
4. 错误处理 :
- 检查 ros::ok() 状态
- 添加日志输出便于调试
- 处理异常情况
8.3 常见问题

九、总结
通过本文的实战项目,我们系统学习了ROS话题通信的完整流程:
1. 原理理解 :掌握了发布-订阅模式的工作机制
2. 消息定义 :学会了自定义消息类型的创建和配置
3. 代码实现 :掌握了C++和Python两种语言的发布者/订阅者编写
4. 调试技巧 :熟悉了rostopic、rosmsg等工具的使用
5. 最佳实践 :了解了话题通信的设计原则和注意事项
话题通信是ROS中最基础也是最重要的通信方式,广泛应用于传感器数据传输、机器人状态发布、控制指令下发等场景。掌握好这一知识点,对于后续学习服务通信、动作通信等高级内容至关重要。
附录:项目文件结构
catkin_ws/
├── src/
│ └── topic_demo/
│ ├── CMakeLists.txt
│ ├── package.xml
│ ├── launch/
│ │ └── topic_demo.launch
│ ├── msg/
│ │ ├── Person.msg
│ │ └── RobotState.msg
│ ├── scripts/
│ │ ├── talker.py
│ │ └── listener.py
│ └── src/
│ ├── talker.cpp
│ └── listener.cpp
├── build/
└── devel/
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)