Skip to content

Commit f1211f4

Browse files
committed
Stop periodic jobs for idle repos
Repos that haven't had a client request in 24h (configurable via idle-timeout) no longer re-arm their snapshot/repack periodic jobs. When a request arrives for an idle repo, the jobs are re-scheduled.
1 parent 572fc02 commit f1211f4

7 files changed

Lines changed: 124 additions & 17 deletions

File tree

internal/gitclone/manager.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strconv"
1212
"strings"
1313
"sync"
14+
"sync/atomic"
1415
"syscall"
1516
"time"
1617

@@ -85,6 +86,19 @@ type Repository struct {
8586
refCheckValid bool
8687
fetchSem chan struct{}
8788
credentialProvider CredentialProvider
89+
lastAccessed atomic.Int64
90+
}
91+
92+
func (r *Repository) TouchAccessed() {
93+
r.lastAccessed.Store(time.Now().UnixNano())
94+
}
95+
96+
func (r *Repository) LastAccessed() time.Time {
97+
ns := r.lastAccessed.Load()
98+
if ns == 0 {
99+
return time.Time{}
100+
}
101+
return time.Unix(0, ns)
88102
}
89103

90104
type Manager struct {
@@ -199,6 +213,7 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor
199213
fetchSem: make(chan struct{}, 1),
200214
credentialProvider: m.credentialProvider,
201215
}
216+
repo.lastAccessed.Store(time.Now().UnixNano())
202217

203218
headFile := filepath.Join(clonePath, "HEAD")
204219
if _, err := os.Stat(headFile); err == nil {

internal/jobscheduler/jobs.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ type Config struct {
2222
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
2323
}
2424

25+
// ErrJobIdle is a sentinel error that a periodic job can return to signal that
26+
// it should not be re-armed. The scheduler treats this as a graceful stop, not
27+
// as a job failure.
28+
var ErrJobIdle = errors.New("job idle")
29+
2530
type queueJob struct {
2631
id string
2732
queue string
@@ -182,6 +187,12 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
182187
submit := func() {
183188
q.Submit(queue, id, func(ctx context.Context) error {
184189
err := run(ctx)
190+
// If the job signals idle, stop re-arming so the periodic chain
191+
// ends. The caller is responsible for re-scheduling when activity
192+
// resumes.
193+
if errors.Is(err, ErrJobIdle) {
194+
return err
195+
}
185196
if q.store != nil {
186197
if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil {
187198
logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr)
@@ -291,10 +302,14 @@ func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job que
291302

292303
elapsed := time.Since(start)
293304
status := "success"
294-
if err != nil {
305+
switch {
306+
case errors.Is(err, ErrJobIdle):
307+
status = "idle"
308+
logger.InfoContext(ctx, "Periodic job idled out", "job", job, "elapsed", elapsed)
309+
case err != nil:
295310
status = "error"
296311
logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed)
297-
} else {
312+
default:
298313
logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed)
299314
}
300315
jobAttrs = append(jobAttrs, attribute.String("status", status))

internal/jobscheduler/jobs_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,42 @@ func TestJobSchedulerPeriodicJob(t *testing.T) {
263263
}, "periodic job should execute multiple times")
264264
}
265265

266+
func TestJobSchedulerPeriodicJobStopsOnIdle(t *testing.T) {
267+
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
268+
ctx, cancel := context.WithCancel(ctx)
269+
defer cancel()
270+
271+
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})
272+
273+
var executions atomic.Int32
274+
executed := make(chan struct{}, 10)
275+
276+
scheduler.SubmitPeriodicJob("queue1", "idle-job", 50*time.Millisecond, func(_ context.Context) error {
277+
n := executions.Add(1)
278+
executed <- struct{}{}
279+
if n >= 2 {
280+
return jobscheduler.ErrJobIdle
281+
}
282+
return nil
283+
})
284+
285+
// Wait for the idle return to take effect.
286+
eventually(t, time.Second, func() bool { return executions.Load() >= 2 },
287+
"periodic job should fire at least twice before idling out")
288+
289+
// Drain any buffered signals from before the idle return.
290+
for len(executed) > 0 {
291+
<-executed
292+
}
293+
294+
// No further executions should occur after ErrJobIdle.
295+
select {
296+
case <-executed:
297+
t.Fatal("periodic job should not re-arm after returning ErrJobIdle")
298+
case <-time.After(300 * time.Millisecond):
299+
}
300+
}
301+
266302
func TestJobSchedulerPeriodicJobWithError(t *testing.T) {
267303
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
268304
ctx, cancel := context.WithCancel(ctx)

internal/strategy/git/git.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Config struct {
4949
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
5050
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"`
5151
BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"`
52+
IdleTimeout time.Duration `hcl:"idle-timeout,optional" help:"Stop periodic jobs for repos with no client requests for this duration. 0 disables." default:"72h"`
5253
}
5354

5455
type Strategy struct {
@@ -66,6 +67,7 @@ type Strategy struct {
6667
snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry
6768
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
6869
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
70+
idledRepos sync.Map // keyed by upstream URL, set when periodic jobs idle out
6971
metrics *gitMetrics
7072
repoCounts *RepoCounts
7173
ready atomic.Bool
@@ -197,6 +199,38 @@ func (s *Strategy) Ready() bool {
197199
return s.ready.Load()
198200
}
199201

202+
func (s *Strategy) isRepoIdle(repo *gitclone.Repository) bool {
203+
if s.config.IdleTimeout <= 0 {
204+
return false
205+
}
206+
return time.Since(repo.LastAccessed()) > s.config.IdleTimeout
207+
}
208+
209+
func (s *Strategy) withIdleGuard(repo *gitclone.Repository, fn func(ctx context.Context) error) func(ctx context.Context) error {
210+
return func(ctx context.Context) error {
211+
if s.isRepoIdle(repo) {
212+
s.idledRepos.Store(repo.UpstreamURL(), true)
213+
logging.FromContext(ctx).InfoContext(ctx, "Stopping periodic job for idle repo", "upstream", repo.UpstreamURL())
214+
return jobscheduler.ErrJobIdle
215+
}
216+
return fn(ctx)
217+
}
218+
}
219+
220+
func (s *Strategy) touchAndReschedule(repo *gitclone.Repository) {
221+
repo.TouchAccessed()
222+
if _, wasIdle := s.idledRepos.LoadAndDelete(repo.UpstreamURL()); wasIdle {
223+
logging.FromContext(s.ctx).InfoContext(s.ctx, "Re-scheduling periodic jobs for previously idle repo",
224+
"upstream", repo.UpstreamURL())
225+
if s.config.SnapshotInterval > 0 {
226+
s.scheduleSnapshotJobs(repo)
227+
}
228+
if s.config.RepackInterval > 0 {
229+
s.scheduleRepackJobs(repo)
230+
}
231+
}
232+
}
233+
200234
// SetMetadataStore enables the per-repo clone histogram and schedules its
201235
// daily reaper. Called by config.Load after the metadata backend is built.
202236
func (s *Strategy) SetMetadataStore(store *metadatadb.Store) {
@@ -332,6 +366,7 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host
332366
http.Error(w, "Internal server error", http.StatusInternalServerError)
333367
return
334368
}
369+
s.touchAndReschedule(repo)
335370

336371
// Increment after GetOrCreate so unvalidated URLs can't bloat the keyspace.
337372
if isClone, cerr := RequestIsClone(pathValue, r); cerr != nil {

internal/strategy/git/refs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func (s *Strategy) handleEnsureRefs(w http.ResponseWriter, r *http.Request, host
7272
http.Error(w, "internal server error", http.StatusInternalServerError)
7373
return
7474
}
75+
s.touchAndReschedule(repo)
7576

7677
if repo.State() != gitclone.StateReady {
7778
if err := s.ensureCloneReady(ctx, repo); err != nil {

internal/strategy/git/repack.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
13-
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, func(ctx context.Context) error {
13+
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
1414
start := time.Now()
1515
err := repo.Repack(ctx)
1616
status := "success"
@@ -19,5 +19,5 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
1919
}
2020
s.metrics.recordOperation(ctx, "repack", status, time.Since(start))
2121
return errors.Wrap(err, "repack")
22-
})
22+
}))
2323
}

internal/strategy/git/snapshot.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -168,19 +168,19 @@ func (s *Strategy) generateAndUploadMirrorSnapshot(ctx context.Context, repo *gi
168168
}
169169

170170
func (s *Strategy) scheduleSnapshotJobs(repo *gitclone.Repository) {
171-
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error {
171+
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "snapshot-periodic", s.config.SnapshotInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
172172
return s.generateAndUploadSnapshot(ctx, repo)
173-
})
174-
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error {
173+
}))
174+
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
175175
return s.generateAndUploadLFSSnapshot(ctx, repo)
176-
})
176+
}))
177177
mirrorInterval := s.config.MirrorSnapshotInterval
178178
if mirrorInterval == 0 {
179179
mirrorInterval = s.config.SnapshotInterval
180180
}
181-
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, func(ctx context.Context) error {
181+
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "mirror-snapshot-periodic", mirrorInterval, s.withIdleGuard(repo, func(ctx context.Context) error {
182182
return s.generateAndUploadMirrorSnapshot(ctx, repo)
183-
})
183+
}))
184184
}
185185

