Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/worker/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/fystack/multichain-indexer/pkg/infra"
"github.com/fystack/multichain-indexer/pkg/retry"
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
)

Expand Down Expand Up @@ -44,6 +45,7 @@ type BaseWorker struct {
chain indexer.Indexer
kvstore infra.KVStore
blockStore blockstore.Store
catchupStore catchupstore.Store
pubkeyStore pubkeystore.Store
emitter events.Emitter
failedChan chan FailedBlockEvent
Expand All @@ -64,6 +66,7 @@ func newWorkerWithMode(
cfg config.ChainConfig,
kv infra.KVStore,
blockStore blockstore.Store,
catchupStore catchupstore.Store,
emitter events.Emitter,
pubkeyStore pubkeystore.Store,
mode WorkerMode,
Expand All @@ -85,6 +88,7 @@ func newWorkerWithMode(
chain: chain,
kvstore: kv,
blockStore: blockStore,
catchupStore: catchupStore,
pubkeyStore: pubkeyStore,
emitter: emitter,
failedChan: failedChan,
Expand Down
123 changes: 45 additions & 78 deletions internal/worker/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/fystack/multichain-indexer/pkg/events"
"github.com/fystack/multichain-indexer/pkg/infra"
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
"github.com/fystack/multichain-indexer/pkg/store/catchupstore"
"github.com/fystack/multichain-indexer/pkg/store/pubkeystore"
)

Expand All @@ -36,6 +37,7 @@ func NewCatchupWorker(
cfg config.ChainConfig,
kv infra.KVStore,
blockStore blockstore.Store,
catchupStore catchupstore.Store,
emitter events.Emitter,
pubkeyStore pubkeystore.Store,
failedChan chan FailedBlockEvent,
Expand All @@ -47,6 +49,7 @@ func NewCatchupWorker(
cfg,
kv,
blockStore,
catchupStore,
emitter,
pubkeyStore,
ModeCatchup,
Expand Down Expand Up @@ -112,65 +115,23 @@ func (cw *CatchupWorker) runCatchup() {
}

func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange {
registry := status.EnsureStatusRegistry(cw.statusRegistry)
var ranges []blockstore.CatchupRange

// Load existing catchup ranges from database (they're already split when saved)
if progress, err := cw.blockStore.GetCatchupProgress(cw.chain.GetNetworkInternalCode()); err == nil {
cw.logger.Info("Loading existing catchup progress",
"chain", cw.chain.GetName(),
"progress_ranges", len(progress),
)
ranges = progress
registry.SetCatchupRanges(cw.chain.GetName(), progress)
} else {
cw.logger.Warn("Failed to load catchup progress, will create new range",
// Load existing catchup ranges from the store. The catchup worker only loads
// ranges; creating new ranges is the responsibility of the regular worker
// (via determineStartingBlock or skipAheadIfLagging).
progress, err := cw.catchupStore.GetProgress(cw.ctx, cw.chain.GetNetworkInternalCode())
if err != nil {
cw.logger.Warn("Failed to load catchup progress",
"chain", cw.chain.GetName(),
"error", err,
)
return nil
}

// Only create a new range if no existing ranges found
if len(ranges) == 0 {
if latest, err1 := cw.blockStore.GetLatestBlock(cw.chain.GetNetworkInternalCode()); err1 == nil {
if head, err2 := cw.chain.GetLatestBlockNumber(cw.ctx); err2 == nil && head > latest {
if head <= latest {
// no gap between head and latest
return ranges
}
start, end := latest+1, head
cw.logger.Info("Creating new catchup range",
"chain", cw.chain.GetName(),
"latest_block", latest,
"head_block", head,
"catchup_start", start, "catchup_end", end,
"blocks_to_catchup", end-latest,
)

// Split new range if it's too large
newRanges := cw.splitLargeRange(blockstore.CatchupRange{
Start: start, End: end, Current: start - 1,
})

// Batch save all split ranges to database
if err := cw.blockStore.SaveCatchupRanges(
cw.chain.GetNetworkInternalCode(),
newRanges,
); err != nil {
cw.logger.Error("Failed to batch save catchup ranges",
"chain", cw.chain.GetName(),
"count", len(newRanges),
"error", err,
)
} else {
registry.UpsertCatchupRanges(cw.chain.GetName(), newRanges)
}
ranges = append(ranges, newRanges...)
}
}
}

return ranges
cw.logger.Info("Loaded catchup progress",
"chain", cw.chain.GetName(),
"progress_ranges", len(progress),
)
return progress
}

// Split large ranges into smaller, more manageable chunks
Expand All @@ -195,15 +156,8 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
return nil
}

// Process multiple ranges in parallel
// Claim multiple ranges in parallel so multiple instances can split work.
var wg sync.WaitGroup
rangeChan := make(chan blockstore.CatchupRange, len(cw.blockRanges))

// Fill channel with ranges
for _, r := range cw.blockRanges {
rangeChan <- r
}
close(rangeChan)

// Start parallel workers
for i := 0; i < CATCHUP_WORKERS; i++ {
Expand All @@ -213,7 +167,11 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
defer cw.recoverPanic(fmt.Sprintf("catchup range worker %d", workerID))
cw.logger.Debug("Starting catchup worker", "worker_id", workerID)

for r := range rangeChan {
for {
r, ok := cw.claimNextRange()
if !ok {
return
}
if err := cw.processRange(r, workerID); err != nil {
cw.logger.Error("Failed to process range",
"worker_id", workerID,
Expand All @@ -233,6 +191,27 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
return nil
}

// claimNextRange asks the catchup store for the next range belonging to this
// worker's chain (keyed by the chain's internal network code).
//
// It returns the range and true when the store hands one back. It returns the
// zero CatchupRange and false in two cases:
//   - GetNextRange returned an error, which is logged at Warn level; or
//   - GetNextRange returned a nil range without an error.
//
// NOTE(review): a nil range presumably means nothing is left to claim; confirm
// against the catchupstore implementation. Callers cannot distinguish the two
// false cases from the return value alone.
func (cw *CatchupWorker) claimNextRange() (blockstore.CatchupRange, bool) {
	// Zero value returned whenever no range could be claimed.
	var none blockstore.CatchupRange

	next, err := cw.catchupStore.GetNextRange(cw.ctx, cw.chain.GetNetworkInternalCode())
	switch {
	case err != nil:
		cw.logger.Warn("Failed to claim catchup range",
			"chain", cw.chain.GetName(),
			"error", err,
		)
		return none, false
	case next == nil:
		return none, false
	}

	chainName := cw.chain.GetName()
	span := fmt.Sprintf("%d-%d", next.Start, next.End)
	cw.logger.Info("Claimed catchup range",
		"chain", chainName,
		"range", span,
		"current", next.Current,
	)

	// Hand back a copy so the caller does not share the store's pointer.
	return *next, true
}

func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) error {
batchCount := 0
startTime := time.Now()
Expand Down Expand Up @@ -353,14 +332,13 @@ func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) e
func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) {
cw.progressMu.Lock()
defer cw.progressMu.Unlock()
registry := status.EnsureStatusRegistry(cw.statusRegistry)
cw.logger.Debug("Saving catchup progress",
"chain", cw.chain.GetName(),
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
"current", current,
)
current = min(current, r.End)
if err := cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil {
if err := cw.catchupStore.SaveProgress(cw.ctx, cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil {
cw.logger.Warn("Failed to save catchup progress",
"chain", cw.chain.GetName(),
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
Expand All @@ -369,11 +347,6 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64)
)
return
}
registry.UpsertCatchupRanges(cw.chain.GetName(), []blockstore.CatchupRange{{
Start: r.Start,
End: r.End,
Current: current,
}})
for i := range cw.blockRanges {
if cw.blockRanges[i].Start == r.Start && cw.blockRanges[i].End == r.End {
cw.blockRanges[i].Current = current
Expand All @@ -385,23 +358,20 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64)
func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error {
cw.progressMu.Lock()
defer cw.progressMu.Unlock()
registry := status.EnsureStatusRegistry(cw.statusRegistry)

cw.logger.Info("Completing catchup range",
"chain", cw.chain.GetName(),
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
)

if err := cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil {
if err := cw.catchupStore.DeleteRange(cw.ctx, cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil {
cw.logger.Warn("Failed to delete catchup range",
"chain", cw.chain.GetName(),
"range", fmt.Sprintf("%d-%d", r.Start, r.End),
"error", err,
)
return err
}
registry.DeleteCatchupRange(cw.chain.GetName(), r.Start, r.End)

// Remove from local ranges
for i, existing := range cw.blockRanges {
if existing.Start == r.Start && existing.End == r.End {
Expand Down Expand Up @@ -440,15 +410,12 @@ func (cw *CatchupWorker) Close() error {
)
}

if err := cw.blockStore.SaveCatchupRanges(cw.chain.GetNetworkInternalCode(), rangesToSave); err != nil {
if err := cw.catchupStore.SaveRanges(cw.ctx, cw.chain.GetNetworkInternalCode(), rangesToSave); err != nil {
cw.logger.Error("Failed to batch save progress on close",
"chain", cw.chain.GetName(),
"ranges", len(rangesToSave),
"error", err,
)
} else {
registry := status.EnsureStatusRegistry(cw.statusRegistry)
registry.UpsertCatchupRanges(cw.chain.GetName(), rangesToSave)
}

return nil
Expand Down
Loading
Loading