Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
548 changes: 548 additions & 0 deletions INGESTION_API.md

Large diffs are not rendered by default.

197 changes: 197 additions & 0 deletions example_codec_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""
Example script demonstrating the new codec abstraction system.

This shows how to use different raw data codecs for non-image data:
1. pickle_raw (legacy behavior) - each data point is pickled individually
2. pyarrow_batch - batches data points for better seeking performance
"""

import numpy as np
import tempfile
import os
from pathlib import Path

# Add the project directory to the Python path
import sys
sys.path.insert(0, str(Path(__file__).parent))

from robodm import Trajectory, FeatureType
from robodm.backend.codec_config import CodecConfig

def demo_pickle_codec():
"""Demonstrate the pickle-based raw codec (legacy behavior)"""
print("=== Pickle Raw Codec Demo ===")

with tempfile.TemporaryDirectory() as temp_dir:
path = os.path.join(temp_dir, "pickle_demo.vla")

# Create trajectory with pickle-based raw codec
traj = Trajectory(path, mode="w", video_codec="rawvideo_pickle")

# Add some test data
for i in range(10):
# Non-image data - will use raw codec
vector_data = np.random.rand(5).astype(np.float32)
joint_positions = np.array([i, i+1, i+2], dtype=np.float32)

traj.add("sensor/vector", vector_data, timestamp=i*100)
traj.add("robot/joints", joint_positions, timestamp=i*100)

traj.close()

# Read back and verify
traj_read = Trajectory(path, mode="r")
data = traj_read.load()
traj_read.close()

print(f"Loaded {len(data)} features:")
for key, values in data.items():
print(f" {key}: shape={values.shape}, dtype={values.dtype}")

file_size = os.path.getsize(path)
print(f"File size: {file_size} bytes")

return file_size


def demo_pyarrow_codec():
"""Demonstrate the PyArrow-based raw codec with batching"""
print("\n=== PyArrow Batch Codec Demo ===")

try:
import pyarrow # Check if PyArrow is available
except ImportError:
print("PyArrow not available - skipping demo")
return None

with tempfile.TemporaryDirectory() as temp_dir:
path = os.path.join(temp_dir, "pyarrow_demo.vla")

# Create trajectory with PyArrow-based raw codec
traj = Trajectory(path, mode="w", video_codec="rawvideo_pyarrow")

# Add the same test data
for i in range(10):
# Non-image data - will use raw codec
vector_data = np.random.rand(5).astype(np.float32)
joint_positions = np.array([i, i+1, i+2], dtype=np.float32)

traj.add("sensor/vector", vector_data, timestamp=i*100)
traj.add("robot/joints", joint_positions, timestamp=i*100)

traj.close()

# Read back and verify
traj_read = Trajectory(path, mode="r")
data = traj_read.load()
traj_read.close()

print(f"Loaded {len(data)} features:")
for key, values in data.items():
print(f" {key}: shape={values.shape}, dtype={values.dtype}")

file_size = os.path.getsize(path)
print(f"File size: {file_size} bytes")

return file_size


def demo_mixed_data():
"""Demonstrate mixed RGB image and raw data with different codecs"""
print("\n=== Mixed Data Demo ===")

with tempfile.TemporaryDirectory() as temp_dir:
path = os.path.join(temp_dir, "mixed_demo.vla")

# Create trajectory with default codec selection
traj = Trajectory(path, mode="w", video_codec="auto")

# Add mixed data
for i in range(5):
# RGB image - will use video codec
rgb_image = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)

# Non-image data - will use raw codec
vector_data = np.random.rand(10).astype(np.float32)
depth_data = np.random.rand(32, 32).astype(np.float32) # Grayscale

traj.add("camera/rgb", rgb_image, timestamp=i*100)
traj.add("sensor/vector", vector_data, timestamp=i*100)
traj.add("camera/depth", depth_data, timestamp=i*100)

traj.close()

# Read back and verify
traj_read = Trajectory(path, mode="r")
data = traj_read.load()
traj_read.close()

print(f"Loaded {len(data)} features:")
for key, values in data.items():
print(f" {key}: shape={values.shape}, dtype={values.dtype}")

file_size = os.path.getsize(path)
print(f"File size: {file_size} bytes")

return file_size


def demo_codec_config():
"""Demonstrate custom codec configuration"""
print("\n=== Custom Codec Configuration Demo ===")

# Create custom codec config
config = CodecConfig(codec="rawvideo_pyarrow", options={
"batch_size": 50, # Smaller batches
"compression": "lz4" # Different compression
})

with tempfile.TemporaryDirectory() as temp_dir:
path = os.path.join(temp_dir, "custom_config_demo.vla")

# Create trajectory with custom config
traj = Trajectory(path, mode="w", codec_config=config)

# Add test data
for i in range(20):
vector_data = np.random.rand(8).astype(np.float32)
traj.add("sensor/data", vector_data, timestamp=i*50)

traj.close()

# Read back and verify
traj_read = Trajectory(path, mode="r")
data = traj_read.load()
traj_read.close()

print(f"Loaded {len(data)} features:")
for key, values in data.items():
print(f" {key}: shape={values.shape}, dtype={values.dtype}")

file_size = os.path.getsize(path)
print(f"File size: {file_size} bytes")

return file_size


if __name__ == "__main__":
print("Codec Abstraction System Demo")
print("=" * 50)

pickle_size = demo_pickle_codec()
pyarrow_size = demo_pyarrow_codec()
mixed_size = demo_mixed_data()
custom_size = demo_codec_config()

print("\n=== Summary ===")
print(f"Pickle codec file size: {pickle_size} bytes")
if pyarrow_size is not None:
print(f"PyArrow codec file size: {pyarrow_size} bytes")
if pickle_size:
compression_ratio = pickle_size / pyarrow_size
print(f"Compression ratio: {compression_ratio:.2f}x")
print(f"Mixed data file size: {mixed_size} bytes")
print(f"Custom config file size: {custom_size} bytes")

print("\nDemo completed successfully!")
118 changes: 118 additions & 0 deletions examples/oxe_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import os
import tempfile

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

import robodm

# Prevent tensorflow from allocating GPU memory
tf.config.set_visible_devices([], "GPU")

import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("robodm").setLevel(logging.DEBUG)


def main():
"""
This example demonstrates converting an Open-X Embodiment (OXE)
dataset episode to robodm format and loading it back.
"""

def _transpose_list_of_dicts(list_of_dicts):
"""Converts a list of nested dictionaries to a nested dictionary of lists."""
if not list_of_dicts:
return {}

# Base case: if the first element is not a dictionary, it's a leaf.
if not isinstance(list_of_dicts[0], dict):
return list_of_dicts

dict_of_lists = {}
# Assume all dicts in the list have the same keys as the first one.
for key in list_of_dicts[0].keys():
# Recursively process the values for each key.
dict_of_lists[key] = _transpose_list_of_dicts(
[d[key] for d in list_of_dicts]
)
return dict_of_lists

# 1. Load an episode from an OXE dataset
# We use `fractal20220817_data/bridge_from_patak_to_aloha_space` as used in the
# reference notebook.
# NOTE: This might take a significant amount of time on the first run
# as it needs to download the dataset index and relevant files.
print("Loading OXE dataset from tensorflow_datasets...")
builder = tfds.builder_from_directory(builder_dir=
"gs://gresearch/robotics/fractal20220817_data/0.1.0"
)

# Load the first episode from the training split.
ds = builder.as_dataset(split="train[:1]")
episode = next(iter(tfds.as_numpy(ds)))

# The episode contains 'steps' which is a tf.data.Dataset object.
# We first convert it into a list of step dictionaries.
steps_list = list(episode["steps"])

if not steps_list:
print("Episode is empty, exiting.")
return

# Now, we transpose this list of dictionaries into a dictionary of lists.
# This is the format `from_dict_of_lists` expects.
episode_steps = _transpose_list_of_dicts(steps_list)

num_steps = len(episode_steps["observation"]["image"])
print(f"Loaded episode with {num_steps} steps.")

# Let's check the shape of an image from the original dataset
original_image_shape = episode_steps["observation"]["image"][0].shape
print(f"Original image shape: {original_image_shape}")

# 2. Convert to robodm format and save
path = "./oxe_bridge_example.vla" #os.path.join(tempfile.gettempdir(), "oxe_bridge_example.vla")
print(f"Converting and saving to {path}...")

# `from_dict_of_lists` is perfect for this. It takes a dictionary
# where keys are feature names and values are lists (or arrays) of data
# for each timestep. The nested dictionary from OXE is flattened automatically.
robodm.Trajectory.from_dict_of_lists(data=episode_steps, path=path, video_codec="libx264")
print("Conversion successful.")

# 3. Load the trajectory back
print("Loading trajectory back with robodm...")
traj = robodm.Trajectory(path=path, mode="r")
loaded_data = traj.load()
traj.close()

# 4. Verify the loaded data
loaded_num_steps = len(loaded_data["observation/image"])
print(f"Loaded trajectory with {loaded_num_steps} timesteps")
print(f"Image shape from robodm: {loaded_data['observation/image'][0].shape}")
print(f"Loaded keys: {loaded_data.keys()}")

# write all images to disk
for i in range(loaded_num_steps):
from PIL import Image
import os
os.makedirs("images", exist_ok=True)
image = loaded_data["observation/image"][i]
image = image.astype(np.uint8)
image = Image.fromarray(image)
image.save(f"images/image_{i}.png")

# Compare shapes and number of steps
assert loaded_num_steps == num_steps
assert loaded_data["observation/image"][0].shape == original_image_shape
print("\nVerification successful: Number of steps and image shapes match.")

# Clean up
# os.remove(path)
print(f"Cleaned up temporary file: {path}")


if __name__ == "__main__":
main()
Loading
Loading