前置知识与环境说明

1. 核心依赖

  • ROS2 版本:以 Humble 为例(其他版本仅需微调导入路径);
  • Action 接口:使用 Nav2 的 nav2_msgs/action/NavigateToPose(最常用的导航 Action,若你用自定义 Action,核心逻辑完全通用);
  • 取消机制:选择 按键触发(最直观,新手易调试),依赖 pynput 库监听键盘按键。

2. 专业术语解释

  • Action Client:Action 客户端,向 Action Server 发送目标请求、接收反馈和结果;
  • Feedback:Action 执行过程中的实时状态反馈(如当前机器人位姿、距离目标点的距离);
  • Goal:Action 的目标请求(这里是每个巡航目标点的位姿);
  • Result:Action 执行完成后的最终结果(成功 / 失败);
  • Cancel Request:取消 Action 执行的请求。

完整代码实现

步骤 1:安装依赖

bash

运行

# 安装 nav2 相关依赖(若未安装)
sudo apt install ros-humble-nav2-msgs
# 安装按键监听库
pip install pynput

步骤 2:完整代码

python

运行

#!/usr/bin/env python3
"""
ROS2 Action Client 实现多目标点顺序巡航
功能:
1. 按顺序执行目标点,等待结果,失败自动重试
2. 实时输出 Action Feedback
3. 按键(ESC)触发取消机制
"""
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
from pynput import keyboard
import threading
import time

# 全局变量:标记是否触发取消
CANCEL_FLAG = False

