【ROS/ROS2与实时Linux系列】第八篇 ROS 2实时执行器:单线程、多线程与专用调度
摘要: ROS2执行器优化对机器人导航、自动驾驶等实时应用至关重要。执行器管理节点回调函数,其配置直接影响任务响应速度和系统性能。本文详解执行器的核心机制(单线程/多线程/自定义类型),指导搭建实时Linux环境(安装PREEMPT_RT内核和ROS2),并通过代码案例演示如何配置实时执行器(如自定义RealtimeExecutor)。关键实践包括优先级设置、CPU核心绑定及性能调试(如perf工
一、简介:为什么执行器优化对 ROS 2 至关重要?
在 ROS 2 应用中,无论是机器人导航、工业自动化控制还是自动驾驶,实时性都是关键需求。例如,自动驾驶车辆需要在极短时间内对传感器数据进行处理并做出决策,以确保安全。ROS 2 执行器负责管理节点的回调函数,其配置直接影响实时任务的执行效率和响应速度。掌握执行器优化,可以帮助开发者确保关键任务及时执行,提升系统的整体性能和可靠性。
二、核心概念:执行器的工作机制与术语
2.1 执行器(Executor)
-
定义:执行器是 ROS 2 中负责管理节点回调函数的组件。它决定何时调用回调函数,以及如何分配线程来执行这些回调。
-
类型:
-
单线程执行器(SingleThreadedExecutor):所有回调都在同一个线程中执行,适合简单的应用。
-
多线程执行器(MultiThreadedExecutor):使用多个线程来执行回调,适合需要高并发的应用。
-
自定义执行器:可以自定义线程池和调度策略,适合实时性要求高的应用。
-
2.2 回调函数(Callback)
-
定义:回调函数是节点中需要执行的函数,例如订阅者回调、定时器回调等。
-
特性:回调函数的执行顺序和时间由执行器决定。
2.3 调度策略(Scheduling Policy)
-
定义:调度策略决定回调函数的执行顺序和时间。
-
类型:
-
默认调度策略:由执行器默认配置。
-
实时调度策略:通过自定义执行器配置,确保关键任务及时执行。
-
三、环境准备:搭建实时 Linux 环境
3.1 硬件需求
-
CPU:多核处理器(建议至少 4 核)
-
内存:至少 4 GB RAM
-
存储:SSD 硬盘
3.2 软件需求
-
操作系统:Ubuntu 20.04 或更高版本(推荐使用实时内核)
-
开发工具:GCC、CMake、Git
-
ROS 2:ROS 2 Foxy 或更高版本
3.3 安装实时内核
-
安装实时内核(推荐使用 PREEMPT_RT 内核):
sudo apt update
sudo apt install linux-headers-$(uname -r) linux-image-$(uname -r)
sudo apt install linux-headers-$(uname -r)-realtime linux-image-$(uname -r)-realtime
-
重启并选择实时内核:
sudo reboot
重启后,选择实时内核启动。
3.4 安装 ROS 2
-
安装 ROS 2 Foxy:
sudo apt update && sudo apt install -y curl gnupg2 lsb-release
curl -s https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add -
echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/ros2.list
sudo apt update && sudo apt install -y ros-foxy-desktop
source /opt/ros/foxy/setup.bash
四、应用场景:机器人实时控制
在机器人实时控制场景中,关键任务如传感器数据处理、运动控制算法需要在极短时间内完成,以确保机器人的响应速度和精度。通过优化 ROS 2 执行器,可以确保这些任务及时获得 CPU 资源,避免因调度延迟导致的控制误差。例如,在一个自动驾驶机器人项目中,传感器数据处理和路径规划算法需要在 10 毫秒内完成,以确保机器人的安全和效率。通过配置实时执行器,可以显著提升系统的实时性和稳定性。
五、实际案例与步骤:配置实时执行器
5.1 查看默认执行器
-
创建一个简单的 ROS 2 节点:
# file: talker.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 1.0 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.i += 1
def main(args=None):
rclpy.init(args=args)
talker = Talker()
rclpy.spin(talker)
talker.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
-
运行节点:
python3 talker.py
5.2 使用单线程执行器
-
修改节点代码,使用单线程执行器:
# file: talker_single_threaded.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.executors import SingleThreadedExecutor
class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 1.0 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.i += 1
def main(args=None):
rclpy.init(args=args)
talker = Talker()
executor = SingleThreadedExecutor()
executor.add_node(talker)
executor.spin()
talker.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
-
运行节点:
python3 talker_single_threaded.py
5.3 使用多线程执行器
-
修改节点代码,使用多线程执行器:
# file: talker_multi_threaded.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.executors import MultiThreadedExecutor
class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 1.0 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.i += 1
def main(args=None):
rclpy.init(args=args)
talker = Talker()
executor = MultiThreadedExecutor()
executor.add_node(talker)
executor.spin()
talker.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
-
运行节点:
python3 talker_multi_threaded.py
5.4 配置实时调度策略
-
创建自定义执行器:
# file: talker_realtime.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.executors import ExternalExecutor
import threading
class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 1.0 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Published: {msg.data}')
self.i += 1
class RealtimeExecutor(ExternalExecutor):
def __init__(self):
super().__init__()
self.thread = threading.Thread(target
=self.spin)
def spin(self):
while rclpy.ok():
self.wait_for_work()
self.do_work()
def main(args=None): rclpy.init(args=args) talker = Talker() executor = RealtimeExecutor() executor.add_node(talker) executor.thread.start() executor.spin() talker.destroy_node() rclpy.shutdown()
if name == 'main': main()
2. **运行节点**:
```bash
python3 talker_realtime.py
六、常见问题与解答
6.1 如何确定任务是否需要实时调度?
-
问题:如何判断一个任务是否需要实时调度?
-
解答:实时任务通常需要在极短时间内完成,对延迟敏感。例如,传感器数据处理、运动控制算法等。可以通过任务的响应时间要求来判断是否需要实时调度。
6.2 如何设置实时优先级?
-
问题:如何设置实时任务的优先级?
-
解答:可以使用
chrt命令或在代码中调用sched_setscheduler函数。例如,chrt -f -p 99 <pid>将进程设置为 SCHED_FIFO,优先级为 99。
6.3 如何验证调度策略是否生效?
-
问题:如何验证调度策略是否生效?
-
解答:可以使用
top -H -p <pid>查看任务的调度情况。如果任务的优先级和调度策略正确设置,任务将按预期运行。
6.4 如何调整 CFS 调度器参数?
-
问题:如何调整 CFS 调度器参数?
-
解答:可以通过写入
/sys/kernel/mm/cfs/rt_runtime_us和/sys/kernel/mm/cfs/rt_period_us文件来调整 CFS 的时间片和调度周期。
七、实践建议与最佳实践
7.1 调试技巧
-
使用
strace跟踪系统调用:
strace -p <pid>
-
使用
perf分析性能:
perf record -g -p <pid>
perf report
7.2 性能优化
-
减少上下文切换:尽量减少实时任务的上下文切换,提高任务的连续运行时间。
-
合理分配 CPU 核心:使用
taskset命令将实时任务固定在特定的 CPU 核心上,减少 CPU 亲和性切换带来的延迟。
7.3 常见错误的解决方案
-
实时任务被挂起:检查任务的优先级是否过高,导致其他任务无法运行。适当调整优先级。
-
任务响应时间过长:检查任务是否被其他高优先级任务抢占,调整任务的调度策略。
八、总结与应用场景
通过本文的介绍,我们深入讲解了 ROS 2 执行器的工作机制,包括默认单线程、多线程执行器,以及如何构建或配置具有实时调度策略的定制化执行器。掌握这些技能,可以帮助开发者确保关键任务及时执行,提升系统的整体性能和可靠性。
在实际应用中,例如机器人实时控制、自动驾驶、工业自动化等场景,通过优化 ROS 2 执行器,可以显著提升系统的实时性和稳定性。希望本文能够帮助读者在实际项目中应用所学知识,优化系统性能,确保任务的高效执行。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)