Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 11 additions & 27 deletions pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,23 @@ type batchService struct {
batchListener postage.BatchEventListener

checksum hash.Hash // checksum hasher

// snapshotResumeBlock is the chain block height the store was rebuilt to
// from a postage snapshot. When set, live sync resumes from here.
snapshotResumeBlock uint64
}

// Interface is the exported contract of the batch service. It currently
// consists solely of the embedded postage.EventUpdater.
type Interface interface {
postage.EventUpdater
}

// Snapshot carries the optional inputs needed to rebuild the batch store from a
// postage snapshot. When passed to New (non-nil), the store is reset and
// replayed from the snapshot before the service is returned, and live sync
// later resumes from the snapshot's block height. New takes ownership of
// Listener and closes it once the snapshot has been replayed.
// Snapshot carries the optional inputs to rebuild the batch store from a postage
// snapshot. When passed to New (non-nil), the snapshot is replayed into the store
// before the service is returned; live sync then resumes from the last block the
// replay reached (held in the chain state). New owns Listener and closes it once
// the replay completes.
type Snapshot struct {
// Listener replays the snapshot's events into the batch store.
Listener postage.Listener
// StartBlock is the block height from which the snapshot is replayed (the
// postage contract start block).
StartBlock uint64
// ResumeBlock is the block height the snapshot reached, from which live sync
// resumes once the replay completes.
ResumeBlock uint64
}

// New will create a new BatchService.
Expand Down Expand Up @@ -178,8 +171,8 @@ func (svc *batchService) reset() error {
return nil
}

