仅记录,谨慎参考。

开源来自 星动纪元代码。

1 训练后如何Play。

   与训练时给定的环境差不多,进行一些简单的修改。

   例如:地形,噪声,是否Push,走几个环境,指令等等。

humanoid\scripts\play.py

def play(args):
    env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
    # override some parameters for testing
    env_cfg.env.num_envs = min(env_cfg.env.num_envs, 1)
    env_cfg.sim.max_gpu_contact_pairs = 2**10
    # env_cfg.terrain.mesh_type = 'trimesh'
    env_cfg.terrain.mesh_type = 'plane'
    env_cfg.terrain.num_rows = 5
    env_cfg.terrain.num_cols = 5
    env_cfg.terrain.curriculum = False     
    env_cfg.terrain.max_init_terrain_level = 5
    env_cfg.noise.add_noise = True
    env_cfg.domain_rand.push_robots = False 
    env_cfg.domain_rand.joint_angle_noise = 0.
    env_cfg.noise.curriculum = False
    env_cfg.noise.noise_level = 0.5


    train_cfg.seed = 123145
    print("train_cfg.runner_class_name:", train_cfg.runner_class_name)

    # prepare environment
    env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
    env.set_camera(env_cfg.viewer.pos, env_cfg.viewer.lookat)

    obs = env.get_observations()

    # load policy
    train_cfg.runner.resume = True
    ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args, train_cfg=train_cfg)
    policy = ppo_runner.get_inference_policy(device=env.device)
    
    # export policy as a jit module (used to run it from C++)
    if EXPORT_POLICY:
        path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies')
        export_policy_as_jit(ppo_runner.alg.actor_critic, path)
        print('Exported policy as jit script to: ', path)

    logger = Logger(env.dt)
    robot_index = 0 # which robot is used for logging
    joint_index = 1 # which joint is used for logging
    stop_state_log = 1200 # number of steps before plotting states
    if RENDER:
        camera_properties = gymapi.CameraProperties()
        camera_properties.width = 1920
        camera_properties.height = 1080
        h1 = env.gym.create_camera_sensor(env.envs[0], camera_properties)
        camera_offset = gymapi.Vec3(1, -1, 0.5)
        camera_rotation = gymapi.Quat.from_axis_angle(gymapi.Vec3(-0.3, 0.2, 1),
                                                    np.deg2rad(135))
        actor_handle = env.gym.get_actor_handle(env.envs[0], 0)
        body_handle = env.gym.get_actor_rigid_body_handle(env.envs[0], actor_handle, 0)
        env.gym.attach_camera_to_body(
            h1, env.envs[0], body_handle,
            gymapi.Transform(camera_offset, camera_rotation),
            gymapi.FOLLOW_POSITION)

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        video_dir = os.path.join(LEGGED_GYM_ROOT_DIR, 'videos')
        experiment_dir = os.path.join(LEGGED_GYM_ROOT_DIR, 'videos', train_cfg.runner.experiment_name)
        dir = os.path.join(experiment_dir, datetime.now().strftime('%b%d_%H-%M-%S')+ args.run_name + '.mp4')
        if not os.path.exists(video_dir):
            os.mkdir(video_dir)
        if not os.path.exists(experiment_dir):
            os.mkdir(experiment_dir)
        video = cv2.VideoWriter(dir, fourcc, 50.0, (1920, 1080))

    for i in tqdm(range(stop_state_log)):

        # 输入Observe,获取动作
        actions = policy(obs.detach()) # * 0.
        
        if FIX_COMMAND:
            env.commands[:, 0] = 0.5    # 1.0
            env.commands[:, 1] = 0.
            env.commands[:, 2] = 0.
            env.commands[:, 3] = 0.
        # 动作给到环境,
        # 执行了训练时也执行的动作的各项操作流程。
        # 修改了噪声,但并没有多余操作动作。如延迟,域随机化,剪裁,缩放,PD控制等。
        obs, critic_obs, rews, dones, infos = env.step(actions.detach())

        if RENDER:
            env.gym.fetch_results(env.sim, True)
            env.gym.step_graphics(env.sim)
            env.gym.render_all_camera_sensors(env.sim)
            img = env.gym.get_camera_image(env.sim, env.envs[0], h1, gymapi.IMAGE_COLOR)
            img = np.reshape(img, (1080, 1920, 4))
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            video.write(img[..., :3])

        logger.log_states(
            {
                'dof_pos_target': actions[robot_index, joint_index].item() * env.cfg.control.action_scale,
                'dof_pos': env.dof_pos[robot_index, joint_index].item(),
                'dof_vel': env.dof_vel[robot_index, joint_index].item(),
                'dof_torque': env.torques[robot_index, joint_index].item(),
                'command_x': env.commands[robot_index, 0].item(),
                'command_y': env.commands[robot_index, 1].item(),
                'command_yaw': env.commands[robot_index, 2].item(),
                'base_vel_x': env.base_lin_vel[robot_index, 0].item(),
                'base_vel_y': env.base_lin_vel[robot_index, 1].item(),
                'base_vel_z': env.base_lin_vel[robot_index, 2].item(),
                'base_vel_yaw': env.base_ang_vel[robot_index, 2].item(),
                'contact_forces_z': env.contact_forces[robot_index, env.feet_indices, 2].cpu().numpy()
            }
            )
        # ====================== Log states ======================
        if infos["episode"]:
            num_episodes = torch.sum(env.reset_buf).item()
            if num_episodes>0:
                logger.log_rewards(infos["episode"], num_episodes)

    logger.print_rewards()
    logger.plot_states()
    
    if RENDER:
        video.release()

