ROS 2从入门到精通系列(十九):多机器人协作 - 分布式系统与协调

构建多机器人协作系统,实现分布式任务执行和协调。


引言

单个机器人的能力有限,多个机器人协作可以实现:

  • 任务分解:复杂任务分配给多个机器人
  • 并行执行:同时完成多个子任务
  • 协调运动:避免碰撞和死锁
  • 资源共享:共享传感器和计算资源

本篇将展示如何设计和实现一个多机器人协作系统。


一、多机器人系统架构

1.1 系统架构设计

机器人3

机器人2

机器人1

全局系统

通信

通信

通信

协调

协调

协调

监视

监视

监视

通信中间件
ROS2 DDS

协调器
Central Coordinator

监视系统
Monitoring

驱动层
Hardware

控制层
Control

任务执行
Task Executor

驱动层
Hardware

控制层
Control

任务执行
Task Executor

驱动层
Hardware

控制层
Control

任务执行
Task Executor

1.2 通信拓扑

星型拓扑(中心协调):

Coordinator

Robot1

Robot2

Robot3

环形拓扑(无中心):

Robot1

Robot2

Robot3

混合拓扑(分层):

Controller

R1

R2

R3


二、分布式任务管理

2.1 任务定义和分配

#!/usr/bin/env python3
"""
分布式任务管理系统
"""

from enum import Enum
from dataclasses import dataclass
import uuid
from typing import List, Dict

class TaskStatus(Enum):
    """任务状态"""
    PENDING = "pending"
    ASSIGNED = "assigned"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class Task:
    """任务定义"""
    task_id: str
    task_type: str          # navigation, manipulation, inspection等
    priority: int           # 优先级 1-10
    status: TaskStatus
    assigned_robot: str = None
    start_time: float = None
    estimated_duration: float = None
    goal_pose: tuple = None  # (x, y, z)

class TaskManager:
    """集中式任务管理器"""

    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        self.robot_capacities = {}
        self.robot_status = {}

    def create_task(self, task_type: str, priority: int,
                   goal_pose: tuple = None) -> Task:
        """创建新任务"""
        task = Task(
            task_id=str(uuid.uuid4()),
            task_type=task_type,
            priority=priority,
            status=TaskStatus.PENDING,
            goal_pose=goal_pose
        )
        self.tasks[task.task_id] = task
        return task

    def assign_task(self, task_id: str, robot_id: str) -> bool:
        """分配任务给机器人"""
        if task_id not in self.tasks:
            return False

        task = self.tasks[task_id]

        # 检查机器人是否可用
        if not self.is_robot_available(robot_id):
            return False

        # 检查机器人能力
        if not self.robot_can_handle(robot_id, task):
            return False

        task.assigned_robot = robot_id
        task.status = TaskStatus.ASSIGNED
        self.robot_status[robot_id] = "busy"

        return True

    def is_robot_available(self, robot_id: str) -> bool:
        """检查机器人是否可用"""
        return self.robot_status.get(robot_id) == "idle"

    def robot_can_handle(self, robot_id: str, task: Task) -> bool:
        """检查机器人是否能执行任务"""
        capabilities = self.robot_capacities.get(robot_id, [])
        return task.task_type in capabilities

    def update_task_status(self, task_id: str, new_status: TaskStatus):
        """更新任务状态"""
        if task_id in self.tasks:
            self.tasks[task_id].status = new_status

            if new_status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
                robot_id = self.tasks[task_id].assigned_robot
                if robot_id:
                    self.robot_status[robot_id] = "idle"

    def get_pending_tasks(self) -> List[Task]:
        """获取待分配的任务(按优先级排序)"""
        pending = [t for t in self.tasks.values()
                  if t.status == TaskStatus.PENDING]
        return sorted(pending, key=lambda t: t.priority, reverse=True)

class DistributedTaskExecutor:
    """分布式任务执行器"""

    def __init__(self, robot_id: str):
        self.robot_id = robot_id
        self.current_task: Task = None
        self.task_queue: List[Task] = []

    def accept_task(self, task: Task):
        """接受分配的任务"""
        self.task_queue.append(task)
        self.task_queue.sort(key=lambda t: t.priority, reverse=True)

    def execute_current_task(self) -> bool:
        """执行当前任务"""
        if not self.task_queue:
            return False

        self.current_task = self.task_queue.pop(0)

        # 执行任务
        try:
            success = self.execute_task_impl(self.current_task)
            return success
        except Exception as e:
            print(f"任务执行失败: {e}")
            return False

    def execute_task_impl(self, task: Task) -> bool:
        """实际任务执行(由子类重写)"""
        if task.task_type == "navigation":
            return self.navigate_to(task.goal_pose)
        elif task.task_type == "manipulation":
            return self.manipulate(task)
        return False

    def navigate_to(self, goal: tuple) -> bool:
        """导航到目标位置"""
        # 实现具体的导航逻辑
        print(f"[{self.robot_id}] 导航到 {goal}")
        return True

    def manipulate(self, task: Task) -> bool:
        """执行操作任务"""
        print(f"[{self.robot_id}] 执行操作任务")
        return True

