@@ -5,6 +5,7 @@ package controllers
55
66import (
77 "context"
8+ "errors"
89 "fmt"
910 "strings"
1011 "sync"
@@ -24,6 +25,7 @@ import (
2425 "sigs.k8s.io/controller-runtime/pkg/builder"
2526 "sigs.k8s.io/controller-runtime/pkg/client"
2627 "sigs.k8s.io/controller-runtime/pkg/log"
28+ "sigs.k8s.io/controller-runtime/pkg/manager"
2729 "sigs.k8s.io/controller-runtime/pkg/predicate"
2830)
2931
@@ -40,12 +42,6 @@ const AutoMigrateValue = "true"
4042// filter in addition to the opt-in label).
4143const ToolhiveGroup = "toolhive.stacklok.dev"
4244
43- // StorageVersionMigratorFieldManager is the Server-Side Apply field manager
44- // name used for all re-store writes. It is part of the public contract for
45- // the controller — do not change it across releases, as SSA ownership is
46- // permanent.
47- const StorageVersionMigratorFieldManager = "thv-storage-version-migrator"
48-
4945// EventReasonMigrationSucceeded and EventReasonMigrationFailed are the event
5046// reasons emitted on the owning CRD when a migration completes or fails.
5147const (
@@ -56,34 +52,63 @@ const (
5652const (
5753 defaultMigrationCacheTTL = 1 * time .Hour
5854 defaultListPageSize = 500
55+ defaultCacheGCInterval = 10 * time .Minute
5956)
6057
58+ // errMigrationRetriedDueToConflicts is returned by restoreCRs when at least one
59+ // CR re-store hit a typed Conflict (and no other errors occurred). The caller
60+ // must NOT trim CRD.status.storedVersions in this case: the post-conflict state
61+ // of the affected object is unverified, so reasoning about whether the storage
62+ // re-encode actually happened is unsafe. The next reconcile retries cleanly.
63+ var errMigrationRetriedDueToConflicts = errors .New (
64+ "storage version migration retried due to concurrent writes; storedVersions left unchanged" )
65+
66+ // The wildcard CR RBAC below is intentional. The set of opted-in CRDs isn't
67+ // known at codegen time — it's a per-CRD runtime label decision — so the
68+ // kubebuilder marker can't enumerate kinds. The runtime gate is the
69+ // isManagedCRD check inside Reconcile, which requires both the toolhive
70+ // API group AND the opt-in label. Wildcard RBAC plus isManagedCRD form the
71+ // defence in depth: RBAC bounds the controller to a single API group, and
72+ // the label gate further restricts it to opted-in CRDs.
73+
6174//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch
6275//+kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions/status,verbs=update;patch
63- //+kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=*,verbs=get;list;patch
64- //+kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=*/status,verbs=patch
76+ //+kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=*,verbs=get;list;update
77+ //+kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=*/status,verbs=update
6578
6679// StorageVersionMigratorReconciler reconciles CustomResourceDefinition objects
6780// in the toolhive.stacklok.dev group that carry the opt-in
6881// AutoMigrateLabel=AutoMigrateValue. For each such CRD it re-stores every CR
69- // at the current storage version by issuing a metadata-only Server-Side Apply
70- // against the /status subresource (to avoid admission webhooks), then patches
82+ // at the current storage version by doing a Get + Update on the live object
83+ // while toggling the MigrationTimestampAnnotation to force a real content
84+ // diff. The annotation toggle is required because the apiserver elides no-op
85+ // writes (an Update with bytes equal to what's already stored does not bump
86+ // resourceVersion and does not re-encode) — without a real diff the migration
87+ // would be a hollow operation. After all CRs have been re-stored it patches
7188// CRD.status.storedVersions down to [<currentStorageVersion>] so a future
7289// release can drop deprecated versions from spec.versions without orphaning
7390// etcd objects.
7491//
92+ // Webhook interaction: the Update goes through the main resource, so
93+ // validating/mutating admission webhooks on the kind DO see this write.
94+ // Webhooks that need to ignore migration-only writes should branch on the
95+ // presence of MigrationTimestampAnnotation in the diff.
96+ //
7597// Enabled by default. Opt out operator-wide via
7698// operator.features.storageVersionMigrator (ENABLE_STORAGE_VERSION_MIGRATOR=false)
7799// for admins who prefer to run kube-storage-version-migrator externally.
78100// Per-kind escape hatch: remove the label from the CRD (emergency only — will
79101// be re-applied by GitOps / helm upgrade).
80102type StorageVersionMigratorReconciler struct {
81- client.Client // cached reads for CRs (unused in this reconciler, kept for kubebuilder convention)
82- APIReader client.Reader // live reads for CRDs (bypasses informer)
83- Scheme * runtime.Scheme // kubebuilder reconciler convention
84- Recorder record.EventRecorder // MigrationSucceeded / MigrationFailed events on the CRD
85- PageSize int64 // overrideable for tests; defaults to defaultListPageSize
86- cache * migrationCache
103+ // used for CR Update writes and the CRD /status storedVersions patch;
104+ // reads go through APIReader to bypass the informer cache.
105+ client.Client
106+ APIReader client.Reader // live reads for CRDs and CR list pages (bypasses informer)
107+ Scheme * runtime.Scheme // kubebuilder reconciler convention
108+ Recorder record.EventRecorder // MigrationSucceeded / MigrationFailed events on the CRD
109+ PageSize int64 // overrideable for tests; defaults to defaultListPageSize
110+ CacheGCInterval time.Duration // overrideable for tests; defaults to defaultCacheGCInterval
111+ cache * migrationCache
87112}
88113
89114// Reconcile runs for each opted-in toolhive.stacklok.dev CRD event. See the
@@ -151,6 +176,11 @@ func (r *StorageVersionMigratorReconciler) Reconcile(ctx context.Context, req ct
151176// group. The filter is evaluated twice — once on informer events here, and again
152177// inside Reconcile after the live APIReader read — because label removals can
153178// briefly race the informer.
179+ //
180+ // It also registers a Runnable that periodically sweeps expired entries from
181+ // the migration cache so deleted CRs (whose UIDs never recur in subsequent
182+ // list pages and therefore never trigger lazy eviction in has()) don't grow
183+ // the map without bound on long-running operators with high CR churn.
154184func (r * StorageVersionMigratorReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
155185 r .ensureInitialized ()
156186
@@ -159,7 +189,7 @@ func (r *StorageVersionMigratorReconciler) SetupWithManager(mgr ctrl.Manager) er
159189 return fmt .Errorf ("parse label selector: %w" , err )
160190 }
161191
162- return ctrl .NewControllerManagedBy (mgr ).
192+ if err := ctrl .NewControllerManagedBy (mgr ).
163193 Named ("storageversionmigrator" ).
164194 For (
165195 & apiextensionsv1.CustomResourceDefinition {},
@@ -172,7 +202,24 @@ func (r *StorageVersionMigratorReconciler) SetupWithManager(mgr ctrl.Manager) er
172202 predicate.ResourceVersionChangedPredicate {},
173203 ),
174204 ).
175- Complete (r )
205+ Complete (r ); err != nil {
206+ return err
207+ }
208+
209+ // Periodic cache GC. Registered after Complete so the controller is fully
210+ // wired when the runnable starts.
211+ return mgr .Add (manager .RunnableFunc (func (ctx context.Context ) error {
212+ ticker := time .NewTicker (r .CacheGCInterval )
213+ defer ticker .Stop ()
214+ for {
215+ select {
216+ case <- ctx .Done ():
217+ return nil
218+ case <- ticker .C :
219+ r .cache .gc ()
220+ }
221+ }
222+ }))
176223}
177224
178225// ------------------------------------------------------------------
@@ -183,18 +230,27 @@ func (r *StorageVersionMigratorReconciler) ensureInitialized() {
183230 if r .PageSize == 0 {
184231 r .PageSize = defaultListPageSize
185232 }
233+ if r .CacheGCInterval == 0 {
234+ r .CacheGCInterval = defaultCacheGCInterval
235+ }
186236 if r .cache == nil {
187237 r .cache = newMigrationCache (defaultMigrationCacheTTL )
188238 }
189239}
190240
191241// restoreCRs lists all CRs of the CRD's served kind (served version = storageVersion)
192- // and issues a metadata-only Server-Side Apply against /status for each one,
193- // forcing the API server to re-encode the object at the current storage version.
242+ // and issues a main-resource Update on each one, forcing the apiserver to
243+ // re-encode the object at the current storage version.
194244//
195- // Swallowed per-CR errors: IsNotFound (object deleted between list and patch)
196- // and IsConflict (object updated elsewhere — storage is already fresh).
197- // All other errors are aggregated and returned.
245+ // Per-CR error handling:
246+ // - IsNotFound: silently skipped (object deleted between list and update —
247+ // it can't be at the old storage version anymore).
248+ // - IsConflict: silently skipped at the per-CR level, but a function-level
249+ // counter is incremented. After the loop, if any conflicts occurred and no
250+ // other errors did, errMigrationRetriedDueToConflicts is returned so the
251+ // caller leaves storedVersions untouched (the post-conflict state of the
252+ // conflicting object is unverified).
253+ // - All other errors are aggregated and returned.
198254func (r * StorageVersionMigratorReconciler ) restoreCRs (
199255 ctx context.Context ,
200256 crd * apiextensionsv1.CustomResourceDefinition ,
@@ -207,12 +263,11 @@ func (r *StorageVersionMigratorReconciler) restoreCRs(
207263 Kind : crd .Spec .Names .Kind ,
208264 }
209265
210- hasStatusSubresource := crdHasStatusSubresource (crd , storageVersion )
211-
212266 listGVK := gvk
213267 listGVK .Kind = crd .Spec .Names .ListKind
214268
215269 var errs []error
270+ conflicts := 0
216271 var continueToken string
217272 for {
218273 list := & unstructured.UnstructuredList {}
@@ -234,17 +289,23 @@ func (r *StorageVersionMigratorReconciler) restoreCRs(
234289 if r .cache .has (crd .Name , u .GetUID (), u .GetResourceVersion ()) {
235290 return nil
236291 }
237- if err := r .restoreOne (ctx , gvk , u , hasStatusSubresource ); err != nil {
238- if apierrors .IsNotFound (err ) || apierrors .IsConflict (err ) {
239- logger .V (1 ).Info ("skip CR — deleted or updated elsewhere" ,
292+ restored , err := r .restoreOne (ctx , gvk , u )
293+ if err != nil {
294+ switch {
295+ case apierrors .IsNotFound (err ):
296+ logger .V (1 ).Info ("skip CR — deleted" ,
297+ "object" , client .ObjectKeyFromObject (u ), "err" , err )
298+ case apierrors .IsConflict (err ):
299+ conflicts ++
300+ logger .V (1 ).Info ("skip CR — concurrent write conflict" ,
240301 "object" , client .ObjectKeyFromObject (u ), "err" , err )
241- return nil
302+ default :
303+ errs = append (errs , fmt .Errorf ("re-store %s/%s: %w" ,
304+ u .GetNamespace (), u .GetName (), err ))
242305 }
243- errs = append (errs , fmt .Errorf ("re-store %s/%s: %w" ,
244- u .GetNamespace (), u .GetName (), err ))
245306 return nil
246307 }
247- r .cache .add (crd .Name , u .GetUID (), u .GetResourceVersion ())
308+ r .cache .add (crd .Name , restored .GetUID (), restored .GetResourceVersion ())
248309 return nil
249310 }); err != nil {
250311 errs = append (errs , err )
@@ -256,36 +317,56 @@ func (r *StorageVersionMigratorReconciler) restoreCRs(
256317 }
257318 }
258319
320+ if len (errs ) == 0 && conflicts > 0 {
321+ return errMigrationRetriedDueToConflicts
322+ }
259323 return kerrors .NewAggregate (errs )
260324}
261325
262- // restoreOne issues a metadata-only SSA for a single CR. Prefers the /status
263- // subresource (to bypass validating/mutating webhooks), falls back to the main
264- // resource if the CRD has no status subresource.
326+ // MigrationTimestampAnnotation is set on each CR by the migrator to force a
327+ // real content diff on Update. The apiserver's storage layer elides no-op
328+ // writes (an Update with bytes equal to what's already in etcd does not
329+ // re-encode and does not bump resourceVersion), so a Get + Update on the live
330+ // object is not enough on its own — there must be at least one byte of
331+ // difference. Mutating this annotation guarantees the apiserver re-writes the
332+ // object, which is exactly the storage-version re-encode we need. The value
333+ // is the RFC3339Nano timestamp of the most recent successful migration.
334+ //
335+ // This matches the upstream kube-storage-version-migrator's approach (which
336+ // also uses an annotation toggle) and is part of the public contract: do not
337+ // remove or rename this constant across releases without a migration plan.
338+ const MigrationTimestampAnnotation = "toolhive.stacklok.dev/storage-version-migrated-at"
339+
340+ // restoreOne issues a Get + Update on the live CR, mutating the migration
341+ // timestamp annotation to force a real content diff so the apiserver actually
342+ // re-encodes the object at the current storage version. The Update goes
343+ // through the main resource (not /status), so any validating/mutating
344+ // admission webhooks on the kind WILL see this write. CRDs that need to avoid
345+ // webhook side effects from this controller should configure their webhooks
346+ // to ignore writes that change only this annotation. Returns the live object
347+ // after the update so the caller can record its post-update resourceVersion
348+ // in the cache.
265349func (r * StorageVersionMigratorReconciler ) restoreOne (
266350 ctx context.Context ,
267351 gvk schema.GroupVersionKind ,
268352 original * unstructured.Unstructured ,
269- hasStatusSubresource bool ,
270- ) error {
271- patch := & unstructured.Unstructured {}
272- patch .SetGroupVersionKind (gvk )
273- patch .SetName (original .GetName ())
274- patch .SetNamespace (original .GetNamespace ())
275- patch .SetUID (original .GetUID ()) // UID mismatch → typed conflict on races
276- patch .SetResourceVersion (original .GetResourceVersion ()) // RV mismatch → typed conflict on races
277-
278- applyConfig := client .ApplyConfigurationFromUnstructured (patch )
279- if hasStatusSubresource {
280- return r .Client .Status ().Apply (ctx , applyConfig ,
281- client .FieldOwner (StorageVersionMigratorFieldManager ),
282- client .ForceOwnership ,
283- )
353+ ) (* unstructured.Unstructured , error ) {
354+ live := & unstructured.Unstructured {}
355+ live .SetGroupVersionKind (gvk )
356+ if err := r .APIReader .Get (ctx , client .ObjectKeyFromObject (original ), live ); err != nil {
357+ // IsNotFound is propagated to the caller, which handles it.
358+ return nil , err
284359 }
285- return r .Apply (ctx , applyConfig ,
286- client .FieldOwner (StorageVersionMigratorFieldManager ),
287- client .ForceOwnership ,
288- )
360+ annotations := live .GetAnnotations ()
361+ if annotations == nil {
362+ annotations = map [string ]string {}
363+ }
364+ annotations [MigrationTimestampAnnotation ] = time .Now ().UTC ().Format (time .RFC3339Nano )
365+ live .SetAnnotations (annotations )
366+ if err := r .Update (ctx , live ); err != nil {
367+ return nil , err
368+ }
369+ return live , nil
289370}
290371
291372// patchStoredVersions overwrites CRD.status.storedVersions to exactly
@@ -327,41 +408,17 @@ func findStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string,
327408 return "" , false
328409}
329410
330- // isMigrationNeeded returns true if status.storedVersions contains any entry
331- // other than the current storage version. If storedVersions equals exactly
332- // [storageVersion] AND only one version is served, no work is needed — this
333- // second condition avoids spurious reconciles once a deprecated version has
334- // been fully removed from spec.versions .
411+ // isMigrationNeeded returns true iff status.storedVersions is anything other
412+ // than exactly [storageVersion]. The set of served versions does not affect
413+ // this check — under spec.conversion.strategy=None with identical schemas,
414+ // normal writers cannot reintroduce stale versions to storedVersions, so a
415+ // defensive re-scan based on servedCount has no scenario to defend against .
335416func isMigrationNeeded (
336417 crd * apiextensionsv1.CustomResourceDefinition ,
337418 storageVersion string ,
338419) bool {
339420 stored := crd .Status .StoredVersions
340- if len (stored ) != 1 || stored [0 ] != storageVersion {
341- return true
342- }
343- servedCount := 0
344- for _ , v := range crd .Spec .Versions {
345- if v .Served {
346- servedCount ++
347- }
348- }
349- return servedCount > 1
350- }
351-
352- // crdHasStatusSubresource returns true if the CRD's version declares a status
353- // subresource. Missing → fall back to main-resource SSA.
354- func crdHasStatusSubresource (
355- crd * apiextensionsv1.CustomResourceDefinition ,
356- version string ,
357- ) bool {
358- for _ , v := range crd .Spec .Versions {
359- if v .Name != version {
360- continue
361- }
362- return v .Subresources != nil && v .Subresources .Status != nil
363- }
364- return false
421+ return len (stored ) != 1 || stored [0 ] != storageVersion
365422}
366423
367424// ------------------------------------------------------------------
@@ -371,7 +428,12 @@ func crdHasStatusSubresource(
371428// migrationCache records successfully-migrated (UID, resourceVersion) pairs
372429// so subsequent reconciles within the TTL window skip already-fresh objects.
373430// It is a correctness optimization only — a cache miss simply issues a
374- // redundant (but harmless) SSA.
431+ // redundant (but harmless) Update.
432+ //
433+ // Eviction: lazy on lookup in has(), plus a periodic sweep via gc() driven
434+ // from a manager.Runnable registered in SetupWithManager. The periodic sweep
435+ // is required because lookups never recur for deleted CRs, so without it
436+ // their entries would persist forever.
375437type migrationCache struct {
376438 mu sync.Mutex
377439 entries map [string ]cacheEntry
@@ -417,6 +479,20 @@ func (c *migrationCache) add(crdName string, uid apitypes.UID, resourceVersion s
417479 }
418480}
419481
482+ // gc evicts every expired entry from the cache. Called from a periodic
483+ // manager.Runnable so entries for deleted CRs (whose UIDs never recur in
484+ // subsequent list pages) don't accumulate without bound.
485+ func (c * migrationCache ) gc () {
486+ c .mu .Lock ()
487+ defer c .mu .Unlock ()
488+ now := c .now ()
489+ for k , e := range c .entries {
490+ if now .After (e .expiresAt ) {
491+ delete (c .entries , k )
492+ }
493+ }
494+ }
495+
420496func (* migrationCache ) key (crdName string , uid apitypes.UID ) string {
421497 return crdName + "|" + string (uid )
422498}
0 commit comments