-
Notifications
You must be signed in to change notification settings - Fork 35
feat(datasets): lmms_video_utils video backend for LLaVA-OneVision-2 codec stream #183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a4ea857
a355b79
5869228
9ca9323
714856f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| """Mixin for loading videos through the ``lmms_video_utils`` codec backend. | ||
|
|
||
| Separates the dataset-level codec orchestration (collecting canvases plus | ||
| their ``CodecVideoOutput`` metadata across a message list) from both the | ||
| backend implementation (``load_video_lmms_video_utils`` lives in | ||
| ``MultiModalDataLoadingMixin``) and the concrete dataset class. A dataset | ||
| that mixes this in can pass the collected ``video_metadata`` straight into a | ||
| codec-aware processor (e.g. LLaVA-OneVision-2). | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import os | ||
| from typing import Any, Dict, List, Optional, Tuple | ||
|
|
||
|
|
||
class CodecVideoLoadingMixin:
    """Collects codec-stream video inputs from OpenAI-style messages.

    Requires the host class to also provide
    ``load_video_lmms_video_utils`` (from ``MultiModalDataLoadingMixin``)
    and a ``config`` with ``video_backend`` / ``fps``.
    """

    def load_codec_videos(
        self,
        video_path: str,
        data_folder: Optional[str] = None,
        fps: int = 1,
        video_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Tuple[Any, float, Any]:
        """Load a single video through the ``lmms_video_utils`` backend.

        Args:
            video_path: Path to the video; joined onto ``data_folder`` when
                one is given.
            data_folder: Optional root directory for ``video_path``.
            fps: Sampling rate forwarded to the backend loader.
            video_kwargs: Optional per-video overrides forwarded unchanged.

        Returns:
            Whatever ``self.load_video_lmms_video_utils`` returns, i.e. a
            ``(frames, sample_fps, codec_output)`` triple.

        Raises:
            ValueError: If ``config.video_backend`` is not
                ``"lmms_video_utils"``.
        """
        backend = self.config.video_backend
        # An explicit raise (rather than ``assert``) keeps the check alive
        # when Python runs with ``-O``.
        if backend != "lmms_video_utils":
            raise ValueError(
                "CodecVideoLoadingMixin requires video_backend='lmms_video_utils'"
                f" (got {backend!r})"
            )
        if data_folder is not None:
            video_path = os.path.join(data_folder, video_path)
        return self.load_video_lmms_video_utils(video_path, fps, video_kwargs=video_kwargs)

    def collect_codec_video_inputs(
        self,
        messages: List[dict],
        data_folder: Optional[str] = None,
    ) -> Tuple[List[Any], List[Any], Optional[float]]:
        """Walk ``messages`` and load every ``video_url`` via the codec
        backend.

        Messages whose ``content`` is a plain string or ``None`` carry no
        structured parts and are skipped.

        Returns ``(videos, video_metadata_list, sample_fps)`` where
        ``videos`` are the canvas arrays, ``video_metadata_list`` holds the
        matching ``CodecVideoOutput`` objects, and ``sample_fps`` is the fps
        of the last loaded video (or ``None`` if no video was present).
        """
        videos: List[Any] = []
        video_metadata_list: List[Any] = []
        sample_fps: Optional[float] = None

        for message in messages:
            parts = message["content"]
            if parts is None or isinstance(parts, str):
                # Text-only / empty content cannot contain a video part.
                continue
            for content in parts:
                if content.get("type") != "video_url":
                    continue
                video_url = content["video_url"]
                # Everything except the URL itself is a per-video override;
                # unset (None) fields are dropped so backend defaults apply.
                extra = {k: v for k, v in video_url.items() if k != "url" and v is not None}
                frames, sample_fps, codec_output = self.load_codec_videos(
                    video_url["url"],
                    data_folder=data_folder,
                    fps=self.config.fps,
                    video_kwargs=extra or None,
                )
                videos.append(frames)
                video_metadata_list.append(codec_output)

        return videos, video_metadata_list, sample_fps
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| import json | ||
| import os | ||
| from typing import Any, Dict | ||
|
|
||
| import torch | ||
| from PIL import Image | ||
|
|
||
| from lmms_engine.datasets.codec_video_mixin import CodecVideoLoadingMixin | ||
| from lmms_engine.datasets.iterable.vision_iterable_dataset import ( | ||
| VisionSFTIterableDataset, | ||
| ) | ||
| from lmms_engine.mapping_func import register_dataset | ||
| from lmms_engine.utils.train_utils import TrainUtilities | ||
|
|
||
|
|
||
@register_dataset("llava_ov2_iterable")
class LlavaOv2IterableDataset(CodecVideoLoadingMixin, VisionSFTIterableDataset):
    """Iterable LLaVA-OneVision-2 dataset that feeds codec-stream videos.

    Inherits the ``VisionSFTIterableDataset`` plumbing and loads videos via
    ``CodecVideoLoadingMixin`` (the ``lmms_video_utils`` backend), so every
    video arrives with a ``CodecVideoOutput`` (canvases + patch_positions
    + source_pts) that the processor can use directly rather than
    re-deriving timestamps from frame index.
    """

    def load_from_json(self, data, data_folder=None) -> Dict[str, torch.Tensor]:
        """Turn one JSON sample into processor inputs.

        ``data["messages"]`` may be a list or a JSON-encoded string. Image
        URLs are opened with PIL (relative to ``data_folder`` when given),
        videos are collected through ``collect_codec_video_inputs``, and
        everything is handed to ``self.processor.process``.
        """
        messages = data["messages"]
        if isinstance(messages, str):
            messages = json.loads(messages)

        image_paths = [
            part["image_url"]["url"]
            for message in messages
            for part in message["content"]
            if part["type"] == "image_url"
        ]

        videos, video_metadata_list, sample_fps = self.collect_codec_video_inputs(messages, data_folder=data_folder)
        processor_kwargs: Dict[str, Any] = {}
        if sample_fps is not None:
            processor_kwargs["fps"] = sample_fps

        hf_messages = TrainUtilities.convert_open_to_hf(messages)

        # The processor expects ``None`` rather than an empty list.
        images = [
            Image.open(path if data_folder is None else os.path.join(data_folder, path))
            for path in image_paths
        ] or None

        if videos:
            processor_kwargs["video_metadata"] = video_metadata_list
        else:
            videos = None

        return self.processor.process(
            images=images,
            hf_messages=hf_messages,
            videos=videos,
            **processor_kwargs,
        )
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -144,6 +144,8 @@ def load_videos( | |
| return self.load_video_qwen_vl_utils(video_path, fps, video_kwargs=video_kwargs) | ||
| elif self.config.video_backend == "qwen_omni_utils": | ||
| return self.load_video_qwen_omni_utils(video_path, fps) | ||
| elif self.config.video_backend == "lmms_video_utils": | ||
| return self.load_video_lmms_video_utils(video_path, fps, video_kwargs=video_kwargs) | ||
| else: | ||
| raise ValueError(f"Video backend {self.config.video_backend} not supported") | ||
|
|
||
|
|
@@ -280,3 +282,80 @@ def load_video_qwen_omni_utils( | |
| return video_frames, sample_fps | ||
| else: | ||
| raise ValueError("No video frames returned from process_mm_info") | ||
|
|
||
def load_video_lmms_video_utils(
    self,
    video_path: str,
    fps: int,
    video_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[np.ndarray, float, Any]:
    """
    Decode a video with the lmms_video_utils codec-stream backend.

    Besides the canvases (codec-packed "frames") the full
    ``CodecVideoOutput`` is returned, so a processor that understands
    per-patch positions (e.g. LLaVA-OneVision-2) does not have to
    re-derive timestamps from frame index.

    The decoder is chosen through ``config.extra_kwargs``:
        ``video_decode_backend``: "auto" | "torchcodec" | "pyav"
            (default: "pyav")
        ``video_decode_device``: "cpu" | "cuda" | "cuda:N"
            (default: "cpu"; a bare "cuda" becomes cuda:LOCAL_RANK)

    Args:
        video_path: Path to video file.
        fps: Target fps; sent as ``target_fps`` when the sampling
            strategy is ``"fps"``.
        video_kwargs: Optional per-video fields. qwen-vl-utils style keys
            (``video_start``/``video_end``/``fps``/``nframes``/
            ``max_pixels``/``min_pixels``) are renamed to the backend's
            names; any other key (``score_mode``, ``gop_mode``,
            ``target_canvas`` etc.) is passed through as-is. These are
            applied last and therefore win over config-derived values.

    Returns:
        Tuple of (canvases as a NumPy array, sample_fps, codec_output).
        A 4-D array whose second axis has 1, 3 or 4 entries is treated as
        channel-first and transposed to [T, H, W, C].
    """
    from lmms_video_utils import fetch_codec_video

    decode_opts = self.config.extra_kwargs or {}
    backend = decode_opts.get("video_decode_backend", "pyav")
    device = decode_opts.get("video_decode_device", "cpu")
    if device == "cuda":
        # Pin a bare "cuda" to this process's local rank.
        device = "cuda:" + str(int(os.environ.get("LOCAL_RANK", "0")))

    codec_kwargs: Dict[str, Any] = {"backend": backend, "device": device}

    # Dataset-level limits; each maps config.video_<name> -> <name>.
    for name in ("max_pixels", "min_pixels", "max_frames"):
        limit = getattr(self.config, "video_" + name)
        if limit is not None:
            codec_kwargs[name] = int(limit)

    strategy = self.config.video_sampling_strategy
    if strategy == "fps":
        codec_kwargs["target_fps"] = float(fps)
    elif strategy == "frame_num":
        # NOTE(review): a collaborator questioned using max_frames here and
        # suggested a visual-token budget instead.
        codec_kwargs["max_frames"] = int(self.config.frame_num)

    if video_kwargs:
        # NOTE(review): a collaborator suggested dropping this qwen-style
        # key translation and converting the data offline instead.
        aliases = {
            "video_start": "start_time",
            "video_end": "end_time",
            "fps": "target_fps",
            "nframes": "max_frames",
            "max_pixels": "max_pixels",
            "min_pixels": "min_pixels",
        }
        codec_kwargs.update({aliases.get(key, key): value for key, value in video_kwargs.items()})

    codec_output = fetch_codec_video(video_path, **codec_kwargs)
    frames = codec_output.canvases.cpu().numpy()
    channel_first = frames.ndim == 4 and frames.shape[1] in (1, 3, 4)
    if channel_first:
        frames = frames.transpose(0, 2, 3, 1)
    return frames, float(codec_output.fps), codec_output
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kcz358 Can we extract an abstract class and name it something like "codec native input"?
Also, regarding the processor: ov2 is a specification in both the data and the processor.