Skip to content

Adds texture and scale randomization event terms #2121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 25, 2025
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions source/isaaclab/isaaclab/envs/mdp/events.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@

from __future__ import annotations

import math
import torch
from typing import TYPE_CHECKING, Literal

@@ -274,6 +275,97 @@ def __call__(
self.asset.root_physx_view.set_material_properties(materials, env_ids)


class randomize_visual_texture_material(ManagerTermBase):
"""Randomize the visual texture of bodies on an asset using Replicator API.

This function randomizes the visual texture of the bodies of the asset using the Replicator API.
The function samples random textures from the given texture paths and applies them to the bodies
of the asset. The textures are projected onto the bodies and rotated by the given angles.

.. note::
The function assumes that the asset follows the prim naming convention as:
"{asset_prim_path}/{body_name}/visuals" where the body name is the name of the body to
which the texture is applied. This is the default prim ordering when importing assets
from the asset converters in Isaac Lab.

.. note::
When randomizing the texture of individual assets, please make sure to set
:attr:`isaaclab.scene.InteractiveSceneCfg.replicate_physics` to False. This ensures that physics
parser will parse the individual asset properties separately.
"""

def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv):
"""Initialize the term.

Args:
cfg: The configuration of the event term.
env: The environment instance.
"""
super().__init__(cfg, env)

# we import the module here since we may not always need the replicator
import omni.replicator.core as rep

# read parameters from the configuration
asset_cfg: SceneEntityCfg = cfg.params.get("asset_cfg")
texture_paths = cfg.params.get("texture_paths")
event_name = cfg.params.get("event_name")
texture_rotation = cfg.params.get("texture_rotation", (0.0, 0.0))

# check to make sure replicate_physics is set to False, else raise warning
if env.cfg.scene.replicate_physics:
raise ValueError(
"Unable to randomize visual texture material - ensure InteractiveSceneCfg's replicate_physics parameter"
" is set to False."
)

# convert from radians to degrees
texture_rotation = tuple(math.degrees(angle) for angle in texture_rotation)

# obtain the asset entity
asset_entity = env.scene[asset_cfg.name]
# join all bodies in the asset
body_names = asset_cfg.body_names
if isinstance(body_names, str):
body_names_regex = body_names
elif isinstance(body_names, list):
body_names_regex = "|".join(body_names)
else:
body_names_regex = ".*"

# Create the omni-graph node for the randomization term
def rep_texture_randomization():
prims_group = rep.get.prims(path_pattern=f"{asset_entity.cfg.prim_path}/{body_names_regex}/visuals")
Copy link
Contributor

@Mayankm96 Mayankm96 Mar 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm since "visuals" is something hardcoded, I think it isn't great as we asume asset structure here. Should we make an attribute child_prim_paths and let user specify the exact mesh paths they want to be modifying?

For example:

  • For cubes, this will be ["geometry/mesh"]
  • For articulations this will be "[body_A/visuals", "body_B/visuals]"


with prims_group:
rep.randomizer.texture(
textures=texture_paths, project_uvw=True, texture_rotate=rep.distribution.uniform(*texture_rotation)
)

return prims_group.node

# Register the event to the replicator
with rep.trigger.on_custom_event(event_name=event_name):
rep_texture_randomization()

def __call__(
self,
env: ManagerBasedEnv,
env_ids: torch.Tensor,
event_name: str,
asset_cfg: SceneEntityCfg,
texture_paths: list[str],
texture_rotation: tuple[float, float] = (0.0, 0.0),
):
# import replicator
import omni.replicator.core as rep

# only send the event to the replicator
# note: This triggers the nodes for all the environments.
# We need to investigate how to make it happen only for a subset based on env_ids.
rep.utils.send_og_event(event_name)


def randomize_rigid_body_mass(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
345 changes: 345 additions & 0 deletions source/isaaclab/test/envs/test_scale_randomization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can combine the two into a single test fixture with the event terms. Might make it easier to add new tests later instead of copying code for just function everytime :)

# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""
This script checks the functionality of scale randomization.
"""

from __future__ import annotations

"""Launch Isaac Sim Simulator first."""

from isaaclab.app import AppLauncher, run_tests

# launch omniverse app
app_launcher = AppLauncher(headless=True, enable_cameras=True)
simulation_app = app_launcher.app

"""Rest everything follows."""

import torch
import unittest

import omni.usd
from pxr import Sdf

import isaaclab.envs.mdp as mdp
import isaaclab.sim as sim_utils
from isaaclab.assets import AssetBaseCfg, RigidObject, RigidObjectCfg
from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg
from isaaclab.managers import ActionTerm, ActionTermCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.terrains import TerrainImporterCfg
from isaaclab.utils import configclass

##
# Custom action term
##


class CubeActionTerm(ActionTerm):
"""Simple action term that implements a PD controller to track a target position.
The action term is applied to the cube asset. It involves two steps:
1. **Process the raw actions**: Typically, this includes any transformations of the raw actions
that are required to map them to the desired space. This is called once per environment step.
2. **Apply the processed actions**: This step applies the processed actions to the asset.
It is called once per simulation step.
In this case, the action term simply applies the raw actions to the cube asset. The raw actions
are the desired target positions of the cube in the environment frame. The pre-processing step
simply copies the raw actions to the processed actions as no additional processing is required.
The processed actions are then applied to the cube asset by implementing a PD controller to
track the target position.
"""

_asset: RigidObject
"""The articulation asset on which the action term is applied."""

def __init__(self, cfg: CubeActionTermCfg, env: ManagerBasedEnv):
# call super constructor
super().__init__(cfg, env)
# create buffers
self._raw_actions = torch.zeros(env.num_envs, 3, device=self.device)
self._processed_actions = torch.zeros(env.num_envs, 3, device=self.device)
self._vel_command = torch.zeros(self.num_envs, 6, device=self.device)
# gains of controller
self.p_gain = cfg.p_gain
self.d_gain = cfg.d_gain

"""
Properties.
"""

@property
def action_dim(self) -> int:
return self._raw_actions.shape[1]

@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions

@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions

"""
Operations
"""

def process_actions(self, actions: torch.Tensor):
# store the raw actions
self._raw_actions[:] = actions
# no-processing of actions
self._processed_actions[:] = self._raw_actions[:]

def apply_actions(self):
# implement a PD controller to track the target position
pos_error = self._processed_actions - (self._asset.data.root_pos_w - self._env.scene.env_origins)
vel_error = -self._asset.data.root_lin_vel_w
# set velocity targets
self._vel_command[:, :3] = self.p_gain * pos_error + self.d_gain * vel_error
self._asset.write_root_velocity_to_sim(self._vel_command)


@configclass
class CubeActionTermCfg(ActionTermCfg):
"""Configuration for the cube action term."""

class_type: type = CubeActionTerm
"""The class corresponding to the action term."""

p_gain: float = 5.0
"""Proportional gain of the PD controller."""
d_gain: float = 0.5
"""Derivative gain of the PD controller."""


##
# Custom observation term
##


def base_position(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Root linear velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_pos_w - env.scene.env_origins


##
# Scene definition
##


@configclass
class MySceneCfg(InteractiveSceneCfg):
"""Example scene configuration.
The scene comprises of a ground plane, light source and floating cubes (gravity disabled).
"""

# add terrain
terrain = TerrainImporterCfg(prim_path="/World/ground", terrain_type="plane", debug_vis=False)

# add cube for scale randomization
cube1: RigidObjectCfg = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/cube1",
spawn=sim_utils.CuboidCfg(
size=(0.2, 0.2, 0.2),
rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0, disable_gravity=True),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
physics_material=sim_utils.RigidBodyMaterialCfg(),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 0.0, 0.0)),
),
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)),
)

# add cube for static scale values
cube2: RigidObjectCfg = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/cube2",
spawn=sim_utils.CuboidCfg(
size=(0.2, 0.2, 0.2),
rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0, disable_gravity=True),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
physics_material=sim_utils.RigidBodyMaterialCfg(),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.0, 0.0, 0.0)),
),
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)),
)

# lights
light = AssetBaseCfg(
prim_path="/World/light",
spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0),
)


##
# Environment settings
##


@configclass
class ActionsCfg:
"""Action specifications for the MDP."""

joint_pos = CubeActionTermCfg(asset_name="cube1")


@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""

@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group."""

