Skip to content

Commit 025d9d4

Browse files
fix(postage): resume live sync where snapshot replay stopped (#5517)
1 parent 5c739d1 commit 025d9d4

5 files changed

Lines changed: 292 additions & 57 deletions

File tree

pkg/postage/batchservice/batchservice.go

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -39,30 +39,23 @@ type batchService struct {
3939
batchListener postage.BatchEventListener
4040

4141
checksum hash.Hash // checksum hasher
42-
43-
// snapshotResumeBlock is the chain block height the store was rebuilt to
44-
// from a postage snapshot. When set, live sync resumes from here.
45-
snapshotResumeBlock uint64
4642
}
4743

4844
type Interface interface {
4945
postage.EventUpdater
5046
}
5147

52-
// Snapshot carries the optional inputs needed to rebuild the batch store from a
53-
// postage snapshot. When passed to New (non-nil), the store is reset and
54-
// replayed from the snapshot before the service is returned, and live sync
55-
// later resumes from the snapshot's block height. New takes ownership of
56-
// Listener and closes it once the snapshot has been replayed.
48+
// Snapshot carries the optional inputs to rebuild the batch store from a postage
49+
// snapshot. When passed to New (non-nil), the snapshot is replayed into the store
50+
// before the service is returned; live sync then resumes from the last block the
51+
// replay reached (held in the chain state). New owns Listener and closes it once
52+
// the replay completes.
5753
type Snapshot struct {
5854
// Listener replays the snapshot's events into the batch store.
5955
Listener postage.Listener
6056
// StartBlock is the block height from which the snapshot is replayed (the
6157
// postage contract start block).
6258
StartBlock uint64
63-
// ResumeBlock is the block height the snapshot reached, from which live sync
64-
// resumes once the replay completes.
65-
ResumeBlock uint64
6659
}
6760

6861
// New will create a new BatchService.
@@ -178,8 +171,8 @@ func (svc *batchService) reset() error {
178171
return nil
179172
}
180173

181-
// loadSnapshot rebuilds the (already reset) store from a postage snapshot by
182-
// replaying its events and records the block height to resume live sync from.
174+
// loadSnapshot replays a postage snapshot into the store, advancing the chain
175+
// state as it goes so live sync later resumes from the last block reached.
183176
func (svc *batchService) loadSnapshot(ctx context.Context, snapshot *Snapshot) error {
184177
defer func() {
185178
if err := snapshot.Listener.Close(); err != nil {
@@ -192,13 +185,7 @@ func (svc *batchService) loadSnapshot(ctx context.Context, snapshot *Snapshot) e
192185
startBlock = cs.Block
193186
}
194187

195-
if err := <-snapshot.Listener.Listen(ctx, startBlock+1, svc); err != nil {
196-
return err
197-
}
198-
199-
svc.snapshotResumeBlock = snapshot.ResumeBlock
200-
201-
return nil
188+
return <-snapshot.Listener.Listen(ctx, startBlock+1, svc)
202189
}
203190

204191
// Create will create a new batch with the given ID, owner value and depth and
@@ -345,16 +332,13 @@ func (svc *batchService) TransactionEnd() error {
345332
var ErrInterruped = errors.New("postage sync interrupted")
346333

347334
func (svc *batchService) Start(ctx context.Context, startBlock uint64) (err error) {
348-
// The store reset already happened in New, so Start only drives live sync.
335+
// Any store reset happened in New; Start only drives live sync. The chain
336+
// state holds the last synced block (advanced by the snapshot replay, if any),
337+
// so live sync resumes exactly where it left off.
349338
cs := svc.storer.GetChainState()
350339
if cs.Block > startBlock {
351340
startBlock = cs.Block
352341
}
353-
// When the store was rebuilt from a snapshot, resume live sync from the
354-
// snapshot's block height rather than the requested start block.
355-
if svc.snapshotResumeBlock > startBlock {
356-
startBlock = svc.snapshotResumeBlock
357-
}
358342

359343
syncedChan := svc.listener.Listen(ctx, startBlock+1, svc)
360344

pkg/postage/batchservice/batchservice_test.go

Lines changed: 103 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,23 @@ package batchservice_test
66

77
import (
88
"bytes"
9+
"compress/gzip"
910
"context"
11+
"encoding/json"
1012
"errors"
1113
"hash"
1214
"math/big"
1315
"testing"
16+
"time"
1417

18+
"github.com/ethereum/go-ethereum/accounts/abi"
1519
"github.com/ethereum/go-ethereum/common"
20+
"github.com/ethereum/go-ethereum/core/types"
1621
"github.com/ethersphere/bee/v2/pkg/log"
1722
"github.com/ethersphere/bee/v2/pkg/postage"
1823
"github.com/ethersphere/bee/v2/pkg/postage/batchservice"
1924
"github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
25+
"github.com/ethersphere/bee/v2/pkg/postage/snapshot"
2026
postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
2127
mocks "github.com/ethersphere/bee/v2/pkg/statestore/mock"
2228
"github.com/ethersphere/bee/v2/pkg/storage"
@@ -615,15 +621,20 @@ func TestResyncControlsReset(t *testing.T) {
615621

616622
type recordingListener struct {
617623
from uint64
624+
syncedTo uint64 // when non-zero, the replay advances the chain state to this block
618625
listened bool
619626
closed bool
620627
listenErr error
621628
closeErr error
622629
}
623630

624-
func (r *recordingListener) Listen(_ context.Context, from uint64, _ postage.EventUpdater) <-chan error {
631+
func (r *recordingListener) Listen(_ context.Context, from uint64, updater postage.EventUpdater) <-chan error {
625632
r.listened = true
626633
r.from = from
634+
// Mimic the real listener advancing the chain state during replay.
635+
if r.syncedTo != 0 && r.listenErr == nil {
636+
_ = updater.UpdateBlockNumber(r.syncedTo)
637+
}
627638
c := make(chan error, 1)
628639
c <- r.listenErr
629640
return c
@@ -634,28 +645,33 @@ func (r *recordingListener) Close() error {
634645
return r.closeErr
635646
}
636647

637-
// TestSnapshotRebuild covers the snapshot rebuild path and the #5495 fix: live
638-
// sync resumes from the snapshot's block height, and the store is reset at most
639-
// once even when --resync is set alongside a snapshot (never twice, which would
640-
// discard the freshly loaded snapshot).
648+
// TestSnapshotRebuild covers the snapshot rebuild path: live sync resumes from
649+
// the block the replay reached (not the snapshot tip), and the store is reset at
650+
// most once even when --resync is set alongside a snapshot (#5495).
641651
func TestSnapshotRebuild(t *testing.T) {
642652
t.Parallel()
643653

644654
newSnapshot := func() (*recordingListener, *batchservice.Snapshot) {
645655
snapListener := &recordingListener{}
646656
return snapListener, &batchservice.Snapshot{
647-
Listener: snapListener,
648-
StartBlock: 100,
649-
ResumeBlock: 4096,
657+
Listener: snapListener,
658+
StartBlock: 100,
650659
}
651660
}
652661

653-
t.Run("snapshot replays and live sync resumes from its block", func(t *testing.T) {
662+
t.Run("live sync resumes from where the replay stopped, not the snapshot tip", func(t *testing.T) {
654663
t.Parallel()
655664

656665
s := mocks.NewStateStore()
657666
store := mock.New()
658-
snapListener, snapshot := newSnapshot()
667+
// A valid chain state must exist before the replay advances it.
668+
putChainState(t, store, &postage.ChainState{Block: 0, TotalAmount: big.NewInt(0), CurrentPrice: big.NewInt(0)})
669+
670+
// The real replay stops a few blocks below the snapshot tip (the listener
671+
// trims its tip), so live sync must resume where it actually stopped, not
672+
// at the tip — otherwise the trimmed blocks are skipped (#5495).
673+
snapListener := &recordingListener{syncedTo: 4090}
674+
snapshot := &batchservice.Snapshot{Listener: snapListener, StartBlock: 100}
659675
liveListener := &recordingListener{}
660676

661677
svc, loaded, err := batchservice.New(context.Background(), s, store, testLog, liveListener, nil, nil, nil, snapshot, false)
@@ -677,13 +693,12 @@ func TestSnapshotRebuild(t *testing.T) {
677693
t.Fatal("expected snapshot listener to be closed")
678694
}
679695

680-
// Live sync resumes from the snapshot's block height, not the requested
681-
// start block.
696+
// Live sync resumes from cs.Block+1, where the replay stopped.
682697
if err := svc.Start(context.Background(), snapshot.StartBlock); err != nil {
683698
t.Fatal(err)
684699
}
685-
if liveListener.from != snapshot.ResumeBlock+1 {
686-
t.Fatalf("expect live sync from %d got %d", snapshot.ResumeBlock+1, liveListener.from)
700+
if liveListener.from != 4091 {
701+
t.Fatalf("expect live sync to resume from 4091 (replay stop +1) got %d", liveListener.from)
687702
}
688703
if c := store.ResetCalls(); c != 0 {
689704
t.Fatalf("expect store never reset, got %d", c)
@@ -730,9 +745,8 @@ func TestSnapshotCornerCases(t *testing.T) {
730745
newSnapshot := func(listenErr error) (*recordingListener, *batchservice.Snapshot) {
731746
snapListener := &recordingListener{listenErr: listenErr}
732747
return snapListener, &batchservice.Snapshot{
733-
Listener: snapListener,
734-
StartBlock: 100,
735-
ResumeBlock: 4096,
748+
Listener: snapListener,
749+
StartBlock: 100,
736750
}
737751
}
738752

@@ -848,7 +862,7 @@ func TestSnapshotCornerCases(t *testing.T) {
848862

849863
s := mocks.NewStateStore()
850864
store := mock.New()
851-
_, snapshot := newSnapshot(nil) // snapshot block 4096
865+
_, snapshot := newSnapshot(nil)
852866
liveListener := &recordingListener{}
853867

854868
svc, loaded, err := batchservice.New(context.Background(), s, store, testLog, liveListener, nil, nil, nil, snapshot, false)
@@ -859,7 +873,7 @@ func TestSnapshotCornerCases(t *testing.T) {
859873
t.Fatal("expected snapshot to be loaded")
860874
}
861875

862-
// A chain state further ahead than the snapshot must take precedence so
876+
// A chain state further ahead than the replay must take precedence so
863877
// live sync never rewinds and reprocesses events.
864878
putChainState(t, store, &postage.ChainState{Block: 5000, TotalAmount: big.NewInt(0), CurrentPrice: big.NewInt(0)})
865879

@@ -895,6 +909,76 @@ func TestSnapshotCornerCases(t *testing.T) {
895909
})
896910
}
897911

912+
// TestSnapshotHandoffNoGap guards the snapshot->RPC handoff (#5495): after a
913+
// snapshot replay, live sync must resume from where the replay stopped
914+
// (cs.Block+1), not the snapshot's nominal tip (maxBlock+1), which would skip the
915+
// blocks the listener trims off the tip.
916+
func TestSnapshotHandoffNoGap(t *testing.T) {
917+
t.Parallel()
918+
919+
const maxBlock = uint64(5000)
920+
921+
// Newest log at maxBlock; a non-matching address makes the listener filter
922+
// the events out, so it only advances the chain state per page.
923+
logs := []types.Log{
924+
{BlockNumber: 10, Address: common.HexToAddress("0x1"), Topics: []common.Hash{}},
925+
{BlockNumber: maxBlock, Address: common.HexToAddress("0x1"), Topics: []common.Hash{}},
926+
}
927+
snap, err := snapshot.New(context.Background(), testLog, rawSnapshotGetter(gzipSnapshot(t, logs)), nil,
928+
common.Address{}, abi.ABI{}, time.Second, time.Minute, time.Second, 0)
929+
if err != nil {
930+
t.Fatalf("snapshot.New: %v", err)
931+
}
932+
933+
s := mocks.NewStateStore()
934+
store := mock.New()
935+
// Valid chain state so the replay can advance it.
936+
putChainState(t, store, &postage.ChainState{Block: 0, TotalAmount: big.NewInt(0), CurrentPrice: big.NewInt(0)})
937+
938+
live := &recordingListener{}
939+
svc, loaded, err := batchservice.New(context.Background(), s, store, testLog, live, nil, nil, nil, snap, false)
940+
if err != nil {
941+
t.Fatalf("batchservice.New: %v", err)
942+
}
943+
if !loaded {
944+
t.Fatal("expected snapshot to be loaded")
945+
}
946+
947+
cs := store.GetChainState()
948+
if cs.Block >= maxBlock {
949+
t.Fatalf("replay reached %d, expected to stop below the snapshot max block %d", cs.Block, maxBlock)
950+
}
951+
952+
if err := svc.Start(context.Background(), 0); err != nil {
953+
t.Fatalf("start: %v", err)
954+
}
955+
956+
// Must resume where the replay stopped, not at the snapshot tip.
957+
if live.from != cs.Block+1 {
958+
t.Fatalf("live sync resumed from %d; must resume from cs.Block+1 = %d (resuming higher skips the snapshot's trimmed tail — see #5495)", live.from, cs.Block+1)
959+
}
960+
}
961+
962+
func gzipSnapshot(t *testing.T, logs []types.Log) []byte {
963+
t.Helper()
964+
var buf bytes.Buffer
965+
gz := gzip.NewWriter(&buf)
966+
enc := json.NewEncoder(gz)
967+
for _, l := range logs {
968+
if err := enc.Encode(l); err != nil {
969+
t.Fatalf("encode log: %v", err)
970+
}
971+
}
972+
if err := gz.Close(); err != nil {
973+
t.Fatalf("gzip close: %v", err)
974+
}
975+
return buf.Bytes()
976+
}
977+
978+
type rawSnapshotGetter []byte
979+
980+
func (g rawSnapshotGetter) GetBatchSnapshot() []byte { return g }
981+
898982
func TestChecksum(t *testing.T) {
899983
t.Parallel()
900984

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2026 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package archive_test
6+
7+
import (
8+
"context"
9+
"math/big"
10+
"testing"
11+
12+
"github.com/ethereum/go-ethereum"
13+
"github.com/ethereum/go-ethereum/common"
14+
"github.com/ethersphere/bee/v2/pkg/log"
15+
"github.com/ethersphere/bee/v2/pkg/postage/snapshot"
16+
"github.com/ethersphere/bee/v2/pkg/postage/snapshot/archive"
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
)
20+
21+
// TestSnapshotLogFilterer_RealSnapshot parses the snapshot blob actually embedded
22+
// in the binary. It guards against a missing, empty, or unparseable embed (e.g. a
23+
// bad batch-archive bump), which would otherwise only surface at runtime as a
24+
// stalled postage sync.
25+
func TestSnapshotLogFilterer_RealSnapshot(t *testing.T) {
26+
t.Parallel()
27+
28+
getter := archive.Getter{}
29+
30+
// Sanity, fail-fast before the filter subtests run against the filterer: the
31+
// embed must carry data, parse cleanly, and contain logs. Otherwise a bad
32+
// batch-archive bump only surfaces at runtime as a stalled postage sync.
33+
require.NotEmpty(t, getter.GetBatchSnapshot(), "embedded batch snapshot is empty")
34+
35+
filterer := snapshot.NewSnapshotLogFilterer(log.Noop, getter)
36+
37+
maxBlock, err := filterer.BlockNumber(context.Background())
38+
if err != nil {
39+
t.Fatalf("embedded batch snapshot failed to parse: %v", err)
40+
}
41+
if maxBlock == 0 {
42+
t.Fatal("embedded batch snapshot has no logs (max block height 0)")
43+
}
44+
45+
t.Run("filter range", func(t *testing.T) {
46+
// arbitrary range that should exist in the snapshot
47+
from := big.NewInt(20000000)
48+
to := big.NewInt(20001000)
49+
res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
50+
FromBlock: from,
51+
ToBlock: to,
52+
})
53+
require.NoError(t, err)
54+
for _, l := range res {
55+
assert.GreaterOrEqual(t, l.BlockNumber, from.Uint64())
56+
assert.LessOrEqual(t, l.BlockNumber, to.Uint64())
57+
}
58+
})
59+
60+
t.Run("filter address mismatch", func(t *testing.T) {
61+
// random address that should not match the postage stamp contract
62+
addr := common.HexToAddress("0x1234567890123456789012345678901234567890")
63+
res, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
64+
Addresses: []common.Address{addr},
65+
})
66+
require.NoError(t, err)
67+
assert.Empty(t, res)
68+
})
69+
}
70+
71+
func BenchmarkNewSnapshotLogFilterer_Load(b *testing.B) {
72+
getter := archive.Getter{}
73+
74+
for b.Loop() {
75+
filterer := snapshot.NewSnapshotLogFilterer(log.Noop, getter)
76+
_, err := filterer.BlockNumber(context.Background())
77+
if err != nil {
78+
b.Fatal(err)
79+
}
80+
}
81+
}
82+
83+
func BenchmarkSnapshotLogFilterer(b *testing.B) {
84+
getter := archive.Getter{}
85+
filterer := snapshot.NewSnapshotLogFilterer(log.Noop, getter)
86+
// ensure loaded
87+
if _, err := filterer.BlockNumber(context.Background()); err != nil {
88+
b.Fatal(err)
89+
}
90+
91+
b.Run("FilterLogs", func(b *testing.B) {
92+
for b.Loop() {
93+
from := big.NewInt(20000000)
94+
to := big.NewInt(20001000)
95+
_, err := filterer.FilterLogs(context.Background(), ethereum.FilterQuery{
96+
FromBlock: from,
97+
ToBlock: to,
98+
})
99+
if err != nil {
100+
b.Fatal(err)
101+
}
102+
}
103+
})
104+
}

0 commit comments

Comments
 (0)