Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,15 @@ eval = [
"httpx",
]

video = [
"lmms-video-utils[all]>=0.1.0",
]

all = [
"lmms_engine[pref]",
"lmms_engine[storage]",
"lmms_engine[eval]",
"lmms_engine[video]",
]


Expand Down
71 changes: 71 additions & 0 deletions src/lmms_engine/datasets/codec_video_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Mixin for loading videos through the ``lmms_video_utils`` codec backend.

Separates the dataset-level codec orchestration (collecting canvases plus
their ``CodecVideoOutput`` metadata across a message list) from both the
backend implementation (``load_video_lmms_video_utils`` lives in
``MultiModalDataLoadingMixin``) and the concrete dataset class. A dataset
that mixes this in can pass the collected ``video_metadata`` straight into a
codec-aware processor (e.g. LLaVA-OneVision-2).
"""

from __future__ import annotations

import os
from typing import Any, Dict, List, Optional, Tuple


class CodecVideoLoadingMixin:
"""Collects codec-stream video inputs from OpenAI-style messages.

Requires the host class to also provide
``load_video_lmms_video_utils`` (from ``MultiModalDataLoadingMixin``)
and a ``config`` with ``video_backend`` / ``fps``.
"""

def load_codec_videos(
self,
video_path: str,
data_folder: Optional[str] = None,
fps: int = 1,
video_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[Any, float, Any]:
assert (
self.config.video_backend == "lmms_video_utils"
), "CodecVideoLoadingMixin requires video_backend='lmms_video_utils'"
if data_folder is not None:
video_path = os.path.join(data_folder, video_path)
return self.load_video_lmms_video_utils(video_path, fps, video_kwargs=video_kwargs)

def collect_codec_video_inputs(
self,
messages: List[dict],
data_folder: Optional[str] = None,
) -> Tuple[List[Any], List[Any], Optional[float]]:
"""Walk ``messages`` and load every ``video_url`` via the codec
backend.

Returns ``(videos, video_metadata_list, sample_fps)`` where
``videos`` are the canvas arrays, ``video_metadata_list`` holds the
matching ``CodecVideoOutput`` objects, and ``sample_fps`` is the fps
of the last loaded video (or ``None`` if no video was present).
"""
videos: List[Any] = []
video_metadata_list: List[Any] = []
sample_fps: Optional[float] = None

for message in messages:
for content in message["content"]:
if content.get("type") != "video_url":
continue
video_url = content["video_url"]
extra = {k: v for k, v in video_url.items() if k != "url" and v is not None}
frames, sample_fps, codec_output = self.load_codec_videos(
video_url["url"],
data_folder=data_folder,
fps=self.config.fps,
video_kwargs=extra or None,
)
videos.append(frames)
video_metadata_list.append(codec_output)

return videos, video_metadata_list, sample_fps
2 changes: 1 addition & 1 deletion src/lmms_engine/datasets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class DatasetConfig(Args):
video_min_pixels: Optional[int] = 3136
frame_num: Optional[int] = 64
fps: Optional[int] = 1
video_backend: Optional[Literal["decord", "qwen_vl_utils", "qwen_omni_utils"]] = "qwen_vl_utils"
video_backend: Optional[Literal["decord", "qwen_vl_utils", "qwen_omni_utils", "lmms_video_utils"]] = "qwen_vl_utils"

@field_validator(
"video_max_pixels",
Expand Down
2 changes: 2 additions & 0 deletions src/lmms_engine/datasets/iterable/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .bagel_iterable_dataset import BagelIterableDataset
from .base_iterable_dataset import BaseIterableDataset
from .fineweb_edu_dataset import FinewebEduDataset
from .llava_ov2_iterable_dataset import LlavaOv2IterableDataset
from .multimodal_iterable_dataset import MultiModalIterableDataset
from .qwen3_vl_iterable_dataset import Qwen3VLIterableDataset
from .qwen_omni_iterable_dataset import QwenOmniIterableDataset
Expand All @@ -12,6 +13,7 @@
"MultiModalIterableDataset",
"VisionSFTIterableDataset",
"BagelIterableDataset",
"LlavaOv2IterableDataset",
"Qwen3VLIterableDataset",
"QwenOmniIterableDataset",
]
56 changes: 56 additions & 0 deletions src/lmms_engine/datasets/iterable/llava_ov2_iterable_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import os
from typing import Any, Dict

import torch
from PIL import Image

from lmms_engine.datasets.codec_video_mixin import CodecVideoLoadingMixin
from lmms_engine.datasets.iterable.vision_iterable_dataset import (

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kcz358 can we extract an abstract class, and name it to something like codec native input.

also for processor, ov2 is a specification in both data and processor.

VisionSFTIterableDataset,
)
from lmms_engine.mapping_func import register_dataset
from lmms_engine.utils.train_utils import TrainUtilities


@register_dataset("llava_ov2_iterable")
class LlavaOv2IterableDataset(CodecVideoLoadingMixin, VisionSFTIterableDataset):
"""Iterable dataset for LLaVA-OneVision-2 with codec-stream video input.

