Skip to content

Commit 223520c

Browse files
committed
ethrpc: auto-split FilterLogs on block range / result size errors
Add automatic range splitting to Provider.FilterLogs() so callers get transparent retry when nodes reject eth_getLogs requests for being too large (block range exceeded, too many results, etc.). Uses an AIMD strategy (multiplicative decrease /1.5 on error, additive increase +10% on success) to discover the node's true limit. Once calibrated, the known-good range is reused directly with no further probing. New options: WithFilterLogsMaxRange(n), WithAutoFilterLogsRange(bool). Feature is disabled by default — zero behavioral change unless opted in.
1 parent b2ccd70 commit 223520c

3 files changed

Lines changed: 263 additions & 4 deletions

File tree

ethrpc/ethrpc.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,30 @@ type Provider struct {
4545
// cache cachestore.Store[[]byte] // NOTE: unused for now
4646
lastRequestID uint64
4747

48+
// filterLogsMaxRange controls the maximum block range per eth_getLogs request.
49+
// -1 = disabled (never auto-split, always pass through to node as-is)
50+
// 0 = auto mode (start with the full range, shrink on error, remember what worked)
51+
// >0 = explicit max range (start with this value, still shrink further if needed)
52+
// Default: -1 (disabled). Must opt-in via options.
53+
filterLogsMaxRange int64
54+
55+
// filterLogsLastRange is the last block range that succeeded in auto-split mode.
56+
// Used to avoid re-probing the node's limit on every call. Starts at 0, meaning
57+
// "not yet calibrated." Safe for concurrent access via atomic operations.
58+
filterLogsLastRange atomic.Int64
59+
60+
// filterLogsRangeCalibrated indicates that the auto-split range has been fully
61+
// calibrated — meaning we've discovered the node's true limit via a shrink+grow
62+
// cycle. Once true, we use filterLogsLastRange directly without probing higher.
63+
filterLogsRangeCalibrated atomic.Bool
64+
4865
mu sync.Mutex
4966
}
5067

