Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 82 additions & 22 deletions pkg/capabilities/base_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings"
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
)

type PendingEvent struct {
Expand Down Expand Up @@ -58,6 +60,9 @@ type BaseTriggerCapability[T proto.Message] struct {
newMsg func() T // factory to allocate a new T for unmarshalling
lggr logger.Logger
capabilityId string
// settings provides live CRE globals (BaseTriggerRetransmitEnabled, BaseTriggerRetryInterval).
// When nil, tRetransmit > 0 enables persistence/retry with fixed spacing.
settings settings.Getter

mu sync.Mutex
inboxes map[string]chan<- TriggerAndId[T] // triggerID --> registered send channel
Expand All @@ -82,6 +87,7 @@ func NewBaseTriggerCapability[T proto.Message](
tRetransmit time.Duration,
undeliveredWarning time.Duration,
undeliveredCritical time.Duration,
settings settings.Getter,
) *BaseTriggerCapability[T] {
ctx, cancel := context.WithCancel(context.Background())
metrics, err := NewBaseTriggerBeholderMetrics(capabilityId)
Expand All @@ -96,6 +102,7 @@ func NewBaseTriggerCapability[T proto.Message](
lggr: lggr,
capabilityId: capabilityId,
tRetransmit: tRetransmit,
settings: settings,
metrics: metrics,
undeliveredWarning: undeliveredWarning,
undeliveredCritical: undeliveredCritical,
Expand All @@ -108,17 +115,58 @@ func NewBaseTriggerCapability[T proto.Message](
}
}

func (b *BaseTriggerCapability[T]) retransmitEnabled() bool {
return b.tRetransmit > 0
// retransmitAllowed reports whether events should be persisted and remain eligible
// for resend / ACK tracking.
//
// With no settings getter configured the answer is static: tRetransmit > 0.
// Otherwise BaseTriggerRetransmitEnabled is read through the getter on every call;
// if that read fails, a warning is logged and retransmit is treated as disabled.
func (b *BaseTriggerCapability[T]) retransmitAllowed(ctx context.Context) bool {
	if b.settings == nil {
		return b.tRetransmit > 0
	}

	allowed, readErr := cresettings.Default.BaseTriggerRetransmitEnabled.GetOrDefault(ctx, b.settings)
	if readErr == nil {
		return allowed
	}

	// Fail closed: a failed settings read disables retransmit for this call.
	b.lggr.Warnw("CRE settings read failed for BaseTriggerRetransmitEnabled; treating retransmit as disabled", "err", readErr)
	return false
}

func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error {
b.lggr.Info("starting base trigger")
// retryInterval returns the spacing between resend attempts.
//
// With no settings getter configured it is the fixed tRetransmit value. Otherwise
// BaseTriggerRetryInterval is read through the getter on every call; if that read
// fails, a warning is logged and the setting's DefaultValue is returned instead.
func (b *BaseTriggerCapability[T]) retryInterval(ctx context.Context) time.Duration {
	if b.settings == nil {
		return b.tRetransmit
	}

	spacing, readErr := cresettings.Default.BaseTriggerRetryInterval.GetOrDefault(ctx, b.settings)
	if readErr == nil {
		return spacing
	}

	b.lggr.Warnw("CRE settings read failed for BaseTriggerRetryInterval; using schema default", "err", readErr)
	return cresettings.Default.BaseTriggerRetryInterval.DefaultValue
}

if !b.retransmitEnabled() {
b.lggr.Warn("retransmits disabled (tRetransmit <= 0), events will be delivered once without persistence or ACK tracking")
return nil
// loopTickDuration returns how long the retransmit loop should wait before its next
// scan. It is meant to be re-evaluated before each wait so that changes to
// BaseTriggerRetryInterval take effect without restarting.
//
// The result is half the current retry interval, never less than one millisecond.
// The interval is tRetransmit when no settings getter is configured, and the value
// from retryInterval otherwise. A non-positive interval yields a one-second tick.
func (b *BaseTriggerCapability[T]) loopTickDuration() time.Duration {
	const (
		fallbackTick = time.Second
		minTick      = time.Millisecond
	)

	var interval time.Duration
	if b.settings == nil {
		interval = b.tRetransmit
	} else {
		interval = b.retryInterval(b.ctx)
	}

	if interval <= 0 {
		return fallbackTick
	}
	if half := interval / 2; half >= minTick {
		return half
	}
	return minTick
}

func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error {
b.lggr.Info("starting base trigger")

recs, err := b.store.List(ctx)
if err != nil {
Expand Down Expand Up @@ -183,7 +231,7 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
te TriggerEvent,
triggerID string,
) error {
if !b.retransmitEnabled() {
if !b.retransmitAllowed(ctx) {
return b.sendToInbox(triggerID, te.ID, te.Payload.GetValue())
}

Expand Down Expand Up @@ -243,12 +291,6 @@ func (b *BaseTriggerCapability[T]) sendToInbox(triggerID, eventID string, payloa

func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId string, eventId string) error {
b.lggr.Infow("Event ACK", "triggerID", triggerId, "eventID", eventId)
if !b.retransmitEnabled() {
b.lggr.Debugw("base trigger ACK skipped (retransmit disabled, no persistence/ACK tracking)",
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
b.metrics.IncAckMemoryOutcome("skipped_retransmit_disabled")
return nil
}

var (
attempts int
Expand Down Expand Up @@ -330,14 +372,15 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
}

func (b *BaseTriggerCapability[T]) retransmitLoop() {
ticker := time.NewTicker(b.tRetransmit / 2)
defer ticker.Stop()

for {
timer := time.NewTimer(b.loopTickDuration())
select {
case <-b.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-ticker.C:
case <-timer.C:
b.lggr.Debug("retransmitting unacknowledged events")
b.scanPending()
}
Expand All @@ -346,19 +389,28 @@ func (b *BaseTriggerCapability[T]) retransmitLoop() {

func (b *BaseTriggerCapability[T]) scanPending() {
now := time.Now()
ctx := b.ctx

interval := b.retryInterval(ctx)
if !b.retransmitAllowed(ctx) || interval <= 0 {
return
}

warnThreshold := 5 * interval
critThreshold := 20 * interval

b.mu.Lock()
toResend := make([]PendingEvent, 0, len(b.pending))
for triggerID, pendingForTrigger := range b.pending {
for eventID, rec := range pendingForTrigger {
if rec.LastSentAt.IsZero() || now.Sub(rec.LastSentAt) >= b.tRetransmit {
if rec.LastSentAt.IsZero() || now.Sub(rec.LastSentAt) >= interval {
toResend = append(toResend, PendingEvent{
TriggerId: rec.TriggerId,
EventId: rec.EventId,
})
}

if b.undeliveredWarning == 0 && b.undeliveredCritical == 0 {
if warnThreshold == 0 && critThreshold == 0 {
continue
}
age := now.Sub(rec.FirstAt)
Expand All @@ -373,12 +425,13 @@ func (b *BaseTriggerCapability[T]) scanPending() {
b.undeliveredAlertStates[triggerID][eventID] = state
}

if b.undeliveredWarning > 0 && !state.emittedWarning && age >= b.undeliveredWarning {
// TODO: consider meters (in addition to logs) for warn/crit so the data is easy to chart.
if warnThreshold > 0 && !state.emittedWarning && age >= warnThreshold {
b.metrics.EmitUndeliveredWarning(triggerID, eventID)
Comment thread
DylanTinianov marked this conversation as resolved.
state.emittedWarning = true
}

if b.undeliveredCritical > 0 && !state.emittedCritical && age >= b.undeliveredCritical {
if critThreshold > 0 && !state.emittedCritical && age >= critThreshold {
b.metrics.EmitUndeliveredCritical(triggerID, eventID)
state.emittedCritical = true
}
Expand All @@ -395,6 +448,13 @@ func (b *BaseTriggerCapability[T]) scanPending() {
// It updates Attempts and LastSentAt on every attempt locally. Success is determined
// later by an AckEvent call.
func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
if !b.retransmitAllowed(b.ctx) {
return
}
if b.retryInterval(b.ctx) <= 0 {
return
}

b.mu.Lock()
eventsForTrigger, ok := b.pending[event.TriggerId]
if !ok || eventsForTrigger == nil {
Expand Down
48 changes: 16 additions & 32 deletions pkg/capabilities/base_trigger_cre.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package capabilities
import (
"context"
"fmt"
"time"

"google.golang.org/protobuf/proto"

Expand All @@ -12,34 +11,25 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
)

// ResolveBaseTriggerRetryInterval returns the retransmit ticker interval for [BaseTriggerCapability].
// When [cresettings.Default.BaseTriggerRetransmitEnabled] is false, it returns (0, nil) so the base
// trigger delivers fire-and-forget without persistence or ACK tracking.
// When enabled, [cresettings.Default.BaseTriggerRetryInterval] must be positive.
func ResolveBaseTriggerRetryInterval(ctx context.Context, g settings.Getter, lggr logger.Logger) (retryInterval time.Duration, err error) {
enabled, gerr := cresettings.Default.BaseTriggerRetransmitEnabled.GetOrDefault(ctx, g)
if gerr != nil {
lggr.Errorw("CRE settings read failed for base trigger retransmit flag; using default", "err", gerr)
// ValidateBaseTriggerRetryInterval returns an error if g is nil, if BaseTriggerRetryInterval
// cannot be read from g, or if the configured retry interval is not positive.
// Retransmit enablement is not checked here; it is evaluated dynamically at runtime via BaseTriggerRetransmitEnabled.
func ValidateBaseTriggerRetryInterval(ctx context.Context, g settings.Getter) error {
if g == nil {
return fmt.Errorf("base trigger CRE settings getter is nil")
}
if !enabled {
return 0, nil
}
retryInterval, gerr = cresettings.Default.BaseTriggerRetryInterval.GetOrDefault(ctx, g)
if gerr != nil {
lggr.Errorw("CRE settings read failed for base trigger retry interval; using default", "err", gerr)
iv, err := cresettings.Default.BaseTriggerRetryInterval.GetOrDefault(ctx, g)
if err != nil {
return fmt.Errorf("base trigger retry interval: %w", err)
}
if retryInterval <= 0 {
return 0, fmt.Errorf(
"BaseTriggerRetransmitEnabled is true but BaseTriggerRetryInterval must be positive (got %s)",
retryInterval,
)
if iv <= 0 {
return fmt.Errorf("BaseTriggerRetryInterval must be positive (got %v)", iv)
}
return retryInterval, nil
return nil
}

// NewBaseTriggerCapabilityWithCRESettings builds a [BaseTriggerCapability] using global CRE settings
// for retransmit enablement and interval. Undelivered warning/critical thresholds are derived from
// the resolved interval when retransmit is enabled.
// NewBaseTriggerCapabilityWithCRESettings builds a [BaseTriggerCapability] that reads
// [cresettings.Default.BaseTriggerRetransmitEnabled] and [cresettings.Default.BaseTriggerRetryInterval]
// on each delivery, resend, and scan so changes apply without restarting the node.
func NewBaseTriggerCapabilityWithCRESettings[T proto.Message](
ctx context.Context,
store EventStore,
Expand All @@ -48,14 +38,8 @@ func NewBaseTriggerCapabilityWithCRESettings[T proto.Message](
capabilityID string,
getter settings.Getter,
) (*BaseTriggerCapability[T], error) {
retry, err := ResolveBaseTriggerRetryInterval(ctx, getter, lggr)
if err != nil {
if err := ValidateBaseTriggerRetryInterval(ctx, getter); err != nil {
return nil, err
}
var undeliveredWarning, undeliveredCritical time.Duration
if retry > 0 {
undeliveredWarning = 5 * retry
undeliveredCritical = 20 * retry
}
return NewBaseTriggerCapability(store, newMsg, lggr, capabilityID, retry, undeliveredWarning, undeliveredCritical), nil
return NewBaseTriggerCapability(store, newMsg, lggr, capabilityID, 0, 0, 0, getter), nil
}
Loading
Loading