三、协调和冲突解决

3.1 碰撞检测和避免

import math
from typing import Tuple

class CollisionAvoidanceManager:
    """碰撞检测和避免管理器"""

    def __init__(self, robot_radius: float = 0.2,
                 safety_distance: float = 0.1):
        self.robot_radius = robot_radius
        self.safety_distance = safety_distance
        self.robot_positions: Dict[str, Tuple[float, float]] = {}

    def update_robot_position(self, robot_id: str, x: float, y: float):
        """更新机器人位置"""
        self.robot_positions[robot_id] = (x, y)

    def check_collision(self, robot_id: str, x: float, y: float) -> bool:
        """检查是否会碰撞"""
        for other_id, (other_x, other_y) in self.robot_positions.items():
            if other_id == robot_id:
                continue

            # 计算距离
            distance = math.sqrt((x - other_x)**2 + (y - other_y)**2)

            # 检查安全距离
            min_distance = 2 * self.robot_radius + self.safety_distance
            if distance < min_distance:
                return True

        return False

    def plan_avoid_path(self, robot_id: str, goal: Tuple[float, float],
                       current: Tuple[float, float]) -> List[Tuple[float, float]]:
        """规划避碰路径"""
        # 使用势场法或RRT避碰
        path = [current, goal]

        # 检查直线路径是否安全
        if not self.is_path_safe(robot_id, path):
            # 生成备选路径
            path = self.generate_alternative_path(
                robot_id, current, goal)

        return path

    def is_path_safe(self, robot_id: str, path: List[Tuple[float, float]]) -> bool:
        """检查路径是否安全"""
        for x, y in path:
            if self.check_collision(robot_id, x, y):
                return False
        return True

    def generate_alternative_path(self, robot_id: str,
                                 start: Tuple[float, float],
                                 goal: Tuple[float, float]
                                 ) -> List[Tuple[float, float]]:
        """生成替代路径"""
        # 简单实现:绕过障碍物
        mid_x = (start[0] + goal[0]) / 2 + 0.5
        mid_y = (start[1] + goal[1]) / 2

        return [start, (mid_x, mid_y), goal]

3.2 协调算法 - 匈牙利算法任务分配

from scipy.optimize import linear_sum_assignment
import numpy as np

class HungarianAssigner:
    """使用匈牙利算法的任务分配"""

    def __init__(self, robots: List[str], tasks: List[Task]):
        self.robots = robots
        self.tasks = tasks

    def compute_cost_matrix(self) -> np.ndarray:
        """计算成本矩阵"""
        n_robots = len(self.robots)
        n_tasks = len(self.tasks)

        cost_matrix = np.zeros((n_robots, n_tasks))

        for i, robot in enumerate(self.robots):
            for j, task in enumerate(self.tasks):
                cost = self.compute_cost(robot, task)
                cost_matrix[i, j] = cost

        return cost_matrix

    def compute_cost(self, robot_id: str, task: Task) -> float:
        """计算单个分配成本"""
        cost = 0.0

        # 距离成本
        distance = self.estimate_distance(robot_id, task.goal_pose)
        cost += distance

        # 能力匹配成本
        if not self.robot_can_handle(robot_id, task):
            cost += 1000  # 很高的成本

        # 优先级成本(优先级高的任务成本低)
        cost -= task.priority / 10

        return cost

    def assign_optimal(self) -> Dict[str, Task]:
        """计算最优分配"""
        cost_matrix = self.compute_cost_matrix()

        # 使用匈牙利算法
        robot_idx, task_idx = linear_sum_assignment(cost_matrix)

        assignment = {}
        for r_idx, t_idx in zip(robot_idx, task_idx):
            assignment[self.robots[r_idx]] = self.tasks[t_idx]

        return assignment

    def estimate_distance(self, robot_id: str,
                         goal: Tuple[float, float]) -> float:
        """估计距离(简化版)"""
        # 从某个位置信息服务获取当前位置
        return 0.0

    def robot_can_handle(self, robot_id: str, task: Task) -> bool:
        """检查机器人能力"""
        return True

四、ROS2分布式节点实现

4.1 协调器节点

#!/usr/bin/env python3
"""
多机器人协调器节点
"""

import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from std_msgs.msg import String
from geometry_msgs.msg import PoseArray
import json
from typing import Dict, List

