Action Client,按顺序巡航到几个目标点
本文介绍了一个基于ROS2 ActionClient实现多目标点顺序巡航的解决方案。该方案通过Nav2的NavigateToPose Action接口实现机器人导航功能,主要特点包括:1) 顺序执行预设目标点,支持失败自动重试机制;2) 实时输出导航过程中的反馈信息;3) 通过ESC按键触发取消机制。代码实现包含目标点生成、Action客户端创建、反馈回调处理、取消请求发送等核心模块,并采用多线程
·
前置知识与环境说明
1. 核心依赖
- ROS2 版本:以 Humble 为例(其他版本仅需微调导入路径);
- Action 接口:使用 Nav2 的
nav2_msgs/action/NavigateToPose(最常用的导航 Action,若你用自定义 Action,核心逻辑完全通用); - 取消机制:选择 按键触发(最直观,新手易调试),依赖
pynput库监听键盘按键。
2. 专业术语解释
- Action Client:Action 客户端,向 Action Server 发送目标请求、接收反馈和结果;
- Feedback:Action 执行过程中的实时状态反馈(如当前机器人位姿、距离目标点的距离);
- Goal:Action 的目标请求(这里是每个巡航目标点的位姿);
- Result:Action 执行完成后的最终结果(成功 / 失败);
- Cancel Request:取消 Action 执行的请求。
完整代码实现
步骤 1:安装依赖
bash
运行
# 安装 nav2 相关依赖(若未安装)
sudo apt install ros-humble-nav2-msgs
# 安装按键监听库
pip install pynput
步骤 2:完整代码
python
运行
#!/usr/bin/env python3
"""
ROS2 Action Client 实现多目标点顺序巡航
功能:
1. 按顺序执行目标点,等待结果,失败自动重试
2. 实时输出 Action Feedback
3. 按键(ESC)触发取消机制
"""
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
from pynput import keyboard
import threading
import time
# 全局变量:标记是否触发取消
CANCEL_FLAG = False
class CruiseActionClient(Node):
def __init__(self):
super().__init__('cruise_action_client')
# 1. 创建 Action Client,指定 Action 类型和服务名(Nav2 标准服务名)
self._action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
# 2. 配置巡航参数
self.target_poses = self._generate_target_poses() # 巡航目标点列表
self.retry_max = 3 # 单个目标点最大重试次数
self.current_target_idx = 0 # 当前执行的目标点索引
# 3. 启动按键监听线程(非阻塞)
self._start_key_listener()
def _generate_target_poses(self):
"""生成示例巡航目标点(可替换为实际坐标)"""
target_list = []
# 目标点1:(x=1.0, y=0.0, z=0.0),朝向正前方
pose1 = PoseStamped()
pose1.header.frame_id = 'map' # 基于地图坐标系
pose1.pose.position.x = 1.0
pose1.pose.position.y = 0.0
pose1.pose.position.z = 0.0
pose1.pose.orientation.w = 1.0 # 四元数,无旋转
target_list.append(pose1)
# 目标点2:(x=1.0, y=1.0, z=0.0)
pose2 = PoseStamped()
pose2.header.frame_id = 'map'
pose2.pose.position.x = 1.0
pose2.pose.position.y = 1.0
pose2.pose.position.z = 0.0
pose2.pose.orientation.w = 1.0
target_list.append(pose2)
# 目标点3:(x=0.0, y=1.0, z=0.0)
pose3 = PoseStamped()
pose3.header.frame_id = 'map'
pose3.pose.position.x = 0.0
pose3.pose.position.y = 1.0
pose3.pose.position.z = 0.0
pose3.pose.orientation.w = 1.0
target_list.append(pose3)
return target_list
def _start_key_listener(self):
"""启动按键监听线程,ESC键触发取消"""
def on_press(key):
global CANCEL_FLAG
try:
if key == keyboard.Key.esc: # 监听ESC键
CANCEL_FLAG = True
self.get_logger().warn('检测到ESC键,触发取消巡航!')
return False # 停止监听
except Exception as e:
self.get_logger().error(f'按键监听异常: {e}')
# 启动线程(避免阻塞主线程)
listener_thread = threading.Thread(
target=lambda: keyboard.Listener(on_press=on_press).start()
)
listener_thread.daemon = True # 主线程退出时,子线程自动退出
listener_thread.start()
self.get_logger().info('按键监听已启动,按ESC键可取消巡航')
def _send_goal(self, target_pose, retry_count=0):
"""发送单个目标点请求,包含重试逻辑"""
# 检查取消标志
if CANCEL_FLAG:
self.get_logger().warn('取消标志已触发,终止当前目标点执行')
return False
# 1. 等待 Action Server 上线
self.get_logger().info(f'等待导航Action Server上线...')
if not self._action_client.wait_for_server(timeout_sec=10.0):
self.get_logger().error('Action Server 未上线,无法发送目标点')
return False
# 2. 构造 Action Goal
goal_msg = NavigateToPose.Goal()
goal_msg.pose = target_pose
# 3. 发送目标,并注册反馈回调、结果回调
self.get_logger().info(
f'发送目标点 {self.current_target_idx+1}/{len(self.target_poses)}: '
f'x={target_pose.pose.position.x}, y={target_pose.pose.position.y}'
)
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self._feedback_callback # 反馈回调
)
# 等待发送结果(阻塞,直到服务器接收目标)
self._send_goal_future.add_done_callback(self._goal_response_callback)
# 4. 等待 Action 执行完成(阻塞,直到拿到结果)
rclpy.spin_until_future_complete(self, self._send_goal_future)
goal_handle = self._send_goal_future.result()
if not goal_handle.accepted:
self.get_logger().error(f'目标点 {self.current_target_idx+1} 被服务器拒绝')
# 重试逻辑
if retry_count < self.retry_max:
self.get_logger().info(f'开始第 {retry_count+1} 次重试...')
time.sleep(1) # 重试前等待1秒
return self._send_goal(target_pose, retry_count+1)
else:
self.get_logger().error(f'目标点 {self.current_target_idx+1} 重试{self.retry_max}次失败,放弃')
return False
# 5. 等待最终结果
self._get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, self._get_result_future)
result = self._get_result_future.result().result
# 6. 处理结果
if result.result == 'success': # Nav2 的 result 字段为字符串
self.get_logger().info(f'目标点 {self.current_target_idx+1} 执行成功!')
return True
else:
self.get_logger().error(f'目标点 {self.current_target_idx+1} 执行失败,结果: {result.result}')
# 重试逻辑
if retry_count < self.retry_max:
self.get_logger().info(f'开始第 {retry_count+1} 次重试...')
time.sleep(1)
return self._send_goal(target_pose, retry_count+1)
else:
self.get_logger().error(f'目标点 {self.current_target_idx+1} 重试{self.retry_max}次失败,放弃')
return False
def _feedback_callback(self, feedback_msg):
"""Action 反馈回调函数:实时输出反馈信息"""
feedback = feedback_msg.feedback
# Nav2 的 NavigateToPose Feedback 包含当前机器人位姿、距离目标点的距离等
current_pose = feedback.current_pose.pose
distance_remaining = feedback.distance_remaining # 剩余距离(米)
# 输出反馈(新手友好的格式化输出)
self.get_logger().info(
f'【实时反馈】当前位置: x={current_pose.position.x:.2f}, y={current_pose.position.y:.2f} | '
f'距离目标点剩余: {distance_remaining:.2f} 米'
)
# 【替代方案说明】若当前 Action 接口无 feedback 字段:
# 1. 订阅机器人位姿话题(如 /amcl_pose),实时获取位置;
# 2. 手动计算当前位置与目标点的距离,作为自定义反馈;
# 3. 示例代码(可替换上述反馈输出):
# target_pose = self.target_poses[self.current_target_idx].pose
# dx = target_pose.position.x - current_pose.position.x
# dy = target_pose.position.y - current_pose.position.y
# distance = (dx**2 + dy**2)**0.5
# self.get_logger().info(f'自定义反馈:剩余距离 {distance:.2f} 米')
def _goal_response_callback(self, future):
"""目标请求响应回调:确认服务器是否接收目标"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warn(f'目标点 {self.current_target_idx+1} 请求被服务器拒绝')
return
self.get_logger().info(f'目标点 {self.current_target_idx+1} 请求已被服务器接受,开始执行')
def cancel_current_goal(self):
"""取消当前正在执行的目标点"""
if self._action_client.is_server_available():
# 发送取消请求
cancel_future = self._action_client.cancel_goal_async()
rclpy.spin_until_future_complete(self, cancel_future)
if cancel_future.result().success:
self.get_logger().info('当前目标点已成功取消')
else:
self.get_logger().error('取消当前目标点失败')
else:
self.get_logger().warn('Action Server 不可用,无法取消')
def run_cruise(self):
"""执行顺序巡航主逻辑"""
self.get_logger().info(f'开始执行巡航任务,共 {len(self.target_poses)} 个目标点')
while self.current_target_idx < len(self.target_poses):
# 检查取消标志
if CANCEL_FLAG:
self.cancel_current_goal()
self.get_logger().error('巡航任务已被取消,退出')
break
# 执行当前目标点
current_pose = self.target_poses[self.current_target_idx]
success = self._send_goal(current_pose)
# 处理执行结果
if success:
self.current_target_idx += 1 # 执行成功,切换下一个目标点
else:
self.get_logger().error('当前目标点执行失败,终止整个巡航任务')
break
# 巡航完成/取消后的收尾
if not CANCEL_FLAG and self.current_target_idx == len(self.target_poses):
self.get_logger().info('所有目标点执行完成,巡航任务结束!')
else:
self.get_logger().info('巡航任务已终止')
def main(args=None):
# 初始化 ROS2
rclpy.init(args=args)
# 创建客户端节点
cruise_client = CruiseActionClient()
# 使用多线程执行器(避免按键监听阻塞 Action 回调)
executor = MultiThreadedExecutor()
executor.add_node(cruise_client)
try:
# 启动巡航任务
cruise_client.run_cruise()
except KeyboardInterrupt:
cruise_client.get_logger().warn('检测到键盘中断(Ctrl+C)')
finally:
# 清理资源
cruise_client.cancel_current_goal()
cruise_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
核心功能详细解释
1. 顺序巡航 + 失败重试
实现逻辑
- 目标点列表:通过
_generate_target_poses生成待巡航的目标点(可替换为实际坐标); - 顺序执行:通过
current_target_idx标记当前执行的目标点,执行成功后索引 + 1; - 等待结果:使用
rclpy.spin_until_future_complete阻塞等待 Action 结果(确保前一个目标点完成后再执行下一个); - 失败重试:
_send_goal函数接收retry_count参数,失败时若未达到retry_max(默认 3 次),则递归调用自身重试,重试前等待 1 秒避免频繁请求。
关键代码段
python
运行
# 重试逻辑核心
if retry_count < self.retry_max:
self.get_logger().info(f'开始第 {retry_count+1} 次重试...')
time.sleep(1)
return self._send_goal(target_pose, retry_count+1)
else:
self.get_logger().error(f'目标点 {self.current_target_idx+1} 重试{self.retry_max}次失败,放弃')
return False
2. Action Feedback 输出
核心实现
- 通过
send_goal_async的feedback_callback参数注册反馈回调函数_feedback_callback; - Nav2 的
NavigateToPoseFeedback 包含current_pose(当前位姿)、distance_remaining(剩余距离)等关键字段,直接格式化输出即可。
替代方案(若 feedback 字段不可用)
如果使用的 Action 接口无内置 feedback,可通过以下方式实现:
- 订阅机器人位姿话题(如
/amcl_pose或/odom); - 在客户端中维护当前目标点的坐标;
- 实时计算当前位姿与目标点的欧式距离,作为自定义反馈输出(代码中已注释示例)。
3. 取消机制(ESC 键触发)
实现思路
- 非阻塞监听:启动独立线程运行
pynput的按键监听器,避免阻塞 ROS2 主线程; - 全局标志位:用
CANCEL_FLAG标记是否触发取消,在巡航主逻辑和目标点发送逻辑中实时检查; - 取消执行:检测到 ESC 键后,调用
cancel_current_goal发送取消请求给 Action Server,并终止后续目标点执行。
关键细节
- 线程设置为
daemon=True:主线程退出时,监听线程自动退出,避免资源泄漏; - 取消请求通过
action_client.cancel_goal_async()发送,符合 ROS2 Action 标准流程。
代码运行与测试
1. 运行前准备
- 确保 Nav2 环境已启动(或替换为你的 Action Server):
bash
运行
ros2 launch nav2_bringup tb3_simulation_launch.py # 以海龟机器人仿真为例 - 将代码保存为
cruise_action_client.py,添加可执行权限:bash
运行
chmod +x cruise_action_client.py
2. 运行代码
bash
运行
ros2 run <你的包名> cruise_action_client.py
3. 测试效果
- 控制台会输出目标点发送状态、实时反馈(当前位置 + 剩余距离);
- 若某个目标点执行失败,会自动重试(最多 3 次);
- 按 ESC 键可立即触发取消,终止当前目标点和后续巡航。
总结(核心关键点回顾)
- 顺序巡航 + 重试:通过索引标记当前目标点,阻塞等待结果,失败时递归重试(可自定义重试次数);
- Feedback 输出:优先使用 Action 内置 feedback 回调,无内置字段时可订阅位姿话题自定义计算;
- 取消机制:按键监听线程 + 全局标志位,触发后调用
cancel_goal_async取消当前目标,终止巡航。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)