1616"""
1717
1818import math
19+ import typing as ty
1920from dataclasses import dataclass
20- from typing import List , NamedTuple , Optional
21+ from typing import NamedTuple
2122
2223import cv2
2324import numpy
2425
26+ from scenedetect .detector import Detector , Event , EventType
27+ from scenedetect .frame_timecode import FrameTimecode
2528from scenedetect .scene_detector import FlashFilter , SceneDetector
29+ from scenedetect .stats_manager import StatsManager
2630
2731
2832def _mean_pixel_distance (left : numpy .ndarray , right : numpy .ndarray ) -> float :
@@ -97,7 +101,7 @@ class _FrameData:
97101 """Frame saturation map [2D 8-bit]."""
98102 lum : numpy .ndarray
99103 """Frame luma/brightness map [2D 8-bit]."""
100- edges : Optional [numpy .ndarray ]
104+ edges : ty . Optional [numpy .ndarray ]
101105 """Frame edge map [2D 8-bit, edges are 255, non edges 0]. Affected by `kernel_size`."""
102106
103107 def __init__ (
@@ -106,7 +110,7 @@ def __init__(
106110 min_scene_len : int = 15 ,
107111 weights : "ContentDetector.Components" = DEFAULT_COMPONENT_WEIGHTS ,
108112 luma_only : bool = False ,
109- kernel_size : Optional [int ] = None ,
113+ kernel_size : ty . Optional [int ] = None ,
110114 filter_mode : FlashFilter .Mode = FlashFilter .Mode .MERGE ,
111115 ):
112116 """
@@ -126,17 +130,17 @@ def __init__(
126130 super ().__init__ ()
127131 self ._threshold : float = threshold
128132 self ._min_scene_len : int = min_scene_len
129- self ._last_above_threshold : Optional [int ] = None
130- self ._last_frame : Optional [ContentDetector ._FrameData ] = None
133+ self ._last_above_threshold : ty . Optional [int ] = None
134+ self ._last_frame : ty . Optional [ContentDetector ._FrameData ] = None
131135 self ._weights : ContentDetector .Components = weights
132136 if luma_only :
133137 self ._weights = ContentDetector .LUMA_ONLY_WEIGHTS
134- self ._kernel : Optional [numpy .ndarray ] = None
138+ self ._kernel : ty . Optional [numpy .ndarray ] = None
135139 if kernel_size is not None :
136140 if kernel_size < 3 or kernel_size % 2 == 0 :
137141 raise ValueError ("kernel_size must be odd integer >= 3" )
138142 self ._kernel = numpy .ones ((kernel_size , kernel_size ), numpy .uint8 )
139- self ._frame_score : Optional [float ] = None
143+ self ._frame_score : ty . Optional [float ] = None
140144 self ._flash_filter = FlashFilter (mode = filter_mode , length = min_scene_len )
141145
142146 def get_metrics (self ):
@@ -187,7 +191,7 @@ def _calculate_frame_score(self, frame_num: int, frame_img: numpy.ndarray) -> fl
187191 self ._last_frame = ContentDetector ._FrameData (hue , sat , lum , edges )
188192 return frame_score
189193
190- def process_frame (self , frame_num : int , frame_img : numpy .ndarray ) -> List [int ]:
194+ def process_frame (self , frame_num : int , frame_img : numpy .ndarray ) -> ty . List [int ]:
191195 """Process the next frame. `frame_num` is assumed to be sequential.
192196
193197 Args:
@@ -196,7 +200,7 @@ def process_frame(self, frame_num: int, frame_img: numpy.ndarray) -> List[int]:
196200 frame_img (numpy.ndarray or None): Video frame corresponding to `frame_img`.
197201
198202 Returns:
199- List[int]: List of frames where scene cuts have been detected. There may be 0
203+ List[int]: List of frames where scene cuts have been detected. There may be 0
200204 or more frames in the list, and not necessarily the same as frame_num.
201205 """
202206 self ._frame_score = self ._calculate_frame_score (frame_num , frame_img )
@@ -237,3 +241,202 @@ def _detect_edges(self, lum: numpy.ndarray) -> numpy.ndarray:
237241 @property
238242 def event_buffer_length (self ) -> int :
239243 return self ._flash_filter .max_behind
244+
245+
class ContentDetector2(Detector):
    """Detects fast cuts using changes in colour and intensity between frames.

    The difference is calculated in the HSV color space, and compared against a set threshold to
    determine when a fast cut has occurred.
    """

    # TODO: Come up with some good weights for a new default if there is one that can pass
    # a wider variety of test cases.
    class Components(NamedTuple):
        """Components that make up a frame's score, and their default values."""

        delta_hue: float = 1.0
        """Difference between pixel hue values of adjacent frames."""
        delta_sat: float = 1.0
        """Difference between pixel saturation values of adjacent frames."""
        delta_lum: float = 1.0
        """Difference between pixel luma (brightness) values of adjacent frames."""
        delta_edges: float = 0.0
        """Difference between calculated edges of adjacent frames.

        Edge differences are typically larger than the other components, so the detection
        threshold may need to be adjusted accordingly."""

    DEFAULT_COMPONENT_WEIGHTS = Components()
    """Default component weights. Actual default values are specified in :class:`Components`
    to allow adding new components without breaking existing usage."""

    LUMA_ONLY_WEIGHTS = Components(
        delta_hue=0.0,
        delta_sat=0.0,
        delta_lum=1.0,
        delta_edges=0.0,
    )
    """Component weights to use if `luma_only` is set."""

    FRAME_SCORE_KEY = "content_val"
    """Key in statsfile representing the final frame score after weighed by specified components."""

    METRIC_KEYS = [FRAME_SCORE_KEY, *Components._fields]
    """All statsfile keys this detector produces."""

    @dataclass
    class _FrameData:
        """Data calculated for a given frame."""

        hue: numpy.ndarray
        """Frame hue map [2D 8-bit]."""
        sat: numpy.ndarray
        """Frame saturation map [2D 8-bit]."""
        lum: numpy.ndarray
        """Frame luma/brightness map [2D 8-bit]."""
        edges: ty.Optional[numpy.ndarray]
        """Frame edge map [2D 8-bit, edges are 255, non edges 0]. Affected by `kernel_size`."""

    def __init__(
        self,
        threshold: float = 27.0,
        min_scene_len: ty.Union[int, FrameTimecode] = 15,
        weights: "ContentDetector2.Components" = DEFAULT_COMPONENT_WEIGHTS,
        luma_only: bool = False,
        kernel_size: ty.Optional[int] = None,
        filter_mode: FlashFilter.Mode = FlashFilter.Mode.MERGE,
    ):
        """
        Arguments:
            threshold: Threshold the average change in pixel intensity must exceed to trigger a cut.
            min_scene_len: Once a cut is detected, this many frames must pass before a new one can
                be added to the scene list. Can be an int or FrameTimecode type. It is forwarded
                to the flash filter, but :meth:`process` does not apply that filter yet (see the
                TODO there), so it is currently not enforced.
            weights: Weight to place on each component when calculating frame score
                (`content_val` in a statsfile, the value `threshold` is compared against).
            luma_only: If True, only considers changes in the luminance channel of the video.
                Equivalent to specifying `weights` as :data:`ContentDetector2.LUMA_ONLY_WEIGHTS`.
                Overrides `weights` if both are set.
            kernel_size: Size of kernel for expanding detected edges. Must be odd integer
                greater than or equal to 3. If None, automatically set using video resolution.
            filter_mode: Mode to use when filtering cuts to meet `min_scene_len`.

        Raises:
            ValueError: If `kernel_size` is given but is even or smaller than 3.
        """
        super().__init__()
        self._threshold: float = threshold
        self._min_scene_len: ty.Union[int, FrameTimecode] = min_scene_len
        self._last_above_threshold: ty.Optional[FrameTimecode] = None
        self._last_frame: ty.Optional[ContentDetector2._FrameData] = None
        self._weights: ContentDetector2.Components = weights
        if luma_only:
            self._weights = ContentDetector2.LUMA_ONLY_WEIGHTS
        self._kernel: ty.Optional[numpy.ndarray] = None
        if kernel_size is not None:
            if kernel_size < 3 or kernel_size % 2 == 0:
                raise ValueError("kernel_size must be odd integer >= 3")
            self._kernel = numpy.ones((kernel_size, kernel_size), numpy.uint8)
        self._frame_score: ty.Optional[float] = None
        self._flash_filter = FlashFilter(mode=filter_mode, length=min_scene_len)
        self._stats: ty.Optional[StatsManager] = None

    def get_metrics(self):
        """Return the list of statsfile keys this detector produces."""
        return ContentDetector2.METRIC_KEYS

    def set_stats_manager(self, stats: StatsManager):
        """Attach the stats manager that per-frame metrics are recorded to."""
        self._stats = stats

    def _calculate_frame_score(self, frame: numpy.ndarray, timecode: FrameTimecode) -> float:
        """Calculate score representing relative amount of motion in `frame` compared to
        the last time the function was called (returns 0.0 on the first call).

        Arguments:
            frame: Frame to score. It is converted with `cv2.COLOR_BGR2HSV`, so it is
                expected to be a BGR image.
            timecode: Timecode of `frame`; `timecode.frame_num` is the key under which metrics
                are recorded when a stats manager is set.

        Returns:
            Weighted average of the component deltas, normalized by the sum of the absolute
            component weights.
        """
        # TODO: Add option to enable motion estimation before calculating score components.
        # TODO: Investigate methods of performing cheaper alternatives, e.g. shifting or resizing
        # the frame to simulate camera movement, using optical flow, etc...

        # Convert image into HSV colorspace.
        hue, sat, lum = cv2.split(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV))

        # Performance: Only calculate edges if we have to.
        calculate_edges: bool = (self._weights.delta_edges > 0.0) or self._stats is not None
        edges = self._detect_edges(lum) if calculate_edges else None

        if self._last_frame is None:
            # Need another frame to compare with for score calculation.
            self._last_frame = ContentDetector2._FrameData(hue, sat, lum, edges)
            return 0.0

        # The previous frame may have no edge map even though this one does (e.g. a stats
        # manager was attached between frames); skip the edge delta for that one frame rather
        # than handing None to the distance helper.
        have_edges = edges is not None and self._last_frame.edges is not None
        score_components = ContentDetector2.Components(
            delta_hue=_mean_pixel_distance(hue, self._last_frame.hue),
            delta_sat=_mean_pixel_distance(sat, self._last_frame.sat),
            delta_lum=_mean_pixel_distance(lum, self._last_frame.lum),
            delta_edges=(
                _mean_pixel_distance(edges, self._last_frame.edges) if have_edges else 0.0
            ),
        )

        frame_score: float = sum(
            component * weight for (component, weight) in zip(score_components, self._weights)
        ) / sum(abs(weight) for weight in self._weights)

        # Record components and frame score if needed for analysis.
        if self._stats is not None:
            metrics = {self.FRAME_SCORE_KEY: frame_score}
            metrics.update(score_components._asdict())
            self._stats.set_metrics(timecode.frame_num, metrics)

        # Store all data required to calculate the next frame's score.
        self._last_frame = ContentDetector2._FrameData(hue, sat, lum, edges)
        return frame_score

    def process(self, frame: numpy.ndarray, timecode: FrameTimecode) -> ty.List[Event]:
        """Process the next frame and report a cut if its score reaches the threshold.

        Args:
            frame: Video frame to process.
            timecode: Timecode of `frame`. Used as the timecode of any emitted event and as the
                key for recorded metrics.

        Returns:
            A list holding one `EventType.CUT` event at `timecode` when the frame score is
            greater than or equal to the threshold, otherwise an empty list.
        """
        # The timecode must be forwarded: the score function needs it to record metrics.
        self._frame_score = self._calculate_frame_score(frame, timecode)

        above_threshold: bool = self._frame_score >= self._threshold
        # TODO: Need to fix FlashFilter so we can enforce min_scene_length. We should be able to
        # just return `self._flash_filter.filter(timecode, above_threshold)` here.
        if above_threshold:
            return [Event(type=EventType.CUT, timecode=timecode)]
        return []

    def _detect_edges(self, lum: numpy.ndarray) -> numpy.ndarray:
        """Detect edges using the luma channel of a frame.

        Arguments:
            lum: 2D 8-bit image representing the luma channel of a frame.

        Returns:
            2D 8-bit image of the same size as the input, where pixels with values of 255
            represent edges, and all other pixels are 0.
        """
        # Initialize kernel lazily from the frame size when no explicit size was given.
        if self._kernel is None:
            kernel_size = _estimated_kernel_size(lum.shape[1], lum.shape[0])
            self._kernel = numpy.ones((kernel_size, kernel_size), numpy.uint8)

        # Estimate levels for thresholding.
        # TODO: Add config file entries for sigma, aperture/kernel size, etc.
        sigma: float = 1.0 / 3.0
        median = numpy.median(lum)
        low = int(max(0, (1.0 - sigma) * median))
        high = int(min(255, (1.0 + sigma) * median))

        # Calculate edges using Canny algorithm, and reduce noise by dilating the edges.
        # This increases edge overlap leading to improved robustness against noise and slow
        # camera movement. Note that very large kernel sizes can negatively affect accuracy.
        edges = cv2.Canny(lum, low, high)
        return cv2.dilate(edges, self._kernel)

    @property
    def event_buffer_length(self) -> int:
        """Value of the flash filter's `max_behind` attribute."""
        return self._flash_filter.max_behind
0 commit comments