diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index 2af7cdb..2ac36d7 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -77,7 +77,8 @@ func (cw *CatchupWorker) Start() { } // runCatchup is a tight loop that processes catchup ranges without PollInterval delays. -// Unlike the base run() method, it exits once all ranges are processed. +// When all ranges are processed, it polls for new ranges that may be created by the +// regular worker's skipAheadIfLagging. func (cw *CatchupWorker) runCatchup() { for { select { @@ -101,76 +102,41 @@ func (cw *CatchupWorker) runCatchup() { continue } - // If no ranges remain, catchup is done + // If no ranges remain, poll for new ranges that may have been + // created by the regular worker (e.g. via skipAheadIfLagging). if len(cw.blockRanges) == 0 { - cw.logger.Info("Catchup completed, no more ranges to process", - "chain", cw.chain.GetName(), - ) - return + select { + case <-cw.ctx.Done(): + return + case <-time.After(5 * time.Second): + cw.blockRanges = cw.loadCatchupProgress() + } } } } func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { registry := status.EnsureStatusRegistry(cw.statusRegistry) - var ranges []blockstore.CatchupRange - // Load existing catchup ranges from database (they're already split when saved) - if progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode()); err == nil { - cw.logger.Info("Loading existing catchup progress", - "chain", cw.chain.GetName(), - "progress_ranges", len(progress), - ) - ranges = progress - registry.SetCatchupRanges(cw.chain.GetName(), progress) - } else { - cw.logger.Warn("Failed to load catchup progress, will create new range", + // Load existing catchup ranges from the store. The catchup worker only loads + // ranges; creating new ranges is the responsibility of the regular worker + // (via determineStartingBlock or skipAheadIfLagging). + progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode()) + if err != nil { + cw.logger.Warn("Failed to load catchup progress", "chain", cw.chain.GetName(), "error", err, ) + registry.SetCatchupRanges(cw.chain.GetName(), nil) + return nil } - // Only create a new range if no existing ranges found - if len(ranges) == 0 { - if latest, err1 := cw.blockStore.GetLatestBlock(cw.chain.GetNetworkInternalCode()); err1 == nil { - if head, err2 := cw.chain.GetLatestBlockNumber(cw.ctx); err2 == nil && head > latest { - if head <= latest { - // no gap between head and latest - return ranges - } - start, end := latest+1, head - cw.logger.Info("Creating new catchup range", - "chain", cw.chain.GetName(), - "latest_block", latest, - "head_block", head, - "catchup_start", start, "catchup_end", end, - "blocks_to_catchup", end-latest, - ) - - // Split new range if it's too large - newRanges := cw.splitLargeRange(blockstore.CatchupRange{ - Start: start, End: end, Current: start - 1, - }) - - // Batch save all split ranges to database - if err := cw.blockStore.SaveCatchupRanges( - cw.chain.GetNetworkInternalCode(), - newRanges, - ); err != nil { - cw.logger.Error("Failed to batch save catchup ranges", - "chain", cw.chain.GetName(), - "count", len(newRanges), - "error", err, - ) - } else { - registry.UpsertCatchupRanges(cw.chain.GetName(), newRanges) - } - ranges = append(ranges, newRanges...) - } - } - } - - return ranges + cw.logger.Info("Loaded catchup progress", + "chain", cw.chain.GetName(), + "progress_ranges", len(progress), + ) + registry.SetCatchupRanges(cw.chain.GetName(), progress) + return progress } // Split large ranges into smaller, more manageable chunks diff --git a/internal/worker/catchup_test.go b/internal/worker/catchup_test.go new file mode 100644 index 0000000..8befd16 --- /dev/null +++ b/internal/worker/catchup_test.go @@ -0,0 +1,82 @@ +package worker + +import ( + "context" + "io" + "log/slog" + "testing" + "time" + + "github.com/fystack/multichain-indexer/internal/status" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" + "github.com/stretchr/testify/require" +) + +func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) { + t.Parallel() + + statusRegistry := status.NewRegistry() + statusRegistry.RegisterChain("aptos", "aptos_testnet", config.ChainConfig{ + NetworkId: "aptos_testnet", + InternalCode: "APTOS_TESTNET", + Type: enum.NetworkTypeApt, + }) + + store := &stubBlockStore{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cw := &CatchupWorker{ + BaseWorker: &BaseWorker{ + ctx: ctx, + cancel: cancel, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + config: config.ChainConfig{ + PollInterval: time.Millisecond, + Throttle: config.Throttle{BatchSize: 20}, + }, + chain: &stubIndexer{name: "aptos", internalCode: "APTOS_TESTNET", networkType: enum.NetworkTypeApt, latest: 100}, + blockStore: store, + statusRegistry: statusRegistry, + }, + blockRanges: []blockstore.CatchupRange{}, + workerPool: make(chan struct{}, CATCHUP_WORKERS), + } + + // Simulate: catchup worker starts with no ranges (empty after state wipe). + // After a short delay, the regular worker saves new catchup ranges to the store. + // The catchup worker should poll and pick them up instead of exiting. + go func() { + time.Sleep(100 * time.Millisecond) + // Simulate regular worker creating catchup ranges in the store + store.catchupProgress = []blockstore.CatchupRange{ + {Start: 50, End: 69, Current: 49}, + {Start: 70, End: 89, Current: 69}, + } + // Give the catchup worker time to poll and load the ranges + time.Sleep(200 * time.Millisecond) + cancel() + }() + + // runCatchup should NOT exit immediately when blockRanges is empty. + // It should poll, pick up the new ranges, and only stop when ctx is cancelled. + done := make(chan struct{}) + go func() { + cw.runCatchup() + close(done) + }() + + select { + case <-done: + // Verify the catchup worker loaded the new ranges before exiting + resp := statusRegistry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + // The worker should have picked up ranges from the store + // (it may have processed and completed them, or they may still be pending) + case <-time.After(5 * time.Second): + cancel() + t.Fatal("catchup worker did not exit after context cancellation") + } +}