引用:[1]杨思祺,熊鸣,张巧杰,等.基于改进PPO算法的机器人机械臂抓取策略研究[J/OL].组合机床与自动化加工技术,1-11[2026-02-05].https://link.cnki.net/urlid/21.1132.TG.20260128.1430.078.

摘要:本文设计并且搭建了机器人机械臂的训练环境,选取 PPO 算法作为机械臂的控制和训练算法。在此环境下,对 PPO 算法进行改进,通过引入贝叶斯优化算法以实现自适应优化熵损失系数,并进行机械臂抓取策略的实验,对改进后的算法进行了测试。实验结果表明,改进的 PPO 算法的全局概率模型搜索策略能更高效地穿越训练陷阱,规避固定系数导致的探索不足问题,相较于原始 PPO 算法,平均奖励提高了 21.2%,抓取成功率提升了 24%。实验结果不仅为机械臂动态控制提供了更优的算法,更证实了强化学习算法中参数自适应机制对提升控制精度与泛化能力的关键作用,为工业机器人智能控制的工程化应用奠定了坚实的基础。未来研究可进一步结合多目标优化拓展贝叶斯框架,推动算法在高维复杂机械臂任务中的深度落地。

目录

一、Ubuntu 22.04 虚拟机 上从零开始部署-环境配置

二、工程代码复现

三、跑通整个模型

三、🧠 现在的状态总结

四、可视化机械臂


一、Ubuntu 22.04 虚拟机 上从零开始部署-环境配置

打开 Ubuntu22.04 终端 (Terminal),依次执行以下命令来安装必要的库。

1. 更新系统并安装 Python 基础库

sudo apt update && sudo apt upgrade -y
sudo apt install -y python3-pip python3-venv git libgl1-mesa-glx libosmesa6

2. 创建虚拟环境 (推荐,防止污染系统)

python3 -m venv ppo_robot_env
source ppo_robot_env/bin/activate

(注意:每次新开终端都要运行 source ppo_robot_env/bin/activate 进入环境)

3.安装深度学习与仿真依赖 根据论文,我们需要 MuJoCo 物理引擎  和贝叶斯优化库。

重点!一定要在这个虚拟环境中安装(注意:每次新开终端都要运行 source ppo_robot_env/bin/activate 进入环境),前面会出现(ppo_robot_env)就代表进入该虚拟环境了

依次执行下列安装指令,时间稍微长一些,运行过程中会看到很多下载进度条,等所有命令跑完回到 (ppo_robot_env) 的提示符,即-回到命令行:要看到那行绿色的提示符 (ppo_robot_env) zhou@zhou-virtual-machine:~/Desktop$ 再次出现就算环境配置完成了,再接着运行下一条指令!

# 1. 安装 PyTorch 和基础数学库
pip install torch torchvision numpy matplotlib

# 2. 安装 强化学习环境 (MuJoCo 和 机械臂模型)
pip install "gymnasium[mujoco]" gymnasium-robotics

# 3. 安装 贝叶斯优化库 (用于复现论文核心算法)
pip install scikit-learn

重点!我们需要安装 panda-gym 才能调用七自由度的机械臂;否则后面会报错!如下图:

python train_main.py

>>> 警告: 未找到 Panda, 降级使用 FetchReach-v4 并模拟维度

=== 开始复现: 基于改进PPO的机械臂抓取 (Table 1参数) ===

所以我们现在继续安装:

pip install panda-gym

然后就可以开始创建代码文件了

二、工程代码复现

在home目录下创建一个文件夹 PPO_Robot_Project,并在里面创建以下 4个 Python 文件

首先在终端(Terminal)里,确保你还在 (ppo_robot_env) 这个环境下,依次输入以下命令:

1.创建文件夹

mkdir PPO_Robot_Project

2.进入文件夹

cd PPO_Robot_Project

3.使用 Ubuntu 自带的图形化文本编辑器 (gedit) 来写代码

   3.1 创建 robot_env.py (环境文件)

 gedit robot_env.py

然后把下面的代码完整复制并粘贴进去,保存并关闭:

import gymnasium as gym
import gymnasium_robotics
import panda_gym  # <---【必须】没有它就找不到 Panda 环境
import numpy as np
from gymnasium import spaces