class RobotCoordinator(Node):
    """机器人协调器"""

    def __init__(self):
        super().__init__('robot_coordinator')

        # 初始化
        self.robots: Dict[str, Dict] = {}
        self.task_manager = TaskManager()

        # 发布者
        self.task_publisher = self.create_publisher(
            String, '/task_assignments', 10)

        # 订阅者
        self.robot_status_subscriber = self.create_subscription(
            String, '/robot_status', self.robot_status_callback, 10)

        # 定时器
        self.coordinator_timer = self.create_timer(
            1.0, self.coordination_loop)

        # 服务
        self.create_service(
            String, '/assign_task', self.assign_task_service)

        self.get_logger().info('协调器已启动')

    def robot_status_callback(self, msg):
        """接收机器人状态"""
        try:
            status = json.loads(msg.data)
            robot_id = status['robot_id']
            self.robots[robot_id] = status
        except Exception as e:
            self.get_logger().error(f"解析机器人状态失败: {e}")

    def coordination_loop(self):
        """主协调循环"""
        # 获取待分配的任务
        pending_tasks = self.task_manager.get_pending_tasks()

        if not pending_tasks:
            return

        # 收集可用机器人
        available_robots = [
            robot_id for robot_id, status in self.robots.items()
            if status.get('status') == 'idle'
        ]

        if not available_robots:
            return

        # 分配任务
        assigner = HungarianAssigner(available_robots, pending_tasks)
        assignments = assigner.assign_optimal()

        for robot_id, task in assignments.items():
            self.assign_task_to_robot(robot_id, task)

    def assign_task_to_robot(self, robot_id: str, task: Task):
        """分配任务给机器人"""
        self.task_manager.assign_task(task.task_id, robot_id)

        # 发布分配消息
        msg = String()
        msg.data = json.dumps({
            'robot_id': robot_id,
            'task_id': task.task_id,
            'task_type': task.task_type,
            'goal_pose': task.goal_pose
        })
        self.task_publisher.publish(msg)

        self.get_logger().info(
            f"任务 {task.task_id[:8]} 分配给 {robot_id}")

    def assign_task_service(self, request, response):
        """处理任务分配请求"""
        try:
            task_data = json.loads(request.data)
            task = self.task_manager.create_task(
                task_type=task_data['task_type'],
                priority=task_data['priority'],
                goal_pose=tuple(task_data.get('goal_pose', [0, 0, 0]))
            )
            response.data = task.task_id
            return response
        except Exception as e:
            self.get_logger().error(f"创建任务失败: {e}")
            return response

def main(args=None):
    rclpy.init(args=args)
    coordinator = RobotCoordinator()
    executor = MultiThreadedExecutor()
    rclpy.spin(coordinator, executor=executor)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

4.2 机器人执行节点

#!/usr/bin/env python3
"""
单个机器人的任务执行节点
"""

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import json
import uuid

class RobotExecutor(Node):
    """机器人任务执行器"""

    def __init__(self, robot_id: str):
        super().__init__(f'robot_{robot_id}_executor')

        self.robot_id = robot_id
        self.current_task = None
        self.executor = DistributedTaskExecutor(robot_id)

        # 发布者
        self.status_publisher = self.create_publisher(
            String, '/robot_status', 10)

        # 订阅者
        self.task_subscriber = self.create_subscription(
            String, '/task_assignments', self.task_callback, 10)

        # 定时器
        self.status_timer = self.create_timer(
            1.0, self.publish_status)
        self.execution_timer = self.create_timer(
            0.5, self.execute_task)

        self.get_logger().info(f"机器人 {robot_id} 执行器已启动")

    def task_callback(self, msg):
        """接收任务分配"""
        try:
            task_data = json.loads(msg.data)
            if task_data['robot_id'] != self.robot_id:
                return

            task = Task(
                task_id=task_data['task_id'],
                task_type=task_data['task_type'],
                priority=5,
                status=TaskStatus.ASSIGNED,
                goal_pose=tuple(task_data.get('goal_pose', [0, 0, 0]))
            )

            self.executor.accept_task(task)
            self.get_logger().info(f"接受任务 {task.task_id[:8]}")

        except Exception as e:
            self.get_logger().error(f"处理任务失败: {e}")

    def execute_task(self):
        """执行任务"""
        if not self.executor.task_queue:
            return

        task = self.executor.current_task or self.executor.task_queue[0]

        # 执行任务
        success = self.executor.execute_current_task()

        if success:
            self.get_logger().info(
                f"任务 {task.task_id[:8]} 执行成功")
        else:
            self.get_logger().warn(
                f"任务 {task.task_id[:8]} 执行失败")

    def publish_status(self):
        """发布机器人状态"""
        status = {
            'robot_id': self.robot_id,
            'status': 'busy' if self.executor.task_queue else 'idle',
            'current_task': self.executor.current_task.task_id
                if self.executor.current_task else None,
            'task_queue_size': len(self.executor.task_queue)
        }

        msg = String()
        msg.data = json.dumps(status)
        self.status_publisher.publish(msg)

