Skip to content

Commit e364a98

Browse files
committed
feat: use new cluster job endpoint in place of synchronous set cluster endpoint
1 parent 7757dfe commit e364a98

10 files changed

Lines changed: 816 additions & 108 deletions

File tree

cmd/deployment-tracker/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func main() {
9696
GHAppPrivateKey: ghAppPrivateKey,
9797
GHAppPrivateKeyPath: getEnvOrDefault("GH_APP_PRIV_KEY_PATH", ""),
9898
Organization: os.Getenv("GITHUB_ORG"),
99+
BulkClusterSync: strings.EqualFold(getEnvOrDefault("BULK_CLUSTER_SYNC", "false"), "true"),
99100
}
100101

101102
if len(cntrlCfg.GHAppPrivateKey) > 0 && cntrlCfg.GHAppPrivateKeyPath != "" {

internal/controller/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ type Config struct {
2929
GHAppPrivateKey []byte
3030
GHAppPrivateKeyPath string
3131
Organization string
32+
// BulkClusterSync enables the async cluster job endpoint for startup
33+
// state sync. When false, startup sync is skipped and only individual
34+
// PostOne calls are used. Note: this is experimental and requires
35+
// enablement of a feature flag by GitHub. If enabled without the API-side
36+
// feature flag, the controller will not post deployment records at startup,
37+
// only ongoing pod events that arrive after the initial informer sync.
38+
BulkClusterSync bool
3239
}
3340

3441
// ValidTemplate verifies that at least one placeholder is present

internal/controller/controller.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ type ttlCache interface {
4949

5050
type deploymentRecordPoster interface {
5151
PostOne(ctx context.Context, record *deploymentrecord.Record) error
52-
PostCluster(ctx context.Context, records []*deploymentrecord.Record, cluster string) ([]byte, error)
52+
CreateClusterJob(ctx context.Context, records []*deploymentrecord.Record, cluster string) (*deploymentrecord.JobResponse, error)
53+
WaitForClusterJob(ctx context.Context, cluster string, jobID int64) (*deploymentrecord.JobStatus, error)
5354
}
5455

5556
type podMetadataAggregator interface {
@@ -89,7 +90,10 @@ type Controller struct {
8990
// informerSyncTimeout is the maximum time allowed for all informers to sync
9091
// and prevents sync from hanging indefinitely.
9192
informerSyncTimeout time.Duration
92-
// syncing tracks if the kubernetes informers have finished syncing
93+
// syncing gates informer event handlers during startup. When true,
94+
// pod add events are suppressed so they can be reported via the bulk
95+
// cluster job instead of individual PostOne calls. Only set when
96+
// BulkClusterSync is enabled.
9397
syncing atomic.Bool
9498
}
9599

@@ -151,7 +155,12 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
151155
unknownArtifacts: amcache.NewExpiring(),
152156
informerSyncTimeout: informerSyncTimeoutDuration,
153157
}
154-
cntrl.syncing.Store(true)
158+
// Only gate informer events when bulk cluster sync is enabled.
159+
// When disabled, all pods discovered during informer sync will be
160+
// enqueued as individual events.
161+
if cfg.BulkClusterSync {
162+
cntrl.syncing.Store(true)
163+
}
155164

156165
// Add event handlers to the informer
157166
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{

internal/controller/controller_integration_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.R
3737
return m.err
3838
}
3939

40-
func (m *mockRecordPoster) PostCluster(_ context.Context, _ []*deploymentrecord.Record, _ string) ([]byte, error) {
41-
return nil, nil
40+
func (m *mockRecordPoster) CreateClusterJob(_ context.Context, _ []*deploymentrecord.Record, _ string) (*deploymentrecord.JobResponse, error) {
41+
return &deploymentrecord.JobResponse{}, nil
42+
}
43+
44+
func (m *mockRecordPoster) WaitForClusterJob(_ context.Context, _ string, _ int64) (*deploymentrecord.JobStatus, error) {
45+
return &deploymentrecord.JobStatus{Status: "completed"}, nil
4246
}
4347

4448
// Helper that allows tests to read captured records safely.

internal/controller/controller_test.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ import (
2626
type mockPoster struct {
2727
mu sync.Mutex
2828
calls int
29-
clusterCalls int
3029
clusterRecordCount int
30+
jobCalls int
31+
jobWaitCalls int
3132
lastErr error
32-
clusterResp []byte
33-
clusterErr error
33+
jobResp *deploymentrecord.JobResponse
34+
jobErr error
35+
jobStatus *deploymentrecord.JobStatus
36+
jobWaitErr error
3437
}
3538

3639
func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) error {
@@ -40,12 +43,19 @@ func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) erro
4043
return m.lastErr
4144
}
4245

43-
func (m *mockPoster) PostCluster(_ context.Context, records []*deploymentrecord.Record, _ string) ([]byte, error) {
46+
func (m *mockPoster) CreateClusterJob(_ context.Context, records []*deploymentrecord.Record, _ string) (*deploymentrecord.JobResponse, error) {
4447
m.mu.Lock()
4548
defer m.mu.Unlock()
46-
m.clusterCalls++
49+
m.jobCalls++
4750
m.clusterRecordCount = len(records)
48-
return m.clusterResp, m.clusterErr
51+
return m.jobResp, m.jobErr
52+
}
53+
54+
func (m *mockPoster) WaitForClusterJob(_ context.Context, _ string, _ int64) (*deploymentrecord.JobStatus, error) {
55+
m.mu.Lock()
56+
defer m.mu.Unlock()
57+
m.jobWaitCalls++
58+
return m.jobStatus, m.jobWaitErr
4959
}
5060

5161
func (m *mockPoster) getPostOneCalls() int {
@@ -54,10 +64,16 @@ func (m *mockPoster) getPostOneCalls() int {
5464
return m.calls
5565
}
5666

57-
func (m *mockPoster) getPostClusterCalls() int {
67+
func (m *mockPoster) getCreateClusterJobCalls() int {
68+
m.mu.Lock()
69+
defer m.mu.Unlock()
70+
return m.jobCalls
71+
}
72+
73+
func (m *mockPoster) getWaitForClusterJobCalls() int {
5874
m.mu.Lock()
5975
defer m.mu.Unlock()
60-
return m.clusterCalls
76+
return m.jobWaitCalls
6177
}
6278

6379
// mockResolver is a test double for the workloadResolver interface.
@@ -90,6 +106,7 @@ func newTestController(poster *mockPoster) *Controller {
90106
LogicalEnvironment: "test",
91107
PhysicalEnvironment: "test",
92108
Cluster: "test",
109+
BulkClusterSync: true,
93110
},
94111
workloadResolver: &mockResolver{},
95112
metadataAggregator: &mockMetadataAggregator{},

internal/controller/reporting.go

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package controller
22

33
import (
44
"context"
5-
"encoding/json"
65
"errors"
76
"fmt"
87
"log/slog"
@@ -118,42 +117,73 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
118117
}
119118

120119
func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []any) error {
120+
if !c.cfg.BulkClusterSync {
121+
slog.Info("Async cluster sync disabled, skipping startup sync")
122+
return nil
123+
}
124+
121125
syncRecords := c.makeSyncRecords(ctx, syncClusterPods)
122126
if len(syncRecords) == 0 {
123127
slog.Info("No sync records to post")
124128
return nil
125129
}
126130

127-
respBody, err := c.apiClient.PostCluster(ctx, syncRecords, c.cfg.Cluster)
128-
var clusterNoRepositoriesError *deploymentrecord.ClusterNoRepositoriesError
131+
jobResp, err := c.apiClient.CreateClusterJob(ctx, syncRecords, c.cfg.Cluster)
129132
if err != nil {
130-
if errors.As(err, &clusterNoRepositoriesError) {
131-
slog.Info("Cluster sync found no creatable records",
133+
var conflictErr *deploymentrecord.ClusterJobConflictError
134+
var noReposErr *deploymentrecord.ClusterNoRepositoriesError
135+
136+
switch {
137+
case errors.As(err, &conflictErr):
138+
slog.Warn("Cluster job already in progress, skipping startup sync",
132139
"org", c.cfg.Organization,
133140
)
141+
c.fillCachesFromSubmitted(syncRecords)
134142
return nil
143+
144+
case errors.As(err, &noReposErr):
145+
slog.Info("Async cluster endpoint not available, skipping startup sync",
146+
"org", c.cfg.Organization,
147+
)
148+
return nil
149+
150+
default:
151+
slog.Error("Failed to create cluster job",
152+
"error", err,
153+
"record_count", len(syncRecords),
154+
)
155+
return fmt.Errorf("failed to create cluster job: %w", err)
135156
}
136-
slog.Error("Failed to post sync cluster records",
137-
"error", err,
138-
"record_count", len(syncRecords),
157+
}
158+
159+
if len(jobResp.Errors) > 0 {
160+
slog.Warn("Some deployments rejected from job submission",
161+
"job_id", jobResp.JobID,
162+
"rejected_count", len(jobResp.Errors),
139163
)
140-
return fmt.Errorf("failed to post sync cluster records: %w", err)
141164
}
142-
var deploymentRecords deploymentrecord.RecordsClusterResp
143-
err = json.Unmarshal(respBody, &deploymentRecords)
165+
166+
// Wait for job completion with a timeout to prevent indefinite startup delay.
167+
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
168+
defer cancel()
169+
jobStatus, err := c.apiClient.WaitForClusterJob(jobCtx, c.cfg.Cluster, jobResp.JobID)
144170
if err != nil {
145-
slog.Error("Failed to unmarshall response",
171+
slog.Error("Failed waiting for cluster job, filling caches from submitted records",
172+
"job_id", jobResp.JobID,
146173
"error", err,
147-
"record_count", len(syncRecords),
148174
)
175+
c.fillCachesFromSubmitted(syncRecords)
149176
return nil
150177
}
151-
slog.Info("Successfully posted sync cluster records",
152-
"created", len(deploymentRecords.DeploymentRecords),
153-
"errors", len(deploymentRecords.Errors),
178+
179+
slog.Info("Cluster job completed",
180+
"job_id", jobResp.JobID,
181+
"status", jobStatus.Status,
182+
"total_count", jobStatus.TotalCount,
183+
"errors", len(jobStatus.Errors),
154184
)
155185

156-
c.fillCaches(deploymentRecords)
186+
c.fillCachesFromJobResult(syncRecords, jobResp, jobStatus)
157187
return nil
158188
}
159189

@@ -224,20 +254,51 @@ func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any)
224254
return syncRecords
225255
}
226256

227-
func (c *Controller) fillCaches(deploymentRecords deploymentrecord.RecordsClusterResp) {
228-
slog.Info("Filling caches after posting sync cluster records")
229-
// Fill observedDeployments cache with successful digests
230-
for _, r := range deploymentRecords.DeploymentRecords {
257+
// fillCachesFromSubmitted populates the observedDeployments cache from the
258+
// records we submitted, without waiting for a response. Used when we can't
259+
// get a response (409 conflict, wait timeout, job failure).
260+
func (c *Controller) fillCachesFromSubmitted(records []*deploymentrecord.Record) {
261+
slog.Info("Filling observedDeployments cache from submitted records",
262+
"count", len(records),
263+
)
264+
for _, r := range records {
231265
cacheKey := getCacheKey(EventCreated, r.DeploymentName, r.Digest)
232266
c.observedDeployments.Set(cacheKey, true, 2*time.Minute)
233267
}
268+
}
269+
270+
// fillCachesFromJobResult populates both caches after an async job completes.
271+
// observedDeployments is filled from submitted records, and unknownArtifacts
272+
// is filled from error responses with cause "not_found".
273+
func (c *Controller) fillCachesFromJobResult(records []*deploymentrecord.Record, jobResp *deploymentrecord.JobResponse, jobStatus *deploymentrecord.JobStatus) {
274+
slog.Info("Filling caches after cluster job completion",
275+
"record_count", len(records),
276+
)
234277

235-
// Fill unknownArtifacts cache with unknown digests
236-
for _, r := range deploymentRecords.Errors {
237-
if r.Cause == "not_found" {
238-
c.unknownArtifacts.Set(r.Digest, true, unknownArtifactTTL)
278+
// Build a name→digests lookup from submitted records so we can
279+
// key unknownArtifacts by digest (which is how recordContainer looks them up).
280+
// Multiple records can share the same image name with different digests,
281+
// so we only cache when the mapping is unambiguous (exactly one digest per name).
282+
nameToDigests := make(map[string][]string, len(records))
283+
for _, r := range records {
284+
cacheKey := getCacheKey(EventCreated, r.DeploymentName, r.Digest)
285+
c.observedDeployments.Set(cacheKey, true, 2*time.Minute)
286+
nameToDigests[r.Name] = append(nameToDigests[r.Name], r.Digest)
287+
}
288+
289+
cacheUnknownDigests := func(errors []deploymentrecord.JobError) {
290+
for _, e := range errors {
291+
if e.Cause == "not_found" {
292+
if digests, ok := nameToDigests[e.Name]; ok && len(digests) == 1 {
293+
c.unknownArtifacts.Set(digests[0], true, unknownArtifactTTL)
294+
}
295+
}
239296
}
240297
}
298+
299+
// Fill unknownArtifacts from job submission and completion errors
300+
cacheUnknownDigests(jobResp.Errors)
301+
cacheUnknownDigests(jobStatus.Errors)
241302
}
242303

243304
// recordContainer records a single container's deployment info.

0 commit comments

Comments
 (0)