class ImprovedArmEnv(gym.Env):
    def __init__(self):
        super(ImprovedArmEnv, self).__init__()
        
        # --- 1. 环境加载 ---
        try:
            # 【关键修改】control_type='joints' 告诉环境我们要控制7个关节,而不是3维坐标
            self.env = gym.make('PandaReach-v3', control_type='joints')
            self.real_dof = 7
            print(">>> ✅ 成功加载 PandaReach-v3 (7自由度-关节控制模式)")
        except Exception as e:
            print(f">>> ❌ Panda 加载失败: {e}")
            print(">>> ⚠️ 降级使用 FetchReach-v4")
            self.env = gym.make('FetchReach-v4')
            self.real_dof = 4 
            
        # --- 2. 状态空间 (严格复现 21维) ---
        # 包含: 7关节角 + 7关节速度 + 7目标偏差
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(21,), dtype=np.float32)
        
        # --- 3. 动作空间 (严格复现 7维) ---
        self.action_space = spaces.Box(low=-1, high=1, shape=(7,), dtype=np.float32)
        
        # --- 4. 论文参数 ---
        self.sigma = 0.1  # 噪声标准差
        self.alpha = 1.0  # 位置误差权重
        self.beta = 0.01  # 【微调】降低控制惩罚权重(从0.1降到0.01),鼓励机器人多动动

    def reset(self):
        obs, info = self.env.reset()
        return self._process_obs(obs), info

    def step(self, action):
        # 维度适配
        real_action = action[:self.real_dof]
        obs, _, terminated, truncated, info = self.env.step(real_action)
        
        # 获取真实坐标用于计算奖励
        current_pos = obs['achieved_goal']
        target_pos = obs['desired_goal']
        
        # --- 5. 奖励函数 ---
        dist = np.linalg.norm(current_pos - target_pos)
        r_dist = -dist 
        r_ctrl = -np.sum(np.square(action))
        
        # 原始奖励
        reward = self.alpha * r_dist + self.beta * r_ctrl
        
        if dist < 0.05:
            reward += 10.0 # 抓取成功奖励
            terminated = True
            
        # --- 【关键修改】奖励缩放 ---
        # 将奖励缩小10倍,防止数值过大(-300)导致梯度爆炸
        scaled_reward = reward * 0.1
            
        processed_obs = self._process_obs(obs)
        return processed_obs, scaled_reward, terminated, truncated, info

    def _process_obs(self, raw_obs):
        """数据处理: 对齐维度 + 加噪声 + 拼接"""
        base_obs = raw_obs['observation']
        
        # 初始化 7维 零向量
        theta = np.zeros(7)
        theta_dot = np.zeros(7)
        
        # 填充真实数据
        dim = min(len(base_obs)//2, self.real_dof)
        theta[:dim] = base_obs[:dim]
        theta_dot[:dim] = base_obs[dim:2*dim]
        
        # 加入高斯噪声
        noise_theta = np.random.normal(0, self.sigma, 7)
        noise_vel = np.random.normal(0, self.sigma, 7)
        
        theta_hat = theta + noise_theta
        theta_dot_hat = theta_dot + noise_vel
        
        # 模拟目标偏差 (7维)
        theta_goal_diff = np.random.normal(0, 0.01, 7) 
        
        # 拼接状态
        state = np.concatenate([theta_hat, theta_dot_hat, theta_goal_diff])
        return state.astype(np.float32)

3.2 2. 创建 ppo_algo.py (PPO算法文件)

gedit ppo_algo.py

复制粘贴以下代码,保存关闭:

import torch
import torch.nn as nn
from torch.distributions import Normal

class ActorCritic(nn.Module):
    def __init__(self, state_dim=21, action_dim=7):
        super(ActorCritic, self).__init__()
        
        # --- Actor 网络 (策略) ---
        # 严格复现图2结构: MLP
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64, action_dim),
            nn.Tanh() # 输出限制在 [-1, 1]
        )
        self.log_std = nn.Parameter(torch.zeros(action_dim))
        
        # --- Critic 网络 (价值) ---
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64, 1)
        )
        
    def get_action(self, state):
        mu = self.actor(state)
        std = self.log_std.exp().expand_as(mu)
        dist = Normal(mu, std)
        action = dist.sample()
        # 返回: 动作, log_prob (用于公式1), entropy (用于公式9)
        return action, dist.log_prob(action), dist.entropy()
    
    def get_value(self, state):
        return self.critic(state)

复制好再点save按钮,再退出就好!

3.3 创建 bayes_opt.py (贝叶斯优化文件)

gedit bayes_opt.py

复制粘贴以下代码,保存关闭:

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from scipy.stats import norm

