diff --git a/pyproject.toml b/pyproject.toml index e57ffd7f..e2fb9527 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,10 +73,15 @@ eval = [ "httpx", ] +video = [ + "lmms-video-utils[all]>=0.1.0", +] + all = [ "lmms_engine[pref]", "lmms_engine[storage]", "lmms_engine[eval]", + "lmms_engine[video]", ] diff --git a/src/lmms_engine/datasets/codec_video_mixin.py b/src/lmms_engine/datasets/codec_video_mixin.py new file mode 100644 index 00000000..c4af356e --- /dev/null +++ b/src/lmms_engine/datasets/codec_video_mixin.py @@ -0,0 +1,71 @@ +"""Mixin for loading videos through the ``lmms_video_utils`` codec backend. + +Separates the dataset-level codec orchestration (collecting canvases plus +their ``CodecVideoOutput`` metadata across a message list) from both the +backend implementation (``load_video_lmms_video_utils`` lives in +``MultiModalDataLoadingMixin``) and the concrete dataset class. A dataset +that mixes this in can pass the collected ``video_metadata`` straight into a +codec-aware processor (e.g. LLaVA-OneVision-2). +""" + +from __future__ import annotations + +import os +from typing import Any, Dict, List, Optional, Tuple + + +class CodecVideoLoadingMixin: + """Collects codec-stream video inputs from OpenAI-style messages. + + Requires the host class to also provide + ``load_video_lmms_video_utils`` (from ``MultiModalDataLoadingMixin``) + and a ``config`` with ``video_backend`` / ``fps``. + """ + + def load_codec_videos( + self, + video_path: str, + data_folder: Optional[str] = None, + fps: int = 1, + video_kwargs: Optional[Dict[str, Any]] = None, + ) -> Tuple[Any, float, Any]: + assert ( + self.config.video_backend == "lmms_video_utils" + ), "CodecVideoLoadingMixin requires video_backend='lmms_video_utils'" + if data_folder is not None: + video_path = os.path.join(data_folder, video_path) + return self.load_video_lmms_video_utils(video_path, fps, video_kwargs=video_kwargs) + + def collect_codec_video_inputs( + self, + messages: List[dict], + data_folder: Optional[str] = None, + ) -> Tuple[List[Any], List[Any], Optional[float]]: + """Walk ``messages`` and load every ``video_url`` via the codec + backend. + + Returns ``(videos, video_metadata_list, sample_fps)`` where + ``videos`` are the canvas arrays, ``video_metadata_list`` holds the + matching ``CodecVideoOutput`` objects, and ``sample_fps`` is the fps + of the last loaded video (or ``None`` if no video was present). + """ + videos: List[Any] = [] + video_metadata_list: List[Any] = [] + sample_fps: Optional[float] = None + + for message in messages: + for content in message["content"]: + if content.get("type") != "video_url": + continue + video_url = content["video_url"] + extra = {k: v for k, v in video_url.items() if k != "url" and v is not None} + frames, sample_fps, codec_output = self.load_codec_videos( + video_url["url"], + data_folder=data_folder, + fps=self.config.fps, + video_kwargs=extra or None, + ) + videos.append(frames) + video_metadata_list.append(codec_output) + + return videos, video_metadata_list, sample_fps diff --git a/src/lmms_engine/datasets/config.py b/src/lmms_engine/datasets/config.py index c1164375..b37be491 100644 --- a/src/lmms_engine/datasets/config.py +++ b/src/lmms_engine/datasets/config.py @@ -38,7 +38,7 @@ class DatasetConfig(Args): video_min_pixels: Optional[int] = 3136 frame_num: Optional[int] = 64 fps: Optional[int] = 1 - video_backend: Optional[Literal["decord", "qwen_vl_utils", "qwen_omni_utils"]] = "qwen_vl_utils" + video_backend: Optional[Literal["decord", "qwen_vl_utils", "qwen_omni_utils", "lmms_video_utils"]] = "qwen_vl_utils" @field_validator( "video_max_pixels", diff --git a/src/lmms_engine/datasets/iterable/__init__.py b/src/lmms_engine/datasets/iterable/__init__.py index d818f274..bb364f83 100644 --- a/src/lmms_engine/datasets/iterable/__init__.py +++ b/src/lmms_engine/datasets/iterable/__init__.py @@ -1,6 +1,7 @@ from .bagel_iterable_dataset import BagelIterableDataset from .base_iterable_dataset import BaseIterableDataset from .fineweb_edu_dataset import FinewebEduDataset +from .llava_ov2_iterable_dataset import LlavaOv2IterableDataset from .multimodal_iterable_dataset import MultiModalIterableDataset from .qwen3_vl_iterable_dataset import Qwen3VLIterableDataset from .qwen_omni_iterable_dataset import QwenOmniIterableDataset @@ -12,6 +13,7 @@ "MultiModalIterableDataset", "VisionSFTIterableDataset", "BagelIterableDataset", + "LlavaOv2IterableDataset", "Qwen3VLIterableDataset", "QwenOmniIterableDataset", ] diff --git a/src/lmms_engine/datasets/iterable/llava_ov2_iterable_dataset.py b/src/lmms_engine/datasets/iterable/llava_ov2_iterable_dataset.py new file mode 100644 index 00000000..286d3d68 --- /dev/null +++ b/src/lmms_engine/datasets/iterable/llava_ov2_iterable_dataset.py @@ -0,0 +1,56 @@ +import json +import os +from typing import Any, Dict + +import torch +from PIL import Image + +from lmms_engine.datasets.codec_video_mixin import CodecVideoLoadingMixin +from lmms_engine.datasets.iterable.vision_iterable_dataset import ( + VisionSFTIterableDataset, +) +from lmms_engine.mapping_func import register_dataset +from lmms_engine.utils.train_utils import TrainUtilities + + +@register_dataset("llava_ov2_iterable") +class LlavaOv2IterableDataset(CodecVideoLoadingMixin, VisionSFTIterableDataset): + """Iterable dataset for LLaVA-OneVision-2 with codec-stream video input. + + Reuses ``VisionSFTIterableDataset`` plumbing but routes video loading + through the ``lmms_video_utils`` backend (via ``CodecVideoLoadingMixin``) + so each video produces a ``CodecVideoOutput`` (canvases + patch_positions + + source_pts) that the downstream processor can consume directly instead + of re-deriving timestamps from frame index. + """ + + def load_from_json(self, data, data_folder=None) -> Dict[str, torch.Tensor]: + images_list = [] + kwargs: Dict[str, Any] = {} + messages = data["messages"] + if isinstance(messages, str): + messages = json.loads(messages) + + for message in messages: + for content in message["content"]: + if content["type"] == "image_url": + images_list.append(content["image_url"]["url"]) + + videos, video_metadata_list, sample_fps = self.collect_codec_video_inputs(messages, data_folder=data_folder) + if sample_fps is not None: + kwargs["fps"] = sample_fps + + hf_messages = TrainUtilities.convert_open_to_hf(messages) + if data_folder is not None: + images = [Image.open(os.path.join(data_folder, image)) for image in images_list] + else: + images = [Image.open(image) for image in images_list] + if len(images) == 0: + images = None + if len(videos) == 0: + videos = None + else: + kwargs["video_metadata"] = video_metadata_list + + inputs = self.processor.process(images=images, hf_messages=hf_messages, videos=videos, **kwargs) + return inputs diff --git a/src/lmms_engine/datasets/multimodal_mixin.py b/src/lmms_engine/datasets/multimodal_mixin.py index 907a24d6..029cb104 100644 --- a/src/lmms_engine/datasets/multimodal_mixin.py +++ b/src/lmms_engine/datasets/multimodal_mixin.py @@ -144,6 +144,8 @@ def load_videos( return self.load_video_qwen_vl_utils(video_path, fps, video_kwargs=video_kwargs) elif self.config.video_backend == "qwen_omni_utils": return self.load_video_qwen_omni_utils(video_path, fps) + elif self.config.video_backend == "lmms_video_utils": + return self.load_video_lmms_video_utils(video_path, fps, video_kwargs=video_kwargs) else: raise ValueError(f"Video backend {self.config.video_backend} not supported") @@ -280,3 +282,80 @@ def load_video_qwen_omni_utils( return video_frames, sample_fps else: raise ValueError("No video frames returned from process_mm_info") + + def load_video_lmms_video_utils( + self, + video_path: str, + fps: int, + video_kwargs: Optional[Dict[str, Any]] = None, + ) -> Tuple[np.ndarray, float, Any]: + """ + Load video using lmms_video_utils codec-stream backend. + + Returns canvases (codec-packed "frames") plus the full + ``CodecVideoOutput`` metadata so a downstream processor that + understands per-patch positions (e.g. LLaVA-OneVision-2) can use + them instead of re-deriving timestamps from frame index. + + Decoder selection is read from ``config.extra_kwargs``: + ``video_decode_backend``: "auto" | "torchcodec" | "pyav" + (default: "pyav") + ``video_decode_device``: "cpu" | "cuda" | "cuda:N" + (default: "cpu"; "cuda" resolves to cuda:LOCAL_RANK) + + Args: + video_path: Path to video file. + fps: Target fps; mapped to ``target_fps`` when sampling strategy + is ``"fps"``. + video_kwargs: Optional extra fields. Supported keys mirror + qwen-vl-utils (``video_start``/``video_end``/``fps``/ + ``nframes``/``max_pixels``/``min_pixels``); plus any + ``CodecConfig`` field (``score_mode``, ``gop_mode``, + ``target_canvas`` etc.). + + Returns: + Tuple of (frames [T, H, W, C] np.uint8, sample_fps, codec_output). + """ + from lmms_video_utils import fetch_codec_video + + extra = self.config.extra_kwargs or {} + decode_backend = extra.get("video_decode_backend", "pyav") + decode_device = extra.get("video_decode_device", "cpu") + if decode_device == "cuda": + local_rank = int(os.environ.get("LOCAL_RANK", "0")) + decode_device = f"cuda:{local_rank}" + + overrides: Dict[str, Any] = { + "backend": decode_backend, + "device": decode_device, + } + if self.config.video_max_pixels is not None: + overrides["max_pixels"] = int(self.config.video_max_pixels) + if self.config.video_min_pixels is not None: + overrides["min_pixels"] = int(self.config.video_min_pixels) + if self.config.video_max_frames is not None: + overrides["max_frames"] = int(self.config.video_max_frames) + + if self.config.video_sampling_strategy == "fps": + overrides["target_fps"] = float(fps) + elif self.config.video_sampling_strategy == "frame_num": + overrides["max_frames"] = int(self.config.frame_num) + + if video_kwargs: + qwen_to_ours = { + "video_start": "start_time", + "video_end": "end_time", + "fps": "target_fps", + "nframes": "max_frames", + "max_pixels": "max_pixels", + "min_pixels": "min_pixels", + } + for k, v in video_kwargs.items(): + overrides[qwen_to_ours.get(k, k)] = v + + codec_output = fetch_codec_video(video_path, **overrides) + canvases = codec_output.canvases.cpu().numpy() + if canvases.ndim == 4 and canvases.shape[1] in (1, 3, 4): + canvases = np.transpose(canvases, (0, 2, 3, 1)) + sample_fps = float(codec_output.fps) + return canvases, sample_fps, codec_output diff --git a/src/lmms_engine/datasets/processor/llava_onevision2_processor.py b/src/lmms_engine/datasets/processor/llava_onevision2_processor.py index d3d1e792..a467494a 100644 --- a/src/lmms_engine/datasets/processor/llava_onevision2_processor.py +++ b/src/lmms_engine/datasets/processor/llava_onevision2_processor.py @@ -54,6 +54,7 @@ def process( audios: Optional[List[np.ndarray]] = None, sampling_rate: Optional[int] = None, videos=None, + video_metadata=None, system_message: str = "You are a helpful assistant", add_system_prompt: bool = True, add_generation_prompt: bool = False, @@ -70,16 +71,33 @@ def process( image_grid_thw = None # ---------------- Video branch ---------------- - # OV2 video_processor expects a list of videos as pre-decoded frame - # arrays in **HWC uint8** format (so it can call PIL.Image.fromarray). - # The iterable dataset (qwen3_vl_iterable / qwen_vl_utils) hands us - # **CHW float32 / uint8** numpy arrays of shape ``[T, 3, H, W]``, so - # convert here. - if videos is not None: + # Two paths: + # (a) ``video_metadata`` carries a list of ``CodecVideoOutput`` from + # the ``lmms_video_utils`` backend. Each entry already knows the + # per-patch ``(t, h, w)`` source coordinates and the per-canvas + # source timestamp. Treat canvases as the "frames" the model + # sees, but bypass OV2's video_processor and reuse our metadata. + # (b) ``videos`` is a list of raw decoded frame arrays. OV2's + # video_processor re-derives positions from ``arange(T)``; this + # is the legacy frame-sampling path. + codec_meta_path = ( + video_metadata is not None + and isinstance(video_metadata, (list, tuple)) + and len(video_metadata) > 0 + and self._looks_like_codec_output(video_metadata[0]) + ) + + if codec_meta_path: + videos_inputs = self._build_codec_video_inputs(videos, video_metadata) + elif videos is not None: videos = [self._normalize_video_for_ov2(v) for v in videos] videos_inputs = self.processor.video_processor(videos=videos, return_tensors="pt") - video_grid_thw = videos_inputs["video_grid_thw"] # [num_videos, 3] - frame_timestamps = videos_inputs["frame_timestamps"] # list[list[float]] + else: + videos_inputs = None + + if videos_inputs is not None: + video_grid_thw = videos_inputs["video_grid_thw"] + frame_timestamps = videos_inputs["frame_timestamps"] video_pixel_values = videos_inputs["pixel_values_videos"] video_patch_positions = videos_inputs["patch_positions"] else: @@ -206,6 +224,144 @@ def _get_build_patch_positions(self): self._cached_build_patch_positions = mod.build_patch_positions return self._cached_build_patch_positions + @staticmethod + def _looks_like_codec_output(obj) -> bool: + return all(hasattr(obj, attr) for attr in ("canvases", "patch_positions", "source_pts", "fps")) + + def _get_codec_module(self): + """Resolve the codec_video_processing module shipped with the OV2 + checkpoint (loaded via trust_remote_code). Cached after first use.""" + if not hasattr(self, "_cached_codec_module"): + import importlib + import sys + + video_proc = self.processor.video_processor + base_pkg = type(video_proc).__module__.rsplit(".", 1)[0] + candidate_names = [ + f"{base_pkg}.codec_video_processing_llava_onevision2", + "codec_video_processing_llava_onevision2", + ] + mod = None + for name in candidate_names: + if name in sys.modules: + mod = sys.modules[name] + break + if mod is None: + for name in candidate_names: + try: + mod = importlib.import_module(name) + break + except ImportError: + continue + if mod is None: + raise ImportError( + "Could not locate codec_video_processing_llava_onevision2 module; " + "ensure the OV2 checkpoint was loaded with trust_remote_code=True." + ) + self._cached_codec_module = mod + return self._cached_codec_module + + def _build_codec_video_inputs(self, videos, video_metadata) -> dict: + """Construct the same dict shape OV2 video_processor would emit, but + from ``lmms_video_utils.CodecVideoOutput`` metadata. + + - ``canvases`` are pushed through the OV2 image_processor (one canvas + == one "frame") to get ``pixel_values_videos`` and per-canvas grid. + - Source-side ``(t, h, w)`` patch coordinates are reordered into + OV2's 2x2 block layout via ``convert_positions_to_block_layout`` + from the codec processing module. + - ``frame_timestamps`` are read straight off ``source_pts``. + """ + codec_mod = self._get_codec_module() + convert_positions_to_block_layout = codec_mod.convert_positions_to_block_layout + + per_video_pixel_values = [] + per_video_grid_thw = [] + per_video_patch_positions = [] + per_video_timestamps: List[List[float]] = [] + + if videos is None: + videos = [None] * len(video_metadata) + if len(videos) != len(video_metadata): + raise ValueError(f"videos / video_metadata length mismatch: " f"{len(videos)} vs {len(video_metadata)}") + + ip = self.processor.image_processor + sms = int(ip.merge_size) + + for canvases_arr, meta in zip(videos, video_metadata): + pil_canvases = self._codec_canvases_to_pil(canvases_arr, meta) + image_data = ip(images=pil_canvases, return_tensors="pt") + image_grid_thw = image_data["image_grid_thw"] # [N, 3] rows [1, Hp, Wp] + if not torch.all(image_grid_thw[:, 1] == image_grid_thw[0, 1]) or not torch.all( + image_grid_thw[:, 2] == image_grid_thw[0, 2] + ): + raise RuntimeError("codec canvases yielded inconsistent (Hp, Wp); expected uniform shape.") + T_eff = int(image_grid_thw[:, 0].sum().item()) + H_p = int(image_grid_thw[0, 1].item()) + W_p = int(image_grid_thw[0, 2].item()) + video_grid_thw_row = torch.tensor([[T_eff, H_p, W_p]], dtype=image_grid_thw.dtype) + + src_positions = meta.patch_positions + if hasattr(src_positions, "cpu"): + src_positions = src_positions.cpu() + src_positions = src_positions.to(torch.long) + expected = T_eff * H_p * W_p + if src_positions.shape[0] != expected: + raise ValueError( + f"codec patch_positions length {src_positions.shape[0]} " f"!= expected T*H*W = {expected}" + ) + patch_positions = convert_positions_to_block_layout( + src_positions, + T_eff, + H_p, + W_p, + spatial_merge_size=sms, + ) + + if hasattr(meta.source_pts, "cpu"): + seconds_seq = meta.source_pts.cpu().tolist() + else: + seconds_seq = list(meta.source_pts) + if len(seconds_seq) < T_eff: + pad_val = seconds_seq[-1] if seconds_seq else 0.0 + seconds_seq = list(seconds_seq) + [pad_val] * (T_eff - len(seconds_seq)) + elif len(seconds_seq) > T_eff: + seconds_seq = list(seconds_seq[:T_eff]) + + per_video_pixel_values.append(image_data["pixel_values"]) + per_video_grid_thw.append(video_grid_thw_row) + per_video_patch_positions.append(patch_positions) + per_video_timestamps.append([float(s) for s in seconds_seq]) + + return { + "pixel_values_videos": torch.cat(per_video_pixel_values, dim=0), + "video_grid_thw": torch.cat(per_video_grid_thw, dim=0), + "patch_positions": torch.cat(per_video_patch_positions, dim=0), + "frame_timestamps": per_video_timestamps, + } + + @staticmethod + def _codec_canvases_to_pil(canvases_arr, meta) -> List[Image]: + """Coerce canvases from the iterable (THWC uint8 np) or directly from + ``meta.canvases`` (TCHW uint8 tensor) into a list of PIL images.""" + from PIL import Image as PILImage + + if canvases_arr is None: + arr = meta.canvases + if hasattr(arr, "cpu"): + arr = arr.cpu().numpy() + if arr.ndim == 4 and arr.shape[1] in (1, 3, 4): + arr = np.transpose(arr, (0, 2, 3, 1)) + else: + arr = canvases_arr + if hasattr(arr, "cpu"): + arr = arr.cpu().numpy() + if arr.ndim == 4 and arr.shape[1] in (1, 3, 4): + arr = np.transpose(arr, (0, 2, 3, 1)) + if arr.dtype != np.uint8: + arr = arr.clip(0, 255).astype(np.uint8) + return [PILImage.fromarray(arr[i]) for i in range(arr.shape[0])] + @property def vision_start_token_id(self) -> int: return self.processor.tokenizer.convert_tokens_to_ids("<|vision_start|>")