Skip to content

Commit 169c179

Browse files
authored
Merge branch 'main' into tejaswi/tee-attestation
2 parents c1870a5 + b11b4a9 commit 169c179

14 files changed

Lines changed: 689 additions & 13 deletions

File tree

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
golang 1.25.3
22
protoc 29.3
33
protoc-gen-go-grpc 1.3.0
4-
golangci-lint 2.4.0
4+
golangci-lint 2.11.4
55
mockery 2.53.3

pkg/capabilities/base_trigger.go

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ type BaseTriggerMetrics interface {
3939
IncInboxFull(triggerID string)
4040
EmitUndeliveredWarning(triggerID, eventID string)
4141
EmitUndeliveredCritical(triggerID, eventID string)
42+
// IncAckError counts ACK paths that return an error (e.g. store delete failure). reason is a stable identifier for dashboards.
43+
IncAckError(reason string)
44+
// IncAckMemoryOutcome records how an ACK related to the in-memory pending map: hit, miss_no_trigger_bucket, miss_no_event, miss_nil_record, or skipped_retransmit_disabled (retransmit is off, so ACK tracking is skipped).
45+
IncAckMemoryOutcome(outcome string)
4246
}
4347

4448
type undeliveredState struct {
@@ -192,8 +196,12 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
192196
}
193197

194198
if err := b.store.Insert(ctx, rec); err != nil {
199+
b.lggr.Errorw("base trigger failed to persist pending event",
200+
"capabilityID", b.capabilityId, "triggerID", triggerID, "eventID", te.ID, "err", err)
195201
return err
196202
}
203+
b.lggr.Infow("base trigger persisted pending event for ACK tracking",
204+
"capabilityID", b.capabilityId, "triggerID", triggerID, "eventID", te.ID)
197205

