-
Notifications
You must be signed in to change notification settings - Fork 268
Expand file tree
/
Copy pathp2p_handler.go
More file actions
135 lines (118 loc) · 4.23 KB
/
Copy pathp2p_handler.go
File metadata and controls
135 lines (118 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package syncing
import (
"bytes"
"context"
"fmt"
"sync/atomic"
"github.com/celestiaorg/go-header"
"github.com/rs/zerolog"
"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/types"
)
type p2pHandler interface {
ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error
SetProcessedHeight(height uint64)
}
// P2PHandler coordinates block retrieval from P2P stores for the syncer.
// It waits for both header and data to be available at a given height,
// validates their consistency, and emits events to the syncer for processing.
//
// The handler maintains a processedHeight to track the highest block that has been
// successfully validated and sent to the syncer, preventing duplicate processing.
type P2PHandler struct {
headerStore header.Store[*types.P2PSignedHeader]
dataStore header.Store[*types.P2PData]
cache cache.CacheManager
genesis genesis.Genesis
logger zerolog.Logger
processedHeight atomic.Uint64
}
// NewP2PHandler creates a new P2P handler.
func NewP2PHandler(
headerStore header.Store[*types.P2PSignedHeader],
dataStore header.Store[*types.P2PData],
cache cache.CacheManager,
genesis genesis.Genesis,
logger zerolog.Logger,
) *P2PHandler {
return &P2PHandler{
headerStore: headerStore,
dataStore: dataStore,
cache: cache,
genesis: genesis,
logger: logger.With().Str("component", "p2p_handler").Logger(),
}
}
// SetProcessedHeight updates the highest processed block height.
func (h *P2PHandler) SetProcessedHeight(height uint64) {
for range 1_000 {
current := h.processedHeight.Load()
if height <= current {
return
}
if h.processedHeight.CompareAndSwap(current, height) {
return
}
}
}
// ProcessHeight retrieves and validates both header and data for the given height from P2P stores.
// It blocks until both are available, validates consistency (proposer address and data hash match),
// then emits the event to heightInCh or stores it as pending. Updates processedHeight on success.
func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
if height <= h.processedHeight.Load() {
return nil
}
p2pHeader, err := h.headerStore.GetByHeight(ctx, height)
if err != nil {
if ctx.Err() == nil {
h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store")
}
return err
}
if err := h.assertExpectedProposer(p2pHeader.ProposerAddress); err != nil {
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
return err
}
p2pData, err := h.dataStore.GetByHeight(ctx, height)
if err != nil {
if ctx.Err() == nil {
h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store")
}
return err
}
dataCommitment := p2pData.DACommitment()
if !bytes.Equal(p2pHeader.DataHash[:], dataCommitment[:]) {
err := fmt.Errorf("data hash mismatch: header %x, data %x", p2pHeader.DataHash, dataCommitment)
h.logger.Warn().Uint64("height", height).Err(err).Msg("discarding inconsistent block from P2P")
return err
}
// Memoize hash before the header enters the event pipeline so that downstream
// callers (processHeightEvent, TrySyncNextBlock) get cache hits.
p2pHeader.MemoizeHash()
// further header validation (signature) is done in validateBlock.
// we need to be sure that the previous block n-1 was executed before validating block n
event := common.DAHeightEvent{
Header: p2pHeader.SignedHeader,
Data: p2pData.Data,
Source: common.SourceP2P,
DaHeightHints: [2]uint64{p2pHeader.DAHint(), p2pData.DAHint()},
}
select {
case heightInCh <- event:
default:
h.cache.SetPendingEvent(event.Header.Height(), &event)
}
h.SetProcessedHeight(height)
h.logger.Debug().Uint64("height", height).Msg("processed event from P2P")
return nil
}
// assertExpectedProposer validates the proposer address.
func (h *P2PHandler) assertExpectedProposer(proposerAddr []byte) error {
if !bytes.Equal(h.genesis.ProposerAddress, proposerAddr) {
return fmt.Errorf("proposer address mismatch: got %x, expected %x",
proposerAddr, h.genesis.ProposerAddress)
}
return nil
}