diff --git a/go.mod b/go.mod index b43d977a..7e17121d 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/fluxcd/pkg/cache v0.9.0 github.com/fluxcd/pkg/runtime v0.60.0 github.com/fluxcd/pkg/version v0.7.0 + github.com/go-logr/logr v1.4.2 github.com/google/go-containerregistry v0.20.5 github.com/google/go-containerregistry/pkg/authn/k8schain v0.0.0-20250225234217-098045d5e61f github.com/onsi/ginkgo v1.16.5 @@ -86,7 +87,6 @@ require ( github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.8.0 // indirect github.com/go-errors/errors v1.5.1 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.1 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect diff --git a/internal/database/badger_gc.go b/internal/database/badger_gc.go new file mode 100644 index 00000000..f884c5d4 --- /dev/null +++ b/internal/database/badger_gc.go @@ -0,0 +1,94 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package database + +import ( + "context" + "errors" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/go-logr/logr" + ctrl "sigs.k8s.io/controller-runtime" +) + +// BadgerGarbageCollector implements controller runtime's Runnable +type BadgerGarbageCollector struct { + // DiscardRatio must be a float between 0.0 and 1.0, inclusive + // See badger.DB.RunValueLogGC for more info + DiscardRatio float64 + Interval time.Duration + + name string + db *badger.DB + log logr.Logger +} + +// NewBadgerGarbageCollector creates and returns a new BadgerGarbageCollector +func NewBadgerGarbageCollector(name string, db *badger.DB, interval time.Duration, discardRatio float64) *BadgerGarbageCollector { + return &BadgerGarbageCollector{ + DiscardRatio: discardRatio, + Interval: interval, + + name: name, + db: db, + } +} + +// Start repeatedly runs the BadgerDB garbage collector with a delay inbetween +// runs. +// +// Start blocks until the context is cancelled. The database is expected to +// already be open and not be closed while this context is active. +// +// ctx should be a logr.Logger context. +func (gc *BadgerGarbageCollector) Start(ctx context.Context) error { + gc.log = ctrl.LoggerFrom(ctx).WithName(gc.name) + + gc.log.Info("Starting Badger GC") + timer := time.NewTimer(gc.Interval) + for { + select { + case <-timer.C: + gc.discardValueLogFiles() + timer.Reset(gc.Interval) + case <-ctx.Done(): + timer.Stop() + gc.log.Info("Stopped Badger GC") + return nil + } + } +} + +// upper bound for loop +const maxDiscards = 1000 + +func (gc *BadgerGarbageCollector) discardValueLogFiles() { + gc.log.V(1).Info("Running Badger GC") + for c := 0; c < maxDiscards; c++ { + err := gc.db.RunValueLogGC(gc.DiscardRatio) + if errors.Is(err, badger.ErrNoRewrite) { + // there is no more garbage to discard + gc.log.V(1).Info("Ran Badger GC", "discarded_vlogs", c) + return + } + if err != nil { + gc.log.Error(err, "Badger GC Error", "discarded_vlogs", c) + return + } + } + gc.log.Error(nil, "Warning: Badger GC ran for maximum discards", "discarded_vlogs", maxDiscards) +} diff --git a/internal/database/badger_gc_test.go b/internal/database/badger_gc_test.go new file mode 100644 index 00000000..61bb5c89 --- /dev/null +++ b/internal/database/badger_gc_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package database + +import ( + "context" + "os" + "testing" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" +) + +func TestBadgerGarbageCollectorDoesStop(t *testing.T) { + badger, db := createBadgerDatabaseForGC(t) + ctx, cancel := context.WithCancel( + logr.NewContext(context.Background(), + testr.NewWithOptions(t, testr.Options{Verbosity: 1, LogTimestamp: true}))) + + stop := make(chan struct{}) + go func() { + gc := NewBadgerGarbageCollector("test-badger-gc", badger, 500*time.Millisecond, 0.01) + gc.Start(ctx) + stop <- struct{}{} + }() + + time.Sleep(time.Second) + + tags := []string{"latest", "v0.0.1", "v0.0.2"} + fatalIfError(t, db.SetTags(testRepo, tags)) + _, err := db.Tags(testRepo) + fatalIfError(t, err) + t.Log("wrote tags successfully") + + time.Sleep(time.Second) + + cancel() + t.Log("waiting for GC stop") + select { + case <-time.NewTimer(5 * time.Second).C: + t.Fatalf("GC did not stop") + case <-stop: + t.Log("GC Stopped") + } +} + +func createBadgerDatabaseForGC(t *testing.T) (*badger.DB, *BadgerDatabase) { + t.Helper() + dir, err := os.MkdirTemp(os.TempDir(), t.Name()) + if err != nil { + t.Fatal(err) + } + db, err := badger.Open(badger.DefaultOptions(dir)) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + db.Close() + os.RemoveAll(dir) + }) + return db, NewBadgerDatabase(db) +} diff --git a/main.go b/main.go index 816f80d3..f60e95cb 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "os" + "time" "github.com/dgraph-io/badger/v3" flag "github.com/spf13/pflag" @@ -61,7 +62,10 @@ import ( "github.com/fluxcd/image-reflector-controller/internal/registry" ) -const controllerName = "image-reflector-controller" +const ( + controllerName = "image-reflector-controller" + discardRatio = 0.7 +) var ( scheme = runtime.NewScheme() @@ -90,6 +94,7 @@ func main() { watchOptions helper.WatchOptions storagePath string storageValueLogFileSize int64 + gcInterval uint16 // max value is 65535 minutes (~ 45 days) which is well under the maximum time.Duration concurrent int awsAutoLogin bool gcpAutoLogin bool @@ -105,6 +110,7 @@ func main() { flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.") flag.StringVar(&storagePath, "storage-path", "/data", "Where to store the persistent database of image metadata") flag.Int64Var(&storageValueLogFileSize, "storage-value-log-file-size", 1<<28, "Set the database's memory mapped value log file size in bytes. Effective memory usage is about two times this size.") + flag.Uint16Var(&gcInterval, "gc-interval", 10, "The number of minutes to wait between garbage collections. 0 disables the garbage collector.") flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent resource reconciles.") // NOTE: Deprecated flags. @@ -152,7 +158,14 @@ func main() { os.Exit(1) } defer badgerDB.Close() + db := database.NewBadgerDatabase(badgerDB) + var badgerGC *database.BadgerGarbageCollector + if gcInterval > 0 { + badgerGC = database.NewBadgerGarbageCollector("badger-gc", badgerDB, time.Duration(gcInterval)*time.Minute, discardRatio) + } else { + setupLog.V(1).Info("Badger garbage collector is disabled") + } watchNamespace := "" if !watchOptions.AllNamespaces { @@ -225,6 +238,10 @@ func main() { os.Exit(1) } + if badgerGC != nil { + mgr.Add(badgerGC) + } + probes.SetupChecks(mgr, setupLog) var eventRecorder *events.Recorder