Skip to content

Commit 67c7d16

Browse files
committed
Add rolling transcode segment cleanup
1 parent e25d910 commit 67c7d16

10 files changed

Lines changed: 269 additions & 23 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ Copy `config.example.json` and change the upstream URL:
9898
"max_sessions": 2,
9999
"buffer_pause_seconds": 300,
100100
"buffer_resume_seconds": 120,
101+
"segment_retention_seconds": 300,
101102
"idle_timeout_seconds": 60
102103
}
103104
}
@@ -117,5 +118,6 @@ EmbyTranscoder keeps local FFmpeg sessions tied to Emby playback check-ins:
117118
- HLS playlist and segment requests refresh media activity.
118119
- When transcoded media gets more than `buffer_pause_seconds` ahead of playback, FFmpeg is paused.
119120
- When buffered media falls back under `buffer_resume_seconds`, FFmpeg resumes.
121+
- Segments older than `segment_retention_seconds` behind the current playback position are deleted from the local cache.
120122
- If neither playback activity nor HLS access arrives before `idle_timeout_seconds`, the idle reaper stops the session.
121123
- A new `master.m3u8` request with a different upstream stream URL, such as a seek with a different `StartTimeTicks`, restarts the local session.

config.example.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"max_sessions": 2,
1717
"buffer_pause_seconds": 300,
1818
"buffer_resume_seconds": 120,
19+
"segment_retention_seconds": 300,
1920
"idle_timeout_seconds": 60
2021
},
2122
"clients": [

docker/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ Edit `config/config.json` before startup, or copy it to `config/config.local.jso
3030
- video output is capped at 1920x1080 while preserving aspect ratio
3131
- playbackinfo rewrites prewarm the transcode session before the first playlist request
3232
- ffmpeg runs with low-latency startup and GOP settings to reduce first-segment delay
33+
- old `segment_*.ts` files are deleted once they are more than `transcode.segment_retention_seconds` behind playback
3334

3435
For GitHub Actions publishing to Docker Hub, set `DOCKERHUB_USERNAME` and `DOCKERHUB_TOKEN` in the repository secrets.
3536

36-
The transcode cache is stored in `docker/data/transcode`.
37+
The transcode cache is stored in `docker/data/transcode`; active sessions keep only the retained back-buffer plus the forward buffer.
3738

3839
## Operations
3940

docker/config/config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"max_sessions": 2,
1717
"buffer_pause_seconds": 300,
1818
"buffer_resume_seconds": 120,
19+
"segment_retention_seconds": 300,
1920
"idle_timeout_seconds": 60
2021
},
2122
"clients": [

internal/config/config.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@ type Upstream struct {
2626
}
2727

2828
type Transcode struct {
29-
Enabled bool `json:"enabled"`
30-
FFmpegPath string `json:"ffmpeg_path"`
31-
TempDir string `json:"temp_dir"`
32-
HardwareDecode string `json:"hardware_decode"`
33-
HardwareDevice string `json:"hardware_device"`
34-
MaxSessions int `json:"max_sessions"`
35-
BufferPauseSeconds int `json:"buffer_pause_seconds"`
36-
BufferResumeSeconds int `json:"buffer_resume_seconds"`
37-
IdleTimeoutSeconds int `json:"idle_timeout_seconds"`
38-
BufferPause time.Duration `json:"-"`
39-
BufferResume time.Duration `json:"-"`
40-
IdleTimeout time.Duration `json:"-"`
29+
Enabled bool `json:"enabled"`
30+
FFmpegPath string `json:"ffmpeg_path"`
31+
TempDir string `json:"temp_dir"`
32+
HardwareDecode string `json:"hardware_decode"`
33+
HardwareDevice string `json:"hardware_device"`
34+
MaxSessions int `json:"max_sessions"`
35+
BufferPauseSeconds int `json:"buffer_pause_seconds"`
36+
BufferResumeSeconds int `json:"buffer_resume_seconds"`
37+
SegmentRetentionSeconds int `json:"segment_retention_seconds"`
38+
IdleTimeoutSeconds int `json:"idle_timeout_seconds"`
39+
BufferPause time.Duration `json:"-"`
40+
BufferResume time.Duration `json:"-"`
41+
SegmentRetention time.Duration `json:"-"`
42+
IdleTimeout time.Duration `json:"-"`
4143
}
4244

4345
func (t *Transcode) UnmarshalJSON(data []byte) error {
@@ -85,13 +87,14 @@ func Default() Config {
8587
URL: "http://127.0.0.1:8096",
8688
},
8789
Transcode: Transcode{
88-
Enabled: true,
89-
FFmpegPath: "/usr/bin/ffmpeg",
90-
TempDir: "/var/lib/emby-transcoder/transcode",
91-
MaxSessions: 2,
92-
BufferPauseSeconds: 300,
93-
BufferResumeSeconds: 120,
94-
IdleTimeoutSeconds: 60,
90+
Enabled: true,
91+
FFmpegPath: "/usr/bin/ffmpeg",
92+
TempDir: "/var/lib/emby-transcoder/transcode",
93+
MaxSessions: 2,
94+
BufferPauseSeconds: 300,
95+
BufferResumeSeconds: 120,
96+
SegmentRetentionSeconds: 300,
97+
IdleTimeoutSeconds: 60,
9598
},
9699
Clients: []ClientProfile{
97100
{Name: "emby_android_tv", Match: []string{"Emby for Android TV", "Android TV"}, Transcode: true},
@@ -149,10 +152,14 @@ func normalize(cfg *Config) {
149152
if cfg.Transcode.BufferResumeSeconds <= 0 {
150153
cfg.Transcode.BufferResumeSeconds = 120
151154
}
155+
if cfg.Transcode.SegmentRetentionSeconds <= 0 {
156+
cfg.Transcode.SegmentRetentionSeconds = 300
157+
}
152158
if cfg.Transcode.IdleTimeoutSeconds <= 0 {
153159
cfg.Transcode.IdleTimeoutSeconds = 60
154160
}
155161
cfg.Transcode.BufferPause = time.Duration(cfg.Transcode.BufferPauseSeconds) * time.Second
156162
cfg.Transcode.BufferResume = time.Duration(cfg.Transcode.BufferResumeSeconds) * time.Second
163+
cfg.Transcode.SegmentRetention = time.Duration(cfg.Transcode.SegmentRetentionSeconds) * time.Second
157164
cfg.Transcode.IdleTimeout = time.Duration(cfg.Transcode.IdleTimeoutSeconds) * time.Second
158165
}

internal/config/config_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ func TestDefaultConfigIsUsable(t *testing.T) {
3232
if cfg.Transcode.BufferResumeSeconds != 120 {
3333
t.Fatalf("buffer resume seconds = %d", cfg.Transcode.BufferResumeSeconds)
3434
}
35+
if cfg.Transcode.SegmentRetentionSeconds != 300 {
36+
t.Fatalf("segment retention seconds = %d", cfg.Transcode.SegmentRetentionSeconds)
37+
}
3538
if cfg.Server.Debug {
3639
t.Fatal("server debug should default false")
3740
}
@@ -42,7 +45,7 @@ func TestLoadMergesJSONOverDefaults(t *testing.T) {
4245
err := os.WriteFile(path, []byte(`{
4346
"server": {"listen": ":9000", "debug": true},
4447
"upstream": {"url": "http://emby.local:8096"},
45-
"transcode": {"max_sessions": 4, "buffer_pause_seconds": 600, "buffer_resume_seconds": 90},
48+
"transcode": {"max_sessions": 4, "buffer_pause_seconds": 600, "buffer_resume_seconds": 90, "segment_retention_seconds": 180},
4649
"clients": [{"name": "yamby", "match": ["Yamby"], "transcode": true}]
4750
}`), 0o600)
4851
if err != nil {
@@ -69,6 +72,9 @@ func TestLoadMergesJSONOverDefaults(t *testing.T) {
6972
if cfg.Transcode.BufferPauseSeconds != 600 || cfg.Transcode.BufferResumeSeconds != 90 {
7073
t.Fatalf("buffer thresholds = %d/%d", cfg.Transcode.BufferPauseSeconds, cfg.Transcode.BufferResumeSeconds)
7174
}
75+
if cfg.Transcode.SegmentRetentionSeconds != 180 {
76+
t.Fatalf("segment retention seconds = %d", cfg.Transcode.SegmentRetentionSeconds)
77+
}
7278
if !cfg.Server.Debug {
7379
t.Fatal("server debug should be loaded from config")
7480
}

internal/proxy/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func NewWithTransport(cfg config.Config, transport http.RoundTripper) (*Server,
5252
HardwareDevice: cfg.Transcode.HardwareDevice,
5353
BufferPauseThreshold: cfg.Transcode.BufferPause,
5454
BufferResumeThreshold: cfg.Transcode.BufferResume,
55+
SegmentRetention: cfg.Transcode.SegmentRetention,
5556
IdleTimeout: cfg.Transcode.IdleTimeout,
5657
})
5758
if err != nil {

internal/transcode/handler.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func (h Handler) sessionForSegment(id string, segmentIndex int, name string, r *
145145
}
146146
request := requestFromHTTP(id, inputURL, segmentReq)
147147
request.SegmentStartIndex = segmentIndex
148+
request.SegmentRequest = true
148149
request.RequestedStartTimeTicks = int64Query(r.URL.Query().Get("StartTimeTicks"))
149150
if request.RequestedStartTimeTicks == 0 {
150151
request.RequestedStartTimeTicks = request.StartTimeTicks
@@ -188,12 +189,15 @@ func (h Handler) startupWait() time.Duration {
188189
}
189190

190191
func segmentReusable(session *Session, segmentIndex int, name string) bool {
191-
if fileExists(filepath.Join(session.Dir, name)) {
192-
return true
192+
if segmentIndex < session.OldestSegmentKept {
193+
return false
193194
}
194195
if segmentIndex < session.SegmentStartIndex {
195196
return false
196197
}
198+
if fileExists(filepath.Join(session.Dir, name)) {
199+
return true
200+
}
197201
highest := session.HighestSegmentSeen
198202
if highest < session.SegmentStartIndex {
199203
highest = session.SegmentStartIndex

internal/transcode/manager.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Options struct {
3535
HardwareDevice string
3636
BufferPauseThreshold time.Duration
3737
BufferResumeThreshold time.Duration
38+
SegmentRetention time.Duration
3839
BufferCheckInterval time.Duration
3940
IdleTimeout time.Duration
4041
ReapInterval time.Duration
@@ -53,6 +54,7 @@ type Request struct {
5354
StartTimeTicks int64
5455
RequestedStartTimeTicks int64
5556
SegmentStartIndex int
57+
SegmentRequest bool
5658
Media MediaInfo
5759
}
5860

@@ -73,6 +75,7 @@ type Session struct {
7375
StartTimeTicks int64
7476
RequestedStartTimeTicks int64
7577
SegmentStartIndex int
78+
OldestSegmentKept int
7679
HighestSegmentSeen int
7780
Media MediaInfo
7881
Dir string
@@ -155,6 +158,9 @@ func normalizeManagerOptions(options Options) Options {
155158
if options.BufferResumeThreshold <= 0 {
156159
options.BufferResumeThreshold = 2 * time.Minute
157160
}
161+
if options.SegmentRetention <= 0 {
162+
options.SegmentRetention = 5 * time.Minute
163+
}
158164
if options.BufferCheckInterval <= 0 {
159165
options.BufferCheckInterval = time.Second
160166
}
@@ -410,6 +416,7 @@ func (m *Manager) RecordProgress(event PlaybackEvent) int {
410416
now := time.Now()
411417
count := 0
412418
var actions []bufferAction
419+
var cleanups []segmentCleanupTask
413420
for _, session := range m.sessions {
414421
if !matchesPlayback(session, event) {
415422
continue
@@ -430,9 +437,15 @@ func (m *Manager) RecordProgress(event PlaybackEvent) int {
430437
if action, ok := m.bufferActionLocked(session); ok {
431438
actions = append(actions, action)
432439
}
440+
if cleanup, ok := m.segmentCleanupTaskLocked(session); ok {
441+
cleanups = append(cleanups, cleanup)
442+
}
433443
count++
434444
}
435445
m.mu.Unlock()
446+
for _, cleanup := range cleanups {
447+
cleanupOldSegments(cleanup)
448+
}
436449
for _, action := range actions {
437450
m.applyBufferAction(action)
438451
}
@@ -535,6 +548,9 @@ func (m *Manager) ReconcileBuffers() {
535548
}
536549

537550
func shouldRestart(session *Session, request Request) bool {
551+
if request.SegmentRequest && session.OldestSegmentKept > session.SegmentStartIndex && request.SegmentStartIndex < session.OldestSegmentKept {
552+
return true
553+
}
538554
if request.StartTimeTicks != session.StartTimeTicks {
539555
return true
540556
}
@@ -571,6 +587,9 @@ func touchSession(session *Session, request Request, now time.Time, mediaAccess
571587
session.StartTimeTicks = request.StartTimeTicks
572588
session.RequestedStartTimeTicks = request.RequestedStartTimeTicks
573589
session.SegmentStartIndex = request.SegmentStartIndex
590+
if session.OldestSegmentKept < request.SegmentStartIndex {
591+
session.OldestSegmentKept = request.SegmentStartIndex
592+
}
574593
if session.HighestSegmentSeen < request.SegmentStartIndex {
575594
session.HighestSegmentSeen = request.SegmentStartIndex
576595
}
@@ -591,6 +610,61 @@ type bufferAction struct {
591610
bufferTicks int64
592611
}
593612

613+
type segmentCleanupTask struct {
614+
sessionID string
615+
dir string
616+
beforeSegment int
617+
}
618+
619+
func (m *Manager) segmentCleanupTaskLocked(session *Session) (segmentCleanupTask, bool) {
620+
if session == nil || session.Dir == "" || m.options.SegmentRetention <= 0 {
621+
return segmentCleanupTask{}, false
622+
}
623+
retentionTicks := durationToTicks(m.options.SegmentRetention)
624+
if retentionTicks <= 0 || session.PositionTicks <= retentionTicks {
625+
return segmentCleanupTask{}, false
626+
}
627+
cutoffTicks := session.PositionTicks - retentionTicks
628+
beforeSegment := int(cutoffTicks / defaultSegmentTicks)
629+
if beforeSegment <= session.OldestSegmentKept {
630+
return segmentCleanupTask{}, false
631+
}
632+
session.OldestSegmentKept = beforeSegment
633+
return segmentCleanupTask{sessionID: session.ID, dir: session.Dir, beforeSegment: beforeSegment}, true
634+
}
635+
636+
func cleanupOldSegments(task segmentCleanupTask) {
637+
entries, err := os.ReadDir(task.dir)
638+
if err != nil {
639+
if !os.IsNotExist(err) {
640+
logging.Errorf("transcode cleanup error id=%s err=%v", task.sessionID, err)
641+
}
642+
return
643+
}
644+
645+
deleted := 0
646+
for _, entry := range entries {
647+
if entry.IsDir() {
648+
continue
649+
}
650+
segmentIndex, ok := segmentIndexFromName(entry.Name())
651+
if !ok || segmentIndex >= task.beforeSegment {
652+
continue
653+
}
654+
path := filepath.Join(task.dir, entry.Name())
655+
if err := os.Remove(path); err != nil {
656+
if !os.IsNotExist(err) {
657+
logging.Errorf("transcode cleanup error id=%s file=%s err=%v", task.sessionID, entry.Name(), err)
658+
}
659+
continue
660+
}
661+
deleted++
662+
}
663+
if deleted > 0 {
664+
logging.Infof("transcode cleanup id=%s deleted_segments=%d before_segment=%d", task.sessionID, deleted, task.beforeSegment)
665+
}
666+
}
667+
594668
func (m *Manager) bufferActionLocked(session *Session) (bufferAction, bool) {
595669
if session == nil || session.process == nil {
596670
return bufferAction{}, false

0 commit comments

Comments
 (0)