Skip to content

Commit 0b21c09

Browse files
committed
Merge branch 'main' into APIGOV-31899
2 parents a4319e0 + 2a298fc commit 0b21c09

55 files changed

Lines changed: 1944 additions & 1647 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.security-profile-branches.json

Lines changed: 0 additions & 17 deletions
This file was deleted.

.security-profile-latest.json

Lines changed: 0 additions & 17 deletions
This file was deleted.

go.sum

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
429429
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
430430
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
431431
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
432-
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
433-
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
434432
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
435433
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
436434
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -468,8 +466,6 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
468466
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
469467
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
470468
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
471-
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
472-
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
473469
golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
474470
golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
475471
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -508,8 +504,6 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
508504
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
509505
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
510506
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
511-
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
512-
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
513507
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
514508
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
515509
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -579,12 +573,8 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc
579573
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
580574
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
581575
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
582-
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
583-
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
584576
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
585577
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
586-
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0=
587-
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4/go.mod h1:g5NllXBEermZrmR51cJDQxmJUHUOfRAaNyWBM+R+548=
588578
golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c h1:6a8FdnNk6bTXBjR4AGKFgUKuo+7GnR3FX5L7CbveeZc=
589579
golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw=
590580
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -596,8 +586,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
596586
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
597587
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
598588
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
599-
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
600-
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
601589
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
602590
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
603591
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -653,8 +641,6 @@ golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f
653641
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
654642
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
655643
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
656-
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
657-
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
658644
golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
659645
golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
660646
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