5168
func NewProvider(nodeURL string, options ...Option) (*Provider, error) {
5269
p := &Provider{
53-
nodeURL: nodeURL,
70+
nodeURL: nodeURL,
71+
filterLogsMaxRange: -1, // disabled by default
5472
}
5573
for _, opt := range options {
5674
if opt == nil {
@@ -446,9 +464,11 @@ func (p *Provider) RawFilterLogs(ctx context.Context, q ethereum.FilterQuery) (j
446464
}
447465

448466
func (p *Provider) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
449-
var logs []types.Log
450-
_, err := p.Do(ctx, FilterLogs(q).Strict(p.strictness).Into(&logs))
451-
return logs, err
467+
// Fast path: feature disabled or query uses BlockHash (no range to split)
468+
if p.filterLogsMaxRange < 0 || q.BlockHash != nil {
469+
return p.filterLogs(ctx, q)
470+
}
471+
return p.filterLogsAutoSplit(ctx, q)
452472
}
453473

454474
func (p *Provider) PendingBalanceAt(ctx context.Context, account common.Address) (*big.Int, error) {

ethrpc/filter_logs.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package ethrpc
2+
3+
import (
4+
"context"
5+
"math"
6+
"math/big"
7+
"strings"
8+
9+
"github.com/0xsequence/ethkit/go-ethereum"
10+
"github.com/0xsequence/ethkit/go-ethereum/core/types"
11+
)
12+
13+
// filterLogs executes a standard eth_getLogs JSON-RPC call for the given query.
14+
func (p *Provider) filterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
15+
var logs []types.Log
16+
_, err := p.Do(ctx, FilterLogs(q).Strict(p.strictness).Into(&logs))
17+
return logs, err
18+
}
19+
20+
// filterLogsAutoSplit executes eth_getLogs with automatic range splitting when the node
21+
// returns a "too much data" style error. It uses an AIMD (Additive Increase,
22+
// Multiplicative Decrease) strategy to adapt the batch range over time.
23+
//
24+
// Calibration: on the first call the provider tries the full range (or the explicit max).
25+
// If the node rejects it, we shrink (multiplicative decrease /1.5) and retry. Within a
26+
// single call, after each successful sub-query we attempt additive increase (+10%). If
27+
// the increase triggers another error, we shrink back and mark the range as "calibrated"
28+
// — meaning the node's true limit has been found. Once calibrated, subsequent calls skip
29+
// the probing entirely and use the known-good range directly.
30+
func (p *Provider) filterLogsAutoSplit(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
31+
from, to, err := p.resolveFilterBlockRange(ctx, q)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
// Single block: just do a direct call. Empty/invalid range: return early.
37+
if from == to {
38+
return p.filterLogs(ctx, q)
39+
}
40+
if from > to {
41+
return nil, nil
42+
}
43+
44+
totalRange := to - from
45+
batchRange := p.effectiveFilterLogsBatchRange(totalRange)
46+
47+
// Additive factor: 10% of the starting batch range, minimum 1
48+
additiveFactor := uint64(math.Ceil(float64(batchRange) * 0.10))
49+
if additiveFactor < 1 {
50+
additiveFactor = 1
51+
}
52+
53+
// Track whether we've succeeded after a shrink. We only mark calibrated
54+
// on a shrink→success→grow→fail cycle, which means we've bracketed the
55+
// true limit. A shrink→shrink sequence (consecutive failures) does not
56+
// count as calibration.
57+
succeededSinceShrink := false
58+
59+
var allLogs []types.Log
60+
61+
for cursor := from; cursor <= to; {
62+
if err := ctx.Err(); err != nil {
63+
return nil, err
64+
}
65+
66+
end := cursor + batchRange
67+
if end > to {
68+
end = to
69+
}
70+
71+
subQ := q // copy value (FilterQuery is a struct)
72+
subQ.FromBlock = new(big.Int).SetUint64(cursor)
73+
subQ.ToBlock = new(big.Int).SetUint64(end)
74+
75+
logs, err := p.filterLogs(ctx, subQ)
76+
if err != nil {
77+
if !isFilterLogsTooMuchDataError(err) {
78+
return nil, err
79+
}
80+
81+
// If we previously grew after a successful shrink and now failed,
82+
// we've bracketed the node's true limit. Mark calibrated.
83+
if succeededSinceShrink {
84+
p.filterLogsRangeCalibrated.Store(true)
85+
}
86+
succeededSinceShrink = false
87+
88+
// Multiplicative decrease: shrink by /1.5
89+
batchRange = uint64(float64(batchRange) / 1.5)
90+
if batchRange < 1 {
91+
batchRange = 1
92+
}
93+
94+
continue // retry the same cursor with a smaller range
95+
}
96+
97+
allLogs = append(allLogs, logs...)
98+
succeededSinceShrink = true
99+
100+
// Store what worked
101+
p.filterLogsLastRange.Store(int64(batchRange))
102+
103+
// Advance past the successfully fetched range
104+
cursor = end + 1
105+
106+
// Check context before continuing to next batch
107+
if err := ctx.Err(); err != nil {
108+
return nil, err
109+
}
110+
111+
// Additive increase: try to grow the batch range if not yet calibrated
112+
if cursor <= to && !p.filterLogsRangeCalibrated.Load() {
113+
ceiling := p.filterLogsCeiling()
114+
grown := batchRange + additiveFactor
115+
if grown > ceiling {
116+
grown = ceiling
117+
}
118+
batchRange = grown
119+
}
120+
}
121+
122+
return allLogs, nil
123+
}
124+
125+
// resolveFilterBlockRange resolves the actual uint64 from/to block numbers from a
126+
// FilterQuery, fetching the latest block number if ToBlock is nil.
127+
func (p *Provider) resolveFilterBlockRange(ctx context.Context, q ethereum.FilterQuery) (uint64, uint64, error) {
128+
var from uint64
129+
if q.FromBlock != nil {
130+
from = q.FromBlock.Uint64()
131+
}
132+
133+
if q.ToBlock != nil {
134+
return from, q.ToBlock.Uint64(), nil
135+
}
136+
137+
// ToBlock is nil → fetch latest block number
138+
latest, err := p.BlockNumber(ctx)
139+
if err != nil {
140+
return 0, 0, err
141+
}
142+
return from, latest, nil
143+
}
144+
145+
// effectiveFilterLogsBatchRange picks the starting batch range for a filterLogsAutoSplit call.
146+
func (p *Provider) effectiveFilterLogsBatchRange(totalRange uint64) uint64 {
147+
// If we have a previously discovered value, use it directly
148+
if last := p.filterLogsLastRange.Load(); last > 0 {
149+
v := uint64(last)
150+
// Respect the explicit ceiling if set
151+
if p.filterLogsMaxRange > 0 && v > uint64(p.filterLogsMaxRange) {
152+
v = uint64(p.filterLogsMaxRange)
153+
}
154+
if v > totalRange {
155+
return totalRange
156+
}
157+
return v
158+
}
159+
160+
// Not yet calibrated: use explicit max if set
161+
if p.filterLogsMaxRange > 0 {
162+
if uint64(p.filterLogsMaxRange) < totalRange {
163+
return uint64(p.filterLogsMaxRange)
164+
}
165+
return totalRange
166+
}
167+
168+
// Auto mode (filterLogsMaxRange == 0), not yet calibrated: try the full range
169+
return totalRange
170+
}
171+
172+
// filterLogsCeiling returns the upper bound for additive increase.
173+
func (p *Provider) filterLogsCeiling() uint64 {
174+
if p.filterLogsMaxRange > 0 {
175+
return uint64(p.filterLogsMaxRange)
176+
}
177+
// Auto mode: no artificial ceiling, let AIMD discover it
178+
return math.MaxUint64
179+
}
180+
181+
// isFilterLogsTooMuchDataError checks whether an error from the node indicates
182+
// the requested range or result set was too large.
183+
func isFilterLogsTooMuchDataError(err error) bool {
184+
if err == nil {
185+
return false
186+
}
187+
msg := strings.ToLower(err.Error())
188+
for _, pattern := range filterLogsTooMuchDataPatterns {
189+
if strings.Contains(msg, pattern) {
190+
return true
191+
}
192+
}
193+
return false
194+
}
195+
196+
// filterLogsTooMuchDataPatterns are lowercase substrings matched against error messages
197+
// from various RPC node providers to detect "too much data" / "block range too large" errors.
198+
var filterLogsTooMuchDataPatterns = []string{
199+
"query returned more than", // Infura, Alchemy, generic (e.g. 10000 results)
200+
"query exceeds max results", // Telos
201+
"response is too big", // Soneium
202+
"response exceed size limit", // Various
203+
"log response size exceeded", // Various
204+
"block range", // Catches "block range too large", "block range exceeded", etc.
205+
"too many blocks", // Various
206+
"logs matched by query exceeds", // Various
207+
"query timeout exceeded", // Timeout due to large range
208+
"read limit exceeded", // Various
209+
"exceed maximum block range", // Various
210+
"too much data", // Generic
211+
"result too large", // Generic
212+
}

ethrpc/option.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,30 @@ func WithStrictValidation() Option {
6767
p.strictness = StrictnessLevel_Strict
6868
}
6969
}
70+
71+
// WithFilterLogsMaxRange sets an explicit maximum block range per eth_getLogs call.
72+
// The provider will automatically split larger ranges into batches of at most maxRange
73+
// blocks, shrinking further if the node still rejects the request.
74+
//
75+
// Pass 0 for auto-detection mode (equivalent to WithAutoFilterLogsRange(true)).
76+
// Pass -1 to disable (default, equivalent to WithAutoFilterLogsRange(false)).
77+
func WithFilterLogsMaxRange(maxRange int64) Option {
78+
return func(p *Provider) {
79+
p.filterLogsMaxRange = maxRange
80+
}
81+
}
82+
83+
// WithAutoFilterLogsRange enables or disables automatic range detection and splitting
84+
// for eth_getLogs calls. When enabled, the provider will probe the node's limit on the
85+
// first call and remember it for subsequent calls.
86+
//
87+
// Equivalent to WithFilterLogsMaxRange(0) when true, WithFilterLogsMaxRange(-1) when false.
88+
func WithAutoFilterLogsRange(enabled bool) Option {
89+
return func(p *Provider) {
90+
if enabled {
91+
p.filterLogsMaxRange = 0
92+
} else {
93+
p.filterLogsMaxRange = -1
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)