From 29d66a3e227f70b24605dee5be249889b5a11a06 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 27 Mar 2026 12:32:31 -0400 Subject: [PATCH 1/6] Add retry config --- pkg/settings/cresettings/defaults.json | 2 ++ pkg/settings/cresettings/defaults.toml | 2 ++ pkg/settings/cresettings/settings.go | 6 ++++++ 3 files changed, 10 insertions(+) diff --git a/pkg/settings/cresettings/defaults.json b/pkg/settings/cresettings/defaults.json index 58c7bddb7a..56a5f0bb19 100644 --- a/pkg/settings/cresettings/defaults.json +++ b/pkg/settings/cresettings/defaults.json @@ -8,6 +8,8 @@ "GatewayHTTPGlobalRate": "500rps:500", "GatewayHTTPPerNodeRate": "100rps:100", "TriggerRegistrationStatusUpdateTimeout": "0s", + "BaseTriggerRetransmitEnabled": "true", + "BaseTriggerRetryInterval": "30s", "VaultCiphertextSizeLimit": "2kb", "VaultShareSizeLimit": "600b", "VaultIdentifierKeySizeLimit": "64b", diff --git a/pkg/settings/cresettings/defaults.toml b/pkg/settings/cresettings/defaults.toml index d7d6e05966..c4aa7bb0ad 100644 --- a/pkg/settings/cresettings/defaults.toml +++ b/pkg/settings/cresettings/defaults.toml @@ -7,6 +7,8 @@ VaultOrgIdAsSecretOwnerEnabled = 'false' GatewayHTTPGlobalRate = '500rps:500' GatewayHTTPPerNodeRate = '100rps:100' TriggerRegistrationStatusUpdateTimeout = '0s' +BaseTriggerRetransmitEnabled = 'true' +BaseTriggerRetryInterval = '30s' VaultCiphertextSizeLimit = '2kb' VaultShareSizeLimit = '600b' VaultIdentifierKeySizeLimit = '64b' diff --git a/pkg/settings/cresettings/settings.go b/pkg/settings/cresettings/settings.go index c99c88b367..ff723a770b 100644 --- a/pkg/settings/cresettings/settings.go +++ b/pkg/settings/cresettings/settings.go @@ -61,6 +61,9 @@ var Default = Schema{ GatewayHTTPGlobalRate: Rate(rate.Limit(500), 500), GatewayHTTPPerNodeRate: Rate(rate.Limit(100), 100), TriggerRegistrationStatusUpdateTimeout: Duration(0 * time.Second), + BaseTriggerRetransmitEnabled: Bool(true), + BaseTriggerRetryInterval: Duration(30 * time.Second), + // DANGER(cedric): Be extremely careful changing these vault limits as they act as a default value // used by the Vault OCR plugin -- changing these values could cause issues with the plugin during an image // upgrade as nodes apply the old and new values inconsistently. A safe upgrade path @@ -233,6 +236,9 @@ type Schema struct { GatewayHTTPPerNodeRate Setting[config.Rate] TriggerRegistrationStatusUpdateTimeout Setting[time.Duration] + BaseTriggerRetransmitEnabled Setting[bool] + BaseTriggerRetryInterval Setting[time.Duration] + VaultCiphertextSizeLimit Setting[config.Size] VaultShareSizeLimit Setting[config.Size] VaultIdentifierKeySizeLimit Setting[config.Size] From 16aec8b357d8e0c8f1b7adccd7fa765b07187164 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 30 Mar 2026 10:11:11 -0400 Subject: [PATCH 2/6] Update README.md --- pkg/settings/cresettings/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/settings/cresettings/README.md b/pkg/settings/cresettings/README.md index cf04af2339..ac8eeeb5d9 100644 --- a/pkg/settings/cresettings/README.md +++ b/pkg/settings/cresettings/README.md @@ -93,6 +93,10 @@ flowchart PerWorkflow.LogTrigger.FilterAddressLimit{{FilterAddressLimit}}:::bound PerWorkflow.LogTrigger.FilterTopicsPerSlotLimit{{FilterTopicsPerSlotLimit}}:::bound end + subgraph EVMLogTriggerCapability[EVM log trigger capability startup] + BaseTriggerRetransmitEnabled[/BaseTriggerRetransmitEnabled\]:::gate + BaseTriggerRetryInterval>BaseTriggerRetryInterval]:::time + end end subgraph Engine.handleAllTriggerEvents From 898d5c202a01f7e57fc22426c69e513b810e5342 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 30 Mar 2026 10:48:23 -0400 Subject: [PATCH 3/6] Resolve baseTrigger retransmit enabled --- pkg/capabilities/base_trigger_cre.go | 61 +++++++++++++++++++++++++++ pkg/capabilities/base_trigger_test.go | 52 +++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 pkg/capabilities/base_trigger_cre.go diff --git a/pkg/capabilities/base_trigger_cre.go b/pkg/capabilities/base_trigger_cre.go new file mode 100644 index 0000000000..f60b828d44 --- /dev/null +++ b/pkg/capabilities/base_trigger_cre.go @@ -0,0 +1,61 @@ +package capabilities + +import ( + "context" + "fmt" + "time" + + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" +) + +// ResolveBaseTriggerRetryInterval returns the retransmit ticker interval for [BaseTriggerCapability]. +// When [cresettings.Default.BaseTriggerRetransmitEnabled] is false, it returns (0, nil) so the base +// trigger delivers fire-and-forget without persistence or ACK tracking. +// When enabled, [cresettings.Default.BaseTriggerRetryInterval] must be positive. +func ResolveBaseTriggerRetryInterval(ctx context.Context, g settings.Getter, lggr logger.Logger) (retryInterval time.Duration, err error) { + enabled, gerr := cresettings.Default.BaseTriggerRetransmitEnabled.GetOrDefault(ctx, g) + if gerr != nil { + lggr.Errorw("CRE settings read failed for base trigger retransmit flag; using default", "err", gerr) + } + if !enabled { + return 0, nil + } + retryInterval, gerr = cresettings.Default.BaseTriggerRetryInterval.GetOrDefault(ctx, g) + if gerr != nil { + lggr.Errorw("CRE settings read failed for base trigger retry interval; using default", "err", gerr) + } + if retryInterval <= 0 { + return 0, fmt.Errorf( + "BaseTriggerRetransmitEnabled is true but BaseTriggerRetryInterval must be positive (got %s)", + retryInterval, + ) + } + return retryInterval, nil +} + +// NewBaseTriggerCapabilityWithCRESettings builds a [BaseTriggerCapability] using global CRE settings +// for retransmit enablement and interval. Undelivered warning/critical thresholds are derived from +// the resolved interval when retransmit is enabled. +func NewBaseTriggerCapabilityWithCRESettings[T proto.Message]( + ctx context.Context, + store EventStore, + newMsg func() T, + lggr logger.Logger, + capabilityID string, + getter settings.Getter, +) (*BaseTriggerCapability[T], error) { + retry, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr) + if err != nil { + return nil, err + } + var undeliveredWarning, undeliveredCritical time.Duration + if retry > 0 { + undeliveredWarning = 5 * retry + undeliveredCritical = 20 * retry + } + return NewBaseTriggerCapability(store, newMsg, lggr, capabilityID, retry, undeliveredWarning, undeliveredCritical), nil +} diff --git a/pkg/capabilities/base_trigger_test.go b/pkg/capabilities/base_trigger_test.go index 0bb698ba74..7c1b566114 100644 --- a/pkg/capabilities/base_trigger_test.go +++ b/pkg/capabilities/base_trigger_test.go @@ -10,8 +10,60 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings" ) +func TestResolveBaseTriggerRetryInterval(t *testing.T) { + lggr, err := logger.New() + require.NoError(t, err) + ctx := context.Background() + + t.Run("nil getter uses defaults", func(t *testing.T) { + d, err := ResolveBaseTriggerRetryInterval(ctx, nil, lggr) + require.NoError(t, err) + require.Equal(t, 30*time.Second, d) + }) + + t.Run("global JSON enables interval", func(t *testing.T) { + getter, err := settings.NewJSONGetter([]byte(`{ + "global": { + "BaseTriggerRetransmitEnabled": "true", + "BaseTriggerRetryInterval": "7s" + } + }`)) + require.NoError(t, err) + d, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr) + require.NoError(t, err) + require.Equal(t, 7*time.Second, d) + }) + + t.Run("disabled returns zero", func(t *testing.T) { + getter, err := settings.NewJSONGetter([]byte(`{ + "global": { + "BaseTriggerRetransmitEnabled": "false", + "BaseTriggerRetryInterval": "7s" + } + }`)) + require.NoError(t, err) + d, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr) + require.NoError(t, err) + require.Zero(t, d) + }) + + t.Run("enabled with zero interval errors", func(t *testing.T) { + getter, err := settings.NewJSONGetter([]byte(`{ + "global": { + "BaseTriggerRetransmitEnabled": "true", + "BaseTriggerRetryInterval": "0s" + } + }`)) + require.NoError(t, err) + _, err = ResolveBaseTriggerRetryInterval(ctx, getter, lggr) + require.Error(t, err) + require.Contains(t, err.Error(), "BaseTriggerRetryInterval must be positive") + }) +} + func newBase(t *testing.T, store EventStore) *BaseTriggerCapability[*wrapperspb.BytesValue] { return newBaseWithRetransmit(t, store, 100*time.Millisecond) } From 99b52ac327d8f7584f96976acb7d1dcc89c7643a Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 30 Mar 2026 10:48:56 -0400 Subject: [PATCH 4/6] Update defaults.json --- pkg/settings/cresettings/defaults.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/settings/cresettings/defaults.json b/pkg/settings/cresettings/defaults.json index 947bcd5ea5..09d4fc8647 100644 --- a/pkg/settings/cresettings/defaults.json +++ b/pkg/settings/cresettings/defaults.json @@ -8,7 +8,7 @@ "GatewayHTTPGlobalRate": "500rps:500", "GatewayHTTPPerNodeRate": "100rps:100", "TriggerRegistrationStatusUpdateTimeout": "0s", - "BaseTriggerRetransmitEnabled": "true", + "BaseTriggerRetransmitEnabled": "false", "BaseTriggerRetryInterval": "30s", "VaultCiphertextSizeLimit": "2kb", "VaultShareSizeLimit": "600b", From f31328ea2988b5564fcee6971c60f3a601996ce7 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 30 Mar 2026 11:19:45 -0400 Subject: [PATCH 5/6] Add logging and metrics --- pkg/capabilities/base_trigger.go | 68 +++++++++++++++++++++--- pkg/capabilities/base_trigger_metrics.go | 32 +++++++++++ 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/pkg/capabilities/base_trigger.go b/pkg/capabilities/base_trigger.go index 688945bb6e..b0ef1024ef 100644 --- a/pkg/capabilities/base_trigger.go +++ b/pkg/capabilities/base_trigger.go @@ -39,6 +39,10 @@ type BaseTriggerMetrics interface { IncInboxFull(triggerID string) EmitUndeliveredWarning(triggerID, eventID string) EmitUndeliveredCritical(triggerID, eventID string) + // IncAckError counts ACK paths that return an error (e.g. store delete failure). reason is a stable identifier for dashboards. + IncAckError(reason string) + // IncAckMemoryOutcome records how an ACK related to the in-memory pending map: hit, miss_no_trigger_bucket, miss_no_event, miss_nil_record. + IncAckMemoryOutcome(outcome string) } type undeliveredState struct { @@ -192,8 +196,12 @@ func (b *BaseTriggerCapability[T]) DeliverEvent( } if err := b.store.Insert(ctx, rec); err != nil { + b.lggr.Errorw("base trigger failed to persist pending event", + "capabilityID", b.capabilityId, "triggerID", triggerID, "eventID", te.ID, "err", err) return err } + b.lggr.Infow("base trigger persisted pending event for ACK tracking", + "capabilityID", b.capabilityId, "triggerID", triggerID, "eventID", te.ID) b.mu.Lock() if b.pending[triggerID] == nil { @@ -236,27 +244,45 @@ func (b *BaseTriggerCapability[T]) sendToInbox(triggerID, eventID string, payloa func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId string, eventId string) error { b.lggr.Infow("Event ACK", "triggerID", triggerId, "eventID", eventId) if !b.retransmitEnabled() { + b.lggr.Debugw("base trigger ACK skipped (retransmit disabled, no persistence/ACK tracking)", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) + b.metrics.IncAckMemoryOutcome("skipped_retransmit_disabled") return nil } var ( - attempts int - firstAt time.Time - found bool + attempts int + firstAt time.Time + found bool + hadTriggerBucket bool + hadEventKey bool + hadNilPendingRecord bool ) b.mu.Lock() - if eventsForTrigger, ok := b.pending[triggerId]; ok && eventsForTrigger != nil { - if rec, recOk := eventsForTrigger[eventId]; recOk && rec != nil { + eventsForTrigger, ok := b.pending[triggerId] + hadTriggerBucket = ok && eventsForTrigger != nil + if hadTriggerBucket { + rec, recOk := eventsForTrigger[eventId] + hadEventKey = recOk + switch { + case recOk && rec != nil: attempts = rec.Attempts firstAt = rec.FirstAt found = true + case recOk && rec == nil: + hadNilPendingRecord = true + b.metrics.IncAckMemoryOutcome("miss_nil_record") + default: + b.metrics.IncAckMemoryOutcome("miss_no_event") } delete(eventsForTrigger, eventId) if len(eventsForTrigger) == 0 { delete(b.pending, triggerId) } + } else { + b.metrics.IncAckMemoryOutcome("miss_no_trigger_bucket") } if m, ok := b.undeliveredAlertStates[triggerId]; ok { @@ -267,12 +293,40 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin } b.mu.Unlock() - if found { + switch { + case found: + b.lggr.Infow("base trigger ACK matched in-memory pending event", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId, + "attempts", attempts, "firstAt", firstAt) + b.metrics.IncAckMemoryOutcome("hit") b.metrics.IncAck(triggerId, eventId) b.metrics.ObserveTimeToAck(triggerId, eventId, time.Since(firstAt), attempts) + case hadNilPendingRecord: + b.lggr.Warnw("base trigger ACK: pending map had nil record for event (treating as miss; reconciling store)", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) + case hadTriggerBucket && !hadEventKey: + b.lggr.Infow("base trigger ACK: event id not in in-memory pending map for trigger (may exist only in store; reconciling)", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) + case !hadTriggerBucket: + b.lggr.Infow("base trigger ACK: no in-memory pending bucket for trigger (not pending here; still deleting from store if row exists)", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) } - return b.store.DeleteEvent(ctx, triggerId, eventId) + if err := b.store.DeleteEvent(ctx, triggerId, eventId); err != nil { + b.lggr.Errorw("base trigger ACK failed to delete event from store", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId, + "foundInMemory", found, "err", err) + b.metrics.IncAckError("store_delete_failed") + return err + } + if found { + b.lggr.Debugw("base trigger ACK store delete succeeded", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) + } else { + b.lggr.Infow("base trigger ACK store delete succeeded (memory miss path; store row removed if present)", + "capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId) + } + return nil } func (b *BaseTriggerCapability[T]) retransmitLoop() { diff --git a/pkg/capabilities/base_trigger_metrics.go b/pkg/capabilities/base_trigger_metrics.go index cad1d69705..24466ddae8 100644 --- a/pkg/capabilities/base_trigger_metrics.go +++ b/pkg/capabilities/base_trigger_metrics.go @@ -14,6 +14,8 @@ type BaseTriggerBeholderMetrics struct { capabilityID string retryCount metric.Int64Counter ackCount metric.Int64Counter + ackErrorCount metric.Int64Counter + ackMemoryOutcomeCount metric.Int64Counter inboxMissingCount metric.Int64Counter inboxFullCount metric.Int64Counter undeliveredWarningCount metric.Int64Counter @@ -34,6 +36,14 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err if err != nil { return nil, err } + ackErrorCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_ack_error_total") + if err != nil { + return nil, err + } + ackMemoryOutcomeCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_ack_memory_outcome_total") + if err != nil { + return nil, err + } inboxMissingCount, err := beholder.GetMeter().Int64Counter("capabilities_base_trigger_inbox_missing_total") if err != nil { return nil, err @@ -69,6 +79,8 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err capabilityID: capabilityID, retryCount: retryCount, ackCount: ackCount, + ackErrorCount: ackErrorCount, + ackMemoryOutcomeCount: ackMemoryOutcomeCount, inboxMissingCount: inboxMissingCount, inboxFullCount: inboxFullCount, undeliveredWarningCount: undeliveredWarningCount, @@ -111,6 +123,24 @@ func (m *BaseTriggerBeholderMetrics) IncAck(triggerID, eventID string) { ) } +func (m *BaseTriggerBeholderMetrics) IncAckError(reason string) { + m.ackErrorCount.Add(context.Background(), 1, + metric.WithAttributes( + attribute.String("capability_id", m.capabilityID), + attribute.String("reason", reason), + ), + ) +} + +func (m *BaseTriggerBeholderMetrics) IncAckMemoryOutcome(outcome string) { + m.ackMemoryOutcomeCount.Add(context.Background(), 1, + metric.WithAttributes( + attribute.String("capability_id", m.capabilityID), + attribute.String("outcome", outcome), + ), + ) +} + func (m *BaseTriggerBeholderMetrics) ObserveTimeToAck(triggerID, eventID string, d time.Duration, attempts int) { m.timeToAckMs.Record(context.Background(), d.Milliseconds(), metric.WithAttributes(m.attrs(triggerID, eventID)...), @@ -163,3 +193,5 @@ func (noopBaseTriggerMetrics) IncInboxMissing(string) func (noopBaseTriggerMetrics) IncInboxFull(string) {} func (noopBaseTriggerMetrics) EmitUndeliveredWarning(string, string) {} func (noopBaseTriggerMetrics) EmitUndeliveredCritical(string, string) {} +func (noopBaseTriggerMetrics) IncAckError(string) {} +func (noopBaseTriggerMetrics) IncAckMemoryOutcome(string) {} From 5d8f163f0727c120267c14945df3d57b32505f2a Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 30 Mar 2026 11:52:41 -0400 Subject: [PATCH 6/6] Fix tests --- pkg/capabilities/base_trigger_test.go | 2 +- pkg/settings/cresettings/defaults.toml | 2 +- pkg/settings/cresettings/settings.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/capabilities/base_trigger_test.go b/pkg/capabilities/base_trigger_test.go index 7c1b566114..c5e23d5ea5 100644 --- a/pkg/capabilities/base_trigger_test.go +++ b/pkg/capabilities/base_trigger_test.go @@ -21,7 +21,7 @@ func TestResolveBaseTriggerRetryInterval(t *testing.T) { t.Run("nil getter uses defaults", func(t *testing.T) { d, err := ResolveBaseTriggerRetryInterval(ctx, nil, lggr) require.NoError(t, err) - require.Equal(t, 30*time.Second, d) + require.Zero(t, d, "default BaseTriggerRetransmitEnabled is false, so retry interval is disabled") }) t.Run("global JSON enables interval", func(t *testing.T) { diff --git a/pkg/settings/cresettings/defaults.toml b/pkg/settings/cresettings/defaults.toml index a0ac7e9569..b7702c1d2c 100644 --- a/pkg/settings/cresettings/defaults.toml +++ b/pkg/settings/cresettings/defaults.toml @@ -7,7 +7,7 @@ VaultOrgIdAsSecretOwnerEnabled = 'false' GatewayHTTPGlobalRate = '500rps:500' GatewayHTTPPerNodeRate = '100rps:100' TriggerRegistrationStatusUpdateTimeout = '0s' -BaseTriggerRetransmitEnabled = 'true' +BaseTriggerRetransmitEnabled = 'false' BaseTriggerRetryInterval = '30s' VaultCiphertextSizeLimit = '2kb' VaultShareSizeLimit = '600b' diff --git a/pkg/settings/cresettings/settings.go b/pkg/settings/cresettings/settings.go index 9ffec5babd..28a1e0de70 100644 --- a/pkg/settings/cresettings/settings.go +++ b/pkg/settings/cresettings/settings.go @@ -61,7 +61,7 @@ var Default = Schema{ GatewayHTTPGlobalRate: Rate(rate.Limit(500), 500), GatewayHTTPPerNodeRate: Rate(rate.Limit(100), 100), TriggerRegistrationStatusUpdateTimeout: Duration(0 * time.Second), - BaseTriggerRetransmitEnabled: Bool(true), + BaseTriggerRetransmitEnabled: Bool(false), BaseTriggerRetryInterval: Duration(30 * time.Second), // DANGER(cedric): Be extremely careful changing these vault limits as they act as a default value