Reuses ``VisionSFTIterableDataset`` plumbing but routes video loading
through the ``lmms_video_utils`` backend (via ``CodecVideoLoadingMixin``)
so each video produces a ``CodecVideoOutput`` (canvases + patch_positions
+ source_pts) that the downstream processor can consume directly instead
of re-deriving timestamps from frame index.
"""

def load_from_json(self, data, data_folder=None) -> Dict[str, torch.Tensor]:
images_list = []
kwargs: Dict[str, Any] = {}
messages = data["messages"]
if isinstance(messages, str):
messages = json.loads(messages)

for message in messages:
for content in message["content"]:
if content["type"] == "image_url":
images_list.append(content["image_url"]["url"])

videos, video_metadata_list, sample_fps = self.collect_codec_video_inputs(messages, data_folder=data_folder)
if sample_fps is not None:
kwargs["fps"] = sample_fps

hf_messages = TrainUtilities.convert_open_to_hf(messages)
if data_folder is not None:
images = [Image.open(os.path.join(data_folder, image)) for image in images_list]
else:
images = [Image.open(image) for image in images_list]
if len(images) == 0:
images = None
if len(videos) == 0:
videos = None
else:
kwargs["video_metadata"] = video_metadata_list

inputs = self.processor.process(images=images, hf_messages=hf_messages, videos=videos, **kwargs)
return inputs
79 changes: 79 additions & 0 deletions src/lmms_engine/datasets/multimodal_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ def load_videos(
return self.load_video_qwen_vl_utils(video_path, fps, video_kwargs=video_kwargs)
elif self.config.video_backend == "qwen_omni_utils":
return self.load_video_qwen_omni_utils(video_path, fps)
elif self.config.video_backend == "lmms_video_utils":
return self.load_video_lmms_video_utils(video_path, fps, video_kwargs=video_kwargs)
else:
raise ValueError(f"Video backend {self.config.video_backend} not supported")

Expand Down Expand Up @@ -280,3 +282,80 @@ def load_video_qwen_omni_utils(
return video_frames, sample_fps
else:
raise ValueError("No video frames returned from process_mm_info")

def load_video_lmms_video_utils(
self,
video_path: str,
fps: int,
video_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[np.ndarray, float, Any]:
"""
Load video using lmms_video_utils codec-stream backend.

Returns canvases (codec-packed "frames") plus the full
``CodecVideoOutput`` metadata so a downstream processor that
understands per-patch positions (e.g. LLaVA-OneVision-2) can use
them instead of re-deriving timestamps from frame index.

Decoder selection is read from ``config.extra_kwargs``:
``video_decode_backend``: "auto" | "torchcodec" | "pyav"
(default: "pyav")
``video_decode_device``: "cpu" | "cuda" | "cuda:N"
(default: "cpu"; "cuda" resolves to cuda:LOCAL_RANK)

Args:
video_path: Path to video file.
fps: Target fps; mapped to ``target_fps`` when sampling strategy
is ``"fps"``.
video_kwargs: Optional extra fields. Supported keys mirror
qwen-vl-utils (``video_start``/``video_end``/``fps``/
``nframes``/``max_pixels``/``min_pixels``); plus any
``CodecConfig`` field (``score_mode``, ``gop_mode``,
``target_canvas`` etc.).

Returns:
Tuple of (frames [T, H, W, C] np.uint8, sample_fps, codec_output).
"""
from lmms_video_utils import fetch_codec_video

extra = self.config.extra_kwargs or {}
decode_backend = extra.get("video_decode_backend", "pyav")
decode_device = extra.get("video_decode_device", "cpu")
if decode_device == "cuda":
local_rank = int(os.environ.get("LOCAL_RANK", "0"))
decode_device = f"cuda:{local_rank}"

overrides: Dict[str, Any] = {
"backend": decode_backend,
"device": decode_device,
}
if self.config.video_max_pixels is not None:
overrides["max_pixels"] = int(self.config.video_max_pixels)
if self.config.video_min_pixels is not None:
overrides["min_pixels"] = int(self.config.video_min_pixels)
if self.config.video_max_frames is not None:
overrides["max_frames"] = int(self.config.video_max_frames)

if self.config.video_sampling_strategy == "fps":
overrides["target_fps"] = float(fps)
elif self.config.video_sampling_strategy == "frame_num":
overrides["max_frames"] = int(self.config.frame_num)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should not be max_frames, more like we set a visual tokens budget?


if video_kwargs:
qwen_to_ours = {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe should not consider qwen_to_ours, we can convert all data offline, prepare all things needed using ffprobe.

"video_start": "start_time",
"video_end": "end_time",
"fps": "target_fps",
"nframes": "max_frames",
"max_pixels": "max_pixels",
"min_pixels": "min_pixels",
}
for k, v in video_kwargs.items():
overrides[qwen_to_ours.get(k, k)] = v

codec_output = fetch_codec_video(video_path, **overrides)
canvases = codec_output.canvases.cpu().numpy()
if canvases.ndim == 4 and canvases.shape[1] in (1, 3, 4):
canvases = np.transpose(canvases, (0, 2, 3, 1))
sample_fps = float(codec_output.fps)
return canvases, sample_fps, codec_output
Loading
Loading