# cube velocity
position = ObsTerm(func=base_position, params={"asset_cfg": SceneEntityCfg("cube1")})

def __post_init__(self):
self.enable_corruption = True
self.concatenate_terms = True

# observation groups
policy: PolicyCfg = PolicyCfg()


@configclass
class EventCfg:
"""Configuration for events."""

reset_base = EventTerm(
func=mdp.reset_root_state_uniform,
mode="reset",
params={
"pose_range": {"x": (-0.5, 0.5), "y": (-0.5, 0.5), "yaw": (-3.14, 3.14)},
"velocity_range": {
"x": (-0.5, 0.5),
"y": (-0.5, 0.5),
"z": (-0.5, 0.5),
},
"asset_cfg": SceneEntityCfg("cube1"),
},
)

# Scale randomization as intended
randomize_cube1__scale = EventTerm(
func=mdp.randomize_rigid_body_scale,
mode="prestartup",
params={
"scale_range": {"x": (0.5, 1.5), "y": (0.5, 1.5), "z": (0.5, 1.5)},
"asset_cfg": SceneEntityCfg("cube1"),
},
)

# Static scale values
randomize_cube2__scale = EventTerm(
func=mdp.randomize_rigid_body_scale,
mode="prestartup",
params={
"scale_range": {"x": (1.0, 1.0), "y": (1.0, 1.0), "z": (1.0, 1.0)},
"asset_cfg": SceneEntityCfg("cube2"),
},
)


##
# Environment configuration
##


@configclass
class CubeEnvCfg(ManagerBasedEnvCfg):
"""Configuration for the locomotion velocity-tracking environment."""

# Scene settings
scene: MySceneCfg = MySceneCfg(num_envs=10, env_spacing=2.5, replicate_physics=False)
# Basic settings
observations: ObservationsCfg = ObservationsCfg()
actions: ActionsCfg = ActionsCfg()
events: EventCfg = EventCfg()

def __post_init__(self):
"""Post initialization."""
# general settings
self.decimation = 2
# simulation settings
self.sim.dt = 0.01
self.sim.physics_material = self.scene.terrain.physics_material


class TestScaleRandomization(unittest.TestCase):
"""Test for texture randomization"""

"""
Tests
"""

def test_scale_randomization(self):
"""Main function."""

# setup base environment
env = ManagerBasedEnv(cfg=CubeEnvCfg())
# setup target position commands
target_position = torch.rand(env.num_envs, 3, device=env.device) * 2
target_position[:, 2] += 2.0
# offset all targets so that they move to the world origin
target_position -= env.scene.env_origins

stage = omni.usd.get_context().get_stage()

# test to make sure all assets in the scene are created
all_prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube.*/.*")
self.assertEqual(len(all_prim_paths), (env.num_envs * 2))

# test to make sure randomized values are truly random
applied_scaling_randomization = set()
prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube1")

for i in range(3):
prim_spec = Sdf.CreatePrimInLayer(stage.GetRootLayer(), prim_paths[i])
scale_spec = prim_spec.GetAttributeAtPath(prim_paths[i] + ".xformOp:scale")
if scale_spec.default in applied_scaling_randomization:
raise ValueError(
"Detected repeat in applied scale values - indication scaling randomization is not working."
)
applied_scaling_randomization.add(scale_spec.default)

# test to make sure that fixed values are assigned correctly
prim_paths = sim_utils.find_matching_prim_paths("/World/envs/env_.*/cube2")
for i in range(3):
prim_spec = Sdf.CreatePrimInLayer(stage.GetRootLayer(), prim_paths[i])
scale_spec = prim_spec.GetAttributeAtPath(prim_paths[i] + ".xformOp:scale")
self.assertEqual(tuple(scale_spec.default), (1.0, 1.0, 1.0))

# simulate physics
with torch.inference_mode():
for count in range(200):
# reset every few steps to check nothing breaks
if count % 100 == 0:
env.reset()
# step the environment
env.step(target_position)

env.close()

def test_scale_randomization_failure_replicate_physics(self):
with self.assertRaises(ValueError):
cfg_failure = CubeEnvCfg()
cfg_failure.scene.replicate_physics = True
env = ManagerBasedEnv(cfg_failure)

env.close()


