93 changes: 79 additions & 14 deletions core/services/llo/data_source.go
@@ -8,8 +8,10 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"
@@ -39,6 +41,22 @@
},
[]string{"streamID"},
)
promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_hit_count",
Help: "Number of local observation cache hits",
},
[]string{"streamID"},
)
promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_miss_count",
Help: "Number of local observation cache misses",
},
[]string{"streamID"},
)
)

type Registry interface {
@@ -77,16 +95,32 @@ var _ llo.DataSource = &dataSource{}
type dataSource struct {
lggr logger.Logger
registry Registry
- t Telemeter
+
+ t Telemeter
+ shouldCache *atomic.Bool
+ cache *cache.Cache
}

func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource {
- return newDataSource(lggr, registry, t)
+ return newDataSource(lggr, registry, t, true)
}

- func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSource {
- return &dataSource{logger.Named(lggr, "DataSource"), registry, t}
+ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, cacheEnabled bool) *dataSource {
+ shouldCache := &atomic.Bool{}
+ shouldCache.Store(cacheEnabled)
+
+ return &dataSource{
+ lggr: logger.Named(lggr, "DataSource"),
+ registry: registry,
+ t: t,
+
+ // Cache valid observations between rounds for 1 second to avoid exhausting
+ // the node's network and the underlying adapter's resources when dealing
+ // with a large number of streams. The cache is cleaned up every minute to
+ // remove stale observations for removed streams.
+ shouldCache: shouldCache,
+ cache: cache.New(time.Second, time.Minute),
+ }
}

// Observe looks up all streams in the registry and populates a map of stream ID => value
@@ -137,17 +171,26 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()
- val, err := oc.Observe(ctx, streamID, opts)
- if err != nil {
- strmIDStr := strconv.FormatUint(uint64(streamID), 10)
- if errors.As(err, &MissingStreamError{}) {
- promMissingStreamCount.WithLabelValues(strmIDStr).Inc()
+ var val llo.StreamValue
+ var err error
+
+ // check for valid cached value before observing
+ if val = d.fromCache(streamID); val == nil {
+ // no valid cached value, observe the stream
+ if val, err = oc.Observe(ctx, streamID, opts); err != nil {
+ strmIDStr := strconv.FormatUint(uint64(streamID), 10)
+ if errors.As(err, &MissingStreamError{}) {
+ promMissingStreamCount.WithLabelValues(strmIDStr).Inc()
+ }
+ promObservationErrorCount.WithLabelValues(strmIDStr).Inc()
+ mu.Lock()
+ errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
+ mu.Unlock()
+ return
+ }
- promObservationErrorCount.WithLabelValues(strmIDStr).Inc()
- mu.Lock()
- errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
- mu.Unlock()
- return
+
+ // cache the observed value
+ d.toCache(streamID, val)
}

mu.Lock()
@@ -192,3 +235,25 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

return nil
}

func (d *dataSource) fromCache(streamID llotypes.StreamID) llo.StreamValue {
if d.shouldCache.Load() {
cacheKey := strconv.FormatUint(uint64(streamID), 10)
if cachedVal, found := d.cache.Get(cacheKey); found && cachedVal != nil {
streamValue := cachedVal.(llo.StreamValue)
promCacheHitCount.WithLabelValues(cacheKey).Inc()
return streamValue
}
promCacheMissCount.WithLabelValues(cacheKey).Inc()
}
return nil
}

func (d *dataSource) toCache(streamID llotypes.StreamID, val llo.StreamValue) {
if d.shouldCache.Load() && val != nil {
cacheKey := strconv.FormatUint(uint64(streamID), 10)

// set with default expiration
d.cache.SetDefault(cacheKey, val)
}
}
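The caching added above is a standard read-through pattern around the stream observation call. For reference, a minimal, self-contained sketch of the same pattern using patrickmn/go-cache directly: observeStream here is a hypothetical stand-in for the real oc.Observe call, and the string value type is a simplification of llo.StreamValue. The TTLs mirror the ones in newDataSource (1-second expiry, 1-minute cleanup).

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/patrickmn/go-cache"
)

// observeStream is a hypothetical stand-in for the real oc.Observe call.
func observeStream(streamID uint32) (string, error) {
	return fmt.Sprintf("value-for-%d", streamID), nil
}

type source struct {
	c *cache.Cache
}

func (s *source) get(streamID uint32) (string, error) {
	key := strconv.FormatUint(uint64(streamID), 10)
	// Read-through: serve from cache while the entry is still fresh.
	if v, found := s.c.Get(key); found && v != nil {
		return v.(string), nil
	}
	val, err := observeStream(streamID)
	if err != nil {
		// Errors are never cached, so the next round retries the stream.
		return "", err
	}
	// SetDefault stores with the default (1s) expiration; expired entries
	// become invisible to Get immediately, while the 1-minute cleanup
	// interval only controls when the janitor goroutine frees their memory.
	s.c.SetDefault(key, val)
	return val, nil
}

func main() {
	s := &source{c: cache.New(time.Second, time.Minute)}
	v1, _ := s.get(1) // miss: observes and caches
	v2, _ := s.get(1) // hit: served from cache for up to one second
	fmt.Println(v1, v2)
}

One design point worth noting: because errors bypass toCache, a failing stream is retried on every round rather than having its failure pinned for the TTL.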
136 changes: 126 additions & 10 deletions core/services/llo/data_source_test.go
@@ -12,6 +12,8 @@
"testing"
"time"

gocache "github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -21,9 +23,7 @@
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"

"github.com/smartcontractkit/chainlink-data-streams/llo"
- datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest"
@@ -121,17 +121,17 @@
m.ch = make(chan interface{}, size)
return m.ch
}
- func (m *mockTelemeter) GetOutcomeTelemetryCh() chan<- *datastreamsllo.LLOOutcomeTelemetry {
+ func (m *mockTelemeter) GetOutcomeTelemetryCh() chan<- *llo.LLOOutcomeTelemetry {
return nil
}
- func (m *mockTelemeter) GetReportTelemetryCh() chan<- *datastreamsllo.LLOReportTelemetry { return nil }
- func (m *mockTelemeter) CaptureEATelemetry() bool { return true }
- func (m *mockTelemeter) CaptureObservationTelemetry() bool { return true }
+ func (m *mockTelemeter) GetReportTelemetryCh() chan<- *llo.LLOReportTelemetry { return nil }
+ func (m *mockTelemeter) CaptureEATelemetry() bool { return true }
+ func (m *mockTelemeter) CaptureObservationTelemetry() bool { return true }

func Test_DataSource(t *testing.T) {
lggr := logger.TestLogger(t)
reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)}
- ds := newDataSource(lggr, reg, NullTelemeter)
+ ds := newDataSource(lggr, reg, NullTelemeter, false)

ctx := testutils.Context(t)
opts := &mockOpts{}

@@ -168,8 +168,8 @@
assert.NoError(t, err)

assert.Equal(t, llo.StreamValues{
- 2: llo.ToDecimal(decimal.NewFromInt(40602)),
1: nil,
+ 2: llo.ToDecimal(decimal.NewFromInt(40602)),
3: nil,
}, vals)
})
@@ -258,6 +258,122 @@
assert.Nil(t, pkt.val)
assert.Error(t, pkt.err)
})

t.Run("uses cached values when available", func(t *testing.T) {
ds := newDataSource(lggr, reg, NullTelemeter, true)

// First observation to populate cache
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
require.NoError(t, err)

// Verify initial values
assert.Equal(t, llo.StreamValues{
1: llo.ToDecimal(decimal.NewFromInt(2181)),
2: llo.ToDecimal(decimal.NewFromInt(40602)),
3: nil,
}, vals)

// Change pipeline results
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil)
reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil)

// Second observation should use cached values
vals = makeStreamValues()
err = ds.Observe(ctx, vals, opts)
require.NoError(t, err)

// Should still have original values from cache
assert.Equal(t, llo.StreamValues{
1: llo.ToDecimal(decimal.NewFromInt(2181)),
2: llo.ToDecimal(decimal.NewFromInt(40602)),
3: nil,
}, vals)

// Verify cache metrics
assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheHitCount.WithLabelValues("1")), 0.0001)
assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheHitCount.WithLabelValues("2")), 0.0001)
assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheMissCount.WithLabelValues("1")), 0.0001)
assert.InEpsilon(t, float64(1), testutil.ToFloat64(promCacheMissCount.WithLabelValues("2")), 0.0001)
})

t.Run("refreshes cache after expiration", func(t *testing.T) {
// Create a new data source with a very short cache TTL
ds := newDataSource(lggr, reg, NullTelemeter, true)
ds.cache = gocache.New(10*time.Millisecond, 1*time.Minute)

// First observation
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil)
vals := llo.StreamValues{1: nil}

err := ds.Observe(ctx, vals, opts)
require.NoError(t, err)

// Wait for cache to expire
time.Sleep(20 * time.Millisecond)

// Change pipeline result
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(200), nil)

// Second observation should use new value
vals = llo.StreamValues{1: nil}
err = ds.Observe(ctx, vals, opts)
require.NoError(t, err)

assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(200))}, vals)
})

t.Run("handles concurrent cache access", func(t *testing.T) {
// Create a new data source
ds := newDataSource(lggr, reg, NullTelemeter, true)

// Set up pipeline to return different values
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil)

// First observation to cache
vals := llo.StreamValues{1: nil}
err := ds.Observe(ctx, vals, opts)
require.NoError(t, err)

// Run multiple observations concurrently
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
vals := llo.StreamValues{1: nil}
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)
assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals)
}()
}
wg.Wait()

// Verify pipeline was only called once
assert.Equal(t, 1, reg.pipelines[1].runCount)
})

t.Run("handles cache errors gracefully", func(t *testing.T) {
ds := newDataSource(lggr, reg, NullTelemeter, true)

ds.cache = gocache.New(100*time.Millisecond, 1*time.Minute)

// First observation with error
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, nil, errors.New("pipeline error"))
vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
require.NoError(t, err) // Observe returns nil error even if some streams fail

// Second observation should try again (not use cache for error case)
reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil)
vals = llo.StreamValues{1: nil}
err = ds.Observe(ctx, vals, opts)
require.NoError(t, err)

assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals)
})
})
}

@@ -317,15 +433,15 @@

result1 -> multiply2;
result2 -> result2_parse;
- result3 -> result3_parse -> multiply3;
+ result3 -> result3_parse -> multiply3;
`, i+n, i+2*n, i+3*n),
},
}
err := r.Register(jb, nil)
require.NoError(b, err)
}

- ds := newDataSource(lggr, r, NullTelemeter)
+ ds := newDataSource(lggr, r, NullTelemeter, false)

vals := make(map[llotypes.StreamID]llo.StreamValue)
for i := uint32(0); i < 4*n; i++ {
vals[i] = nil
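A side note on the cache-metric assertions in the tests above: testutil.ToFloat64 reads the current value of a single collector, which is what the assert.InEpsilon checks compare against. A minimal sketch of that pattern, with an illustrative counter of the same shape as promCacheHitCount (declared with prometheus.NewCounterVec rather than promauto so the example stays self-contained):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Illustrative counter with the same shape as the PR's promCacheHitCount.
	hits := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "llo",
		Subsystem: "datasource",
		Name:      "cache_hit_count",
		Help:      "Number of local observation cache hits",
	}, []string{"streamID"})

	hits.WithLabelValues("1").Inc()

	// ToFloat64 collects the single metric and returns its current value;
	// the tests wrap this in assert.InEpsilon for float comparison.
	fmt.Println(testutil.ToFloat64(hits.WithLabelValues("1"))) // 1
}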
6 changes: 3 additions & 3 deletions core/services/llo/mercurytransmitter/orm.go
@@ -208,10 +208,10 @@ func (o *orm) Prune(ctx context.Context, serverURL string, maxSize, batchSize in
res, err = o.ds.ExecContext(ctx, `
DELETE FROM llo_mercury_transmit_queue AS q
USING (
- SELECT transmission_hash
+ SELECT transmission_hash
FROM llo_mercury_transmit_queue
- WHERE don_id = $1
- AND server_url = $2
+ WHERE don_id = $1
+ AND server_url = $2
AND seq_nr < $3
ORDER BY seq_nr ASC
LIMIT $4
6 changes: 6 additions & 0 deletions core/services/llo/mercurytransmitter/queue.go
@@ -145,6 +145,12 @@ func (tq *transmitQueue) IsEmpty() bool {
return tq.pq.Len() == 0
}

func (tq *transmitQueue) Len() int {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.pq.Len()
}

func (tq *transmitQueue) Start(context.Context) error {
return tq.StartOnce("TransmitQueue", func() error {
t := services.NewTicker(promInterval)
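The new Len accessor exists so callers, including the test change below, can read the queue size under the queue's own read lock instead of reaching into the unexported heap. A rough sketch of the pattern, with a trivial int heap standing in for the real priority queue:

package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// intHeap is a stand-in for the transmitter's real priority queue.
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type transmitQueue struct {
	mu sync.RWMutex
	pq *intHeap
}

func (tq *transmitQueue) Push(v int) {
	tq.mu.Lock()
	defer tq.mu.Unlock()
	heap.Push(tq.pq, v)
}

// Len takes the read lock so concurrent readers never observe the heap
// mid-mutation, mirroring the accessor added above.
func (tq *transmitQueue) Len() int {
	tq.mu.RLock()
	defer tq.mu.RUnlock()
	return tq.pq.Len()
}

func main() {
	q := &transmitQueue{pq: &intHeap{}}
	q.Push(3)
	q.Push(1)
	fmt.Println(q.Len()) // 2
}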
2 changes: 1 addition & 1 deletion core/services/llo/mercurytransmitter/queue_test.go
@@ -133,7 +133,7 @@ func Test_Queue(t *testing.T) {
}

tq.Push(testTransmissions[maxSize+3]) // push one more to trigger eviction
- require.Equal(t, maxSize, tq.(*transmitQueue).pq.Len())
+ require.Equal(t, maxSize, tq.(*transmitQueue).Len())
require.Len(t, deleter.hashes, 4) // evicted overfill entries (3 oversize plus 1 more to make room)

// oldest entries removed