Skip to content

Commit eed1613

Browse files
authored
Merge pull request #34 from BerkeleyAutomation/example/oxe-demo
Example/oxe-demo
2 parents 83cedc5 + 5769772 commit eed1613

33 files changed

Lines changed: 7055 additions & 4170 deletions

INGESTION_API.md

Lines changed: 548 additions & 0 deletions
Large diffs are not rendered by default.

example_codec_usage.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Example script demonstrating the new codec abstraction system.
4+
5+
This shows how to use different raw data codecs for non-image data:
6+
1. pickle_raw (legacy behavior) - each data point is pickled individually
7+
2. pyarrow_batch - batches data points for better seeking performance
8+
"""
9+
10+
import numpy as np
11+
import tempfile
12+
import os
13+
from pathlib import Path
14+
15+
# Add the project directory to the Python path
16+
import sys
17+
sys.path.insert(0, str(Path(__file__).parent))
18+
19+
from robodm import Trajectory, FeatureType
20+
from robodm.backend.codec_config import CodecConfig
21+
22+
def demo_pickle_codec():
23+
"""Demonstrate the pickle-based raw codec (legacy behavior)"""
24+
print("=== Pickle Raw Codec Demo ===")
25+
26+
with tempfile.TemporaryDirectory() as temp_dir:
27+
path = os.path.join(temp_dir, "pickle_demo.vla")
28+
29+
# Create trajectory with pickle-based raw codec
30+
traj = Trajectory(path, mode="w", video_codec="rawvideo_pickle")
31+
32+
# Add some test data
33+
for i in range(10):
34+
# Non-image data - will use raw codec
35+
vector_data = np.random.rand(5).astype(np.float32)
36+
joint_positions = np.array([i, i+1, i+2], dtype=np.float32)
37+
38+
traj.add("sensor/vector", vector_data, timestamp=i*100)
39+
traj.add("robot/joints", joint_positions, timestamp=i*100)
40+
41+
traj.close()
42+
43+
# Read back and verify
44+
traj_read = Trajectory(path, mode="r")
45+
data = traj_read.load()
46+
traj_read.close()
47+
48+
print(f"Loaded {len(data)} features:")
49+
for key, values in data.items():
50+
print(f" {key}: shape={values.shape}, dtype={values.dtype}")
51+
52+
file_size = os.path.getsize(path)
53+
print(f"File size: {file_size} bytes")
54+
55+
return file_size
56+
57+
58+
def demo_pyarrow_codec():
59+
"""Demonstrate the PyArrow-based raw codec with batching"""
60+
print("\n=== PyArrow Batch Codec Demo ===")
61+
62+
try:
63+
import pyarrow # Check if PyArrow is available
64+
except ImportError:
65+
print("PyArrow not available - skipping demo")
66+
return None
67+
68+
with tempfile.TemporaryDirectory() as temp_dir:
69+
path = os.path.join(temp_dir, "pyarrow_demo.vla")
70+
71+
# Create trajectory with PyArrow-based raw codec
72+
traj = Trajectory(path, mode="w", video_codec="rawvideo_pyarrow")
73+
74+
# Add the same test data
75+
for i in range(10):
76+
# Non-image data - will use raw codec
77+
vector_data = np.random.rand(5).astype(np.float32)
78+
joint_positions = np.array([i, i+1, i+2], dtype=np.float32)
79+
80+
traj.add("sensor/vector", vector_data, timestamp=i*100)
81+
traj.add("robot/joints", joint_positions, timestamp=i*100)
82+
83+
traj.close()
84+
85+
# Read back and verify
86+
traj_read = Trajectory(path, mode="r")
87+
data = traj_read.load()
88+
traj_read.close()
89+
90+
print(f"Loaded {len(data)} features:")
91+
for key, values in data.items():
92+
print(f" {key}: shape={values.shape}, dtype={values.dtype}")
93+
94+
file_size = os.path.getsize(path)
95+
print(f"File size: {file_size} bytes")
96+
97+
return file_size
98+
99+
100+
def demo_mixed_data():
101+
"""Demonstrate mixed RGB image and raw data with different codecs"""
102+
print("\n=== Mixed Data Demo ===")
103+
104+
with tempfile.TemporaryDirectory() as temp_dir:
105+
path = os.path.join(temp_dir, "mixed_demo.vla")
106+
107+
# Create trajectory with default codec selection
108+
traj = Trajectory(path, mode="w", video_codec="auto")
109+
110+
# Add mixed data
111+
for i in range(5):
112+
# RGB image - will use video codec
113+
rgb_image = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
114+
115+
# Non-image data - will use raw codec
116+
vector_data = np.random.rand(10).astype(np.float32)
117+
depth_data = np.random.rand(32, 32).astype(np.float32) # Grayscale
118+
119+
traj.add("camera/rgb", rgb_image, timestamp=i*100)
120+
traj.add("sensor/vector", vector_data, timestamp=i*100)
121+
traj.add("camera/depth", depth_data, timestamp=i*100)
122+
123+
traj.close()
124+
125+
# Read back and verify
126+
traj_read = Trajectory(path, mode="r")
127+
data = traj_read.load()
128+
traj_read.close()
129+
130+
print(f"Loaded {len(data)} features:")
131+
for key, values in data.items():
132+
print(f" {key}: shape={values.shape}, dtype={values.dtype}")
133+
134+
file_size = os.path.getsize(path)
135+
print(f"File size: {file_size} bytes")
136+
137+
return file_size
138+
139+
140+
def demo_codec_config():
141+
"""Demonstrate custom codec configuration"""
142+
print("\n=== Custom Codec Configuration Demo ===")
143+
144+
# Create custom codec config
145+
config = CodecConfig(codec="rawvideo_pyarrow", options={
146+
"batch_size": 50, # Smaller batches
147+
"compression": "lz4" # Different compression
148+
})
149+
150+
with tempfile.TemporaryDirectory() as temp_dir:
151+
path = os.path.join(temp_dir, "custom_config_demo.vla")
152+
153+
# Create trajectory with custom config
154+
traj = Trajectory(path, mode="w", codec_config=config)
155+
156+
# Add test data
157+
for i in range(20):
158+
vector_data = np.random.rand(8).astype(np.float32)
159+
traj.add("sensor/data", vector_data, timestamp=i*50)
160+
161+
traj.close()
162+
163+
# Read back and verify
164+
traj_read = Trajectory(path, mode="r")
165+
data = traj_read.load()
166+
traj_read.close()
167+
168+
print(f"Loaded {len(data)} features:")
169+
for key, values in data.items():
170+
print(f" {key}: shape={values.shape}, dtype={values.dtype}")
171+
172+
file_size = os.path.getsize(path)
173+
print(f"File size: {file_size} bytes")
174+
175+
return file_size
176+
177+
178+
if __name__ == "__main__":
179+
print("Codec Abstraction System Demo")
180+
print("=" * 50)
181+
182+
pickle_size = demo_pickle_codec()
183+
pyarrow_size = demo_pyarrow_codec()
184+
mixed_size = demo_mixed_data()
185+
custom_size = demo_codec_config()
186+
187+
print("\n=== Summary ===")
188+
print(f"Pickle codec file size: {pickle_size} bytes")
189+
if pyarrow_size is not None:
190+
print(f"PyArrow codec file size: {pyarrow_size} bytes")
191+
if pickle_size:
192+
compression_ratio = pickle_size / pyarrow_size
193+
print(f"Compression ratio: {compression_ratio:.2f}x")
194+
print(f"Mixed data file size: {mixed_size} bytes")
195+
print(f"Custom config file size: {custom_size} bytes")
196+
197+
print("\nDemo completed successfully!")

examples/oxe_conversion.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import os
2+
import tempfile
3+
4+
import numpy as np
5+
import tensorflow as tf
6+
import tensorflow_datasets as tfds
7+
8+
import robodm
9+
10+
# Prevent tensorflow from allocating GPU memory
11+
tf.config.set_visible_devices([], "GPU")
12+
13+
import logging
14+
logging.basicConfig(level=logging.DEBUG)
15+
logging.getLogger("robodm").setLevel(logging.DEBUG)
16+
17+
18+
def main():
19+
"""
20+
This example demonstrates converting an Open-X Embodiment (OXE)
21+
dataset episode to robodm format and loading it back.
22+
"""
23+
24+
def _transpose_list_of_dicts(list_of_dicts):
25+
"""Converts a list of nested dictionaries to a nested dictionary of lists."""
26+
if not list_of_dicts:
27+
return {}
28+
29+
# Base case: if the first element is not a dictionary, it's a leaf.
30+
if not isinstance(list_of_dicts[0], dict):
31+
return list_of_dicts
32+
33+
dict_of_lists = {}
34+
# Assume all dicts in the list have the same keys as the first one.
35+
for key in list_of_dicts[0].keys():
36+
# Recursively process the values for each key.
37+
dict_of_lists[key] = _transpose_list_of_dicts(
38+
[d[key] for d in list_of_dicts]
39+
)
40+
return dict_of_lists
41+
42+
# 1. Load an episode from an OXE dataset
43+
# We use `fractal20220817_data/bridge_from_patak_to_aloha_space` as used in the
44+
# reference notebook.
45+
# NOTE: This might take a significant amount of time on the first run
46+
# as it needs to download the dataset index and relevant files.
47+
print("Loading OXE dataset from tensorflow_datasets...")
48+
builder = tfds.builder_from_directory(builder_dir=
49+
"gs://gresearch/robotics/fractal20220817_data/0.1.0"
50+
)
51+
52+
# Load the first episode from the training split.
53+
ds = builder.as_dataset(split="train[:1]")
54+
episode = next(iter(tfds.as_numpy(ds)))
55+
56+
# The episode contains 'steps' which is a tf.data.Dataset object.
57+
# We first convert it into a list of step dictionaries.
58+
steps_list = list(episode["steps"])
59+
60+
if not steps_list:
61+
print("Episode is empty, exiting.")
62+
return
63+
64+
# Now, we transpose this list of dictionaries into a dictionary of lists.
65+
# This is the format `from_dict_of_lists` expects.
66+
episode_steps = _transpose_list_of_dicts(steps_list)
67+
68+
num_steps = len(episode_steps["observation"]["image"])
69+
print(f"Loaded episode with {num_steps} steps.")
70+
71+
# Let's check the shape of an image from the original dataset
72+
original_image_shape = episode_steps["observation"]["image"][0].shape
73+
print(f"Original image shape: {original_image_shape}")
74+
75+
# 2. Convert to robodm format and save
76+
path = "./oxe_bridge_example.vla" #os.path.join(tempfile.gettempdir(), "oxe_bridge_example.vla")
77+
print(f"Converting and saving to {path}...")
78+
79+
# `from_dict_of_lists` is perfect for this. It takes a dictionary
80+
# where keys are feature names and values are lists (or arrays) of data
81+
# for each timestep. The nested dictionary from OXE is flattened automatically.
82+
robodm.Trajectory.from_dict_of_lists(data=episode_steps, path=path, video_codec="libx264")
83+
print("Conversion successful.")
84+
85+
# 3. Load the trajectory back
86+
print("Loading trajectory back with robodm...")
87+
traj = robodm.Trajectory(path=path, mode="r")
88+
loaded_data = traj.load()
89+
traj.close()
90+
91+
# 4. Verify the loaded data
92+
loaded_num_steps = len(loaded_data["observation/image"])
93+
print(f"Loaded trajectory with {loaded_num_steps} timesteps")
94+
print(f"Image shape from robodm: {loaded_data['observation/image'][0].shape}")
95+
print(f"Loaded keys: {loaded_data.keys()}")
96+
97+
# write all images to disk
98+
for i in range(loaded_num_steps):
99+
from PIL import Image
100+
import os
101+
os.makedirs("images", exist_ok=True)
102+
image = loaded_data["observation/image"][i]
103+
image = image.astype(np.uint8)
104+
image = Image.fromarray(image)
105+
image.save(f"images/image_{i}.png")
106+
107+
# Compare shapes and number of steps
108+
assert loaded_num_steps == num_steps
109+
assert loaded_data["observation/image"][0].shape == original_image_shape
110+
print("\nVerification successful: Number of steps and image shapes match.")
111+
112+
# Clean up
113+
# os.remove(path)
114+
print(f"Cleaned up temporary file: {path}")
115+
116+
117+
if __name__ == "__main__":
118+
main()

0 commit comments

Comments
 (0)