Skip to content

Commit da54c68

Browse files
committed
test(syncing): add integration coverage for double-sign halt pipeline
1 parent 300adc2 commit da54c68

1 file changed

Lines changed: 317 additions & 0 deletions

File tree

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
package syncing
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
ds "github.com/ipfs/go-datastore"
11+
dssync "github.com/ipfs/go-datastore/sync"
12+
"github.com/libp2p/go-libp2p/core/crypto"
13+
"github.com/rs/zerolog"
14+
"github.com/stretchr/testify/mock"
15+
"github.com/stretchr/testify/require"
16+
17+
"github.com/evstack/ev-node/block/internal/cache"
18+
"github.com/evstack/ev-node/block/internal/common"
19+
"github.com/evstack/ev-node/pkg/config"
20+
"github.com/evstack/ev-node/pkg/genesis"
21+
signerpkg "github.com/evstack/ev-node/pkg/signer"
22+
"github.com/evstack/ev-node/pkg/store"
23+
testmocks "github.com/evstack/ev-node/test/mocks"
24+
extmocks "github.com/evstack/ev-node/test/mocks/external"
25+
"github.com/evstack/ev-node/types"
26+
)
27+
28+
// integrationHarness wires the Syncer the way Start() does for tests
29+
// to drive headers through the real entry points and assert on the halt pipeline.
30+
type integrationHarness struct {
31+
t *testing.T
32+
syncer *Syncer
33+
store store.Store
34+
cache cache.CacheManager
35+
gen genesis.Genesis
36+
addr []byte
37+
pub crypto.PubKey
38+
signer signerpkg.Signer
39+
errCh chan error
40+
dsCount *atomic.Int64
41+
}
42+
43+
func newIntegrationHarness(t *testing.T) *integrationHarness {
44+
t.Helper()
45+
memDS := dssync.MutexWrap(ds.NewMapDatastore())
46+
st := store.New(memDS)
47+
48+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
49+
require.NoError(t, err)
50+
51+
addr, pub, signer := buildSyncTestSigner(t)
52+
cfg := config.DefaultConfig()
53+
gen := genesis.Genesis{
54+
ChainID: "syncer-ds-integration", InitialHeight: 1,
55+
StartTime: time.Now().Add(-time.Second), ProposerAddress: addr,
56+
}
57+
58+
mockExec := testmocks.NewMockExecutor(t)
59+
mockExec.EXPECT().
60+
InitChain(mock.Anything, mock.Anything, uint64(1), gen.ChainID).
61+
Return([]byte("app0"), nil).Once()
62+
63+
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
64+
mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe()
65+
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
66+
mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe()
67+
68+
metrics := common.NopMetrics()
69+
var dsCount atomic.Int64
70+
metrics.DoubleSignsDetected = &counterCtr{n: &dsCount}
71+
72+
errCh := make(chan error, 4)
73+
s := NewSyncer(
74+
st, mockExec, nil, cm, metrics, cfg, gen,
75+
mockHeaderStore, mockDataStore, zerolog.Nop(),
76+
common.DefaultBlockOptions(), errCh, nil,
77+
)
78+
require.NoError(t, s.initializeState())
79+
s.doubleSignSeen = newDoubleSignDedup() // normally set up by Start()
80+
81+
return &integrationHarness{
82+
t: t,
83+
syncer: s,
84+
store: st,
85+
cache: cm,
86+
gen: gen,
87+
addr: addr,
88+
pub: pub,
89+
signer: signer,
90+
errCh: errCh,
91+
dsCount: &dsCount,
92+
}
93+
}
94+
95+
func (h *integrationHarness) sign(height uint64, variant byte) *types.SignedHeader {
96+
h.t.Helper()
97+
// Vary AppHash AND LastHeaderHash by variant so distinct variants always
98+
// produce distinct hashes regardless of timing or pool state.
99+
_, hdr := makeSignedHeaderBytes(h.t, h.gen.ChainID, height, h.addr, h.pub, h.signer,
100+
[]byte{variant, variant, variant}, nil, []byte{variant})
101+
return hdr
102+
}
103+
104+
// persistHeader writes the header to the store as if it had been synced.
105+
func (h *integrationHarness) persistHeader(hdr *types.SignedHeader) {
106+
h.t.Helper()
107+
batch, err := h.store.NewBatch(context.Background())
108+
require.NoError(h.t, err)
109+
require.NoError(h.t, batch.SaveBlockData(hdr, &types.Data{
110+
Metadata: &types.Metadata{ChainID: h.gen.ChainID, Height: hdr.Height(), Time: hdr.BaseHeader.Time},
111+
}, &hdr.Signature))
112+
require.NoError(h.t, batch.SetHeight(hdr.Height()))
113+
require.NoError(h.t, batch.Commit())
114+
}
115+
116+
// newDARetriever builds a daRetriever wired to the harness's syncer.
117+
func (h *integrationHarness) newDARetriever() *daRetriever {
118+
mockClient := testmocks.NewMockClient(h.t)
119+
mockClient.On("GetHeaderNamespace").Return([]byte("ns")).Maybe()
120+
mockClient.On("GetDataNamespace").Return([]byte("ns")).Maybe()
121+
return NewDARetriever(mockClient, h.cache, h.gen, zerolog.Nop(), h.syncer.detectDoubleSign)
122+
}
123+
124+
// newP2PHandler builds a P2PHandler with the header store mocked to return hdr at the given height.
125+
func (h *integrationHarness) newP2PHandler(height uint64, hdr *types.SignedHeader) (*P2PHandler, chan common.DAHeightEvent) {
126+
headerStore := extmocks.NewMockStore[*types.P2PSignedHeader](h.t)
127+
dataStore := extmocks.NewMockStore[*types.P2PData](h.t)
128+
headerStore.EXPECT().
129+
GetByHeight(mock.Anything, height).
130+
Return(&types.P2PSignedHeader{SignedHeader: hdr}, nil).
131+
Once()
132+
p2p := NewP2PHandler(headerStore, dataStore, h.cache, h.gen, zerolog.Nop(), h.syncer.detectDoubleSign)
133+
return p2p, make(chan common.DAHeightEvent, 1)
134+
}
135+
136+
// requireHalted asserts the full halt pipeline fired exactly once.
137+
func (h *integrationHarness) requireHalted(altHash []byte, height uint64) {
138+
h.t.Helper()
139+
select {
140+
case got := <-h.errCh:
141+
require.ErrorIs(h.t, got, ErrDoubleSign)
142+
case <-time.After(time.Second):
143+
h.t.Fatal("timed out waiting for critical error on errCh")
144+
}
145+
require.True(h.t, h.syncer.hasCriticalError.Load())
146+
require.Equal(h.t, int64(1), h.dsCount.Load())
147+
148+
blob, err := h.store.GetMetadata(context.Background(), store.GetDoubleSignEvidenceKey(height, altHash))
149+
require.NoError(h.t, err)
150+
require.NotEmpty(h.t, blob)
151+
}
152+
153+
// Two conflicting headers in the same DA blob batch must halt the syncer.
154+
func TestSyncerIntegration_InBatchDA_HaltsViaRealRetrieverPipeline(t *testing.T) {
155+
h := newIntegrationHarness(t)
156+
r := h.newDARetriever()
157+
158+
first := h.sign(5, 0x01)
159+
alt := h.sign(5, 0x02)
160+
require.NotEqual(t, first.Hash().String(), alt.Hash().String())
161+
162+
firstBin, err := first.MarshalBinary()
163+
require.NoError(t, err)
164+
altBin, err := alt.MarshalBinary()
165+
require.NoError(t, err)
166+
167+
events := r.ProcessBlobs(context.Background(), [][]byte{firstBin, altBin}, 100)
168+
require.Empty(t, events)
169+
170+
h.requireHalted(alt.Hash(), 5)
171+
}
172+
173+
// An alternate in a later DA batch is detected against the persisted canonical.
174+
func TestSyncerIntegration_CrossBatchDA_HaltsViaStoreLookup(t *testing.T) {
175+
h := newIntegrationHarness(t)
176+
r := h.newDARetriever()
177+
178+
first := h.sign(5, 0x01)
179+
h.persistHeader(first)
180+
181+
alt := h.sign(5, 0x02)
182+
altBin, err := alt.MarshalBinary()
183+
require.NoError(t, err)
184+
185+
events := r.ProcessBlobs(context.Background(), [][]byte{altBin}, 101)
186+
require.Empty(t, events)
187+
188+
h.requireHalted(alt.Hash(), 5)
189+
}
190+
191+
// An alternate via P2P must halt when the canonical was first observed via DA.
192+
func TestSyncerIntegration_CrossSource_DAFirstThenP2P_Halts(t *testing.T) {
193+
h := newIntegrationHarness(t)
194+
r := h.newDARetriever()
195+
196+
first := h.sign(7, 0x01)
197+
firstBin, err := first.MarshalBinary()
198+
require.NoError(t, err)
199+
_ = r.ProcessBlobs(context.Background(), [][]byte{firstBin}, 100)
200+
201+
cached, source, ok := h.cache.GetPendingSignedHeader(7)
202+
require.True(t, ok)
203+
require.True(t, bytes.Equal(cached.Hash(), first.Hash()))
204+
require.Equal(t, types.EvidenceSourceDA, source)
205+
206+
alt := h.sign(7, 0x02)
207+
p2p, ch := h.newP2PHandler(7, alt)
208+
require.NoError(t, p2p.ProcessHeight(context.Background(), 7, ch))
209+
require.Empty(t, ch)
210+
211+
h.requireHalted(alt.Hash(), 7)
212+
}
213+
214+
// An alternate via DA must halt when the canonical was first observed via P2P.
215+
func TestSyncerIntegration_CrossSource_P2PFirstThenDA_Halts(t *testing.T) {
216+
h := newIntegrationHarness(t)
217+
218+
first := h.sign(7, 0x01)
219+
p2p, ch := h.newP2PHandler(7, first)
220+
p2p.SetProcessedHeight(7)
221+
require.NoError(t, p2p.ProcessHeight(context.Background(), 7, ch))
222+
require.Empty(t, ch)
223+
224+
cached, source, ok := h.cache.GetPendingSignedHeader(7)
225+
require.True(t, ok)
226+
require.True(t, bytes.Equal(cached.Hash(), first.Hash()))
227+
require.Equal(t, types.EvidenceSourceP2P, source)
228+
229+
alt := h.sign(7, 0x02)
230+
altBin, err := alt.MarshalBinary()
231+
require.NoError(t, err)
232+
233+
r := h.newDARetriever()
234+
events := r.ProcessBlobs(context.Background(), [][]byte{altBin}, 100)
235+
require.Empty(t, events)
236+
237+
h.requireHalted(alt.Hash(), 7)
238+
}
239+
240+
// An alternate via P2P must halt when the canonical was already persisted.
241+
func TestSyncerIntegration_CrossSource_StoreFirstThenP2P_Halts(t *testing.T) {
242+
h := newIntegrationHarness(t)
243+
244+
first := h.sign(5, 0x01)
245+
h.persistHeader(first)
246+
247+
alt := h.sign(5, 0x02)
248+
p2p, ch := h.newP2PHandler(5, alt)
249+
require.NoError(t, p2p.ProcessHeight(context.Background(), 5, ch))
250+
require.Empty(t, ch)
251+
252+
h.requireHalted(alt.Hash(), 5)
253+
}
254+
255+
// Identical bytes seen twice in the same DA batch must not halt.
256+
func TestSyncerIntegration_BenignDuplicate_InBatch_DoesNotHalt(t *testing.T) {
257+
h := newIntegrationHarness(t)
258+
r := h.newDARetriever()
259+
260+
hdr := h.sign(5, 0x01)
261+
bin, err := hdr.MarshalBinary()
262+
require.NoError(t, err)
263+
264+
_ = r.ProcessBlobs(context.Background(), [][]byte{bin, bin}, 100)
265+
266+
require.False(t, h.syncer.hasCriticalError.Load())
267+
require.Equal(t, int64(0), h.dsCount.Load())
268+
require.Empty(t, h.errCh)
269+
}
270+
271+
// Re-publishing the canonical at a different DA height must not halt.
272+
func TestSyncerIntegration_BenignDuplicate_AcrossBatches_DoesNotHalt(t *testing.T) {
273+
h := newIntegrationHarness(t)
274+
r := h.newDARetriever()
275+
276+
first := h.sign(5, 0x01)
277+
h.persistHeader(first)
278+
279+
bin, err := first.MarshalBinary()
280+
require.NoError(t, err)
281+
_ = r.ProcessBlobs(context.Background(), [][]byte{bin}, 101)
282+
283+
require.False(t, h.syncer.hasCriticalError.Load())
284+
require.Equal(t, int64(0), h.dsCount.Load())
285+
require.Empty(t, h.errCh)
286+
}
287+
288+
// The same (height, altHash) seen twice must report only once.
289+
func TestSyncerIntegration_DuplicateAlternates_DedupedToOneHalt(t *testing.T) {
290+
h := newIntegrationHarness(t)
291+
r := h.newDARetriever()
292+
293+
first := h.sign(11, 0x01)
294+
alt := h.sign(11, 0x02)
295+
firstBin, err := first.MarshalBinary()
296+
require.NoError(t, err)
297+
altBin, err := alt.MarshalBinary()
298+
require.NoError(t, err)
299+
300+
_ = r.ProcessBlobs(context.Background(), [][]byte{firstBin, altBin}, 100)
301+
_ = r.ProcessBlobs(context.Background(), [][]byte{altBin}, 101)
302+
303+
require.Equal(t, int64(1), h.dsCount.Load())
304+
305+
timeout := time.After(50 * time.Millisecond)
306+
count := 0
307+
loop:
308+
for {
309+
select {
310+
case <-h.errCh:
311+
count++
312+
case <-timeout:
313+
break loop
314+
}
315+
}
316+
require.Equal(t, 1, count)
317+
}

0 commit comments

Comments
 (0)