186186
func (s *Strategy) snapshotMutexFor(upstreamURL string) *sync.Mutex {
@@ -202,6 +202,7 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
202202
http.Error(w, "Internal server error", http.StatusInternalServerError)
203203
return
204204
}
205+
s.touchAndReschedule(repo)
205206

206207
cacheKey := snapshotCacheKey(upstreamURL)
207208

@@ -346,6 +347,7 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h
346347
http.Error(w, "Internal server error", http.StatusInternalServerError)
347348
return
348349
}
350+
s.touchAndReschedule(repo)
349351
if cloneErr := s.ensureCloneReady(ctx, repo); cloneErr != nil {
350352
logger.ErrorContext(ctx, "Clone unavailable for bundle", "upstream", upstreamURL, "error", cloneErr)
351353
http.Error(w, "Repository unavailable", http.StatusServiceUnavailable)
@@ -832,13 +834,16 @@ func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Reque
832834
// restore + on-demand generation. Kick off a background mirror warm so
833835
// the periodic LFS snapshot job can fire once the mirror is ready.
834836
logger.InfoContext(ctx, "LFS snapshot cache miss, triggering background warm", "upstream", upstreamURL)
835-
if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil && repo.State() != gitclone.StateReady {
836-
s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error {
837-
if err := s.startClone(ctx, repo); err != nil {
838-
logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err)
839-
}
840-
return nil
841-
})
837+
if repo, repoErr := s.cloneManager.GetOrCreate(ctx, upstreamURL); repoErr == nil {
838+
s.touchAndReschedule(repo)
839+
if repo.State() != gitclone.StateReady {
840+
s.scheduler.Submit(upstreamURL, "lfs-mirror-warm", func(ctx context.Context) error {
841+
if err := s.startClone(ctx, repo); err != nil {
842+
logger.WarnContext(ctx, "Background mirror warm for LFS failed", "upstream", upstreamURL, "error", err)
843+
}
844+
return nil
845+
})
846+
}
842847
}
843848
http.Error(w, "LFS snapshot not found", http.StatusNotFound)
844849
}

0 commit comments

Comments
 (0)