// loadSnapshot rebuilds the (already reset) store from a postage snapshot by
// replaying its events and records the block height to resume live sync from.
// loadSnapshot replays a postage snapshot into the store, advancing the chain
// state as it goes so live sync later resumes from the last block reached.
func (svc *batchService) loadSnapshot(ctx context.Context, snapshot *Snapshot) error {
defer func() {
if err := snapshot.Listener.Close(); err != nil {
Expand All @@ -192,13 +185,7 @@ func (svc *batchService) loadSnapshot(ctx context.Context, snapshot *Snapshot) e
startBlock = cs.Block
}

if err := <-snapshot.Listener.Listen(ctx, startBlock+1, svc); err != nil {
return err
}

svc.snapshotResumeBlock = snapshot.ResumeBlock

return nil
return <-snapshot.Listener.Listen(ctx, startBlock+1, svc)
}

// Create will create a new batch with the given ID, owner value and depth and
Expand Down Expand Up @@ -345,16 +332,13 @@ func (svc *batchService) TransactionEnd() error {
// ErrInterruped is the sentinel error carrying the message "postage sync
// interrupted".
// NOTE(review): the identifier is misspelled ("Interruped" rather than
// "Interrupted"); it is exported, so renaming it would break callers — confirm
// before changing.
var ErrInterruped = errors.New("postage sync interrupted")

func (svc *batchService) Start(ctx context.Context, startBlock uint64) (err error) {
// The store reset already happened in New, so Start only drives live sync.
// Any store reset happened in New; Start only drives live sync. The chain
// state holds the last synced block (advanced by the snapshot replay, if any),
// so live sync resumes exactly where it left off.
cs := svc.storer.GetChainState()
if cs.Block > startBlock {
startBlock = cs.Block
}
// When the store was rebuilt from a snapshot, resume live sync from the
// snapshot's block height rather than the requested start block.
if svc.snapshotResumeBlock > startBlock {
startBlock = svc.snapshotResumeBlock
}

syncedChan := svc.listener.Listen(ctx, startBlock+1, svc)

Expand Down
122 changes: 103 additions & 19 deletions pkg/postage/batchservice/batchservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ package batchservice_test

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"hash"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/postage/batchservice"
"github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
"github.com/ethersphere/bee/v2/pkg/postage/snapshot"
postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
mocks "github.com/ethersphere/bee/v2/pkg/statestore/mock"
"github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -615,15 +621,20 @@ func TestResyncControlsReset(t *testing.T) {

// recordingListener is a test double for a postage listener: it records how it
// was driven and returns preconfigured errors.
type recordingListener struct {
// from is the start block passed to the most recent Listen call.
from uint64
syncedTo uint64 // when non-zero, the replay advances the chain state to this block
// listened is set to true once Listen has been called.
listened bool
// closed is checked by the tests to confirm the listener was closed
// (presumably set by Close — the assignment is outside this view).
closed bool
// listenErr is the value delivered on the channel returned by Listen.
listenErr error
// closeErr is the value returned by Close.
closeErr error
}

func (r *recordingListener) Listen(_ context.Context, from uint64, _ postage.EventUpdater) <-chan error {
func (r *recordingListener) Listen(_ context.Context, from uint64, updater postage.EventUpdater) <-chan error {
r.listened = true
r.from = from
// Mimic the real listener advancing the chain state during replay.
if r.syncedTo != 0 && r.listenErr == nil {
_ = updater.UpdateBlockNumber(r.syncedTo)
}
c := make(chan error, 1)
c <- r.listenErr
return c
Expand All @@ -634,28 +645,33 @@ func (r *recordingListener) Close() error {
return r.closeErr
}

// TestSnapshotRebuild covers the snapshot rebuild path and the #5495 fix: live
// sync resumes from the snapshot's block height, and the store is reset at most
// once even when --resync is set alongside a snapshot (never twice, which would
// discard the freshly loaded snapshot).
// TestSnapshotRebuild covers the snapshot rebuild path: live sync resumes from
// the block the replay reached (not the snapshot tip), and the store is reset at
// most once even when --resync is set alongside a snapshot (#5495).
func TestSnapshotRebuild(t *testing.T) {
t.Parallel()

newSnapshot := func() (*recordingListener, *batchservice.Snapshot) {
snapListener := &recordingListener{}
return snapListener, &batchservice.Snapshot{
Listener: snapListener,
StartBlock: 100,
ResumeBlock: 4096,
Listener: snapListener,
StartBlock: 100,
}
}

t.Run("snapshot replays and live sync resumes from its block", func(t *testing.T) {
t.Run("live sync resumes from where the replay stopped, not the snapshot tip", func(t *testing.T) {
t.Parallel()

s := mocks.NewStateStore()
store := mock.New()
snapListener, snapshot := newSnapshot()
// A valid chain state must exist before the replay advances it.
putChainState(t, store, &postage.ChainState{Block: 0, TotalAmount: big.NewInt(0), CurrentPrice: big.NewInt(0)})

// The real replay stops a few blocks below the snapshot tip (the listener
// trims its tip), so live sync must resume where it actually stopped, not
// at the tip — otherwise the trimmed blocks are skipped (#5495).
snapListener := &recordingListener{syncedTo: 4090}
snapshot := &batchservice.Snapshot{Listener: snapListener, StartBlock: 100}
liveListener := &recordingListener{}

svc, loaded, err := batchservice.New(context.Background(), s, store, testLog, liveListener, nil, nil, nil, snapshot, false)
Expand All @@ -677,13 +693,12 @@ func TestSnapshotRebuild(t *testing.T) {
t.Fatal("expected snapshot listener to be closed")
}

// Live sync resumes from the snapshot's block height, not the requested
// start block.
// Live sync resumes from cs.Block+1, where the replay stopped.
if err := svc.Start(context.Background(), snapshot.StartBlock); err != nil {
t.Fatal(err)
}
if liveListener.from != snapshot.ResumeBlock+1 {
t.Fatalf("expect live sync from %d got %d", snapshot.ResumeBlock+1, liveListener.from)
if liveListener.from != 4091 {
t.Fatalf("expect live sync to resume from 4091 (replay stop +1) got %d", liveListener.from)
}
if c := store.ResetCalls(); c != 0 {
t.Fatalf("expect store never reset, got %d", c)
Expand Down Expand Up @@ -730,9 +745,8 @@ func TestSnapshotCornerCases(t *testing.T) {
newSnapshot := func(listenErr error) (*recordingListener, *batchservice.Snapshot) {
snapListener := &recordingListener{listenErr: listenErr}
return snapListener, &batchservice.Snapshot{
Listener: snapListener,
StartBlock: 100,
ResumeBlock: 4096,
Listener: snapListener,
StartBlock: 100,
}
}

Expand Down Expand Up @@ -848,7 +862,7 @@ func TestSnapshotCornerCases(t *testing.T) {

s := mocks.NewStateStore()
store := mock.New()
_, snapshot := newSnapshot(nil) // snapshot block 4096
_, snapshot := newSnapshot(nil)
liveListener := &recordingListener{}

svc, loaded, err := batchservice.New(context.Background(), s, store, testLog, liveListener, nil, nil, nil, snapshot, false)
Expand All @@ -859,7 +873,7 @@ func TestSnapshotCornerCases(t *testing.T) {
t.Fatal("expected snapshot to be loaded")
}

// A chain state further ahead than the snapshot must take precedence so
// A chain state further ahead than the replay must take precedence so
// live sync never rewinds and reprocesses events.
putChainState(t, store, &postage.ChainState{Block: 5000, TotalAmount: big.NewInt(0), CurrentPrice: big.NewInt(0)})

Expand Down Expand Up @@ -895,6 +909,76 @@ func TestSnapshotCornerCases(t *testing.T) {
})
}

// TestSnapshotHandoffNoGap guards the snapshot->RPC handoff (#5495): after a
// snapshot replay, live sync must resume from where the replay stopped
// (cs.Block+1), not the snapshot's nominal tip (maxBlock+1), which would skip the
// blocks the listener trims off the tip.
func TestSnapshotHandoffNoGap(t *testing.T) {
t.Parallel()

const maxBlock = uint64(5000)

// Newest log at maxBlock; a non-matching address makes the listener filter
// the events out, so it only advances the chain state per page.
logs := []types.Log{
{BlockNumber: 10, Address: common.HexToAddress("0x1"), Topics: []common.Hash{}},
{BlockNumber: maxBlock, Address: common.HexToAddress("0x1"), Topics: []common.Hash{}},
}
snap, err := snapshot.New(context.Background(), testLog, rawSnapshotGetter(gzipSnapshot(t, logs)), nil,
common.Address{}, abi.ABI{}, time.Second, time.Minute, time.Second, 0)
if err != nil {
t.Fatalf("snapshot.New: %v", err)
}

s := mocks.NewStateStore()
store := mock.New()
// Valid chain state so the replay can advance it.
putChainState(t, store, &postage.ChainState{Block: 0, TotalAmount: big.NewInt(0), CurrentPrice: big.NewInt(0)})

live := &recordingListener{}
svc, loaded, err := batchservice.New(context.Background(), s, store, testLog, live, nil, nil, nil, snap, false)
if err != nil {
t.Fatalf("batchservice.New: %v", err)
}
if !loaded {
t.Fatal("expected snapshot to be loaded")
}

cs := store.GetChainState()
if cs.Block >= maxBlock {
t.Fatalf("replay reached %d, expected to stop below the snapshot max block %d", cs.Block, maxBlock)
}

if err := svc.Start(context.Background(), 0); err != nil {
t.Fatalf("start: %v", err)
}

// Must resume where the replay stopped, not at the snapshot tip.
if live.from != cs.Block+1 {
t.Fatalf("live sync resumed from %d; must resume from cs.Block+1 = %d (resuming higher skips the snapshot's trimmed tail — see #5495)", live.from, cs.Block+1)
}
}

// gzipSnapshot serialises logs as a stream of JSON values (one Encode call per
// log, in slice order) and returns the gzip-compressed bytes. Any encoding or
// compression failure aborts the calling test.
func gzipSnapshot(t *testing.T, logs []types.Log) []byte {
	t.Helper()

	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	encoder := json.NewEncoder(zw)

	for i := range logs {
		if err := encoder.Encode(logs[i]); err != nil {
			t.Fatalf("encode log: %v", err)
		}
	}

	// Close flushes the remaining compressed data into the buffer, so it has to
	// happen before the bytes are read out.
	if err := zw.Close(); err != nil {
		t.Fatalf("gzip close: %v", err)
	}

	return compressed.Bytes()
}

// rawSnapshotGetter is an in-memory snapshot blob that serves its own bytes
// through GetBatchSnapshot.
type rawSnapshotGetter []byte

// GetBatchSnapshot returns the wrapped bytes as-is, without copying.
func (g rawSnapshotGetter) GetBatchSnapshot() []byte {
	return []byte(g)
}

func TestChecksum(t *testing.T) {
t.Parallel()

Expand Down
104 changes: 104 additions & 0 deletions pkg/postage/snapshot/archive/archive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package archive_test

import (
"context"
"math/big"
"testing"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage/snapshot"
"github.com/ethersphere/bee/v2/pkg/postage/snapshot/archive"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestSnapshotLogFilterer_RealSnapshot runs against the snapshot blob that is
// actually embedded in the binary. A missing, empty, or unparseable embed (for
// example after a bad batch-archive bump) would otherwise only show up at
// runtime as a stalled postage sync; this test catches it early.
func TestSnapshotLogFilterer_RealSnapshot(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	getter := archive.Getter{}

	// Fail fast, before any filter subtest touches the filterer: the embed has
	// to carry data, parse cleanly, and contain at least one log.
	require.NotEmpty(t, getter.GetBatchSnapshot(), "embedded batch snapshot is empty")

	filterer := snapshot.NewSnapshotLogFilterer(log.Noop, getter)

	switch maxBlock, err := filterer.BlockNumber(ctx); {
	case err != nil:
		t.Fatalf("embedded batch snapshot failed to parse: %v", err)
	case maxBlock == 0:
		t.Fatal("embedded batch snapshot has no logs (max block height 0)")
	}

	t.Run("filter range", func(t *testing.T) {
		// An arbitrary block range that is expected to exist in the snapshot.
		const (
			lo = uint64(20000000)
			hi = uint64(20001000)
		)
		query := ethereum.FilterQuery{
			FromBlock: new(big.Int).SetUint64(lo),
			ToBlock:   new(big.Int).SetUint64(hi),
		}

		logs, err := filterer.FilterLogs(ctx, query)
		require.NoError(t, err)

		// Every returned log has to fall inside the requested range.
		for i := range logs {
			assert.GreaterOrEqual(t, logs[i].BlockNumber, lo)
			assert.LessOrEqual(t, logs[i].BlockNumber, hi)
		}
	})

	t.Run("filter address mismatch", func(t *testing.T) {
		// A made-up address that should not match the postage stamp contract.
		stranger := common.HexToAddress("0x1234567890123456789012345678901234567890")
		query := ethereum.FilterQuery{Addresses: []common.Address{stranger}}

		logs, err := filterer.FilterLogs(ctx, query)
		require.NoError(t, err)
		assert.Empty(t, logs)
	})
}

// BenchmarkNewSnapshotLogFilterer_Load times building a fresh filterer over the
// embedded snapshot and asking it for its block number on every iteration.
// NOTE(review): BlockNumber presumably triggers the snapshot load/parse — confirm
// against the snapshot package.
func BenchmarkNewSnapshotLogFilterer_Load(b *testing.B) {
	var (
		ctx    = context.Background()
		getter = archive.Getter{}
	)

	for b.Loop() {
		fresh := snapshot.NewSnapshotLogFilterer(log.Noop, getter)
		if _, err := fresh.BlockNumber(ctx); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkSnapshotLogFilterer measures queries against a filterer built once
// over the embedded snapshot.
//
// BlockNumber is called up front (the original marks this step "ensure loaded")
// so that one-time setup is not attributed to the sub-benchmarks.
func BenchmarkSnapshotLogFilterer(b *testing.B) {
	ctx := context.Background()
	getter := archive.Getter{}
	filterer := snapshot.NewSnapshotLogFilterer(log.Noop, getter)
	// ensure loaded
	if _, err := filterer.BlockNumber(ctx); err != nil {
		b.Fatal(err)
	}

	b.Run("FilterLogs", func(b *testing.B) {
		// The query is loop-invariant: build it once, outside the timed loop, so
		// the measurement (and -benchmem allocation counts) reflect FilterLogs
		// alone rather than the big.Int allocations for the range bounds.
		query := ethereum.FilterQuery{
			FromBlock: big.NewInt(20000000),
			ToBlock:   big.NewInt(20001000),
		}

		for b.Loop() {
			if _, err := filterer.FilterLogs(ctx, query); err != nil {
				b.Fatal(err)
			}
		}
	})
}
Loading
Loading