Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions examples/visualization/curobo_collision_spheres.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@

parser = argparse.ArgumentParser(description="Visualize cuRobo collision spheres during pipeline execution.")
parser.add_argument("--pipeline_id", type=str, required=True, help="Name of the autosim pipeline.")
parser.add_argument(
"--curobo_link_name", type=str, default=None, help="cuRobo link name to query pose in robot-root frame."
)
parser.add_argument(
"--isaaclab_link_name", type=str, default=None, help="Isaac Lab body name to query pose in robot-root frame."
)

AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
Expand Down Expand Up @@ -175,7 +181,58 @@ def _update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii):
_update_ee_marker(vm_ee, _get_ee_pose_world(pipeline, env_id))


def _execute_single_skill_with_viz(pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id):
def _print_link_pose_in_root_frame(
pipeline, env_id: int, curobo_link_name: str | None, isaaclab_link_name: str | None
) -> None:
"""Print link pose in robot-root frame from cuRobo FK and/or Isaac Lab body_state_w.

- cuRobo: FK directly gives the pose in robot-root frame.
- Isaac Lab: body_state_w (world frame) minus root_pose_w via subtract_frame_transforms.
"""
import isaaclab.utils.math as PoseUtils
from curobo.types.state import JointState as CuroboJointState

planner = pipeline._motion_planner
robot = pipeline._robot

# --- cuRobo ---
if curobo_link_name is not None:
q_curobo = _build_curobo_q(pipeline, env_id)
js = CuroboJointState(position=q_curobo, joint_names=planner.target_joint_names)
kin_state = planner.motion_gen.compute_kinematics(js)
link_poses_curobo = kin_state.link_poses
if curobo_link_name not in link_poses_curobo:
print(f"[cuRobo:{curobo_link_name}] link not found. Available: {list(link_poses_curobo.keys())}")
else:
link_pose_root = link_poses_curobo[curobo_link_name]
pos = link_pose_root.position.view(-1).detach().cpu()
quat = link_pose_root.quaternion.view(-1).detach().cpu() # wxyz
print(f"[cuRobo:{curobo_link_name}] (root frame) pos={pos.tolist()} quat(wxyz)={quat.tolist()}")

# --- Isaac Lab ---
if isaaclab_link_name is not None:
body_names = list(robot.data.body_names)
if isaaclab_link_name not in body_names:
print(f"[IsaacLab:{isaaclab_link_name}] link not found. Available: {body_names}")
else:
idx = body_names.index(isaaclab_link_name)
body_state = robot.data.body_state_w[env_id, idx].detach()
root_pos_w = robot.data.root_pos_w[env_id].detach()
root_quat_w = robot.data.root_quat_w[env_id].detach() # wxyz
pos_il, quat_il = PoseUtils.subtract_frame_transforms(
root_pos_w.unsqueeze(0),
root_quat_w.unsqueeze(0),
body_state[:3].unsqueeze(0),
body_state[3:7].unsqueeze(0),
)
pos_il = pos_il.squeeze(0).cpu()
quat_il = quat_il.squeeze(0).cpu() # wxyz
print(f"[IsaacLab:{isaaclab_link_name}] (root frame) pos={pos_il.tolist()} quat(wxyz)={quat_il.tolist()}")


def _execute_single_skill_with_viz(
pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id, curobo_link_name=None, isaaclab_link_name=None
):
"""Inlined from AutoSimPipeline._execute_single_skill with per-step visualization."""
world_state = pipeline._build_world_state()
plan_success = skill.plan(world_state, goal)
Expand All @@ -195,6 +252,8 @@ def _execute_single_skill_with_viz(pipeline, skill, goal, vm_spheres, vm_ee, uni

pipeline._env.sim.render()
_update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii)
if curobo_link_name is not None or isaaclab_link_name is not None:
_print_link_pose_in_root_frame(pipeline, env_id, curobo_link_name, isaaclab_link_name)

