-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathtest_scale_randomization.py
345 lines (266 loc) · 10.9 KB
/
test_scale_randomization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script checks the functionality of scale randomization.
"""
from __future__ import annotations
"""Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests
# launch omniverse app
# NOTE: keep these statements ahead of the imports that follow them; the module marks this
# ordering itself ("Launch Isaac Sim Simulator first." / "Rest everything follows.").
# The app is started headless with cameras enabled.
app_launcher = AppLauncher(headless=True, enable_cameras=True)
simulation_app = app_launcher.app
"""Rest everything follows."""
import torch
import unittest
import omni.usd
from pxr import Sdf
import isaaclab.envs.mdp as mdp
import isaaclab.sim as sim_utils
from isaaclab.assets import AssetBaseCfg, RigidObject, RigidObjectCfg
from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg
from isaaclab.managers import ActionTerm, ActionTermCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.terrains import TerrainImporterCfg
from isaaclab.utils import configclass
##
# Custom action term
##
class CubeActionTerm(ActionTerm):
    """Action term that drives the cube toward a commanded position with a PD controller.

    The raw actions are the desired target positions of the cube in the environment frame.
    The term works in two stages:

    1. **Process the raw actions**: the incoming actions are stored and copied unchanged into
       the processed-action buffer, as no additional processing is required. This is called
       once per environment step.
    2. **Apply the processed actions**: a PD controller turns the position and velocity errors
       into a velocity command that is written to the cube asset. This is called once per
       simulation step.
    """

    _asset: RigidObject
    """The rigid object (cube) on which the action term is applied."""

    def __init__(self, cfg: CubeActionTermCfg, env: ManagerBasedEnv):
        # let the base class finish its own setup first
        super().__init__(cfg, env)
        # controller gains are taken directly from the configuration
        self.p_gain = cfg.p_gain
        self.d_gain = cfg.d_gain
        # action buffers: one 3-component target per environment
        self._raw_actions = torch.zeros(env.num_envs, 3, device=self.device)
        self._processed_actions = torch.zeros(env.num_envs, 3, device=self.device)
        # 6-component velocity command; only the first three entries are ever written
        self._vel_command = torch.zeros(self.num_envs, 6, device=self.device)

    """
    Properties.
    """

    @property
    def action_dim(self) -> int:
        # width of the per-environment action vector
        return self._raw_actions.shape[-1]

    @property
    def raw_actions(self) -> torch.Tensor:
        return self._raw_actions

    @property
    def processed_actions(self) -> torch.Tensor:
        return self._processed_actions

    """
    Operations
    """

    def process_actions(self, actions: torch.Tensor):
        # remember what was commanded
        self._raw_actions[:] = actions
        # targets are used as-is, so the processed buffer mirrors the raw one
        self._processed_actions[:] = self._raw_actions

    def apply_actions(self):
        # cube position expressed relative to each environment origin
        local_pos = self._asset.data.root_pos_w - self._env.scene.env_origins
        # PD terms: distance to the target and (negated) current linear velocity
        position_error = self._processed_actions - local_pos
        velocity_error = -self._asset.data.root_lin_vel_w
        # set velocity targets (first three components only) and push them to the simulation
        self._vel_command[:, :3] = self.p_gain * position_error + self.d_gain * velocity_error
        self._asset.write_root_velocity_to_sim(self._vel_command)
@configclass
class CubeActionTermCfg(ActionTermCfg):
    """Configuration for the cube action term.

    Names the action-term class to use (:class:`CubeActionTerm`) and carries the two gains
    that :class:`CubeActionTerm` copies in its constructor.
    """

    class_type: type = CubeActionTerm
    """The class corresponding to the action term."""
    # multiplies the position error in CubeActionTerm.apply_actions
    p_gain: float = 5.0
    """Proportional gain of the PD controller."""
    # multiplies the velocity error in CubeActionTerm.apply_actions
    d_gain: float = 0.5
    """Derivative gain of the PD controller."""
