https://gitee.com/louis_slam/websocket_slam_tool

一、引言

在机器人领域,同步定位与地图构建(SLAM)是实现自主导航的关键技术。然而,SLAM算法通常需要较高的计算资源,对于一些采用低算力处理器的机器人来说,实时运行SLAM算法可能会力不从心,导致建图和定位的延迟和精度下降。为了解决这一问题,我们开发了一个基于WebSocket的ROS2分布式SLAM解决方案,通过将机器人的传感器数据上传到服务器进行SLAM运算,再将地图和定位信息返回给机器人,实现了低算力机器人的实时建图和定位。

二、系统架构

2.1 整体设计思路

我们的系统主要由机器人端和服务器端两部分组成。机器人端负责采集传感器数据,包括激光雷达的scan数据、IMU数据和里程计的odom数据,并通过WebSocket将这些数据发送到服务器端。服务器端接收到数据后,运行SLAM算法进行建图和定位,然后将生成的地图和定位信息通过WebSocket发送回机器人端。机器人端根据接收到的地图和定位信息,实现自主导航。原则上websocket是不受ros系统限制的,所以主从机可以使用不同的系统,目前笔者在ros2和ros1做过异构测试,本文中代码以双端ros2为例。

2.2 技术栈选择

  • ROS2:作为机器人操作系统,ROS2提供了丰富的机器人开发工具和库,方便我们实现传感器数据的采集和处理,以及机器人的控制和导航。
  • WebSocket:作为一种全双工通信协议,WebSocket可以在客户端和服务器之间建立持久的连接,实现高效的数据传输。与传统的HTTP协议相比,WebSocket减少了通信的开销,提高了数据传输的实时性,非常适合用于机器人传感器数据和SLAM结果的传输。
  • SLAM算法:我们可以根据具体的需求选择合适的SLAM算法,如GMapping、Cartographer等。这些算法可以在服务器端利用较高的计算资源高效地运行,生成准确的地图和定位信息。

三、实现步骤

3.1 机器人端实现

  1. 传感器数据采集:在ROS2中,我们可以通过订阅相应的话题来获取激光雷达的scan数据、IMU数据和里程计的odom数据。例如,激光雷达的数据通常发布在/scan话题上,IMU数据发布在/imu话题上,里程计数据发布在/odom话题上。我们可以使用ROS2的消息订阅机制,实时获取这些传感器数据。
  2. WebSocket客户端开发:我们需要在机器人端开发一个WebSocket客户端,用于将采集到的传感器数据发送到服务器端。在ROS2中,我们可以使用Python或C++等编程语言来实现WebSocket客户端。以Python为例,可以使用websockets库来实现WebSocket的通信。以下面是机器人端订阅和传输odom的Python代码示例:

python

# robot_odom_sender_callback_fixed.py

import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
import websockets
import asyncio
import struct
import threading
import time

class OdomSender(Node):

    def __init__(self):
        super().__init__("robot_scan_sender")

        self.ws = None
        self.connected = False
        self.ws_loop = None    # ★★★ 必须先定义

        self.uri = "ws://47.97.170.44:8891"服务器地址和端口号

        self.create_subscription(Odometry, "/odom", self.odom_callback, 10)

        threading.Thread(target=self.ws_thread, daemon=True).start()

    # ----------------------------------------
    # WebSocket 独立线程
    # ----------------------------------------
    def ws_thread(self):
        self.ws_loop = asyncio.new_event_loop()      # ★★★ 创建 loop
        asyncio.set_event_loop(self.ws_loop)
        self.ws_loop.run_until_complete(self.ws_main())

    async def ws_main(self):
        while True:
            try:
                self.get_logger().info("Trying to connect to cloud odom server...")
                async with websockets.connect(self.uri, max_size=10*1024*1024) as ws:
                    self.ws = ws
                    self.connected = True
                    self.get_logger().info("Connected to cloud odom server.")
                    await self.ws.wait_closed()  # 保持连接
            except Exception as e:
                self.connected = False
                self.get_logger().warn(f"WebSocket error: {e}, retry in 1s")
                await asyncio.sleep(1)

    # ----------------------------------------
    # Odom 回调中发送
    # ----------------------------------------
    def odom_callback(self, msg):
        # ★★★ 如果 websocket 还没准备好 → 不发
        if not self.connected or self.ws is None or self.ws_loop is None:
            return

        try:
            data = self.pack_odom(msg)
            asyncio.run_coroutine_threadsafe(self.ws.send(data), self.ws_loop)
            #print("send odom", msg.header.stamp)
        except Exception as e:
            self.get_logger().warn(f"Send failed: {e}")

    # ----------------------------------------
    # 打包 Odom
    # ----------------------------------------
    def pack_odom(self, msg):
        sec=msg.header.stamp.sec
        nano=msg.header.stamp.nanosec
        p = msg.pose.pose.position
        q = msg.pose.pose.orientation
        v = msg.twist.twist.linear
        w = msg.twist.twist.angular

        return struct.pack(
            "dddddddddddddII",
            p.x, p.y, p.z,
            q.x, q.y, q.z, q.w,
            v.x, v.y, v.z,
            w.x, w.y, w.z,
            sec,nano
        )


