Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 270 additions & 0 deletions examples/visualization/curobo_collision_spheres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
"""Visualize cuRobo robot self-collision spheres during pipeline execution.

This script runs an AutoSim pipeline and updates collision sphere + EE frame
visualization after every simulation step.

Usage
-----
Run with Isaac Sim UI enabled (do NOT use ``--headless``):

python examples/visualization/curobo_collision_spheres.py --pipeline_id <PIPELINE_ID>

Defaults
--------
* Environment ID: 0
* Sphere color: green (0.2, 0.9, 0.2)
* Sphere opacity: 0.4
* EE frame scale: 0.1

Notes
-----
* Pipeline execution logic is inlined so visualization can hook into every step.
* Spheres with radius <= 0 are disabled placeholders and are skipped.
* VisualizationMarkers groups spheres by radius (one USD prototype per unique radius).
"""

from __future__ import annotations

import argparse

import numpy as np
import torch
from isaaclab.app import AppLauncher

parser = argparse.ArgumentParser(description="Visualize cuRobo collision spheres during pipeline execution.")
parser.add_argument("--pipeline_id", type=str, required=True, help="Name of the autosim pipeline.")

AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()

app_launcher = AppLauncher(vars(args_cli))
simulation_app = app_launcher.app

import isaaclab.sim as sim_utils
from isaaclab.markers import VisualizationMarkers, VisualizationMarkersCfg
from isaaclab.markers.config import FRAME_MARKER_CFG

import autosim_examples # noqa: F401
from autosim import make_pipeline
from autosim.core.registration import SkillRegistry


def _build_curobo_q(pipeline, env_id: int) -> torch.Tensor:
"""Build a joint position tensor in cuRobo's joint order from Isaac Lab state.

Isaac Lab and cuRobo use different joint orderings. We look up each cuRobo
joint by name in Isaac Lab's joint_names list and reorder accordingly.
Joints not present in Isaac Lab (e.g. virtual base joints) are set to 0.
"""
planner = pipeline._motion_planner
robot = pipeline._robot

isaaclab_names = list(robot.data.joint_names)
isaaclab_q = robot.data.joint_pos[env_id]

q = torch.zeros(len(planner.target_joint_names), dtype=isaaclab_q.dtype, device=isaaclab_q.device)
for i, name in enumerate(planner.target_joint_names):
if name in isaaclab_names:
q[i] = isaaclab_q[isaaclab_names.index(name)]
# joints missing from Isaac Lab (virtual base joints) stay at 0

return planner._to_curobo_device(q)


def _get_spheres_world(pipeline, env_id: int) -> tuple[np.ndarray, np.ndarray]:
"""Return (positions, radii) for all active collision spheres in world frame."""
import isaaclab.utils.math as PoseUtils
from curobo.types.state import JointState

planner = pipeline._motion_planner
robot = pipeline._robot

q_curobo = _build_curobo_q(pipeline, env_id)
js = JointState(position=q_curobo, joint_names=planner.target_joint_names)
kin_state = planner.motion_gen.compute_kinematics(js)

spheres_root = kin_state.robot_spheres[0].detach() # [N, 4]

root_pose = robot.data.root_pose_w[env_id].detach()
robot_root_pos = root_pose[:3]
robot_root_quat = root_pose[3:] # wxyz

device, dtype = root_pose.device, root_pose.dtype
xyz = spheres_root[:, :3].to(device=device, dtype=dtype)
radii_t = spheres_root[:, 3].to(device=device, dtype=dtype)

n = xyz.shape[0]
robot_root_pos_b = robot_root_pos.unsqueeze(0).expand(n, -1)
robot_root_quat_b = robot_root_quat.unsqueeze(0).expand(n, -1)
identity = torch.tensor([1.0, 0.0, 0.0, 0.0], device=device, dtype=dtype).unsqueeze(0).expand(n, -1)

centers_w, _ = PoseUtils.combine_frame_transforms(robot_root_pos_b, robot_root_quat_b, xyz, identity)

mask = radii_t > 0.0
positions = centers_w[mask].cpu().numpy()
radii = radii_t[mask].cpu().numpy()
return positions, radii


def _get_ee_pose_world(pipeline, env_id: int) -> torch.Tensor:
"""Return EE pose in world frame as [x, y, z, qw, qx, qy, qz] via cuRobo FK."""
import isaaclab.utils.math as PoseUtils

planner = pipeline._motion_planner
robot = pipeline._robot

q_curobo = _build_curobo_q(pipeline, env_id)
ee_pose_root = planner.get_ee_pose(q_curobo)

root_pose = robot.data.root_pose_w[env_id].detach()
rr_pos = root_pose[:3].unsqueeze(0)
rr_quat = root_pose[3:].unsqueeze(0) # wxyz

device, dtype = root_pose.device, root_pose.dtype
ee_pos_root = ee_pose_root.position.view(1, 3).to(device=device, dtype=dtype)
ee_quat_root = ee_pose_root.quaternion.view(1, 4).to(device=device, dtype=dtype) # wxyz

ee_pos_w, ee_quat_w = PoseUtils.combine_frame_transforms(rr_pos, rr_quat, ee_pos_root, ee_quat_root)
return torch.cat([ee_pos_w, ee_quat_w], dim=-1).squeeze(0) # [7]