pkg/agent/events/eventlistener.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type APIClient interface {
2828
// EventListener holds the various caches to save events into as they get written to the source channel.
2929
type EventListener struct {
3030
ctx context.Context
31-
cancel context.CancelFunc
31+
cancel context.CancelCauseFunc
3232
client APIClient
3333
handlers []handler.Handler
3434
logger log.FieldLogger
@@ -37,10 +37,10 @@ type EventListener struct {
3737
}
3838

3939
// NewListenerFunc type for creating a new listener
40-
type NewListenerFunc func(ctx context.Context, cancel context.CancelFunc, source chan *proto.Event, client APIClient, sequenceManager SequenceProvider, cbs ...handler.Handler) *EventListener
40+
type NewListenerFunc func(ctx context.Context, cancel context.CancelCauseFunc, source chan *proto.Event, client APIClient, sequenceManager SequenceProvider, cbs ...handler.Handler) *EventListener
4141

4242
// NewEventListener creates a new EventListener to process events based on the provided Handlers.
43-
func NewEventListener(ctx context.Context, cancel context.CancelFunc, source chan *proto.Event, client APIClient, sequenceManager SequenceProvider, cbs ...handler.Handler) *EventListener {
43+
func NewEventListener(ctx context.Context, cancel context.CancelCauseFunc, source chan *proto.Event, client APIClient, sequenceManager SequenceProvider, cbs ...handler.Handler) *EventListener {
4444
logger := log.NewFieldLogger().
4545
WithComponent("EventListener").
4646
WithPackage("sdk.agent.events")
@@ -59,7 +59,7 @@ func NewEventListener(ctx context.Context, cancel context.CancelFunc, source cha
5959
// Stop stops the listener
6060
func (em *EventListener) Stop() {
6161
if em != nil {
62-
em.cancel()
62+
em.cancel(nil)
6363
}
6464
}
6565

@@ -92,10 +92,10 @@ func (em *EventListener) start() (done bool, err error) {
9292
}
9393

9494
if err := em.handleEvent(event); err != nil {
95-
em.logger.WithError(err).Error("stream event listener error")
95+
em.logger.WithError(err).Error("stream event listener error handling event")
9696
}
9797
case <-em.ctx.Done():
98-
em.logger.Trace("stream event listener has been gracefully stopped")
98+
em.logger.Trace("stream event listener context is done")
9999
done = true
100100
err = nil
101101
break

pkg/agent/events/eventlistener_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestEventListener_start(t *testing.T) {
5959
sequenceManager := NewSequenceProvider(cacheManager, "testWatch")
6060
for _, tc := range tests {
6161
t.Run(tc.name, func(t *testing.T) {
62-
ctx, cancel := context.WithCancel(context.Background())
62+
ctx, cancel := context.WithCancelCause(context.Background())
6363
listener := NewEventListener(ctx, cancel, tc.events, tc.client, sequenceManager, tc.handler)
6464

6565
errCh := make(chan error)
@@ -100,14 +100,14 @@ func TestEventListener_Listen(t *testing.T) {
100100
cacheManager := agentcache.NewAgentCacheManager(&config.CentralConfiguration{}, false)
101101
sequenceManager := NewSequenceProvider(cacheManager, "testWatch")
102102
events := make(chan *proto.Event)
103-
ctx, cancel := context.WithCancel(context.Background())
103+
ctx, cancel := context.WithCancelCause(context.Background())
104104
listener := NewEventListener(ctx, cancel, events, &mockAPIClient{}, sequenceManager, &mockHandler{})
105105
listener.Listen()
106106
listener.Stop()
107107
err := ctx.Err()
108108
assert.NotNil(t, err)
109109

110-
ctx, cancel = context.WithCancel(context.Background())
110+
ctx, cancel = context.WithCancelCause(context.Background())
111111
listener = NewEventListener(ctx, cancel, events, &mockAPIClient{}, sequenceManager, &mockHandler{})
112112
listener.Listen()
113113
close(events)
@@ -173,7 +173,7 @@ func TestEventListener_handleEvent(t *testing.T) {
173173
},
174174
}
175175

176-
ctx, cancel := context.WithCancel(context.Background())
176+
ctx, cancel := context.WithCancelCause(context.Background())
177177
listener := NewEventListener(ctx, cancel, make(chan *proto.Event), tc.client, sequenceManager, tc.handler)
178178

179179
err := listener.handleEvent(event)

pkg/agent/events/requestqueue.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,18 @@ type RequestQueue interface {
2020
// requestQueue
2121
type requestQueue struct {
2222
ctx context.Context
23-
cancel context.CancelFunc
23+
cancel context.CancelCauseFunc
2424
logger log.FieldLogger
2525
requestCh chan *proto.Request
2626
receiveCh chan *proto.Request
2727
isActive atomic.Bool
2828
}
2929

3030
// NewRequestQueueFunc type for creating a new request queue
31-
type NewRequestQueueFunc func(ctx context.Context, cancel context.CancelFunc, requestCh chan *proto.Request) RequestQueue
31+
type NewRequestQueueFunc func(ctx context.Context, cancel context.CancelCauseFunc, requestCh chan *proto.Request) RequestQueue
3232

3333
// NewRequestQueue creates a new queue for the requests to be sent for watch subscription
34-
func NewRequestQueue(ctx context.Context, cancel context.CancelFunc, requestCh chan *proto.Request) RequestQueue {
34+
func NewRequestQueue(ctx context.Context, cancel context.CancelCauseFunc, requestCh chan *proto.Request) RequestQueue {
3535
logger := log.NewFieldLogger().
3636
WithComponent("requestQueue").
3737
WithPackage("sdk.agent.events")
@@ -52,7 +52,7 @@ func (q *requestQueue) Stop() {
5252
}
5353

5454
defer q.isActive.Store(false)
55-
q.cancel()
55+
q.cancel(nil)
5656
close(q.receiveCh)
5757
}
5858

@@ -66,8 +66,12 @@ func (q *requestQueue) Write(request *proto.Request) error {
6666
}
6767

6868
q.logger.WithField("requestType", request.RequestType).Trace("received stream request")
69-
q.receiveCh <- request
70-
return nil
69+
select {
70+
case q.receiveCh <- request:
71+
return nil
72+
case <-q.ctx.Done():
73+
return q.ctx.Err()
74+
}
7175
}
7276

7377
func (q *requestQueue) Start() {

pkg/agent/events/requestqueue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestRequestQueue(t *testing.T) {
2929
for _, tc := range cases {
3030
requestCh := make(chan *proto.Request, 1)
3131
t.Run(tc.name, func(t *testing.T) {
32-
ctx, cancel := context.WithCancel(context.Background())
32+
ctx, cancel := context.WithCancelCause(context.Background())
3333
q := NewRequestQueue(ctx, cancel, requestCh)
3434
var receivedReq *proto.Request
3535
wg := sync.WaitGroup{}

pkg/agent/poller/client.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ func NewPollClient(
6565

6666
// Start the polling client
6767
func (p *PollClient) Start() error {
68-
ctx, cancel := context.WithCancel(context.Background())
69-
defer cancel()
70-
eventCh, eventErrorCh := make(chan *proto.Event), make(chan error)
68+
ctx, cancel := context.WithCancelCause(context.Background())
69+
defer cancel(nil)
70+
eventCh := make(chan *proto.Event)
7171

7272
p.mutex.Lock()
7373

@@ -76,7 +76,7 @@ func (p *PollClient) Start() error {
7676
p.poller = p.newPollManager(p.interval, withOnStop(p.onClientStop), withHarvester(p.harvesterConfig), WithContext(ctx, cancel))
7777
p.mutex.Unlock()
7878
p.listener.Listen()
79-
p.poller.RegisterWatch(eventCh, eventErrorCh)
79+
p.poller.RegisterWatch(eventCh)
8080

8181
if p.onStreamConnection != nil {
8282
p.onStreamConnection()
@@ -86,12 +86,11 @@ func (p *PollClient) Start() error {
8686
p.initialized = true
8787
p.mutex.Unlock()
8888

89-
select {
90-
case err := <-eventErrorCh:
91-
return err
92-
case <-ctx.Done():
93-
return fmt.Errorf("poll client context has been closed")
89+
<-ctx.Done()
90+
if cause := context.Cause(ctx); cause != nil {
91+
return cause
9492
}
93+
return fmt.Errorf("poll client context has been closed")
9594
}
9695

9796
// Status returns an error if the poller is not running

pkg/agent/poller/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func withOnStop(cb onClientStopCb) executorOpt {
5353
}
5454
}
5555

56-
func WithContext(ctx context.Context, cancel context.CancelFunc) executorOpt {
56+
func WithContext(ctx context.Context, cancel context.CancelCauseFunc) executorOpt {
5757
return func(m *pollExecutor) {
5858
m.ctx = ctx
5959
m.cancel = cancel

pkg/agent/poller/poller.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
type pollExecutor struct {
1616
ctx context.Context
17-
cancel context.CancelFunc
17+
cancel context.CancelCauseFunc
1818
harvester harvester.Harvest
1919
sequence events.SequenceProvider
2020
topicSelfLink string
@@ -33,7 +33,7 @@ func newPollExecutor(interval time.Duration, options ...executorOpt) *pollExecut
3333
WithComponent("pollExecutor").
3434
WithPackage("sdk.agent.poller")
3535

36-
ctx, cancel := context.WithCancel(context.Background())
36+
ctx, cancel := context.WithCancelCause(context.Background())
3737
pm := &pollExecutor{
3838
logger: logger,
3939
timer: time.NewTimer(interval),
@@ -50,32 +50,28 @@ func newPollExecutor(interval time.Duration, options ...executorOpt) *pollExecut
5050
}
5151

5252
// RegisterWatch registers a watch topic for polling events and publishing events on a channel
53-
func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan error) {
53+
func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event) {
5454
m.logger.Trace("register watch topic for polling and publishing events")
5555
if m.harvester == nil {
56-
go func() {
57-
m.Stop()
58-
errChan <- fmt.Errorf("harvester is not configured for the polling client")
59-
}()
56+
err := fmt.Errorf("harvester is not configured for the polling client")
57+
m.cancel(err)
58+
m.Stop()
6059
return
6160
}
6261

6362
if m.sequence.GetSequence() < 0 {
6463
m.onHarvesterErr()
65-
go func() {
66-
m.Stop()
67-
errChan <- fmt.Errorf("do not have a sequence id, stopping poller")
68-
}()
64+
err := fmt.Errorf("do not have a sequence id, stopping poller")
65+
m.cancel(err)
66+
m.Stop()
6967
return
7068
}
7169

7270
if err := m.harvester.EventCatchUp(m.ctx, m.topicSelfLink, eventChan); err != nil {
7371
m.logger.WithError(err).Error("harvester returned an error when syncing events")
7472
m.onHarvesterErr()
75-
go func() {
76-
m.Stop()
77-
errChan <- err
78-
}()
73+
m.cancel(err)
74+
m.Stop()
7975
return
8076
}
8177

@@ -84,9 +80,10 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e
8480
m.lock.Unlock()
8581

8682
go func() {
87-
err := m.sync(m.topicSelfLink, eventChan)
83+
if err := m.sync(m.topicSelfLink, eventChan); err != nil {
84+
m.cancel(err)
85+
}
8886
m.Stop()
89-
errChan <- err
9087
}()
9188
}
9289

@@ -142,7 +139,7 @@ func (m *pollExecutor) onHarvesterErr() {
142139
// Stop stops the poller
143140
func (m *pollExecutor) Stop() {
144141
m.timer.Stop()
145-
m.cancel()
142+
m.cancel(nil)
146143

147144
m.lock.Lock()
148145
defer m.lock.Unlock()

0 commit comments

Comments
 (0)