class CruiseActionClient(Node):
    def __init__(self):
        super().__init__('cruise_action_client')
        # 1. 创建 Action Client,指定 Action 类型和服务名(Nav2 标准服务名)
        self._action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
        
        # 2. 配置巡航参数
        self.target_poses = self._generate_target_poses()  # 巡航目标点列表
        self.retry_max = 3  # 单个目标点最大重试次数
        self.current_target_idx = 0  # 当前执行的目标点索引

        # 3. 启动按键监听线程(非阻塞)
        self._start_key_listener()

    def _generate_target_poses(self):
        """生成示例巡航目标点(可替换为实际坐标)"""
        target_list = []
        # 目标点1:(x=1.0, y=0.0, z=0.0),朝向正前方
        pose1 = PoseStamped()
        pose1.header.frame_id = 'map'  # 基于地图坐标系
        pose1.pose.position.x = 1.0
        pose1.pose.position.y = 0.0
        pose1.pose.position.z = 0.0
        pose1.pose.orientation.w = 1.0  # 四元数,无旋转
        target_list.append(pose1)

        # 目标点2:(x=1.0, y=1.0, z=0.0)
        pose2 = PoseStamped()
        pose2.header.frame_id = 'map'
        pose2.pose.position.x = 1.0
        pose2.pose.position.y = 1.0
        pose2.pose.position.z = 0.0
        pose2.pose.orientation.w = 1.0
        target_list.append(pose2)

        # 目标点3:(x=0.0, y=1.0, z=0.0)
        pose3 = PoseStamped()
        pose3.header.frame_id = 'map'
        pose3.pose.position.x = 0.0
        pose3.pose.position.y = 1.0
        pose3.pose.position.z = 0.0
        pose3.pose.orientation.w = 1.0
        target_list.append(pose3)
        return target_list

    def _start_key_listener(self):
        """启动按键监听线程,ESC键触发取消"""
        def on_press(key):
            global CANCEL_FLAG
            try:
                if key == keyboard.Key.esc:  # 监听ESC键
                    CANCEL_FLAG = True
                    self.get_logger().warn('检测到ESC键,触发取消巡航!')
                    return False  # 停止监听
            except Exception as e:
                self.get_logger().error(f'按键监听异常: {e}')

        # 启动线程(避免阻塞主线程)
        listener_thread = threading.Thread(
            target=lambda: keyboard.Listener(on_press=on_press).start()
        )
        listener_thread.daemon = True  # 主线程退出时,子线程自动退出
        listener_thread.start()
        self.get_logger().info('按键监听已启动,按ESC键可取消巡航')

    def _send_goal(self, target_pose, retry_count=0):
        """发送单个目标点请求,包含重试逻辑"""
        # 检查取消标志
        if CANCEL_FLAG:
            self.get_logger().warn('取消标志已触发,终止当前目标点执行')
            return False

        # 1. 等待 Action Server 上线
        self.get_logger().info(f'等待导航Action Server上线...')
        if not self._action_client.wait_for_server(timeout_sec=10.0):
            self.get_logger().error('Action Server 未上线,无法发送目标点')
            return False

        # 2. 构造 Action Goal
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose = target_pose

        # 3. 发送目标,并注册反馈回调、结果回调
        self.get_logger().info(
            f'发送目标点 {self.current_target_idx+1}/{len(self.target_poses)}: '
            f'x={target_pose.pose.position.x}, y={target_pose.pose.position.y}'
        )
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self._feedback_callback  # 反馈回调
        )
        # 等待发送结果(阻塞,直到服务器接收目标)
        self._send_goal_future.add_done_callback(self._goal_response_callback)

        # 4. 等待 Action 执行完成(阻塞,直到拿到结果)
        rclpy.spin_until_future_complete(self, self._send_goal_future)
        goal_handle = self._send_goal_future.result()

        if not goal_handle.accepted:
            self.get_logger().error(f'目标点 {self.current_target_idx+1} 被服务器拒绝')
            # 重试逻辑
            if retry_count < self.retry_max:
                self.get_logger().info(f'开始第 {retry_count+1} 次重试...')
                time.sleep(1)  # 重试前等待1秒
                return self._send_goal(target_pose, retry_count+1)
            else:
                self.get_logger().error(f'目标点 {self.current_target_idx+1} 重试{self.retry_max}次失败,放弃')
                return False

        # 5. 等待最终结果
        self._get_result_future = goal_handle.get_result_async()
        rclpy.spin_until_future_complete(self, self._get_result_future)
        result = self._get_result_future.result().result

        # 6. 处理结果
        if result.result == 'success':  # Nav2 的 result 字段为字符串
            self.get_logger().info(f'目标点 {self.current_target_idx+1} 执行成功!')
            return True
        else:
            self.get_logger().error(f'目标点 {self.current_target_idx+1} 执行失败,结果: {result.result}')
            # 重试逻辑
            if retry_count < self.retry_max:
                self.get_logger().info(f'开始第 {retry_count+1} 次重试...')
                time.sleep(1)
                return self._send_goal(target_pose, retry_count+1)
            else:
                self.get_logger().error(f'目标点 {self.current_target_idx+1} 重试{self.retry_max}次失败,放弃')
                return False

    def _feedback_callback(self, feedback_msg):
        """Action 反馈回调函数:实时输出反馈信息"""
        feedback = feedback_msg.feedback
        # Nav2 的 NavigateToPose Feedback 包含当前机器人位姿、距离目标点的距离等
        current_pose = feedback.current_pose.pose
        distance_remaining = feedback.distance_remaining  # 剩余距离(米)
        # 输出反馈(新手友好的格式化输出)
        self.get_logger().info(
            f'【实时反馈】当前位置: x={current_pose.position.x:.2f}, y={current_pose.position.y:.2f} | '
            f'距离目标点剩余: {distance_remaining:.2f} 米'
        )

        # 【替代方案说明】若当前 Action 接口无 feedback 字段:
        # 1. 订阅机器人位姿话题(如 /amcl_pose),实时获取位置;
        # 2. 手动计算当前位置与目标点的距离,作为自定义反馈;
        # 3. 示例代码(可替换上述反馈输出):
        # target_pose = self.target_poses[self.current_target_idx].pose
        # dx = target_pose.position.x - current_pose.position.x
        # dy = target_pose.position.y - current_pose.position.y
        # distance = (dx**2 + dy**2)**0.5
        # self.get_logger().info(f'自定义反馈:剩余距离 {distance:.2f} 米')

    def _goal_response_callback(self, future):
        """目标请求响应回调:确认服务器是否接收目标"""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warn(f'目标点 {self.current_target_idx+1} 请求被服务器拒绝')
            return
        self.get_logger().info(f'目标点 {self.current_target_idx+1} 请求已被服务器接受,开始执行')

    def cancel_current_goal(self):
        """取消当前正在执行的目标点"""
        if self._action_client.is_server_available():
            # 发送取消请求
            cancel_future = self._action_client.cancel_goal_async()
            rclpy.spin_until_future_complete(self, cancel_future)
            if cancel_future.result().success:
                self.get_logger().info('当前目标点已成功取消')
            else:
                self.get_logger().error('取消当前目标点失败')
        else:
            self.get_logger().warn('Action Server 不可用,无法取消')

    def run_cruise(self):
        """执行顺序巡航主逻辑"""
        self.get_logger().info(f'开始执行巡航任务,共 {len(self.target_poses)} 个目标点')
        while self.current_target_idx < len(self.target_poses):
            # 检查取消标志
            if CANCEL_FLAG:
                self.cancel_current_goal()
                self.get_logger().error('巡航任务已被取消,退出')
                break

            # 执行当前目标点
            current_pose = self.target_poses[self.current_target_idx]
            success = self._send_goal(current_pose)

            # 处理执行结果
            if success:
                self.current_target_idx += 1  # 执行成功,切换下一个目标点
            else:
                self.get_logger().error('当前目标点执行失败,终止整个巡航任务')
                break

        # 巡航完成/取消后的收尾
        if not CANCEL_FLAG and self.current_target_idx == len(self.target_poses):
            self.get_logger().info('所有目标点执行完成,巡航任务结束!')
        else:
            self.get_logger().info('巡航任务已终止')