steps += 1
if output.done:
Expand Down Expand Up @@ -249,7 +308,15 @@ def main():

goal = skill.extract_goal_from_info(skill_info, pipeline._env, pipeline._env_extra_info)
success, steps = _execute_single_skill_with_viz(
pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id
pipeline,
skill,
goal,
vm_spheres,
vm_ee,
unique_radii,
env_id,
curobo_link_name=args_cli.curobo_link_name,
isaaclab_link_name=args_cli.isaaclab_link_name,
)

if not success:
Expand Down
112 changes: 107 additions & 5 deletions examples/visualization/curobo_world_obstacles.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,57 @@ def _compose_robot_to_world(*, robot_root_pose_w: torch.Tensor, pose_r: _Pose7)
return _Pose7(pos=pos_w.view(3), quat=quat_w.view(4))


def _is_obstacle_enabled(collision_checker, obstacle_name: str, env_idx: int = 0) -> bool:
"""Check if an obstacle is enabled in the collision checker.

Args:
collision_checker: cuRobo collision checker instance
obstacle_name: Name of the obstacle
env_idx: Environment index

Returns:
True if the obstacle is enabled, False otherwise
"""
try:
# Try cuboid (OBB)
if hasattr(collision_checker, "get_obb_idx"):
try:
obs_idx = collision_checker.get_obb_idx(obstacle_name, env_idx)
if obs_idx is not None:
return collision_checker._cube_tensor_list[2][env_idx, obs_idx].item() == 1
except (ValueError, IndexError, KeyError):
pass

# Try mesh
if hasattr(collision_checker, "get_mesh_idx"):
try:
mesh_idx = collision_checker.get_mesh_idx(obstacle_name, env_idx)
if mesh_idx is not None:
return collision_checker._mesh_tensor_list[2][env_idx, mesh_idx].item() == 1
except (ValueError, IndexError, KeyError):
pass

# Try voxel
if hasattr(collision_checker, "get_voxel_idx"):
try:
voxel_idx = collision_checker.get_voxel_idx(obstacle_name, env_idx)
if voxel_idx is not None:
return collision_checker._voxel_tensor_list[2][env_idx, voxel_idx].item() == 1
except (ValueError, IndexError, KeyError):
pass

# Try blox
if hasattr(collision_checker, "_blox_names") and obstacle_name in collision_checker._blox_names:
blox_idx = collision_checker._blox_names.index(obstacle_name)
return collision_checker._blox_tensor_list[1][blox_idx].item() == 1

except Exception:
pass

# Default to enabled if we can't determine the state
return True


