Skip to content

Commit 855a3ea

Browse files
committed
fix: use redis lua to claim ranges safely across instances
1 parent cffbf86 commit 855a3ea

5 files changed

Lines changed: 387 additions & 9 deletions

File tree

internal/worker/catchup.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,8 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
156156
return nil
157157
}
158158

159-
// Process multiple ranges in parallel
159+
// Claim multiple ranges in parallel so multiple instances can split work.
160160
var wg sync.WaitGroup
161-
rangeChan := make(chan blockstore.CatchupRange, len(cw.blockRanges))
162-
163-
// Fill channel with ranges
164-
for _, r := range cw.blockRanges {
165-
rangeChan <- r
166-
}
167-
close(rangeChan)
168161

169162
// Start parallel workers
170163
for i := 0; i < CATCHUP_WORKERS; i++ {
@@ -174,7 +167,11 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
174167
defer cw.recoverPanic(fmt.Sprintf("catchup range worker %d", workerID))
175168
cw.logger.Debug("Starting catchup worker", "worker_id", workerID)
176169

177-
for r := range rangeChan {
170+
for {
171+
r, ok := cw.claimNextRange()
172+
if !ok {
173+
return
174+
}
178175
if err := cw.processRange(r, workerID); err != nil {
179176
cw.logger.Error("Failed to process range",
180177
"worker_id", workerID,
@@ -194,6 +191,27 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
194191
return nil
195192
}
196193

194+
func (cw *CatchupWorker) claimNextRange() (blockstore.CatchupRange, bool) {
195+
claimed, err := cw.catchupStore.GetNextRange(cw.ctx, cw.chain.GetNetworkInternalCode())
196+
if err != nil {
197+
cw.logger.Warn("Failed to claim catchup range",
198+
"chain", cw.chain.GetName(),
199+
"error", err,
200+
)
201+
return blockstore.CatchupRange{}, false
202+
}
203+
if claimed == nil {
204+
return blockstore.CatchupRange{}, false
205+
}
206+
207+
cw.logger.Info("Claimed catchup range",
208+
"chain", cw.chain.GetName(),
209+
"range", fmt.Sprintf("%d-%d", claimed.Start, claimed.End),
210+
"current", claimed.Current,
211+
)
212+
return *claimed, true
213+
}
214+
197215
func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) error {
198216
batchCount := 0
199217
startTime := time.Now()

internal/worker/catchup_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,21 @@ package worker
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"log/slog"
8+
"sync"
79
"testing"
810
"time"
911

1012
"github.com/fystack/multichain-indexer/internal/status"
13+
"github.com/fystack/multichain-indexer/internal/indexer"
1114
"github.com/fystack/multichain-indexer/pkg/common/config"
1215
"github.com/fystack/multichain-indexer/pkg/common/enum"
16+
"github.com/fystack/multichain-indexer/pkg/infra"
1317
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
18+
"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
19+
"github.com/redis/go-redis/v9"
1420
"github.com/stretchr/testify/require"
1521
)
1622

@@ -82,3 +88,152 @@ func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) {
8288
t.Fatal("catchup worker did not exit after context cancellation")
8389
}
8490
}
91+
92+
func TestCatchupWorkersClaimDistinctRangesAcrossInstances(t *testing.T) {
93+
t.Parallel()
94+
95+
client, cleanup := setupWorkerTestRedis(t)
96+
defer cleanup()
97+
98+
statusRegistry := status.NewRegistry()
99+
statusRegistry.RegisterChain("aptos", "aptos_testnet", config.ChainConfig{
100+
NetworkId: "aptos_testnet",
101+
InternalCode: "APTOS_TESTNET",
102+
Type: enum.NetworkTypeApt,
103+
})
104+
105+
store := catchupstore.New(&workerTestRedisClient{client: client})
106+
ctx1, cancel1 := context.WithCancel(context.Background())
107+
ctx2, cancel2 := context.WithCancel(context.Background())
108+
defer cancel1()
109+
defer cancel2()
110+
111+
var mu sync.Mutex
112+
seen := make([]string, 0, 2)
113+
indexerStub := &stubIndexer{
114+
name: "aptos",
115+
internalCode: "APTOS_TESTNET",
116+
networkType: enum.NetworkTypeApt,
117+
getBlocksFunc: func(_ context.Context, from, to uint64, _ bool) ([]indexer.BlockResult, error) {
118+
mu.Lock()
119+
seen = append(seen, fmt.Sprintf("%d-%d", from, to))
120+
mu.Unlock()
121+
return nil, nil
122+
},
123+
}
124+
125+
require.NoError(t, store.SaveRanges(context.Background(), "APTOS_TESTNET", []blockstore.CatchupRange{
126+
{Start: 1, End: 10, Current: 0},
127+
{Start: 11, End: 20, Current: 10},
128+
}))
129+
130+
newWorker := func(ctx context.Context) *CatchupWorker {
131+
return &CatchupWorker{
132+
BaseWorker: &BaseWorker{
133+
ctx: ctx,
134+
cancel: func() {},
135+
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
136+
config: config.ChainConfig{
137+
PollInterval: time.Millisecond,
138+
Throttle: config.Throttle{BatchSize: 20},
139+
},
140+
chain: indexerStub,
141+
blockStore: &stubBlockStore{},
142+
catchupStore: store,
143+
statusRegistry: statusRegistry,
144+
},
145+
blockRanges: []blockstore.CatchupRange{},
146+
workerPool: make(chan struct{}, CATCHUP_WORKERS),
147+
}
148+
}
149+
150+
var wg sync.WaitGroup
151+
wg.Add(2)
152+
go func() {
153+
defer wg.Done()
154+
w := newWorker(ctx1)
155+
w.runCatchup()
156+
}()
157+
go func() {
158+
defer wg.Done()
159+
w := newWorker(ctx2)
160+
w.runCatchup()
161+
}()
162+
163+
time.Sleep(300 * time.Millisecond)
164+
cancel1()
165+
cancel2()
166+
167+
wg.Wait()
168+
mu.Lock()
169+
defer mu.Unlock()
170+
require.ElementsMatch(t, []string{"1-10", "11-20"}, seen)
171+
}
172+
173+
type workerTestRedisClient struct {
174+
client *redis.Client
175+
}
176+
177+
var _ infra.RedisClient = (*workerTestRedisClient)(nil)
178+
179+
func (r *workerTestRedisClient) GetClient() *redis.Client {
180+
return r.client
181+
}
182+
183+
func (r *workerTestRedisClient) Set(key string, value any, expiration time.Duration) error {
184+
return r.client.Set(context.Background(), key, value, expiration).Err()
185+
}
186+
187+
func (r *workerTestRedisClient) Get(key string) (string, error) {
188+
return r.client.Get(context.Background(), key).Result()
189+
}
190+
191+
func (r *workerTestRedisClient) Del(keys ...string) error {
192+
return r.client.Del(context.Background(), keys...).Err()
193+
}
194+
195+
func (r *workerTestRedisClient) ZAdd(key string, members ...redis.Z) error {
196+
return r.client.ZAdd(context.Background(), key, members...).Err()
197+
}
198+
199+
func (r *workerTestRedisClient) ZRem(key string, members ...interface{}) error {
200+
return r.client.ZRem(context.Background(), key, members...).Err()
201+
}
202+
203+
func (r *workerTestRedisClient) ZRange(key string, start, stop int64) ([]string, error) {
204+
return r.client.ZRange(context.Background(), key, start, stop).Result()
205+
}
206+
207+
func (r *workerTestRedisClient) ZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
208+
return r.client.ZRangeWithScores(context.Background(), key, start, stop).Result()
209+
}
210+
211+
func (r *workerTestRedisClient) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
212+
return r.client.ZRevRangeWithScores(context.Background(), key, start, stop).Result()
213+
}
214+
215+
func (r *workerTestRedisClient) Close() error {
216+
return r.client.Close()
217+
}
218+
219+
func setupWorkerTestRedis(t *testing.T) (*redis.Client, func()) {
220+
t.Helper()
221+
222+
client := redis.NewClient(&redis.Options{
223+
Addr: "localhost:6379",
224+
DB: 13,
225+
})
226+
227+
ctx := context.Background()
228+
if _, err := client.Ping(ctx).Result(); err != nil {
229+
t.Skip("Redis not available")
230+
}
231+
require.NoError(t, client.FlushDB(ctx).Err())
232+
233+
cleanup := func() {
234+
_ = client.FlushDB(ctx).Err()
235+
_ = client.Close()
236+
}
237+
238+
return client, cleanup
239+
}

internal/worker/regular_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,14 @@ func (s *stubCatchupStore) GetProgress(_ context.Context, _ string) ([]blockstor
401401
return append([]blockstore.CatchupRange(nil), s.catchupProgress...), nil
402402
}
403403

404+
func (s *stubCatchupStore) GetNextRange(_ context.Context, _ string) (*blockstore.CatchupRange, error) {
405+
if len(s.catchupProgress) == 0 {
406+
return nil, nil
407+
}
408+
rng := s.catchupProgress[0]
409+
return &rng, nil
410+
}
411+
404412
func (s *stubCatchupStore) DeleteRange(_ context.Context, _ string, start, end uint64) error {
405413
if s.deleteCatchupErr != nil {
406414
return s.deleteCatchupErr

0 commit comments

Comments (0)