diff --git a/hlsvod/manager.go b/hlsvod/manager.go index 662c41d..e45ff79 100644 --- a/hlsvod/manager.go +++ b/hlsvod/manager.go @@ -24,6 +24,10 @@ const readyTimeout = 80 * time.Second // how long can it take for transcode to return first data const transcodeTimeout = 10 * time.Second +// transcodeSegmentsFn is a package-level hook so tests can stub out the actual +// FFmpeg-based implementation. In production it points to TranscodeSegments. +var transcodeSegmentsFn = TranscodeSegments + type ManagerCtx struct { mu sync.Mutex logger zerolog.Logger @@ -382,7 +386,7 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error { segmentTimes := m.breakpoints[offset : offset+limit+1] logger.Info().Interface("segments-times", segmentTimes).Msg("transcoding segments") - segments, err := TranscodeSegments(m.ctx, m.config.FFmpegBinary, TranscodeConfig{ + segments, err := transcodeSegmentsFn(m.ctx, m.config.FFmpegBinary, TranscodeConfig{ InputFilePath: m.config.MediaPath, OutputDirPath: m.config.TranscodeDir, SegmentPrefix: m.config.SegmentPrefix, // This does not need to match. @@ -458,44 +462,40 @@ func (m *ManagerCtx) transcodeFromSegment(index int) error { m.mu.Lock() defer m.mu.Unlock() - segmentsTotal := len(m.segments) - if segmentsTotal <= m.segmentBufferMax { - // if all our segments can fit in the buffer - // then we should transcode all of them - // regardless of the index + // Determine the upper bound (exclusive) of the segment range we will inspect + upperBound := len(m.segments) + if upperBound <= m.segmentBufferMax { + // All segments fit into the buffer – transcode everything index = 0 - } else if index+m.segmentBufferMax < segmentsTotal { - // cap transocded segments to the buffer size - segmentsTotal = index + m.segmentBufferMax + } else if index+m.segmentBufferMax < upperBound { + // Restrict the window to the current index + buffer size + upperBound = index + m.segmentBufferMax } - offset, limit := 0, 0 - for i := index; i < segmentsTotal-1; i++ { + processedCount, pendingCount := 0, 0 // processedCount is the number of segments already transcoded or enqueued + for i := index; i < upperBound; i++ { _, isEnqueued := m.waitForSegment(i) isTranscoded := m.isSegmentTranscoded(i) - // increase offset if transcoded without limit - if (isTranscoded || isEnqueued) && limit == 0 { - offset++ - } else - // increase limit if is not transcoded - if !(isTranscoded || isEnqueued) { - limit++ - } else - // break otherwise - { + // Skip already-handled segments until we find the first pending one + if (isTranscoded || isEnqueued) && pendingCount == 0 { + processedCount++ + } else if !(isTranscoded || isEnqueued) { + // Count segments that still need to be transcoded + pendingCount++ + } else { + // Once we have a mix of handled and pending segments, stop the scan break } } - // if offset is greater than our minimal offset, - // or limit is 0, we have enough segments available - if offset > m.segmentBufferMin || limit == 0 { + // If we already have enough handled segments in the buffer, or no work is pending, exit early + if processedCount > m.segmentBufferMin || pendingCount == 0 { return nil } - // otherwise transcode chosen segment range - return m.transcodeSegments(offset+index, limit) + // Otherwise, transcode the pending segment window + return m.transcodeSegments(index+processedCount, pendingCount) } func (m *ManagerCtx) Start() (err error) { diff --git a/hlsvod/manager_test.go b/hlsvod/manager_test.go new file mode 100644 index 0000000..493d189 --- /dev/null +++ b/hlsvod/manager_test.go @@ -0,0 +1,47 @@ +package hlsvod + +import ( + "context" + "testing" +) + +// TestTranscodeFromSegmentBufferSize ensures that transcodeFromSegment() +// queues exactly segmentBufferMax segments for transcoding. (This is +// a regression test, as the implementation previously only queued +// segmentBufferMax-1 segments.) +func TestTranscodeFromSegmentBufferSize(t *testing.T) { + const bufferMax = 5 + + // Prepare a ManagerCtx with exactly bufferMax segments. + m := &ManagerCtx{ + breakpoints: make([]float64, bufferMax+1), + segmentBufferMax: bufferMax, + segmentBufferMin: 3, // default value from constructor + segments: make(map[int]string), + segmentQueue: make(map[int]chan struct{}), + } + + // Populate the dummy segments map so that len(m.segments) == bufferMax. + for i := 0; i < bufferMax; i++ { + m.segments[i] = "" + } + + // Stub out the transcode function so the test doesn't invoke FFmpeg. + origFn := transcodeSegmentsFn + transcodeSegmentsFn = func(_ context.Context, _ string, _ TranscodeConfig) (chan string, error) { + ch := make(chan string) + close(ch) + return ch, nil + } + defer func() { transcodeSegmentsFn = origFn }() + + // Execute the code under test. + if err := m.transcodeFromSegment(0); err != nil { + t.Fatalf("transcodeFromSegment returned error: %v", err) + } + + // transcodeFromSegment should enqueue `bufferMax` segments + if got := len(m.segmentQueue); got != bufferMax { + t.Fatalf("expected %d queued segments, got %d", bufferMax, got) + } +}