def main(args=None):
    # 初始化 ROS2
    rclpy.init(args=args)
    # 创建客户端节点
    cruise_client = CruiseActionClient()
    # 使用多线程执行器(避免按键监听阻塞 Action 回调)
    executor = MultiThreadedExecutor()
    executor.add_node(cruise_client)

    try:
        # 启动巡航任务
        cruise_client.run_cruise()
    except KeyboardInterrupt:
        cruise_client.get_logger().warn('检测到键盘中断(Ctrl+C)')
    finally:
        # 清理资源
        cruise_client.cancel_current_goal()
        cruise_client.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

核心功能详细解释

1. 顺序巡航 + 失败重试

实现逻辑
  • 目标点列表:通过 _generate_target_poses 生成待巡航的目标点(可替换为实际坐标);
  • 顺序执行:通过 current_target_idx 标记当前执行的目标点,执行成功后索引 + 1;
  • 等待结果:使用 rclpy.spin_until_future_complete 阻塞等待 Action 结果(确保前一个目标点完成后再执行下一个);
  • 失败重试_send_goal 函数接收 retry_count 参数,失败时若未达到 retry_max(默认 3 次),则递归调用自身重试,重试前等待 1 秒避免频繁请求。
关键代码段

python

运行

# 重试逻辑核心
if retry_count < self.retry_max:
    self.get_logger().info(f'开始第 {retry_count+1} 次重试...')
    time.sleep(1)
    return self._send_goal(target_pose, retry_count+1)
else:
    self.get_logger().error(f'目标点 {self.current_target_idx+1} 重试{self.retry_max}次失败,放弃')
    return False

2. Action Feedback 输出

核心实现
  • 通过 send_goal_asyncfeedback_callback 参数注册反馈回调函数 _feedback_callback
  • Nav2 的 NavigateToPose Feedback 包含 current_pose(当前位姿)、distance_remaining(剩余距离)等关键字段,直接格式化输出即可。
替代方案(若 feedback 字段不可用)

如果使用的 Action 接口无内置 feedback,可通过以下方式实现:

  1. 订阅机器人位姿话题(如 /amcl_pose/odom);
  2. 在客户端中维护当前目标点的坐标;
  3. 实时计算当前位姿与目标点的欧式距离,作为自定义反馈输出(代码中已注释示例)。

3. 取消机制(ESC 键触发)

实现思路
  1. 非阻塞监听:启动独立线程运行 pynput 的按键监听器,避免阻塞 ROS2 主线程;
  2. 全局标志位:用 CANCEL_FLAG 标记是否触发取消,在巡航主逻辑和目标点发送逻辑中实时检查;
  3. 取消执行:检测到 ESC 键后,调用 cancel_current_goal 发送取消请求给 Action Server,并终止后续目标点执行。
关键细节
  • 线程设置为 daemon=True:主线程退出时,监听线程自动退出,避免资源泄漏;
  • 取消请求通过 action_client.cancel_goal_async() 发送,符合 ROS2 Action 标准流程。

代码运行与测试

1. 运行前准备

  • 确保 Nav2 环境已启动(或替换为你的 Action Server):

    bash

    运行

    ros2 launch nav2_bringup tb3_simulation_launch.py  # 以海龟机器人仿真为例
    
  • 将代码保存为 cruise_action_client.py,添加可执行权限:

    bash

    运行

    chmod +x cruise_action_client.py
    

2. 运行代码

bash

运行

ros2 run <你的包名> cruise_action_client.py

3. 测试效果

  • 控制台会输出目标点发送状态、实时反馈(当前位置 + 剩余距离);
  • 若某个目标点执行失败,会自动重试(最多 3 次);
  • 按 ESC 键可立即触发取消,终止当前目标点和后续巡航。

总结(核心关键点回顾)

  1. 顺序巡航 + 重试:通过索引标记当前目标点,阻塞等待结果,失败时递归重试(可自定义重试次数);
  2. Feedback 输出:优先使用 Action 内置 feedback 回调,无内置字段时可订阅位姿话题自定义计算;
  3. 取消机制:按键监听线程 + 全局标志位,触发后调用 cancel_goal_async 取消当前目标,终止巡航。
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