198206
b.mu.Lock()
199207
if b.pending[triggerID] == nil {
@@ -236,27 +244,45 @@ func (b *BaseTriggerCapability[T]) sendToInbox(triggerID, eventID string, payloa
236244
func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId string, eventId string) error {
237245
b.lggr.Infow("Event ACK", "triggerID", triggerId, "eventID", eventId)
238246
if !b.retransmitEnabled() {
247+
b.lggr.Debugw("base trigger ACK skipped (retransmit disabled, no persistence/ACK tracking)",
248+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
249+
b.metrics.IncAckMemoryOutcome("skipped_retransmit_disabled")
239250
return nil
240251
}
241252

242253
var (
243-
attempts int
244-
firstAt time.Time
245-
found bool
254+
attempts int
255+
firstAt time.Time
256+
found bool
257+
hadTriggerBucket bool
258+
hadEventKey bool
259+
hadNilPendingRecord bool
246260
)
247261

248262
b.mu.Lock()
249-
if eventsForTrigger, ok := b.pending[triggerId]; ok && eventsForTrigger != nil {
250-
if rec, recOk := eventsForTrigger[eventId]; recOk && rec != nil {
263+
eventsForTrigger, ok := b.pending[triggerId]
264+
hadTriggerBucket = ok && eventsForTrigger != nil
265+
if hadTriggerBucket {
266+
rec, recOk := eventsForTrigger[eventId]
267+
hadEventKey = recOk
268+
switch {
269+
case recOk && rec != nil:
251270
attempts = rec.Attempts
252271
firstAt = rec.FirstAt
253272
found = true
273+
case recOk && rec == nil:
274+
hadNilPendingRecord = true
275+
b.metrics.IncAckMemoryOutcome("miss_nil_record")
276+
default:
277+
b.metrics.IncAckMemoryOutcome("miss_no_event")
254278
}
255279

256280
delete(eventsForTrigger, eventId)
257281
if len(eventsForTrigger) == 0 {
258282
delete(b.pending, triggerId)
259283
}
284+
} else {
285+
b.metrics.IncAckMemoryOutcome("miss_no_trigger_bucket")
260286
}
261287

262288
if m, ok := b.undeliveredAlertStates[triggerId]; ok {
@@ -267,12 +293,40 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
267293
}
268294
b.mu.Unlock()
269295

270-
if found {
296+
switch {
297+
case found:
298+
b.lggr.Infow("base trigger ACK matched in-memory pending event",
299+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId,
300+
"attempts", attempts, "firstAt", firstAt)
301+
b.metrics.IncAckMemoryOutcome("hit")
271302
b.metrics.IncAck(triggerId, eventId)
272303
b.metrics.ObserveTimeToAck(triggerId, eventId, time.Since(firstAt), attempts)
304+
case hadNilPendingRecord:
305+
b.lggr.Warnw("base trigger ACK: pending map had nil record for event (treating as miss; reconciling store)",
306+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
307+
case hadTriggerBucket && !hadEventKey:
308+
b.lggr.Infow("base trigger ACK: event id not in in-memory pending map for trigger (may exist only in store; reconciling)",
309+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
310+
case !hadTriggerBucket:
311+
b.lggr.Infow("base trigger ACK: no in-memory pending bucket for trigger (not pending here; still deleting from store if row exists)",
312+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
273313
}
274314

275-
return b.store.DeleteEvent(ctx, triggerId, eventId)
315+
if err := b.store.DeleteEvent(ctx, triggerId, eventId); err != nil {
316+
b.lggr.Errorw("base trigger ACK failed to delete event from store",
317+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId,
318+
"foundInMemory", found, "err", err)
319+
b.metrics.IncAckError("store_delete_failed")
320+
return err
321+
}
322+
if found {
323+
b.lggr.Debugw("base trigger ACK store delete succeeded",
324+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
325+
} else {
326+
b.lggr.Infow("base trigger ACK store delete succeeded (memory miss path; store row removed if present)",
327+
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
328+
}
329+
return nil
276330
}
277331

278332
func (b *BaseTriggerCapability[T]) retransmitLoop() {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package capabilities
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"google.golang.org/protobuf/proto"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
11+
"github.com/smartcontractkit/chainlink-common/pkg/settings"
12+
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
13+
)
14+
15+
// ResolveBaseTriggerRetryInterval returns the retransmit ticker interval for [BaseTriggerCapability].
16+
// When [cresettings.Default.BaseTriggerRetransmitEnabled] is false, it returns (0, nil) so the base
17+
// trigger delivers fire-and-forget without persistence or ACK tracking.
18+
// When enabled, [cresettings.Default.BaseTriggerRetryInterval] must be positive.
19+
func ResolveBaseTriggerRetryInterval(ctx context.Context, g settings.Getter, lggr logger.Logger) (retryInterval time.Duration, err error) {
20+
enabled, gerr := cresettings.Default.BaseTriggerRetransmitEnabled.GetOrDefault(ctx, g)
21+
if gerr != nil {
22+
lggr.Errorw("CRE settings read failed for base trigger retransmit flag; using default", "err", gerr)
23+
}
24+
if !enabled {
25+
return 0, nil
26+
}
27+
retryInterval, gerr = cresettings.Default.BaseTriggerRetryInterval.GetOrDefault(ctx, g)
28+
if gerr != nil {
29+
lggr.Errorw("CRE settings read failed for base trigger retry interval; using default", "err", gerr)
30+
}
31+
if retryInterval <= 0 {
32+
return 0, fmt.Errorf(
33+
"BaseTriggerRetransmitEnabled is true but BaseTriggerRetryInterval must be positive (got %s)",
34+
retryInterval,
35+
)
36+
}
37+
return retryInterval, nil
38+
}
39+
40+
// NewBaseTriggerCapabilityWithCRESettings builds a [BaseTriggerCapability] using global CRE settings
41+
// for retransmit enablement and interval. Undelivered warning/critical thresholds are derived from
42+
// the resolved interval when retransmit is enabled.
43+
func NewBaseTriggerCapabilityWithCRESettings[T proto.Message](
44+
ctx context.Context,
45+
store EventStore,
46+
newMsg func() T,
47+
lggr logger.Logger,
48+
capabilityID string,
49+
getter settings.Getter,
50+
) (*BaseTriggerCapability[T], error) {
51+
retry, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr)
52+
if err != nil {
53+
return nil, err
54+
}
55+
var undeliveredWarning, undeliveredCritical time.Duration
56+
if retry > 0 {
57+
undeliveredWarning = 5 * retry
58+
undeliveredCritical = 20 * retry
59+
}
60+
return NewBaseTriggerCapability(store, newMsg, lggr, capabilityID, retry, undeliveredWarning, undeliveredCritical), nil
61+
}

pkg/capabilities/base_trigger_metrics.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ type BaseTriggerBeholderMetrics struct {
1414
capabilityID string
1515
retryCount metric.Int64Counter
1616
ackCount metric.Int64Counter
17+
ackErrorCount metric.Int64Counter
18+
ackMemoryOutcomeCount metric.Int64Counter
1719
inboxMissingCount metric.Int64Counter
1820
inboxFullCount metric.Int64Counter
1921
undeliveredWarningCount metric.Int64Counter
@@ -34,6 +36,14 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
3436
if err != nil {
3537
return nil, err
3638
}
39+
ackErrorCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_ack_error_total")
40+
if err != nil {
41+
return nil, err
42+
}
43+
ackMemoryOutcomeCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_ack_memory_outcome_total")
44+
if err != nil {
45+
return nil, err
46+
}
3747
inboxMissingCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_inbox_missing_total")
3848
if err != nil {
3949
return nil, err
@@ -69,6 +79,8 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
6979
capabilityID: capabilityID,
7080
retryCount: retryCount,
7181
ackCount: ackCount,
82+
ackErrorCount: ackErrorCount,
83+
ackMemoryOutcomeCount: ackMemoryOutcomeCount,
7284
inboxMissingCount: inboxMissingCount,
7385
inboxFullCount: inboxFullCount,
7486
undeliveredWarningCount: undeliveredWarningCount,
@@ -111,6 +123,24 @@ func (m *BaseTriggerBeholderMetrics) IncAck(triggerID, eventID string) {
111123
)
112124
}
113125

126+
func (m *BaseTriggerBeholderMetrics) IncAckError(reason string) {
127+
m.ackErrorCount.Add(context.Background(), 1,
128+
metric.WithAttributes(
129+
attribute.String("capability_id", m.capabilityID),
130+
attribute.String("reason", reason),
131+
),
132+
)
133+
}
134+
135+
func (m *BaseTriggerBeholderMetrics) IncAckMemoryOutcome(outcome string) {
136+
m.ackMemoryOutcomeCount.Add(context.Background(), 1,
137+
metric.WithAttributes(
138+
attribute.String("capability_id", m.capabilityID),
139+
attribute.String("outcome", outcome),
140+
),
141+
)
142+
}
143+
114144
func (m *BaseTriggerBeholderMetrics) ObserveTimeToAck(triggerID, eventID string, d time.Duration, attempts int) {
115145
m.timeToAckMs.Record(context.Background(), d.Milliseconds(),
116146
metric.WithAttributes(m.attrs(triggerID, eventID)...),
@@ -163,3 +193,5 @@ func (noopBaseTriggerMetrics) IncInboxMissing(string)
163193
func (noopBaseTriggerMetrics) IncInboxFull(string) {}
164194
func (noopBaseTriggerMetrics) EmitUndeliveredWarning(string, string) {}
165195
func (noopBaseTriggerMetrics) EmitUndeliveredCritical(string, string) {}
196+
func (noopBaseTriggerMetrics) IncAckError(string) {}
197+
func (noopBaseTriggerMetrics) IncAckMemoryOutcome(string) {}

pkg/capabilities/base_trigger_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,60 @@ import (
1010
"google.golang.org/protobuf/types/known/wrapperspb"
1111

1212
"github.com/smartcontractkit/chainlink-common/pkg/logger"
13+
"github.com/smartcontractkit/chainlink-common/pkg/settings"
1314
)
1415

16+
func TestResolveBaseTriggerRetryInterval(t *testing.T) {
17+
lggr, err := logger.New()
18+
require.NoError(t, err)
19+
ctx := context.Background()
20+
21+
t.Run("nil getter uses defaults", func(t *testing.T) {
22+
d, err := ResolveBaseTriggerRetryInterval(ctx, nil, lggr)
23+
require.NoError(t, err)
24+
require.Zero(t, d, "default BaseTriggerRetransmitEnabled is false, so retry interval is disabled")
25+
})
26+
27+
t.Run("global JSON enables interval", func(t *testing.T) {
28+
getter, err := settings.NewJSONGetter([]byte(`{
29+
"global": {
30+
"BaseTriggerRetransmitEnabled": "true",
31+
"BaseTriggerRetryInterval": "7s"
32+
}
33+
}`))
34+
require.NoError(t, err)
35+
d, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr)
36+
require.NoError(t, err)
37+
require.Equal(t, 7*time.Second, d)
38+
})
39+
40+
t.Run("disabled returns zero", func(t *testing.T) {
41+
getter, err := settings.NewJSONGetter([]byte(`{
42+
"global": {
43+
"BaseTriggerRetransmitEnabled": "false",
44+
"BaseTriggerRetryInterval": "7s"
45+
}
46+
}`))
47+
require.NoError(t, err)
48+
d, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr)
49+
require.NoError(t, err)
50+
require.Zero(t, d)
51+
})
52+
53+
t.Run("enabled with zero interval errors", func(t *testing.T) {
54+
getter, err := settings.NewJSONGetter([]byte(`{
55+
"global": {
56+
"BaseTriggerRetransmitEnabled": "true",
57+
"BaseTriggerRetryInterval": "0s"
58+
}
59+
}`))
60+
require.NoError(t, err)
61+
_, err = ResolveBaseTriggerRetryInterval(ctx, getter, lggr)
62+
require.Error(t, err)
63+
require.Contains(t, err.Error(), "BaseTriggerRetryInterval must be positive")
64+
})
65+
}
66+
1567
func newBase(t *testing.T, store EventStore) *BaseTriggerCapability[*wrapperspb.BytesValue] {
1668
return newBaseWithRetransmit(t, store, 100*time.Millisecond)
1769
}

pkg/logger/logger.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package logger
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"reflect"
78
"testing"
89

10+
"go.opentelemetry.io/otel/trace"
911
"go.uber.org/zap"
1012
"go.uber.org/zap/zapcore"
1113
"go.uber.org/zap/zaptest"
@@ -306,3 +308,26 @@ func Criticalw(l Logger, msg string, keysAndValues ...any) {
306308
s := &sugared{Logger: l, h: Helper(l, 2)}
307309
s.Criticalw(msg, keysAndValues...)
308310
}
311+
312+
// CtxKeyVals returns a slice of logger keyvals derived from the context. Values are looked up and passed along with the
313+
// given keys, and if an otel span is present then the trace_id, trace_flags, and span_id will be included as well.
314+
// Example: l.With(CtxKeyVals(ctx, "keyFoo", ctxKeyFoo, "keyBar", ctxKeyBar)...)
315+
// See: [SugaredLogger.WithCtx]
316+
func CtxKeyVals(ctx context.Context, keyvals ...any) []any {
317+
var kvs []any
318+
spanCtx := trace.SpanFromContext(ctx).SpanContext()
319+
if spanCtx.HasTraceID() {
320+
kvs = append(kvs, "trace_id", spanCtx.TraceID().String())
321+
kvs = append(kvs, "trace_flags", spanCtx.TraceFlags().String())
322+
}
323+
if spanCtx.HasSpanID() {
324+
kvs = append(kvs, "span_id", spanCtx.SpanID().String())
325+
}
326+
for i := 0; i < len(keyvals); i += 2 {
327+
kvs = append(kvs, keyvals[i])
328+
if i+1 < len(keyvals) { // avoid panic on odd length
329+
kvs = append(kvs, ctx.Value(keyvals[i+1]))
330+
}
331+
}
332+
return kvs
333+
}

pkg/logger/sugared.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package logger
22

3-
import "go.uber.org/zap"
3+
import (
4+
"context"
45

5-
// SugaredLogger extends the base Logger interface with syntactic sugar, similar to zap.SugaredLogger, include two new levels.
6+
"go.uber.org/zap"
7+
)
8+
9+
// SugaredLogger extends the base Logger interface with syntactic sugar, similar to [zap.SugaredLogger], and includes two new levels.
610
// - Critical: Requires quick action from the node op, obviously these should happen extremely rarely. Example: failed to listen on TCP port
711
// - Trace: Only included if compiled with the trace tag. For example: go test -tags trace ...
812
type SugaredLogger interface {
@@ -42,6 +46,10 @@ type SugaredLogger interface {
4246
// With returns a new Logger with the given arguments.
4347
With(keyvals ...any) SugaredLogger
4448
WithOptions(opts ...zap.Option) SugaredLogger
49+
// WithCtx returns a new Logger with keyvals from the given context.
50+
// Example: l.WithCtx(ctx, "keyFoo", ctxKeyFoo, "keyBar", ctxKeyBar)
51+
// See [CtxKeyVals].
52+
WithCtx(context.Context, ...any) SugaredLogger
4553
// Helper returns a new logger with the number of callers skipped by caller annotation increased by skip.
4654
// This allows wrappers and helpers to point higher up the stack (like testing.T.Helper()).
4755
Helper(skip int) SugaredLogger
@@ -155,3 +163,7 @@ func (s *sugared) WithOptions(opts ...zap.Option) SugaredLogger {
155163
func (s *sugared) Helper(skip int) SugaredLogger {
156164
return Sugared(Helper(s.Logger, skip))
157165
}
166+
167+
func (s *sugared) WithCtx(ctx context.Context, keyvals ...any) SugaredLogger {
168+
return s.With(CtxKeyVals(ctx, keyvals...)...)
169+
}

0 commit comments

Comments
 (0)