Skip to content

Commit 6d977d1

Browse files
committed
refactor: split catchup state into dedicated store
1 parent dd5ff94 commit 6d977d1

14 files changed

Lines changed: 736 additions & 243 deletions

File tree

internal/worker/base.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/fystack/multichain-indexer/pkg/infra"
1717
"github.com/fystack/multichain-indexer/pkg/retry"
1818
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
19+
"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
1920
"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
2021
)
2122

@@ -43,6 +44,7 @@ type BaseWorker struct {
4344
chain indexer.Indexer
4445
kvstore infra.KVStore
4546
blockStore blockstore.Store
47+
catchupStore catchupstore.Store
4648
pubkeyStore pubkeystore.Store
4749
emitter events.Emitter
4850
failedChan chan FailedBlockEvent
@@ -63,6 +65,7 @@ func newWorkerWithMode(
6365
cfg config.ChainConfig,
6466
kv infra.KVStore,
6567
blockStore blockstore.Store,
68+
catchupStore catchupstore.Store,
6669
emitter events.Emitter,
6770
pubkeyStore pubkeystore.Store,
6871
mode WorkerMode,
@@ -84,6 +87,7 @@ func newWorkerWithMode(
8487
chain: chain,
8588
kvstore: kv,
8689
blockStore: blockStore,
90+
catchupStore: catchupStore,
8791
pubkeyStore: pubkeyStore,
8892
emitter: emitter,
8993
failedChan: failedChan,

internal/worker/catchup.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/fystack/multichain-indexer/pkg/events"
1414
"github.com/fystack/multichain-indexer/pkg/infra"
1515
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
16+
"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
1617
"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
1718
)
1819

@@ -36,6 +37,7 @@ func NewCatchupWorker(
3637
cfg config.ChainConfig,
3738
kv infra.KVStore,
3839
blockStore blockstore.Store,
40+
catchupStore catchupstore.Store,
3941
emitter events.Emitter,
4042
pubkeyStore pubkeystore.Store,
4143
failedChan chan FailedBlockEvent,
@@ -47,6 +49,7 @@ func NewCatchupWorker(
4749
cfg,
4850
kv,
4951
blockStore,
52+
catchupStore,
5053
emitter,
5154
pubkeyStore,
5255
ModeCatchup,
@@ -116,26 +119,22 @@ func (cw *CatchupWorker) runCatchup() {
116119
}
117120

118121
func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange {
119-
registry := status.EnsureStatusRegistry(cw.statusRegistry)
120-
121122
// Load existing catchup ranges from the store. The catchup worker only loads
122123
// ranges; creating new ranges is the responsibility of the regular worker
123124
// (via determineStartingBlock or skipAheadIfLagging).
124-
progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode())
125+
progress, err := cw.catchupStore.GetProgress(cw.ctx, cw.chain.GetNetworkInternalCode())
125126
if err != nil {
126127
cw.logger.Warn("Failed to load catchup progress",
127128
"chain", cw.chain.GetName(),
128129
"error", err,
129130
)
130-
registry.SetCatchupRanges(cw.chain.GetName(), nil)
131131
return nil
132132
}
133133

134134
cw.logger.Info("Loaded catchup progress",
135135
"chain", cw.chain.GetName(),
136136
"progress_ranges", len(progress),
137137
)
138-
registry.SetCatchupRanges(cw.chain.GetName(), progress)
139138
return progress
140139
}
141140

@@ -319,14 +318,13 @@ func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) e
319318
func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) {
320319
cw.progressMu.Lock()
321320
defer cw.progressMu.Unlock()
322-
registry := status.EnsureStatusRegistry(cw.statusRegistry)
323321
cw.logger.Debug("Saving catchup progress",
324322
"chain", cw.chain.GetName(),
325323
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
326324
"current", current,
327325
)
328326
current = min(current, r.End)
329-
if err := cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil {
327+
if err := cw.catchupStore.SaveProgress(cw.ctx, cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil {
330328
cw.logger.Warn("Failed to save catchup progress",
331329
"chain", cw.chain.GetName(),
332330
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
@@ -335,11 +333,6 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64)
335333
)
336334
return
337335
}
338-
registry.UpsertCatchupRanges(cw.chain.GetName(), []blockstore.CatchupRange{{
339-
Start: r.Start,
340-
End: r.End,
341-
Current: current,
342-
}})
343336
for i := range cw.blockRanges {
344337
if cw.blockRanges[i].Start == r.Start && cw.blockRanges[i].End == r.End {
345338
cw.blockRanges[i].Current = current
@@ -351,23 +344,20 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64)
351344
func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error {
352345
cw.progressMu.Lock()
353346
defer cw.progressMu.Unlock()
354-
registry := status.EnsureStatusRegistry(cw.statusRegistry)
355347

356348
cw.logger.Info("Completing catchup range",
357349
"chain", cw.chain.GetName(),
358350
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
359351
)
360352

361-
if err := cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil {
353+
if err := cw.catchupStore.DeleteRange(cw.ctx, cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil {
362354
cw.logger.Warn("Failed to delete catchup range",
363355
"chain", cw.chain.GetName(),
364356
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
365357
"error", err,
366358
)
367359
return err
368360
}
369-
registry.DeleteCatchupRange(cw.chain.GetName(), r.Start, r.End)
370-
371361
// Remove from local ranges
372362
for i, existing := range cw.blockRanges {
373363
if existing.Start == r.Start && existing.End == r.End {
@@ -406,15 +396,12 @@ func (cw *CatchupWorker) Close() error {
406396
)
407397
}
408398

409-
if err := cw.blockStore.SaveCatchupRanges(cw.chain.GetNetworkInternalCode(), rangesToSave); err != nil {
399+
if err := cw.catchupStore.SaveRanges(cw.ctx, cw.chain.GetNetworkInternalCode(), rangesToSave); err != nil {
410400
cw.logger.Error("Failed to batch save progress on close",
411401
"chain", cw.chain.GetName(),
412402
"ranges", len(rangesToSave),
413403
"error", err,
414404
)
415-
} else {
416-
registry := status.EnsureStatusRegistry(cw.statusRegistry)
417-
registry.UpsertCatchupRanges(cw.chain.GetName(), rangesToSave)
418405
}
419406

420407
return nil

internal/worker/catchup_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) {
2525
})
2626

2727
store := &stubBlockStore{}
28+
catchupStore := &stubCatchupStore{}
2829
ctx, cancel := context.WithCancel(context.Background())
2930
defer cancel()
3031

@@ -39,6 +40,7 @@ func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) {
3940
},
4041
chain: &stubIndexer{name: "aptos", internalCode: "APTOS_TESTNET", networkType: enum.NetworkTypeApt, latest: 100},
4142
blockStore: store,
43+
catchupStore: catchupStore,
4244
statusRegistry: statusRegistry,
4345
},
4446
blockRanges: []blockstore.CatchupRange{},
@@ -51,7 +53,7 @@ func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) {
5153
go func() {
5254
time.Sleep(100 * time.Millisecond)
5355
// Simulate regular worker creating catchup ranges in the store
54-
store.catchupProgress = []blockstore.CatchupRange{
56+
catchupStore.catchupProgress = []blockstore.CatchupRange{
5557
{Start: 50, End: 69, Current: 49},
5658
{Start: 70, End: 89, Current: 69},
5759
}

internal/worker/factory.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/fystack/multichain-indexer/pkg/ratelimiter"
3030
"github.com/fystack/multichain-indexer/pkg/repository"
3131
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
32+
"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
3233
"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
3334
tonaddr "github.com/xssnick/tonutils-go/address"
3435
"gorm.io/gorm"
@@ -39,6 +40,7 @@ type WorkerDeps struct {
3940
Ctx context.Context
4041
KVStore infra.KVStore
4142
BlockStore blockstore.Store
43+
CatchupStore catchupstore.Store
4244
Emitter events.Emitter
4345
Pubkey pubkeystore.Store
4446
Redis infra.RedisClient
@@ -104,6 +106,7 @@ func BuildWorkers(
104106
cfg,
105107
deps.KVStore,
106108
deps.BlockStore,
109+
deps.CatchupStore,
107110
deps.Emitter,
108111
deps.Pubkey,
109112
deps.FailedChan,
@@ -118,6 +121,7 @@ func BuildWorkers(
118121
cfg,
119122
deps.KVStore,
120123
deps.BlockStore,
124+
deps.CatchupStore,
121125
deps.Emitter,
122126
deps.Pubkey,
123127
deps.FailedChan,
@@ -778,10 +782,11 @@ func CreateManagerWithWorkers(
778782
) *Manager {
779783
// Shared stores
780784
blockStore := blockstore.NewBlockStore(kvstore)
785+
catchupStore := catchupstore.New(redisClient)
781786
pubkeyStore := pubkeystore.NewPublicKeyStore(addressBF)
782787
statusRegistry := status.NewRegistry()
783788

784-
manager := NewManager(ctx, kvstore, blockStore, emitter, pubkeyStore)
789+
manager := NewManager(ctx, kvstore, blockStore, catchupStore, emitter, pubkeyStore)
785790
manager.registry = statusRegistry
786791

787792
// Loop each chain
@@ -818,15 +823,6 @@ func CreateManagerWithWorkers(
818823
if existingFailed, err := blockStore.GetFailedBlocks(idxr.GetNetworkInternalCode()); err == nil {
819824
statusRegistry.SetFailedBlocks(idxr.GetName(), existingFailed)
820825
}
821-
if existingCatchup, err := blockStore.GetCatchupProgress(idxr.GetNetworkInternalCode()); err == nil {
822-
statusRegistry.SetCatchupRanges(idxr.GetName(), existingCatchup)
823-
} else {
824-
logger.Warn("Failed to load catchup progress for status registry",
825-
"chain", chainName,
826-
"internal_code", idxr.GetNetworkInternalCode(),
827-
"error", err,
828-
)
829-
}
830826

831827
failedChan := make(chan FailedBlockEvent, 100)
832828

@@ -835,6 +831,7 @@ func CreateManagerWithWorkers(
835831
Ctx: ctx,
836832
KVStore: kvstore,
837833
BlockStore: blockStore,
834+
CatchupStore: catchupStore,
838835
Emitter: emitter,
839836
Pubkey: pubkeyStore,
840837
Redis: redisClient,

internal/worker/factory_test.go

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,17 @@ package worker
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"log/slog"
87
"testing"
98
"time"
109

1110
"github.com/fystack/multichain-indexer/pkg/common/config"
12-
"github.com/fystack/multichain-indexer/pkg/common/constant"
1311
"github.com/fystack/multichain-indexer/pkg/common/enum"
1412
commonlogger "github.com/fystack/multichain-indexer/pkg/common/logger"
1513
"github.com/fystack/multichain-indexer/pkg/events"
1614
"github.com/fystack/multichain-indexer/pkg/infra"
17-
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
1815
"github.com/hashicorp/consul/api"
16+
"github.com/redis/go-redis/v9"
1917
"github.com/stretchr/testify/require"
2018
)
2119

@@ -173,21 +171,18 @@ func TestCreateManagerWithWorkersBootstrapsCatchupRangesIntoStatusRegistry(t *te
173171
},
174172
}
175173

176-
kv := &listKVStore{
177-
pairs: []*infra.KVPair{{
178-
Key: fmt.Sprintf("%s/%s/%s/%d-%d", blockstore.BlockStates, "a", constant.KVPrefixProgressCatchup, 1, 20),
179-
Value: []byte("10"),
180-
}},
181-
}
174+
redisClient, cleanup := setupFactoryTestRedis(t)
175+
defer cleanup()
176+
require.NoError(t, redisClient.HSet(context.Background(), "catchup_progress:a", "1-20", "10").Err())
182177

183178
manager := CreateManagerWithWorkers(
184179
context.Background(),
185180
cfg,
186-
kv,
181+
noopKVStore{},
187182
nil,
188183
nil,
189184
events.Emitter(nil),
190-
nil,
185+
&factoryTestRedisClient{client: redisClient},
191186
ManagerConfig{
192187
Chains: []string{"chain-a"},
193188
},
@@ -197,6 +192,14 @@ func TestCreateManagerWithWorkersBootstrapsCatchupRangesIntoStatusRegistry(t *te
197192
require.Len(t, resp.Networks, 1)
198193
require.Equal(t, 1, resp.Networks[0].CatchupRanges)
199194
require.Equal(t, uint64(10), resp.Networks[0].CatchupPendingBlocks)
195+
196+
require.NoError(t, redisClient.Del(context.Background(), "catchup_progress:a").Err())
197+
require.NoError(t, redisClient.HSet(context.Background(), "catchup_progress:a", "31-40", "35").Err())
198+
199+
resp = manager.StatusSnapshot("1.0.0")
200+
require.Len(t, resp.Networks, 1)
201+
require.Equal(t, 1, resp.Networks[0].CatchupRanges)
202+
require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks)
200203
}
201204

202205
type noopKVStore struct{}
@@ -244,3 +247,56 @@ func (s *listKVStore) List(prefix string) ([]*infra.KVPair, error) {
244247
}
245248
return out, nil
246249
}
250+
251+
type factoryTestRedisClient struct {
252+
client *redis.Client
253+
}
254+
255+
func (r *factoryTestRedisClient) GetClient() *redis.Client { return r.client }
256+
func (r *factoryTestRedisClient) Set(key string, value any, expiration time.Duration) error {
257+
return r.client.Set(context.Background(), key, value, expiration).Err()
258+
}
259+
func (r *factoryTestRedisClient) Get(key string) (string, error) {
260+
return r.client.Get(context.Background(), key).Result()
261+
}
262+
func (r *factoryTestRedisClient) Del(keys ...string) error {
263+
return r.client.Del(context.Background(), keys...).Err()
264+
}
265+
func (r *factoryTestRedisClient) ZAdd(key string, members ...redis.Z) error {
266+
return r.client.ZAdd(context.Background(), key, members...).Err()
267+
}
268+
func (r *factoryTestRedisClient) ZRem(key string, members ...interface{}) error {
269+
return r.client.ZRem(context.Background(), key, members...).Err()
270+
}
271+
func (r *factoryTestRedisClient) ZRange(key string, start, stop int64) ([]string, error) {
272+
return r.client.ZRange(context.Background(), key, start, stop).Result()
273+
}
274+
func (r *factoryTestRedisClient) ZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
275+
return r.client.ZRangeWithScores(context.Background(), key, start, stop).Result()
276+
}
277+
func (r *factoryTestRedisClient) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
278+
return r.client.ZRevRangeWithScores(context.Background(), key, start, stop).Result()
279+
}
280+
func (r *factoryTestRedisClient) Close() error { return r.client.Close() }
281+
282+
func setupFactoryTestRedis(t *testing.T) (*redis.Client, func()) {
283+
t.Helper()
284+
285+
client := redis.NewClient(&redis.Options{
286+
Addr: "localhost:6379",
287+
DB: 13,
288+
})
289+
290+
ctx := context.Background()
291+
if _, err := client.Ping(ctx).Result(); err != nil {
292+
t.Skip("Redis not available")
293+
}
294+
require.NoError(t, client.FlushDB(ctx).Err())
295+
296+
cleanup := func() {
297+
_ = client.FlushDB(ctx).Err()
298+
_ = client.Close()
299+
}
300+
301+
return client, cleanup
302+
}

0 commit comments

Comments
 (0)