Skip to content

Commit ca19808

Browse files
piceriajbeattie
andauthored
Use cluster endpoint at startup (#90)
* add logic to process syncEvents, prevent sync events from entering queue Signed-off-by: Eric Pickard <piceri@github.com> * add PostCluster to client Signed-off-by: Eric Pickard <piceri@github.com> * add logic to fillCaches after PostCluster Signed-off-by: Eric Pickard <piceri@github.com> * enable partial success, refactors and improvements Signed-off-by: Eric Pickard <piceri@github.com> * add tests for processSyncEvents Signed-off-by: Eric Pickard <piceri@github.com> * add tests for PostCluster Signed-off-by: Eric Pickard <piceri@github.com> * update url variable names Signed-off-by: Eric Pickard <piceri@github.com> * refactor record.go, address linting items Signed-off-by: Eric Pickard <piceri@github.com> * add logging and comments Signed-off-by: Eric Pickard <piceri@github.com> * adjust syncRecords to include terminated jobs Signed-off-by: Eric Pickard <piceri@github.com> * add logging to non-rate limit 4xx responses Signed-off-by: Eric Pickard <piceri@github.com> * feat: use new cluster job endpoint in place of synchronous set cluster endpoint * fix: lint error * fix: address PR comments * fix: address PR comments * fix: update comment * chore: remove dead code --------- Signed-off-by: Eric Pickard <piceri@github.com> Co-authored-by: Austin Beattie <ajbeattie@github.com>
1 parent c6742a5 commit ca19808

10 files changed

Lines changed: 1393 additions & 164 deletions

File tree

cmd/deployment-tracker/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func main() {
9696
GHAppPrivateKey: ghAppPrivateKey,
9797
GHAppPrivateKeyPath: getEnvOrDefault("GH_APP_PRIV_KEY_PATH", ""),
9898
Organization: os.Getenv("GITHUB_ORG"),
99+
BulkClusterSync: strings.EqualFold(getEnvOrDefault("BULK_CLUSTER_SYNC", "false"), "true"),
99100
}
100101

101102
if len(cntrlCfg.GHAppPrivateKey) > 0 && cntrlCfg.GHAppPrivateKeyPath != "" {

internal/controller/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ type Config struct {
2929
GHAppPrivateKey []byte
3030
GHAppPrivateKeyPath string
3131
Organization string
32+
// BulkClusterSync enables the async cluster job endpoint for startup
33+
// state sync. When false, startup sync is skipped and only individual
34+
// PostOne calls are used. **Note: this is experimental and not yet available
35+
// for public use.**
36+
BulkClusterSync bool
3237
}
3338

3439
// ValidTemplate verifies that at least one placeholder is present

internal/controller/controller.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log/slog"
88
"strings"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/github/deployment-tracker/internal/metadata"
@@ -47,7 +48,9 @@ type ttlCache interface {
4748
}
4849

4950
type deploymentRecordPoster interface {
50-
PostOne(ctx context.Context, record *deploymentrecord.DeploymentRecord) error
51+
PostOne(ctx context.Context, record *deploymentrecord.Record) error
52+
CreateClusterJob(ctx context.Context, records []*deploymentrecord.Record, cluster string) (*deploymentrecord.JobResponse, error)
53+
WaitForClusterJob(ctx context.Context, cluster string, jobID int64) (*deploymentrecord.JobStatus, error)
5154
}
5255

5356
type podMetadataAggregator interface {
@@ -87,6 +90,11 @@ type Controller struct {
8790
// informerSyncTimeout is the maximum time allowed for all informers to sync
8891
// and prevents sync from hanging indefinitely.
8992
informerSyncTimeout time.Duration
93+
// syncing gates informer event handlers during startup. When true,
94+
// pod add events are suppressed so they can be reported via the bulk
95+
// cluster job instead of individual PostOne calls. Only set when
96+
// BulkClusterSync is enabled.
97+
syncing atomic.Bool
9098
}
9199

92100
// New creates a new deployment tracker controller.
@@ -147,10 +155,21 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
147155
unknownArtifacts: amcache.NewExpiring(),
148156
informerSyncTimeout: informerSyncTimeoutDuration,
149157
}
158+
// Only gate informer events when bulk cluster sync is enabled.
159+
// When disabled, all pods discovered during informer sync will be
160+
// enqueued as individual events.
161+
if cfg.BulkClusterSync {
162+
cntrl.syncing.Store(true)
163+
}
150164

151165
// Add event handlers to the informer
152166
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
153167
AddFunc: func(obj any) {
168+
// Skip adding sync events
169+
if cntrl.syncing.Load() {
170+
return
171+
}
172+
154173
pod, ok := obj.(*corev1.Pod)
155174
if !ok {
156175
slog.Error("Invalid object returned",
@@ -314,6 +333,12 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
314333
return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions")
315334
}
316335
}
336+
c.syncing.Store(false)
337+
syncClusterPods := c.podInformer.GetIndexer().List()
338+
err := c.processSyncEvents(ctx, syncClusterPods)
339+
if err != nil {
340+
return fmt.Errorf("sync events failed: %w", err)
341+
}
317342

318343
slog.Info("Starting workers",
319344
"count", workers,

internal/controller/controller_integration_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,27 @@ import (
2626

2727
type mockRecordPoster struct {
2828
mu sync.Mutex
29-
records []*deploymentrecord.DeploymentRecord
29+
records []*deploymentrecord.Record
3030
err error // to simulate failures
3131
}
3232

33-
func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.DeploymentRecord) error {
33+
func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.Record) error {
3434
m.mu.Lock()
3535
defer m.mu.Unlock()
3636
m.records = append(m.records, record)
3737
return m.err
3838
}
3939

40+
func (m *mockRecordPoster) CreateClusterJob(_ context.Context, _ []*deploymentrecord.Record, _ string) (*deploymentrecord.JobResponse, error) {
41+
return &deploymentrecord.JobResponse{}, nil
42+
}
43+
44+
func (m *mockRecordPoster) WaitForClusterJob(_ context.Context, _ string, _ int64) (*deploymentrecord.JobStatus, error) {
45+
return &deploymentrecord.JobStatus{Status: "completed"}, nil
46+
}
47+
4048
// Helper that allows tests to read captured records safely.
41-
func (m *mockRecordPoster) getRecords() []*deploymentrecord.DeploymentRecord {
49+
func (m *mockRecordPoster) getRecords() []*deploymentrecord.Record {
4250
m.mu.Lock()
4351
defer m.mu.Unlock()
4452
return slices.Clone(m.records)

internal/controller/controller_test.go

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/github/deployment-tracker/internal/metadata"
1112
"github.com/github/deployment-tracker/internal/workload"
1213
"github.com/github/deployment-tracker/pkg/deploymentrecord"
1314
"github.com/stretchr/testify/assert"
@@ -23,35 +24,78 @@ import (
2324

2425
// mockPoster records all PostOne calls and returns a configurable error.
2526
type mockPoster struct {
26-
mu sync.Mutex
27-
calls int
28-
lastErr error
27+
mu sync.Mutex
28+
calls int
29+
clusterRecordCount int
30+
jobCalls int
31+
jobWaitCalls int
32+
lastErr error
33+
jobResp *deploymentrecord.JobResponse
34+
jobErr error
35+
jobStatus *deploymentrecord.JobStatus
36+
jobWaitErr error
2937
}
3038

31-
func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRecord) error {
39+
func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) error {
3240
m.mu.Lock()
3341
defer m.mu.Unlock()
3442
m.calls++
3543
return m.lastErr
3644
}
3745

38-
func (m *mockPoster) getCalls() int {
46+
func (m *mockPoster) CreateClusterJob(_ context.Context, records []*deploymentrecord.Record, _ string) (*deploymentrecord.JobResponse, error) {
47+
m.mu.Lock()
48+
defer m.mu.Unlock()
49+
m.jobCalls++
50+
m.clusterRecordCount = len(records)
51+
return m.jobResp, m.jobErr
52+
}
53+
54+
func (m *mockPoster) WaitForClusterJob(_ context.Context, _ string, _ int64) (*deploymentrecord.JobStatus, error) {
55+
m.mu.Lock()
56+
defer m.mu.Unlock()
57+
m.jobWaitCalls++
58+
return m.jobStatus, m.jobWaitErr
59+
}
60+
61+
func (m *mockPoster) getPostOneCalls() int {
3962
m.mu.Lock()
4063
defer m.mu.Unlock()
4164
return m.calls
4265
}
4366

67+
func (m *mockPoster) getCreateClusterJobCalls() int {
68+
m.mu.Lock()
69+
defer m.mu.Unlock()
70+
return m.jobCalls
71+
}
72+
73+
func (m *mockPoster) getWaitForClusterJobCalls() int {
74+
m.mu.Lock()
75+
defer m.mu.Unlock()
76+
return m.jobWaitCalls
77+
}
78+
4479
// mockResolver is a test double for the workloadResolver interface.
45-
type mockResolver struct{}
80+
type mockResolver struct {
81+
name string
82+
}
4683

47-
func (*mockResolver) Resolve(_ *corev1.Pod) workload.Identity {
48-
return workload.Identity{}
84+
func (m *mockResolver) Resolve(_ *corev1.Pod) workload.Identity {
85+
return workload.Identity{Name: m.name}
4986
}
5087

5188
func (*mockResolver) IsActive(_ string, _ workload.Identity) bool {
5289
return false
5390
}
5491

92+
// mockMetadataAggregator is a test double for the podMetadataAggregator interface.
93+
type mockMetadataAggregator struct{}
94+
95+
func (*mockMetadataAggregator) BuildAggregatePodMetadata(_ context.Context, _ *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata {
96+
return nil
97+
}
98+
5599
// newTestController creates a minimal Controller suitable for unit-testing
56100
// recordContainer without a real Kubernetes cluster.
57101
func newTestController(poster *mockPoster) *Controller {
@@ -62,8 +106,10 @@ func newTestController(poster *mockPoster) *Controller {
62106
LogicalEnvironment: "test",
63107
PhysicalEnvironment: "test",
64108
Cluster: "test",
109+
BulkClusterSync: true,
65110
},
66111
workloadResolver: &mockResolver{},
112+
metadataAggregator: &mockMetadataAggregator{},
67113
observedDeployments: amcache.NewExpiring(),
68114
unknownArtifacts: amcache.NewExpiring(),
69115
}

0 commit comments

Comments
 (0)