diff --git a/internal/worker/base.go b/internal/worker/base.go
index a36fb8a..a00d619 100644
--- a/internal/worker/base.go
+++ b/internal/worker/base.go
@@ -17,6 +17,7 @@ import (
 	"github.com/fystack/multichain-indexer/pkg/infra"
 	"github.com/fystack/multichain-indexer/pkg/retry"
 	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
 	"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
 )
 
@@ -44,6 +45,7 @@ type BaseWorker struct {
 	chain       indexer.Indexer
 	kvstore     infra.KVStore
 	blockStore  blockstore.Store
+	catchupStore catchupstore.Store
 	pubkeyStore pubkeystore.Store
 	emitter     events.Emitter
 	failedChan  chan FailedBlockEvent
@@ -64,6 +66,7 @@ func newWorkerWithMode(
 	cfg config.ChainConfig,
 	kv infra.KVStore,
 	blockStore blockstore.Store,
+	catchupStore catchupstore.Store,
 	emitter events.Emitter,
 	pubkeyStore pubkeystore.Store,
 	mode WorkerMode,
@@ -85,6 +88,7 @@ func newWorkerWithMode(
 		chain:       chain,
 		kvstore:     kv,
 		blockStore:  blockStore,
+		catchupStore: catchupStore,
 		pubkeyStore: pubkeyStore,
 		emitter:     emitter,
 		failedChan:  failedChan,
diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go
index 2af7cdb..7f6d50f 100644
--- a/internal/worker/catchup.go
+++ b/internal/worker/catchup.go
@@ -13,6 +13,7 @@ import (
 	"github.com/fystack/multichain-indexer/pkg/events"
 	"github.com/fystack/multichain-indexer/pkg/infra"
 	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
 	"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
 )
 
@@ -36,6 +37,7 @@ func NewCatchupWorker(
 	cfg config.ChainConfig,
 	kv infra.KVStore,
 	blockStore blockstore.Store,
+	catchupStore catchupstore.Store,
 	emitter events.Emitter,
 	pubkeyStore pubkeystore.Store,
 	failedChan chan FailedBlockEvent,
@@ -47,6 +49,7 @@ func NewCatchupWorker(
 		cfg,
 		kv,
 		blockStore,
+		catchupStore,
 		emitter,
 		pubkeyStore,
 		ModeCatchup,
@@ -112,65 +115,23 @@ func (cw *CatchupWorker) runCatchup() {
 }
 
 func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange {
-	registry := status.EnsureStatusRegistry(cw.statusRegistry)
-	var ranges []blockstore.CatchupRange
-
-	// Load existing catchup ranges from database (they're already split when saved)
-	if progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode()); err == nil {
-		cw.logger.Info("Loading existing catchup progress",
-			"chain", cw.chain.GetName(),
-			"progress_ranges", len(progress),
-		)
-		ranges = progress
-		registry.SetCatchupRanges(cw.chain.GetName(), progress)
-	} else {
-		cw.logger.Warn("Failed to load catchup progress, will create new range",
+	// Load existing catchup ranges from the store. The catchup worker only loads
+	// ranges; creating new ranges is the responsibility of the regular worker
+	// (via determineStartingBlock or skipAheadIfLagging).
+	progress, err := cw.catchupStore.GetProgress(cw.ctx, cw.chain.GetNetworkInternalCode())
+	if err != nil {
+		cw.logger.Warn("Failed to load catchup progress",
 			"chain", cw.chain.GetName(),
 			"error", err,
 		)
+		return nil
 	}
 
-	// Only create a new range if no existing ranges found
-	if len(ranges) == 0 {
-		if latest, err1 := cw.blockStore.GetLatestBlock(cw.chain.GetNetworkInternalCode()); err1 == nil {
-			if head, err2 := cw.chain.GetLatestBlockNumber(cw.ctx); err2 == nil && head > latest {
-				if head <= latest {
-					// no gap between head and latest
-					return ranges
-				}
-				start, end := latest+1, head
-				cw.logger.Info("Creating new catchup range",
-					"chain", cw.chain.GetName(),
-					"latest_block", latest,
-					"head_block", head,
-					"catchup_start", start, "catchup_end", end,
-					"blocks_to_catchup", end-latest,
-				)
-
-				// Split new range if it's too large
-				newRanges := cw.splitLargeRange(blockstore.CatchupRange{
-					Start: start, End: end, Current: start - 1,
-				})
-
-				// Batch save all split ranges to database
-				if err := cw.blockStore.SaveCatchupRanges(
-					cw.chain.GetNetworkInternalCode(),
-					newRanges,
-				); err != nil {
-					cw.logger.Error("Failed to batch save catchup ranges",
-						"chain", cw.chain.GetName(),
-						"count", len(newRanges),
-						"error", err,
-					)
-				} else {
-					registry.UpsertCatchupRanges(cw.chain.GetName(), newRanges)
-				}
-				ranges = append(ranges, newRanges...)
-			}
-		}
-	}
-
-	return ranges
+	cw.logger.Info("Loaded catchup progress",
+		"chain", cw.chain.GetName(),
+		"progress_ranges", len(progress),
+	)
+	return progress
 }
 
 // Split large ranges into smaller, more manageable chunks
@@ -195,15 +156,8 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
 		return nil
 	}
 
-	// Process multiple ranges in parallel
+	// Claim multiple ranges in parallel so multiple instances can share the work.
 	var wg sync.WaitGroup
-	rangeChan := make(chan blockstore.CatchupRange, len(cw.blockRanges))
-
-	// Fill channel with ranges
-	for _, r := range cw.blockRanges {
-		rangeChan <- r
-	}
-	close(rangeChan)
 
 	// Start parallel workers
 	for i := 0; i < CATCHUP_WORKERS; i++ {
@@ -213,7 +167,11 @@
 			defer cw.recoverPanic(fmt.Sprintf("catchup range worker %d", workerID))
 			cw.logger.Debug("Starting catchup worker", "worker_id", workerID)
 
-			for r := range rangeChan {
+			for {
+				r, ok := cw.claimNextRange()
+				if !ok {
+					return
+				}
 				if err := cw.processRange(r, workerID); err != nil {
 					cw.logger.Error("Failed to process range",
 						"worker_id", workerID,
@@ -233,6 +191,27 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
 	return nil
 }
 
+func (cw *CatchupWorker) claimNextRange() (blockstore.CatchupRange, bool) {
+	claimed, err := cw.catchupStore.GetNextRange(cw.ctx, cw.chain.GetNetworkInternalCode())
+	if err != nil {
+		cw.logger.Warn("Failed to claim catchup range",
+			"chain", cw.chain.GetName(),
+			"error", err,
+		)
+		return blockstore.CatchupRange{}, false
+	}
+	if claimed == nil {
+		return blockstore.CatchupRange{}, false
+	}
+
+	cw.logger.Info("Claimed catchup range",
+		"chain", cw.chain.GetName(),
+		"range", fmt.Sprintf("%d-%d", claimed.Start, claimed.End),
+		"current", claimed.Current,
+	)
+	return *claimed, true
+}
+
 func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) error {
 	batchCount := 0
 	startTime := time.Now()
@@ -353,14 +332,13 @@ func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) e
 func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) {
 	cw.progressMu.Lock()
 	defer cw.progressMu.Unlock()
-	registry := status.EnsureStatusRegistry(cw.statusRegistry)
 	cw.logger.Debug("Saving catchup progress",
 		"chain", cw.chain.GetName(),
 		"range", fmt.Sprintf("%d-%d", r.Start, r.End),
 		"current", current,
 	)
 	current = min(current, r.End)
-	if err := cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil {
+	if err := cw.catchupStore.SaveProgress(cw.ctx, cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil {
 		cw.logger.Warn("Failed to save catchup progress",
 			"chain", cw.chain.GetName(),
 			"range", fmt.Sprintf("%d-%d", r.Start, r.End),
@@ -369,11 +347,6 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64)
 		)
 		return
 	}
-	registry.UpsertCatchupRanges(cw.chain.GetName(), []blockstore.CatchupRange{{
-		Start:   r.Start,
-		End:     r.End,
-		Current: current,
-	}})
 	for i := range cw.blockRanges {
 		if cw.blockRanges[i].Start == r.Start && cw.blockRanges[i].End == r.End {
 			cw.blockRanges[i].Current = current
@@ -385,14 +358,13 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64)
 func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error {
 	cw.progressMu.Lock()
 	defer cw.progressMu.Unlock()
-	registry := status.EnsureStatusRegistry(cw.statusRegistry)
 	cw.logger.Info("Completing catchup range",
 		"chain", cw.chain.GetName(),
 		"range", fmt.Sprintf("%d-%d", r.Start, r.End),
 	)
-	if err := cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil {
+	if err := cw.catchupStore.DeleteRange(cw.ctx, cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil {
 		cw.logger.Warn("Failed to delete catchup range",
 			"chain", cw.chain.GetName(),
 			"range", fmt.Sprintf("%d-%d", r.Start, r.End),
@@ -400,8 +372,6 @@ func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error {
 		)
 		return err
 	}
-	registry.DeleteCatchupRange(cw.chain.GetName(), r.Start, r.End)
-
 	// Remove from local ranges
 	for i, existing := range cw.blockRanges {
 		if existing.Start == r.Start && existing.End == r.End {
@@ -440,15 +410,12 @@ func (cw *CatchupWorker) Close() error {
 		)
 	}
 
-	if err := cw.blockStore.SaveCatchupRanges(cw.chain.GetNetworkInternalCode(), rangesToSave); err != nil {
+	if err := cw.catchupStore.SaveRanges(cw.ctx, cw.chain.GetNetworkInternalCode(), rangesToSave); err != nil {
 		cw.logger.Error("Failed to batch save progress on close",
 			"chain", cw.chain.GetName(),
 			"ranges", len(rangesToSave),
 			"error", err,
 		)
-	} else {
-		registry := status.EnsureStatusRegistry(cw.statusRegistry)
-		registry.UpsertCatchupRanges(cw.chain.GetName(), rangesToSave)
 	}
 
 	return nil
diff --git a/internal/worker/catchup_test.go b/internal/worker/catchup_test.go
new file mode 100644
index 0000000..8647e19
--- /dev/null
+++ b/internal/worker/catchup_test.go
@@ -0,0 +1,239 @@
+package worker
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log/slog"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/fystack/multichain-indexer/internal/indexer"
+	"github.com/fystack/multichain-indexer/internal/status"
+	"github.com/fystack/multichain-indexer/pkg/common/config"
+	"github.com/fystack/multichain-indexer/pkg/common/enum"
+	"github.com/fystack/multichain-indexer/pkg/infra"
+	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
+	"github.com/redis/go-redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) {
+	t.Parallel()
+
+	statusRegistry := status.NewRegistry()
+	statusRegistry.RegisterChain("aptos", "aptos_testnet", config.ChainConfig{
+		NetworkId:    "aptos_testnet",
+		InternalCode: "APTOS_TESTNET",
+		Type:         enum.NetworkTypeApt,
+	})
+
+	store := &stubBlockStore{}
+	catchupStore := &stubCatchupStore{}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	cw := &CatchupWorker{
+		BaseWorker: &BaseWorker{
+			ctx:    ctx,
+			cancel: cancel,
+			logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+			config: config.ChainConfig{
+				PollInterval: time.Millisecond,
+				Throttle:     config.Throttle{BatchSize: 20},
+			},
+			chain:          &stubIndexer{name: "aptos", internalCode: "APTOS_TESTNET", networkType: enum.NetworkTypeApt, latest: 100},
+			blockStore:     store,
+			catchupStore:   catchupStore,
+			statusRegistry: statusRegistry,
+		},
+		blockRanges: []blockstore.CatchupRange{},
+		workerPool:  make(chan struct{}, CATCHUP_WORKERS),
+	}
+
+	// Simulate: catchup worker starts with no ranges (empty after state wipe).
+	// After a short delay, the regular worker saves new catchup ranges to the store.
+	// The catchup worker should poll and pick them up instead of exiting.
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		// Simulate regular worker creating catchup ranges in the store
+		catchupStore.catchupProgress = []blockstore.CatchupRange{
+			{Start: 50, End: 69, Current: 49},
+			{Start: 70, End: 89, Current: 69},
+		}
+		// Give the catchup worker time to poll and load the ranges
+		time.Sleep(200 * time.Millisecond)
+		cancel()
+	}()
+
+	// runCatchup should NOT exit immediately when blockRanges is empty.
+	// It should poll, pick up the new ranges, and only stop when ctx is cancelled.
+	done := make(chan struct{})
+	go func() {
+		cw.runCatchup()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// Verify the catchup worker loaded the new ranges before exiting
+		resp := statusRegistry.Snapshot("1.0.0")
+		require.Len(t, resp.Networks, 1)
+		// The worker should have picked up ranges from the store
+		// (it may have processed and completed them, or they may still be pending)
+	case <-time.After(5 * time.Second):
+		cancel()
+		t.Fatal("catchup worker did not exit after context cancellation")
+	}
+}
+
+func TestCatchupWorkersClaimDistinctRangesAcrossInstances(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupWorkerTestRedis(t)
+	defer cleanup()
+
+	statusRegistry := status.NewRegistry()
+	statusRegistry.RegisterChain("aptos", "aptos_testnet", config.ChainConfig{
+		NetworkId:    "aptos_testnet",
+		InternalCode: "APTOS_TESTNET",
+		Type:         enum.NetworkTypeApt,
+	})
+
+	store := catchupstore.New(&workerTestRedisClient{client: client})
+	ctx1, cancel1 := context.WithCancel(context.Background())
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	defer cancel1()
+	defer cancel2()
+
+	var mu sync.Mutex
+	seen := make([]string, 0, 2)
+	indexerStub := &stubIndexer{
+		name:         "aptos",
+		internalCode: "APTOS_TESTNET",
+		networkType:  enum.NetworkTypeApt,
+		getBlocksFunc: func(_ context.Context, from, to uint64, _ bool) ([]indexer.BlockResult, error) {
+			mu.Lock()
+			seen = append(seen, fmt.Sprintf("%d-%d", from, to))
+			mu.Unlock()
+			return nil, nil
+		},
+	}
+
+	// Note: the two ranges must be disjoint with a gap, because the store
+	// merges overlapping and adjacent ranges on save.
+	require.NoError(t, store.SaveRanges(context.Background(), "APTOS_TESTNET", []blockstore.CatchupRange{
+		{Start: 1, End: 10, Current: 0},
+		{Start: 21, End: 30, Current: 20},
+	}))
+
+	newWorker := func(ctx context.Context) *CatchupWorker {
+		return &CatchupWorker{
+			BaseWorker: &BaseWorker{
+				ctx:    ctx,
+				cancel: func() {},
+				logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
+				config: config.ChainConfig{
+					PollInterval: time.Millisecond,
+					Throttle:     config.Throttle{BatchSize: 20},
+				},
+				chain:          indexerStub,
+				blockStore:     &stubBlockStore{},
+				catchupStore:   store,
+				statusRegistry: statusRegistry,
+			},
+			blockRanges: []blockstore.CatchupRange{},
+			workerPool:  make(chan struct{}, CATCHUP_WORKERS),
+		}
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		w := newWorker(ctx1)
+		w.runCatchup()
+	}()
+	go func() {
+		defer wg.Done()
+		w := newWorker(ctx2)
+		w.runCatchup()
+	}()
+
+	time.Sleep(300 * time.Millisecond)
+	cancel1()
+	cancel2()
+
+	wg.Wait()
+	mu.Lock()
+	defer mu.Unlock()
+	require.ElementsMatch(t, []string{"1-10", "21-30"}, seen)
+}
+
+type workerTestRedisClient struct {
+	client *redis.Client
+}
+
+var _ infra.RedisClient = (*workerTestRedisClient)(nil)
+
+func (r *workerTestRedisClient) GetClient() *redis.Client {
+	return r.client
+}
+
+func (r *workerTestRedisClient) Set(key string, value any, expiration time.Duration) error {
+	return r.client.Set(context.Background(), key, value, expiration).Err()
+}
+
+func (r *workerTestRedisClient) Get(key string) (string, error) {
+	return r.client.Get(context.Background(), key).Result()
+}
+
+func (r *workerTestRedisClient) Del(keys ...string) error {
+	return r.client.Del(context.Background(), keys...).Err()
+}
+
+func (r *workerTestRedisClient) ZAdd(key string, members ...redis.Z) error {
+	return r.client.ZAdd(context.Background(), key, members...).Err()
+}
+
+func (r *workerTestRedisClient) ZRem(key string, members ...interface{}) error {
+	return r.client.ZRem(context.Background(), key, members...).Err()
+}
+
+func (r *workerTestRedisClient) ZRange(key string, start, stop int64) ([]string, error) {
+	return r.client.ZRange(context.Background(), key, start, stop).Result()
+}
+
+func (r *workerTestRedisClient) ZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
+	return r.client.ZRangeWithScores(context.Background(), key, start, stop).Result()
+}
+
+func (r *workerTestRedisClient) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
+	return r.client.ZRevRangeWithScores(context.Background(), key, start, stop).Result()
+}
+
+func (r *workerTestRedisClient) Close() error {
+	return r.client.Close()
+}
+
+func setupWorkerTestRedis(t *testing.T) (*redis.Client, func()) {
+	t.Helper()
+
+	client := redis.NewClient(&redis.Options{
+		Addr: "localhost:6379",
+		DB:   13,
+	})
+
+	ctx := context.Background()
+	if _, err := client.Ping(ctx).Result(); err != nil {
+		t.Skip("Redis not available")
+	}
+	require.NoError(t, client.FlushDB(ctx).Err())
+
+	cleanup := func() {
+		_ = client.FlushDB(ctx).Err()
+		_ = client.Close()
+	}
+
+	return client, cleanup
+}
diff --git a/internal/worker/factory.go b/internal/worker/factory.go
index e5d0119..7f29f6f 100644
--- a/internal/worker/factory.go
+++ b/internal/worker/factory.go
@@ -31,6 +31,7 @@ import (
 	"github.com/fystack/multichain-indexer/pkg/ratelimiter"
 	"github.com/fystack/multichain-indexer/pkg/repository"
 	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
 	"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
 	tonaddr "github.com/xssnick/tonutils-go/address"
 	"gorm.io/gorm"
@@ -41,6 +42,7 @@ type WorkerDeps struct {
 	Ctx        context.Context
 	KVStore    infra.KVStore
 	BlockStore blockstore.Store
+	CatchupStore catchupstore.Store
 	Emitter    events.Emitter
 	Pubkey     pubkeystore.Store
 	Redis      infra.RedisClient
@@ -106,6 +108,7 @@ func BuildWorkers(
 			cfg,
 			deps.KVStore,
 			deps.BlockStore,
+			deps.CatchupStore,
 			deps.Emitter,
 			deps.Pubkey,
 			deps.FailedChan,
@@ -120,6 +123,7 @@ func BuildWorkers(
 			cfg,
 			deps.KVStore,
 			deps.BlockStore,
+			deps.CatchupStore,
 			deps.Emitter,
 			deps.Pubkey,
 			deps.FailedChan,
@@ -863,10 +867,11 @@ func CreateManagerWithWorkers(
 ) *Manager {
 	// Shared stores
 	blockStore := blockstore.NewBlockStore(kvstore)
+	catchupStore := catchupstore.New(redisClient)
 	pubkeyStore := pubkeystore.NewPublicKeyStore(addressBF)
 	statusRegistry := status.NewRegistry()
 
-	manager := NewManager(ctx, kvstore, blockStore, emitter, pubkeyStore)
+	manager := NewManager(ctx, kvstore, blockStore, catchupStore, emitter, pubkeyStore)
 	manager.registry = statusRegistry
 
 	// Loop each chain
@@ -907,15 +912,6 @@ func CreateManagerWithWorkers(
 		if existingFailed, err := blockStore.GetFailedBlocks(idxr.GetNetworkInternalCode()); err == nil {
 			statusRegistry.SetFailedBlocks(idxr.GetName(), existingFailed)
 		}
-		if existingCatchup, err := blockStore.GetCatchupProgress(idxr.GetNetworkInternalCode()); err == nil {
-			statusRegistry.SetCatchupRanges(idxr.GetName(), existingCatchup)
-		} else {
-			logger.Warn("Failed to load catchup progress for status registry",
-				"chain", chainName,
-				"internal_code", idxr.GetNetworkInternalCode(),
-				"error", err,
-			)
-		}
 
 		failedChan := make(chan FailedBlockEvent, 100)
 
@@ -924,6 +920,7 @@
 			Ctx:        ctx,
 			KVStore:    kvstore,
 			BlockStore: blockStore,
+			CatchupStore: catchupStore,
 			Emitter:    emitter,
 			Pubkey:     pubkeyStore,
 			Redis:      redisClient,
diff --git a/internal/worker/factory_test.go b/internal/worker/factory_test.go
index be6261b..6fc6b74 100644
--- a/internal/worker/factory_test.go
+++ b/internal/worker/factory_test.go
@@ -3,19 +3,17 @@ package worker
 
 import (
 	"context"
 	"errors"
-	"fmt"
 	"log/slog"
 	"testing"
 	"time"
 
 	"github.com/fystack/multichain-indexer/pkg/common/config"
-	"github.com/fystack/multichain-indexer/pkg/common/constant"
 	"github.com/fystack/multichain-indexer/pkg/common/enum"
 	commonlogger "github.com/fystack/multichain-indexer/pkg/common/logger"
 	"github.com/fystack/multichain-indexer/pkg/events"
 	"github.com/fystack/multichain-indexer/pkg/infra"
-	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
 	"github.com/hashicorp/consul/api"
+	"github.com/redis/go-redis/v9"
 	"github.com/stretchr/testify/require"
 )
 
@@ -173,21 +171,18 @@ func TestCreateManagerWithWorkersBootstrapsCatchupRangesIntoStatusRegistry(t *te
 		},
 	}
 
-	kv := &listKVStore{
-		pairs: []*infra.KVPair{{
-			Key:   fmt.Sprintf("%s/%s/%s/%d-%d", blockstore.BlockStates, "a", constant.KVPrefixProgressCatchup, 1, 20),
-			Value: []byte("10"),
-		}},
-	}
+	redisClient, cleanup := setupFactoryTestRedis(t)
+	defer cleanup()
+	require.NoError(t, redisClient.HSet(context.Background(), "catchup_progress:a", "1-20", "10").Err())
 
 	manager := CreateManagerWithWorkers(
 		context.Background(),
 		cfg,
-		kv,
+		noopKVStore{},
 		nil,
 		nil,
 		events.Emitter(nil),
-		nil,
+		&factoryTestRedisClient{client: redisClient},
 		ManagerConfig{
 			Chains: []string{"chain-a"},
 		},
@@ -197,6 +192,14 @@
 	require.Len(t, resp.Networks, 1)
 	require.Equal(t, 1, resp.Networks[0].CatchupRanges)
 	require.Equal(t, uint64(10), resp.Networks[0].CatchupPendingBlocks)
+
+	require.NoError(t, redisClient.Del(context.Background(), "catchup_progress:a").Err())
+	require.NoError(t, redisClient.HSet(context.Background(), "catchup_progress:a", "31-40", "35").Err())
+
+	resp = manager.StatusSnapshot("1.0.0")
+	require.Len(t, resp.Networks, 1)
+	require.Equal(t, 1, resp.Networks[0].CatchupRanges)
+	require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks)
 }
 
 type noopKVStore struct{}
@@ -244,3 +247,56 @@ func (s *listKVStore) List(prefix string) ([]*infra.KVPair, error) {
 	}
 	return out, nil
 }
+
+type factoryTestRedisClient struct {
+	client *redis.Client
+}
+
+func (r *factoryTestRedisClient) GetClient() *redis.Client { return r.client }
+func (r *factoryTestRedisClient) Set(key string, value any, expiration time.Duration) error {
+	return r.client.Set(context.Background(), key, value, expiration).Err()
+}
+func (r *factoryTestRedisClient) Get(key string) (string, error) {
+	return r.client.Get(context.Background(), key).Result()
+}
+func (r *factoryTestRedisClient) Del(keys ...string) error {
+	return r.client.Del(context.Background(), keys...).Err()
+}
+func (r *factoryTestRedisClient) ZAdd(key string, members ...redis.Z) error {
+	return r.client.ZAdd(context.Background(), key, members...).Err()
+}
+func (r *factoryTestRedisClient) ZRem(key string, members ...interface{}) error {
+	return r.client.ZRem(context.Background(), key, members...).Err()
+}
+func (r *factoryTestRedisClient) ZRange(key string, start, stop int64) ([]string, error) {
+	return r.client.ZRange(context.Background(), key, start, stop).Result()
+}
+func (r *factoryTestRedisClient) ZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
+	return r.client.ZRangeWithScores(context.Background(), key, start, stop).Result()
+}
+func (r *factoryTestRedisClient) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
+	return r.client.ZRevRangeWithScores(context.Background(), key, start, stop).Result()
+}
+func (r *factoryTestRedisClient) Close() error { return r.client.Close() }
+
+func setupFactoryTestRedis(t *testing.T) (*redis.Client, func()) {
+	t.Helper()
+
+	client := redis.NewClient(&redis.Options{
+		Addr: "localhost:6379",
+		DB:   13,
+	})
+
+	ctx := context.Background()
+	if _, err := client.Ping(ctx).Result(); err != nil {
+		t.Skip("Redis not available")
+	}
+	require.NoError(t, client.FlushDB(ctx).Err())
+
+	cleanup := func() {
+		_ = client.FlushDB(ctx).Err()
+		_ = client.Close()
+	}
+
+	return client, cleanup
+}
diff --git a/internal/worker/manager.go b/internal/worker/manager.go
index bcf6e6a..1cda23d 100644
--- a/internal/worker/manager.go
+++ b/internal/worker/manager.go
@@ -11,34 +11,38 @@ import (
 	"github.com/fystack/multichain-indexer/pkg/infra"
 	"github.com/fystack/multichain-indexer/pkg/ratelimiter"
 	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
 	"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
 )
 
 const defaultShutdownTimeout = 30 * time.Second
 
 type Manager struct {
-	ctx         context.Context
-	workers     []Worker
-	kvstore     infra.KVStore
-	blockStore  blockstore.Store
-	emitter     events.Emitter
-	pubkeyStore pubkeystore.Store
-	registry    *status.Registry
+	ctx          context.Context
+	workers      []Worker
+	kvstore      infra.KVStore
+	blockStore   blockstore.Store
+	catchupStore catchupstore.Store
+	emitter      events.Emitter
+	pubkeyStore  pubkeystore.Store
+	registry     *status.Registry
 }
 
 func NewManager(
 	ctx context.Context,
 	kvstore infra.KVStore,
 	blockStore blockstore.Store,
+	catchupStore catchupstore.Store,
 	emitter events.Emitter,
 	pubkeyStore pubkeystore.Store,
 ) *Manager {
 	return &Manager{
-		ctx:         ctx,
-		kvstore:     kvstore,
-		blockStore:  blockStore,
-		emitter:     emitter,
-		pubkeyStore: pubkeyStore,
+		ctx:          ctx,
+		kvstore:      kvstore,
+		blockStore:   blockStore,
+		catchupStore: catchupStore,
+		emitter:      emitter,
+		pubkeyStore:  pubkeyStore,
 	}
 }
 
@@ -101,6 +105,21 @@ func (m *Manager) StatusSnapshot(version string) status.StatusResponse {
 			Networks: []status.NetworkStatus{},
 		}
 	}
+
+	if m.catchupStore != nil {
+		for _, network := range m.registry.Snapshot(version).Networks {
+			ranges, err := m.catchupStore.GetProgress(m.ctx, network.InternalCode)
+			if err != nil {
+				logger.Warn("Failed to refresh catchup progress for status snapshot",
+					"chain", network.ChainName,
+					"internal_code", network.InternalCode,
+					"error", err,
+				)
+				continue
+			}
+			m.registry.SetCatchupRanges(network.ChainName, ranges)
+		}
+	}
 	return m.registry.Snapshot(version)
 }
diff --git a/internal/worker/manual.go b/internal/worker/manual.go
index 64ebe62..5bf4dcf 100644
--- a/internal/worker/manual.go
+++ b/internal/worker/manual.go
@@ -51,6 +51,7 @@ func NewManualWorker(
 		cfg,
 		kv,
 		blockStore,
+		nil,
 		emitter,
 		pubkeyStore,
 		ModeManual,
diff --git a/internal/worker/mempool.go b/internal/worker/mempool.go
index 9bf9f75..a608ee4 100644
--- a/internal/worker/mempool.go
+++ b/internal/worker/mempool.go
@@ -42,6 +42,7 @@ func NewMempoolWorker(
 		cfg,
 		kv,
 		blockStore,
+		nil,
 		emitter,
 		pubkeyStore,
 		ModeMempool,
diff --git a/internal/worker/regular.go b/internal/worker/regular.go
index 4d35d76..a82b35e 100644
--- a/internal/worker/regular.go
+++ b/internal/worker/regular.go
@@ -14,6 +14,7 @@ import (
 	"github.com/fystack/multichain-indexer/pkg/events"
 	"github.com/fystack/multichain-indexer/pkg/infra"
 	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
 	"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
 )
 
@@ -41,6 +42,7 @@ func NewRegularWorker(
 	cfg config.ChainConfig,
 	kv infra.KVStore,
 	blockStore blockstore.Store,
+	catchupStore catchupstore.Store,
 	emitter events.Emitter,
 	pubkeyStore pubkeystore.Store,
 	failedChan chan FailedBlockEvent,
@@ -52,6 +54,7 @@ func NewRegularWorker(
 		cfg,
 		kv,
 		blockStore,
+		catchupStore,
 		emitter,
 		pubkeyStore,
 		ModeRegular,
@@ -182,7 +185,6 @@ func (rw *RegularWorker) processRegularBlocks() error {
 }
 
 func (rw *RegularWorker) determineStartingBlock() uint64 {
-	registry := status.EnsureStatusRegistry(rw.statusRegistry)
 	chainLatest, err1 := rw.chain.GetLatestBlockNumber(rw.ctx)
 	kvLatest, err2 := rw.blockStore.GetLatestBlock(rw.chain.GetNetworkInternalCode())
 
@@ -216,7 +218,8 @@ func (rw *RegularWorker) determineStartingBlock() uint64 {
 		}, MAX_RANGE_SIZE)
 
 		// Batch save all split ranges
-		if err := rw.blockStore.SaveCatchupRanges(
+		if err := rw.catchupStore.SaveRanges(
+			rw.ctx,
 			rw.chain.GetNetworkInternalCode(),
 			ranges,
 		); err != nil {
@@ -225,8 +228,6 @@
 				"count", len(ranges),
 				"error", err,
 			)
-		} else {
-			registry.UpsertCatchupRanges(rw.chain.GetName(), ranges)
 		}
 
 		rw.logger.Info("Queued catchup ranges",
@@ -354,7 +355,6 @@ func (rw *RegularWorker) flushBlockHashes() {
 // skipAheadIfLagging checks if the regular worker is too far behind the chain head.
 // If so, it queues the skipped range for catchup and jumps currentBlock to chain head.
 func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool {
-	registry := status.EnsureStatusRegistry(rw.statusRegistry)
 	maxLag := rw.config.MaxLag
 	if maxLag == 0 {
 		maxLag = constant.DefaultMaxLag
@@ -380,7 +380,8 @@
 		Start: skipStart, End: skipEnd, Current: skipStart - 1,
 	}, MAX_RANGE_SIZE)
 
-	if err := rw.blockStore.SaveCatchupRanges(
+	if err := rw.catchupStore.SaveRanges(
+		rw.ctx,
 		rw.chain.GetNetworkInternalCode(),
 		ranges,
 	); err != nil {
@@ -389,8 +390,6 @@
 			"count", len(ranges),
 			"error", err,
 		)
-	} else {
-		registry.UpsertCatchupRanges(rw.chain.GetName(), ranges)
 	}
 
 	rw.currentBlock = latest
diff --git a/internal/worker/regular_test.go b/internal/worker/regular_test.go
index 73573f1..380c8a7 100644
--- a/internal/worker/regular_test.go
+++ b/internal/worker/regular_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/fystack/multichain-indexer/pkg/common/enum"
 	"github.com/fystack/multichain-indexer/pkg/common/types"
 	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
 	"github.com/stretchr/testify/require"
 )
 
@@ -69,7 +70,8 @@ func TestRegularWorkerProcessRegularBlocksRecoversGapViaGetBlock(t *testing.T) {
 		},
 	}
 	store := &stubBlockStore{}
-	rw := newTestRegularWorker(chain, store, 100, 3)
+	catchupStore := &stubCatchupStore{}
+	rw := newTestRegularWorker(chain, store, catchupStore, 100, 3)
 
 	err := rw.processRegularBlocks()
 	require.NoError(t, err)
@@ -108,7 +110,8 @@ func TestRegularWorkerProcessRegularBlocksMarksUnresolvedGapFailed(t *testing.T)
 		},
 	}
 	store := &stubBlockStore{}
-	rw := newTestRegularWorker(chain, store, 100, 2)
+	catchupStore := &stubCatchupStore{}
+	rw := newTestRegularWorker(chain, store, catchupStore, 100, 2)
 
 	err := rw.processRegularBlocks()
 	require.Error(t, err)
@@ -155,25 +158,23 @@ func TestRegularWorkerDetermineStartingBlockUpdatesCatchupRegistry(t *testing.T)
 		latest: 25,
 	}
 	store := &stubBlockStore{latestBlock: 20}
+	catchupStore := &stubCatchupStore{}
 	statusRegistry := status.NewRegistry()
 	statusRegistry.RegisterChain("ethereum", "ethereum", config.ChainConfig{
 		NetworkId:    "eth-mainnet",
 		InternalCode: "ETH",
 		Type:         enum.NetworkTypeEVM,
 	})
-	rw := newTestRegularWorker(chain, store, 20, 2)
+	rw := newTestRegularWorker(chain, store, catchupStore, 20, 2)
 	rw.statusRegistry = statusRegistry
 
 	start := rw.determineStartingBlock()
 	require.Equal(t, uint64(25), start)
-
-	resp := statusRegistry.Snapshot("1.0.0")
-	require.Len(t, resp.Networks, 1)
-	require.Equal(t, 1, resp.Networks[0].CatchupRanges)
-	require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks)
+	require.Len(t, catchupStore.savedCatchupRanges, 1)
+	require.Equal(t, blockstore.CatchupRange{Start: 21, End: 25, Current: 20}, catchupStore.savedCatchupRanges[0])
 }
 
-func TestCatchupWorkerUpdatesCatchupRegistryOnProgressAndCompletion(t *testing.T) {
+func TestCatchupWorkerPersistsProgressAndCompletion(t *testing.T) {
 	t.Parallel()
 
 	statusRegistry := status.NewRegistry()
@@ -189,6 +190,7 @@
 	}})
 
 	store := &stubBlockStore{}
+	catchupStore := &stubCatchupStore{}
 	cw := &CatchupWorker{
 		BaseWorker: &BaseWorker{
 			ctx:    context.Background(),
@@ -196,6 +198,7 @@
 			logger:         slog.New(slog.NewTextHandler(io.Discard, nil)),
 			chain:          &stubIndexer{name: "ethereum", internalCode: "ETH", networkType: enum.NetworkTypeEVM},
 			blockStore:     store,
+			catchupStore:   catchupStore,
 			statusRegistry: statusRegistry,
 		},
 		blockRanges: []blockstore.CatchupRange{{
@@ -206,18 +209,18 @@
 	}
 
 	cw.saveProgress(blockstore.CatchupRange{Start: 1, End: 10, Current: 0}, 5)
-
-	resp := statusRegistry.Snapshot("1.0.0")
-	require.Len(t, resp.Networks, 1)
-	require.Equal(t, 1, resp.Networks[0].CatchupRanges)
-	require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks)
+	require.Equal(t, []blockstore.CatchupRange{{
+		Start:   1,
+		End:     10,
+		Current: 5,
+	}}, catchupStore.catchupProgress)
 
 	err := cw.completeRange(blockstore.CatchupRange{Start: 1, End: 10, Current: 5})
 	require.NoError(t, err)
-
-	resp = statusRegistry.Snapshot("1.0.0")
-	require.Equal(t, 0, resp.Networks[0].CatchupRanges)
-	require.Equal(t, uint64(0), resp.Networks[0].CatchupPendingBlocks)
+	require.Equal(t, []blockstore.CatchupRange{{
+		Start: 1,
+		End:   10,
+	}}, catchupStore.deleteCatchupCalls)
 }
 
 func testChainConfig() config.ChainConfig {
@@ -229,19 +232,26 @@
-func newTestRegularWorker(chain *stubIndexer, store *stubBlockStore, currentBlock uint64, batchSize int) *RegularWorker {
+func newTestRegularWorker(
+	chain *stubIndexer,
+	store *stubBlockStore,
+	catchupStore catchupstore.Store,
+	currentBlock uint64,
+	batchSize int,
+) *RegularWorker {
 	cfg := testChainConfig()
 	cfg.Throttle.BatchSize = batchSize
 	return &RegularWorker{
 		BaseWorker: &BaseWorker{
-			ctx:        context.Background(),
-			cancel:     func() {},
-			logger:     slog.New(slog.NewTextHandler(io.Discard, nil)),
-			config:     cfg,
-			chain:      chain,
-			blockStore: store,
-			failedChan: make(chan FailedBlockEvent, 1),
+			ctx:          context.Background(),
+			cancel:       func() {},
+			logger:       slog.New(slog.NewTextHandler(io.Discard, nil)),
+			config:       cfg,
+			chain:        chain,
+			blockStore:   store,
+			catchupStore: catchupStore,
+			failedChan:   make(chan FailedBlockEvent, 1),
 		},
 		currentBlock: currentBlock,
 		blockHashes:  make([]blockstore.BlockHashEntry, 0, MaxBlockHashSize),
@@ -298,17 +308,10 @@ func (s *stubIndexer) IsHealthy() bool {
 }
 
 type stubBlockStore struct {
-	latestBlock            uint64
-	savedLatest            []uint64
-	failedBlocks           []uint64
-	savedCatchupRanges     []blockstore.CatchupRange
-	catchupProgress        []blockstore.CatchupRange
-	deleteCatchupCalls     []blockstore.CatchupRange
-	getLatestBlockErr      error
-	getCatchupProgressErr  error
-	saveCatchupRangesErr   error
-	saveCatchupProgressErr error
-	deleteCatchupErr       error
+	latestBlock       uint64
+	savedLatest       []uint64
+	failedBlocks      []uint64
+	getLatestBlockErr error
 }
 
 func (s *stubBlockStore) GetLatestBlock(string) (uint64, error) {
@@ -343,7 +346,29 @@ func (s *stubBlockStore) RemoveFailedBlocks(string, []uint64) error {
 	return nil
 }
 
-func (s *stubBlockStore) SaveCatchupRanges(_ string, ranges []blockstore.CatchupRange) error {
+func (s *stubBlockStore) GetBlockHashes(string) ([]blockstore.BlockHashEntry, error) {
+	return nil, nil
+}
+
+func (s *stubBlockStore) SaveBlockHashes(string, []blockstore.BlockHashEntry) error {
+	return nil
+}
+
+func (s *stubBlockStore) Close() error {
+	return nil
+}
+
+type stubCatchupStore struct {
+	savedCatchupRanges     []blockstore.CatchupRange
+	catchupProgress        []blockstore.CatchupRange
+	deleteCatchupCalls     []blockstore.CatchupRange
+	getCatchupProgressErr  error
+	saveCatchupRangesErr   error
+	saveCatchupProgressErr error
+	deleteCatchupErr       error
+}
+
+func (s *stubCatchupStore) SaveRanges(_ context.Context, _ string, ranges []blockstore.CatchupRange) error {
 	if s.saveCatchupRangesErr != nil {
 		return s.saveCatchupRangesErr
 	}
@@ -351,7 +376,7 @@
 	return nil
 }
 
-func (s *stubBlockStore) SaveCatchupProgress(_ string, start, end, current uint64) error {
+func (s *stubCatchupStore) SaveProgress(_ context.Context, _ string, start, end, current uint64) error {
 	if s.saveCatchupProgressErr != nil {
 		return s.saveCatchupProgressErr
 	}
@@ -369,14 +394,22 @@
 	return nil
 }
 
-func (s *stubBlockStore) GetCatchupProgress(string) ([]blockstore.CatchupRange, error) {
+func (s *stubCatchupStore) GetProgress(_ context.Context, _ string) ([]blockstore.CatchupRange, error) {
 	if s.getCatchupProgressErr != nil {
 		return nil, s.getCatchupProgressErr
 	}
 	return append([]blockstore.CatchupRange(nil), s.catchupProgress...), nil
 }
 
-func (s *stubBlockStore) DeleteCatchupRange(_ string, start, end uint64) error {
+func (s *stubCatchupStore) GetNextRange(_ context.Context, _ string) (*blockstore.CatchupRange, error) {
+	if len(s.catchupProgress) == 0 {
+		return nil, nil
+	}
+	rng := s.catchupProgress[0]
+	return &rng, nil
+}
+
+func (s *stubCatchupStore) DeleteRange(_ context.Context, _ string, start, end uint64) error {
 	if s.deleteCatchupErr != nil {
 		return s.deleteCatchupErr
 	}
@@ -394,15 +427,3 @@ func (s *stubBlockStore) DeleteCatchupRange(_ string, start, end uint64) error {
 	s.catchupProgress = filtered
 	return nil
 }
-
-func (s *stubBlockStore) GetBlockHashes(string) ([]blockstore.BlockHashEntry, error) {
-	return nil, nil
-}
-
-func (s *stubBlockStore) SaveBlockHashes(string, []blockstore.BlockHashEntry) error {
-	return nil
-}
-
-func (s *stubBlockStore) Close() error {
-	return nil
-}
diff --git a/internal/worker/rescanner.go b/internal/worker/rescanner.go
index 72ff67e..9197664 100644
--- a/internal/worker/rescanner.go
+++ b/internal/worker/rescanner.go
@@ -55,6 +55,7 @@ func NewRescannerWorker(
 		cfg,
 		kv,
 		blockStore,
+		nil,
 		emitter,
 		pubkeyStore,
 		ModeRescanner,
diff --git a/pkg/store/blockstore/store.go b/pkg/store/blockstore/store.go
index 69c2729..b877366 100644
--- a/pkg/store/blockstore/store.go
+++ b/pkg/store/blockstore/store.go
@@ -6,10 +6,8 @@ import (
 	"fmt"
 	"slices"
 	"strconv"
-	"strings"
 
 	"github.com/fystack/multichain-indexer/pkg/common/constant"
-	"github.com/fystack/multichain-indexer/pkg/common/logger"
 	"github.com/fystack/multichain-indexer/pkg/infra"
 )
 
@@ -56,18 +54,10 @@ func failedBlocksKey(chainName string) string {
 }
 
 // Catchup progress keys
-func composeCatchupKey(chain string) string {
-	return fmt.Sprintf("%s/%s/%s/", BlockStates, chain, constant.KVPrefixProgressCatchup)
-}
-
 func blockHashesKey(chainName string) string {
 	return fmt.Sprintf("%s/%s/%s", BlockStates, chainName, constant.KVPrefixBlockHash)
 }
 
-func catchupKey(chain string, start, end uint64) string {
-	return fmt.Sprintf("%s/%s/%s/%d-%d", BlockStates, chain, constant.KVPrefixProgressCatchup, start, end)
-}
-
 type blockStore struct {
 	store infra.KVStore
 }
@@ -81,11 +71,6 @@
 	SaveFailedBlocks(chainName string, blockNumbers []uint64) error
 	RemoveFailedBlocks(chainName string, blockNumbers []uint64) error
 
-	SaveCatchupProgress(chain string, start, end, current uint64) error
-	SaveCatchupRanges(chain string, ranges []CatchupRange) error
-	GetCatchupProgress(chain string) ([]CatchupRange, error)
-	DeleteCatchupRange(chain string, start, end uint64) error
-
 	// Block hash persistence for reorg detection across restarts
 	SaveBlockHashes(chainName string, hashes []BlockHashEntry) error
 	GetBlockHashes(chainName string) ([]BlockHashEntry, error)
@@ -213,101 +198,6 @@ func (bs *blockStore) RemoveFailedBlocks(chainName string, blockNumbers []uint64
 	return bs.store.SetAny(key, filtered)
 }
 
-// SaveCatchupProgress saves or updates a catchup range with current progress.
-func (bs *blockStore) SaveCatchupProgress(chain string, start, end, current uint64) error {
-	if chain == "" || start == 0 || end < start {
-		return errors.New("invalid catchup range")
-	}
-	key := catchupKey(chain, start, end)
-	logger.Debug("Saving catchup progress to store",
-		"chain", chain,
-		"range", fmt.Sprintf("%d-%d", start, end),
-		"current", current,
-	)
-	return bs.store.Set(key, fmt.Sprintf("%d", current))
-}
-
-// SaveCatchupRanges batch-writes multiple catchup ranges atomically.
-func (bs *blockStore) SaveCatchupRanges(chain string, ranges []CatchupRange) error {
-	if chain == "" {
-		return errors.New("chain name is required")
-	}
-	if len(ranges) == 0 {
-		return nil
-	}
-
-	pairs := make([]infra.KVPair, 0, len(ranges))
-	for _, r := range ranges {
-		if r.Start == 0 || r.End < r.Start {
-			continue
-		}
-		pairs = append(pairs, infra.KVPair{
-			Key:   catchupKey(chain, r.Start, r.End),
-			Value: []byte(fmt.Sprintf("%d", r.Current)),
-		})
-	}
-
-	if len(pairs) == 0 {
-		return nil
-	}
-
-	logger.Debug("Batch saving catchup ranges",
-		"chain", chain,
-		"count", len(pairs),
-	)
-	return bs.store.BatchSet(pairs)
-}
-
-// GetCatchupProgress returns all catchup ranges (struct-based).
-func (bs *blockStore) GetCatchupProgress(chain string) ([]CatchupRange, error) {
-	if chain == "" {
-		return nil, errors.New("chain name is required")
-	}
-	prefix := composeCatchupKey(chain)
-	kvs, err := bs.store.List(prefix)
-	if err != nil {
-		return nil, err
-	}
-
-	var ranges []CatchupRange
-	for _, kv := range kvs {
-		s, e := extractRangeFromKey(kv.Key)
-		if s == 0 || e == 0 {
-			continue
-		}
-		cur, _ := strconv.ParseUint(string(kv.Value), 10, 64)
-		ranges = append(ranges, CatchupRange{Start: s, End: e, Current: cur})
-		logger.Debug("Found catchup range in store",
-			"chain", chain,
-			"range", fmt.Sprintf("%d-%d", s, e),
-			"current", cur,
-		)
-	}
-	logger.Info("Loaded catchup progress from store",
-		"chain", chain,
-		"ranges_count", len(ranges),
-	)
-	return ranges, nil
-}
-
-// DeleteCatchupRange removes a saved range.
-func (bs *blockStore) DeleteCatchupRange(chain string, start, end uint64) error {
-	if chain == "" || start == 0 || end < start {
-		return nil
-	}
-	key := catchupKey(chain, start, end)
-	err := bs.store.Delete(key)
-	if err != nil {
-		logger.Error("Failed to delete catchup range from store",
-			"chain", chain,
-			"range", fmt.Sprintf("%d-%d", start, end),
-			"key", key,
-			"error", err,
-		)
-	}
-	return err
-}
-
 // SaveBlockHashes persists block hashes for reorg detection across restarts.
 func (bs *blockStore) SaveBlockHashes(chainName string, hashes []BlockHashEntry) error {
 	if chainName == "" {
@@ -339,21 +229,3 @@ func (bs *blockStore) GetBlockHashes(chainName string) ([]BlockHashEntry, error)
 func (bs *blockStore) Close() error {
 	return bs.store.Close()
 }
-
-func extractRangeFromKey(key string) (uint64, uint64) {
-	// /catchup/-
-	parts := strings.Split(key, "/")
-	if len(parts) < 4 {
-		return 0, 0
-	}
-	se := strings.Split(parts[len(parts)-1], "-")
-	if len(se) != 2 {
-		return 0, 0
-	}
-	s, err1 := strconv.ParseUint(se[0], 10, 64)
-	e, err2 := strconv.ParseUint(se[1], 10, 64)
-	if err1 == nil && err2 == nil && s <= e {
-		return s, e
-	}
-	return 0, 0
-}
diff --git a/pkg/store/catchupstore/store.go b/pkg/store/catchupstore/store.go
new file mode 100644
index 0000000..5cff1db
--- /dev/null
+++ b/pkg/store/catchupstore/store.go
@@ -0,0 +1,460 @@
+package catchupstore
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"slices"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/fystack/multichain-indexer/pkg/infra"
+	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/redis/go-redis/v9"
+)
+
+const (
+	catchupProgressKeyPrefix = "catchup_progress"
+	catchupLockKeyPrefix     = "catchup_lock"
+	defaultTimeout           = 5 * time.Second
+	lockTimeout              = 3 * time.Second
+)
+
+const addRangesScript = `
+	local key = KEYS[1]
+	local newStart = tonumber(ARGV[1])
+	local newEnd = tonumber(ARGV[2])
+	local newCurrent = tonumber(ARGV[3])
+
+	if not newStart or not newEnd or newStart <= 0 or newEnd < newStart then
+		return redis.error_reply("invalid catchup range")
+	end
+
+	if newCurrent > newEnd then
+		newCurrent = newEnd
+	end
+	if newCurrent + 1 < newStart then
+		newCurrent = newStart - 1
+	end
+
+	local mergedStart = newStart
+	local mergedEnd = newEnd
+	local mergedCurrent = newCurrent
+	local entries = redis.call('HGETALL', key)
+
+	for i = 1, #entries, 2 do
+		local field = entries[i]
+		local value = entries[i + 1]
+		local existingStart, existingEnd = string.match(field, '^(%d+)%-(%d+)$')
+		if existingStart and existingEnd then
+			existingStart = tonumber(existingStart)
+			existingEnd = tonumber(existingEnd)
+			local existingCurrent = tonumber(value)
+
+			if existingCurrent then
+				if existingCurrent > existingEnd then
+					existingCurrent = existingEnd
+				end
+				if existingCurrent + 1 < existingStart then
+					existingCurrent = existingStart - 1
+				end
+
+				if existingStart <= mergedEnd + 1 and existingEnd + 1 >= mergedStart then
+					if existingStart < mergedStart then
+						mergedStart = existingStart
+					end
+					if existingEnd > mergedEnd then
+						mergedEnd = existingEnd
+					end
+					if existingCurrent < mergedCurrent then
+						mergedCurrent = existingCurrent
+					end
+					redis.call('HDEL', key, field)
+				end
+			end
+		end
+	end
+
+	if mergedCurrent > mergedEnd then
+		mergedCurrent = mergedEnd
+	end
+	if mergedCurrent + 1 < mergedStart then
+		mergedCurrent = mergedStart - 1
+	end
+
+	local mergedField = tostring(mergedStart) .. '-' .. tostring(mergedEnd)
+	redis.call('HSET', key, mergedField, tostring(mergedCurrent))
+	return {mergedStart, mergedEnd, mergedCurrent}
+`
+
+const claimRangeScript = `
+	local key = KEYS[1]
+	local lockPrefix = ARGV[1]
+	local lockExpiration = tonumber(ARGV[2])
+
+	local entries = redis.call('HGETALL', key)
+	for i = 1, #entries, 2 do
+		local field = entries[i]
+		local value = entries[i + 1]
+		local startText, endText = string.match(field, '^(%d+)%-(%d+)$')
+		if startText and endText then
+			local startNum = tonumber(startText)
+			local endNum = tonumber(endText)
+			local currentNum = tonumber(value)
+
+			if startNum and endNum and currentNum then
+				if currentNum > endNum then
+					currentNum = endNum
+				end
+				if currentNum + 1 < startNum then
+					currentNum = startNum - 1
+				end
+
+				local lockKey = lockPrefix .. field
+				local locked = redis.call('SET', lockKey, 'locked', 'NX', 'EX', lockExpiration)
+				if locked then
+					return {startNum, endNum, currentNum}
+				end
+			end
+		end
+	end
+
+	return nil
+`
+
+type Store interface {
+	SaveRanges(ctx context.Context, chain string, ranges []blockstore.CatchupRange) error
+	SaveProgress(ctx context.Context, chain string, start, end, current uint64) error
+	GetProgress(ctx context.Context, chain string) ([]blockstore.CatchupRange, error)
+	GetNextRange(ctx context.Context, chain string) (*blockstore.CatchupRange, error)
+	DeleteRange(ctx context.Context, chain string, start, end uint64) error
+}
+
+type noopStore struct{}
+
+type catchupStore struct {
+	redisClient infra.RedisClient
+	addScript   *redis.Script
+	claimScript *redis.Script
+}
+
+func New(redisClient infra.RedisClient) Store {
+	if redisClient == nil || redisClient.GetClient() == nil {
+		return noopStore{}
+	}
+	return &catchupStore{
+		redisClient: redisClient,
+		addScript:   redis.NewScript(addRangesScript),
+		claimScript: redis.NewScript(claimRangeScript),
+	}
+}
+
+func (noopStore) SaveRanges(context.Context, string, []blockstore.CatchupRange) error { return nil }
+func (noopStore) SaveProgress(context.Context, string, uint64, uint64, uint64) error  { return nil }
+func (noopStore) GetProgress(context.Context, string) ([]blockstore.CatchupRange, error) {
+	return nil, nil
+}
+func (noopStore) GetNextRange(context.Context, string) (*blockstore.CatchupRange, error) {
+	return nil, nil
+}
+func (noopStore) DeleteRange(context.Context, string, uint64, uint64) error { return nil }
+
+func composeKey(chain string) string {
+	return fmt.Sprintf("%s:%s", catchupProgressKeyPrefix, chain)
+}
+
+func composeField(start, end uint64) string {
+	return fmt.Sprintf("%d-%d", start, end)
+}
+
+func composeLockKey(chain string, start, end uint64) string {
+	return fmt.Sprintf("%s:%s:%d-%d", catchupLockKeyPrefix, chain, start, end)
+}
+
+func parseField(field string) (uint64, uint64, bool) {
+	parts := strings.Split(field, "-")
+	if len(parts) != 2 {
+		return 0, 0, false
+	}
+
+	start, err := strconv.ParseUint(parts[0], 10, 64)
+	if err != nil {
+		return 0, 0, false
+	}
+	end, err := strconv.ParseUint(parts[1], 10, 64)
+	if err != nil || end < start || start == 0 {
+		return 0, 0, false
+	}
+	return start, end, true
+}
+
+func (s *catchupStore) SaveRanges(
+	ctx context.Context,
+	chain string,
+	ranges []blockstore.CatchupRange,
+) error {
+	if chain == "" {
+		return errors.New("chain name is required")
+	}
+	if len(ranges) == 0 {
+		return nil
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	key := composeKey(chain)
+	for _, r := range normalizeRanges(ranges) {
+		if _, err := s.addScript.Run(
+			ctx,
+			s.redisClient.GetClient(),
+			[]string{key},
+			r.Start,
+			r.End,
+			r.Current,
+		).Result(); err != nil {
+			return fmt.Errorf("save catchup ranges: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func (s *catchupStore) SaveProgress(
+	ctx context.Context,
+	chain string,
+	start, end, current uint64,
+) error {
+	if chain == "" || start == 0 || end < start {
+		return errors.New("invalid catchup range")
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	if err := s.redisClient.GetClient().
+		HSet(ctx, composeKey(chain), composeField(start, end), strconv.FormatUint(current, 10)).
+		Err(); err != nil {
+		return fmt.Errorf("save catchup progress: %w", err)
+	}
+
+	return nil
+}
+
+func (s *catchupStore) GetProgress(
+	ctx context.Context,
+	chain string,
+) ([]blockstore.CatchupRange, error) {
+	if chain == "" {
+		return nil, errors.New("chain name is required")
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	values, err := s.redisClient.GetClient().HGetAll(ctx, composeKey(chain)).Result()
+	if err != nil {
+		return nil, fmt.Errorf("get catchup progress: %w", err)
+	}
+
+	return parseProgressMap(values), nil
+}
+
+func (s *catchupStore) GetNextRange(
+	ctx context.Context,
+	chain string,
+) (*blockstore.CatchupRange, error) {
+	if chain == "" {
+		return nil, errors.New("chain name is required")
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	result, err := s.claimScript.Run(
+		ctx,
+		s.redisClient.GetClient(),
+		[]string{composeKey(chain)},
+		fmt.Sprintf("%s:%s:", catchupLockKeyPrefix, chain),
+		int(lockTimeout.Seconds()),
+	).Result()
+	if err != nil {
+		if errors.Is(err, redis.Nil) {
+			return nil, nil
+		}
+		return nil, fmt.Errorf("claim catchup range: %w", err)
+	}
+
+	if result == nil {
+		return nil, nil
+	}
+
+	values, ok := result.([]interface{})
+	if !ok || len(values) != 3 {
+		return nil, fmt.Errorf("unexpected claim result type: %T", result)
+	}
+
+	start, ok := toUint64(values[0])
+	if !ok {
+		return nil, fmt.Errorf("invalid claim start type: %T", values[0])
+	}
+	end, ok := toUint64(values[1])
+	if !ok {
+		return nil, fmt.Errorf("invalid claim end type: %T", values[1])
+	}
+	current, ok := toUint64(values[2])
+	if !ok {
+		return nil, fmt.Errorf("invalid claim current type: %T", values[2])
+	}
+
+	claimed := blockstore.CatchupRange{Start: start, End: end, Current: current}
+	return &claimed, nil
+}
+
+func (s *catchupStore) DeleteRange(
+	ctx context.Context,
+	chain string,
+	start, end uint64,
+) error {
+	if chain == "" || start == 0 || end < start {
+		return nil
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	key := composeKey(chain)
+	field := composeField(start, end)
+	pipe := s.redisClient.GetClient().Pipeline()
+	delCmd := pipe.HDel(ctx, key, field)
+	lenCmd := pipe.HLen(ctx, key)
+	pipe.Del(ctx, composeLockKey(chain, start, end))
+	if _, err := pipe.Exec(ctx); err != nil {
+		return fmt.Errorf("delete catchup range: %w", err)
+	}
+	if delCmd.Val() == 0 {
+		return nil
+	}
+	if lenCmd.Val() == 0 {
+		if err := s.redisClient.GetClient().Del(ctx, key).Err(); err != nil {
+			return fmt.Errorf("cleanup catchup key: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func parseProgressMap(values map[string]string) []blockstore.CatchupRange {
+	ranges := make([]blockstore.CatchupRange, 0, len(values))
+	for field, currentText := range values {
+		start, end, ok := parseField(field)
+		if !ok {
+			continue
+		}
+		current, err := strconv.ParseUint(currentText, 10, 64)
+		if err != nil {
+			continue
+		}
+		if current > end {
+			current = end
+		}
+		ranges = append(ranges, blockstore.CatchupRange{
+			Start:   start,
+			End:     end,
+			Current: current,
+		})
+	}
+
+	slices.SortFunc(ranges, func(a, b blockstore.CatchupRange) int {
+		switch {
+		case a.Start < b.Start:
+			return -1
+		case a.Start > b.Start:
+			return 1
+		case a.End < b.End:
+			return -1
+		case a.End > b.End:
+			return 1
+		default:
+			return 0
+		}
+	})
+
+	return ranges
+}
+
+func normalizeRanges(ranges []blockstore.CatchupRange) []blockstore.CatchupRange {
+	filtered := make([]blockstore.CatchupRange, 0, len(ranges))
+	for _, r := range ranges {
+		if r.Start == 0 || r.End < r.Start {
+			continue
+		}
+		if r.Current > r.End {
+			r.Current = r.End
+		}
+		if r.Current+1 < r.Start {
+			r.Current = r.Start - 1
+		}
+		filtered = append(filtered, r)
+	}
+	if len(filtered) == 0 {
+		return nil
+	}
+
+	slices.SortFunc(filtered, func(a, b blockstore.CatchupRange) int {
+		switch {
+		case a.Start < b.Start:
+			return -1
+		case a.Start > b.Start:
+			return 1
+		case a.End < b.End:
+			return -1
+		case a.End > b.End:
+			return 1
+		default:
+			return 0
+		}
+	})
+
+	merged := []blockstore.CatchupRange{filtered[0]}
+	for _, next := range filtered[1:] {
+		last := &merged[len(merged)-1]
+		if next.Start > last.End+1 {
+			merged = append(merged, next)
+			continue
+		}
+
+		if next.End > last.End {
+			last.End = next.End
+		}
+		if next.Current < last.Current {
+			last.Current = next.Current
+		}
+		if last.Current+1 < last.Start {
+			last.Current = last.Start - 1
+		}
+		if last.Current > last.End {
+			last.Current = last.End
+		}
+	}
+
+	return merged
+}
+
+func toUint64(v interface{}) (uint64, bool) {
+	switch n := v.(type) {
+	case int64:
+		return uint64(n), true
+	case uint64:
+		return n, true
+	case string:
+		parsed, err := strconv.ParseUint(n, 10, 64)
+		return parsed, err == nil
+	case []byte:
+		parsed, err := strconv.ParseUint(string(n), 10, 64)
+		return parsed, err == nil
+	default:
+		return 0, false
+	}
+}
diff --git a/pkg/store/catchupstore/store_test.go b/pkg/store/catchupstore/store_test.go
new file mode 100644
index 0000000..23f324c
--- /dev/null
+++ b/pkg/store/catchupstore/store_test.go
@@ -0,0 +1,278 @@
+package catchupstore
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
+	"github.com/redis/go-redis/v9"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCatchupStoreRoundTripAndDelete(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	store := New(&realRedisClient{client: client})
+	ctx := context.Background()
+
+	err := store.SaveRanges(ctx, "eth", []blockstore.CatchupRange{
+		{Start: 21, End: 30, Current: 20},
+		{Start: 1, End: 10, Current: 5},
+	})
+	require.NoError(t, err)
+
+	err = store.SaveProgress(ctx, "eth", 21, 30, 25)
+	require.NoError(t, err)
+
+	ranges, err := store.GetProgress(ctx, "eth")
+	require.NoError(t, err)
+	require.Equal(t, []blockstore.CatchupRange{
+		{Start: 1, End: 10, Current: 5},
+		{Start: 21, End: 30, Current: 25},
+	}, ranges)
+
+	err = store.DeleteRange(ctx, "eth", 1, 10)
+	require.NoError(t, err)
+
+	ranges, err = store.GetProgress(ctx, "eth")
+	require.NoError(t, err)
+	require.Equal(t, []blockstore.CatchupRange{
+		{Start: 21, End: 30, Current: 25},
+	}, ranges)
+
+	err = store.DeleteRange(ctx, "eth", 21, 30)
+	require.NoError(t, err)
+
+	ranges, err = store.GetProgress(ctx, "eth")
+	require.NoError(t, err)
+	require.Empty(t, ranges)
+
+	exists, err := client.Exists(ctx, composeKey("eth")).Result()
+	require.NoError(t, err)
+	require.Zero(t, exists)
+}
+
+func TestCatchupStoreSkipsMalformedEntries(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	ctx := context.Background()
+	require.NoError(t, client.HSet(ctx, composeKey("eth"),
+		"bad", "1",
+		"5-4", "2",
+		"10-20", "NaN",
+		"1-9", "3",
+	).Err())
+
+	store := New(&realRedisClient{client: client})
+	ranges, err := store.GetProgress(ctx, "eth")
+	require.NoError(t, err)
+	require.Equal(t, []blockstore.CatchupRange{
+		{Start: 1, End: 9, Current: 3},
+	}, ranges)
+}
+
+func TestCatchupStoreSaveRangesMergesAdjacentAndOverlappingRanges(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	store := New(&realRedisClient{client: client})
+	ctx := context.Background()
+
+	err := store.SaveRanges(ctx, "apt", []blockstore.CatchupRange{
+		{Start: 1, End: 10, Current: 5},
+		{Start: 11, End: 20, Current: 10},
+		{Start: 8, End: 15, Current: 7},
+		{Start: 30, End: 40, Current: 29},
+	})
+	require.NoError(t, err)
+
+	ranges, err := store.GetProgress(ctx, "apt")
+	require.NoError(t, err)
+	require.Equal(t, []blockstore.CatchupRange{
+		{Start: 1, End: 20, Current: 5},
+		{Start: 30, End: 40, Current: 29},
+	}, ranges)
+}
+
+func TestCatchupStoreSaveRangesMergesWithExistingRedisRanges(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	store := New(&realRedisClient{client: client})
+	ctx := context.Background()
+
+	err := store.SaveRanges(ctx, "apt", []blockstore.CatchupRange{
+		{Start: 100, End: 120, Current: 110},
+	})
+	require.NoError(t, err)
+
+	err = store.SaveRanges(ctx, "apt", []blockstore.CatchupRange{
+		{Start: 121, End: 140, Current: 120},
+		{Start: 130, End: 150, Current: 129},
+	})
+	require.NoError(t, err)
+
+	ranges, err := store.GetProgress(ctx, "apt")
+	require.NoError(t, err)
+	require.Equal(t, []blockstore.CatchupRange{
+		{Start: 100, End: 150, Current: 110},
+	}, ranges)
+}
+
+func TestCatchupStoreGetNextRangeClaimsOneRange(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	store := New(&realRedisClient{client: client})
+	ctx := context.Background()
+
+	// The ranges must be disjoint with a gap; adjacent ranges are merged on save.
+	err := store.SaveRanges(ctx, "apt", []blockstore.CatchupRange{
+		{Start: 1, End: 10, Current: 0},
+		{Start: 21, End: 30, Current: 20},
+	})
+	require.NoError(t, err)
+
+	claimed, err := store.GetNextRange(ctx, "apt")
+	require.NoError(t, err)
+	require.NotNil(t, claimed)
+	require.Equal(t, uint64(1), claimed.Start)
+	require.Equal(t, uint64(10), claimed.End)
+
+	claimed2, err := store.GetNextRange(ctx, "apt")
+	require.NoError(t, err)
+	require.NotNil(t, claimed2)
+	require.Equal(t, uint64(21), claimed2.Start)
+	require.Equal(t, uint64(30), claimed2.End)
+
+	claimed3, err := store.GetNextRange(ctx, "apt")
+	require.NoError(t, err)
+	require.Nil(t, claimed3)
+}
+
+func TestCatchupStoreGetNextRangeClaimsDistinctRangesConcurrently(t *testing.T) {
+	t.Parallel()
+
+	client, cleanup := setupTestRedis(t)
+	defer cleanup()
+
+	store := New(&realRedisClient{client: client})
+	ctx := context.Background()
+
+	// The ranges must be disjoint with a gap; adjacent ranges are merged on save.
+	err := store.SaveRanges(ctx, "apt", []blockstore.CatchupRange{
+		{Start: 1, End: 10, Current: 0},
+		{Start: 21, End: 30, Current: 20},
+	})
+	require.NoError(t, err)
+
+	type claimResult struct {
+		rng *blockstore.CatchupRange
+		err error
+	}
+
+	results := make(chan claimResult, 2)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	for i := 0; i < 2; i++ {
+		go func() {
+			defer wg.Done()
+			rng, err := store.GetNextRange(ctx, "apt")
+			results <- claimResult{rng: rng, err: err}
+		}()
+	}
+
+	wg.Wait()
+	close(results)
+
+	claimed := make(map[uint64]struct{})
+	for res := range results {
+		require.NoError(t, res.err)
+		require.NotNil(t, res.rng)
+		claimed[res.rng.Start] = struct{}{}
+	}
+
+	require.Len(t, claimed, 2)
+	_, firstOK := claimed[1]
+	_, secondOK := claimed[21]
+	require.True(t, firstOK)
+	require.True(t, secondOK)
+}
+
+type realRedisClient struct {
+	client *redis.Client
+}
+
+func (r *realRedisClient) GetClient() *redis.Client {
+	return r.client
+}
+
+func (r *realRedisClient) Set(key string, value any, expiration time.Duration) error {
+	return r.client.Set(context.Background(), key, value, expiration).Err()
+}
+
+func (r *realRedisClient) Get(key string) (string, error) {
+	return r.client.Get(context.Background(), key).Result()
+}
+
+func (r *realRedisClient) Del(keys ...string) error {
+	return r.client.Del(context.Background(), keys...).Err()
+}
+
+func (r *realRedisClient) ZAdd(key string, members ...redis.Z) error {
+	return r.client.ZAdd(context.Background(), key, members...).Err()
+}
+
+func (r *realRedisClient) ZRem(key string, members ...interface{}) error {
+	return r.client.ZRem(context.Background(), key, members...).Err()
+}
+
+func (r *realRedisClient) ZRange(key string, start, stop int64) ([]string, error) {
+	return r.client.ZRange(context.Background(), key, start, stop).Result()
+}
+
+func (r *realRedisClient) ZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
+	return r.client.ZRangeWithScores(context.Background(), key, start, stop).Result()
+}
+
+func (r *realRedisClient) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
+	return r.client.ZRevRangeWithScores(context.Background(), key, start, stop).Result()
+}
+
+func (r *realRedisClient) Close() error {
+	return r.client.Close()
+}
+
+func setupTestRedis(t *testing.T) (*redis.Client, func()) {
+	t.Helper()
+
+	client := redis.NewClient(&redis.Options{
+		Addr: "localhost:6379",
+		DB:   14,
+	})
+
+	ctx := context.Background()
+	if _, err := client.Ping(ctx).Result(); err != nil {
+		t.Skip("Redis not available")
+	}
+	require.NoError(t, client.FlushDB(ctx).Err())
+
+	cleanup := func() {
+		_ = client.FlushDB(ctx).Err()
+		_ = client.Close()
+	}
+
+	return client, cleanup
+}
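
Usage sketch (not part of the patch): a minimal, self-contained example of how the new `catchupstore.Store` could be driven end to end, assuming only what the diff introduces — `catchupstore.New` wraps an `infra.RedisClient`, progress lives in a `catchup_progress:<chain>` Redis hash keyed by `start-end` fields, and each claim takes a short-lived `catchup_lock:<chain>:<start>-<end>` key. The `redisAdapter` below mirrors the test adapters above; the Redis address and the `ETH` chain code are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/fystack/multichain-indexer/pkg/store/blockstore"
	"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
	"github.com/redis/go-redis/v9"
)

// redisAdapter satisfies infra.RedisClient the same way the test adapters above do.
type redisAdapter struct{ client *redis.Client }

func (r redisAdapter) GetClient() *redis.Client { return r.client }
func (r redisAdapter) Set(key string, value any, expiration time.Duration) error {
	return r.client.Set(context.Background(), key, value, expiration).Err()
}
func (r redisAdapter) Get(key string) (string, error) {
	return r.client.Get(context.Background(), key).Result()
}
func (r redisAdapter) Del(keys ...string) error {
	return r.client.Del(context.Background(), keys...).Err()
}
func (r redisAdapter) ZAdd(key string, members ...redis.Z) error {
	return r.client.ZAdd(context.Background(), key, members...).Err()
}
func (r redisAdapter) ZRem(key string, members ...interface{}) error {
	return r.client.ZRem(context.Background(), key, members...).Err()
}
func (r redisAdapter) ZRange(key string, start, stop int64) ([]string, error) {
	return r.client.ZRange(context.Background(), key, start, stop).Result()
}
func (r redisAdapter) ZRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
	return r.client.ZRangeWithScores(context.Background(), key, start, stop).Result()
}
func (r redisAdapter) ZRevRangeWithScores(key string, start, stop int64) ([]redis.Z, error) {
	return r.client.ZRevRangeWithScores(context.Background(), key, start, stop).Result()
}
func (r redisAdapter) Close() error { return r.client.Close() }

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder address
	store := catchupstore.New(redisAdapter{client: client})

	// Queue two gaps for a hypothetical "ETH" chain. Overlapping or adjacent
	// ranges are merged on save, so the hash always holds disjoint ranges.
	if err := store.SaveRanges(ctx, "ETH", []blockstore.CatchupRange{
		{Start: 100, End: 199, Current: 99},
		{Start: 300, End: 399, Current: 299},
	}); err != nil {
		log.Fatal(err)
	}

	// Claim ranges until none are left unlocked. Each claim sets a short-TTL
	// lock key, which is how concurrent indexer instances avoid double work.
	for {
		r, err := store.GetNextRange(ctx, "ETH")
		if err != nil {
			log.Fatal(err)
		}
		if r == nil {
			break // every range is either done or locked by another instance
		}
		// ...process blocks r.Current+1 through r.End here, checkpointing
		// periodically so a restart resumes from the last saved block:
		if err := store.SaveProgress(ctx, "ETH", r.Start, r.End, r.End); err != nil {
			log.Fatal(err)
		}
		// The range is fully processed: remove it and its lock from Redis.
		if err := store.DeleteRange(ctx, "ETH", r.Start, r.End); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("caught up %d-%d\n", r.Start, r.End)
	}
}
```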