if __name__ == "__main__":
run_tests()
189 changes: 189 additions & 0 deletions source/isaaclab/test/envs/test_texture_randomization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""
This script tests the functionality of texture randomization applied to the cartpole scene.
"""

"""Launch Isaac Sim Simulator first."""

from isaaclab.app import AppLauncher, run_tests

# launch omniverse app
app_launcher = AppLauncher(headless=True, enable_cameras=True)
simulation_app = app_launcher.app

"""Rest everything follows."""

import math
import torch
import unittest

import isaaclab.envs.mdp as mdp
from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass
from isaaclab.utils.assets import NVIDIA_NUCLEUS_DIR

from isaaclab_tasks.manager_based.classic.cartpole.cartpole_env_cfg import CartpoleSceneCfg


@configclass
class ActionsCfg:
"""Action specifications for the environment."""

joint_efforts = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=5.0)


@configclass
class ObservationsCfg:
"""Observation specifications for the environment."""

@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group."""

# observation terms (order preserved)
joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel)

def __post_init__(self) -> None:
self.enable_corruption = False
self.concatenate_terms = True

# observation groups
policy: PolicyCfg = PolicyCfg()


@configclass
class EventCfg:
"""Configuration for events."""

# on reset apply a new set of textures
cart_texture_randomizer = EventTerm(
func=mdp.randomize_visual_texture_material,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=["cart"]),
"texture_paths": [
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Bamboo_Planks/Bamboo_Planks_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Cherry/Cherry_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Oak/Oak_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber/Timber_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber_Cladding/Timber_Cladding_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Walnut_Planks/Walnut_Planks_BaseColor.png",
],
"event_name": "cart_texture_randomizer",
"texture_rotation": (math.pi / 2, math.pi / 2),
},
)

pole_texture_randomizer = EventTerm(
func=mdp.randomize_visual_texture_material,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=["pole"]),
"texture_paths": [
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Bamboo_Planks/Bamboo_Planks_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Cherry/Cherry_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Oak/Oak_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber/Timber_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber_Cladding/Timber_Cladding_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Walnut_Planks/Walnut_Planks_BaseColor.png",
],
"event_name": "pole_texture_randomizer",
"texture_rotation": (math.pi / 2, math.pi / 2),
},
)

reset_cart_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]),
"position_range": (-1.0, 1.0),
"velocity_range": (-0.1, 0.1),
},
)

reset_pole_position = EventTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]),
"position_range": (-0.125 * math.pi, 0.125 * math.pi),
"velocity_range": (-0.01 * math.pi, 0.01 * math.pi),
},
)


@configclass
class CartpoleEnvCfg(ManagerBasedEnvCfg):
"""Configuration for the cartpole environment."""

# Scene settings
scene = CartpoleSceneCfg(env_spacing=2.5)

# Basic settings
actions = ActionsCfg()
observations = ObservationsCfg()
events = EventCfg()

def __post_init__(self):
"""Post initialization."""
# viewer settings
self.viewer.eye = [4.5, 0.0, 6.0]
self.viewer.lookat = [0.0, 0.0, 2.0]
# step settings
self.decimation = 4 # env step every 4 sim steps: 200Hz / 4 = 50Hz
# simulation settings
self.sim.dt = 0.005 # sim step every 5ms: 200Hz


class TestTextureRandomization(unittest.TestCase):
"""Test for texture randomization"""

"""
Tests
"""

def test_texture_randomization(self):
# set the arguments
env_cfg = CartpoleEnvCfg()
env_cfg.scene.num_envs = 16
env_cfg.scene.replicate_physics = False

# setup base environment
env = ManagerBasedEnv(cfg=env_cfg)

# simulate physics
with torch.inference_mode():
for count in range(50):
# reset every few steps to check nothing breaks
if count % 10 == 0:
env.reset()
# sample random actions
joint_efforts = torch.randn_like(env.action_manager.action)
# step the environment
env.step(joint_efforts)

env.close()

def test_texture_randomization_failure_replicate_physics(self):
with self.assertRaises(ValueError):
cfg_failure = CartpoleEnvCfg()
cfg_failure.scene.num_envs = 16
cfg_failure.scene.replicate_physics = True
env = ManagerBasedEnv(cfg_failure)

env.close()


if __name__ == "__main__":
# run the main function
run_tests()