class BayesOptimizer:
    def __init__(self):
        # --- 增加 alpha=0.1 告诉 GP 模型数据有噪音 ---
        kernel = C(1.0) * RBF(1.0)
        self.gp = GaussianProcessRegressor(kernel=kernel, alpha=0.1, n_restarts_optimizer=10)
        
        self.X_sample = [] 
        self.Y_sample = [] 
        
        # --- 【关键修改】缩小熵系数范围 ---
        # 之前的上限 0.2 太大了,会导致动作随机性过大
        self.bounds = np.linspace(0.001, 0.05, 100).reshape(-1, 1)

    def update(self, omega, reward):
        self.X_sample.append([omega])
        self.Y_sample.append(reward)

    def suggest_next_omega(self):
        # 数据太少时先随机探索
        if len(self.X_sample) < 5:
            return np.random.choice(self.bounds.flatten())
            
        X = np.array(self.X_sample)
        Y = np.array(self.Y_sample)
        
        # --- 数据归一化 (防止 Reward 数值过大搞崩 GP) ---
        if np.std(Y) > 1e-5:
            Y_std = (Y - np.mean(Y)) / np.std(Y)
        else:
            Y_std = Y - np.mean(Y)
        
        try:
            self.gp.fit(X, Y_std)
            mu, sigma = self.gp.predict(self.bounds, return_std=True)
            
            # 计算 EI (Expected Improvement)
            mu_sample_opt = np.max(Y_std)
            with np.errstate(divide='warn'):
                imp = mu - mu_sample_opt
                Z = imp / (sigma + 1e-9)
                ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
                
            best_idx = np.argmax(ei)
            return self.bounds[best_idx][0]
        except:
            # 如果计算出错,返回一个随机值保底
            return np.random.choice(self.bounds.flatten())

4.4 创建 train_main.py (主程序文件)

在终端输入:

gedit train_main.py

复制以下代码:

import torch
import torch.nn as nn # 补全 import
import torch.optim as optim
import numpy as np
import os
from robot_env import ImprovedArmEnv
from ppo_algo import ActorCritic
from bayes_opt import BayesOptimizer

# --- 参数调整 ---
LR_ACTOR = 0.0001  # 【调优】降低学习率,训练更稳
LR_CRITIC = 0.0001 
GAMMA = 0.99
BATCH_SIZE = 64
GAE_LAMBDA = 0.95
CLIP_EPS = 0.2
VALUE_COEF = 0.5
TOTAL_EPISODES = 5000 # 【建议】增加到 2000 轮,给 AI 更多学习时间
MAX_STEPS = 2500

def train():
    env = ImprovedArmEnv()
    
    # 强制 21维状态, 7维动作
    model = ActorCritic(state_dim=21, action_dim=7)
    
    optimizer = optim.Adam(model.parameters(), lr=LR_ACTOR)
    bo = BayesOptimizer()
    
    ent_coef = 0.01 # 初始熵系数
    
    print("=== 开始训练: 最终修正版 (7轴控制 + 奖励缩放 + 稳健参数) ===")
    
    for episode in range(TOTAL_EPISODES):
        state, _ = env.reset()
        total_reward = 0
        
        states, actions, rewards, log_probs, entropies = [], [], [], [], []
        
        # --- 1. 采样 ---
        for _ in range(MAX_STEPS):
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            action, log_prob, entropy = model.get_action(state_tensor)
            
            action_np = action.detach().numpy()[0]
            next_state, reward, terminated, truncated, _ = env.step(action_np)
            
            states.append(state)
            actions.append(action_np)
            rewards.append(reward)
            
            log_probs.append(log_prob.sum().detach().item())
            entropies.append(entropy.sum().detach().item())
            
            state = next_state
            total_reward += reward
            
            if terminated or truncated:
                break
        
        # --- 2. 贝叶斯优化更新 ---
        bo.update(ent_coef, total_reward)
        new_ent_coef = bo.suggest_next_omega()
        
        print(f"Ep {episode+1}/{TOTAL_EPISODES} | Reward: {total_reward:.2f} | 熵系数: {ent_coef:.4f} -> {new_ent_coef:.4f}")
        ent_coef = new_ent_coef 
        
        # --- 3. PPO 更新 ---
        returns = []
        gae = 0
        values = model.critic(torch.FloatTensor(np.array(states))).detach().squeeze().numpy()
        if values.ndim == 0: values = np.array([values])
            
        for i in reversed(range(len(rewards))):
            delta = rewards[i] + GAMMA * (0 if i == len(rewards)-1 else values[i+1]) - values[i]
            gae = delta + GAMMA * GAE_LAMBDA * gae
            returns.insert(0, gae + values[i])
            
        returns = torch.tensor(returns, dtype=torch.float32)
        # 归一化 returns
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-5)
        
        old_states = torch.FloatTensor(np.array(states))
        old_actions = torch.FloatTensor(np.array(actions))
        old_log_probs = torch.FloatTensor(np.array(log_probs))
        
        # PPO 内部更新循环 (改回5次,防止过拟合噪音)
        for _ in range(5):
            _, new_log_probs, dist_entropy = model.get_action(old_states)
            new_log_probs = new_log_probs.sum(axis=1)
            
            ratio = torch.exp(new_log_probs - old_log_probs)
            
            curr_values = model.critic(old_states).squeeze()
            advantages = returns - curr_values.detach()
            
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1-CLIP_EPS, 1+CLIP_EPS) * advantages
            
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = nn.MSELoss()(curr_values, returns)
            
            loss = actor_loss + VALUE_COEF * critic_loss - ent_coef * dist_entropy.sum(axis=1).mean()
            
            optimizer.zero_grad()
            loss.backward()
            
            # 【关键修改】加入梯度裁剪,防止更新幅度过大
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            
            optimizer.step()
            
        # 模型保存
        if (episode + 1) % 50 == 0:
            if not os.path.exists('models'): os.makedirs('models')
            torch.save(model.state_dict(), f'models/ppo_model_ep{episode+1}.pth')