def main():
    rclpy.init()
    node = OdomSender()
    rclpy.spin(node)

if __name__ == "__main__":
    main()

  1. 接收服务器数据:机器人端还需要接收服务器端发送的地图和定位信息。同样,我们可以通过WebSocket客户端来接收这些数据,并将其转换为ROS2的消息,发布到相应的话题上,供机器人的导航模块使用。

3.2 服务器端实现

  1. WebSocket服务器开发:在服务器端,我们需要开发一个WebSocket服务器,用于接收机器人端发送的传感器数据。同样,可以使用Python的websockets库来实现WebSocket服务器。以下是服务器发送地图的Python代码示例:

python

import asyncio
import threading
import struct
import websockets
import rclpy
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import Quaternion
import math

node = None
map_msg = None

def map_callback(msg):
    global map_msg
    map_msg = msg

def yaw_to_quaternion(yaw):
    return Quaternion(
        x=0.0,
        y=0.0,
        z=math.sin(yaw / 2.0),
        w=math.cos(yaw / 2.0)
    )

def pack_map(msg: OccupancyGrid):
    width  = msg.info.width
    height = msg.info.height
    res    = msg.info.resolution

    ox = msg.info.origin.position.x
    oy = msg.info.origin.position.y

    # 从四元数提取 yaw
    q = msg.info.origin.orientation
    yaw = math.atan2(2*(q.w*q.z + q.x*q.y), 1 - 2*(q.y*q.y + q.z*q.z))

    header = struct.pack("iifff", width, height, res, ox, oy)
    header += struct.pack("f", yaw)

    data = msg.data
    data_bin = struct.pack(f"{width*height}b", *data)

    return header + data_bin

async def map_sender(ws):
    while True:
        await asyncio.sleep(0.15)
        if map_msg is None:
            continue
        await ws.send(pack_map(map_msg))

async def handler(ws):
    print("Client connected (map)")
    try:
        await map_sender(ws)
    except Exception as e:
        print("map sender error:", e)

def init_ros():
    global node
    rclpy.init()
    node = rclpy.create_node("cloud_map_server")
    node.create_subscription(OccupancyGrid, "/map", map_callback, 10)

def spin_ros():
    rclpy.spin(node)

async def main():
    init_ros()
    threading.Thread(target=spin_ros, daemon=True).start()

    await websockets.serve(handler, "0.0.0.0", 8890, max_size=50*1024*1024)#端口号
    print("Cloud map server listening on :8890")

    await asyncio.Future()

asyncio.run(main())

  1. SLAM算法运行:服务器端接收到机器人发送的传感器数据后,运行SLAM算法进行建图和定位。我们可以根据选择的SLAM算法,编写相应的代码来运行算法。例如,如果使用cartographer算法,可以在ROS2中启动cartographer节点,并将接收到的传感器数据输入到cartographer节点中。
  2. 结果返回:服务器端将SLAM算法生成的地图和定位信息通过WebSocket发送回机器人端,供机器人进行导航。

三、性能测试与优化

3.1 性能测试

为了验证我们的系统性能,我们进行了一系列的测试。测试环境包括一台低算力处理器的机器人和一台高性能服务器。在测试中,我们让机器人在一个室内环境中移动,采集传感器数据,并通过WebSocket发送到服务器进行SLAM运算。我们记录了建图和定位的时间、延迟和精度等指标,并与机器人本地运行SLAM算法的结果进行了比较。

测试结果表明,我们的系统能够有效地降低机器人的计算负载,提高建图和定位的实时性和精度。与机器人本地运行SLAM算法相比,使用我们的系统后,建图和定位的延迟明显降低,精度也有所提高。

3.2 优化措施

在测试过程中,我们也发现了一些问题,并采取了相应的优化措施。例如,为了减少数据传输的延迟,我们对传感器数据进行了压缩处理,减少了数据量。此外,我们还优化了WebSocket的通信协议,提高了数据传输的效率。我们还可以根据服务器的负载情况,动态调整SLAM算法的参数,以提高算法的运行效率。

四、应用场景与拓展

4.1 应用场景

我们的系统可以广泛应用于各种机器人场景,特别是那些采用低算力处理器的机器人。例如,在智能家居领域,我们可以将机器人应用于家庭环境的巡检和清洁,通过分布式SLAM实现自主导航。在工业领域,我们可以将机器人应用于仓库的货物搬运和盘点,提高工作效率。

4.2 拓展方向

未来,我们还可以对系统进行进一步的拓展和优化。例如,我们可以引入多机器人协作的功能,实现多个机器人之间的分布式SLAM。我们还可以将系统与其他技术相结合,如深度学习和计算机视觉,进一步提高机器人的自主导航能力。此外,我们还可以开发一个可视化的界面,方便用户对系统进行监控和管理。

五、总结

通过基于WebSocket的ROS2分布式SLAM解决方案,我们成功地实现了低算力机器人的实时建图和定位。该系统具有较高的实时性和精度,能够有效地降低机器人的计算负载,为低算力机器人的应用提供了新的可能性。在未来的工作中,我们将继续对系统进行优化和拓展,使其能够更好地满足实际应用的需求。

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