ROS 2从入门到精通系列(十九):多机器人协作 - 分布式系统与协调
·
ROS 2从入门到精通系列(十九):多机器人协作 - 分布式系统与协调
构建多机器人协作系统,实现分布式任务执行和协调。
引言
单个机器人的能力有限,多个机器人协作可以实现:
- 任务分解:复杂任务分配给多个机器人
- 并行执行:同时完成多个子任务
- 协调运动:避免碰撞和死锁
- 资源共享:共享传感器和计算资源
本篇将展示如何设计和实现一个多机器人协作系统。
一、多机器人系统架构
1.1 系统架构设计
1.2 通信拓扑
星型拓扑(中心协调):
环形拓扑(无中心):
混合拓扑(分层):
二、分布式任务管理
2.1 任务定义和分配
#!/usr/bin/env python3
"""
分布式任务管理系统
"""
from enum import Enum
from dataclasses import dataclass
import uuid
from typing import List, Dict
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
ASSIGNED = "assigned"
EXECUTING = "executing"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务定义"""
task_id: str
task_type: str # navigation, manipulation, inspection等
priority: int # 优先级 1-10
status: TaskStatus
assigned_robot: str = None
start_time: float = None
estimated_duration: float = None
goal_pose: tuple = None # (x, y, z)
class TaskManager:
"""集中式任务管理器"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.robot_capacities = {}
self.robot_status = {}
def create_task(self, task_type: str, priority: int,
goal_pose: tuple = None) -> Task:
"""创建新任务"""
task = Task(
task_id=str(uuid.uuid4()),
task_type=task_type,
priority=priority,
status=TaskStatus.PENDING,
goal_pose=goal_pose
)
self.tasks[task.task_id] = task
return task
def assign_task(self, task_id: str, robot_id: str) -> bool:
"""分配任务给机器人"""
if task_id not in self.tasks:
return False
task = self.tasks[task_id]
# 检查机器人是否可用
if not self.is_robot_available(robot_id):
return False
# 检查机器人能力
if not self.robot_can_handle(robot_id, task):
return False
task.assigned_robot = robot_id
task.status = TaskStatus.ASSIGNED
self.robot_status[robot_id] = "busy"
return True
def is_robot_available(self, robot_id: str) -> bool:
"""检查机器人是否可用"""
return self.robot_status.get(robot_id) == "idle"
def robot_can_handle(self, robot_id: str, task: Task) -> bool:
"""检查机器人是否能执行任务"""
capabilities = self.robot_capacities.get(robot_id, [])
return task.task_type in capabilities
def update_task_status(self, task_id: str, new_status: TaskStatus):
"""更新任务状态"""
if task_id in self.tasks:
self.tasks[task_id].status = new_status
if new_status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
robot_id = self.tasks[task_id].assigned_robot
if robot_id:
self.robot_status[robot_id] = "idle"
def get_pending_tasks(self) -> List[Task]:
"""获取待分配的任务(按优先级排序)"""
pending = [t for t in self.tasks.values()
if t.status == TaskStatus.PENDING]
return sorted(pending, key=lambda t: t.priority, reverse=True)
class DistributedTaskExecutor:
"""分布式任务执行器"""
def __init__(self, robot_id: str):
self.robot_id = robot_id
self.current_task: Task = None
self.task_queue: List[Task] = []
def accept_task(self, task: Task):
"""接受分配的任务"""
self.task_queue.append(task)
self.task_queue.sort(key=lambda t: t.priority, reverse=True)
def execute_current_task(self) -> bool:
"""执行当前任务"""
if not self.task_queue:
return False
self.current_task = self.task_queue.pop(0)
# 执行任务
try:
success = self.execute_task_impl(self.current_task)
return success
except Exception as e:
print(f"任务执行失败: {e}")
return False
def execute_task_impl(self, task: Task) -> bool:
"""实际任务执行(由子类重写)"""
if task.task_type == "navigation":
return self.navigate_to(task.goal_pose)
elif task.task_type == "manipulation":
return self.manipulate(task)
return False
def navigate_to(self, goal: tuple) -> bool:
"""导航到目标位置"""
# 实现具体的导航逻辑
print(f"[{self.robot_id}] 导航到 {goal}")
return True
def manipulate(self, task: Task) -> bool:
"""执行操作任务"""
print(f"[{self.robot_id}] 执行操作任务")
return True
三、协调和冲突解决
3.1 碰撞检测和避免
import math
from typing import Tuple
class CollisionAvoidanceManager:
"""碰撞检测和避免管理器"""
def __init__(self, robot_radius: float = 0.2,
safety_distance: float = 0.1):
self.robot_radius = robot_radius
self.safety_distance = safety_distance
self.robot_positions: Dict[str, Tuple[float, float]] = {}
def update_robot_position(self, robot_id: str, x: float, y: float):
"""更新机器人位置"""
self.robot_positions[robot_id] = (x, y)
def check_collision(self, robot_id: str, x: float, y: float) -> bool:
"""检查是否会碰撞"""
for other_id, (other_x, other_y) in self.robot_positions.items():
if other_id == robot_id:
continue
# 计算距离
distance = math.sqrt((x - other_x)**2 + (y - other_y)**2)
# 检查安全距离
min_distance = 2 * self.robot_radius + self.safety_distance
if distance < min_distance:
return True
return False
def plan_avoid_path(self, robot_id: str, goal: Tuple[float, float],
current: Tuple[float, float]) -> List[Tuple[float, float]]:
"""规划避碰路径"""
# 使用势场法或RRT避碰
path = [current, goal]
# 检查直线路径是否安全
if not self.is_path_safe(robot_id, path):
# 生成备选路径
path = self.generate_alternative_path(
robot_id, current, goal)
return path
def is_path_safe(self, robot_id: str, path: List[Tuple[float, float]]) -> bool:
"""检查路径是否安全"""
for x, y in path:
if self.check_collision(robot_id, x, y):
return False
return True
def generate_alternative_path(self, robot_id: str,
start: Tuple[float, float],
goal: Tuple[float, float]
) -> List[Tuple[float, float]]:
"""生成替代路径"""
# 简单实现:绕过障碍物
mid_x = (start[0] + goal[0]) / 2 + 0.5
mid_y = (start[1] + goal[1]) / 2
return [start, (mid_x, mid_y), goal]
3.2 协调算法 - 匈牙利算法任务分配
from scipy.optimize import linear_sum_assignment
import numpy as np
class HungarianAssigner:
"""使用匈牙利算法的任务分配"""
def __init__(self, robots: List[str], tasks: List[Task]):
self.robots = robots
self.tasks = tasks
def compute_cost_matrix(self) -> np.ndarray:
"""计算成本矩阵"""
n_robots = len(self.robots)
n_tasks = len(self.tasks)
cost_matrix = np.zeros((n_robots, n_tasks))
for i, robot in enumerate(self.robots):
for j, task in enumerate(self.tasks):
cost = self.compute_cost(robot, task)
cost_matrix[i, j] = cost
return cost_matrix
def compute_cost(self, robot_id: str, task: Task) -> float:
"""计算单个分配成本"""
cost = 0.0
# 距离成本
distance = self.estimate_distance(robot_id, task.goal_pose)
cost += distance
# 能力匹配成本
if not self.robot_can_handle(robot_id, task):
cost += 1000 # 很高的成本
# 优先级成本(优先级高的任务成本低)
cost -= task.priority / 10
return cost
def assign_optimal(self) -> Dict[str, Task]:
"""计算最优分配"""
cost_matrix = self.compute_cost_matrix()
# 使用匈牙利算法
robot_idx, task_idx = linear_sum_assignment(cost_matrix)
assignment = {}
for r_idx, t_idx in zip(robot_idx, task_idx):
assignment[self.robots[r_idx]] = self.tasks[t_idx]
return assignment
def estimate_distance(self, robot_id: str,
goal: Tuple[float, float]) -> float:
"""估计距离(简化版)"""
# 从某个位置信息服务获取当前位置
return 0.0
def robot_can_handle(self, robot_id: str, task: Task) -> bool:
"""检查机器人能力"""
return True
四、ROS2分布式节点实现
4.1 协调器节点
#!/usr/bin/env python3
"""
多机器人协调器节点
"""
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from std_msgs.msg import String
from geometry_msgs.msg import PoseArray
import json
from typing import Dict, List
class RobotCoordinator(Node):
"""机器人协调器"""
def __init__(self):
super().__init__('robot_coordinator')
# 初始化
self.robots: Dict[str, Dict] = {}
self.task_manager = TaskManager()
# 发布者
self.task_publisher = self.create_publisher(
String, '/task_assignments', 10)
# 订阅者
self.robot_status_subscriber = self.create_subscription(
String, '/robot_status', self.robot_status_callback, 10)
# 定时器
self.coordinator_timer = self.create_timer(
1.0, self.coordination_loop)
# 服务
self.create_service(
String, '/assign_task', self.assign_task_service)
self.get_logger().info('协调器已启动')
def robot_status_callback(self, msg):
"""接收机器人状态"""
try:
status = json.loads(msg.data)
robot_id = status['robot_id']
self.robots[robot_id] = status
except Exception as e:
self.get_logger().error(f"解析机器人状态失败: {e}")
def coordination_loop(self):
"""主协调循环"""
# 获取待分配的任务
pending_tasks = self.task_manager.get_pending_tasks()
if not pending_tasks:
return
# 收集可用机器人
available_robots = [
robot_id for robot_id, status in self.robots.items()
if status.get('status') == 'idle'
]
if not available_robots:
return
# 分配任务
assigner = HungarianAssigner(available_robots, pending_tasks)
assignments = assigner.assign_optimal()
for robot_id, task in assignments.items():
self.assign_task_to_robot(robot_id, task)
def assign_task_to_robot(self, robot_id: str, task: Task):
"""分配任务给机器人"""
self.task_manager.assign_task(task.task_id, robot_id)
# 发布分配消息
msg = String()
msg.data = json.dumps({
'robot_id': robot_id,
'task_id': task.task_id,
'task_type': task.task_type,
'goal_pose': task.goal_pose
})
self.task_publisher.publish(msg)
self.get_logger().info(
f"任务 {task.task_id[:8]} 分配给 {robot_id}")
def assign_task_service(self, request, response):
"""处理任务分配请求"""
try:
task_data = json.loads(request.data)
task = self.task_manager.create_task(
task_type=task_data['task_type'],
priority=task_data['priority'],
goal_pose=tuple(task_data.get('goal_pose', [0, 0, 0]))
)
response.data = task.task_id
return response
except Exception as e:
self.get_logger().error(f"创建任务失败: {e}")
return response
def main(args=None):
rclpy.init(args=args)
coordinator = RobotCoordinator()
executor = MultiThreadedExecutor()
rclpy.spin(coordinator, executor=executor)
rclpy.shutdown()
if __name__ == '__main__':
main()
4.2 机器人执行节点
#!/usr/bin/env python3
"""
单个机器人的任务执行节点
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import json
import uuid
class RobotExecutor(Node):
"""机器人任务执行器"""
def __init__(self, robot_id: str):
super().__init__(f'robot_{robot_id}_executor')
self.robot_id = robot_id
self.current_task = None
self.executor = DistributedTaskExecutor(robot_id)
# 发布者
self.status_publisher = self.create_publisher(
String, '/robot_status', 10)
# 订阅者
self.task_subscriber = self.create_subscription(
String, '/task_assignments', self.task_callback, 10)
# 定时器
self.status_timer = self.create_timer(
1.0, self.publish_status)
self.execution_timer = self.create_timer(
0.5, self.execute_task)
self.get_logger().info(f"机器人 {robot_id} 执行器已启动")
def task_callback(self, msg):
"""接收任务分配"""
try:
task_data = json.loads(msg.data)
if task_data['robot_id'] != self.robot_id:
return
task = Task(
task_id=task_data['task_id'],
task_type=task_data['task_type'],
priority=5,
status=TaskStatus.ASSIGNED,
goal_pose=tuple(task_data.get('goal_pose', [0, 0, 0]))
)
self.executor.accept_task(task)
self.get_logger().info(f"接受任务 {task.task_id[:8]}")
except Exception as e:
self.get_logger().error(f"处理任务失败: {e}")
def execute_task(self):
"""执行任务"""
if not self.executor.task_queue:
return
task = self.executor.current_task or self.executor.task_queue[0]
# 执行任务
success = self.executor.execute_current_task()
if success:
self.get_logger().info(
f"任务 {task.task_id[:8]} 执行成功")
else:
self.get_logger().warn(
f"任务 {task.task_id[:8]} 执行失败")
def publish_status(self):
"""发布机器人状态"""
status = {
'robot_id': self.robot_id,
'status': 'busy' if self.executor.task_queue else 'idle',
'current_task': self.executor.current_task.task_id
if self.executor.current_task else None,
'task_queue_size': len(self.executor.task_queue)
}
msg = String()
msg.data = json.dumps(status)
self.status_publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
# 可以通过命令行参数指定机器人ID
import sys
robot_id = sys.argv[1] if len(sys.argv) > 1 else 'robot_1'
executor = RobotExecutor(robot_id)
rclpy.spin(executor)
rclpy.shutdown()
if __name__ == '__main__':
main()
五、多机器人系统Launch文件
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
num_robots = LaunchConfiguration('num_robots')
declare_num_robots = DeclareLaunchArgument(
'num_robots',
default_value='3',
description='机器人数量'
)
nodes = [
# 协调器
Node(
package='multi_robot_system',
executable='coordinator',
name='coordinator',
output='screen'
),
]
# 动态创建机器人节点
for i in range(1, 4):
nodes.append(
Node(
package='multi_robot_system',
executable='robot_executor',
name=f'robot_{i}_executor',
arguments=[f'robot_{i}'],
output='screen'
)
)
# 每个机器人的驱动节点
nodes.append(
Node(
package='multi_robot_system',
executable='robot_driver',
name=f'robot_{i}_driver',
arguments=[f'robot_{i}'],
output='screen'
)
)
return LaunchDescription([declare_num_robots] + nodes)
六、本项目要点总结
✅ 系统架构:
- 中心协调器 vs 分布式协调
- 通信拓扑设计
- 分层控制结构
✅ 任务管理:
- 任务定义和队列
- 优先级管理
- 状态追踪
✅ 协调算法:
- 匈牙利算法任务分配
- 碰撞检测和避免
- 死锁预防
✅ 分布式实现:
- 协调器节点
- 执行节点
- 状态同步
✅ 可扩展性:
- 易于添加新机器人
- 动态任务分配
- 故障容错
七、完整系列总结
ROS2从入门到精通 - 三大阶段共19篇
第一阶段:入门基础(第1-8篇)
核心概念、环境搭建、工作空间、通信机制、工具链、调试技巧
第二阶段:核心深化(第9-15篇)
消息定义、DDS优化、生命周期、坐标变换、传感器、时间管理、性能优化
第三阶段:实战项目(第16-19篇)
自主导航系统 - SLAM、路径规划、运动控制
机械臂控制 - 正逆运动学、轨迹规划、MoveIt
视觉识别 - 目标检测、多目标跟踪、YOLO
多机器人协作 - 分布式任务管理、协调、编队
八、学习成果
完成本系列学习后,你将能够:
✅ 设计和实现 完整的机器人系统
✅ 集成 各种硬件和软件模块
✅ 优化 系统性能和实时性
✅ 调试 复杂的分布式系统
✅ 扩展 系统功能和能力
九、继续学习
推荐的下一步方向:
-
深入专题研究
- SLAM高级算法(EKF-SLAM、GraphSLAM)
- 运动规划高级技术(RRT、PRM)
- 深度学习应用(3D检测、语义分割)
-
参与开源项目
- https://github.com/ros2
- https://github.com/ros-planning
- https://github.com/ros-perception
-
实战开发
- 参加机器人比赛
- 开发实际应用系统
- 贡献到开源社区
恭贺!你已完成ROS2从入门到精通的全部课程!
这19篇文章涵盖了ROS2的几乎所有核心功能和实战应用。现在,你已经拥有独立开发复杂机器人系统的能力。
记住:最好的学习方式是实践。选择一个有趣的项目,动手开发吧!
完整教程索引
第一阶段:入门基础
- ROS2是什么 - 核心概念和架构
- 环境搭建 - Ubuntu系统安装ROS2
- 工作空间与包 - 创建和管理
- 节点与话题 - 发布-订阅模型实战
- 服务与动作 - 同步通信机制
- 常用命令行工具 - 开发必备
- 参数与配置 - Launch文件深入讲解
- 调试技巧 - 日志监控与问题排查
第二阶段:核心深化
- 消息定义与自定义 - 核心数据结构
- DDS中间件 - 通信优化与QoS策略
- 生命周期节点 - 完善的节点管理
- 坐标变换(TF2)- 三维空间几何关系
- 传感器集成 - 从硬件到数据流
- 时间管理 - 模拟时钟与同步
- 性能优化 - 实战案例与系统集成
第三阶段:实战项目
感谢你的学习!如有任何问题,欢迎提出反馈!
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐

所有评论(0)