【legged_gym学习】legged_robot.py部分解读三(环境与资产初始化 Environment & Asset Setup)
模块是整个 RL 训练框架的基础建设。它不仅决定了机器人模型的物理准确性,更通过预分配显存和 Tensor Wrapping 技术,锁死了整个系统的高性能上限。做好了这一步,后续的控制逻辑和网络训练才能在一个稳固且极速的世界中狂奔。
在上一篇文章中,我们详细拆解了强化学习的核心循环 (Core RL Loop),了解了智能体是如何在时间步的推进中与环境交互的。但在这套循环跑起来之前,我们需要先“创世”——也就是在显存中构建出成千上万个平行的物理世界,并将我们的机器人正确地放入其中。
今天,我们将聚焦框架的第二大核心模块:环境与资产初始化 (Environment & Asset Setup)。在基于 Isaac Gym 等 GPU 加速的仿真器中,高效的初始化和内存预分配是实现几万帧/秒极速训练的基础。
以下是构建这个虚拟物理世界的关键函数拆解:
1. 万物始于构造:__init__
作为整个环境类的构造函数,__init__ 承担了总指挥的角色。它并不直接处理物理底层的细节,而是负责宏观的统筹与准备工作。
-
配置解析 (Config Parsing):首先,它会读取并解析外部传入的配置文件(如
.yaml或 Python Config 类),获取诸如环境数量(num_envs)、控制频率、重力系数、各项奖励权重等关键参数。 -
按序创世:接着,它会按严格的顺序调用底层的创建函数(如接下来要讲的
create_sim),真正启动物理引擎。 -
张量缓冲区初始化:这是至关重要的一步。为了极致的性能,在此阶段会根据环境数量和机器人的自由度(DOF)预先分配好所有用于存储状态的张量(Tensors)的内存空间,例如观测值缓冲
self.obs_buf、奖励缓冲self.rew_buf等。这种预分配避免了在训练循环中动态申请内存带来的极大开销。
def __init__(self, cfg: LeggedRobotCfg, sim_params, physics_engine, sim_device, headless):
""" Parses the provided config file,
calls create_sim() (which creates, simulation, terrain and environments),
initilizes pytorch buffers used during training
Args:
cfg (Dict): Environment config file
sim_params (gymapi.SimParams): simulation parameters
physics_engine (gymapi.SimType): gymapi.SIM_PHYSX (must be PhysX)
device_type (string): 'cuda' or 'cpu'
device_id (int): 0, 1, ...
headless (bool): Run without rendering if True
解析配置文件,创建仿真器(包括地形和并行环境),并初始化 PyTorch buffer
"""
self.cfg = cfg# 将配置参数保存为实例变量
self.sim_params = sim_params# 将仿真参数保存为实例变量
self.height_samples = None# 用于存储高度采样点的变量
self.debug_viz = False# 是否启用调试可视化
self.init_done = False# 初始化是否完成
self._parse_cfg(self.cfg)# 解析并转换配置参数
super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless) #将配置参数传递给父类构造函数,完成基础环境的初始化
if not self.headless:# 如果不是无头模式,则创建仿真器(包括地形和并行环境)并设置摄像机位置
self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat)#设置相机位置和观察点
# 初始化用于存放底层物理引擎数据的 PyTorch 张量
self._init_buffers()
self._prepare_reward_function()# 根据配置准备奖励函数和奖励缩放因子
self.init_done = True #完成初始化标志设置为 True,表示环境已经准备好可以开始训练
def _prepare_reward_function(self):
""" Prepares a list of reward functions, whcih will be called to compute the total reward.
Looks for self._reward_<REWARD_NAME>, where <REWARD_NAME> are names of all non zero reward scales in the cfg.
"""
# remove zero scales + multiply non-zero ones by dt
for key in list(self.reward_scales.keys()): # 遍历所有奖励缩放因子
scale = self.reward_scales[key]# 获取当前奖励的缩放因子
if scale==0:# 如果奖励缩放因子为零
self.reward_scales.pop(key) # 从字典中移除该奖励
else:
self.reward_scales[key] *= self.dt# 将非零奖励缩放因子乘以时间步长
# prepare list of functions
self.reward_functions = []# 初始化奖励函数列表
self.reward_names = []# 初始化奖励名称列表
for name, scale in self.reward_scales.items():# 遍历所有非零奖励缩放因子
if name=="termination":# 如果奖励名称为 "termination",则跳过(因为结束奖励将在 compute_reward 中单独处理)
continue
self.reward_names.append(name)# 添加奖励名称到列表
name = '_reward_' + name# 将奖励名称前缀添加为 '_reward_'
self.reward_functions.append(getattr(self, name))# 添加奖励函数到列表
# reward episode sums
# 初始化每个环境的奖励总和
self.episode_sums = {name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
for name in self.reward_scales.keys()}
2. 物理世界的基石:create_sim()
如果说 __init__ 是图纸,那么 create_sim() 就是真正的施工现场。它直接与底层物理引擎 API 打交道。
-
引擎实例化:它负责创建 Isaac Gym 的物理模拟器实例,设定使用的物理后端(如 PhysX)以及基础的物理参数(如仿真步长
dt、求解器迭代次数等)。 -
地形生成分发:根据配置文件中的设定,它会决定当前需要什么样的训练场地,并调用对应的地形生成函数(平地、阶梯或崎岖地形)。
-
环境克隆与实例化:当地形准备就绪后,它会调用
_create_envs批量生成指定数量的机器人环境。
def create_sim(self):
""" Creates simulation, terrain and evironments
"""
self.up_axis_idx = 2 # 设置仿真中的向上轴索引为2,表示Z轴为向上方向
self.sim = self.gym.create_sim(self.sim_device_id, self.graphics_device_id, self.physics_engine, self.sim_params) #创建仿真器实例,指定设备、物理引擎和仿真参数
mesh_type = self.cfg.terrain.mesh_type # 获取地形网格类型
if mesh_type in ['heightfield', 'trimesh']: # 如果地形网格类型为高度场或三角网格
self.terrain = Terrain(self.cfg.terrain, self.num_envs)# 创建地形实例
if mesh_type=='plane':
self._create_ground_plane()
elif mesh_type=='heightfield':
self._create_heightfield()
elif mesh_type=='trimesh':
self._create_trimesh()
elif mesh_type is not None:
raise ValueError("Terrain mesh type not recognised. Allowed types are [None, plane, heightfield, trimesh]")
self._create_envs()# 创建环境
3. 机器人的降生与解剖:_create_envs()
这个函数是将具体的机器人模型导入虚拟世界的关键环节。无论是常规的四足机器狗,还是自由度更高、动力学更复杂的六足机器人,都需要在这里进行精确的资产(Asset)配置。
-
加载 URDF/MJCF:解析机器人的描述文件,将静态的三维网格和物理参数(质量、惯性张量、碰撞几何体)导入引擎。
-
物理属性精调:为每个并行环境创建机器人实例时,会配置各个关节的阻尼(Damping)、刚度(Stiffness)以及刚体的摩擦系数。
-
关键部件索引缓存:为了后续能高效地计算碰撞奖励或接触力,这个函数会遍历机器人的刚体树,寻找并保存关键部件(如所有脚尖/足端、躯干 Base)的全局或局部索引(Indices)。比如对于六足机器人,精准锁定六个足端的索引是后续计算步态相位和腾空奖励的前提。
def _create_envs(self):
""" Creates environments:
1. loads the robot URDF/MJCF asset,
2. For each environment
2.1 creates the environment,
2.2 calls DOF and Rigid shape properties callbacks,
2.3 create actor with these properties and add them to the env
3. Store indices of different bodies of the robot
"""
asset_path = self.cfg.asset.file.format(LEGGED_GYM_ROOT_DIR=LEGGED_GYM_ROOT_DIR) # 获取机器人资产文件的路径,路径格式化以包含根目录
asset_root = os.path.dirname(asset_path)# 获取资产文件所在的目录路径
asset_file = os.path.basename(asset_path)# 获取资产文件名
asset_options = gymapi.AssetOptions()# 创建资产选项
asset_options.default_dof_drive_mode = self.cfg.asset.default_dof_drive_mode # 设置默认DOF驱动模式
asset_options.collapse_fixed_joints = self.cfg.asset.collapse_fixed_joints # 设置是否折叠固定关节
asset_options.replace_cylinder_with_capsule = self.cfg.asset.replace_cylinder_with_capsule # 设置是否用胶囊体替换圆柱体
asset_options.flip_visual_attachments = self.cfg.asset.flip_visual_attachments # 设置是否翻转视觉附件
asset_options.fix_base_link = self.cfg.asset.fix_base_link # 设置是否固定基链接
asset_options.density = self.cfg.asset.density # 设置密度
asset_options.angular_damping = self.cfg.asset.angular_damping # 设置角阻尼
asset_options.linear_damping = self.cfg.asset.linear_damping # 设置线阻尼
asset_options.max_angular_velocity = self.cfg.asset.max_angular_velocity # 设置最大角速度
asset_options.max_linear_velocity = self.cfg.asset.max_linear_velocity # 设置最大线速度
asset_options.armature = self.cfg.asset.armature # 设置关节刚度
asset_options.thickness = self.cfg.asset.thickness # 设置碰撞形状厚度
asset_options.disable_gravity = self.cfg.asset.disable_gravity # 设置是否禁用重力
robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)# 加载机器人资产到仿真中,并获取资产句柄
self.num_dof = self.gym.get_asset_dof_count(robot_asset)# 获取DOF数量
self.num_bodies = self.gym.get_asset_rigid_body_count(robot_asset)# 获取刚体数量
dof_props_asset = self.gym.get_asset_dof_properties(robot_asset)# 获取DOF属性
rigid_shape_props_asset = self.gym.get_asset_rigid_shape_properties(robot_asset)# 获取刚体形状属性
# save body names from the asset
body_names = self.gym.get_asset_rigid_body_names(robot_asset)# 获取刚体名称
self.dof_names = self.gym.get_asset_dof_names(robot_asset)# 获取DOF名称
self.num_bodies = len(body_names)# 获取刚体数量
self.num_dofs = len(self.dof_names)# 获取DOF数量
feet_names = [s for s in body_names if self.cfg.asset.foot_name in s]# 获取脚部刚体名称
penalized_contact_names = []# 获取惩罚接触的刚体名称
for name in self.cfg.asset.penalize_contacts_on:# 遍历所有惩罚接触的名称
penalized_contact_names.extend([s for s in body_names if name in s])
termination_contact_names = []# 获取终止接触的刚体名称
for name in self.cfg.asset.terminate_after_contacts_on:# 遍历所有终止接触的名称
termination_contact_names.extend([s for s in body_names if name in s])
base_init_state_list = self.cfg.init_state.pos + self.cfg.init_state.rot + self.cfg.init_state.lin_vel + self.cfg.init_state.ang_vel#初始化状态列表,包含位置、旋转(四元数)、线速度和角速度
self.base_init_state = to_torch(base_init_state_list, device=self.device, requires_grad=False)#将初始化状态转换为Torch张量
start_pose = gymapi.Transform()# 创建起始姿态
start_pose.p = gymapi.Vec3(*self.base_init_state[:3])# 设置起始位置
self._get_env_origins()# 获取环境原点
env_lower = gymapi.Vec3(0., 0., 0.)# 设置环境下界
env_upper = gymapi.Vec3(0., 0., 0.)# 设置环境上界
self.actor_handles = []# 存储所有actor的句柄
self.envs = []# 存储所有环境的句柄
for i in range(self.num_envs):# 遍历所有环境
# create env instance
env_handle = self.gym.create_env(self.sim, env_lower, env_upper, int(np.sqrt(self.num_envs)))# 创建环境实例
pos = self.env_origins[i].clone()# 获取环境原点
pos[:2] += torch_rand_float(-1., 1., (2,1), device=self.device).squeeze(1)# 在原点附近随机偏移位置
start_pose.p = gymapi.Vec3(*pos)# 设置起始位置
rigid_shape_props = self._process_rigid_shape_props(rigid_shape_props_asset, i)# 处理刚体形状属性
self.gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props)# 设置资产的刚体形状属性
actor_handle = self.gym.create_actor(env_handle, robot_asset, start_pose, self.cfg.asset.name, i, self.cfg.asset.self_collisions, 0)# 在环境中创建机器人actor,并获取其句柄
dof_props = self._process_dof_props(dof_props_asset, i)# 处理DOF属性
self.gym.set_actor_dof_properties(env_handle, actor_handle, dof_props)# 设置DOF属性
body_props = self.gym.get_actor_rigid_body_properties(env_handle, actor_handle)# 获取刚体属性
body_props = self._process_rigid_body_props(body_props, i)# 处理刚体属性
self.gym.set_actor_rigid_body_properties(env_handle, actor_handle, body_props, recomputeInertia=True)# 设置刚体属性,并重新计算惯性
self.envs.append(env_handle)# 存储所有环境的句柄
self.actor_handles.append(actor_handle)# 存储所有actor的句柄
self.feet_indices = torch.zeros(len(feet_names), dtype=torch.long, device=self.device, requires_grad=False)# 存储所有脚部的索引
for i in range(len(feet_names)):
self.feet_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], feet_names[i])# 获取脚部的刚体句柄
self.penalised_contact_indices = torch.zeros(len(penalized_contact_names), dtype=torch.long, device=self.device, requires_grad=False)# 存储所有惩罚接触的索引
for i in range(len(penalized_contact_names)):
self.penalised_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], penalized_contact_names[i])# 获取惩罚接触的刚体句柄
self.termination_contact_indices = torch.zeros(len(termination_contact_names), dtype=torch.long, device=self.device, requires_grad=False)# 存储所有终止接触的索引
for i in range(len(termination_contact_names)):
self.termination_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], termination_contact_names[i])# 获取终止接触的刚体句柄
def _get_env_origins(self):
""" Sets environment origins. On rough terrain the origins are defined by the terrain platforms.
Otherwise create a grid.
"""
# Create a grid of robots
if self.cfg.terrain.mesh_type in ["heightfield", "trimesh"]:# 地形类型为高度场或三角网格时,环境原点由地形平台定义
self.custom_origins = True # 自定义原点
self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False)# 存储所有环境的原点
# put robots at the origins defined by the terrain
max_init_level = self.cfg.terrain.max_init_terrain_level# 最大初始化地形级别
if not self.cfg.terrain.curriculum: max_init_level = self.cfg.terrain.num_rows - 1# 如果没有课程,则最大初始化级别为地形行数减1
self.terrain_levels = torch.randint(0, max_init_level+1, (self.num_envs,), device=self.device)# 随机生成地形级别
self.terrain_types = torch.div(torch.arange(self.num_envs, device=self.device), (self.num_envs/self.cfg.terrain.num_cols), rounding_mode='floor').to(torch.long)# 根据环境数量和地形列数,计算每个环境对应的地形类型索引
self.max_terrain_level = self.cfg.terrain.num_rows# 最大地形级别
self.terrain_origins = torch.from_numpy(self.terrain.env_origins).to(self.device).to(torch.float)# 将地形原点从NumPy数组转换为Torch张量,并存储在设备上
self.env_origins[:] = self.terrain_origins[self.terrain_levels, self.terrain_types]# 根据地形级别和类型设置环境原点
else:
self.custom_origins = False
self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False)# 存储所有环境的原点
# create a grid of robots
num_cols = np.floor(np.sqrt(self.num_envs))# 计算列数
num_rows = np.ceil(self.num_envs / num_cols)# 计算行数
xx, yy = torch.meshgrid(torch.arange(num_rows), torch.arange(num_cols))# 生成网格
spacing = self.cfg.env.env_spacing# 计算间距
self.env_origins[:, 0] = spacing * xx.flatten()[:self.num_envs]# 设置环境原点x坐标
self.env_origins[:, 1] = spacing * yy.flatten()[:self.num_envs]# 设置环境原点y坐标
self.env_origins[:, 2] = 0.
def _process_rigid_shape_props(self, props, env_id):
""" Callback allowing to store/change/randomize the rigid shape properties of each environment.
Called During environment creation.
Base behavior: randomizes the friction of each environment
Args:
props (List[gymapi.RigidShapeProperties]): Properties of each shape of the asset
env_id (int): Environment id
Returns:
[List[gymapi.RigidShapeProperties]]: Modified rigid shape properties
"""
if self.cfg.domain_rand.randomize_friction:# 如果配置中启用了摩擦随机化,则为每个环境随机生成一个摩擦系数,并应用到所有刚体形状属性中
if env_id==0:# 仅在第一个环境中准备摩擦随机化参数,其他环境共享这些参数
# prepare friction randomization
friction_range = self.cfg.domain_rand.friction_range # 获取摩擦范围
num_buckets = 64# 设置摩擦系数桶的数量
bucket_ids = torch.randint(0, num_buckets, (self.num_envs, 1))# 随机生成桶ID
friction_buckets = torch_rand_float(friction_range[0], friction_range[1], (num_buckets,1), device='cpu')# 生成摩擦系数桶
self.friction_coeffs = friction_buckets[bucket_ids]# 为每个环境分配摩擦系数
for s in range(len(props)):# 遍历每个刚体形状属性
props[s].friction = self.friction_coeffs[env_id]# 为每个刚体形状属性分配摩擦系数
return props
def _process_dof_props(self, props, env_id):
""" Callback allowing to store/change/randomize the DOF properties of each environment.
Called During environment creation.
Base behavior: stores position, velocity and torques limits defined in the URDF
Args:
props (numpy.array): Properties of each DOF of the asset
env_id (int): Environment id
Returns:
[numpy.array]: Modified DOF properties
"""
if env_id==0:# 仅在第一个环境中准备DOF限制参数,其他环境共享这些参数
self.dof_pos_limits = torch.zeros(self.num_dof, 2, dtype=torch.float, device=self.device, requires_grad=False)# 位置限制
self.dof_vel_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)# 速度限制
self.torque_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)# 力矩限制
for i in range(len(props)):
self.dof_pos_limits[i, 0] = props["lower"][i].item()# 位置下限
self.dof_pos_limits[i, 1] = props["upper"][i].item()# 位置上限
self.dof_vel_limits[i] = props["velocity"][i].item()# 速度限制
self.torque_limits[i] = props["effort"][i].item()# 力矩限制
# soft limits
m = (self.dof_pos_limits[i, 0] + self.dof_pos_limits[i, 1]) / 2 # 位置中点
r = self.dof_pos_limits[i, 1] - self.dof_pos_limits[i, 0] # 位置范围
self.dof_pos_limits[i, 0] = m - 0.5 * r * self.cfg.rewards.soft_dof_pos_limit # 位置下限
self.dof_pos_limits[i, 1] = m + 0.5 * r * self.cfg.rewards.soft_dof_pos_limit # 位置上限
return props
def _process_rigid_body_props(self, props, env_id):
# if env_id==0:
# sum = 0
# for i, p in enumerate(props):
# sum += p.mass
# print(f"Mass of body {i}: {p.mass} (before randomization)")
# print(f"Total mass {sum} (before randomization)")
# randomize base mass
if self.cfg.domain_rand.randomize_base_mass:# 如果配置中启用了基座质量随机化,则为每个环境的第一个刚体(通常是基座)随机增加一个质量值
rng = self.cfg.domain_rand.added_mass_range # 获取随机化范围
props[0].mass += np.random.uniform(rng[0], rng[1]) # 随机增加质量
return props
4. 训练场的地貌塑造:_create_ground_plane() 等地形函数
强化学习的泛化能力很大程度上取决于训练环境的多样性。框架通常提供三种级别的地形构建方法:
-
_create_ground_plane():生成无限大的平地。计算开销极小,通常用于基础步态的初步验证或简单的调试。def _create_ground_plane(self): """ Adds a ground plane to the simulation, sets friction and restitution based on the cfg. """ plane_params = gymapi.PlaneParams()# 创建一个平面参数对象,用于定义地面平面的属性 plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)# 设置平面的法向量 plane_params.static_friction = self.cfg.terrain.static_friction# 设置静态摩擦力 plane_params.dynamic_friction = self.cfg.terrain.dynamic_friction# 设置动态摩擦力 plane_params.restitution = self.cfg.terrain.restitution# 设置恢复系数 self.gym.add_ground(self.sim, plane_params)# 将地面平面添加到仿真中 -
_create_heightfield():高度图地形。通过二维数组生成高低不平的地表,非常适合模拟粗糙地面,计算效率与物理逼真度达到了很好的平衡。def _create_heightfield(self): """ Adds a heightfield terrain to the simulation, sets parameters based on the cfg. """ hf_params = gymapi.HeightFieldParams()# 创建一个高度场参数对象,用于定义高度场地形的属性 hf_params.column_scale = self.terrain.cfg.horizontal_scale# 设置水平缩放 hf_params.row_scale = self.terrain.cfg.horizontal_scale# 设置垂直缩放 hf_params.vertical_scale = self.terrain.cfg.vertical_scale# 设置高度缩放 hf_params.nbRows = self.terrain.tot_cols# 设置高度场的行数 hf_params.nbColumns = self.terrain.tot_rows# 设置高度场的列数 hf_params.transform.p.x = -self.terrain.cfg.border_size# 设置高度场的X轴位置 hf_params.transform.p.y = -self.terrain.cfg.border_size# 设置高度场的Y轴位置 hf_params.transform.p.z = 0.0# 设置高度场的Z轴位置 hf_params.static_friction = self.cfg.terrain.static_friction# 设置静态摩擦力 hf_params.dynamic_friction = self.cfg.terrain.dynamic_friction# 设置动态摩擦力 hf_params.restitution = self.cfg.terrain.restitution# 设置恢复系数 self.gym.add_heightfield(self.sim, self.terrain.heightsamples, hf_params)# 将高度场添加到仿真中 self.height_samples = torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows, self.terrain.tot_cols).to(self.device)# 将高度场样本转换为张量并移动到设备上 -
_create_trimesh():三角网格地形。这是最复杂的离散地形生成方式,可以构建楼梯、离散落脚点等复杂障碍物。在进行高难度的地形穿越(Locomotion)课程学习时,Trimesh 是必不可少的工具。def _create_trimesh(self): """ Adds a triangle mesh terrain to the simulation, sets parameters based on the cfg. # """ tm_params = gymapi.TriangleMeshParams()# 创建一个三角网格参数对象,用于定义三角网格地形的属性 tm_params.nb_vertices = self.terrain.vertices.shape[0]# 设置三角网格的顶点数量 tm_params.nb_triangles = self.terrain.triangles.shape[0]# 设置三角网格的三角形数量 tm_params.transform.p.x = -self.terrain.cfg.border_size# 设置三角网格的X轴位置 tm_params.transform.p.y = -self.terrain.cfg.border_size# 设置三角网格的Y轴位置 tm_params.transform.p.z = 0.0# 设置三角网格的Z轴位置 tm_params.static_friction = self.cfg.terrain.static_friction# 设置静态摩擦力 tm_params.dynamic_friction = self.cfg.terrain.dynamic_friction# 设置动态摩擦力 tm_params.restitution = self.cfg.terrain.restitution# 设置恢复系数 self.gym.add_triangle_mesh(self.sim, self.terrain.vertices.flatten(order='C'), self.terrain.triangles.flatten(order='C'), tm_params) # 将三角网格添加到仿真中 self.height_samples = torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows, self.terrain.tot_cols).to(self.device)# 将高度场样本转换为张量并移动到设备上
5. 跨越 GPU 显存的桥梁:_init_buffers()
这是 Isaac Gym 等现代并行仿真框架的精髓所在,也是它能比传统仿真器快几个数量级的秘密武器。
-
状态张量包裹 (Tensor Wrapping):底层的 PhysX 引擎在 GPU 显存中维护着所有物体的海量状态数据。
_init_buffers()并不把这些数据拷贝到 CPU,而是调用特定的 API 获取这些底层数据的内存指针,并将其“包裹”成 PyTorch 的 Tensor。 -
零拷贝 (Zero-Copy) 交互:通过这种方式,机器人的根状态(位置/姿态)、关节状态(角度/速度)、刚体接触力等核心物理量,直接就变成了可以被 PyTorch 自动求导机制或神经网络直接调用的张量。
-
视图更新:在后续的每次物理仿真步进后,只需刷新这些 Tensor 视图,我们就能瞬间获得几万个机器人的最新状态,实现了仿真与深度学习计算的无缝对接。
def _init_buffers(self):
""" Initialize torch tensors which will contain simulation states and processed quantities
"""
# get gym GPU state tensors
actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)# 获取角色根节点(Base)状态
dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)# 获取关节自由度(DOF)状态
net_contact_forces = self.gym.acquire_net_contact_force_tensor(self.sim)# 获取网络接触力状态
self.gym.refresh_dof_state_tensor(self.sim)# 刷新关节状态张量
self.gym.refresh_actor_root_state_tensor(self.sim)# 刷新角色根节点状态张量
self.gym.refresh_net_contact_force_tensor(self.sim)# 刷新网络接触力状态张量
# create some wrapper tensors for different slices
self.root_states = gymtorch.wrap_tensor(actor_root_state)# 包装为 PyTorch 张量,形状为 (num_envs, num_actors_per_env, 13),包含位置、旋转(四元数)和线性/角速度
self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)# 包装为 PyTorch 张量,形状为 (num_envs, num_dofs_per_env, 2),包含位置和速度
self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]# 包装为 PyTorch 张量,形状为 (num_envs, num_dofs_per_env),包含关节位置
self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]# 包装为 PyTorch 张量,形状为 (num_envs, num_dofs_per_env),包含关节速度
self.base_quat = self.root_states[:, 3:7]# 包装为 PyTorch 张量,形状为 (num_envs, num_actors_per_env, 4),包含四元数
# 计算接触力
self.contact_forces = gymtorch.wrap_tensor(net_contact_forces).view(self.num_envs, -1, 3) # shape: num_envs, num_bodies, xyz axis
# initialize some data used later on
## 初始化后续使用的辅助数据
self.common_step_counter = 0 # 记录物理仿真的步数
self.extras = {} # 存储额外信息
self.noise_scale_vec = self._get_noise_scale_vec(self.cfg)# 获取用于观测噪声的缩放向量
self.gravity_vec = to_torch(get_axis_params(-1., self.up_axis_idx), device=self.device).repeat((self.num_envs, 1))# 获取重力向量,根据上轴索引设置重力方向,并重复以匹配环境数量
self.forward_vec = to_torch([1., 0., 0.], device=self.device).repeat((self.num_envs, 1))# 获取前向向量(通常为 x 轴方向),并重复以匹配环境数量
self.torques = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)## 初始化扭矩存储张量
self.p_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)# 初始化比例增益 P 矩阵
self.d_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)# 初始化当前动作张量
self.last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)# 初始化上一帧动作张量(用于计算平滑度惩罚)
self.last_dof_vel = torch.zeros_like(self.dof_vel)# 初始化上一帧关节速度(用于计算加速度或微分项)
self.last_root_vel = torch.zeros_like(self.root_states[:, 7:13])# 初始化上一帧根节点速度
# 初始化指令张量(如目标线速度、角速度等)
self.commands = torch.zeros(self.num_envs, self.cfg.commands.num_commands, dtype=torch.float, device=self.device, requires_grad=False) # x vel, y vel, yaw vel, heading
# 设置指令的缩放因子
self.commands_scale = torch.tensor([self.obs_scales.lin_vel, self.obs_scales.lin_vel, self.obs_scales.ang_vel], device=self.device, requires_grad=False,) # TODO change this
# 初始化足端腾空时间(用于步态奖励)
self.feet_air_time = torch.zeros(self.num_envs, self.feet_indices.shape[0], dtype=torch.float, device=self.device, requires_grad=False)
# 记录上一帧的触地状态
self.last_contacts = torch.zeros(self.num_envs, len(self.feet_indices), dtype=torch.bool, device=self.device, requires_grad=False)
self.base_lin_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10]) # 将世界坐标系下的线速度旋转到机身局部坐标系下
self.base_ang_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])# 将世界坐标系下的角速度旋转到机身局部坐标系下
self.projected_gravity = quat_rotate_inverse(self.base_quat, self.gravity_vec)# 计算机身感知的重力向量(投影重力,常用于状态观测)
if self.cfg.terrain.measure_heights:# 如果配置需要测量地形高度(如在复杂地形行走)
self.height_points = self._init_height_points()
self.measured_heights = 0# 初始化测量的高度值
# joint positions offsets and PD gains
self.default_dof_pos = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)# 初始化关节位置偏移量
for i in range(self.num_dofs):# 遍历所有关节
name = self.dof_names[i]# 获取关节名称
angle = self.cfg.init_state.default_joint_angles[name]# 获取关节的默认角度
self.default_dof_pos[i] = angle# 设置关节的默认位置
found = False# 标记是否找到对应的刚度和阻尼
for dof_name in self.cfg.control.stiffness.keys():# 遍历所有关节的刚度
if dof_name in name:# 如果关节名称中包含刚度名称
self.p_gains[i] = self.cfg.control.stiffness[dof_name]# 设置比例增益
self.d_gains[i] = self.cfg.control.damping[dof_name]# 设置阻尼
found = True# 标记已找到对应的刚度和阻尼
if not found:# 如果未找到对应的刚度和阻尼
self.p_gains[i] = 0.# 设置比例增益为零
self.d_gains[i] = 0.# 设置阻尼为零
if self.cfg.control.control_type in ["P", "V"]:# 如果控制类型为 "P" 或 "V"
print(f"PD gain of joint {name} were not defined, setting them to zero")
self.default_dof_pos = self.default_dof_pos.unsqueeze(0)# 将默认关节位置偏移量扩展为 (1, num_dof),以便在计算时广播到所有环境
def _get_noise_scale_vec(self, cfg):
""" Sets a vector used to scale the noise added to the observations.
[NOTE]: Must be adapted when changing the observations structure
Args:
cfg (Dict): Environment config file
Returns:
[torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1]
"""
noise_vec = torch.zeros_like(self.obs_buf[0])# 创建一个与单个观测值相同形状的零向量,用于存储每个观测分量的噪声缩放因子
self.add_noise = self.cfg.noise.add_noise # 从配置中读取是否开启噪声添加的布尔值
noise_scales = self.cfg.noise.noise_scales # 获取配置中各传感器的噪声基准缩放比例(例如 lin_vel: 0.1)
noise_level = self.cfg.noise.noise_level # 获取总体的噪声强度等级(通常用于一键调节全局噪声大小)
noise_vec[:3] = noise_scales.lin_vel * noise_level * self.obs_scales.lin_vel
noise_vec[3:6] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel
noise_vec[6:9] = noise_scales.gravity * noise_level
noise_vec[9:12] = 0. # commands
noise_vec[12:24] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos
noise_vec[24:36] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel
noise_vec[36:48] = 0. # previous actions
if self.cfg.terrain.measure_heights:# 如果配置中开启了地形高度测量(网格采样)
# 索引 48-234: 为测得的地形高度数据添加噪声缩放(模拟传感器误差)
noise_vec[48:235] = noise_scales.height_measurements* noise_level * self.obs_scales.height_measurements
return noise_vec
def _init_height_points(self):
""" Returns points at which the height measurments are sampled (in base frame)
Returns:
[torch.Tensor]: Tensor of shape (num_envs, self.num_height_points, 3)
"""
y = torch.tensor(self.cfg.terrain.measured_points_y, device=self.device, requires_grad=False)# 创建一个包含测量点y坐标的张量,形状为 (num_measured_points_y,)
x = torch.tensor(self.cfg.terrain.measured_points_x, device=self.device, requires_grad=False)# 创建一个包含测量点x坐标的张量,形状为 (num_measured_points_x,)
grid_x, grid_y = torch.meshgrid(x, y)# 生成网格
self.num_height_points = grid_x.numel()# 计算高度测量点的数量
points = torch.zeros(self.num_envs, self.num_height_points, 3, device=self.device, requires_grad=False)# 创建一个张量用于存储高度测量点
points[:, :, 0] = grid_x.flatten()# 将网格的x坐标赋值给高度测量点
points[:, :, 1] = grid_y.flatten()# 将网格的y坐标赋值给高度测量点
return points
总结
环境与资产初始化模块是整个 RL 训练框架的基础建设。它不仅决定了机器人模型的物理准确性,更通过预分配显存和 Tensor Wrapping 技术,锁死了整个系统的高性能上限。做好了这一步,后续的控制逻辑和网络训练才能在一个稳固且极速的世界中狂奔。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)