Skip to content

Commit 672947f

Browse files
committed
add informer sync timeout
Signed-off-by: Eric Pickard <piceri@github.com>
1 parent 1b2e649 commit 672947f

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

internal/controller/controller.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ const (
3838
// deployment record API. Once an artifact is known to be missing,
3939
// we suppress further API calls for this duration.
4040
unknownArtifactTTL = 1 * time.Hour
41+
42+
// informerSyncTimeoutDuration is the maximum duration of time allowed
43+
// for the informers to sync to prevent the controller from hanging indefinitely.
44+
informerSyncTimeoutDuration = 60 * time.Second
4145
)
4246

4347
type ttlCache interface {
@@ -92,6 +96,9 @@ type Controller struct {
9296
// best effort cache to suppress API calls for artifacts that
9397
// returned a 404 (no artifact found). Keyed by image digest.
9498
unknownArtifacts ttlCache
99+
// informerSyncTimeout is the maximum time allowed for all informers to sync
100+
// and prevents sync from hanging indefinitely.
101+
informerSyncTimeout time.Duration
95102
}
96103

97104
// New creates a new deployment tracker controller.
@@ -160,6 +167,7 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
160167
cfg: cfg,
161168
observedDeployments: amcache.NewExpiring(),
162169
unknownArtifacts: amcache.NewExpiring(),
170+
informerSyncTimeout: informerSyncTimeoutDuration,
163171
}
164172

165173
// Add event handlers to the informer
@@ -320,16 +328,20 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
320328

321329
// Wait for the caches to be synced
322330
slog.Info("Waiting for informer caches to sync")
323-
if !cache.WaitForCacheSync(ctx.Done(),
331+
informerSyncCxt, cancel := context.WithTimeout(ctx, c.informerSyncTimeout)
332+
333+
if !cache.WaitForCacheSync(informerSyncCxt.Done(),
324334
c.podInformer.HasSynced,
325335
c.deploymentInformer.HasSynced,
326336
c.daemonSetInformer.HasSynced,
327337
c.statefulSetInformer.HasSynced,
328338
c.jobInformer.HasSynced,
329339
c.cronJobInformer.HasSynced,
330340
) {
331-
return errors.New("timed out waiting for caches to sync")
341+
cancel()
342+
return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions")
332343
}
344+
cancel()
333345

334346
slog.Info("Starting workers",
335347
"count", workers,

internal/controller/controller_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ import (
1212
"github.com/stretchr/testify/require"
1313
corev1 "k8s.io/api/core/v1"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/runtime"
1516
amcache "k8s.io/apimachinery/pkg/util/cache"
17+
"k8s.io/client-go/kubernetes/fake"
18+
k8stesting "k8s.io/client-go/testing"
19+
"k8s.io/client-go/util/workqueue"
1620
)
1721

1822
// mockPoster records all PostOne calls and returns a configurable error.
@@ -533,3 +537,46 @@ func TestIsTerminalPhase(t *testing.T) {
533537
})
534538
}
535539
}
540+
541+
func TestRun_InformerSyncTimeout(t *testing.T) {
542+
t.Parallel()
543+
fakeClient := fake.NewSimpleClientset()
544+
fakeClient.PrependReactor("list", "*", func(action k8stesting.Action) (bool, runtime.Object, error) {
545+
// Block until the test context is cancelled.
546+
<-make(chan struct{})
547+
return true, nil, nil
548+
})
549+
550+
factory := createInformerFactory(fakeClient, "", "")
551+
552+
ctrl := &Controller{
553+
clientset: fakeClient,
554+
podInformer: factory.Core().V1().Pods().Informer(),
555+
deploymentInformer: factory.Apps().V1().Deployments().Informer(),
556+
daemonSetInformer: factory.Apps().V1().DaemonSets().Informer(),
557+
statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(),
558+
jobInformer: factory.Batch().V1().Jobs().Informer(),
559+
cronJobInformer: factory.Batch().V1().CronJobs().Informer(),
560+
workqueue: workqueue.NewTypedRateLimitingQueue(
561+
workqueue.DefaultTypedControllerRateLimiter[PodEvent](),
562+
),
563+
apiClient: &mockPoster{},
564+
cfg: &Config{},
565+
observedDeployments: amcache.NewExpiring(),
566+
unknownArtifacts: amcache.NewExpiring(),
567+
informerSyncTimeout: 2 * time.Second,
568+
}
569+
570+
errCh := make(chan error, 1)
571+
go func() {
572+
errCh <- ctrl.Run(context.Background(), 1)
573+
}()
574+
575+
select {
576+
case err := <-errCh:
577+
require.Error(t, err)
578+
assert.Contains(t, err.Error(), "timed out waiting for caches to sync")
579+
case <-time.After(5 * time.Second):
580+
t.Fatal("Run did not return within 5 seconds — informer sync timeout was 2 seconds")
581+
}
582+
}

0 commit comments

Comments
 (0)