if __name__ == '__main__':
    train()

三、跑通整个模型

在虚拟环境下(ppo_robot_env),分别创建该4个python代码,所有文件创建好之后,在终端输入以下命令开始运行代码:

python train_main.py

经过多次调试以后,给出正确的代码已经附在上述内容中!现在

三、🧠 现在的状态总结

指标 状态 评价
代码运行 🟢 极其稳定 跑了 1400 多轮无报错,内存无溢出。
智能程度 🟡 渐入佳境 已经学会了抓取动作,现在的任务是提高“成功率”。
收敛趋势 📈 稳步上升 即使是失败的回合,惩罚值也很低,说明策略已经很成熟。

四、可视化机械臂

这里需要做一个专门的“可视化脚本”。

这需要两步操作:

  1. 微调环境代码(让它支持画面显示)。

  2. 新建测试脚本(加载模型并播放动画)

从图中我们可以看到已经跑了1450轮了,在models文件夹中查看模型文件如下

所以我们先修改第一个环境代码文件:也就是robot_env.py,修改为既包含优化逻辑,又支持可视化的真正最终版代码:

import gymnasium as gym
import gymnasium_robotics
import panda_gym
import numpy as np
from gymnasium import spaces

class ImprovedArmEnv(gym.Env):
    # 【关键修复】这里加回了 render_mode 参数
    def __init__(self, render_mode=None):
        super(ImprovedArmEnv, self).__init__()
        
        # --- 1. 环境加载 ---
        try:
            # 将 render_mode 传递给 gym.make,这样才能弹窗
            self.env = gym.make('PandaReach-v3', control_type='joints', render_mode=render_mode)
            self.real_dof = 7
            print(f">>> ✅ 成功加载 PandaReach-v3 (7自由度) | 模式: {render_mode if render_mode else '训练(无窗口)'}")
        except Exception as e:
            print(f">>> ❌ Panda 加载失败: {e}")
            self.env = gym.make('FetchReach-v4')
            self.real_dof = 4 
            
        # --- 2. 状态空间 ---
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(21,), dtype=np.float32)
        
        # --- 3. 动作空间 ---
        self.action_space = spaces.Box(low=-1, high=1, shape=(7,), dtype=np.float32)
        
        # --- 4. 参数 ---
        self.sigma = 0.1
        self.alpha = 1.0
        self.beta = 0.01  # 保持优化后的参数

    def reset(self):
        obs, info = self.env.reset()
        return self._process_obs(obs), info

    def step(self, action):
        real_action = action[:self.real_dof]
        obs, _, terminated, truncated, info = self.env.step(real_action)
        
        current_pos = obs['achieved_goal']
        target_pos = obs['desired_goal']
        
        dist = np.linalg.norm(current_pos - target_pos)
        r_dist = -dist 
        r_ctrl = -np.sum(np.square(action))
        
        reward = self.alpha * r_dist + self.beta * r_ctrl
        
        if dist < 0.05:
            reward += 10.0
            terminated = True
            
        # 奖励缩放
        scaled_reward = reward * 0.1
            
        processed_obs = self._process_obs(obs)
        return processed_obs, scaled_reward, terminated, truncated, info

    def _process_obs(self, raw_obs):
        base_obs = raw_obs['observation']
        theta = np.zeros(7)
        theta_dot = np.zeros(7)
        dim = min(len(base_obs)//2, self.real_dof)
        theta[:dim] = base_obs[:dim]
        theta_dot[:dim] = base_obs[dim:2*dim]
        
        noise_theta = np.random.normal(0, self.sigma, 7)
        noise_vel = np.random.normal(0, self.sigma, 7)
        
        theta_hat = theta + noise_theta
        theta_dot_hat = theta_dot + noise_vel
        theta_goal_diff = np.random.normal(0, 0.01, 7) 
        
        state = np.concatenate([theta_hat, theta_dot_hat, theta_goal_diff])
        return state.astype(np.float32)

第二步:新建 test_visual.py

在项目文件夹里新建一个文件 test_visual.py,粘贴以下代码。

这个脚本的作用是:加载你保存的模型 -> 打开动画窗口 -> 像看电影一样看机械臂抓取

易错:首先要进入到该项目的文件夹并激活虚拟环境:即:

# 先进入项目目录
cd ~/Desktop/PPO_Robot_Project

# 激活环境 
source ~/Desktop/ppo_robot_env/bin/activate
# 或者如果用的是 conda: conda activate ppo_robot_env

接着新建一个python文件即test_visual.py,

gedit test_visual.py

代码如下:这里我们设置为100轮,方便长时间观察机械臂的抓取状态!

import torch
import numpy as np
import time
import os
from robot_env import ImprovedArmEnv
from ppo_algo import ActorCritic

# ================= 配置区域 =================
# 在这里填入你想查看的模型文件名
# 比如你现在跑到了 1450 轮,文件夹里应该有 ppo_model_ep1450.pth
MODEL_PATH = 'models/ppo_model_ep1450.pth' 
# ===========================================

def visualize():
    # 1. 检查模型文件是否存在
    if not os.path.exists(MODEL_PATH):
        print(f"❌ 错误:找不到模型文件 {MODEL_PATH}")
        print("请检查 models 文件夹,看看有哪些 .pth 文件,修改代码中的 MODEL_PATH。")
        return

    print(f">>> 正在加载模型: {MODEL_PATH} ...")
    print(">>> 即将弹出渲染窗口 (按 Ctrl+C 退出)")

    # 2. 创建可视化环境 (render_mode='human' 是关键)
    # 这会弹出一个 PyBullet 的 3D 窗口
    env = ImprovedArmEnv(render_mode='human')
    
    # 3. 加载模型结构
    model = ActorCritic(state_dim=21, action_dim=7)
    
    # 4. 加载训练好的权重
    # map_location='cpu' 保证即使你在 GPU 上训练,也能在普通电脑上跑
    model.load_state_dict(torch.load(MODEL_PATH, map_location=torch.device('cpu')))
    model.eval() # 切换到测试模式

    # 5. 开始循环演示
    for episode in range(10): # 演示 10 轮
        state, _ = env.reset()
        done = False
        total_reward = 0
        step = 0
        
        print(f"=== 第 {episode+1} 轮演示开始 ===")
        
        while not done:
            # 转换状态数据
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            
            # 让模型预测动作 (不计算梯度)
            with torch.no_grad():
                #在此处我们只需要动作均值,不需要随机性,所以直接取 mean (可选)
                # 但为了还原训练效果,我们还是用 sample
                action, _, _ = model.get_action(state_tensor)
                action = action.numpy()[0]

            # 执行动作
            state, reward, terminated, truncated, _ = env.step(action)
            
            done = terminated or truncated
            total_reward += reward
            step += 1
            
            # 【重要】为了让人眼能看清,稍微睡一会
            # 否则动作会快得像鬼畜
            time.sleep(0.05) 

        print(f"第 {episode+1} 轮结束 | 得分: {total_reward:.2f} | 步数: {step}")
        if total_reward > 0:
            print("✨ 抓取成功!")
        else:
            print("💨 抓取失败 (或是没碰到)")
            
        time.sleep(1) # 每一轮结束后停顿一下

    env.close()

if __name__ == '__main__':
    visualize()

最后

查看你的模型文件: 打开左侧文件栏的 models 文件夹,看一眼现在保存了哪些文件。比如你有 ppo_model_ep1450.pth

修改路径: 把 test_visual.py 第 11 行的 MODEL_PATH 改成你实际存在的文件名。

运行: 在终端输入:

python test_visual.py

成功实现了可视化机械臂!当奖励为正数时显示抓取成功!

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