ROS=通信+工具+功能+生态

ROS是适用于机器人的开源元操作系统,机器人里,一个模块负责“看”(摄像头),一个负责“动”(电机),ROS让它们能像微信群聊一样交换信息,而不用管对方是用C++还是Python写的

一、ROS中程序实现流程:1.创工作空间;2.创功能包;3.编辑源文件;4.编辑配置文件;5.编译执行

1.创建工作空间并初始化

创建工作空间以及一个src子目录,然后再进入工作空间调用catkin_make命令编译

mkdir -p 自定义空间名称/src
cd 自定义空间名称
catkin_make

2.进入src创建ros包并添加依赖

在工作空间下生成一个功能包,该功能包依赖于roscpp(c++实现的库)、rospy(Python实现的库)、std_msgs(标准信息库),创建ROS功能包时,一般依赖这三个库

cd src
catkin_create_pkg 自定义ROS包名 roscpp rospy std_msgs

3.进入ros包添加scripts目录并编辑Python文件

cd ros包
mkdir scripts

新建Python文件(自定义命名)

#指定解释器
#! /user/bin/env  Python
#1.导包
import rospy
#2.编写主入口
if __name__=="__main__":
#3.初始化ROS节点
rospy.init_node("hello_p"):
#4.输出日志
rospy.loginfo("hello world! by python");

4.为Python文件添加可执行权限

chmod +x 自定义文件名.py

5.编辑ros包下的CamkeList.txt文件(第161行)