def _create_ee_marker(scale: float) -> VisualizationMarkers:
"""Create a frame-axis marker for the EE pose."""
cfg = FRAME_MARKER_CFG.copy()
cfg.markers["frame"].scale = (scale, scale, scale)
cfg = cfg.replace(prim_path="/World/debug/ee_frame")
return VisualizationMarkers(cfg)


def _update_ee_marker(vm: VisualizationMarkers, pose_w: torch.Tensor) -> None:
pos = pose_w[:3].unsqueeze(0) # [1, 3]
quat = pose_w[3:].unsqueeze(0) # [1, 4] wxyz
vm.visualize(translations=pos, orientations=quat, marker_indices=[0])


def _create_markers(unique_radii: np.ndarray, color: list[float], alpha: float) -> VisualizationMarkers:
"""Build a VisualizationMarkers with one sphere prototype per unique radius."""
markers_cfg: dict[str, sim_utils.SphereCfg] = {}
for i, r in enumerate(unique_radii):
markers_cfg[f"sphere_{i}"] = sim_utils.SphereCfg(
radius=float(r),
visual_material=sim_utils.PreviewSurfaceCfg(
diffuse_color=tuple(color),
opacity=alpha,
),
)
cfg = VisualizationMarkersCfg(prim_path="/World/debug/collision_spheres", markers=markers_cfg)
return VisualizationMarkers(cfg)


def _update_markers(
vm: VisualizationMarkers,
positions: np.ndarray,
radii: np.ndarray,
unique_radii: np.ndarray,
) -> None:
radius_to_idx = {float(r): i for i, r in enumerate(unique_radii)}
marker_indices = np.array([radius_to_idx[float(r)] for r in radii], dtype=np.int32)
translations = torch.from_numpy(positions).float()
vm.visualize(translations=translations, marker_indices=marker_indices.tolist())


def _update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii):
positions, radii = _get_spheres_world(pipeline, env_id)
_update_markers(vm_spheres, positions, radii, unique_radii)
_update_ee_marker(vm_ee, _get_ee_pose_world(pipeline, env_id))


def _execute_single_skill_with_viz(pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id):
"""Inlined from AutoSimPipeline._execute_single_skill with per-step visualization."""
world_state = pipeline._build_world_state()
plan_success = skill.plan(world_state, goal)

steps = 0
while plan_success and steps < pipeline.cfg.max_steps:
world_state = pipeline._build_world_state()
output = skill.step(world_state)

adapter_result = pipeline._action_adapter.apply(skill, output, pipeline._env)
action = pipeline._last_action.clone()
action[pipeline._env_id, : adapter_result.shape[0]] = adapter_result

pipeline._env.step(action)
pipeline._last_action = action
pipeline._generated_actions.append(action)

pipeline._env.sim.render()
_update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii)

steps += 1
if output.done:
return True, steps

if steps >= pipeline.cfg.max_steps:
world_state = pipeline._build_world_state()
current_pos = world_state.robot_base_pose[:2]
if goal.target_pose is not None:
target_pos = goal.target_pose[:2]
dist = float(torch.linalg.norm(current_pos - target_pos))
pipeline._logger.warning(
f"Max steps reached. Current pos: ({current_pos[0]:.3f}, {current_pos[1]:.3f}), "
f"Target pos: ({target_pos[0]:.3f}, {target_pos[1]:.3f}), Distance: {dist:.3f}m"
)

return False, steps


def main():
env_id = 0
color = [0.2, 0.9, 0.2]
alpha = 0.4
ee_scale = 0.1

pipeline = make_pipeline(args_cli.pipeline_id)
pipeline.initialize()

# Build markers using the initial robot pose (before reset)
positions, radii = _get_spheres_world(pipeline, env_id)
unique_radii = np.unique(radii)
vm_spheres = _create_markers(unique_radii, color, alpha)
vm_ee = _create_ee_marker(scale=ee_scale)

# Decompose task
decompose_result = pipeline.decompose()

# Execute skill sequence with per-step visualization
pipeline._check_skill_extra_cfg()
pipeline.reset_env()
_update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii)

for subtask in decompose_result.subtasks:
for skill_info in subtask.skills:
skill = SkillRegistry.create(
skill_info.skill_type, pipeline.cfg.skills.get(skill_info.skill_type).extra_cfg
)

if pipeline._action_adapter.should_skip_apply(skill):
pipeline._logger.info(f"Skill {skill_info.skill_type} skipped.")
continue

goal = skill.extract_goal_from_info(skill_info, pipeline._env, pipeline._env_extra_info)
success, steps = _execute_single_skill_with_viz(
pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id
)

if not success:
pipeline._logger.error(f"Skill {skill_info.skill_type} failed after {steps} steps.")
raise ValueError(f"Skill {skill_info.skill_type} failed after {steps} steps.")
pipeline._logger.info(f"Skill {skill_info.skill_type} done ({steps} steps).")

pipeline._logger.info(f"Subtask {subtask.subtask_name} completed.")

pipeline._logger.info("Pipeline execution completed.")

while simulation_app.is_running():
pipeline._env.sim.render()


if __name__ == "__main__":
main()
simulation_app.close()
Loading