if __name__ == '__main__':
    EXPORT_POLICY = True
    RENDER = True
    FIX_COMMAND = True
    args = get_args()
    play(args)

2 如何sim2sim

humanoid\scripts\sim2sim.py

主要是控制环境的搭建,obs输入,action输出的一些操作。需要注意,具体看注释

import math
import numpy as np
import mujoco, mujoco_viewer
from tqdm import tqdm
from collections import deque
from scipy.spatial.transform import Rotation as R
from humanoid import LEGGED_GYM_ROOT_DIR
from humanoid.envs import XBotLCfg
import torch

# 指令
class cmd:
    vx = 0.4
    vy = 0.0
    dyaw = 0.0

# 四元数转欧拉角
def quaternion_to_euler_array(quat):
    # Ensure quaternion is in the correct format [x, y, z, w]
    x, y, z, w = quat
    
    # Roll (x-axis rotation)
    t0 = +2.0 * (w * x + y * z)
    t1 = +1.0 - 2.0 * (x * x + y * y)
    roll_x = np.arctan2(t0, t1)
    
    # Pitch (y-axis rotation)
    t2 = +2.0 * (w * y - z * x)
    t2 = np.clip(t2, -1.0, 1.0)
    pitch_y = np.arcsin(t2)
    
    # Yaw (z-axis rotation)
    t3 = +2.0 * (w * z + x * y)
    t4 = +1.0 - 2.0 * (y * y + z * z)
    yaw_z = np.arctan2(t3, t4)
    
    # Returns roll, pitch, yaw in a NumPy array in radians
    return np.array([roll_x, pitch_y, yaw_z])

def get_obs(data):
    '''Extracts an observation from the mujoco data structure
    从mujoco 中获取观测值
    '''
    # 关节位置
    q = data.qpos.astype(np.double)
    # 关节速度(应该是广义速度,前三个是Body的线速度)
    dq = data.qvel.astype(np.double)
    # 调整四元素顺序,四元数的顺序是 [w, x, y, z]。而这段代码将其顺序调整为 [x, y, z, w]
    quat = data.sensor('orientation').data[[1, 2, 3, 0]].astype(np.double)
    # 四元数转旋转矩阵
    r = R.from_quat(quat)
    # 提取广义速度的的前三个分量,然后从世界坐标系转到本体坐标系
    v = r.apply(data.qvel[:3], inverse=True).astype(np.double)  # In the base frame
    # 获取角速度的信息XYZ
    omega = data.sensor('angular-velocity').data.astype(np.double)
    #  将重力方向(假设在世界坐标系下为 0, 0, -1)转换到机器人基坐标系中
    # 就是表示重力的影响方向。为Z轴的负方向。
    gvec = r.apply(np.array([0., 0., -1.]), inverse=True).astype(np.double)
    return (q, dq, quat, v, omega, gvec)

