diff --git a/docs/cli/backends.rst b/docs/cli/backends.rst index 8df76a58..2e28102b 100644 --- a/docs/cli/backends.rst +++ b/docs/cli/backends.rst @@ -21,6 +21,8 @@ It is mostly reliable and fast, although can occasionally run into issues proces The OpenCV backend also supports image sequences as inputs (e.g. ``frame%03d.jpg`` if you want to load frame001.jpg, frame002.jpg, frame003.jpg...). Make sure to specify the framerate manually (``-f``/``--framerate``) to ensure accurate timing calculations. +Variable framerate (VFR) video is supported. Scene detection uses PTS-derived timestamps from ``CAP_PROP_POS_MSEC`` for accurate timecodes. Seeking compensates for OpenCV's average-fps-based internal seek approximation, so output timecodes remain accurate across the full video. + ======================================================================= PyAV @@ -28,6 +30,8 @@ PyAV The `PyAV <https://github.com/PyAV-Org/PyAV>`_ backend (`av package <https://pypi.org/project/av/>`_) is a more robust backend that handles multiple audio tracks and frame decode errors gracefully. +Variable framerate (VFR) video is fully supported. PyAV uses native PTS timestamps directly from the container, giving the most accurate timecodes for VFR content. + This backend can be used by specifying ``-b pyav`` via command line, or setting ``backend = pyav`` under the ``[global]`` section of your :ref:`config file <scenedetect_cli-config_file>`. @@ -41,4 +45,6 @@ MoviePy launches ffmpeg as a subprocess, and can be used with various types of i The MoviePy backend is still under development and is not included with current Windows distribution. To enable MoviePy support, you must install PySceneDetect using `python` and `pip`. +Variable framerate (VFR) video is **not supported**. MoviePy assumes a fixed framerate, so timecodes for VFR content will be inaccurate. Use the PyAV or OpenCV backend instead. + This backend can be used by specifying ``-b moviepy`` via command line, or setting ``backend = moviepy`` under the ``[global]`` section of your :ref:`config file <scenedetect_cli-config_file>`. 
diff --git a/scenedetect/_cli/commands.py b/scenedetect/_cli/commands.py index 6813caa8..0003f30e 100644 --- a/scenedetect/_cli/commands.py +++ b/scenedetect/_cli/commands.py @@ -401,17 +401,19 @@ def _save_xml_fcp( sequence = ElementTree.SubElement(project, "sequence") ElementTree.SubElement(sequence, "name").text = context.video_stream.name + fps = float(context.video_stream.frame_rate) + ntsc = "TRUE" if context.video_stream.frame_rate.denominator != 1 else "FALSE" duration = scenes[-1][1] - scenes[0][0] - ElementTree.SubElement(sequence, "duration").text = f"{duration.frame_num}" + ElementTree.SubElement(sequence, "duration").text = str(round(duration.seconds * fps)) rate = ElementTree.SubElement(sequence, "rate") - ElementTree.SubElement(rate, "timebase").text = str(context.video_stream.frame_rate) - ElementTree.SubElement(rate, "ntsc").text = "False" + ElementTree.SubElement(rate, "timebase").text = str(round(fps)) + ElementTree.SubElement(rate, "ntsc").text = ntsc timecode = ElementTree.SubElement(sequence, "timecode") tc_rate = ElementTree.SubElement(timecode, "rate") - ElementTree.SubElement(tc_rate, "timebase").text = str(context.video_stream.frame_rate) - ElementTree.SubElement(tc_rate, "ntsc").text = "False" + ElementTree.SubElement(tc_rate, "timebase").text = str(round(fps)) + ElementTree.SubElement(tc_rate, "ntsc").text = ntsc ElementTree.SubElement(timecode, "frame").text = "0" ElementTree.SubElement(timecode, "displayformat").text = "NDF" @@ -427,13 +429,13 @@ def _save_xml_fcp( ElementTree.SubElement(clip, "name").text = f"Shot {i + 1}" ElementTree.SubElement(clip, "enabled").text = "TRUE" ElementTree.SubElement(clip, "rate").append( - ElementTree.fromstring(f"<timebase>{context.video_stream.frame_rate}</timebase>") + ElementTree.fromstring(f"<timebase>{round(fps)}</timebase>") ) - # TODO: Are these supposed to be frame numbers or another format? 
- ElementTree.SubElement(clip, "start").text = str(start.frame_num) - ElementTree.SubElement(clip, "end").text = str(end.frame_num) - ElementTree.SubElement(clip, "in").text = str(start.frame_num) - ElementTree.SubElement(clip, "out").text = str(end.frame_num) + # Frame numbers relative to the declared fps, computed from PTS seconds. + ElementTree.SubElement(clip, "start").text = str(round(start.seconds * fps)) + ElementTree.SubElement(clip, "end").text = str(round(end.seconds * fps)) + ElementTree.SubElement(clip, "in").text = str(round(start.seconds * fps)) + ElementTree.SubElement(clip, "out").text = str(round(end.seconds * fps)) file_ref = ElementTree.SubElement(clip, "file", id=f"file{i + 1}") ElementTree.SubElement(file_ref, "name").text = context.video_stream.name @@ -485,6 +487,9 @@ def save_xml( logger.error(f"Unknown format: {format}") +# TODO: We have to export framerate as a float for OTIO's current format. When OTIO supports +# fractional timecodes, we should export the framerate as a rational number instead. +# https://github.com/AcademySoftwareFoundation/OpenTimelineIO/issues/190 def save_otio( context: CliContext, scenes: SceneList, @@ -501,7 +506,7 @@ def save_otio( video_name = context.video_stream.name video_path = os.path.abspath(context.video_stream.path) video_base_name = os.path.basename(context.video_stream.path) - frame_rate = context.video_stream.frame_rate + frame_rate = float(context.video_stream.frame_rate) # List of track mapping to resource type. # TODO(https://scenedetect.com/issues/497): Allow OTIO export without an audio track. 
@@ -534,12 +539,12 @@ def save_otio( "duration": { "OTIO_SCHEMA": "RationalTime.1", "rate": frame_rate, - "value": float((end - start).frame_num), + "value": round((end - start).seconds * frame_rate, 6), }, "start_time": { "OTIO_SCHEMA": "RationalTime.1", "rate": frame_rate, - "value": float(start.frame_num), + "value": round(start.seconds * frame_rate, 6), }, }, "enabled": True, diff --git a/scenedetect/backends/moviepy.py b/scenedetect/backends/moviepy.py index 14758952..6624cdbe 100644 --- a/scenedetect/backends/moviepy.py +++ b/scenedetect/backends/moviepy.py @@ -17,6 +17,7 @@ """ import typing as ty +from fractions import Fraction from logging import getLogger import cv2 @@ -24,7 +25,7 @@ from moviepy.video.io.ffmpeg_reader import FFMPEG_VideoReader from scenedetect.backends.opencv import VideoStreamCv2 -from scenedetect.common import _USE_PTS_IN_DEVELOPMENT, FrameTimecode +from scenedetect.common import FrameTimecode, Timecode, framerate_to_fraction from scenedetect.platform import get_file_name from scenedetect.video_stream import SeekError, VideoOpenFailure, VideoStream @@ -83,9 +84,9 @@ def __init__( """Unique name used to identify this backend.""" @property - def frame_rate(self) -> float: - """Framerate in frames/sec.""" - return self._reader.fps + def frame_rate(self) -> Fraction: + """Framerate in frames/sec as a rational Fraction.""" + return framerate_to_fraction(self._reader.fps) @property def path(self) -> ty.Union[bytes, str]: @@ -135,7 +136,14 @@ def position(self) -> FrameTimecode: calling `read`. This will always return 0 (e.g. be equal to `base_timecode`) if no frames have been `read` yet.""" frame_number = max(self._frame_number - 1, 0) - return FrameTimecode(frame_number, self.frame_rate) + # Synthesize a Timecode from the frame count and rational framerate. + # MoviePy assumes CFR, so this is equivalent to frame-based timing. + # Use the framerate denominator as the time_base denominator for exact timing. 
+ fps = self.frame_rate + time_base = Fraction(1, fps.numerator) + pts = frame_number * fps.denominator + timecode = Timecode(pts=pts, time_base=time_base) + return FrameTimecode(timecode=timecode, fps=fps) @property def position_ms(self) -> float: @@ -173,10 +181,6 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]): ValueError: `target` is not a valid value (i.e. it is negative). """ success = False - if _USE_PTS_IN_DEVELOPMENT: - # TODO(https://scenedetect.com/issue/168): Need to handle PTS here. - raise NotImplementedError() - if not isinstance(target, FrameTimecode): target = FrameTimecode(target, self.frame_rate) try: diff --git a/scenedetect/backends/opencv.py b/scenedetect/backends/opencv.py index 298f0301..5401119a 100644 --- a/scenedetect/backends/opencv.py +++ b/scenedetect/backends/opencv.py @@ -27,7 +27,7 @@ import cv2 import numpy as np -from scenedetect.common import _USE_PTS_IN_DEVELOPMENT, MAX_FPS_DELTA, FrameTimecode, Timecode +from scenedetect.common import MAX_FPS_DELTA, FrameTimecode, Timecode, framerate_to_fraction from scenedetect.platform import get_file_name from scenedetect.video_stream import ( FrameRateUnavailable, @@ -111,7 +111,7 @@ def __init__( self._cap: ty.Optional[cv2.VideoCapture] = ( None # Reference to underlying cv2.VideoCapture object. ) - self._frame_rate: ty.Optional[float] = None + self._frame_rate: ty.Optional[Fraction] = None # VideoCapture state self._has_grabbed = False @@ -144,7 +144,7 @@ def capture(self) -> cv2.VideoCapture: """Unique name used to identify this backend.""" @property - def frame_rate(self) -> float: + def frame_rate(self) -> Fraction: assert self._frame_rate return self._frame_rate @@ -196,30 +196,25 @@ def aspect_ratio(self) -> float: @property def timecode(self) -> Timecode: - """Current position within stream as a Timecode. This is not frame accurate.""" + """Current position within stream as a Timecode.""" # *NOTE*: Although OpenCV has `CAP_PROP_PTS`, it doesn't seem to be reliable. 
For now, we - # use `CAP_PROP_POS_MSEC` instead, with a time base of 1/1000. Unfortunately this means that - # rounding errors will affect frame accuracy with this backend. - pts = self._cap.get(cv2.CAP_PROP_POS_MSEC) - time_base = Fraction(1, 1000) - return Timecode(pts=round(pts), time_base=time_base) + # use `CAP_PROP_POS_MSEC` instead, converting to microseconds for sufficient precision to + # avoid frame-boundary rounding errors at common framerates like 24000/1001. + ms = self._cap.get(cv2.CAP_PROP_POS_MSEC) + time_base = Fraction(1, 1000000) + return Timecode(pts=round(ms * 1000), time_base=time_base) @property def position(self) -> FrameTimecode: - # TODO(https://scenedetect.com/issue/168): See if there is a better way to do this, or - # add a config option before landing this. - if _USE_PTS_IN_DEVELOPMENT: - timecode = self.timecode - # If PTS is 0 but we've read frames, derive from frame number. - # This handles image sequences and cases where CAP_PROP_POS_MSEC is unreliable. - if timecode.pts == 0 and self.frame_number > 0: - time_sec = (self.frame_number - 1) / self.frame_rate - pts = round(time_sec * 1000) - timecode = Timecode(pts=pts, time_base=Fraction(1, 1000)) - return FrameTimecode(timecode=timecode, fps=self.frame_rate) - if self.frame_number < 1: - return self.base_timecode - return self.base_timecode + (self.frame_number - 1) + timecode = self.timecode + # If PTS is 0 but we've read frames, derive from frame number. + # This handles image sequences and cases where CAP_PROP_POS_MSEC is unreliable. 
+ if timecode.pts == 0 and self.frame_number > 0: + fps = self.frame_rate + time_base = Fraction(1, fps.numerator) + pts = (self.frame_number - 1) * fps.denominator + timecode = Timecode(pts=pts, time_base=time_base) + return FrameTimecode(timecode=timecode, fps=self.frame_rate) @property def position_ms(self) -> float: @@ -235,23 +230,32 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]): if target < 0: raise ValueError("Target seek position cannot be negative!") - # TODO(https://scenedetect.com/issue/168): Shouldn't use frames for VFR video here. - # Have to seek one behind and call grab() after to that the VideoCapture - # returns a valid timestamp when using CAP_PROP_POS_MSEC. - target_frame_cv2 = (self.base_timecode + target).frame_num - if target_frame_cv2 > 0: - target_frame_cv2 -= 1 - self._cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_cv2) + target_secs = (self.base_timecode + target).seconds self._has_grabbed = False - # Preemptively grab the frame behind the target position if possible. - if target > 0: + if target_secs > 0: + # Seek one frame before target so the next read() returns the frame at target. + one_frame_ms = 1000.0 / float(self._frame_rate) + seek_ms = max(0.0, target_secs * 1000.0 - one_frame_ms) + self._cap.set(cv2.CAP_PROP_POS_MSEC, seek_ms) self._has_grabbed = self._cap.grab() - # If we seeked past the end of the video, need to seek one frame backwards - # from the current position and grab that frame instead. + if self._has_grabbed: + # VFR correction: set(CAP_PROP_POS_MSEC) converts time using avg_fps internally, + # which can land ~1s too early for VFR video. Read forward until we reach the + # intended position. The threshold (2x one_frame_ms) never triggers for CFR. 
+ actual_ms = self._cap.get(cv2.CAP_PROP_POS_MSEC) + corrections = 0 + while actual_ms < seek_ms - 2.0 * one_frame_ms and corrections < 100: + if not self._cap.grab(): + break + actual_ms = self._cap.get(cv2.CAP_PROP_POS_MSEC) + corrections += 1 + # If we seeked past the end, back up one frame. if not self._has_grabbed: seek_pos = round(self._cap.get(cv2.CAP_PROP_POS_FRAMES) - 1.0) self._cap.set(cv2.CAP_PROP_POS_FRAMES, max(0, seek_pos)) self._has_grabbed = self._cap.grab() + else: + self._cap.set(cv2.CAP_PROP_POS_FRAMES, 0) def reset(self): """Close and re-open the VideoStream (should be equivalent to calling `seek(0)`).""" @@ -329,14 +333,11 @@ def _open_capture(self, framerate: ty.Optional[float] = None): raise FrameRateUnavailable() self._cap = cap - self._frame_rate = framerate + self._frame_rate = framerate_to_fraction(framerate) self._has_grabbed = False cap.set(cv2.CAP_PROP_ORIENTATION_AUTO, 1.0) # https://github.com/opencv/opencv/issues/26795 -# TODO(https://scenedetect.com/issues/168): Support non-monotonic timing for `position`. VFR timecode -# support is a prerequisite for this. Timecodes are currently calculated by multiplying the -# framerate by number of frames. Actual elapsed time can be obtained via `position_ms` for now. class VideoCaptureAdapter(VideoStream): """Adapter for existing VideoCapture objects. Unlike VideoStreamCv2, this class supports VideoCaptures which may not support seeking. 
@@ -378,7 +379,7 @@ def __init__( raise FrameRateUnavailable() self._cap = cap - self._frame_rate: float = framerate + self._frame_rate: Fraction = framerate_to_fraction(framerate) self._num_frames = 0 self._max_read_attempts = max_read_attempts self._decode_failures = 0 @@ -408,7 +409,7 @@ def capture(self) -> cv2.VideoCapture: """Unique name used to identify this backend.""" @property - def frame_rate(self) -> float: + def frame_rate(self) -> Fraction: """Framerate in frames/sec.""" assert self._frame_rate return self._frame_rate @@ -439,8 +440,6 @@ def frame_size(self) -> ty.Tuple[int, int]: @property def duration(self) -> ty.Optional[FrameTimecode]: """Duration of the stream as a FrameTimecode, or None if non terminating.""" - # TODO(https://scenedetect.com/issue/168): This will be incorrect for VFR. See if there is - # another property we can use to estimate the video length correctly. frame_count = math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count > 0: return self.base_timecode + frame_count @@ -455,7 +454,12 @@ def aspect_ratio(self) -> float: def position(self) -> FrameTimecode: if self.frame_number < 1: return self.base_timecode - return self.base_timecode + (self.frame_number - 1) + # Synthesize a Timecode from frame count and rational framerate. 
+ fps = self.frame_rate + time_base = Fraction(1, fps.numerator) + pts = (self.frame_number - 1) * fps.denominator + timecode = Timecode(pts=pts, time_base=time_base) + return FrameTimecode(timecode=timecode, fps=fps) @property def position_ms(self) -> float: diff --git a/scenedetect/backends/pyav.py b/scenedetect/backends/pyav.py index 8692cdb5..a1ade9b4 100644 --- a/scenedetect/backends/pyav.py +++ b/scenedetect/backends/pyav.py @@ -18,7 +18,7 @@ import av import numpy as np -from scenedetect.common import _USE_PTS_IN_DEVELOPMENT, MAX_FPS_DELTA, FrameTimecode, Timecode +from scenedetect.common import MAX_FPS_DELTA, FrameTimecode, Timecode from scenedetect.platform import get_file_name from scenedetect.video_stream import FrameRateUnavailable, VideoOpenFailure, VideoStream @@ -81,6 +81,8 @@ def __init__( self._name = "" if name is None else name self._path = "" self._frame: ty.Optional[av.VideoFrame] = None + self._decoder: ty.Optional[ty.Generator] = None + self._decode_count: int = 0 self._reopened = True if threading_mode: @@ -172,8 +174,8 @@ def duration(self) -> FrameTimecode: return self.base_timecode + self._duration_frames @property - def frame_rate(self) -> float: - """Frame rate in frames/sec.""" + def frame_rate(self) -> Fraction: + """Frame rate in frames/sec as a rational Fraction.""" return self._frame_rate @property @@ -184,10 +186,8 @@ def position(self) -> FrameTimecode: to the presentation time 0. 
Returns 0 even if `frame_number` is 1.""" if self._frame is None: return self.base_timecode - if _USE_PTS_IN_DEVELOPMENT: - timecode = Timecode(pts=self._frame.pts, time_base=self._frame.time_base) - return FrameTimecode(timecode=timecode, fps=self.frame_rate) - return FrameTimecode(round(self._frame.time * self.frame_rate), self.frame_rate) + timecode = Timecode(pts=self._frame.pts, time_base=self._frame.time_base) + return FrameTimecode(timecode=timecode, fps=self.frame_rate) @property def position_ms(self) -> float: @@ -199,16 +199,13 @@ def position_ms(self) -> float: @property def frame_number(self) -> int: - """Current position within stream as the frame number. + """Current position within stream as the frame number (CFR-equivalent). - Will return 0 until the first frame is `read`.""" - - if self._frame: - if _USE_PTS_IN_DEVELOPMENT: - # frame_number is 1-indexed, so add 1 to the 0-based frame position. - return round(self._frame.time * self.frame_rate) + 1 - return self.position.frame_num + 1 - return 0 + Will return 0 until the first frame is `read`. For VFR video this is an approximation + derived from PTS × framerate; use `position` for accurate PTS-based timing.""" + if self._frame is None: + return 0 + return round(self._frame.time * float(self.frame_rate)) + 1 @property def rate(self) -> Fraction: @@ -258,7 +255,6 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]) -> None: raise ValueError("Target cannot be negative!") beginning = target == 0 - # TODO(https://scenedetect.com/issues/168): This breaks with PTS mode enabled. 
target = self.base_timecode + target if target >= 1: target = target - 1 @@ -266,6 +262,8 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]) -> None: (self.base_timecode + target).seconds / self._video_stream.time_base ) self._frame = None + self._decoder = None + self._decode_count = 0 self._container.seek(target_pts, stream=self._video_stream) if not beginning: self.read(decode=False) @@ -277,15 +275,23 @@ def reset(self): """Close and re-open the VideoStream (should be equivalent to calling `seek(0)`).""" self._container.close() self._frame = None + self._decoder = None + self._decode_count = 0 try: self._container = av.open(self._path if self._path else self._io) except Exception as ex: raise VideoOpenFailure() from ex def read(self, decode: bool = True) -> ty.Union[np.ndarray, bool]: + # Reuse a persistent decoder generator so the codec's internal frame buffer (used for + # B-frame reordering) is never flushed prematurely. Creating a new generator each call + # caused the last buffered frame to be lost at EOF. + if self._decoder is None: + self._decoder = self._container.decode(video=0) try: last_frame = self._frame - self._frame = next(self._container.decode(video=0)) + self._frame = next(self._decoder) + self._decode_count += 1 except av.error.EOFError: self._frame = last_frame if self._handle_eof(): @@ -350,7 +356,7 @@ def _handle_eof(self): # Don't re-open the video if we can't seek or aren't in AUTO/FRAME thread_type mode. 
if not self.is_seekable or self._video_stream.thread_type not in ("AUTO", "FRAME"): return False - last_frame = self.frame_number + last_pos_secs = self.position.seconds orig_pos = self._io.tell() try: self._io.seek(0) @@ -360,5 +366,6 @@ def _handle_eof(self): raise self._container.close() self._container = container - self.seek(last_frame) + self._decoder = None + self.seek(last_pos_secs) return True diff --git a/scenedetect/common.py b/scenedetect/common.py index e4ab7e48..545ef7df 100644 --- a/scenedetect/common.py +++ b/scenedetect/common.py @@ -70,10 +70,6 @@ import cv2 -# TODO(https://scenedetect.com/issue/168): Ensure both CFR and VFR videos work as intended with this -# flag enabled. When this feature is stable, we can then work on a roll-out plan. -_USE_PTS_IN_DEVELOPMENT = False - ## ## Type Aliases ## @@ -99,6 +95,34 @@ _SECONDS_PER_HOUR = 60.0 * _SECONDS_PER_MINUTE _MINUTES_PER_HOUR = 60.0 +# Common framerates mapped from their float representation to exact rational values. +_COMMON_FRAMERATES: ty.Dict[Fraction, Fraction] = { + Fraction(24000, 1001): Fraction(24000, 1001), # 23.976... + Fraction(30000, 1001): Fraction(30000, 1001), # 29.97... + Fraction(60000, 1001): Fraction(60000, 1001), # 59.94... + Fraction(120000, 1001): Fraction(120000, 1001), # 119.88... +} + + +def framerate_to_fraction(fps: float) -> Fraction: + """Convert a float framerate to an exact rational Fraction. + + Recognizes common NTSC framerates (23.976, 29.97, 59.94, 119.88) and maps them to their + exact rational representation (e.g. 24000/1001). For other values, uses limit_denominator + to find a clean rational approximation, or returns the exact integer fraction for whole + number framerates. + """ + if fps <= MAX_FPS_DELTA: + raise ValueError("Framerate must be positive and greater than zero.") + # Integer framerates are exact. + if fps == int(fps): + return Fraction(int(fps), 1) + # Check against known common framerates using limit_denominator to find the closest match. 
+ candidate = Fraction(fps).limit_denominator(10000) + if candidate in _COMMON_FRAMERATES: + return _COMMON_FRAMERATES[candidate] + return candidate + class Interpolation(Enum): """Interpolation method used for image resizing. Based on constants defined in OpenCV.""" @@ -115,24 +139,6 @@ class Interpolation(Enum): """Lanczos interpolation over 8x8 neighborhood.""" -# TODO(@Breakthrough): How should we deal with frame numbers when we have a `Timecode`? -# -# Each backend has slight nuances we have to take into account: -# - PyAV: Does not include a position in frames, we can probably estimate it. Need to also compare -# with how OpenCV handles this. It also seems to fail to decode the last frame. This library -# provides the most accurate timing information however. -# - OpenCV: Lacks any kind of timebase, only provides position in milliseconds and as frames. -# This is probably sufficient, since we could just use 1ms as a timebase. -# - MoviePy: Assumes fixed framerate and doesn't include timing information. Fixing this is -# probably not feasible, so we should make sure the docs warn users about this. -# -# In the meantime, having backends provide accurate timing information is controlled by a hard-coded -# constant `_USE_PTS_IN_DEVELOPMENT` in each backend implementation that supports it. It still does -# not work correctly however, as we have to modify detectors themselves to work with FrameTimecode -# objects instead of integer frame numbers like they do now. -# -# We might be able to avoid changing the detector interface if we just have them work directly with -# PTS and convert them back to FrameTimecodes with the same time base. @dataclass(frozen=True) class Timecode: """Timing information associated with a given frame.""" @@ -242,16 +248,9 @@ def __init__( @property def frame_num(self) -> ty.Optional[int]: - """The frame number. This value will be an estimate if the video is VFR. Prefer using the - `pts` property.""" + """The frame number. 
For VFR video or Timecode-backed objects, this is an approximation + based on the average framerate. Prefer using `pts` and `time_base` for precise timing.""" if isinstance(self._time, Timecode): - # We need to audit anything currently using this property to guarantee temporal - # consistency when handling VFR videos (i.e. no assumptions on fixed frame rate). - warnings.warn( - message="TODO(https://scenedetect.com/issue/168): Update caller to handle VFR.", - stacklevel=2, - category=UserWarning, - ) # Calculate approximate frame number from seconds and framerate. if self._rate is not None: return round(self._time.seconds * float(self._rate)) @@ -312,7 +311,6 @@ def get_framerate(self) -> float: ) return self.framerate - # TODO(https://scenedetect.com/issue/168): Figure out how to deal with VFR here. def equal_framerate(self, fps) -> bool: """Equal Framerate: Determines if the passed framerate is equal to that of this object. @@ -323,8 +321,8 @@ def equal_framerate(self, fps) -> bool: bool: True if passed fps matches the FrameTimecode object's framerate, False otherwise. """ - # TODO(https://scenedetect.com/issue/168): Support this comparison in the case FPS is not - # set but a timecode is. + if self.framerate is None: + return False return math.fabs(self.framerate - fps) < MAX_FPS_DELTA @property @@ -377,7 +375,10 @@ def get_timecode( str: The current time in the form ``"HH:MM:SS[.nnn]"``. """ # Compute hours and minutes based off of seconds, and update seconds. - if nearest_frame and self.framerate: + # For PTS-backed timecodes, the PTS already represents an exact frame boundary, so we use + # `seconds` directly. For non-PTS timecodes, `nearest_frame` snaps to the nearest frame + # boundary using frame_num, which avoids floating point drift in CFR video display. 
+ if nearest_frame and self.framerate and not isinstance(self._time, Timecode): secs = self.frame_num / self.framerate else: secs = self.seconds @@ -566,12 +567,17 @@ def __iadd__(self, other: ty.Union[int, float, str, "FrameTimecode"]) -> "FrameT other_is_timecode = isinstance(other, FrameTimecode) and isinstance(other._time, Timecode) if isinstance(self._time, Timecode) and other_is_timecode: - if self._time.time_base != other._time.time_base: - raise ValueError("timecodes have different time bases") - self._time = Timecode( - pts=max(0, self._time.pts + other._time.pts), - time_base=self._time.time_base, - ) + if self._time.time_base == other._time.time_base: + self._time = Timecode( + pts=max(0, self._time.pts + other._time.pts), + time_base=self._time.time_base, + ) + return self + # Different time bases: use the finer (smaller) one for better precision. + time_base = min(self._time.time_base, other._time.time_base) + self_pts = round(Fraction(self._time.pts) * self._time.time_base / time_base) + other_pts = round(Fraction(other._time.pts) * other._time.time_base / time_base) + self._time = Timecode(pts=max(0, self_pts + other_pts), time_base=time_base) return self # If either input is a timecode, the output shall also be one. 
The input which isn't a @@ -613,12 +619,17 @@ def __isub__(self, other: ty.Union[int, float, str, "FrameTimecode"]) -> "FrameT other_is_timecode = isinstance(other, FrameTimecode) and isinstance(other._time, Timecode) if isinstance(self._time, Timecode) and other_is_timecode: - if self._time.time_base != other._time.time_base: - raise ValueError("timecodes have different time bases") - self._time = Timecode( - pts=max(0, self._time.pts - other._time.pts), - time_base=self._time.time_base, - ) + if self._time.time_base == other._time.time_base: + self._time = Timecode( + pts=max(0, self._time.pts - other._time.pts), + time_base=self._time.time_base, + ) + return self + # Different time bases: use the finer (smaller) one for better precision. + time_base = min(self._time.time_base, other._time.time_base) + self_pts = round(Fraction(self._time.pts) * self._time.time_base / time_base) + other_pts = round(Fraction(other._time.pts) * other._time.time_base / time_base) + self._time = Timecode(pts=max(0, self_pts - other_pts), time_base=time_base) return self # If either input is a timecode, the output shall also be one. The input which isn't a diff --git a/scenedetect/detector.py b/scenedetect/detector.py index e7d9e731..7c3e1b70 100644 --- a/scenedetect/detector.py +++ b/scenedetect/detector.py @@ -122,6 +122,7 @@ def __init__(self, mode: Mode, length: int): """ self._mode = mode self._filter_length = length # Number of frames to use for activating the filter. + self._filter_secs: ty.Optional[float] = None # Threshold in seconds, computed on first use. self._last_above = None # Last frame above threshold. self._merge_enabled = False # Used to disable merging until at least one cut was found. self._merge_triggered = False # True when the merge filter is active. 
@@ -143,9 +144,12 @@ def filter(self, timecode: FrameTimecode, above_threshold: bool) -> ty.List[Fram raise RuntimeError("Unhandled FlashFilter mode.") def _filter_suppress(self, timecode: FrameTimecode, above_threshold: bool) -> ty.List[int]: - framerate = timecode.framerate - assert framerate >= 0 - min_length_met: bool = (timecode - self._last_above) >= (self._filter_length / framerate) + assert timecode.framerate >= 0 + # Compute the threshold in seconds once from the first frame's framerate. This avoids + # using an incorrect average fps (e.g. OpenCV on VFR video) on subsequent frames. + if self._filter_secs is None: + self._filter_secs = self._filter_length / timecode.framerate + min_length_met: bool = (timecode - self._last_above) >= self._filter_secs if not (above_threshold and min_length_met): return [] # Both length and threshold requirements were satisfied. Emit the cut, and wait until both @@ -154,16 +158,21 @@ def _filter_suppress(self, timecode: FrameTimecode, above_threshold: bool) -> ty return [timecode] def _filter_merge(self, timecode: FrameTimecode, above_threshold: bool) -> ty.List[int]: - framerate = timecode.framerate - assert framerate >= 0 - min_length_met: bool = (timecode - self._last_above) >= (self._filter_length / framerate) + assert timecode.framerate >= 0 + # Compute the threshold in seconds once from the first frame's framerate. + if self._filter_secs is None: + self._filter_secs = self._filter_length / timecode.framerate + min_length_met: bool = (timecode - self._last_above) >= self._filter_secs # Ensure last frame is always advanced to the most recent one that was above the threshold. if above_threshold: self._last_above = timecode if self._merge_triggered: # This frame was under the threshold, see if enough frames passed to disable the filter. 
- num_merged_frames = self._last_above - self._merge_start - if min_length_met and not above_threshold and num_merged_frames >= self._filter_length: + if ( + min_length_met + and not above_threshold + and (self._last_above - self._merge_start) >= self._filter_secs + ): self._merge_triggered = False return [self._last_above] # Keep merging until enough frames pass below the threshold. diff --git a/scenedetect/detectors/threshold_detector.py b/scenedetect/detectors/threshold_detector.py index 2cfe05b8..8d28cd62 100644 --- a/scenedetect/detectors/threshold_detector.py +++ b/scenedetect/detectors/threshold_detector.py @@ -88,7 +88,7 @@ def __init__( self.add_final_scene = add_final_scene # Where the last fade (threshold crossing) was detected. self.last_fade = { - "frame": 0, # frame number where the last detected fade is + "frame": None, # FrameTimecode where the last detected fade is "type": None, # type of fade, can be either 'in' or 'out' } self._metric_keys = [ThresholdDetector.THRESHOLD_VALUE_KEY] @@ -100,40 +100,29 @@ def get_metrics(self) -> ty.List[str]: def process_frame( self, timecode: FrameTimecode, frame_img: numpy.ndarray ) -> ty.List[FrameTimecode]: - """Process the next frame. `frame_num` is assumed to be sequential. + """Process the next frame. Args: - frame_num (int): Frame number of frame that is being passed. Can start from any value - but must remain sequential. - frame_img (numpy.ndarray or None): Video frame corresponding to `frame_img`. + timecode: FrameTimecode of the current frame position. + frame_img (numpy.ndarray or None): Video frame corresponding to `timecode`. Returns: - ty.List[int]: List of frames where scene cuts have been detected. There may be 0 - or more frames in the list, and not necessarily the same as frame_num. + List of FrameTimecodes where scene cuts have been detected. """ - # TODO(https://scenedetect.com/issue/168): We need to consider PTS here instead. 
The methods below using frame numbers - # won't work for variable framerates. - frame_num = timecode.frame_num - # Initialize last scene cut point at the beginning of the frames of interest. if self.last_scene_cut is None: - self.last_scene_cut = frame_num - - # Compare the # of pixels under threshold in current_frame & last_frame. - # If absolute value of pixel intensity delta is above the threshold, - # then we trigger a new scene cut/break. + self.last_scene_cut = timecode - # List of cuts to return. - cuts = [] + cuts: ty.List[FrameTimecode] = [] # The metric used here to detect scene breaks is the percent of pixels # less than or equal to the threshold; however, since this differs on # user-supplied values, we supply the average pixel intensity as this # frame metric instead (to assist with manually selecting a threshold) if (self.stats_manager is not None) and ( - self.stats_manager.metrics_exist(frame_num, self._metric_keys) + self.stats_manager.metrics_exist(timecode, self._metric_keys) ): - frame_avg = self.stats_manager.get_metrics(frame_num, self._metric_keys)[0] + frame_avg = self.stats_manager.get_metrics(timecode, self._metric_keys)[0] else: frame_avg = numpy.mean(frame_img) if self.stats_manager is not None: @@ -146,32 +135,31 @@ def process_frame( ): # Just faded out of a scene, wait for next fade in. self.last_fade["type"] = "out" - self.last_fade["frame"] = frame_num + self.last_fade["frame"] = timecode elif self.last_fade["type"] == "out" and ( (self.method == ThresholdDetector.Method.FLOOR and frame_avg >= self.threshold) or (self.method == ThresholdDetector.Method.CEILING and frame_avg < self.threshold) ): # Only add the scene if min_scene_len frames have passed. - if (frame_num - self.last_scene_cut) >= self.min_scene_len: + if (timecode - self.last_scene_cut) >= self.min_scene_len: # Just faded into a new scene, compute timecode for the scene # split based on the fade bias. 
f_out = self.last_fade["frame"] - f_split = int( - (frame_num + f_out + int(self.fade_bias * (frame_num - f_out))) / 2 - ) - cuts.append(f_split) - self.last_scene_cut = frame_num + duration = (timecode - f_out).seconds + split_seconds = f_out.seconds + (duration * (1.0 + self.fade_bias)) / 2.0 + cuts.append(FrameTimecode(split_seconds, fps=timecode)) + self.last_scene_cut = timecode self.last_fade["type"] = "in" - self.last_fade["frame"] = frame_num + self.last_fade["frame"] = timecode else: - self.last_fade["frame"] = 0 + self.last_fade["frame"] = timecode if frame_avg < self.threshold: self.last_fade["type"] = "out" else: self.last_fade["type"] = "in" self.processed_frame = True - return [FrameTimecode(cut, fps=timecode) for cut in cuts] + return cuts def post_process(self, timecode: FrameTimecode) -> ty.List[FrameTimecode]: """Writes a final scene cut if the last detected fade was a fade-out. @@ -185,14 +173,15 @@ def post_process(self, timecode: FrameTimecode) -> ty.List[FrameTimecode]: # If the last fade detected was a fade out, we add a corresponding new # scene break to indicate the end of the scene. This is only done for # fade-outs, as a scene cut is already added when a fade-in is found. 
- cuts = [] + cuts: ty.List[FrameTimecode] = [] if ( self.last_fade["type"] == "out" and self.add_final_scene + and self.last_fade["frame"] is not None and ( (self.last_scene_cut is None and timecode >= self.min_scene_len) or (timecode - self.last_scene_cut) >= self.min_scene_len ) ): cuts.append(self.last_fade["frame"]) - return [FrameTimecode(cut, fps=timecode) for cut in cuts] + return cuts diff --git a/scenedetect/output/image.py b/scenedetect/output/image.py index 3fa54b04..d5cf00de 100644 --- a/scenedetect/output/image.py +++ b/scenedetect/output/image.py @@ -290,48 +290,37 @@ def image_save_thread(self, save_queue: queue.Queue, progress_bar: tqdm): if progress_bar is not None: progress_bar.update(1) - def generate_timecode_list(self, scene_list: SceneList) -> ty.List[ty.Iterable[FrameTimecode]]: + def generate_timecode_list(self, scene_list: SceneList) -> ty.List[ty.List[FrameTimecode]]: """Generates a list of timecodes for each scene in `scene_list` based on the current config - parameters.""" - # TODO(v0.7): This needs to be fixed as part of PTS overhaul. + parameters. + + Uses PTS-accurate seconds-based timing so results are correct for both CFR and VFR video. + """ framerate = scene_list[0][0].framerate - # TODO(v1.0): Split up into multiple sub-expressions so auto-formatter works correctly. 
- return [ - ( - FrameTimecode(int(f), fps=framerate) - for f in ( - # middle frames - a[len(a) // 2] - if (0 < j < self._num_images - 1) or self._num_images == 1 - # first frame - else min(a[0] + self._frame_margin, a[-1]) - if j == 0 - # last frame - else max(a[-1] - self._frame_margin, a[0]) - # for each evenly-split array of frames in the scene list - for j, a in enumerate(np.array_split(r, self._num_images)) - ) - ) - for r in ( - # pad ranges to number of images - r - if 1 + r[-1] - r[0] >= self._num_images - else list(r) + [r[-1]] * (self._num_images - len(r)) - # create range of frames in scene - for r in ( - range( - start.frame_num, - start.frame_num - + max( - 1, # guard against zero length scenes - end.frame_num - start.frame_num, - ), - ) - # for each scene in scene list - for start, end in scene_list - ) - ) - ] + # Convert frame_margin to seconds using the nominal framerate. + margin_secs = self._frame_margin / framerate + result = [] + for start, end in scene_list: + duration_secs = (end - start).seconds + if duration_secs <= 0: + result.append([start] * self._num_images) + continue + segment_secs = duration_secs / self._num_images + timecodes = [] + for j in range(self._num_images): + seg_start = start.seconds + j * segment_secs + seg_end = start.seconds + (j + 1) * segment_secs + if self._num_images == 1: + t = start.seconds + duration_secs / 2.0 + elif j == 0: + t = min(seg_start + margin_secs, seg_end) + elif j == self._num_images - 1: + t = max(seg_end - margin_secs, seg_start) + else: + t = (seg_start + seg_end) / 2.0 + timecodes.append(FrameTimecode(t, fps=framerate)) + result.append(timecodes) + return result def resize_image( self, diff --git a/scenedetect/output/video.py b/scenedetect/output/video.py index 737cb92d..07b01c1b 100644 --- a/scenedetect/output/video.py +++ b/scenedetect/output/video.py @@ -125,7 +125,8 @@ class SceneMetadata: def default_formatter(template: str) -> PathFormatter: """Formats filenames using a template string 
which allows the following variables: - `$VIDEO_NAME`, `$SCENE_NUMBER`, `$START_TIME`, `$END_TIME`, `$START_FRAME`, `$END_FRAME` + `$VIDEO_NAME`, `$SCENE_NUMBER`, `$START_TIME`, `$END_TIME`, `$START_FRAME`, `$END_FRAME`, + `$START_PTS`, `$END_PTS` (presentation timestamp in milliseconds, accurate for VFR video) """ MIN_DIGITS = 3 format_scene_number: PathFormatter = lambda video, scene: ( @@ -139,6 +140,8 @@ def default_formatter(template: str) -> PathFormatter: END_TIME=str(scene.end.get_timecode().replace(":", ";")), START_FRAME=str(scene.start.frame_num), END_FRAME=str(scene.end.frame_num), + START_PTS=str(round(scene.start.seconds * 1000)), + END_PTS=str(round(scene.end.seconds * 1000)), ) return formatter diff --git a/scenedetect/video_stream.py b/scenedetect/video_stream.py index 26c2cefe..18a04b2c 100644 --- a/scenedetect/video_stream.py +++ b/scenedetect/video_stream.py @@ -124,8 +124,8 @@ def is_seekable(self) -> bool: @property @abstractmethod - def frame_rate(self) -> float: - """Frame rate in frames/sec.""" + def frame_rate(self) -> Fraction: + """Frame rate in frames/sec as a rational Fraction (e.g. Fraction(24000, 1001)).""" ... @property diff --git a/tests/conftest.py b/tests/conftest.py index 25bf517e..8cef4b0e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -114,6 +114,17 @@ def test_vfr_video() -> str: return check_exists("tests/resources/goldeneye-vfr.mp4") +@pytest.fixture +def test_vfr_drop3_video() -> str: + """Synthetic VFR video created from goldeneye.mp4 by dropping every 3rd frame. + + Frame pattern: keeps frames where (n+1) % 3 != 0 (i.e. drops frames 2,5,8,...). + Resulting PTS durations alternate: 1001, 2002, 1001, 2002, ... (time_base=1/24000). + Nominal fps: 24000/1001. Average fps: ~16 fps. Duration: ~10s, 160 frames. 
+ """ + return check_exists("tests/resources/goldeneye-vfr-drop3.mp4") + + @pytest.fixture def corrupt_video_file() -> str: """Video containing a corrupted frame causing a decode failure.""" diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 00000000..4bdb6d7c --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,38 @@ +# +# PySceneDetect: Python-Based Video Scene Detector +# ------------------------------------------------------------------- +# [ Site: https://scenedetect.com ] +# [ Docs: https://scenedetect.com/docs/ ] +# [ Github: https://github.com/Breakthrough/PySceneDetect/ ] +# +# Copyright (C) 2014-2025 Brandon Castellano . +# PySceneDetect is licensed under the BSD 3-Clause License; see the +# included LICENSE file, or visit one of the above pages for details. +# +"""Shared test helpers.""" + +import typing as ty + +from click.testing import CliRunner + +from scenedetect._cli import scenedetect as _scenedetect_cli +from scenedetect._cli.context import CliContext +from scenedetect._cli.controller import run_scenedetect + + +def invoke_cli(args: ty.List[str], catch_exceptions: bool = False) -> ty.Tuple[int, str]: + """Invoke the scenedetect CLI in-process using Click's CliRunner. + + Replicates the two-step execution of ``__main__.py``: + + 1. ``scenedetect.main(obj=context)`` — parse args and register callbacks on ``CliContext`` + 2. ``run_scenedetect(context)`` — execute detection and output commands + + Returns ``(exit_code, output_text)``. 
+ """ + context = CliContext() + runner = CliRunner() + result = runner.invoke(_scenedetect_cli, args, obj=context, catch_exceptions=catch_exceptions) + if result.exit_code == 0: + run_scenedetect(context) + return result.exit_code, result.output diff --git a/tests/test_api.py b/tests/test_api.py index e86243ad..853b7c32 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -142,3 +142,28 @@ def on_new_scene(frame_img: numpy.ndarray, frame_num: int): scene_manager = SceneManager() scene_manager.add_detector(ContentDetector()) scene_manager.detect_scenes(video=video, duration=total_frames, callback=on_new_scene) + + +# TODO(v0.8): Remove this test when these deprecated modules are removed from the codebase. +def test_deprecated_modules_emits_warning_on_import(): + import importlib + + import pytest + + SCENE_DETECTOR_WARNING = ( + "The `scene_detector` submodule is deprecated, import from the base package instead." + ) + with pytest.warns(DeprecationWarning, match=SCENE_DETECTOR_WARNING): + importlib.import_module("scenedetect.scene_detector") + + FRAME_TIMECODE_WARNING = ( + "The `frame_timecode` submodule is deprecated, import from the base package instead." + ) + with pytest.warns(DeprecationWarning, match=FRAME_TIMECODE_WARNING): + importlib.import_module("scenedetect.frame_timecode") + + VIDEO_SPLITTER_WARNING = ( + "The `video_splitter` submodule is deprecated, import from the base package instead." 
+ ) + with pytest.warns(DeprecationWarning, match=VIDEO_SPLITTER_WARNING): + importlib.import_module("scenedetect.video_splitter") diff --git a/tests/test_cli.py b/tests/test_cli.py index 29fd2738..3f77e91f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,37 +12,36 @@ import os import subprocess -import typing as ty -from pathlib import Path - -import cv2 -import numpy as np -import pytest - -import scenedetect -from scenedetect.output import is_ffmpeg_available, is_mkvmerge_available # These tests validate that the CLI itself functions correctly, mainly based on the return # return code from the process. We do not yet check for correctness of the output, just a # successful invocation of the command (i.e. no exceptions/errors). - # TODO: Add some basic correctness tests to validate the output (just look for the # last expected log message or extract # of scenes). Might need to refactor the test cases # since we need to calculate the output file names for commands that write to disk. - # TODO: Define error/exit codes explicitly. Right now these tests only verify that the # exit code is zero or nonzero. - # TODO: These tests are very expensive since they spin up new Python interpreters. # Move most of these test cases (e.g. argument validation) to ones that interface directly # with the scenedetect._cli module. Click also supports unit testing directly, so we should # probably use that instead of spinning up new subprocesses for each run of the controller. # That will also allow splitting up the validation of argument parsing logic from the controller # logic by creating a CLI context with the desired parameters. - # TODO: Missing tests for --min-scene-len and --drop-short-scenes. 
+import sys +import typing as ty +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +import scenedetect +from scenedetect.output import is_ffmpeg_available, is_mkvmerge_available +from tests.helpers import invoke_cli + +SCENEDETECT_CMD = sys.executable + " -m scenedetect" -SCENEDETECT_CMD = "python -m scenedetect" ALL_DETECTORS = [ "detect-content", "detect-threshold", @@ -305,14 +304,22 @@ def test_cli_detector_with_stats(tmp_path, detector_command: str): def test_cli_list_scenes(tmp_path: Path): """Test `list-scenes` command.""" - # Regular invocation - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} list-scenes", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "list-scenes", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}-Scenes.csv") assert os.path.exists(output_path) EXPECTED_CSV_OUTPUT = """Timecode List:,00:00:03.754 @@ -744,13 +751,22 @@ def test_cli_load_scenes_round_trip(): def test_cli_save_edl(tmp_path: Path): """Test `save-edl` command.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-edl", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-edl", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}.edl") assert os.path.exists(output_path) EXPECTED_EDL_OUTPUT = f"""* CREATED WITH PYSCENEDETECT {scenedetect.__version__} @@ -765,13 +781,28 @@ def test_cli_save_edl(tmp_path: Path): def test_cli_save_edl_with_params(tmp_path: Path): """Test `save-edl` command but override the other options.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-edl -t title -r BX -f file_no_ext", - output_dir=tmp_path, - ) - == 0 + 
exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-edl", + "-t", + "title", + "-r", + "BX", + "-f", + "file_no_ext", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath("file_no_ext") assert os.path.exists(output_path) EXPECTED_EDL_OUTPUT = f"""* CREATED WITH PYSCENEDETECT {scenedetect.__version__} @@ -786,13 +817,22 @@ def test_cli_save_edl_with_params(tmp_path: Path): def test_cli_save_otio(tmp_path: Path): """Test `save-otio` command.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-otio", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-otio", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}.otio") assert os.path.exists(output_path) EXPECTED_OTIO_OUTPUT = """{ @@ -994,13 +1034,23 @@ def test_cli_save_otio(tmp_path: Path): def test_cli_save_otio_no_audio(tmp_path: Path): """Test `save-otio` command without audio.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-otio --no-audio", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-otio", + "--no-audio", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}.otio") assert os.path.exists(output_path) EXPECTED_OTIO_OUTPUT = """{ diff --git a/tests/test_detectors.py b/tests/test_detectors.py index 445112ee..0e5f4214 100644 --- a/tests/test_detectors.py +++ b/tests/test_detectors.py @@ -224,12 +224,3 @@ def test_detectors_with_stats(test_video_file): scene_manager.detect_scenes(video=video, end_time=end_time) scene_list = scene_manager.get_scene_list() assert len(scene_list) == initial_scene_len - - -# 
TODO(v0.8): Remove this test during the removal of `scenedetect.scene_detector`. -def test_deprecated_detector_module_emits_warning_on_import(): - SCENE_DETECTOR_WARNING = ( - "The `scene_detector` submodule is deprecated, import from the base package instead." - ) - with pytest.warns(DeprecationWarning, match=SCENE_DETECTOR_WARNING): - from scenedetect.scene_detector import SceneDetector as _ diff --git a/tests/test_output.py b/tests/test_output.py index bc1762e5..db3f2307 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -191,12 +191,3 @@ def test_save_images_zero_width_scene(test_video_file, tmp_path: Path): total_images += 1 assert total_images == len([path for path in tmp_path.glob(image_name_glob)]) - - -# TODO(v0.8): Remove this test during the removal of `scenedetect.video_splitter`. -def test_deprecated_output_modules_emits_warning_on_import(): - VIDEO_SPLITTER_WARNING = ( - "The `video_splitter` submodule is deprecated, import from the base package instead." - ) - with pytest.warns(DeprecationWarning, match=VIDEO_SPLITTER_WARNING): - from scenedetect.video_splitter import split_video_ffmpeg as _ diff --git a/tests/test_stats_manager.py b/tests/test_stats_manager.py index 0c32371e..03bef2c7 100644 --- a/tests/test_stats_manager.py +++ b/tests/test_stats_manager.py @@ -27,8 +27,6 @@ """ import csv -import os -import random from pathlib import Path import pytest diff --git a/tests/test_timecode.py b/tests/test_timecode.py index 4d6c0d21..f4296923 100644 --- a/tests/test_timecode.py +++ b/tests/test_timecode.py @@ -26,7 +26,7 @@ import pytest # Standard Library Imports -from scenedetect.common import MAX_FPS_DELTA, FrameTimecode +from scenedetect.common import MAX_FPS_DELTA, FrameTimecode, Timecode, framerate_to_fraction def test_framerate(): @@ -299,10 +299,43 @@ def test_precision(): assert FrameTimecode(990, fps).get_timecode(precision=0, use_rounding=False) == "00:00:00" -# TODO(v0.8): Remove this test during the removal of 
`scenedetect.scene_detector`. -def test_deprecated_timecode_module_emits_warning_on_import(): - FRAME_TIMECODE_WARNING = ( - "The `frame_timecode` submodule is deprecated, import from the base package instead." - ) - with pytest.warns(DeprecationWarning, match=FRAME_TIMECODE_WARNING): - from scenedetect.frame_timecode import FrameTimecode as _ +def test_rational_framerate_precision(): + """Rational framerates should round-trip frame/second conversions without drift.""" + fps = Fraction(24000, 1001) + # Verify that frame_num round-trips through seconds without drift over many frames. + for frame in [0, 1, 100, 1000, 10000, 100000]: + tc = FrameTimecode(frame, fps) + assert tc.frame_num == frame, f"Frame {frame} drifted to {tc.frame_num}" + + +def test_ntsc_framerate_detection(): + """Common NTSC framerates should be detected from float values.""" + assert framerate_to_fraction(23.976023976023978) == Fraction(24000, 1001) + assert framerate_to_fraction(29.97002997002997) == Fraction(30000, 1001) + assert framerate_to_fraction(59.94005994005994) == Fraction(60000, 1001) + assert framerate_to_fraction(24.0) == Fraction(24, 1) + assert framerate_to_fraction(30.0) == Fraction(30, 1) + assert framerate_to_fraction(60.0) == Fraction(60, 1) + assert framerate_to_fraction(25.0) == Fraction(25, 1) + + +def test_timecode_arithmetic_mixed_time_base(): + """Arithmetic with FrameTimecodes using different time_bases should work.""" + fps = Fraction(24000, 1001) + # Timecode with time_base 1/24000 (from PyAV) + tc_pyav = FrameTimecode(timecode=Timecode(pts=1001, time_base=Fraction(1, 24000)), fps=fps) + # Timecode with time_base 1/1000000 (from OpenCV microseconds) + tc_cv2 = FrameTimecode(timecode=Timecode(pts=41708, time_base=Fraction(1, 1000000)), fps=fps) + # Both represent approximately 1 frame duration. Addition/subtraction shouldn't raise. 
+ result = tc_pyav + tc_cv2 + assert result.seconds > 0 + result = tc_pyav - tc_cv2 + assert result.seconds >= 0 # Clamped to 0 if negative + + +def test_timecode_frame_num_for_vfr(): + """frame_num should return approximate values for Timecode-backed objects without warning.""" + fps = Fraction(24000, 1001) + tc = FrameTimecode(timecode=Timecode(pts=1001, time_base=Fraction(1, 24000)), fps=fps) + # Should not raise or warn - just return the approximate frame number. + assert tc.frame_num == 1 diff --git a/tests/test_vfr.py b/tests/test_vfr.py new file mode 100644 index 00000000..09aab163 --- /dev/null +++ b/tests/test_vfr.py @@ -0,0 +1,401 @@ +# +# PySceneDetect: Python-Based Video Scene Detector +# ------------------------------------------------------------------- +# [ Site: https://scenedetect.com ] +# [ Docs: https://scenedetect.com/docs/ ] +# [ Github: https://github.com/Breakthrough/PySceneDetect/ ] +# +# Copyright (C) 2014-2025 Brandon Castellano . +# PySceneDetect is licensed under the BSD 3-Clause License; see the +# included LICENSE file, or visit one of the above pages for details. +# +"""Tests for VFR (Variable Frame Rate) video support.""" + +import csv +import json +import os +import typing as ty + +import cv2 +import numpy as np +import pytest + +from scenedetect import SceneManager, open_video +from scenedetect.common import Timecode +from scenedetect.detectors import ContentDetector +from scenedetect.output import save_images, write_scene_list +from scenedetect.stats_manager import StatsManager +from tests.helpers import invoke_cli + +# Expected scene cuts for `goldeneye-vfr.mp4` detected with ContentDetector() and end_time=10.0s. +# Entries are (start_timecode, end_timecode). All backends should agree on cut timecodes since +# CAP_PROP_POS_MSEC gives accurate PTS-derived timestamps. The last scene ends at the clip +# boundary (end_time) which may vary slightly between backends based on frame counting. 
+EXPECTED_SCENES_VFR: ty.List[ty.Tuple[str, str]] = [ + ("00:00:00.000", "00:00:03.921"), + ("00:00:03.921", "00:00:09.676"), +] + +# Expected scene cuts for `goldeneye-vfr-drop3.mp4` — a synthetic VFR clip created from the first +# 10s of goldeneye.mp4 by dropping every 3rd frame (frames 2,5,8,...). PTS durations alternate +# between 1001 and 2002 (time_base=1/24000), nominal fps=24000/1001, avg fps≈16. The last scene +# ends at the clip boundary and may vary slightly between backends. +EXPECTED_SCENES_VFR_DROP3: ty.List[ty.Tuple[str, str]] = [ + ("00:00:00.000", "00:00:03.754"), + ("00:00:03.754", "00:00:08.759"), +] + + +def _tc_to_secs(tc: str) -> float: + """Parse a HH:MM:SS.mmm timecode string to seconds.""" + h, m, rest = tc.split(":") + s, ms = rest.split(".") + return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000 + + +def test_vfr_position_is_timecode(test_vfr_video: str): + """Position should be a Timecode-backed FrameTimecode.""" + video = open_video(test_vfr_video, backend="pyav") + assert video.read() is not False + assert isinstance(video.position._time, Timecode) + + +def test_vfr_position_monotonic_pyav(test_vfr_video: str): + """PTS-based position should be monotonically non-decreasing (PyAV).""" + video = open_video(test_vfr_video, backend="pyav") + last_seconds = -1.0 + frame_count = 0 + while True: + frame = video.read() + if frame is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count > 0 + + +def test_vfr_position_monotonic_opencv(test_vfr_video: str): + """PTS-based position should be monotonically non-decreasing (OpenCV).""" + video = open_video(test_vfr_video, backend="opencv") + last_seconds = -1.0 + frame_count = 0 + while True: + frame = video.read() + if frame is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + 
f"Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count > 0 + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_scene_detection(test_vfr_video: str, backend: str): + """Scene detection on VFR video should produce timestamps matching known ground truth. + + Both PyAV (native PTS) and OpenCV (CAP_PROP_POS_MSEC) should agree on scene cuts since + both expose accurate PTS-derived timestamps. + """ + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, end_time=10.0) + scene_list = sm.get_scene_list() + + # The last scene ends at the clip boundary which may vary by backend; only check known cuts. + assert len(scene_list) >= len(EXPECTED_SCENES_VFR), ( + f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR)} scenes, got {len(scene_list)}" + ) + for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( + zip(scene_list, EXPECTED_SCENES_VFR, strict=False) + ): + assert start.get_timecode() == exp_start_tc, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" + ) + assert end.get_timecode() == exp_end_tc, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" + ) + + +def test_vfr_seek_pyav(test_vfr_video: str): + """Seeking should work with VFR video.""" + video = open_video(test_vfr_video, backend="pyav") + target_time = 2.0 # seconds + video.seek(target_time) + frame = video.read() + assert frame is not False + # Position should be close to target (within 1 second for keyframe-based seeking). 
+ assert abs(video.position.seconds - target_time) < 1.0 + + +def test_vfr_stats_manager(test_vfr_video: str): + """StatsManager should work correctly with VFR video.""" + video = open_video(test_vfr_video, backend="pyav") + stats = StatsManager() + sm = SceneManager(stats_manager=stats) + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + assert len(sm.get_scene_list()) > 0 + + +def test_vfr_csv_output(test_vfr_video: str, tmp_path): + """CSV export should work correctly with VFR video.""" + from scenedetect.output import write_scene_list + + video = open_video(test_vfr_video, backend="pyav") + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + scene_list = sm.get_scene_list() + assert len(scene_list) > 0 + + csv_path = os.path.join(str(tmp_path), "scenes.csv") + with open(csv_path, "w", newline="") as f: + write_scene_list(f, scene_list) + + # Verify CSV contains valid data. + with open(csv_path, "r") as f: + reader = csv.reader(f) + rows = list(reader) + assert len(rows) >= 3 # 2 header rows + data + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_drop3_scene_detection(test_vfr_drop3_video: str, backend: str): + """Synthetic VFR video (drop every 3rd frame, alternating 1x/2x durations) should produce + timecodes matching known ground truth with both backends.""" + video = open_video(test_vfr_drop3_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, show_progress=False) + scene_list = sm.get_scene_list() + + assert len(scene_list) >= len(EXPECTED_SCENES_VFR_DROP3), ( + f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR_DROP3)} scenes, got {len(scene_list)}" + ) + for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( + zip(scene_list, EXPECTED_SCENES_VFR_DROP3, strict=False) + ): + assert start.get_timecode() == exp_start_tc, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got 
{start.get_timecode()!r}" + ) + assert end.get_timecode() == exp_end_tc, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" + ) + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_drop3_position_monotonic(test_vfr_drop3_video: str, backend: str): + """PTS-based position should be monotonically non-decreasing on synthetic VFR video.""" + video = open_video(test_vfr_drop3_video, backend=backend) + last_seconds = -1.0 + frame_count = 0 + while True: + if video.read() is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"[{backend}] Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count == 160 # 2/3 of original 240 frames in 10s at 24000/1001 + + +def test_cfr_position_is_timecode(test_movie_clip: str): + """CFR video positions should also be Timecode-backed with PTS support.""" + video = open_video(test_movie_clip, backend="pyav") + assert video.read() is not False + assert isinstance(video.position._time, Timecode) + + +def test_cfr_frame_num_exact(test_movie_clip: str): + """For CFR video, frame_num should be exact (not approximate).""" + video = open_video(test_movie_clip, backend="pyav") + for expected_frame in range(1, 11): + assert video.read() is not False + assert video.position.frame_num == expected_frame - 1 + + +def test_vfr_save_images_opencv_matches_pyav(test_vfr_video: str, tmp_path): + """OpenCV save-images thumbnails should match PyAV thumbnails for all scenes. + + If the OpenCV seek off-by-one bug is present, scene thumbnails will show content from the + wrong scene; MSE against PyAV (ground truth) will be very high for those scenes. + """ + # Run save-images for both backends with 1 image per scene for simplicity. 
+ scene_lists = {} + for backend in ("pyav", "opencv"): + out_dir = tmp_path / backend + out_dir.mkdir() + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + scene_lists[backend] = sm.get_scene_list() + assert len(scene_lists[backend]) > 0 + save_images(scene_lists[backend], video, num_images=1, output_dir=str(out_dir)) + + pyav_imgs = sorted((tmp_path / "pyav").glob("*.jpg")) + opencv_imgs = sorted((tmp_path / "opencv").glob("*.jpg")) + assert len(pyav_imgs) > 0 + assert len(pyav_imgs) == len(opencv_imgs), ( + f"Image count mismatch: pyav={len(pyav_imgs)}, opencv={len(opencv_imgs)}" + ) + + # Compare every corresponding thumbnail. Wrong-scene content produces very high MSE. + MAX_MSE = 5000 + for pyav_path, opencv_path in zip(pyav_imgs, opencv_imgs, strict=False): + img_pyav = cv2.imread(str(pyav_path)) + img_opencv = cv2.imread(str(opencv_path)) + assert img_pyav is not None, f"Failed to load {pyav_path}" + assert img_opencv is not None, f"Failed to load {opencv_path}" + if img_pyav.shape != img_opencv.shape: + # Resize opencv image to match pyav dimensions before comparing. 
+ img_opencv = cv2.resize(img_opencv, (img_pyav.shape[1], img_pyav.shape[0])) + mse = float(np.mean((img_pyav.astype(np.float32) - img_opencv.astype(np.float32)) ** 2)) + assert mse < MAX_MSE, ( + f"Thumbnail mismatch for {pyav_path.name} vs {opencv_path.name}: MSE={mse:.0f}" + ) + + +# ------------------------------------------------------------------ +# Output format tests +# ------------------------------------------------------------------ + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_csv_accuracy(test_vfr_video: str, backend: str, tmp_path): + """CSV timecodes for VFR video should match known ground truth for both backends.""" + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, end_time=10.0) + scene_list = sm.get_scene_list() + assert len(scene_list) >= len(EXPECTED_SCENES_VFR) + + csv_path = tmp_path / "scenes.csv" + with open(csv_path, "w", newline="") as f: + write_scene_list(f, scene_list, include_cut_list=False) + + with open(csv_path) as f: + rows = list(csv.DictReader(f)) + + for i, (row, (exp_start, exp_end)) in enumerate(zip(rows, EXPECTED_SCENES_VFR, strict=False)): + assert row["Start Timecode"] == exp_start, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start!r}, got {row['Start Timecode']!r}" + ) + assert row["End Timecode"] == exp_end, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end!r}, got {row['End Timecode']!r}" + ) + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_otio_export(test_vfr_video: str, backend: str, tmp_path): + """OTIO export for VFR video should have no spurious float precision and correct timecodes. + + Regression test for the float precision bug where seconds * frame_rate could produce + values like 90.00000000000001 instead of 90.0 for CFR video. 
+ """ + exit_code, _ = invoke_cli( + [ + "-i", + test_vfr_video, + "-b", + backend, + "-o", + str(tmp_path), + "detect-content", + "time", + "--end", + "10s", + "save-otio", + ] + ) + assert exit_code == 0 + + otio_path = next(tmp_path.glob("*.otio")) + data = json.loads(otio_path.read_text()) + frame_rate = data["global_start_time"]["rate"] + one_frame_secs = 1.0 / frame_rate + + clips = data["tracks"]["children"][0]["children"] + assert len(clips) >= len(EXPECTED_SCENES_VFR) + + for i, (clip, (exp_start_tc, exp_end_tc)) in enumerate( + zip(clips, EXPECTED_SCENES_VFR, strict=False) + ): + sr = clip["source_range"] + start_val = sr["start_time"]["value"] + dur_val = sr["duration"]["value"] + + # No spurious float precision: values should have at most 6 decimal places. + assert round(start_val, 6) == start_val, ( + f"[{backend}] Clip {i + 1} start_time.value has excess precision: {start_val!r}" + ) + assert round(dur_val, 6) == dur_val, ( + f"[{backend}] Clip {i + 1} duration.value has excess precision: {dur_val!r}" + ) + + # Values should round-trip to the expected timecodes within 1 frame. + start_secs = start_val / frame_rate + end_secs = (start_val + dur_val) / frame_rate + assert abs(start_secs - _tc_to_secs(exp_start_tc)) < one_frame_secs, ( + f"[{backend}] Clip {i + 1} start: {start_secs:.4f}s vs expected {exp_start_tc}" + ) + assert abs(end_secs - _tc_to_secs(exp_end_tc)) < one_frame_secs, ( + f"[{backend}] Clip {i + 1} end: {end_secs:.4f}s vs expected {exp_end_tc}" + ) + + +def test_vfr_edl_export(test_vfr_video: str, tmp_path): + """EDL export for VFR video should succeed and contain valid edit entries. + + EDL uses HH:MM:SS:FF frame counts at nominal fps, which is an approximation for VFR + content. This test only verifies structural correctness, not exact timecodes. 
+ """ + exit_code, _ = invoke_cli( + [ + "-i", + test_vfr_video, + "-o", + str(tmp_path), + "detect-content", + "time", + "--end", + "10s", + "save-edl", + ] + ) + assert exit_code == 0 + edl_path = next(tmp_path.glob("*.edl")) + content = edl_path.read_text() + assert "FCM: NON-DROP FRAME" in content + assert "001 AX V" in content + + +def test_vfr_csv_backend_conformance(test_vfr_video: str): + """PyAV and OpenCV should produce identical scene timecodes for VFR video. + + Only the known interior scenes are compared; the last scene's end time may vary slightly + between backends since it reflects the clip boundary rather than a detected cut. + """ + timecodes: ty.Dict[str, ty.List[ty.Tuple[str, str]]] = {} + for backend in ("pyav", "opencv"): + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, end_time=10.0) + timecodes[backend] = [(s.get_timecode(), e.get_timecode()) for s, e in sm.get_scene_list()] + # Compare only the known scenes (last scene's end varies by backend at the clip boundary). 
+ n = len(EXPECTED_SCENES_VFR) + assert timecodes["pyav"][:n] == timecodes["opencv"][:n], ( + f"Backend timecode mismatch:\n pyav: {timecodes['pyav']}\n opencv: {timecodes['opencv']}" + ) diff --git a/website/pages/changelog.md b/website/pages/changelog.md index acfb9ffd..535e2d5a 100644 --- a/website/pages/changelog.md +++ b/website/pages/changelog.md @@ -675,7 +675,9 @@ Although there have been minimal changes to most API examples, there are several ### CLI Changes -- [feature] [WIP] New `save-xml` command supports saving scenes in Final Cut Pro format [#156](https://github.com/Breakthrough/PySceneDetect/issues/156) +- [feature] VFR videos are handled correctly by the OpenCV and PyAV backends, and should work correctly with default parameters +- [feature] New `save-xml` command supports saving scenes in Final Cut Pro formats [#156](https://github.com/Breakthrough/PySceneDetect/issues/156) +- [bugfix] Fix floating-point precision error in `save-otio` output where frame values near integer boundaries (e.g. `90.00000000000001`) were serialized with spurious precision - [refactor] Remove deprecated `-d`/`--min-delta-hsv` option from `detect-adaptive` command ### API Changes @@ -723,5 +725,5 @@ Although there have been minimal changes to most API examples, there are several * Remove deprecated `AdaptiveDetector.get_content_val()` method (use `StatsManager` instead) * Remove deprecated `AdaptiveDetector` constructor arg `min_delta_hsv` (use `min_content_val` instead) * Remove `advance` parameter from `VideoStream.read()` - - + * Remove `SceneDetector.stats_manager_required` property, no longer required + * `SceneDetector` is now a [Python abstract class](https://docs.python.org/3/library/abc.html)