Skip to content

Latest commit

 

History

History
292 lines (228 loc) · 9.58 KB

File metadata and controls

292 lines (228 loc) · 9.58 KB

Datasets and Packing: Naive vs Streaming

This guide explains the two dataset implementations in LMMS Engine and helps you choose the right approach for your training needs.

Overview

LMMS Engine provides two distinct dataset implementations:

Dataset Type Class Description Best For
Naive (Map-style) MultiModalDataset Precomputes packing groups before training Small to medium datasets, deterministic packing
Streaming (Iterable) MultiModalIterableDataset Packs sequences on-the-fly during iteration Large datasets, low memory usage, dynamic data

Both implementations share the same DatasetConfig interface for seamless switching between approaches.

Quick Start

Basic Usage

from lmms_engine.datasets import DatasetConfig, MultiModalDataset, MultiModalIterableDataset
from lmms_engine.train import FSDP2SFTTrainer

# Configure your dataset
config = DatasetConfig(
    # Core settings
    dataset_type="vision",                    # Type: vision | vision_audio | fineweb_edu | rae | sit | qwen_omni
                                              # Note: Use vision_iterable or bagel_iterable for streaming versions
    dataset_format="hf_dataset",              # Format: json | jsonl | csv | yaml | hf_dataset | arrow | parquet
    dataset_path="your/dataset/path",         # Path to dataset or HF Hub ID
    
    # Processing
    processor_config={"processor_type": "your_processor"},
    shuffle=True,
    
    # Packing configuration
    packing=True,                              # Enable sequence packing
    packing_length=32000,                      # Maximum tokens per packed sequence
    filter_overlong=True,                      # Drop sequences > packing_length
    packing_strategy="first_fit",              # Naive only: first_fit | window_XX (ignored for Streaming)
)

# Choose your dataset implementation
# Option 1: Naive (precomputed packing)
dataset = MultiModalDataset(config)

# Option 2: Streaming (on-the-fly packing)
# Important: For Streaming dataset, prefer dataset_format="hf_dataset", "arrow", or "parquet"
# json/jsonl formats work better with Naive dataset  
dataset = MultiModalIterableDataset(config)

# Build and use
dataset.build()
collator = dataset.get_collator()

# Train with FSDP2
trainer = FSDP2SFTTrainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    data_collator=collator
)
trainer.train()

Dataset Implementation Details

Naive Dataset (Precomputed Packing)

The MultiModalDataset loads all data into memory and precomputes optimal packing arrangements before training begins.

How it works:

  1. Load: Loads dataset (memory-mapped for arrow/parquet/hf_dataset, full load for json/jsonl)
  2. Estimate: Calculates token length for each sample using map operations
  3. Pack: Precomputes optimal packing arrangements using selected algorithm
  4. Serve: Returns precomputed packs during training

Characteristics:

  • Deterministic: Same packing arrangement every epoch
  • Optimal packing: Can use sophisticated algorithms for better utilization
  • Known length: Exact number of steps per epoch is known
  • ⚠️ Memory usage: Full load for json/jsonl; memory-mapped for arrow/parquet/hf_dataset
  • Slower startup: Preprocessing adds initialization time

When to use:

  • Dataset fits comfortably in memory (< 100GB)
  • You need reproducible training runs
  • Packing efficiency is critical
  • You're debugging or experimenting

Streaming Dataset (On-the-fly Packing)

The MultiModalIterableDataset streams data and packs sequences dynamically during iteration.

How it works:

  1. Stream: Loads data samples one at a time
  2. Buffer: Accumulates samples in a buffer
  3. Pack: When buffer + next sample > packing_length, yields buffer
  4. Flush: Yields remaining buffer at epoch end

Characteristics:

  • Memory efficient: Streams data samples without precomputing packs
  • Fast startup: No preprocessing required
  • Scales infinitely: Works with any dataset size
  • Non-deterministic: Different packing each epoch
  • Unknown length: Can't calculate exact steps per epoch (use max_steps instead of num_train_epochs)
  • Suboptimal packing: Uses greedy buffer-filling strategy - yields buffer when buffer_length + next_sample > packing_length, may waste tokens compared to global optimization

When to use:

  • Large datasets (> 100GB)
  • Limited memory environments
  • Continuous/streaming data sources
  • Production training at scale

Distributed Training Behavior

Naive Dataset

  • Uses DistributedSampler or DistributedLengthGroupedSampler
  • Each rank gets deterministic subset of packs
  • Steps per epoch = total_packs / world_size
  • Supports group_by_length for improved training efficiency