catkin_install_python(PROGRAMS scripts/自定义文件名.py
  DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

6.进入工作空间目录并编译

cd 自定义空间名称
catkin_make

7.进入工作空间目录并执行

先启动命令行1:

roscore

在启动命令行2:

cd 工作空间
source ./devel/setup.bash
rosrun 包名  自定义文件名.py

输出结果:Hello  World!

安装terminator(多分屏终端显示)

sudo apt install terminator

分屏操作快捷键

垂直分屏: Ctrl + Shift + E 

水平分屏: Ctrl + Shift + O 

切换分屏: Alt + 方向键  或  Ctrl + Tab 

关闭分屏: Ctrl + Shift + W 

其他实用快捷键

新建标签页: Ctrl + Shift + T 

关闭终端: Ctrl + Shift + Q 

复制文本: Ctrl + Shift + C 

粘贴文本: Ctrl + Shift + V 

全屏: F11 

二、使用VScode

Python脚本指示器

若不配置CMakeLists.txt执行Python文件抛出异常,会显示没有那个文件或目录

解决方法:1.声明解释器为Python3;2.软连接,将Python链接到Python3

launch文件:使用launch文件,可以一次性启动多个ROS节点

记得保存写好的launch文件!!!

三、ROS架构

ROS=Plumbing(通讯机制:实现ROS不同节点的交互)+Tools(工具软件包:ROS中的开发和调试工具)+Capabilities(高层技术,ROS某些功能的集合,如导航)+Ecosystem(生态系统)

1.ROS文件系统

 

package.xml    该文件定义有关软件包的属性(如软件包名称,版本号,作者,维护者,对其他catkin软件包的依赖性)

CMakeLists.txt   该文件是CMake构建系统的输入,用于构建软件包,描述将代码安装到何处

2.ROS文件系统相关命令

catkin_create_pkg 自定义包名 依赖包 创建新的ROS功能包
sudo apt install xxx 安装ROS功能包
sudo apt purge xxx 删除某个功能包
rospack list 列出所有功能包
rospack find 包名 查找功能包,存在则返回安装路径
roscd 包名 进入某个功能包
rosls 包名 列出某个包下的文件
apt search xxx 搜素某个功能包
rosed 包名 文件名 修改功能包文件
执行 roscore  (-p:指定端口号) 使ROS节点通信
rosrun 包名 可执行文件名 运行指定的ROS节点
roslaunch 包名 launch 文件名 执行某个包下的launch文件

3.ROS计算图

ROS程序运行之后,不同节点错综复杂,rpt_graph能够创建一个显示当前系统运行情况的动态图形,计算图可以以点对点的网络形式表现数据交互过程。

四、ROS通信机制

1. 节点(Node)​ = 一个独立的小程序

  • 例子

    • 乌龟仿真器是一个节点

    • 键盘控制是另一个节点

  • 特点:每个节点只做一件事,可以独立运行

  • 您的理解:一个节点 = 一个.py.cpp文件变成的可执行程序

2. 话题(Topic)​ = 节点之间的“广播频道”

  • 例子

    • /turtle1/cmd_vel是“速度指令频道”

    • 键盘控制节点“喊话”到这个频道

    • 乌龟仿真节点“收听”这个频道

  • 特点:一个广播,多个收听(1对多)

3. 消息(Message)​ = 在话题上传递的“纸条”

  • 例子

    • 纸条上写:“前进速度0.5,转弯速度0.0”

    • 格式固定,大家都认识

三种实现策略:话题通信(发布订阅)、服务通信(请求响应)、参数服务器(参数共享)

1.话题通信:已发布订阅的方式实现不同节点之间数据交互的通信方式

1.用于不断更新的、少逻辑处理的数据传输场景

2.实现流程:1.编写发布方实现;2.编写订阅方实现;3.编辑配置文件;4.编译并执行

📁 第一步:在您的ROS包中创建Python节点

1. 确保在正确的目录

# 进入您的ROS包的scripts目录
cd ~/catkin_ws/src/my_ros_pkg/scripts

2. 创建“说话者”节点(talker.py)

# 创建文件
nano talker.py

复制粘贴以下内容

#!/usr/bin/env python3
# 这行必须写!告诉系统这是Python3脚本

import rospy
from std_msgs.msg import String  # 导入字符串消息类型

def talker():
    # 1. 初始化节点,名字叫"talker"
    rospy.init_node('talker', anonymous=True)
    
    # 2. 创建一个发布者(Publisher)
    # 参数说明:发布到"chatter"话题,消息类型是String,队列长度10
    pub = rospy.Publisher('chatter', String, queue_size=10)
    
    # 3. 设置说话频率:1秒1次
    rate = rospy.Rate(1)  # 1Hz = 每秒1次
    
    count = 0
    rospy.loginfo("说话者已启动,开始广播...")
    
    # 4. 循环说话
    while not rospy.is_shutdown():
        # 准备要说的话
        hello_str = f"大家好!这是第 {count} 条消息"
        
        # 发布消息(对外喊话)
        pub.publish(hello_str)
        
        # 在终端打印日志
        rospy.loginfo(f"我说:{hello_str}")
        
        count += 1
        rate.sleep()  # 等待1秒

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        rospy.loginfo("说话者已停止")

保存并退出

  • Ctrl+O,然后回车保存

  • Ctrl+X退出nano

3. 创建“收听者”节点(listener.py)

nano listener.py

复制粘贴以下内容

#!/usr/bin/env python3

import rospy
from std_msgs.msg import String  # 导入字符串消息类型

def callback(data):
    # 当收到消息时,这个函数会被自动调用
    # data就是收到的消息
    rospy.loginfo(f"我听到:{data.data}")

def listener():
    # 1. 初始化节点,名字叫"listener"
    rospy.init_node('listener', anonymous=True)
    
    rospy.loginfo("收听者已启动,正在监听...")
    
    # 2. 创建一个订阅者(Subscriber)
    # 参数说明:订阅"chatter"话题,消息类型是String
    # 收到消息时自动调用callback函数
    rospy.Subscriber("chatter", String, callback)
    
    # 3. 保持节点运行,直到被关闭
    rospy.spin()

if __name__ == '__main__':
    listener()

保存并退出

🔧 第二步:设置文件权限

Python脚本需要可执行权限才能被ROS运行:

# 给两个脚本添加执行权限
chmod +x talker.py
chmod +x listener.py

# 检查权限
ls -la

应该看到文件名是绿色的,并且有 x权限。

🚀 第三步:编译您的包

# 回到工作空间根目录
cd ~/catkin_ws

# 编译
catkin_make

# 加载环境
source devel/setup.bash

🎮 第四步:运行测试

打开4个终端窗口:

终端1:启动ROS核心(必须首先运行)

roscore

终端2:运行说话者

cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg talker.py

会看到每秒输出一条消息。

终端3:运行收听者

cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg listener.py

看到收听者实时重复说话者的话。

终端4:查看系统状态

# 查看所有节点
rosnode list
# 应该看到:/listener, /rosout, /talker

# 查看所有话题
rostopic list
# 应该看到:/chatter, /rosout, ...

# 查看chatter话题的消息
rostopic echo /chatter

🎯 第五步:创建launch文件(一键启动)

1. 创建launch文件

# 进入launch目录
cd ~/catkin_ws/src/my_ros_pkg/launch

# 创建launch文件
nano talk_listen.launch

2. 复制粘贴内容:

<launch>
    <!-- 启动说话者节点 -->
    <node pkg="my_ros_pkg" type="talker.py" name="talker" output="screen"/>
    
    <!-- 启动收听者节点 -->
    <node pkg="my_ros_pkg" type="listener.py" name="listener" output="screen"/>
</launch>

保存并退出

3. 用launch文件一键启动

# 确保roscore在运行
# 然后运行launch文件
roslaunch my_ros_pkg talk_listen.launch

这会同时启动两个节点!

📊 第六步:用rqt_graph可视化查看

在节点运行的情况下:

rosrun rqt_graph rqt_graph

您会看到:

  • 两个节点:/talker/listener

  • 一个话题:/chatter

  • 箭头从 /talker指向 /listener

2.话题通信msg自定义消息调用

默认消息:ROS自带的(如String, Twist

自定义消息:您自己定义的消息类型,比如:

  • 机器人状态消息

  • 传感器数据包

  • 控制指令等

第一步:创建自定义消息文件

1. 在ROS包中创建msg目录

# 进入您的ROS包
cd ~/catkin_ws/src/my_ros_pkg

# 创建msg目录
mkdir msg

# 进入msg目录
cd msg

2. 创建您的第一个自定义消息文件

# 创建Person.msg文件
nano Person.msg

输入以下内容

string first_name
string last_name
uint8 age
uint8 id
string phone

保存并退出Ctrl+O→ 回车 → Ctrl+X

3. 创建第二个消息文件(包含数组)

nano Team.msg

输入内容

Person[] members
uint32 team_id
string team_name

第二步:配置包文件

1. 修改package.xml

# 回到包根目录
cd ~/catkin_ws/src/my_ros_pkg

# 编辑package.xml
nano package.xml

添加以下两行(在适当位置,通常是最后):

<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>

2. 修改CMakeLists.txt

nano CMakeLists.txt

需要修改3个地方

A. 找到find_package部分,添加message_generation

find_package(catkin REQUIRED COMPONENTS
  rospy
  std_msgs
  message_generation  # 添加这行
)

B. 找到add_message_files部分,取消注释并修改

## Generate messages in the 'msg' folder
add_message_files(
  FILES
  Person.msg
  Team.msg
)

C. 找到generate_messages部分,取消注释

## Generate added messages and services with any dependencies listed here
generate_messages(
  DEPENDENCIES
  std_msgs
)

D. 找到catkin_package部分,确保包含message_runtime

catkin_package(
  CATKIN_DEPENDS rospy std_msgs message_runtime
)

第三步:编译消息

# 回到工作空间根目录
cd ~/catkin_ws

# 编译
catkin_make

# 加载环境
source devel/setup.bash

第四步:验证自定义消息

1. 检查消息是否生成成功

# 查看消息类型
rosmsg show my_ros_pkg/Person

应该显示:

string first_name
string last_name
uint8 age
uint8 id
string phone
rosmsg show my_ros_pkg/Team

应该显示:

my_ros_pkg/Person[] members
uint32 team_id
string team_name

2. 查看完整类型

rosmsg show Person

应该显示完整路径:my_ros_pkg/Person

第五步:使用自定义消息的Python节点

1. 创建使用Person消息的发布者

cd ~/catkin_ws/src/my_ros_pkg/scripts
nano person_talker.py

输入以下内容

#!/usr/bin/env python3

import rospy
from my_ros_pkg.msg import Person  # 导入自定义消息
import random

def talker():
    rospy.init_node('person_talker', anonymous=True)
    
    # 创建发布者,发布到person_info话题
    pub = rospy.Publisher('person_info', Person, queue_size=10)
    
    rate = rospy.Rate(1)  # 1Hz
    
    # 模拟一些人员数据
    first_names = ["张", "李", "王", "刘", "陈"]
    last_names = ["三", "四", "五", "六", "七"]
    
    person_id = 1
    
    rospy.loginfo("开始发布人员信息...")
    
    while not rospy.is_shutdown():
        # 创建Person消息
        person_msg = Person()
        person_msg.first_name = random.choice(first_names)
        person_msg.last_name = random.choice(last_names)
        person_msg.age = random.randint(20, 40)
        person_msg.id = person_id
        person_msg.phone = f"138{random.randint(1000,9999)}{random.randint(1000,9999)}"
        
        # 发布消息
        pub.publish(person_msg)
        
        rospy.loginfo(f"发布人员: {person_msg.first_name}{person_msg.last_name}, "
                     f"年龄: {person_msg.age}, ID: {person_msg.id}")
        
        person_id += 1
        rate.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        rospy.loginfo("发布者已停止")

2. 创建订阅者

nano person_listener.py

输入内容

#!/usr/bin/env python3

import rospy
from my_ros_pkg.msg import Person

def callback(data):
    rospy.loginfo("收到人员信息:")
    rospy.loginfo(f"  姓名: {data.first_name}{data.last_name}")
    rospy.loginfo(f"  年龄: {data.age}")
    rospy.loginfo(f"  ID: {data.id}")
    rospy.loginfo(f"  电话: {data.phone}")
    rospy.loginfo("-" * 30)

def listener():
    rospy.init_node('person_listener', anonymous=True)
    rospy.loginfo("等待接收人员信息...")
    
    rospy.Subscriber("person_info", Person, callback)
    rospy.spin()

if __name__ == '__main__':
    listener()

3. 设置权限

chmod +x person_talker.py
chmod +x person_listener.py

第六步:运行测试

打开3个终端:

终端1

roscore

终端2(发布者):

cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg person_talker.py

终端3(订阅者)

cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg person_listener.py

终端4(查看消息):

# 查看话题
rostopic list
# 应该看到:/person_info

# 查看消息内容
rostopic echo /person_info

第七步:创建launch文件

cd ~/catkin_ws/src/my_ros_pkg/launch
nano person_demo.launch

输入内容

<launch>
    <!-- 启动人员信息发布者 -->
    <node pkg="my_ros_pkg" type="person_talker.py" name="person_talker" output="screen"/>
    
    <!-- 启动人员信息收听者 -->
    <node pkg="my_ros_pkg" type="person_listener.py" name="person_listener" output="screen"/>
</launch>

运行launch文件

roslaunch my_ros_pkg person_demo.launch

3.服务通信

客户端 发送请求 → 服务端 处理并返回响应
  • 特点:双向、请求-响应、一次性

第一步:创建自定义服务

1. 创建 srv 目录

cd ~/catkin_ws/src/my_ros_pkg
mkdir srv

2. 创建第一个服务定义

nano srv/AddTwoInts.srv

输入内容

# 请求部分
int64 a
int64 b
---
# 响应部分(三个减号分隔)
int64 sum

再创建一个复杂点的服务

nano srv/PersonInfo.srv
# 请求:根据ID查询人员
uint8 id
---
# 响应:返回人员信息
string full_name
uint8 age
bool found

第二步:配置 CMakeLists.txt

修改 CMakeLists.txt,添加服务配置:

# 在 add_message_files 部分下面,添加:
add_service_files(
  FILES
  AddTwoInts.srv
  PersonInfo.srv
)

# generate_messages 部分保持不变(已包含)
generate_messages(
  DEPENDENCIES
  std_msgs
)

第三步:配置 package.xml

确保有这些依赖:

<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>

第四步:编译生成服务

cd ~/catkin_ws
catkin_make
source devel/setup.bash

第五步:编写服务端(Server)

创建加法服务端

cd ~/catkin_ws/src/my_ros_pkg/scripts
nano add_two_ints_server.py
#!/usr/bin/env python3
import rospy
from my_ros_pkg.srv import AddTwoInts, AddTwoIntsResponse

def handle_add_two_ints(req):
    rospy.loginfo(f"收到请求: {req.a} + {req.b}")
    sum_result = req.a + req.b
    rospy.loginfo(f"计算结果: {sum_result}")
    return AddTwoIntsResponse(sum_result)

def add_two_ints_server():
    rospy.init_node('add_two_ints_server')
    
    # 创建服务,服务名为 "add_two_ints"
    s = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
    
    rospy.loginfo("加法服务端已启动,等待请求...")
    rospy.spin()

if __name__ == "__main__":
    add_two_ints_server()

第六步:编写客户端(Client)

nano add_two_ints_client.py
#!/usr/bin/env python3
import sys
import rospy
from my_ros_pkg.srv import AddTwoInts

def add_two_ints_client(x, y):
    rospy.wait_for_service('add_two_ints')
    
    try:
        # 创建服务代理
        add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
        
        rospy.loginfo(f"发送请求: {x} + {y}")
        
        # 调用服务
        resp = add_two_ints(x, y)
        
        rospy.loginfo(f"收到响应: 和 = {resp.sum}")
        return resp.sum
        
    except rospy.ServiceException as e:
        rospy.logerr(f"服务调用失败: {e}")
        return None

if __name__ == "__main__":
    if len(sys.argv) == 3:
        a = int(sys.argv[1])
        b = int(sys.argv[2])
    else:
        # 使用默认值
        a = 5
        b = 3
        rospy.loginfo(f"使用默认值: {a} + {b}")
    
    rospy.init_node('add_two_ints_client')
    result = add_two_ints_client(a, b)
    
    if result is not None:
        rospy.loginfo(f"最终结果: {result}")

第七步:设置权限

chmod +x add_two_ints_server.py
chmod +x add_two_ints_client.py

第八步:运行测试

打开三个终端:

终端1:启动 ROS 核心

roscore

终端2:启动服务端

cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg add_two_ints_server.py

终端3:启动客户端

cd ~/catkin_ws
source devel/setup.bash
rosrun my_ros_pkg add_two_ints_client.py

或带参数调用

rosrun my_ros_pkg add_two_ints_client.py 10 20

第九步:命令行测试服务

终端4:用命令行测试

# 查看当前所有服务
rosservice list

# 查看服务类型
rosservice type /add_two_ints

# 查看服务定义
rossrv show my_ros_pkg/AddTwoInts

# 命令行调用服务
rosservice call /add_two_ints "a: 15
b: 25"

📁 第十步:创建 launch 文件

cd ~/catkin_ws/src/my_ros_pkg/launch
nano service_demo.launch
<launch>
    <!-- 启动服务端 -->
    <node pkg="my_ros_pkg" 
          type="add_two_ints_server.py" 
          name="add_two_ints_server" 
          output="screen"/>
    
    <!-- 启动客户端(示例调用) -->
    <node pkg="my_ros_pkg" 
          type="add_two_ints_client.py" 
          name="add_two_ints_client" 
          output="screen"
          args="7 8"/>
</launch>

运行launch文件(先roscore)

cd ~/catkin_ws
source devel/setup.bash
roslaunch my_ros_pkg service_demo.launch

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