diff --git a/source/isaaclab/isaaclab/envs/mdp/events.py b/source/isaaclab/isaaclab/envs/mdp/events.py index 0c23c902427..cb7b443d9c9 100644 --- a/source/isaaclab/isaaclab/envs/mdp/events.py +++ b/source/isaaclab/isaaclab/envs/mdp/events.py @@ -14,12 +14,14 @@ from __future__ import annotations +import math import torch from typing import TYPE_CHECKING, Literal import carb import omni.physics.tensors.impl.api as physx import omni.usd +from isaacsim.core.utils.extensions import enable_extension from pxr import Gf, Sdf, UsdGeom, Vt import isaaclab.sim as sim_utils @@ -274,6 +276,99 @@ def __call__( self.asset.root_physx_view.set_material_properties(materials, env_ids) +class randomize_visual_texture_material(ManagerTermBase): + """Randomize the visual texture of bodies on an asset using Replicator API. + + This function randomizes the visual texture of the bodies of the asset using the Replicator API. + The function samples random textures from the given texture paths and applies them to the bodies + of the asset. The textures are projected onto the bodies and rotated by the given angles. + + .. note:: + The function assumes that the asset follows the prim naming convention as: + "{asset_prim_path}/{body_name}/visuals" where the body name is the name of the body to + which the texture is applied. This is the default prim ordering when importing assets + from the asset converters in Isaac Lab. + + .. note:: + When randomizing the texture of individual assets, please make sure to set + :attr:`isaaclab.scene.InteractiveSceneCfg.replicate_physics` to False. This ensures that physics + parser will parse the individual asset properties separately. + """ + + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + """Initialize the term. + + Args: + cfg: The configuration of the event term. + env: The environment instance. + """ + super().__init__(cfg, env) + + enable_extension("omni.replicator.core") + + # we import the module here since we may not always need the replicator + import omni.replicator.core as rep + + # read parameters from the configuration + asset_cfg: SceneEntityCfg = cfg.params.get("asset_cfg") + texture_paths = cfg.params.get("texture_paths") + event_name = cfg.params.get("event_name") + texture_rotation = cfg.params.get("texture_rotation", (0.0, 0.0)) + + # check to make sure replicate_physics is set to False, else raise warning + if env.cfg.scene.replicate_physics: + raise ValueError( + "Unable to randomize visual texture material - ensure InteractiveSceneCfg's replicate_physics parameter" + " is set to False." + ) + + # convert from radians to degrees + texture_rotation = tuple(math.degrees(angle) for angle in texture_rotation) + + # obtain the asset entity + asset_entity = env.scene[asset_cfg.name] + # join all bodies in the asset + body_names = asset_cfg.body_names + if isinstance(body_names, str): + body_names_regex = body_names + elif isinstance(body_names, list): + body_names_regex = "|".join(body_names) + else: + body_names_regex = ".*" + + # Create the omni-graph node for the randomization term + def rep_texture_randomization(): + prims_group = rep.get.prims(path_pattern=f"{asset_entity.cfg.prim_path}/{body_names_regex}/visuals") + + with prims_group: + rep.randomizer.texture( + textures=texture_paths, project_uvw=True, texture_rotate=rep.distribution.uniform(*texture_rotation) + ) + + return prims_group.node + + # Register the event to the replicator + with rep.trigger.on_custom_event(event_name=event_name): + rep_texture_randomization() + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: torch.Tensor, + event_name: str, + asset_cfg: SceneEntityCfg, + texture_paths: list[str], + texture_rotation: tuple[float, float] = (0.0, 0.0), + ): + # import replicator + import omni.replicator.core as rep + + # only send the event to the replicator + # note: This triggers the nodes for all the environments. + # We need to investigate how to make it happen only for a subset based on env_ids. + rep.utils.send_og_event(event_name) + + def randomize_rigid_body_mass( env: ManagerBasedEnv, env_ids: torch.Tensor | None, diff --git a/source/isaaclab/test/envs/test_scale_randomization.py b/source/isaaclab/test/envs/test_scale_randomization.py new file mode 100644 index 00000000000..122f011f531 --- /dev/null +++ b/source/isaaclab/test/envs/test_scale_randomization.py @@ -0,0 +1,345 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +""" +This script checks the functionality of scale randomization. +""" + +from __future__ import annotations + +"""Launch Isaac Sim Simulator first.""" + +from isaaclab.app import AppLauncher, run_tests + +# launch omniverse app +app_launcher = AppLauncher(headless=True, enable_cameras=True) +simulation_app = app_launcher.app + +"""Rest everything follows.""" + +import torch +import unittest + +import omni.usd +from pxr import Sdf + +import isaaclab.envs.mdp as mdp +import isaaclab.sim as sim_utils +from isaaclab.assets import AssetBaseCfg, RigidObject, RigidObjectCfg +from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg +from isaaclab.managers import ActionTerm, ActionTermCfg +from isaaclab.managers import EventTermCfg as EventTerm +from isaaclab.managers import ObservationGroupCfg as ObsGroup +from isaaclab.managers import ObservationTermCfg as ObsTerm +from isaaclab.managers import SceneEntityCfg +from isaaclab.scene import InteractiveSceneCfg +from isaaclab.terrains import TerrainImporterCfg +from isaaclab.utils import configclass + +## +# Custom action term +## + + +class CubeActionTerm(ActionTerm): + """Simple action term that implements a PD controller to track a target position. + + The action term is applied to the cube asset. It involves two steps: + + 1. **Process the raw actions**: Typically, this includes any transformations of the raw actions + that are required to map them to the desired space. This is called once per environment step. + 2. **Apply the processed actions**: This step applies the processed actions to the asset. + It is called once per simulation step. + + In this case, the action term simply applies the raw actions to the cube asset. The raw actions + are the desired target positions of the cube in the environment frame. The pre-processing step + simply copies the raw actions to the processed actions as no additional processing is required. + The processed actions are then applied to the cube asset by implementing a PD controller to + track the target position. + """ + + _asset: RigidObject + """The articulation asset on which the action term is applied.""" + + def __init__(self, cfg: CubeActionTermCfg, env: ManagerBasedEnv): + # call super constructor + super().__init__(cfg, env) + # create buffers + self._raw_actions = torch.zeros(env.num_envs, 3, device=self.device) + self._processed_actions = torch.zeros(env.num_envs, 3, device=self.device) + self._vel_command = torch.zeros(self.num_envs, 6, device=self.device) + # gains of controller + self.p_gain = cfg.p_gain + self.d_gain = cfg.d_gain + + """ + Properties. + """ + + @property + def action_dim(self) -> int: + return self._raw_actions.shape[1] + + @property + def raw_actions(self) -> torch.Tensor: + return self._raw_actions + + @property + def processed_actions(self) -> torch.Tensor: + return self._processed_actions + + """ + Operations + """ + + def process_actions(self, actions: torch.Tensor): + # store the raw actions + self._raw_actions[:] = actions + # no-processing of actions + self._processed_actions[:] = self._raw_actions[:] + + def apply_actions(self): + # implement a PD controller to track the target position + pos_error = self._processed_actions - (self._asset.data.root_pos_w - self._env.scene.env_origins) + vel_error = -self._asset.data.root_lin_vel_w + # set velocity targets + self._vel_command[:, :3] = self.p_gain * pos_error + self.d_gain * vel_error + self._asset.write_root_velocity_to_sim(self._vel_command) + + +@configclass +class CubeActionTermCfg(ActionTermCfg): + """Configuration for the cube action term.""" + + class_type: type = CubeActionTerm + """The class corresponding to the action term.""" + + p_gain: float = 5.0 + """Proportional gain of the PD controller.""" + d_gain: float = 0.5 + """Derivative gain of the PD controller.""" + + +## +# Custom observation term +## + + +def base_position(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor: + """Root linear velocity in the asset's root frame.""" + # extract the used quantities (to enable type-hinting) + asset: RigidObject = env.scene[asset_cfg.name] + return asset.data.root_pos_w - env.scene.env_origins + + +## +# Scene definition +## + + +@configclass +class MySceneCfg(InteractiveSceneCfg): + """Example scene configuration. + + The scene comprises of a ground plane, light source and floating cubes (gravity disabled). + """ + + # add terrain + terrain = TerrainImporterCfg(prim_path="/World/ground", terrain_type="plane", debug_vis=False) + + # add cube for scale randomization + cube1: RigidObjectCfg = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/cube1", + spawn=sim_utils.CuboidCfg( + size=(0.2, 0.2, 0.2), + rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0, disable_gravity=True), + mass_props=sim_utils.MassPropertiesCfg(mass=1.0), + physics_material=sim_utils.RigidBodyMaterialCfg(), + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 0.0, 0.0)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)), + ) + + # add cube for static scale values + cube2: RigidObjectCfg = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/cube2", + spawn=sim_utils.CuboidCfg( + size=(0.2, 0.2, 0.2), + rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0, disable_gravity=True), + mass_props=sim_utils.MassPropertiesCfg(mass=1.0), + physics_material=sim_utils.RigidBodyMaterialCfg(), + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 0.0, 0.0)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)), + ) + + # lights + light = AssetBaseCfg( + prim_path="/World/light", + spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0), + ) + + +## +# Environment settings +## + + +@configclass +class ActionsCfg: + """Action specifications for the MDP.""" + + joint_pos = CubeActionTermCfg(asset_name="cube1") + + +@configclass +class ObservationsCfg: + """Observation specifications for the MDP.""" + + @configclass + class PolicyCfg(ObsGroup): + """Observations for policy group.""" + + # cube velocity + position = ObsTerm(func=base_position, params={"asset_cfg": SceneEntityCfg("cube1")}) + + def __post_init__(self): + self.enable_corruption = True + self.concatenate_terms = True + + # observation groups + policy: PolicyCfg = PolicyCfg() + + +@configclass +class EventCfg: + """Configuration for events.""" + + reset_base = EventTerm( + func=mdp.reset_root_state_uniform, + mode="reset", + params={ + "pose_range": {"x": (-0.5, 0.5), "y": (-0.5, 0.5), "yaw": (-3.14, 3.14)}, + "velocity_range": { + "x": (-0.5, 0.5), + "y": (-0.5, 0.5), + "z": (-0.5, 0.5), + }, + "asset_cfg": SceneEntityCfg("cube1"), + }, + ) + + # Scale randomization as intended + randomize_cube1__scale = EventTerm( + func=mdp.randomize_rigid_body_scale, + mode="prestartup", + params={ + "scale_range": {"x": (0.5, 1.5), "y": (0.5, 1.5), "z": (0.5, 1.5)}, + "asset_cfg": SceneEntityCfg("cube1"), + }, + ) + + # Static scale values + randomize_cube2__scale = EventTerm( + func=mdp.randomize_rigid_body_scale, + mode="prestartup", + params={ + "scale_range": {"x": (1.0, 1.0), "y": (1.0, 1.0), "z": (1.0, 1.0)}, + "asset_cfg": SceneEntityCfg("cube2"), + }, + ) + + +## +# Environment configuration +## + + +@configclass +class CubeEnvCfg(ManagerBasedEnvCfg): + """Configuration for the locomotion velocity-tracking environment.""" + + # Scene settings + scene: MySceneCfg = MySceneCfg(num_envs=10, env_spacing=2.5, replicate_physics=False) + # Basic settings + observations: ObservationsCfg = ObservationsCfg() + actions: ActionsCfg = ActionsCfg() + events: EventCfg = EventCfg() + + def __post_init__(self): + """Post initialization.""" + # general settings + self.decimation = 2 + # simulation settings + self.sim.dt = 0.01 + self.sim.physics_material = self.scene.terrain.physics_material + + +class TestScaleRandomization(unittest.TestCase): + """Test for texture randomization""" + + """ + Tests + """ + + def test_scale_randomization(self): + """Main function.""" + + # setup base environment + env = ManagerBasedEnv(cfg=CubeEnvCfg()) + # setup target position commands + target_position = torch.rand(env.num_envs, 3, device=env.device) * 2 + target_position[:, 2] += 2.0 + # offset all targets so that they move to the world origin + target_position -= env.scene.env_origins + + stage = omni.usd.get_context().get_stage() + + # test to make sure all assets in the scene are created + all_prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube.*/.*") + self.assertEqual(len(all_prim_paths), (env.num_envs * 2)) + + # test to make sure randomized values are truly random + applied_scaling_randomization = set() + prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube1") + + for i in range(3): + prim_spec = Sdf.CreatePrimInLayer(stage.GetRootLayer(), prim_paths[i]) + scale_spec = prim_spec.GetAttributeAtPath(prim_paths[i] + ".xformOp:scale") + if scale_spec.default in applied_scaling_randomization: + raise ValueError( + "Detected repeat in applied scale values - indication scaling randomization is not working." + ) + applied_scaling_randomization.add(scale_spec.default) + + # test to make sure that fixed values are assigned correctly + prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube2") + for i in range(3): + prim_spec = Sdf.CreatePrimInLayer(stage.GetRootLayer(), prim_paths[i]) + scale_spec = prim_spec.GetAttributeAtPath(prim_paths[i] + ".xformOp:scale") + self.assertEqual(tuple(scale_spec.default), (1.0, 1.0, 1.0)) + + # simulate physics + with torch.inference_mode(): + for count in range(200): + # reset every few steps to check nothing breaks + if count % 100 == 0: + env.reset() + # step the environment + env.step(target_position) + + env.close() + + def test_scale_randomization_failure_replicate_physics(self): + with self.assertRaises(ValueError): + cfg_failure = CubeEnvCfg() + cfg_failure.scene.replicate_physics = True + env = ManagerBasedEnv(cfg_failure) + + env.close() + + +if __name__ == "__main__": + run_tests() diff --git a/source/isaaclab/test/envs/test_texture_randomization.py b/source/isaaclab/test/envs/test_texture_randomization.py new file mode 100644 index 00000000000..cab13757101 --- /dev/null +++ b/source/isaaclab/test/envs/test_texture_randomization.py @@ -0,0 +1,189 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +""" +This script tests the functionality of texture randomization applied to the cartpole scene. +""" + +"""Launch Isaac Sim Simulator first.""" + +from isaaclab.app import AppLauncher, run_tests + +# launch omniverse app +app_launcher = AppLauncher(headless=True, enable_cameras=True) +simulation_app = app_launcher.app + +"""Rest everything follows.""" + +import math +import torch +import unittest + +import isaaclab.envs.mdp as mdp +from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg +from isaaclab.managers import EventTermCfg as EventTerm +from isaaclab.managers import ObservationGroupCfg as ObsGroup +from isaaclab.managers import ObservationTermCfg as ObsTerm +from isaaclab.managers import SceneEntityCfg +from isaaclab.utils import configclass +from isaaclab.utils.assets import NVIDIA_NUCLEUS_DIR + +from isaaclab_tasks.manager_based.classic.cartpole.cartpole_env_cfg import CartpoleSceneCfg + + +@configclass +class ActionsCfg: + """Action specifications for the environment.""" + + joint_efforts = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=5.0) + + +@configclass +class ObservationsCfg: + """Observation specifications for the environment.""" + + @configclass + class PolicyCfg(ObsGroup): + """Observations for policy group.""" + + # observation terms (order preserved) + joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel) + joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel) + + def __post_init__(self) -> None: + self.enable_corruption = False + self.concatenate_terms = True + + # observation groups + policy: PolicyCfg = PolicyCfg() + + +@configclass +class EventCfg: + """Configuration for events.""" + + # on reset apply a new set of textures + cart_texture_randomizer = EventTerm( + func=mdp.randomize_visual_texture_material, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", body_names=["cart"]), + "texture_paths": [ + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Bamboo_Planks/Bamboo_Planks_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Cherry/Cherry_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Oak/Oak_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber/Timber_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber_Cladding/Timber_Cladding_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Walnut_Planks/Walnut_Planks_BaseColor.png", + ], + "event_name": "cart_texture_randomizer", + "texture_rotation": (math.pi / 2, math.pi / 2), + }, + ) + + pole_texture_randomizer = EventTerm( + func=mdp.randomize_visual_texture_material, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", body_names=["pole"]), + "texture_paths": [ + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Bamboo_Planks/Bamboo_Planks_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Cherry/Cherry_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Oak/Oak_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber/Timber_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber_Cladding/Timber_Cladding_BaseColor.png", + f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Walnut_Planks/Walnut_Planks_BaseColor.png", + ], + "event_name": "pole_texture_randomizer", + "texture_rotation": (math.pi / 2, math.pi / 2), + }, + ) + + reset_cart_position = EventTerm( + func=mdp.reset_joints_by_offset, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), + "position_range": (-1.0, 1.0), + "velocity_range": (-0.1, 0.1), + }, + ) + + reset_pole_position = EventTerm( + func=mdp.reset_joints_by_offset, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), + "position_range": (-0.125 * math.pi, 0.125 * math.pi), + "velocity_range": (-0.01 * math.pi, 0.01 * math.pi), + }, + ) + + +@configclass +class CartpoleEnvCfg(ManagerBasedEnvCfg): + """Configuration for the cartpole environment.""" + + # Scene settings + scene = CartpoleSceneCfg(env_spacing=2.5) + + # Basic settings + actions = ActionsCfg() + observations = ObservationsCfg() + events = EventCfg() + + def __post_init__(self): + """Post initialization.""" + # viewer settings + self.viewer.eye = [4.5, 0.0, 6.0] + self.viewer.lookat = [0.0, 0.0, 2.0] + # step settings + self.decimation = 4 # env step every 4 sim steps: 200Hz / 4 = 50Hz + # simulation settings + self.sim.dt = 0.005 # sim step every 5ms: 200Hz + + +class TestTextureRandomization(unittest.TestCase): + """Test for texture randomization""" + + """ + Tests + """ + + def test_texture_randomization(self): + # set the arguments + env_cfg = CartpoleEnvCfg() + env_cfg.scene.num_envs = 16 + env_cfg.scene.replicate_physics = False + + # setup base environment + env = ManagerBasedEnv(cfg=env_cfg) + + # simulate physics + with torch.inference_mode(): + for count in range(50): + # reset every few steps to check nothing breaks + if count % 10 == 0: + env.reset() + # sample random actions + joint_efforts = torch.randn_like(env.action_manager.action) + # step the environment + env.step(joint_efforts) + + env.close() + + def test_texture_randomization_failure_replicate_physics(self): + with self.assertRaises(ValueError): + cfg_failure = CartpoleEnvCfg() + cfg_failure.scene.num_envs = 16 + cfg_failure.scene.replicate_physics = True + env = ManagerBasedEnv(cfg_failure) + + env.close() + + +if __name__ == "__main__": + # run the main function + run_tests()