##
# Custom observation term
##
def base_position(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
    """Root position of the asset relative to the origin of its environment.

    Args:
        env: The environment whose scene holds the asset and the environment origins.
        asset_cfg: The scene entity whose ``name`` selects the asset in the scene.

    Returns:
        The asset's ``root_pos_w`` with the scene's ``env_origins`` subtracted.
    """
    # extract the used quantities (to enable type-hinting)
    asset: RigidObject = env.scene[asset_cfg.name]
    # shift the world-frame position so that it is measured from each environment origin
    return asset.data.root_pos_w - env.scene.env_origins
##
# Scene definition
##
@configclass
class MySceneCfg(InteractiveSceneCfg):
    """Example scene configuration.

    The scene comprises a ground plane, a light source and two floating cubes (gravity disabled).
    ``cube1`` is the target of the randomized-scale event and ``cube2`` of the fixed-scale event
    defined in :class:`EventCfg`; apart from their prim paths the two cubes are configured
    identically.
    """

    # add terrain
    terrain = TerrainImporterCfg(prim_path="/World/ground", terrain_type="plane", debug_vis=False)

    # add cube for scale randomization
    cube1: RigidObjectCfg = RigidObjectCfg(
        prim_path="{ENV_REGEX_NS}/cube1",
        spawn=sim_utils.CuboidCfg(
            size=(0.2, 0.2, 0.2),
            # gravity is disabled so that the cube floats
            rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0, disable_gravity=True),
            mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
            physics_material=sim_utils.RigidBodyMaterialCfg(),
            visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 0.0, 0.0)),
        ),
        init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)),
    )

    # add cube for static scale values
    cube2: RigidObjectCfg = RigidObjectCfg(
        prim_path="{ENV_REGEX_NS}/cube2",
        spawn=sim_utils.CuboidCfg(
            size=(0.2, 0.2, 0.2),
            # gravity is disabled so that the cube floats
            rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0, disable_gravity=True),
            mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
            physics_material=sim_utils.RigidBodyMaterialCfg(),
            visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 0.0, 0.0)),
        ),
        init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)),
    )

    # lights
    light = AssetBaseCfg(
        prim_path="/World/light",
        spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0),
    )
##
# Environment settings
##
@configclass
class ActionsCfg:
    """Action specifications for the MDP."""

    # PD position-tracking action term (CubeActionTermCfg) applied to the "cube1" asset.
    # NOTE(review): the field is called `joint_pos` although it configures a cube action term;
    # the name looks like a carry-over from another environment - confirm before renaming,
    # since the field name is part of the configuration.
    joint_pos = CubeActionTermCfg(asset_name="cube1")
@configclass
class ObservationsCfg:
    """Observation specifications for the MDP."""

    @configclass
    class PolicyCfg(ObsGroup):
        """Observations for policy group."""

        # cube position relative to its environment origin (computed by base_position)
        position = ObsTerm(func=base_position, params={"asset_cfg": SceneEntityCfg("cube1")})

        def __post_init__(self):
            # set the group's corruption and term-concatenation flags
            self.enable_corruption = True
            self.concatenate_terms = True

    # observation groups
    policy: PolicyCfg = PolicyCfg()
@configclass
class EventCfg:
    """Configuration for events.

    Defines one "reset"-mode event for ``cube1`` and two "prestartup"-mode scale events,
    one per cube.
    """

    # "reset"-mode event: calls mdp.reset_root_state_uniform for cube1 with the pose and
    # velocity ranges given below
    reset_base = EventTerm(
        func=mdp.reset_root_state_uniform,
        mode="reset",
        params={
            "pose_range": {"x": (-0.5, 0.5), "y": (-0.5, 0.5), "yaw": (-3.14, 3.14)},
            "velocity_range": {
                "x": (-0.5, 0.5),
                "y": (-0.5, 0.5),
                "z": (-0.5, 0.5),
            },
            "asset_cfg": SceneEntityCfg("cube1"),
        },
    )

    # Scale randomization as intended
    # (each axis of cube1 gets the range (0.5, 1.5))
    randomize_cube1__scale = EventTerm(
        func=mdp.randomize_rigid_body_scale,
        mode="prestartup",
        params={
            "scale_range": {"x": (0.5, 1.5), "y": (0.5, 1.5), "z": (0.5, 1.5)},
            "asset_cfg": SceneEntityCfg("cube1"),
        },
    )

    # Static scale values
    # (lower and upper bound are both 1.0 on every axis; the test in this file expects the
    # authored scale of cube2 to be exactly (1.0, 1.0, 1.0))
    randomize_cube2__scale = EventTerm(
        func=mdp.randomize_rigid_body_scale,
        mode="prestartup",
        params={
            "scale_range": {"x": (1.0, 1.0), "y": (1.0, 1.0), "z": (1.0, 1.0)},
            "asset_cfg": SceneEntityCfg("cube2"),
        },
    )