def main(args=None):
    rclpy.init(args=args)

    # 可以通过命令行参数指定机器人ID
    import sys
    robot_id = sys.argv[1] if len(sys.argv) > 1 else 'robot_1'

    executor = RobotExecutor(robot_id)
    rclpy.spin(executor)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

五、多机器人系统Launch文件

from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
    num_robots = LaunchConfiguration('num_robots')

    declare_num_robots = DeclareLaunchArgument(
        'num_robots',
        default_value='3',
        description='机器人数量'
    )

    nodes = [
        # 协调器
        Node(
            package='multi_robot_system',
            executable='coordinator',
            name='coordinator',
            output='screen'
        ),
    ]

    # 动态创建机器人节点
    for i in range(1, 4):
        nodes.append(
            Node(
                package='multi_robot_system',
                executable='robot_executor',
                name=f'robot_{i}_executor',
                arguments=[f'robot_{i}'],
                output='screen'
            )
        )

        # 每个机器人的驱动节点
        nodes.append(
            Node(
                package='multi_robot_system',
                executable='robot_driver',
                name=f'robot_{i}_driver',
                arguments=[f'robot_{i}'],
                output='screen'
            )
        )

    return LaunchDescription([declare_num_robots] + nodes)

六、本项目要点总结

系统架构

  • 中心协调器 vs 分布式协调
  • 通信拓扑设计
  • 分层控制结构

任务管理

  • 任务定义和队列
  • 优先级管理
  • 状态追踪

协调算法

  • 匈牙利算法任务分配
  • 碰撞检测和避免
  • 死锁预防

分布式实现

  • 协调器节点
  • 执行节点
  • 状态同步

可扩展性

  • 易于添加新机器人
  • 动态任务分配
  • 故障容错

七、完整系列总结

ROS2从入门到精通 - 三大阶段共19篇

第一阶段:入门基础(第1-8篇)

核心概念、环境搭建、工作空间、通信机制、工具链、调试技巧

第二阶段:核心深化(第9-15篇)

消息定义、DDS优化、生命周期、坐标变换、传感器、时间管理、性能优化

第三阶段:实战项目(第16-19篇)

自主导航系统 - SLAM、路径规划、运动控制
机械臂控制 - 正逆运动学、轨迹规划、MoveIt
视觉识别 - 目标检测、多目标跟踪、YOLO
多机器人协作 - 分布式任务管理、协调、编队


八、学习成果

完成本系列学习后,你将能够:

设计和实现 完整的机器人系统
集成 各种硬件和软件模块
优化 系统性能和实时性
调试 复杂的分布式系统
扩展 系统功能和能力


九、继续学习

推荐的下一步方向:

  1. 深入专题研究

    • SLAM高级算法(EKF-SLAM、GraphSLAM)
    • 运动规划高级技术(RRT、PRM)
    • 深度学习应用(3D检测、语义分割)
  2. 参与开源项目

    • https://github.com/ros2
    • https://github.com/ros-planning
    • https://github.com/ros-perception
  3. 实战开发

    • 参加机器人比赛
    • 开发实际应用系统
    • 贡献到开源社区

恭贺!你已完成ROS2从入门到精通的全部课程!

这19篇文章涵盖了ROS2的几乎所有核心功能和实战应用。现在,你已经拥有独立开发复杂机器人系统的能力。

记住:最好的学习方式是实践。选择一个有趣的项目,动手开发吧!


完整教程索引

第一阶段:入门基础

  1. ROS2是什么 - 核心概念和架构
  2. 环境搭建 - Ubuntu系统安装ROS2
  3. 工作空间与包 - 创建和管理
  4. 节点与话题 - 发布-订阅模型实战
  5. 服务与动作 - 同步通信机制
  6. 常用命令行工具 - 开发必备
  7. 参数与配置 - Launch文件深入讲解
  8. 调试技巧 - 日志监控与问题排查

第二阶段:核心深化

  1. 消息定义与自定义 - 核心数据结构
  2. DDS中间件 - 通信优化与QoS策略
  3. 生命周期节点 - 完善的节点管理
  4. 坐标变换(TF2)- 三维空间几何关系
  5. 传感器集成 - 从硬件到数据流
  6. 时间管理 - 模拟时钟与同步
  7. 性能优化 - 实战案例与系统集成

第三阶段:实战项目

  1. 自主导航机器人 - 系统架构与SLAM
  2. 机械臂控制 - 正逆运动学与轨迹规划
  3. 视觉识别系统 - 目标检测与跟踪
  4. 多机器人协作 - 分布式系统与协调

感谢你的学习!如有任何问题,欢迎提出反馈!

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