RL 机器人 sim2sim 八
仅记录,谨慎参考。与训练时给定的环境差不多,进行一些简单的修改。例如:地形,噪声,是否Push,走几个环境,指令等等。
·
仅记录,谨慎参考。
开源来自 星动纪元代码。
1 训练后如何Play。
与训练时给定的环境差不多,进行一些简单的修改。
例如:地形,噪声,是否Push,走几个环境,指令等等。
humanoid\scripts\play.py
def play(args):
env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
# override some parameters for testing
env_cfg.env.num_envs = min(env_cfg.env.num_envs, 1)
env_cfg.sim.max_gpu_contact_pairs = 2**10
# env_cfg.terrain.mesh_type = 'trimesh'
env_cfg.terrain.mesh_type = 'plane'
env_cfg.terrain.num_rows = 5
env_cfg.terrain.num_cols = 5
env_cfg.terrain.curriculum = False
env_cfg.terrain.max_init_terrain_level = 5
env_cfg.noise.add_noise = True
env_cfg.domain_rand.push_robots = False
env_cfg.domain_rand.joint_angle_noise = 0.
env_cfg.noise.curriculum = False
env_cfg.noise.noise_level = 0.5
train_cfg.seed = 123145
print("train_cfg.runner_class_name:", train_cfg.runner_class_name)
# prepare environment
env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
env.set_camera(env_cfg.viewer.pos, env_cfg.viewer.lookat)
obs = env.get_observations()
# load policy
train_cfg.runner.resume = True
ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args, train_cfg=train_cfg)
policy = ppo_runner.get_inference_policy(device=env.device)
# export policy as a jit module (used to run it from C++)
if EXPORT_POLICY:
path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies')
export_policy_as_jit(ppo_runner.alg.actor_critic, path)
print('Exported policy as jit script to: ', path)
logger = Logger(env.dt)
robot_index = 0 # which robot is used for logging
joint_index = 1 # which joint is used for logging
stop_state_log = 1200 # number of steps before plotting states
if RENDER:
camera_properties = gymapi.CameraProperties()
camera_properties.width = 1920
camera_properties.height = 1080
h1 = env.gym.create_camera_sensor(env.envs[0], camera_properties)
camera_offset = gymapi.Vec3(1, -1, 0.5)
camera_rotation = gymapi.Quat.from_axis_angle(gymapi.Vec3(-0.3, 0.2, 1),
np.deg2rad(135))
actor_handle = env.gym.get_actor_handle(env.envs[0], 0)
body_handle = env.gym.get_actor_rigid_body_handle(env.envs[0], actor_handle, 0)
env.gym.attach_camera_to_body(
h1, env.envs[0], body_handle,
gymapi.Transform(camera_offset, camera_rotation),
gymapi.FOLLOW_POSITION)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
video_dir = os.path.join(LEGGED_GYM_ROOT_DIR, 'videos')
experiment_dir = os.path.join(LEGGED_GYM_ROOT_DIR, 'videos', train_cfg.runner.experiment_name)
dir = os.path.join(experiment_dir, datetime.now().strftime('%b%d_%H-%M-%S')+ args.run_name + '.mp4')
if not os.path.exists(video_dir):
os.mkdir(video_dir)
if not os.path.exists(experiment_dir):
os.mkdir(experiment_dir)
video = cv2.VideoWriter(dir, fourcc, 50.0, (1920, 1080))
for i in tqdm(range(stop_state_log)):
# 输入Observe,获取动作
actions = policy(obs.detach()) # * 0.
if FIX_COMMAND:
env.commands[:, 0] = 0.5 # 1.0
env.commands[:, 1] = 0.
env.commands[:, 2] = 0.
env.commands[:, 3] = 0.
# 动作给到环境,
# 执行了训练时也执行的动作的各项操作流程。
# 修改了噪声,但并没有多余操作动作。如延迟,域随机化,剪裁,缩放,PD控制等。
obs, critic_obs, rews, dones, infos = env.step(actions.detach())
if RENDER:
env.gym.fetch_results(env.sim, True)
env.gym.step_graphics(env.sim)
env.gym.render_all_camera_sensors(env.sim)
img = env.gym.get_camera_image(env.sim, env.envs[0], h1, gymapi.IMAGE_COLOR)
img = np.reshape(img, (1080, 1920, 4))
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
video.write(img[..., :3])
logger.log_states(
{
'dof_pos_target': actions[robot_index, joint_index].item() * env.cfg.control.action_scale,
'dof_pos': env.dof_pos[robot_index, joint_index].item(),
'dof_vel': env.dof_vel[robot_index, joint_index].item(),
'dof_torque': env.torques[robot_index, joint_index].item(),
'command_x': env.commands[robot_index, 0].item(),
'command_y': env.commands[robot_index, 1].item(),
'command_yaw': env.commands[robot_index, 2].item(),
'base_vel_x': env.base_lin_vel[robot_index, 0].item(),
'base_vel_y': env.base_lin_vel[robot_index, 1].item(),
'base_vel_z': env.base_lin_vel[robot_index, 2].item(),
'base_vel_yaw': env.base_ang_vel[robot_index, 2].item(),
'contact_forces_z': env.contact_forces[robot_index, env.feet_indices, 2].cpu().numpy()
}
)
# ====================== Log states ======================
if infos["episode"]:
num_episodes = torch.sum(env.reset_buf).item()
if num_episodes>0:
logger.log_rewards(infos["episode"], num_episodes)
logger.print_rewards()
logger.plot_states()
if RENDER:
video.release()
if __name__ == '__main__':
EXPORT_POLICY = True
RENDER = True
FIX_COMMAND = True
args = get_args()
play(args)
2 如何sim2sim
humanoid\scripts\sim2sim.py
主要是控制环境的搭建,obs输入,action输出的一些操作。需要注意,具体看注释
import math
import numpy as np
import mujoco, mujoco_viewer
from tqdm import tqdm
from collections import deque
from scipy.spatial.transform import Rotation as R
from humanoid import LEGGED_GYM_ROOT_DIR
from humanoid.envs import XBotLCfg
import torch
# 指令
class cmd:
vx = 0.4
vy = 0.0
dyaw = 0.0
# 四元数转欧拉角
def quaternion_to_euler_array(quat):
# Ensure quaternion is in the correct format [x, y, z, w]
x, y, z, w = quat
# Roll (x-axis rotation)
t0 = +2.0 * (w * x + y * z)
t1 = +1.0 - 2.0 * (x * x + y * y)
roll_x = np.arctan2(t0, t1)
# Pitch (y-axis rotation)
t2 = +2.0 * (w * y - z * x)
t2 = np.clip(t2, -1.0, 1.0)
pitch_y = np.arcsin(t2)
# Yaw (z-axis rotation)
t3 = +2.0 * (w * z + x * y)
t4 = +1.0 - 2.0 * (y * y + z * z)
yaw_z = np.arctan2(t3, t4)
# Returns roll, pitch, yaw in a NumPy array in radians
return np.array([roll_x, pitch_y, yaw_z])
def get_obs(data):
'''Extracts an observation from the mujoco data structure
从mujoco 中获取观测值
'''
# 关节位置
q = data.qpos.astype(np.double)
# 关节速度(应该是广义速度,前三个是Body的线速度)
dq = data.qvel.astype(np.double)
# 调整四元素顺序,四元数的顺序是 [w, x, y, z]。而这段代码将其顺序调整为 [x, y, z, w]
quat = data.sensor('orientation').data[[1, 2, 3, 0]].astype(np.double)
# 四元数转旋转矩阵
r = R.from_quat(quat)
# 提取广义速度的的前三个分量,然后从世界坐标系转到本体坐标系
v = r.apply(data.qvel[:3], inverse=True).astype(np.double) # In the base frame
# 获取角速度的信息XYZ
omega = data.sensor('angular-velocity').data.astype(np.double)
# 将重力方向(假设在世界坐标系下为 0, 0, -1)转换到机器人基坐标系中
# 就是表示重力的影响方向。为Z轴的负方向。
gvec = r.apply(np.array([0., 0., -1.]), inverse=True).astype(np.double)
return (q, dq, quat, v, omega, gvec)
def pd_control(target_q, q, kp, target_dq, dq, kd):
'''Calculates torques from position commands
'''
# PD控制器,无前馈
return (target_q - q) * kp + (target_dq - dq) * kd
def run_mujoco(policy, cfg):
"""
Run the Mujoco simulation using the provided policy and configuration.
Args:
policy: The policy used for controlling the simulation.
cfg: The configuration object containing simulation settings.
Returns:
None
"""
# 加载模型
model = mujoco.MjModel.from_xml_path(cfg.sim_config.mujoco_model_path)
# 时间步长
model.opt.timestep = cfg.sim_config.dt
# 创建数据结构
data = mujoco.MjData(model)
# 进行仿真步骤
mujoco.mj_step(model, data)
# 可视化
viewer = mujoco_viewer.MujocoViewer(model, data)
# 初始化目标动作,为位置
target_q = np.zeros((cfg.env.num_actions), dtype=np.double)
action = np.zeros((cfg.env.num_actions), dtype=np.double)
# 历史观测放入一个队列
hist_obs = deque()
# frame_stack= 15,num_single_obs观测维度= 47(无特权的正常观测)
# 15*47个观测
for _ in range(cfg.env.frame_stack):
hist_obs.append(np.zeros([1, cfg.env.num_single_obs], dtype=np.double))
count_lowlevel = 0
# 迭代的次数是通过计算仿真持续时间 sim_duration 除以时间步长 dt 得到的
for _ in tqdm(range(int(cfg.sim_config.sim_duration / cfg.sim_config.dt)), desc="Simulating..."):
# Obtain an observation
# 获取观察值,可以看到,mujoco里获取观测是十分容易的。
q, dq, quat, v, omega, gvec = get_obs(data)
# 从q中取后面num_actions个,因为前面的是广义的位置姿态什么的,不是关节角
q = q[-cfg.env.num_actions:]
dq = dq[-cfg.env.num_actions:]
# 1000hz -> 100hz
# 通过计数降低频率
# 以下是10个dt 执行一次
if count_lowlevel % cfg.sim_config.decimation == 0:
# 定义观测 1*47
obs = np.zeros([1, cfg.env.num_single_obs], dtype=np.float32)
# 欧拉角
eu_ang = quaternion_to_euler_array(quat)
# 欧拉角的标准范围通常是 [-π, π]。如果某个欧拉角的值大于 π,则减去 2π
eu_ang[eu_ang > math.pi] -= 2 * math.pi
#开始给观测赋值
# 一个0.64秒的周期性波形,模拟相位发生器
obs[0, 0] = math.sin(2 * math.pi * count_lowlevel * cfg.sim_config.dt / 0.64)
obs[0, 1] = math.cos(2 * math.pi * count_lowlevel * cfg.sim_config.dt / 0.64)
# obs_scales 配置为2
# 对观测的值进行量纲的缩放
obs[0, 2] = cmd.vx * cfg.normalization.obs_scales.lin_vel
obs[0, 3] = cmd.vy * cfg.normalization.obs_scales.lin_vel # 2
obs[0, 4] = cmd.dyaw * cfg.normalization.obs_scales.ang_vel # 0.25
obs[0, 5:17] = q * cfg.normalization.obs_scales.dof_pos # 1
obs[0, 17:29] = dq * cfg.normalization.obs_scales.dof_vel # 0.05
# 上次的动作,经过剪裁后的action.
obs[0, 29:41] = action
# 角速度
obs[0, 41:44] = omega
# 姿态欧拉角
obs[0, 44:47] = eu_ang
# 18,观测阈值剪裁,这样做是为了防止观测数据中出现异常值或过大的数值,这对于训练稳定的控制算法非常重要
obs = np.clip(obs, -cfg.normalization.clip_observations, cfg.normalization.clip_observations)
# 加入队列
hist_obs.append(obs)
# 保持队列长度一致
hist_obs.popleft()
# 策略(RL控制器的)输入
policy_input = np.zeros([1, cfg.env.num_observations], dtype=np.float32)
# 在15个历史观测下
# 将历史观测(hist_obs[i])中的数据拼接到 policy_input 数组中,移位47。
for i in range(cfg.env.frame_stack):
policy_input[0, i * cfg.env.num_single_obs : (i + 1) * cfg.env.num_single_obs] = hist_obs[i][0, :]
# policy_input输入47 * 15,个数据,然后输出动作(12维的)
action[:] = policy(torch.tensor(policy_input))[0].detach().numpy()
# 对输出的动作进行剪裁 ,正负18
action = np.clip(action, -cfg.normalization.clip_actions, cfg.normalization.clip_actions)
# 关节力矩 = 关节位置增量(就是相对零点的),动作输出 * 0.25
target_q = action * cfg.control.action_scale
# 期望速度为0。
target_dq = np.zeros((cfg.env.num_actions), dtype=np.double)
# Generate PD control
# PD控制器输出力矩
# q是当前位置,未使用默认位置,那么就是都是基于同一零点的绝对位置
tau = pd_control(target_q, q, cfg.robot_config.kps,
target_dq, dq, cfg.robot_config.kds) # Calc torques
# 限制力矩范围 正负200
tau = np.clip(tau, -cfg.robot_config.tau_limit, cfg.robot_config.tau_limit) # Clamp torques
# 输出的控制就是力矩
# 这里10ms中输出了10个力矩,并没有做插值,但是按1ms的关节反馈,输出力矩应该是按反馈变化的。
# 但这10ms内的目标关节位置是不变的
data.ctrl = tau
# 步进
mujoco.mj_step(model, data)
# 仿真渲染
viewer.render()
count_lowlevel += 1
viewer.close()
# 分析:
# 在sim2sim中,从策略获取到动作(基于零点的一个绝对位置,弧度)后
# 1对动作进行剪裁(不是软限位,正负18)
# 2对动作进行缩放 (0.25)
# 3作为目标的关节位置。
# 4 PD控制器算力矩(action 位置+ 默认位置 - 当前关节位置)
# 5 限制力矩范围
# 6下发到电机去
# 对比一下play:
# 获取到action
# 1随机延迟模拟
# 2高斯噪声模拟
# 3对动作进行剪裁(正负18)
# 4对动作进行缩放(0.25)
# 5 PD控制器 (action - 当前关节位置)
# 5限制力矩范围
# 6下发到电机去
# 对于噪声延迟等,仿真可以暂时去掉
# 默认位置的定义:default_joint_angles = # = target angles [rad] when action = 0.0
# 若定义的默认位置不是0,那么sim2sim 中的PD控制器,就需要加上默认定义位置这一项
# 因为训练出来的action输出是基于默认关节位置的。
# 仿真中并没有定义关节软限位
# 也没有限制关节的转动速度
# 关节的正方向应该是从urdf,mujoco 的xml ,isaac的USD是约定统一的。
# 在训练和sim时包括real 时,对于剪裁及缩放,一定要使用相同的参数。
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Deployment script.')
parser.add_argument('--load_model', type=str, required=True,
help='Run to load from.')
parser.add_argument('--terrain', action='store_true', help='terrain or plane')
args = parser.parse_args()
# 配置
class Sim2simCfg(XBotLCfg):
# 新建的数据结构
class sim_config:
#如果使用了地形,加载不同的模型路径
if args.terrain:
mujoco_model_path = f'{LEGGED_GYM_ROOT_DIR}/resources/robots/XBot/mjcf/XBot-L-terrain.xml'
else:
mujoco_model_path = f'{LEGGED_GYM_ROOT_DIR}/resources/robots/XBot/mjcf/XBot-L.xml'
# 仿真时间60s
sim_duration = 60.0
# 1000HZ
dt = 0.001
# 控制频率 100HZ
decimation = 10
class robot_config:
# KP KD 和关节力矩限制
kps = np.array([200, 200, 350, 350, 15, 15, 200, 200, 350, 350, 15, 15], dtype=np.double)
kds = np.array([10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10], dtype=np.double)
tau_limit = 200. * np.ones(12, dtype=np.double)
# 控制策略 (加载的模型)
policy = torch.jit.load(args.load_model)
run_mujoco(policy, Sim2simCfg())
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)