
Commit c602e77

Merge remote-tracking branch 'upstream/develop' into esekkin/camera-refactor

2 parents fba12ee + 5673af0

26 files changed: 562 additions & 192 deletions

scripts/imitation_learning/isaaclab_mimic/generate_dataset.py

Lines changed: 11 additions & 2 deletions
@@ -30,14 +30,21 @@
 parser.add_argument(
     "--pause_subtask",
     action="store_true",
-    help="pause after every subtask during generation for debugging - only useful with render flag",
+    help="Pause after every subtask during generation for debugging - only useful with render flag",
 )
 parser.add_argument(
     "--use_skillgen",
     action="store_true",
     default=False,
-    help="use skillgen to generate motion trajectories",
+    help="Use skillgen to generate motion trajectories",
 )
+parser.add_argument(
+    "--disable_dataset_compression",
+    action="store_true",
+    default=False,
+    help="Disables dataset compression",
+)
+
 # append AppLauncher cli args
 AppLauncher.add_app_launcher_args(parser)
 # parse the arguments
@@ -88,6 +95,7 @@ def main():
         num_envs=num_envs,
         device=args_cli.device,
         generation_num_trials=args_cli.generation_num_trials,
+        dataset_compression=not args_cli.disable_dataset_compression,
     )

     # Create environment
@@ -158,6 +166,7 @@ def main():
             async_components["action_queue"],
             async_components["info_pool"],
             async_components["event_loop"],
+            data_gen_tasks=data_gen_tasks,
         )
     except asyncio.CancelledError:
         print("Tasks were cancelled.")

source/isaaclab/docs/CHANGELOG.rst

Lines changed: 14 additions & 0 deletions
@@ -34,6 +34,20 @@ Removed
   now managed internally by the renderer backend and are not part of the public API.


+4.5.29 (2026-04-10)
+~~~~~~~~~~~~~~~~~~~
+
+Added
+^^^^^
+
+* Added flag to toggle dataset compression in RecorderManager and dataset file handler.
+
+Changed
+^^^^^^^
+
+* Changed RecorderManager to clone value tensors before adding to episode data, removing multiple clones in ``episodes.add()`` and replacing with a single clone.
+
+
 4.5.28 (2026-04-10)
 ~~~~~~~~~~~~~~~~~~~

source/isaaclab/isaaclab/managers/recorder_manager.py

Lines changed: 11 additions & 4 deletions
@@ -55,6 +55,9 @@ class RecorderManagerBaseCfg:
     export_in_close: bool = False
     """Whether to export episodes in the close call."""

+    dataset_compression: bool = True
+    """Enable dataset compression."""
+

 class RecorderTerm(ManagerTermBase):
     """Base class for recorder terms.
@@ -335,13 +338,14 @@ def add_to_episodes(self, key: str, value: torch.Tensor | dict, env_ids: Sequenc
                 self.add_to_episodes(f"{key}/{sub_key}", sub_value, env_ids)
             return

+        if isinstance(value, wp.array):
+            value = wp.to_torch(value)
+        value = value.clone()  # Clone once for all envs
         for value_index, env_id in enumerate(env_ids):
            if env_id not in self._episodes:
                 self._episodes[env_id] = EpisodeData()
                 self._episodes[env_id].env_id = env_id
-            if isinstance(value, wp.array):
-                value = wp.to_torch(value)
-            self._episodes[env_id].add(key, value[value_index])
+            self._episodes[env_id].add(key, value[value_index], clone=False)

     def set_success_to_episodes(self, env_ids: Sequence[int] | None, success_values: torch.Tensor):
         """Sets the task success values to the episodes for the given environment ids.
@@ -513,7 +517,9 @@ def export_episodes(self, env_ids: Sequence[int] | None = None, demo_ids: Sequen
                 if target_dataset_file_handler is not None:
                     # Use corresponding demo_id if provided, otherwise None
                     current_demo_id = demo_ids[i] if demo_ids is not None else None
-                    target_dataset_file_handler.write_episode(self._episodes[env_id], current_demo_id)
+                    target_dataset_file_handler.write_episode(
+                        self._episodes[env_id], current_demo_id, self.cfg.dataset_compression
+                    )
                 need_to_flush = True
                 # Update episode count
                 if episode_succeeded:
@@ -567,6 +573,7 @@ def _prepare_terms(self):
                 "dataset_export_mode",
                 "export_in_record_pre_reset",
                 "export_in_close",
+                "dataset_compression",
             ]:
                 continue
             # check if term config is None
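
Downstream, the new field rides along with the rest of the recorder configuration. A minimal sketch of toggling it (the export path is illustrative, and real setups use a cfg subclass with recorder terms attached rather than the bare base class):

    from isaaclab.managers.recorder_manager import RecorderManagerBaseCfg

    recorder_cfg = RecorderManagerBaseCfg()
    recorder_cfg.dataset_export_dir_path = "/tmp/demos"  # illustrative path
    recorder_cfg.dataset_compression = False  # trade larger files for faster episode export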

source/isaaclab/isaaclab/utils/datasets/episode_data.py

Lines changed: 6 additions & 9 deletions
@@ -3,11 +3,6 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause

-# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
-# All rights reserved.
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
 from __future__ import annotations

 import torch
@@ -90,7 +85,7 @@ def is_empty(self):
         """Check if the episode data is empty."""
         return not bool(self._data)

-    def add(self, key: str, value: torch.Tensor | dict):
+    def add(self, key: str, value: torch.Tensor | dict, clone: bool = True):
         """Add a key-value pair to the dataset.

         The key can be nested by using the "/" character.
@@ -99,23 +94,25 @@ def add(self, key: str, value: torch.Tensor | dict):
         Args:
             key: The key name.
             value: The corresponding value of tensor type or of dict type.
+            clone: Whether to clone the tensor value before storing it in the episode data.
         """
         # check datatype
         if isinstance(value, dict):
             for sub_key, sub_value in value.items():
-                self.add(f"{key}/{sub_key}", sub_value)
+                self.add(f"{key}/{sub_key}", sub_value, clone=clone)
             return

+        stored = value.clone() if (clone and isinstance(value, torch.Tensor)) else value
         sub_keys = key.split("/")
         current_dataset_pointer = self._data
         for sub_key_index in range(len(sub_keys)):
             if sub_key_index == len(sub_keys) - 1:
                 # Add value to the final dict layer
                 # Use lists to prevent slow tensor copy during concatenation
                 if sub_keys[sub_key_index] not in current_dataset_pointer:
-                    current_dataset_pointer[sub_keys[sub_key_index]] = [value.clone()]
+                    current_dataset_pointer[sub_keys[sub_key_index]] = [stored]
                 else:
-                    current_dataset_pointer[sub_keys[sub_key_index]].append(value.clone())
+                    current_dataset_pointer[sub_keys[sub_key_index]].append(stored)
                 break
             # key index
             if sub_keys[sub_key_index] not in current_dataset_pointer:
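
The reason ``add()`` can now skip per-value clones is that the caller clones once up front, after which storing references is safe. A standalone sketch of the aliasing hazard the clone guards against (plain torch, not the actual class):

    import torch

    # Reference semantics (clone=False): later in-place writes to `value`
    # would corrupt the recorded entry.
    buffer = []
    value = torch.zeros(3)
    buffer.append(value)
    value += 1
    assert torch.equal(buffer[0], torch.ones(3))  # mutated along with `value`

    # Snapshot semantics (clone=True): the stored entry is insulated.
    buffer.append(value.clone())
    value += 1
    assert torch.equal(buffer[1], torch.ones(3))  # unaffected by the later write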

source/isaaclab/isaaclab/utils/datasets/hdf5_dataset_file_handler.py

Lines changed: 5 additions & 7 deletions
@@ -3,11 +3,6 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause

-# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
-# All rights reserved.
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
 from __future__ import annotations

 import json
@@ -210,7 +205,7 @@ def load_dataset_helper(group, path=""):

         return episode

-    def write_episode(self, episode: EpisodeData, demo_id: int | None = None):
+    def write_episode(self, episode: EpisodeData, demo_id: int | None = None, dataset_compression: bool = True):
         """Add an episode to the dataset.

         Args:
@@ -251,7 +246,10 @@ def create_dataset_helper(group, key, value):
                 for sub_key, sub_value in value.items():
                     create_dataset_helper(key_group, sub_key, sub_value)
             else:
-                group.create_dataset(key, data=value.cpu().numpy(), compression="gzip")
+                if dataset_compression:
+                    group.create_dataset(key, data=value.cpu().numpy(), compression="gzip", compression_opts=2)
+                else:
+                    group.create_dataset(key, data=value.cpu().numpy())

         for key, value in episode.data.items():
             create_dataset_helper(h5_episode_group, key, value)
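
Both branches use the stock h5py API; gzip level 2 gives up a little file size for faster writes than h5py's default level 4. A standalone sketch of the two write paths (file paths and array shape are illustrative):

    import h5py
    import numpy as np

    data = np.zeros((1000, 100), dtype=np.float32)  # highly compressible sample

    with h5py.File("/tmp/compressed.hdf5", "w") as f:
        # gzip level 2, as used above: cheaper to write than the default level 4
        f.create_dataset("obs", data=data, compression="gzip", compression_opts=2)

    with h5py.File("/tmp/uncompressed.hdf5", "w") as f:
        f.create_dataset("obs", data=data)  # fastest writes, largest file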

source/isaaclab_mimic/config/extension.toml

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 [package]

 # Semantic Versioning is used: https://semver.org/
-version = "1.2.3"
+version = "1.2.4"

 # Description
 category = "isaaclab"

source/isaaclab_mimic/docs/CHANGELOG.rst

Lines changed: 15 additions & 0 deletions
@@ -1,6 +1,21 @@
 Changelog
 ---------

+1.2.4 (2026-04-06)
+~~~~~~~~~~~~~~~~~~~
+
+Changed
+^^^^^^^
+
+* Made performance-enhancing changes to the data generation pipeline (eliminate large tensor usage, reduce asyncio overhead and blocking).
+* Locked h5py dependency to last stable version 3.15.1 to prevent package import errors on Windows with version 3.16.0.
+
+Added
+^^^^^
+
+* Added data generation test cases for all tasks (single and multi environment).
+
+
 1.2.3 (2026-03-12)
 ~~~~~~~~~~~~~~~~~~~

source/isaaclab_mimic/isaaclab_mimic/datagen/data_generator.py

Lines changed: 19 additions & 19 deletions
@@ -6,6 +6,7 @@
 """Base class for data generator."""

 import asyncio
+import contextlib
 import copy
 import logging
 from typing import Any
@@ -33,6 +34,16 @@
 from .datagen_info_pool import DataGenInfoPool


+@contextlib.asynccontextmanager
+async def _optional_lock(lock):
+    """Async context manager that acquires the lock only if it is not None."""
+    if lock is not None:
+        async with lock:
+            yield
+    else:
+        yield
+
+
 def transform_source_data_segment_using_delta_object_pose(
     src_eef_poses: torch.Tensor,
     delta_obj_pose: torch.Tensor,
@@ -664,10 +675,7 @@ async def generate(  # noqa: C901
         for subtask_constraint in self.env_cfg.task_constraint_configs:
             runtime_subtask_constraints_dict.update(subtask_constraint.generate_runtime_subtask_constraints())

-        # save generated data in these variables
-        generated_states = []
-        generated_obs = []
-        generated_actions = []
+        # Track if the generated trajectory was successful
         generated_success = False

         # some eef-specific state variables used during generation
@@ -694,7 +702,8 @@ async def generate(  # noqa: C901

         # While loop that runs per time step
         while True:
-            async with self.src_demo_datagen_info_pool.asyncio_lock:
+            await asyncio.sleep(0)
+            async with _optional_lock(self.src_demo_datagen_info_pool.asyncio_lock):
                 if len(self.src_demo_datagen_info_pool.datagen_infos) > prev_src_demo_datagen_info_pool_size:
                     # src_demo_datagen_info_pool at this point may be updated with new demos,
                     # So we need to update subtask boundaries again
@@ -871,20 +880,18 @@ async def generate(  # noqa: C901
                 eef_waypoint_dict[eef_name] = waypoint
             multi_waypoint = MultiWaypoint(eef_waypoint_dict)

+            await asyncio.sleep(0)
+
             # Execute the next waypoints for all eefs
-            exec_results = await multi_waypoint.execute(
+            exec_success = await multi_waypoint.execute(
                 env=self.env,
                 success_term=success_term,
                 env_id=env_id,
                 env_action_queue=env_action_queue,
             )

-            # Update execution state buffers
-            if len(exec_results["states"]) > 0:
-                generated_states.extend(exec_results["states"])
-                generated_obs.extend(exec_results["observations"])
-                generated_actions.extend(exec_results["actions"])
-            generated_success = generated_success or exec_results["success"]
+            # Update success state
+            generated_success = generated_success or exec_success

             # Get the navigation state
             if self.env_cfg.datagen_config.use_navigation_controller:
@@ -983,10 +990,6 @@ async def generate(  # noqa: C901
             if all(eef_subtasks_done.values()):
                 break

-        # Merge numpy arrays
-        if len(generated_actions) > 0:
-            generated_actions = torch.cat(generated_actions, dim=0)
-
         # Set success to the recorded episode data and export to file
         self.env.recorder_manager.set_success_to_episodes(
             env_id_tensor, torch.tensor([[generated_success]], dtype=torch.bool, device=self.env.device)
@@ -996,9 +999,6 @@

         results = dict(
             initial_state=new_initial_state,
-            states=generated_states,
-            observations=generated_obs,
-            actions=generated_actions,
             success=generated_success,
         )
         return results
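
The helper keeps single-environment runs lock-free while multi-environment runs still serialize pool access. A standalone usage sketch (the helper body mirrors the diff; the demo scaffolding is illustrative):

    import asyncio
    import contextlib

    @contextlib.asynccontextmanager
    async def _optional_lock(lock):
        """Acquire the lock only if it is not None (mirrors the helper above)."""
        if lock is not None:
            async with lock:
                yield
        else:
            yield

    async def demo():
        async with _optional_lock(asyncio.Lock()):  # multi-env: serialized access
            pass
        async with _optional_lock(None):            # single-env: no lock overhead
            pass

    asyncio.run(demo())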

source/isaaclab_mimic/isaaclab_mimic/datagen/generation.py

Lines changed: 17 additions & 7 deletions
@@ -78,6 +78,7 @@ def env_loop(
     env_action_queue: asyncio.Queue,
     shared_datagen_info_pool: DataGenInfoPool,
     asyncio_event_loop: asyncio.AbstractEventLoop,
+    data_gen_tasks: asyncio.Future | None = None,
 ):
     """Main asyncio loop for the environment.

@@ -87,6 +88,8 @@ def env_loop(
         env_action_queue: The asyncio queue to handle actions to for executing actions.
         shared_datagen_info_pool: The shared datagen info pool that stores source demo info.
         asyncio_event_loop: The main asyncio event loop.
+        data_gen_tasks: The gathered async data generation future. When provided, the loop
+            will exit early if all tasks finish unexpectedly (e.g. due to an unhandled exception).
     """
     global num_success, num_failures, num_attempts
     env_id_tensor = torch.tensor([0], dtype=torch.int64, device=env.device)
@@ -97,17 +100,20 @@ def env_loop(
         # check if any environment needs to be reset while waiting for actions
         while env_action_queue.qsize() != env.num_envs:
             asyncio_event_loop.run_until_complete(asyncio.sleep(0))
+            if data_gen_tasks is not None and data_gen_tasks.done():
+                exc = data_gen_tasks.exception()
+                if exc is not None:
+                    raise exc
+                return
             while not env_reset_queue.empty():
                 env_id_tensor[0] = env_reset_queue.get_nowait()
                 env.reset(env_ids=env_id_tensor)
                 env_reset_queue.task_done()

         actions = torch.zeros(env.action_space.shape)
-
-        # get actions from all the data generators
-        for i in range(env.num_envs):
-            # an async-blocking call to get an action from a data generator
-            env_id, action = asyncio_event_loop.run_until_complete(env_action_queue.get())
+        get_tasks = [env_action_queue.get() for _ in range(env.num_envs)]
+        results = asyncio_event_loop.run_until_complete(asyncio.gather(*get_tasks))
+        for env_id, action in results:
             actions[env_id] = action

         # perform action on environment
@@ -152,6 +158,7 @@ def setup_env_config(
     device: str,
     generation_num_trials: int | None = None,
     recorder_cfg: RecorderManagerBaseCfg | None = None,
+    dataset_compression: bool = True,
 ) -> tuple[Any, Any]:
     """Configure the environment for data generation.

@@ -162,6 +169,8 @@ def setup_env_config(
         num_envs: Number of environments to run
         device: Device to run on
         generation_num_trials: Optional override for number of trials
+        recorder_cfg: Recorder manager configuration
+        dataset_compression: Whether to enable dataset compression

     Returns:
         tuple containing:
@@ -198,6 +207,8 @@ def setup_env_config(
     env_cfg.recorders.dataset_export_dir_path = output_dir
     env_cfg.recorders.dataset_filename = output_file_name

+    env_cfg.recorders.dataset_compression = dataset_compression
+
     if env_cfg.datagen_config.generation_keep_failed:
         env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES
     else:
@@ -230,8 +241,7 @@ def setup_async_generation(
     asyncio_event_loop = asyncio.get_event_loop()
     env_reset_queue = asyncio.Queue()
    env_action_queue = asyncio.Queue()
-    shared_datagen_info_pool_lock = asyncio.Lock()
-    shared_datagen_info_pool = DataGenInfoPool(env, env.cfg, env.device, asyncio_lock=shared_datagen_info_pool_lock)
+    shared_datagen_info_pool = DataGenInfoPool(env, env.cfg, env.device)
    shared_datagen_info_pool.load_from_dataset_file(input_file)
     print(f"Loaded {shared_datagen_info_pool.num_datagen_infos} to datagen info pool")

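Gathering all queue gets in a single event-loop entry avoids one run_until_complete round trip per environment per step, which is the asyncio-overhead reduction the changelog refers to. A standalone sketch of the pattern (illustrative names, plain asyncio):

    import asyncio

    async def main():
        queue: asyncio.Queue = asyncio.Queue()
        num_envs = 4
        for env_id in range(num_envs):
            queue.put_nowait((env_id, f"action-{env_id}"))

        # One gather replaces num_envs sequential awaits, as in env_loop above.
        results = await asyncio.gather(*(queue.get() for _ in range(num_envs)))
        for env_id, action in results:
            print(env_id, action)

    asyncio.run(main())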