Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions examples/visualization/curobo_world_obstacles.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Visualize cuRobo world obstacles in the Isaac Sim viewport.

This script reads the static obstacle world that cuRobo uses for collision checking
(as built inside `autosim`'s `CuroboPlanner._initialize_static_world()`), converts
all obstacle poses from the planner's reference frame (robot root) to the world frame,
and draws simple wireframes.
This script reads the obstacle world currently held inside cuRobo's collision checker
(`planner.motion_gen.world_coll_checker.world_model`) and draws simple wireframes.
Before drawing, it synchronizes dynamic rigid object poses so the visualization matches
the obstacle poses currently used by cuRobo for collision checking.

Usage
-----
Expand All @@ -13,10 +13,8 @@

Notes
-----
* cuRobo's `UsdHelper.get_obstacles_from_stage(..., reference_prim_path=robot_prim_path)`
applies an inverse reference transform, so obstacle poses are returned in the
**robot root frame** (not world). We therefore compose with the robot root pose
in world to get world-frame obstacle poses for visualization.
* This script reads from cuRobo's live collision world instead of rebuilding from USD,
so reset-time pose changes for dynamic rigid objects are reflected in the visualization.
* Wireframe drawing uses debug lines; keep the app running to inspect the scene.
"""

Expand All @@ -38,7 +36,6 @@
app_launcher = AppLauncher(vars(args_cli))
simulation_app = app_launcher.app


import isaaclab.utils.math as PoseUtils

import autosim_examples # noqa: F401
Expand Down Expand Up @@ -148,36 +145,38 @@ def _visualize_world_obstacles(
max_mesh_vertices: int = 200000,
):
planner = pipeline._motion_planner
if not hasattr(planner, "_static_world_config"):
raise RuntimeError(
"CuroboPlanner has no `_static_world_config`. Ensure planner initialized and warmup complete."
)
world_r = planner._static_world_config # WorldConfig in robot root frame
planner.sync_dynamic_objects()

world_model = planner.motion_gen.world_coll_checker.world_model
if world_model is None:
raise RuntimeError("cuRobo collision checker has no world model loaded.")

robot_root_pose_w = pipeline._robot.data.root_pose_w[env_id].detach()
device = robot_root_pose_w.device
dtype = robot_root_pose_w.dtype

clear_debug_drawing()

# NOTE: poses in the world_model are in the robot-root frame

# Cuboids (OBB pose + dims)
for cub in world_r.cuboid:
for cub in world_model.cuboid:
pose_r = _as_pose7(cub.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
dims = torch.as_tensor(cub.dims, device=device, dtype=dtype).view(3)
half_dims = dims * 0.5
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)

# Spheres (pose center + radius) -> draw as cube approximation
for sph in world_r.sphere:
for sph in world_model.sphere:
pose_r = _as_pose7(sph.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
r = float(sph.radius)
half_dims = torch.tensor([r, r, r], device=device, dtype=dtype)
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)

# Cylinders (pose + radius + height) -> draw as oriented bounding box approximation
for cyl in world_r.cylinder:
for cyl in world_model.cylinder:
pose_r = _as_pose7(cyl.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
half_dims = torch.tensor(
Expand All @@ -186,7 +185,7 @@ def _visualize_world_obstacles(
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)

# Capsules (pose + radius + base/tip) -> draw as oriented bounding box approximation
for cap in world_r.capsule:
for cap in world_model.capsule:
pose_r = _as_pose7(cap.pose, device=device, dtype=dtype)
pose_w = _compose_robot_to_world(robot_root_pose_w=robot_root_pose_w, pose_r=pose_r)
base = torch.as_tensor(cap.base, device=device, dtype=dtype).view(3)
Expand All @@ -196,7 +195,7 @@ def _visualize_world_obstacles(
_draw_oriented_box(pose_w=pose_w, half_dims_xyz=half_dims, color=color, thickness=thickness, z_lift=z_lift)

# Mesh obstacles: prefer a tight OBB via cuRobo's `Mesh.get_cuboid()`.
for mesh in world_r.mesh:
for mesh in world_model.mesh:
verts = getattr(mesh, "vertices", None)
if verts is not None and len(verts) > int(max_mesh_vertices):
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import torch
from curobo.cuda_robot_model.util import load_robot_yaml
from curobo.geom.types import WorldConfig
from curobo.rollout.cost.pose_cost import PoseCostMetric
from curobo.types.base import TensorDeviceType
from curobo.types.file_path import ContentPath
from curobo.types.math import Pose
Expand All @@ -20,6 +21,7 @@
)
from isaaclab.assets import Articulation
from isaaclab.envs import ManagerBasedEnv
from isaaclab.utils.math import subtract_frame_transforms

from autosim.core.logger import AutoSimLogger

Expand Down Expand Up @@ -96,6 +98,9 @@ def __init__(
# Read static world geometry once
self._initialize_static_world()

# Cache for dynamic world synchronization.
self._cached_object_mappings: dict[str, str] | None = None

# Define supported cuRobo primitive types for object discovery and pose synchronization
self.primitive_types: list[str] = ["mesh", "cuboid", "sphere", "capsule", "cylinder", "voxel", "blox"]

Expand Down Expand Up @@ -150,22 +155,102 @@ def _initialize_static_world(self) -> None:
env_prim_path = f"/World/envs/env_{self._env_id}"
robot_prim_path = self.cfg.robot_prim_path or f"{env_prim_path}/Robot"

only_paths = [f"{env_prim_path}/{sub}" for sub in self.cfg.world_only_subffixes]
world_only_subffixes = self.cfg.world_only_subffixes or []
only_paths = [f"{env_prim_path}/{sub}" for sub in world_only_subffixes]

ignore_list = [f"{env_prim_path}/{sub}" for sub in self.cfg.world_ignore_subffixes] or [
world_ignore_subffixes = self.cfg.world_ignore_subffixes or []
ignore_list = [f"{env_prim_path}/{sub}" for sub in world_ignore_subffixes] or [
f"{env_prim_path}/target",
"/World/defaultGroundPlane",
"/curobo",
]
ignore_list.append(robot_prim_path)

self._static_world_config = self.usd_helper.get_obstacles_from_stage(
world_cfg = self.usd_helper.get_obstacles_from_stage(
only_paths=only_paths,
reference_prim_path=robot_prim_path,
ignore_substring=ignore_list,
)
self._static_world_config = self._static_world_config.get_collision_check_world()
self._static_world_config = world_cfg.get_collision_check_world()
self.motion_gen.update_world(self._static_world_config)
self._invalidate_object_mapping_cache()

def _get_object_mappings(self) -> dict[str, list[str]]:
"""Map IsaacLab scene object names to cuRobo world obstacle names.

Returns:
Dictionary mapping IsaacLab scene object names to list of cuRobo world obstacle names.
"""

if self._cached_object_mappings is not None:
return self._cached_object_mappings

world_model = self.motion_gen.world_coll_checker.world_model
rigid_objects = self._env.scene.rigid_objects

env_prefix = f"/World/envs/env_{self._env_id}/Scene"
world_object_paths: list[str] = []
for primitive_type in self.primitive_types:
primitive_list = getattr(world_model, primitive_type, None)
if not primitive_list:
continue
for primitive in primitive_list:
primitive_name = primitive.name
if env_prefix in primitive_name:
world_object_paths.append(primitive_name)

mappings: dict[str, list[str]] = {}
for object_name in rigid_objects.keys():
mappings[object_name] = []
for world_path in world_object_paths:
if world_path.startswith(f"{env_prefix}/{object_name}"):
mappings[object_name].append(world_path)

self._cached_object_mappings = mappings
self._logger.debug(f"Object mappings built: {mappings}")
return mappings

def _invalidate_object_mapping_cache(self) -> None:
"""Invalidate cached object-name mapping used for dynamic sync."""

self._cached_object_mappings = None

def sync_dynamic_objects(self) -> int:
"""Synchronize dynamic object poses into cuRobo world model.

Returns:
Number of obstacles whose pose was updated.
"""

object_mappings = self._get_object_mappings()
if not object_mappings:
return 0

rigid_objects = self._env.scene.rigid_objects
robot_root_pos_in_world, robot_root_quat_in_world = self._robot.data.root_pos_w, self._robot.data.root_quat_w

updated_count = 0
for object_name, world_obstacle_names in object_mappings.items():
obj = rigid_objects[object_name]
# NOTE: cuRobo world model is in the robot-root frame
obj_pos_in_world, obj_quat_in_world = obj.data.root_pos_w, obj.data.root_quat_w
obj_pos_in_robot_root, obj_quat_in_robot_root = subtract_frame_transforms(
robot_root_pos_in_world, robot_root_quat_in_world, obj_pos_in_world, obj_quat_in_world
)
obj_pose = Pose(
position=self._to_curobo_device(obj_pos_in_robot_root[self._env_id]),
quaternion=self._to_curobo_device(obj_quat_in_robot_root[self._env_id]),
)
for world_obstacle_name in world_obstacle_names:
self.motion_gen.world_coll_checker.update_obstacle_pose(
world_obstacle_name,
obj_pose,
env_idx=self._env_id,
update_cpu_reference=True,
)
updated_count += 1

return updated_count

def plan_motion(
self,
Expand All @@ -189,6 +274,10 @@ def plan_motion(
JointState of the trajectory or None if planning failed
"""

# Dynamic object pose sync before planning (cheap, incremental).
if self.cfg.enable_dynamic_world_sync:
self.sync_dynamic_objects()

if current_qd is None:
current_qd = torch.zeros_like(current_q)
dof_needed = len(self.target_joint_names)
Expand Down Expand Up @@ -233,11 +322,26 @@ def plan_motion(
for link_name, pose in link_goals.items()
}

# Build per-call plan config: clone only when we need to attach a pose_cost_metric
# so the shared self.plan_config is never mutated.
if self.cfg.reach_partial_pose_weight is not None:
weights = torch.tensor(
self.cfg.reach_partial_pose_weight,
device=self.tensor_args.device,
dtype=self.tensor_args.dtype,
)
pose_metric = PoseCostMetric(reach_partial_pose=True, reach_vec_weight=weights)
active_plan_config = self.plan_config.clone()
active_plan_config.pose_cost_metric = pose_metric
self._logger.debug(f"reach_partial_pose_weight applied: {self.cfg.reach_partial_pose_weight}")
else:
active_plan_config = self.plan_config

# execute planning
result = self.motion_gen.plan_single(
current_joint_state.unsqueeze(0),
goal,
self.plan_config,
active_plan_config,
link_poses=link_poses,
)

Expand Down Expand Up @@ -382,6 +486,82 @@ def solve_ik_batch(
}
return self.motion_gen.ik_solver.solve_batch(goal, link_poses=link_poses)

def plan_to_joint_config(
self,
target_q: torch.Tensor,
current_q: torch.Tensor,
current_qd: torch.Tensor | None = None,
) -> JointState | None:
"""Plan a joint-space trajectory to a target joint configuration.

Unlike plan_motion which targets a Cartesian pose, this plans directly
in joint space using cuRobo's plan_single_js. Used by RetractSkill to
move the robot to a predefined retract configuration.

Args:
target_q: Target joint positions, shape [dof].
current_q: Current joint positions, shape [dof].
current_qd: Current joint velocities, shape [dof]. Defaults to zeros.

Returns:
JointState of the trajectory or None if planning failed.
"""

if current_qd is None:
current_qd = torch.zeros_like(current_q)

dof_needed = len(self.target_joint_names)
for name, q in [("current_q", current_q), ("target_q", target_q)]:
if len(q) < dof_needed:
pad = torch.zeros(dof_needed - len(q), dtype=q.dtype)
q = torch.concatenate([q, pad], axis=0)
elif len(q) > dof_needed:
q = q[:dof_needed]
if name == "current_q":
current_q = q
else:
target_q = q

if len(current_qd) < dof_needed:
current_qd = torch.concatenate(
[current_qd, torch.zeros(dof_needed - len(current_qd), dtype=current_qd.dtype)]
)
elif len(current_qd) > dof_needed:
current_qd = current_qd[:dof_needed]

start_state = JointState(
position=self._to_curobo_device(current_q),
velocity=self._to_curobo_device(current_qd) * 0.0,
acceleration=self._to_curobo_device(current_qd) * 0.0,
jerk=self._to_curobo_device(current_qd) * 0.0,
joint_names=self.target_joint_names,
)
goal_state = JointState(
position=self._to_curobo_device(target_q),
velocity=self._to_curobo_device(torch.zeros_like(target_q)),
acceleration=self._to_curobo_device(torch.zeros_like(target_q)),
jerk=self._to_curobo_device(torch.zeros_like(target_q)),
joint_names=self.target_joint_names,
)

start_state = start_state.get_ordered_joint_state(self.target_joint_names)
goal_state = goal_state.get_ordered_joint_state(self.target_joint_names)

result = self.motion_gen.plan_single_js(
start_state.unsqueeze(0),
goal_state.unsqueeze(0),
self.plan_config.clone(),
)

if result.success.item():
current_plan = result.get_interpolated_plan()
motion_plan = current_plan.get_ordered_joint_state(self.target_joint_names)
self._logger.debug(f"joint-space planning succeeded with {len(motion_plan.position)} waypoints")
return motion_plan
else:
self._logger.warning(f"joint-space planning failed: {result.status}")
return None

def reset(self):
"""reset the planner state"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ class CuroboPlannerCfg:
"""Maximum number of planning attempts."""
time_dilation_factor: float = 0.5
"""Time dilation factor for planning."""

reach_partial_pose_weight: list[float] | None = None
"""Per-axis weights [rx, ry, rz, px, py, pz] for partial-pose reaching via cuRobo PoseCostMetric.
Setting a weight to 0.0 relaxes that axis (e.g. [0,0,0,1,1,1] for position-only reaching)."""
# Optional prim path configuration
robot_prim_path: str | None = None
"""Absolute USD prim path to the robot root for world extraction; None derives it from environment root."""
Expand All @@ -55,6 +57,10 @@ class CuroboPlannerCfg:
world_ignore_subffixes: list[str] | None = None
"""List of subffixes to ignore when extracting world obstacles."""

# World update strategy
enable_dynamic_world_sync: bool = False
"""If True, synchronize dynamic object poses into cuRobo world before planning (fast incremental update)."""

# Debug and visualization
debug_planner: bool = False
"""Enable detailed motion planning debug information."""
Expand Down
Loading
Loading