Streaming Dataset

  • Performs rank sharding via HFDataset.shard()
  • Worker splitting via torch.utils.data.get_worker_info()
  • Dynamic step count per rank (depends on data distribution)
  • No sampler attachment (handled internally)

⚠️ Important: Ensure dataset length divides evenly across ranks to avoid imbalanced workloads.

Configuration Reference

Core Parameters

Parameter Type Description Default
packing bool Enable sequence packing False
packing_length int Maximum tokens per packed sequence 32000
filter_overlong bool Drop sequences exceeding packing_length True
packing_strategy str Naive only: first_fit or window_XX first_fit
shuffle bool Shuffle dataset before packing True

Packing Strategies (Naive Only)

  • first_fit: Greedily pack sequences into first available space
  • window_XX: Group sequences within sliding windows of size XX (e.g., window_100)
    • First sorts sequences by length
    • Groups sequences within windows of XX consecutive samples
    • Provides better packing for sorted data while maintaining some randomness

Configuration Examples

YAML Configuration

dataset:
  dataset_type: vision
  dataset_format: hf_dataset
  dataset_path: your/dataset/path
  shuffle: true
  packing: true
  packing_length: 32000
  filter_overlong: true
  processor_config:
    processor_type: your_processor

Python Configuration

config = DatasetConfig(
    dataset_type="vision",
    dataset_format="hf_dataset",
    dataset_path="your/dataset/path",
    packing=True,
    packing_length=32000,
    filter_overlong=True,
    processor_config={"processor_type": "your_processor"}
)

Performance Tips

Optimizing Packing Efficiency

  1. Choose appropriate packing_length:

    • Too small: Underutilized sequences
    • Too large: May exceed memory limits
    • Recommended: Start with model's max sequence length
  2. Monitor packing metrics:

    # Trainer logs these automatically
    - perf/global_seq_len_avg  # Average packed sequence length
    - perf/global_seq_len_min  # Minimum across ranks
    - perf/global_seq_len_max  # Maximum across ranks
  3. Handle outliers:

    • Set filter_overlong=True to drop anomalously long sequences
    • Prevents memory spikes and improves batch consistency

Memory Management

For Naive Dataset:

# Estimate memory usage
estimated_memory = num_samples * avg_sample_size * 1.2  # 20% overhead

For Streaming Dataset:

# Memory usage is constant
max_memory = batch_size * packing_length * token_size

Troubleshooting

Common Issues

1. Distributed Training Hangs

Problem: Collective operations hang during training.

Solution: Ensure all ranks have identical tensor shapes:

# Bad: Different shapes across ranks
loss = torch.tensor([loss1, loss2, ...])  

# Good: Scalar aggregation
loss = torch.tensor(loss.item())
torch.distributed.all_reduce(loss, op=ReduceOp.AVG)

2. Imbalanced Workload

Problem: Some ranks finish before others.

Solution:

  • Ensure dataset size is divisible by world_size
  • Use streaming dataset for better load balancing
  • Enable shuffle=True to randomize distribution

3. OOM with Naive Dataset

Problem: Out of memory during dataset loading.

Solutions:

  • Switch to streaming dataset
  • Reduce packing_length
  • Enable filter_overlong=True
  • Use data sharding:
    dataset = load_dataset("path", split=f"train[{rank}:{rank+1}:{world_size}]")

Decision Matrix

Criterion Naive Dataset Streaming Dataset
Dataset Size < 100GB Any size
Memory Usage Medium (memory-mapped) to High (json/jsonl) Low
Best Formats All supported hf_dataset, arrow, parquet
Startup Time Slow Fast
Packing Quality Optimal Good
Reproducibility Yes No
Step Count Known Yes No
LR Schedulers All supported Limited (use max_steps)
Best For Research, debugging Production, scale

Migration Guide

From Naive to Streaming

# Before (Naive)
from lmms_engine.datasets import MultiModalDataset
dataset = MultiModalDataset(config)

# After (Streaming)
from lmms_engine.datasets import MultiModalIterableDataset
dataset = MultiModalIterableDataset(config)
# Note: Ensure dataset_format="hf_dataset"

From Streaming to Naive

# Before (Streaming)
from lmms_engine.datasets import MultiModalIterableDataset
dataset = MultiModalIterableDataset(config)

# After (Naive)
from lmms_engine.datasets import MultiModalDataset
dataset = MultiModalDataset(config)
# Note: May need to adjust memory allocation