def pd_control(target_q, q, kp, target_dq, dq, kd):
    '''Calculates torques from position commands
    '''
    # PD控制器,无前馈
    return (target_q - q) * kp + (target_dq - dq) * kd

def run_mujoco(policy, cfg):
    """
    Run the Mujoco simulation using the provided policy and configuration.

    Args:
        policy: The policy used for controlling the simulation.
        cfg: The configuration object containing simulation settings.

    Returns:
        None
    """
    # 加载模型
    model = mujoco.MjModel.from_xml_path(cfg.sim_config.mujoco_model_path)
    # 时间步长
    model.opt.timestep = cfg.sim_config.dt
    # 创建数据结构
    data = mujoco.MjData(model)
    # 进行仿真步骤
    mujoco.mj_step(model, data)
    # 可视化
    viewer = mujoco_viewer.MujocoViewer(model, data)

    # 初始化目标动作,为位置
    target_q = np.zeros((cfg.env.num_actions), dtype=np.double)
    action = np.zeros((cfg.env.num_actions), dtype=np.double)

    # 历史观测放入一个队列
    hist_obs = deque()
    #  frame_stack= 15,num_single_obs观测维度= 47(无特权的正常观测)
    # 15*47个观测
    for _ in range(cfg.env.frame_stack):
        hist_obs.append(np.zeros([1, cfg.env.num_single_obs], dtype=np.double))

    count_lowlevel = 0

    # 迭代的次数是通过计算仿真持续时间 sim_duration 除以时间步长 dt 得到的
    for _ in tqdm(range(int(cfg.sim_config.sim_duration / cfg.sim_config.dt)), desc="Simulating..."):

        # Obtain an observation
        # 获取观察值,可以看到,mujoco里获取观测是十分容易的。
        q, dq, quat, v, omega, gvec = get_obs(data)
        # 从q中取后面num_actions个,因为前面的是广义的位置姿态什么的,不是关节角
        q = q[-cfg.env.num_actions:]
        dq = dq[-cfg.env.num_actions:]

        # 1000hz -> 100hz
        # 通过计数降低频率
        # 以下是10个dt 执行一次
        if count_lowlevel % cfg.sim_config.decimation == 0:
            # 定义观测 1*47
            obs = np.zeros([1, cfg.env.num_single_obs], dtype=np.float32)
            # 欧拉角
            eu_ang = quaternion_to_euler_array(quat)
            # 欧拉角的标准范围通常是 [-π, π]。如果某个欧拉角的值大于 π,则减去 2π
            eu_ang[eu_ang > math.pi] -= 2 * math.pi
            
            #开始给观测赋值
            # 一个0.64秒的周期性波形,模拟相位发生器
            obs[0, 0] = math.sin(2 * math.pi * count_lowlevel * cfg.sim_config.dt  / 0.64)
            obs[0, 1] = math.cos(2 * math.pi * count_lowlevel * cfg.sim_config.dt  / 0.64)
            # obs_scales 配置为2
            # 对观测的值进行量纲的缩放
            obs[0, 2] = cmd.vx * cfg.normalization.obs_scales.lin_vel
            obs[0, 3] = cmd.vy * cfg.normalization.obs_scales.lin_vel # 2
            obs[0, 4] = cmd.dyaw * cfg.normalization.obs_scales.ang_vel # 0.25
            obs[0, 5:17] = q * cfg.normalization.obs_scales.dof_pos   # 1
            obs[0, 17:29] = dq * cfg.normalization.obs_scales.dof_vel  # 0.05
            # 上次的动作,经过剪裁后的action.
            obs[0, 29:41] = action
            # 角速度
            obs[0, 41:44] = omega
            # 姿态欧拉角
            obs[0, 44:47] = eu_ang
            
            # 18,观测阈值剪裁,这样做是为了防止观测数据中出现异常值或过大的数值,这对于训练稳定的控制算法非常重要
            obs = np.clip(obs, -cfg.normalization.clip_observations, cfg.normalization.clip_observations)

            # 加入队列
            hist_obs.append(obs)
            # 保持队列长度一致
            hist_obs.popleft()

            # 策略(RL控制器的)输入
            policy_input = np.zeros([1, cfg.env.num_observations], dtype=np.float32)
            # 在15个历史观测下
            # 将历史观测(hist_obs[i])中的数据拼接到 policy_input 数组中,移位47。
            for i in range(cfg.env.frame_stack):
                policy_input[0, i * cfg.env.num_single_obs : (i + 1) * cfg.env.num_single_obs] = hist_obs[i][0, :]
            
            # policy_input输入47 * 15,个数据,然后输出动作(12维的)
            action[:] = policy(torch.tensor(policy_input))[0].detach().numpy()
            # 对输出的动作进行剪裁 ,正负18
            action = np.clip(action, -cfg.normalization.clip_actions, cfg.normalization.clip_actions)
            # 关节力矩 = 关节位置增量(就是相对零点的),动作输出 * 0.25
            target_q = action * cfg.control.action_scale

        # 期望速度为0。
        target_dq = np.zeros((cfg.env.num_actions), dtype=np.double)
        # Generate PD control
        # PD控制器输出力矩
        # q是当前位置,未使用默认位置,那么就是都是基于同一零点的绝对位置
        tau = pd_control(target_q, q, cfg.robot_config.kps,
                        target_dq, dq, cfg.robot_config.kds)  # Calc torques
        # 限制力矩范围 正负200
        tau = np.clip(tau, -cfg.robot_config.tau_limit, cfg.robot_config.tau_limit)  # Clamp torques
        # 输出的控制就是力矩
        # 这里10ms中输出了10个力矩,并没有做插值,但是按1ms的关节反馈,输出力矩应该是按反馈变化的。
        # 但这10ms内的目标关节位置是不变的
        data.ctrl = tau

        # 步进
        mujoco.mj_step(model, data)
        # 仿真渲染
        viewer.render()
        count_lowlevel += 1

    viewer.close()

    # 分析:
    # 在sim2sim中,从策略获取到动作(基于零点的一个绝对位置,弧度)后
    # 1对动作进行剪裁(不是软限位,正负18)
    # 2对动作进行缩放 (0.25)
    # 3作为目标的关节位置。
    # 4 PD控制器算力矩(action 位置+ 默认位置 - 当前关节位置)
    # 5 限制力矩范围
    # 6下发到电机去

    # 对比一下play:
    # 获取到action
    # 1随机延迟模拟
    # 2高斯噪声模拟
    # 3对动作进行剪裁(正负18)
    # 4对动作进行缩放(0.25)
    # 5 PD控制器 (action - 当前关节位置)
    # 5限制力矩范围
    # 6下发到电机去

    # 对于噪声延迟等,仿真可以暂时去掉
    # 默认位置的定义:default_joint_angles =  # = target angles [rad] when action = 0.0
    # 若定义的默认位置不是0,那么sim2sim 中的PD控制器,就需要加上默认定义位置这一项
    # 因为训练出来的action输出是基于默认关节位置的。
    # 仿真中并没有定义关节软限位
    # 也没有限制关节的转动速度
    # 关节的正方向应该是从urdf,mujoco 的xml ,isaac的USD是约定统一的。
    # 在训练和sim时包括real 时,对于剪裁及缩放,一定要使用相同的参数。

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Deployment script.')
    parser.add_argument('--load_model', type=str, required=True,
                        help='Run to load from.')
    parser.add_argument('--terrain', action='store_true', help='terrain or plane')
    args = parser.parse_args()

    # 配置
    class Sim2simCfg(XBotLCfg):
        # 新建的数据结构    
        class sim_config:
            #如果使用了地形,加载不同的模型路径
            if args.terrain:
                mujoco_model_path = f'{LEGGED_GYM_ROOT_DIR}/resources/robots/XBot/mjcf/XBot-L-terrain.xml'
            else:
                mujoco_model_path = f'{LEGGED_GYM_ROOT_DIR}/resources/robots/XBot/mjcf/XBot-L.xml'
            # 仿真时间60s
            sim_duration = 60.0
            # 1000HZ 
            dt = 0.001
            # 控制频率 100HZ
            decimation = 10

        class robot_config:
            # KP KD 和关节力矩限制
            kps = np.array([200, 200, 350, 350, 15, 15, 200, 200, 350, 350, 15, 15], dtype=np.double)
            kds = np.array([10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10], dtype=np.double)
            tau_limit = 200. * np.ones(12, dtype=np.double)

    # 控制策略 (加载的模型)
    policy = torch.jit.load(args.load_model)
    run_mujoco(policy, Sim2simCfg())

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