##
# Environment configuration
##
@configclass
class CubeEnvCfg(ManagerBasedEnvCfg):
    """Configuration for the cube environment used by the scale randomization tests."""

    # Scene settings
    # replicate_physics is False here; the test in this file that switches it to True expects
    # the environment constructor to raise a ValueError
    scene: MySceneCfg = MySceneCfg(num_envs=10, env_spacing=2.5, replicate_physics=False)
    # Basic settings
    observations: ObservationsCfg = ObservationsCfg()
    actions: ActionsCfg = ActionsCfg()
    events: EventCfg = EventCfg()

    def __post_init__(self):
        """Post initialization."""
        # general settings
        self.decimation = 2
        # simulation settings
        self.sim.dt = 0.01
        # reuse the terrain's physics material for the simulation
        self.sim.physics_material = self.scene.terrain.physics_material
class TestScaleRandomization(unittest.TestCase):
    """Test for scale randomization."""

    # number of environments whose authored scale is inspected for each cube
    _NUM_CHECKED_ENVS = 3

    """
    Tests
    """

    def test_scale_randomization(self):
        """Check that cube1 scales differ across environments and cube2 scales stay at one.

        Also steps the environment for a while, with periodic resets, to check that nothing
        breaks after the scales were applied.
        """
        # setup base environment
        env = ManagerBasedEnv(cfg=CubeEnvCfg())
        # close the environment even when a check below fails, so that a failing assertion
        # does not leave the environment open
        try:
            # setup target position commands
            target_position = torch.rand(env.num_envs, 3, device=env.device) * 2
            target_position[:, 2] += 2.0
            # offset all targets so that they move to the world origin
            target_position -= env.scene.env_origins

            stage = omni.usd.get_context().get_stage()

            # test to make sure all assets in the scene are created
            all_prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube.*/.*")
            self.assertEqual(len(all_prim_paths), (env.num_envs * 2))

            # test to make sure randomized values are truly random
            seen_scales = set()
            cube1_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube1")
            self.assertGreaterEqual(len(cube1_paths), self._NUM_CHECKED_ENVS)
            for prim_path in cube1_paths[: self._NUM_CHECKED_ENVS]:
                scale = self._authored_scale(stage, prim_path)
                # report a repeated value as a test failure rather than an unrelated error
                self.assertNotIn(
                    scale,
                    seen_scales,
                    "Detected repeat in applied scale values - indication scaling randomization is not working.",
                )
                seen_scales.add(scale)

            # test to make sure that fixed values are assigned correctly
            cube2_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube2")
            self.assertGreaterEqual(len(cube2_paths), self._NUM_CHECKED_ENVS)
            for prim_path in cube2_paths[: self._NUM_CHECKED_ENVS]:
                self.assertEqual(tuple(self._authored_scale(stage, prim_path)), (1.0, 1.0, 1.0))

            # simulate physics
            with torch.inference_mode():
                for count in range(200):
                    # reset every few steps to check nothing breaks
                    if count % 100 == 0:
                        env.reset()
                    # step the environment
                    env.step(target_position)
        finally:
            env.close()

    def test_scale_randomization_failure_replicate_physics(self):
        """Check that creating the environment with ``replicate_physics=True`` raises a ValueError."""
        # build the configuration outside the assertion context so that only an error raised
        # while creating the environment can satisfy the expectation
        cfg_failure = CubeEnvCfg()
        cfg_failure.scene.replicate_physics = True
        with self.assertRaises(ValueError):
            env = ManagerBasedEnv(cfg_failure)
            # only reached when no error was raised; close before assertRaises reports failure
            env.close()

    """
    Helpers
    """

    @staticmethod
    def _authored_scale(stage, prim_path: str):
        """Return the default value of ``xformOp:scale`` for ``prim_path`` in the stage's root layer."""
        prim_spec = Sdf.CreatePrimInLayer(stage.GetRootLayer(), prim_path)
        scale_spec = prim_spec.GetAttributeAtPath(prim_path + ".xformOp:scale")
        return scale_spec.default
if __name__ == "__main__":
    # run_tests is the helper imported from isaaclab.app at the top of this file
    run_tests()