Skip to content

Commit 0a8e542

Browse files
committed
remove worker pool
1 parent f662a50 commit 0a8e542

9 files changed

Lines changed: 285 additions & 291 deletions

block/internal/syncing/da_retrieval_worker_pool.go

Lines changed: 0 additions & 113 deletions
This file was deleted.

block/internal/syncing/da_retrieval_worker_pool_test.go

Lines changed: 0 additions & 141 deletions
This file was deleted.

block/internal/syncing/da_retriever.go

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"slices"
9+
"sync"
810

911
"github.com/rs/zerolog"
1012
"google.golang.org/protobuf/proto"
@@ -20,7 +22,13 @@ import (
2022

2123
// DARetriever defines the interface for retrieving events from the DA layer
2224
type DARetriever interface {
25+
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events
2326
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
27+
// QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints).
28+
// These heights take precedence over sequential fetching.
29+
QueuePriorityHeight(daHeight uint64)
30+
// PopPriorityHeight returns the next priority height to fetch, or 0 if none.
31+
PopPriorityHeight() uint64
2432
}
2533

2634
// daRetriever handles DA retrieval operations for syncing
@@ -38,6 +46,12 @@ type daRetriever struct {
3846
// strictMode indicates if the node has seen a valid DAHeaderEnvelope
3947
// and should now reject all legacy/unsigned headers.
4048
strictMode bool
49+
50+
// priorityMu protects priorityHeights from concurrent access
51+
priorityMu sync.Mutex
52+
// priorityHeights holds DA heights from P2P hints that should be fetched
53+
// before continuing sequential retrieval. Sorted in ascending order.
54+
priorityHeights []uint64
4155
}
4256

4357
// NewDARetriever creates a new DA retriever
@@ -48,14 +62,46 @@ func NewDARetriever(
4862
logger zerolog.Logger,
4963
) *daRetriever {
5064
return &daRetriever{
51-
client: client,
52-
cache: cache,
53-
genesis: genesis,
54-
logger: logger.With().Str("component", "da_retriever").Logger(),
55-
pendingHeaders: make(map[uint64]*types.SignedHeader),
56-
pendingData: make(map[uint64]*types.Data),
57-
strictMode: false,
65+
client: client,
66+
cache: cache,
67+
genesis: genesis,
68+
logger: logger.With().Str("component", "da_retriever").Logger(),
69+
pendingHeaders: make(map[uint64]*types.SignedHeader),
70+
pendingData: make(map[uint64]*types.Data),
71+
strictMode: false,
72+
priorityHeights: make([]uint64, 0),
73+
}
74+
}
75+
76+
// QueuePriorityHeight queues a DA height for priority retrieval.
77+
// Heights from P2P hints take precedence over sequential fetching.
78+
func (r *daRetriever) QueuePriorityHeight(daHeight uint64) {
79+
r.priorityMu.Lock()
80+
defer r.priorityMu.Unlock()
81+
82+
// Skip if already queued
83+
if slices.Contains(r.priorityHeights, daHeight) {
84+
return
85+
}
86+
87+
r.priorityHeights = append(r.priorityHeights, daHeight)
88+
// Keep sorted in ascending order so we process lower heights first
89+
slices.Sort(r.priorityHeights)
90+
}
91+
92+
// PopPriorityHeight returns the next priority height to fetch, or 0 if none.
93+
func (r *daRetriever) PopPriorityHeight() uint64 {
94+
r.priorityMu.Lock()
95+
defer r.priorityMu.Unlock()
96+
97+
if len(r.priorityHeights) == 0 {
98+
return 0
5899
}
100+
101+
height := r.priorityHeights[0]
102+
r.priorityHeights = r.priorityHeights[1:]
103+
104+
return height
59105
}
60106

61107
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events

0 commit comments

Comments
 (0)