def _visualize_world_obstacles(
*,
pipeline,
Expand All @@ -143,11 +194,24 @@ def _visualize_world_obstacles(
thickness: float = 2.0,
z_lift: float = 0.0,
max_mesh_vertices: int = 200000,
show_disabled: bool = True,
):
"""Visualize world obstacles from cuRobo collision checker.

Args:
pipeline: AutoSim pipeline instance
env_id: Environment index
color: RGBA color for enabled obstacles
thickness: Line thickness
z_lift: Z-axis offset for visualization
max_mesh_vertices: Skip meshes with more vertices than this
show_disabled: If True, show disabled obstacles in cyan (default: True)
"""
planner = pipeline._motion_planner
planner.sync_dynamic_objects()

world_model = planner.motion_gen.world_coll_checker.world_model
collision_checker = planner.motion_gen.world_coll_checker
if world_model is None:
raise RuntimeError("cuRobo collision checker has no world model loaded.")

Expand All @@ -159,43 +223,79 @@ def _visualize_world_obstacles(

# NOTE: poses in the world_model are in the robot-root frame

# Cyan (complement of red) for disabled obstacles - highly visible and indicates "inactive"
disabled_color = (0.2, 0.8, 0.8, 0.35) # Cyan with transparency

# Cuboids (OBB pose + dims)
for cub in world_model.cuboid:
is_enabled = _is_obstacle_enabled(collision_checker, cub.name, env_id)
if not is_enabled and not show_disabled:
continue
obstacle_color = color if is_enabled else disabled_color

pose_r = _as_pose7(cub.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
dims = torch.as_tensor(cub.dims, device=device, dtype=dtype).view(3)
half_dims = dims * 0.5
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)
_draw_oriented_box(
pose_w=pose_w, half_dims_xyz=half_dims, color=obstacle_color, thickness=thickness, z_lift=z_lift
)

# Spheres (pose center + radius) -> draw as cube approximation
for sph in world_model.sphere:
is_enabled = _is_obstacle_enabled(collision_checker, sph.name, env_id)
if not is_enabled and not show_disabled:
continue
obstacle_color = color if is_enabled else disabled_color

pose_r = _as_pose7(sph.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
r = float(sph.radius)
half_dims = torch.tensor([r, r, r], device=device, dtype=dtype)
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)
_draw_oriented_box(
pose_w=pose_w, half_dims_xyz=half_dims, color=obstacle_color, thickness=thickness, z_lift=z_lift
)

# Cylinders (pose + radius + height) -> draw as oriented bounding box approximation
for cyl in world_model.cylinder:
is_enabled = _is_obstacle_enabled(collision_checker, cyl.name, env_id)
if not is_enabled and not show_disabled:
continue
obstacle_color = color if is_enabled else disabled_color

pose_r = _as_pose7(cyl.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
half_dims = torch.tensor(
[float(cyl.radius), float(cyl.radius), float(cyl.height) * 0.5], device=device, dtype=dtype
)
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)
_draw_oriented_box(
pose_w=pose_w, half_dims_xyz=half_dims, color=obstacle_color, thickness=thickness, z_lift=z_lift
)

# Capsules (pose + radius + base/tip) -> draw as oriented bounding box approximation
for cap in world_model.capsule:
is_enabled = _is_obstacle_enabled(collision_checker, cap.name, env_id)
if not is_enabled and not show_disabled:
continue
obstacle_color = color if is_enabled else disabled_color

pose_r = _as_pose7(cap.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
base = torch.as_tensor(cap.base, device=device, dtype=dtype).view(3)
tip = torch.as_tensor(cap.tip, device=device, dtype=dtype).view(3)
height = float((tip - base).norm().item())
half_dims = torch.tensor([float(cap.radius), float(cap.radius), height * 0.5], device=device, dtype=dtype)
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)
_draw_oriented_box(
pose_w=pose_w, half_dims_xyz=half_dims, color=obstacle_color, thickness=thickness, z_lift=z_lift
)

# Mesh obstacles: prefer a tight OBB via cuRobo's `Mesh.get_cuboid()`.
for mesh in world_model.mesh:
is_enabled = _is_obstacle_enabled(collision_checker, mesh.name, env_id)
if not is_enabled and not show_disabled:
continue
obstacle_color = color if is_enabled else disabled_color

verts = getattr(mesh, "vertices", None)
if verts is not None and len(verts) > int(max_mesh_vertices):
continue
Expand All @@ -211,7 +311,9 @@ def _visualize_world_obstacles(
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
dims = torch.as_tensor(cub.dims, device=device, dtype=dtype).view(3)
half_dims = dims * 0.5
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)
_draw_oriented_box(
pose_w=pose_w, half_dims_xyz=half_dims, color=obstacle_color, thickness=thickness, z_lift=z_lift
)


def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def __init__(
# Define supported cuRobo primitive types for object discovery and pose synchronization
self.primitive_types: list[str] = ["mesh", "cuboid", "sphere", "capsule", "cylinder", "voxel", "blox"]

# Current target object for selective collision checking
self._current_target_object: str | None = None

def _refine_config_from_env(self, env: ManagerBasedEnv):
"""Refine the config from the environment."""

Expand Down Expand Up @@ -217,9 +220,23 @@ def _invalidate_object_mapping_cache(self) -> None:

self._cached_object_mappings = None

def set_target_object(self, target_object: str | None) -> None:
"""Set the current target object for selective collision checking.

Args:
target_object: Name of the target object, or None to disable all dynamic objects.
"""

self._current_target_object = target_object
self._logger.debug(f"Target object set to: {target_object}")

def sync_dynamic_objects(self) -> int:
"""Synchronize dynamic object poses into cuRobo world model.

If `only_enable_target_object_in_world_sync` is enabled:
- If target_object is set: only that object will be enabled
- If target_object is None: all dynamic objects will be disabled

Returns:
Number of obstacles whose pose was updated.
"""
Expand All @@ -232,6 +249,24 @@ def sync_dynamic_objects(self) -> int:
robot_root_pos_in_world, robot_root_quat_in_world = self._robot.data.root_pos_w, self._robot.data.root_quat_w

updated_count = 0

# Determine which objects should be enabled
if self.cfg.only_enable_target_object_in_world_sync:
if self._current_target_object:
# Only enable target object (reach scenario)
objects_to_enable = {self._current_target_object}
self._logger.debug(
f"Selective collision: only enabling '{self._current_target_object}', "
f"disabling {len(object_mappings) - 1} others"
)
else:
# Disable all dynamic objects (lift/push/pull scenario)
objects_to_enable = set()
self._logger.debug(f"Selective collision: disabling all {len(object_mappings)} dynamic objects")
else:
# Config not enabled, enable all objects
objects_to_enable = set(object_mappings.keys())

for object_name, world_obstacle_names in object_mappings.items():
obj = rigid_objects[object_name]
# NOTE: cuRobo world model is in the robot-root frame
Expand All @@ -243,13 +278,24 @@ def sync_dynamic_objects(self) -> int:
position=self._to_curobo_device(obj_pos_in_robot_root[self._env_id]),
quaternion=self._to_curobo_device(obj_quat_in_robot_root[self._env_id]),
)

# Determine if this object should be enabled
should_enable = object_name in objects_to_enable

for world_obstacle_name in world_obstacle_names:
# Update pose
self.motion_gen.world_coll_checker.update_obstacle_pose(
world_obstacle_name,
obj_pose,
env_idx=self._env_id,
update_cpu_reference=True,
)
# Enable or disable obstacle
self.motion_gen.world_coll_checker.enable_obstacle(
world_obstacle_name,
enable=should_enable,
env_idx=self._env_id,
)
updated_count += 1

return updated_count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ class CuroboPlannerCfg:
"""List of subffixes to only extract world obstacles from."""
world_ignore_subffixes: list[str] | None = None
"""List of subffixes to ignore when extracting world obstacles."""
# Self-collision configuration
self_collision_check: bool = True
"""Whether to check self-collision during planning."""
self_collision_opt: bool = True
"""Whether to optimize away self-collisions during planning."""

# World update strategy
enable_dynamic_world_sync: bool = False
enable_dynamic_world_sync: bool = True
"""If True, synchronize dynamic object poses into cuRobo world before planning (fast incremental update)."""
only_enable_target_object_in_world_sync: bool = True
"""If True, only enable the target object in the world sync when executing the reach (grasp) skill. Only valid when enable_dynamic_world_sync is True."""

# Debug and visualization
debug_planner: bool = False
Expand Down
4 changes: 4 additions & 0 deletions source/autosim/autosim/skills/reach.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool:

self._logger.debug(f"Reach from pose in environment: {state.robot_ee_pose}")

# Set current target object for selective collision checking
self._planner.set_target_object(goal.target_object)

target_pose = goal.target_pose # target pose in the robot root frame
target_pos, target_quat = target_pose[:3], target_pose[3:]

Expand Down Expand Up @@ -487,3 +490,4 @@ def reset(self) -> None:
self._saved_target_object = None
self._saved_reach_offset = None
self._saved_env_extra_info = None
self._planner.set_target_object(None)
Loading
Loading