ROS(Robot Operating System机器人操作系统)
ROS=通信+工具+功能+生态ROS是适用于机器人的开源元操作系统,机器人里,一个模块负责“看”(摄像头),一个负责“动”(电机),ROS让它们能像微信群聊一样交换信息,而不用管对方是用C++还是Python写的。
ROS=通信+工具+功能+生态
ROS是适用于机器人的开源元操作系统,机器人里,一个模块负责“看”(摄像头),一个负责“动”(电机),ROS让它们能像微信群聊一样交换信息,而不用管对方是用C++还是Python写的
一、ROS中程序实现流程:1.创工作空间;2.创功能包;3.编辑源文件;4.编辑配置文件;5.编译执行
1.创建工作空间并初始化
创建工作空间以及一个src子目录,然后再进入工作空间调用catkin_make命令编译
mkdir -p 自定义空间名称/src
cd 自定义空间名称
catkin_make
2.进入src创建ros包并添加依赖
在工作空间下生成一个功能包,该功能包依赖于roscpp(c++实现的库)、rospy(Python实现的库)、std_msgs(标准信息库),创建ROS功能包时,一般依赖这三个库
cd src
catkin_create_pkg 自定义ROS包名 roscpp rospy std_msgs
3.进入ros包添加scripts目录并编辑Python文件
cd ros包
mkdir scripts
新建Python文件(自定义命名)
#指定解释器
#! /user/bin/env Python
#1.导包
import rospy
#2.编写主入口
if __name__=="__main__":
#3.初始化ROS节点
rospy.init_node("hello_p"):
#4.输出日志
rospy.loginfo("hello world! by python");
4.为Python文件添加可执行权限
chmod +x 自定义文件名.py
5.编辑ros包下的CamkeList.txt文件(第161行)
catkin_install_python(PROGRAMS scripts/自定义文件名.py
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
6.进入工作空间目录并编译
cd 自定义空间名称
catkin_make
7.进入工作空间目录并执行
先启动命令行1:
roscore
在启动命令行2:
cd 工作空间
source ./devel/setup.bash
rosrun 包名 自定义文件名.py
输出结果:Hello World!
安装terminator(多分屏终端显示)
sudo apt install terminator
分屏操作快捷键
垂直分屏: Ctrl + Shift + E
水平分屏: Ctrl + Shift + O
切换分屏: Alt + 方向键 或 Ctrl + Tab
关闭分屏: Ctrl + Shift + W
其他实用快捷键
新建标签页: Ctrl + Shift + T
关闭终端: Ctrl + Shift + Q
复制文本: Ctrl + Shift + C
粘贴文本: Ctrl + Shift + V
全屏: F11
二、使用VScode
Python脚本指示器
若不配置CMakeLists.txt执行Python文件抛出异常,会显示没有那个文件或目录
解决方法:1.声明解释器为Python3;2.软连接,将Python链接到Python3

launch文件:使用launch文件,可以一次性启动多个ROS节点


记得保存写好的launch文件!!!
三、ROS架构
ROS=Plumbing(通讯机制:实现ROS不同节点的交互)+Tools(工具软件包:ROS中的开发和调试工具)+Capabilities(高层技术,ROS某些功能的集合,如导航)+Ecosystem(生态系统)
1.ROS文件系统



package.xml 该文件定义有关软件包的属性(如软件包名称,版本号,作者,维护者,对其他catkin软件包的依赖性)
CMakeLists.txt 该文件是CMake构建系统的输入,用于构建软件包,描述将代码安装到何处
2.ROS文件系统相关命令
| 增 | catkin_create_pkg 自定义包名 依赖包 | 创建新的ROS功能包 |
| sudo apt install xxx | 安装ROS功能包 | |
| 删 | sudo apt purge xxx | 删除某个功能包 |
| 查 | rospack list | 列出所有功能包 |
| rospack find 包名 | 查找功能包,存在则返回安装路径 | |
| roscd 包名 | 进入某个功能包 | |
| rosls 包名 | 列出某个包下的文件 | |
| apt search xxx | 搜素某个功能包 | |
| 改 | rosed 包名 文件名 | 修改功能包文件 |
| 执行 | roscore (-p:指定端口号) | 使ROS节点通信 |
| rosrun 包名 可执行文件名 | 运行指定的ROS节点 | |
| roslaunch 包名 launch 文件名 | 执行某个包下的launch文件 |
3.ROS计算图
ROS程序运行之后,不同节点错综复杂,rpt_graph能够创建一个显示当前系统运行情况的动态图形,计算图可以以点对点的网络形式表现数据交互过程。
四、ROS通信机制
1. 节点(Node) = 一个独立的小程序
-
例子:
-
乌龟仿真器是一个节点 -
键盘控制是另一个节点
-
-
特点:每个节点只做一件事,可以独立运行
-
您的理解:一个节点 = 一个
.py或.cpp文件变成的可执行程序
2. 话题(Topic) = 节点之间的“广播频道”
-
例子:
-
/turtle1/cmd_vel是“速度指令频道” -
键盘控制节点“喊话”到这个频道
-
乌龟仿真节点“收听”这个频道
-
-
特点:一个广播,多个收听(1对多)
3. 消息(Message) = 在话题上传递的“纸条”
-
例子:
-
纸条上写:“前进速度0.5,转弯速度0.0”
-
格式固定,大家都认识
-
三种实现策略:话题通信(发布订阅)、服务通信(请求响应)、参数服务器(参数共享)
1.话题通信:已发布订阅的方式实现不同节点之间数据交互的通信方式
1.用于不断更新的、少逻辑处理的数据传输场景

2.实现流程:1.编写发布方实现;2.编写订阅方实现;3.编辑配置文件;4.编译并执行
📁 第一步:在您的ROS包中创建Python节点
1. 确保在正确的目录
# 进入您的ROS包的scripts目录
cd ~/catkin_ws/src/my_ros_pkg/scripts
2. 创建“说话者”节点(talker.py)
# 创建文件
nano talker.py
复制粘贴以下内容:
#!/usr/bin/env python3
# 这行必须写!告诉系统这是Python3脚本
import rospy
from std_msgs.msg import String # 导入字符串消息类型
def talker():
# 1. 初始化节点,名字叫"talker"
rospy.init_node('talker', anonymous=True)
# 2. 创建一个发布者(Publisher)
# 参数说明:发布到"chatter"话题,消息类型是String,队列长度10
pub = rospy.Publisher('chatter', String, queue_size=10)
# 3. 设置说话频率:1秒1次
rate = rospy.Rate(1) # 1Hz = 每秒1次
count = 0
rospy.loginfo("说话者已启动,开始广播...")
# 4. 循环说话
while not rospy.is_shutdown():
# 准备要说的话
hello_str = f"大家好!这是第 {count} 条消息"
# 发布消息(对外喊话)
pub.publish(hello_str)
# 在终端打印日志
rospy.loginfo(f"我说:{hello_str}")
count += 1
rate.sleep() # 等待1秒
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
rospy.loginfo("说话者已停止")
保存并退出:
-
按
Ctrl+O,然后回车保存 -
按
Ctrl+X退出nano
3. 创建“收听者”节点(listener.py)
nano listener.py
复制粘贴以下内容:
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String # 导入字符串消息类型
def callback(data):
# 当收到消息时,这个函数会被自动调用
# data就是收到的消息
rospy.loginfo(f"我听到:{data.data}")
def listener():
# 1. 初始化节点,名字叫"listener"
rospy.init_node('listener', anonymous=True)
rospy.loginfo("收听者已启动,正在监听...")
# 2. 创建一个订阅者(Subscriber)
# 参数说明:订阅"chatter"话题,消息类型是String
# 收到消息时自动调用callback函数
rospy.Subscriber("chatter", String, callback)
# 3. 保持节点运行,直到被关闭
rospy.spin()
if __name__ == '__main__':
listener()
保存并退出。
🔧 第二步:设置文件权限
Python脚本需要可执行权限才能被ROS运行:
# 给两个脚本添加执行权限
chmod +x talker.py
chmod +x listener.py
# 检查权限
ls -la
应该看到文件名是绿色的,并且有 x权限。
🚀 第三步:编译您的包
# 回到工作空间根目录
cd ~/catkin_ws
# 编译
catkin_make
# 加载环境
source devel/setup.bash
🎮 第四步:运行测试
打开4个终端窗口:
终端1:启动ROS核心(必须首先运行)
roscore
终端2:运行说话者
cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg talker.py
会看到每秒输出一条消息。
终端3:运行收听者
cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg listener.py
看到收听者实时重复说话者的话。
终端4:查看系统状态
# 查看所有节点
rosnode list
# 应该看到:/listener, /rosout, /talker
# 查看所有话题
rostopic list
# 应该看到:/chatter, /rosout, ...
# 查看chatter话题的消息
rostopic echo /chatter
🎯 第五步:创建launch文件(一键启动)
1. 创建launch文件
# 进入launch目录
cd ~/catkin_ws/src/my_ros_pkg/launch
# 创建launch文件
nano talk_listen.launch
2. 复制粘贴内容:
<launch>
<!-- 启动说话者节点 -->
<node pkg="my_ros_pkg" type="talker.py" name="talker" output="screen"/>
<!-- 启动收听者节点 -->
<node pkg="my_ros_pkg" type="listener.py" name="listener" output="screen"/>
</launch>
保存并退出。
3. 用launch文件一键启动
# 确保roscore在运行
# 然后运行launch文件
roslaunch my_ros_pkg talk_listen.launch
这会同时启动两个节点!
📊 第六步:用rqt_graph可视化查看
在节点运行的情况下:
rosrun rqt_graph rqt_graph
您会看到:
-
两个节点:
/talker和/listener -
一个话题:
/chatter -
箭头从
/talker指向/listener
2.话题通信msg自定义消息调用
默认消息:ROS自带的(如String, Twist)
自定义消息:您自己定义的消息类型,比如:
-
机器人状态消息
-
传感器数据包
-
控制指令等
第一步:创建自定义消息文件
1. 在ROS包中创建msg目录
# 进入您的ROS包
cd ~/catkin_ws/src/my_ros_pkg
# 创建msg目录
mkdir msg
# 进入msg目录
cd msg
2. 创建您的第一个自定义消息文件
# 创建Person.msg文件
nano Person.msg
输入以下内容:
string first_name
string last_name
uint8 age
uint8 id
string phone
保存并退出:Ctrl+O→ 回车 → Ctrl+X
3. 创建第二个消息文件(包含数组)
nano Team.msg
输入内容:
Person[] members
uint32 team_id
string team_name
第二步:配置包文件
1. 修改package.xml
# 回到包根目录
cd ~/catkin_ws/src/my_ros_pkg
# 编辑package.xml
nano package.xml
添加以下两行(在适当位置,通常是最后):
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
2. 修改CMakeLists.txt
nano CMakeLists.txt
需要修改3个地方:
A. 找到find_package部分,添加message_generation:
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
message_generation # 添加这行
)
B. 找到add_message_files部分,取消注释并修改:
## Generate messages in the 'msg' folder
add_message_files(
FILES
Person.msg
Team.msg
)
C. 找到generate_messages部分,取消注释:
## Generate added messages and services with any dependencies listed here
generate_messages(
DEPENDENCIES
std_msgs
)
D. 找到catkin_package部分,确保包含message_runtime:
catkin_package(
CATKIN_DEPENDS rospy std_msgs message_runtime
)
第三步:编译消息
# 回到工作空间根目录
cd ~/catkin_ws
# 编译
catkin_make
# 加载环境
source devel/setup.bash
第四步:验证自定义消息
1. 检查消息是否生成成功
# 查看消息类型
rosmsg show my_ros_pkg/Person
应该显示:
string first_name
string last_name
uint8 age
uint8 id
string phone
rosmsg show my_ros_pkg/Team
应该显示:
my_ros_pkg/Person[] members
uint32 team_id
string team_name
2. 查看完整类型
rosmsg show Person
应该显示完整路径:my_ros_pkg/Person
第五步:使用自定义消息的Python节点
1. 创建使用Person消息的发布者
cd ~/catkin_ws/src/my_ros_pkg/scripts
nano person_talker.py
输入以下内容:
#!/usr/bin/env python3
import rospy
from my_ros_pkg.msg import Person # 导入自定义消息
import random
def talker():
rospy.init_node('person_talker', anonymous=True)
# 创建发布者,发布到person_info话题
pub = rospy.Publisher('person_info', Person, queue_size=10)
rate = rospy.Rate(1) # 1Hz
# 模拟一些人员数据
first_names = ["张", "李", "王", "刘", "陈"]
last_names = ["三", "四", "五", "六", "七"]
person_id = 1
rospy.loginfo("开始发布人员信息...")
while not rospy.is_shutdown():
# 创建Person消息
person_msg = Person()
person_msg.first_name = random.choice(first_names)
person_msg.last_name = random.choice(last_names)
person_msg.age = random.randint(20, 40)
person_msg.id = person_id
person_msg.phone = f"138{random.randint(1000,9999)}{random.randint(1000,9999)}"
# 发布消息
pub.publish(person_msg)
rospy.loginfo(f"发布人员: {person_msg.first_name}{person_msg.last_name}, "
f"年龄: {person_msg.age}, ID: {person_msg.id}")
person_id += 1
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
rospy.loginfo("发布者已停止")
2. 创建订阅者
nano person_listener.py
输入内容
#!/usr/bin/env python3
import rospy
from my_ros_pkg.msg import Person
def callback(data):
rospy.loginfo("收到人员信息:")
rospy.loginfo(f" 姓名: {data.first_name}{data.last_name}")
rospy.loginfo(f" 年龄: {data.age}")
rospy.loginfo(f" ID: {data.id}")
rospy.loginfo(f" 电话: {data.phone}")
rospy.loginfo("-" * 30)
def listener():
rospy.init_node('person_listener', anonymous=True)
rospy.loginfo("等待接收人员信息...")
rospy.Subscriber("person_info", Person, callback)
rospy.spin()
if __name__ == '__main__':
listener()
3. 设置权限
chmod +x person_talker.py
chmod +x person_listener.py
第六步:运行测试
打开3个终端:
终端1:
roscore
终端2(发布者):
cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg person_talker.py
终端3(订阅者)
cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg person_listener.py
终端4(查看消息):
# 查看话题
rostopic list
# 应该看到:/person_info
# 查看消息内容
rostopic echo /person_info
第七步:创建launch文件
cd ~/catkin_ws/src/my_ros_pkg/launch
nano person_demo.launch
输入内容:
<launch>
<!-- 启动人员信息发布者 -->
<node pkg="my_ros_pkg" type="person_talker.py" name="person_talker" output="screen"/>
<!-- 启动人员信息收听者 -->
<node pkg="my_ros_pkg" type="person_listener.py" name="person_listener" output="screen"/>
</launch>
运行launch文件:
roslaunch my_ros_pkg person_demo.launch
3.服务通信
客户端 发送请求 → 服务端 处理并返回响应
-
特点:双向、请求-响应、一次性
第一步:创建自定义服务
1. 创建 srv 目录
cd ~/catkin_ws/src/my_ros_pkg
mkdir srv
2. 创建第一个服务定义
nano srv/AddTwoInts.srv
输入内容:
# 请求部分
int64 a
int64 b
---
# 响应部分(三个减号分隔)
int64 sum
再创建一个复杂点的服务:
nano srv/PersonInfo.srv
# 请求:根据ID查询人员
uint8 id
---
# 响应:返回人员信息
string full_name
uint8 age
bool found
第二步:配置 CMakeLists.txt
修改 CMakeLists.txt,添加服务配置:
# 在 add_message_files 部分下面,添加:
add_service_files(
FILES
AddTwoInts.srv
PersonInfo.srv
)
# generate_messages 部分保持不变(已包含)
generate_messages(
DEPENDENCIES
std_msgs
)
第三步:配置 package.xml
确保有这些依赖:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
第四步:编译生成服务
cd ~/catkin_ws
catkin_make
source devel/setup.bash
第五步:编写服务端(Server)
创建加法服务端
cd ~/catkin_ws/src/my_ros_pkg/scripts
nano add_two_ints_server.py
#!/usr/bin/env python3
import rospy
from my_ros_pkg.srv import AddTwoInts, AddTwoIntsResponse
def handle_add_two_ints(req):
rospy.loginfo(f"收到请求: {req.a} + {req.b}")
sum_result = req.a + req.b
rospy.loginfo(f"计算结果: {sum_result}")
return AddTwoIntsResponse(sum_result)
def add_two_ints_server():
rospy.init_node('add_two_ints_server')
# 创建服务,服务名为 "add_two_ints"
s = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
rospy.loginfo("加法服务端已启动,等待请求...")
rospy.spin()
if __name__ == "__main__":
add_two_ints_server()
第六步:编写客户端(Client)
nano add_two_ints_client.py
#!/usr/bin/env python3
import sys
import rospy
from my_ros_pkg.srv import AddTwoInts
def add_two_ints_client(x, y):
rospy.wait_for_service('add_two_ints')
try:
# 创建服务代理
add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
rospy.loginfo(f"发送请求: {x} + {y}")
# 调用服务
resp = add_two_ints(x, y)
rospy.loginfo(f"收到响应: 和 = {resp.sum}")
return resp.sum
except rospy.ServiceException as e:
rospy.logerr(f"服务调用失败: {e}")
return None
if __name__ == "__main__":
if len(sys.argv) == 3:
a = int(sys.argv[1])
b = int(sys.argv[2])
else:
# 使用默认值
a = 5
b = 3
rospy.loginfo(f"使用默认值: {a} + {b}")
rospy.init_node('add_two_ints_client')
result = add_two_ints_client(a, b)
if result is not None:
rospy.loginfo(f"最终结果: {result}")
第七步:设置权限
chmod +x add_two_ints_server.py
chmod +x add_two_ints_client.py
第八步:运行测试
打开三个终端:
终端1:启动 ROS 核心
roscore
终端2:启动服务端
cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg add_two_ints_server.py
终端3:启动客户端
cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg add_two_ints_client.py
或带参数调用:
rosrun my_ros_pkg add_two_ints_client.py 10 20
第九步:命令行测试服务
终端4:用命令行测试
# 查看当前所有服务
rosservice list
# 查看服务类型
rosservice type /add_two_ints
# 查看服务定义
rossrv show my_ros_pkg/AddTwoInts
# 命令行调用服务
rosservice call /add_two_ints "a: 15
b: 25"
📁 第十步:创建 launch 文件
cd ~/catkin_ws/src/my_ros_pkg/launch
nano service_demo.launch
<launch>
<!-- 启动服务端 -->
<node pkg="my_ros_pkg"
type="add_two_ints_server.py"
name="add_two_ints_server"
output="screen"/>
<!-- 启动客户端(示例调用) -->
<node pkg="my_ros_pkg"
type="add_two_ints_client.py"
name="add_two_ints_client"
output="screen"
args="7 8"/>
</launch>
运行launch文件(先roscore)
cd ~/catkin_ws
source devel/setup.bash
roslaunch my_ros_pkg service_demo.launch
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)