Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 24 additions & 58 deletions internal/worker/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (cw *CatchupWorker) Start() {
}

// runCatchup is a tight loop that processes catchup ranges without PollInterval delays.
// Unlike the base run() method, it exits once all ranges are processed.
// When all ranges are processed, it polls for new ranges that may be created by the
// regular worker's skipAheadIfLagging.
func (cw *CatchupWorker) runCatchup() {
for {
select {
Expand All @@ -101,76 +102,41 @@ func (cw *CatchupWorker) runCatchup() {
continue
}

// If no ranges remain, catchup is done
// If no ranges remain, poll for new ranges that may have been
// created by the regular worker (e.g. via skipAheadIfLagging).
if len(cw.blockRanges) == 0 {
cw.logger.Info("Catchup completed, no more ranges to process",
"chain", cw.chain.GetName(),
)
return
select {
case <-cw.ctx.Done():
return
case <-time.After(5 * time.Second):
cw.blockRanges = cw.loadCatchupProgress()
}
}
}
}

func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange {
registry := status.EnsureStatusRegistry(cw.statusRegistry)
var ranges []blockstore.CatchupRange

// Load existing catchup ranges from database (they're already split when saved)
if progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode()); err == nil {
cw.logger.Info("Loading existing catchup progress",
"chain", cw.chain.GetName(),
"progress_ranges", len(progress),
)
ranges = progress
registry.SetCatchupRanges(cw.chain.GetName(), progress)
} else {
cw.logger.Warn("Failed to load catchup progress, will create new range",
// Load existing catchup ranges from the store. The catchup worker only loads
// ranges; creating new ranges is the responsibility of the regular worker
// (via determineStartingBlock or skipAheadIfLagging).
progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode())
if err != nil {
cw.logger.Warn("Failed to load catchup progress",
"chain", cw.chain.GetName(),
"error", err,
)
registry.SetCatchupRanges(cw.chain.GetName(), nil)
return nil
}

// Only create a new range if no existing ranges found
if len(ranges) == 0 {
if latest, err1 := cw.blockStore.GetLatestBlock(cw.chain.GetNetworkInternalCode()); err1 == nil {
if head, err2 := cw.chain.GetLatestBlockNumber(cw.ctx); err2 == nil && head > latest {
if head <= latest {
// no gap between head and latest
return ranges
}
start, end := latest+1, head
cw.logger.Info("Creating new catchup range",
"chain", cw.chain.GetName(),
"latest_block", latest,
"head_block", head,
"catchup_start", start, "catchup_end", end,
"blocks_to_catchup", end-latest,
)

// Split new range if it's too large
newRanges := cw.splitLargeRange(blockstore.CatchupRange{
Start: start, End: end, Current: start - 1,
})

// Batch save all split ranges to database
if err := cw.blockStore.SaveCatchupRanges(
cw.chain.GetNetworkInternalCode(),
newRanges,
); err != nil {
cw.logger.Error("Failed to batch save catchup ranges",
"chain", cw.chain.GetName(),
"count", len(newRanges),
"error", err,
)
} else {
registry.UpsertCatchupRanges(cw.chain.GetName(), newRanges)
}
ranges = append(ranges, newRanges...)
}
}
}

return ranges
cw.logger.Info("Loaded catchup progress",
"chain", cw.chain.GetName(),
"progress_ranges", len(progress),
)
registry.SetCatchupRanges(cw.chain.GetName(), progress)
return progress
}

// Split large ranges into smaller, more manageable chunks
Expand Down
82 changes: 82 additions & 0 deletions internal/worker/catchup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package worker

import (
"context"
"io"
"log/slog"
"testing"
"time"

"github.com/fystack/multichain-indexer/internal/status"
"github.com/fystack/multichain-indexer/pkg/common/config"
"github.com/fystack/multichain-indexer/pkg/common/enum"
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
"github.com/stretchr/testify/require"
)

func TestCatchupWorkerPollsForNewRangesInsteadOfExiting(t *testing.T) {
t.Parallel()

statusRegistry := status.NewRegistry()
statusRegistry.RegisterChain("aptos", "aptos_testnet", config.ChainConfig{
NetworkId: "aptos_testnet",
InternalCode: "APTOS_TESTNET",
Type: enum.NetworkTypeApt,
})

store := &stubBlockStore{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cw := &CatchupWorker{
BaseWorker: &BaseWorker{
ctx: ctx,
cancel: cancel,
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
config: config.ChainConfig{
PollInterval: time.Millisecond,
Throttle: config.Throttle{BatchSize: 20},
},
chain: &stubIndexer{name: "aptos", internalCode: "APTOS_TESTNET", networkType: enum.NetworkTypeApt, latest: 100},
blockStore: store,
statusRegistry: statusRegistry,
},
blockRanges: []blockstore.CatchupRange{},
workerPool: make(chan struct{}, CATCHUP_WORKERS),
}

// Simulate: catchup worker starts with no ranges (empty after state wipe).
// After a short delay, the regular worker saves new catchup ranges to the store.
// The catchup worker should poll and pick them up instead of exiting.
go func() {
time.Sleep(100 * time.Millisecond)
// Simulate regular worker creating catchup ranges in the store
store.catchupProgress = []blockstore.CatchupRange{
{Start: 50, End: 69, Current: 49},
{Start: 70, End: 89, Current: 69},
}
// Give the catchup worker time to poll and load the ranges
time.Sleep(200 * time.Millisecond)
cancel()
}()

// runCatchup should NOT exit immediately when blockRanges is empty.
// It should poll, pick up the new ranges, and only stop when ctx is cancelled.
done := make(chan struct{})
go func() {
cw.runCatchup()
close(done)
}()

select {
case <-done:
// Verify the catchup worker loaded the new ranges before exiting
resp := statusRegistry.Snapshot("1.0.0")
require.Len(t, resp.Networks, 1)
// The worker should have picked up ranges from the store
// (it may have processed and completed them, or they may still be pending)
case <-time.After(5 * time.Second):
cancel()
t.Fatal("catchup worker did not exit after context cancellation")
}
}
Loading