From 61fab37b0f031b1aabbd33c26262d10598d86103 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ljubi=C5=A1a=20Ga=C4=8Devi=C4=87?= <35105035+gacevicljubisa@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:10:10 +0100 Subject: [PATCH 01/10] chore: update postage snapshot to v0.0.6 (#5401) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index cf602914109..3e474dd7899 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/caddyserver/certmagic v0.21.6 github.com/coreos/go-semver v0.3.0 github.com/ethereum/go-ethereum v1.15.11 - github.com/ethersphere/batch-archive v0.0.5 + github.com/ethersphere/batch-archive v0.0.6 github.com/ethersphere/go-price-oracle-abi v0.6.9 github.com/ethersphere/go-storage-incentives-abi v0.9.4 github.com/ethersphere/go-sw3-abi v0.6.9 diff --git a/go.sum b/go.sum index fab9e1c2f69..c3653e0463e 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.15.11 h1:JK73WKeu0WC0O1eyX+mdQAVHUV+UR1a9VB/d github.com/ethereum/go-ethereum v1.15.11/go.mod h1:mf8YiHIb0GR4x4TipcvBUPxJLw1mFdmxzoDi11sDRoI= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/ethersphere/batch-archive v0.0.5 h1:SM3g7Tuge4KhOn+NKgPcg2Uz2p8a/MLKZvZpmkKCyU4= -github.com/ethersphere/batch-archive v0.0.5/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= +github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC438/FFZdONY= +github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= From 742949652075520b49c6c754d47003df712ca171 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Mon, 30 Mar 2026 13:27:20 +0300 Subject: [PATCH 02/10] fix(pullsync): return topmost_stored from Sync; advance interval only on successful delivery Sync() previously returned offer.Topmost as topmost regardless of whether the offered chunks passed stamp validation and were actually stored. The puller used this value to advance the sync interval, permanently marking BinIDs as synced even when no chunk was stored there. Sync() now returns a separate stored value alongside topmost: - stored = offer.Topmost when all chunks were validated and stored - stored = 0 when any chunk failed validation The puller advances the interval only when stored >= start. topmost is retained for the MaxUint64 overflow guard. The mock mirrors this behaviour: stored defaults to Topmost on success and is forced to 0 when a sync error is configured. Known limitation: a serving peer whose historical reserve has a gap at [start, X-1] can return a live chunk at BinID X >> start, inflating offer.Topmost and therefore stored past BinIDs that were never held. Fixing this requires the server to cap offer.Topmost at its own historical cursor; that change is deferred. --- pkg/puller/puller.go | 10 +++--- pkg/puller/puller_test.go | 42 ++++++++++++++++++++++++- pkg/pullsync/mock/pullsync.go | 19 +++++++++-- pkg/pullsync/pullsync.go | 51 ++++++++++++++++++++---------- pkg/pullsync/pullsync_test.go | 59 ++++++++++++++++++++++++++++++----- 5 files changed, 148 insertions(+), 33 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 0bcdcfae9cb..dc0561246aa 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -334,7 +334,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncWorkerIterCounter.Inc() - top, count, err := p.syncer.Sync(ctx, address, bin, start) + top, stored, count, err := p.syncer.Sync(ctx, address, bin, start) if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() @@ -360,14 +360,14 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) } - // pulled at least one chunk - if top >= start { - if err := p.addPeerInterval(address, bin, start, top); err != nil { + // advance interval only when chunks were successfully stored + if stored >= start { + if err := p.addPeerInterval(address, bin, start, stored); err != nil { p.metrics.SyncWorkerErrCounter.Inc() p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address) return } - start = top + 1 + start = stored + 1 } } } diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 29687138b9c..8d04c1b2864 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -500,13 +500,53 @@ func TestContinueSyncing(t *testing.T) { kad.Trigger() err := spinlock.Wait(time.Second, func() bool { - return len(pullsync.SyncCalls(addr)) == 1 + return len(pullsync.SyncCalls(addr)) >= 1 }) if err != nil { t.Fatal(err) } } +// TestIntervalDoesNotAdvanceOnSyncError verifies that a sync error does not cause +// the interval to be advanced: the interval store must remain empty after a sync +// that returns an error, even if offer.Topmost is non-zero. +func TestIntervalDoesNotAdvanceOnSyncError(t *testing.T) { + t.Parallel() + + addr := swarm.RandAddress(t) + + _, s, kad, pullsync := newPuller(t, opts{ + kad: []kadMock.Option{ + kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}), + }, + pullSync: []mockps.Option{ + mockps.WithCursors([]uint64{100}, 0), + mockps.WithSyncError(errors.New("stamp error")), + mockps.WithReplies( + mockps.SyncReply{Start: 1, Topmost: 100, Peer: addr}, + mockps.SyncReply{Start: 1, Topmost: 100, Peer: addr}, + ), + }, + bins: 1, + rs: resMock.NewReserve(resMock.WithRadius(0)), + }) + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + + // wait until at least one sync call was made + err := spinlock.Wait(time.Second, func() bool { + return len(pullsync.SyncCalls(addr)) >= 1 + }) + if err != nil { + t.Fatal(err) + } + + // interval must not have been advanced: it is initialized to [] (no ranges committed) + // and must remain [] since every sync call returned an error + checkIntervals(t, s, addr, "[]", 0) +} + func TestPeerGone(t *testing.T) { t.Parallel() diff --git a/pkg/pullsync/mock/pullsync.go b/pkg/pullsync/mock/pullsync.go index bb77934924f..53e73426141 100644 --- a/pkg/pullsync/mock/pullsync.go +++ b/pkg/pullsync/mock/pullsync.go @@ -40,11 +40,16 @@ func toID(a swarm.Address, bin uint8, start uint64) string { return fmt.Sprintf("%s-%d-%d", a, bin, start) } +// SyncReply configures a single response from PullSyncMock.Sync. +// Stored overrides the stored return value; if zero, stored defaults to +// Topmost (matching successful-delivery behaviour). Set Stored explicitly +// when testing scenarios where stored must differ from Topmost. type SyncReply struct { Peer swarm.Address Bin uint8 Start uint64 Topmost uint64 + Stored uint64 // 0 means "default to Topmost"; set explicitly to test partial/failed delivery Count int } @@ -71,7 +76,7 @@ func NewPullSync(opts ...Option) *PullSyncMock { return s } -func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { +func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, stored uint64, count int, err error) { p.mtx.Lock() @@ -83,11 +88,19 @@ func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, p.replies[id] = p.replies[id][1:] p.syncCalls = append(p.syncCalls, reply) p.mtx.Unlock() - return reply.Topmost, reply.Count, p.syncErr + // mirror real Sync behaviour: stored defaults to Topmost on success + s := reply.Topmost + if reply.Stored != 0 { + s = reply.Stored + } + if p.syncErr != nil { + s = 0 + } + return reply.Topmost, s, reply.Count, p.syncErr } p.mtx.Unlock() <-ctx.Done() - return 0, 0, ctx.Err() + return 0, 0, 0, ctx.Err() } func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, uint64, error) { diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 436600a2f76..0a55c491e20 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -56,9 +56,13 @@ const ( // Interface is the PullSync interface. type Interface interface { // Sync syncs a batch of chunks starting at a start BinID. - // It returns the BinID of highest chunk that was synced from the given - // batch and the total number of chunks the downstream peer has sent. - Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) + // It returns the BinID of the highest offered chunk (topmost), the BinID + // of the highest chunk that was actually stored (stored; 0 on any delivery + // error), and the total number of chunks successfully stored. + // Callers must use stored — not topmost — to advance their sync interval, + // so that BinIDs whose chunks failed validation are not silently skipped. + // topmost is retained for overflow detection and future HIST-completion use. + Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, stored uint64, count int, err error) // GetCursors retrieves all cursors from a downstream peer. GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error) } @@ -225,13 +229,16 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea } // Sync syncs a batch of chunks starting at a start BinID. -// It returns the BinID of highest chunk that was synced from the given -// batch and the total number of chunks the downstream peer has sent. -func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { +// topmost is the highest BinID offered by the peer (used for overflow detection). +// stored is the highest BinID that was successfully validated and stored +// (0 if any chunk failed validation); callers must advance their sync interval +// using stored, not topmost. +// count is the number of chunks successfully stored. +func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, stored uint64, count int, err error) { stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) if err != nil { - return 0, 0, fmt.Errorf("new stream: %w", err) + return 0, 0, 0, fmt.Errorf("new stream: %w", err) } defer func() { if err != nil { @@ -245,18 +252,20 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start rangeMsg := &pb.Get{Bin: int32(bin), Start: start} if err = w.WriteMsgWithContext(ctx, rangeMsg); err != nil { - return 0, 0, fmt.Errorf("write get range: %w", err) + return 0, 0, 0, fmt.Errorf("write get range: %w", err) } var offer pb.Offer if err = r.ReadMsgWithContext(ctx, &offer); err != nil { - return 0, 0, fmt.Errorf("read offer: %w", err) + return 0, 0, 0, fmt.Errorf("read offer: %w", err) } // empty interval (no chunks present in interval). // return the end of the requested range as topmost. + // stored equals topmost: an empty offer advances the interval to the peer's + // watermark without delivering any chunks, which is correct. if len(offer.Chunks) == 0 { - return offer.Topmost, 0, nil + return offer.Topmost, offer.Topmost, 0, nil } topmost = offer.Topmost @@ -270,7 +279,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start bv, err := bitvector.New(bvLen) if err != nil { - return 0, 0, fmt.Errorf("new bitvector: %w", err) + return 0, 0, 0, fmt.Errorf("new bitvector: %w", err) } for i := 0; i < len(offer.Chunks); i++ { @@ -279,7 +288,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start batchID := offer.Chunks[i].BatchID stampHash := offer.Chunks[i].StampHash if len(addr) != swarm.HashSize { - return 0, 0, fmt.Errorf("inconsistent hash length") + return 0, 0, 0, fmt.Errorf("inconsistent hash length") } a := swarm.NewAddress(addr) @@ -293,7 +302,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start have, err = s.store.ReserveHas(a, batchID, stampHash) if err != nil { s.logger.Debug("storage has", "error", err) - return 0, 0, err + return 0, 0, 0, err } if !have { @@ -307,7 +316,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start wantMsg := &pb.Want{BitVector: bv.Bytes()} if err = w.WriteMsgWithContext(ctx, wantMsg); err != nil { - return 0, 0, fmt.Errorf("write want: %w", err) + return 0, 0, 0, fmt.Errorf("write want: %w", err) } chunksToPut := make([]swarm.Chunk, 0, ctr) @@ -316,7 +325,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start for ; ctr > 0; ctr-- { var delivery pb.Delivery if err = r.ReadMsgWithContext(ctx, &delivery); err != nil { - return 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err)) + return 0, 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err)) } addr := swarm.NewAddress(delivery.Address) @@ -389,13 +398,21 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start chunkErr = errors.Join(chunkErr, err) continue } - return 0, 0, errors.Join(chunkErr, err) + return 0, 0, 0, errors.Join(chunkErr, err) } chunksPut++ } } - return topmost, chunksPut, chunkErr + // stored is the BinID callers should use to advance their sync interval. + // When any chunk failed validation or storage, stored is 0 so the interval + // does not advance past unverified BinIDs. + stored = topmost + if chunkErr != nil { + stored = 0 + } + + return topmost, stored, chunksPut, chunkErr } // makeOffer tries to assemble an offer for a given requested interval. diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 03d6927c1da..f843e3f083e 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -67,7 +67,7 @@ func TestIncoming_WantNone(t *testing.T) { psClient, clientDb = newPullSync(t, recorder, 0, mock.WithChunks(chunks...)) ) - topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -91,7 +91,7 @@ func TestIncoming_ContextTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 0) cancel() - _, _, err := psClient.Sync(ctx, swarm.ZeroAddress, 0, 0) + _, _, _, err := psClient.Sync(ctx, swarm.ZeroAddress, 0, 0) if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("wanted error %v, got %v", context.DeadlineExceeded, err) } @@ -107,7 +107,7 @@ func TestIncoming_WantOne(t *testing.T) { psClient, clientDb = newPullSync(t, recorder, 0, mock.WithChunks(someChunks(1, 2, 3, 4)...)) ) - topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -133,7 +133,7 @@ func TestIncoming_WantAll(t *testing.T) { psClient, clientDb = newPullSync(t, recorder, 0) ) - topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -195,7 +195,7 @@ func TestIncoming_WantErrors(t *testing.T) { psClient, clientDb = newPullSyncWithStamperValidator(t, recorder, 0, validStamp, mock.WithPutHook(putHook)) ) - topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) for _, e := range []error{storage.ErrOverwriteNewerChunk, validStampErr, swarm.ErrInvalidChunk} { if !errors.Is(err, e) { t.Fatalf("expected error %v", err) @@ -230,7 +230,7 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { psClient, _ = newPullSync(t, recorder, 0) ) - _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + _, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if !errors.Is(err, pullsync.ErrUnsolicitedChunk) { t.Fatalf("expected err %v but got %v", pullsync.ErrUnsolicitedChunk, err) } @@ -247,7 +247,7 @@ func TestMissingChunk(t *testing.T) { psClient, _ = newPullSync(t, recorder, 0) ) - topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -311,6 +311,51 @@ func TestGetCursorsError(t *testing.T) { }) } +func TestSync_StampFailure_StoredIsZero(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + tChunks := testingc.GenerateTestRandomChunks(3) + + tResults := make([]*storer.BinC, len(tChunks)) + for i, c := range tChunks { + stampHash, err := c.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + tResults[i] = &storer.BinC{ + Address: c.Address(), + BatchID: c.Stamp().BatchID(), + BinID: uint64(i + 1), + StampHash: stampHash, + } + } + + stampErr := errors.New("stamp validation error") + validStamp := func(c swarm.Chunk) (swarm.Chunk, error) { + return nil, stampErr + } + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSyncWithStamperValidator(t, recorder, 0, validStamp) + ) + + topmost, stored, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + if !errors.Is(err, stampErr) { + t.Fatalf("expected stamp error but got %v", err) + } + if stored != 0 { + t.Fatalf("expected stored=0 on stamp failure but got %d", stored) + } + if count != 0 { + t.Fatalf("expected count=0 on stamp failure but got %d", count) + } + if topmost != uint64(len(tChunks)) { + t.Fatalf("expected topmost=%d but got %d", len(tChunks), topmost) + } + }) +} + func haveChunks(t *testing.T, s *mock.ReserveStore, chunks ...swarm.Chunk) { t.Helper() for _, c := range chunks { From 4f9650f2682b20e3d0cbe78dd77ee7effe36cf52 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Tue, 31 Mar 2026 11:24:11 +0300 Subject: [PATCH 03/10] fix(pullsync): prevent interval advancement past unverified BinIDs Two correctness bugs are fixed. When a delivered chunk fails stamp validation, the peer's sync interval must not advance past that BinID. Previously, Sync() returned offer.Topmost unconditionally; the puller marked those BinIDs as done even though no chunk was stored. Now Sync() returns topmost=0 on any validation failure (invalid stamp, unsolicited, or structural error), so the puller's existing guard suppresses the interval write. ErrOverwriteNewerChunk is excluded: the chunk is already in the reserve with a newer stamp, so advancing past it is correct. collectAddrs blocked on SubscribeBin until the first chunk arrived. A live chunk at BinID X far beyond the server's historical frontier caused offer.Topmost=X, inflating the client's interval past historical BinIDs it had never received. collectAddrs now snapshots ReserveLastBinIDs() before subscribing and caps topmost at that cursor. Live chunks are still delivered; only Topmost is bounded to the historical frontier. Sync() retains its 3-value signature. topmost is the server's watermark (capped at the historical cursor) on success, and 0 on validation failure. The puller code is unchanged. --- pkg/puller/puller.go | 11 +-- pkg/pullsync/mock/pullsync.go | 20 ++---- pkg/pullsync/pullsync.go | 104 +++++++++++++++++---------- pkg/pullsync/pullsync_test.go | 128 ++++++++++++++++++++++++++++------ 4 files changed, 186 insertions(+), 77 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index dc0561246aa..29a25ad783a 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -334,7 +334,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncWorkerIterCounter.Inc() - top, stored, count, err := p.syncer.Sync(ctx, address, bin, start) + top, count, err := p.syncer.Sync(ctx, address, bin, start) if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() @@ -360,14 +360,15 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) } - // advance interval only when chunks were successfully stored - if stored >= start { - if err := p.addPeerInterval(address, bin, start, stored); err != nil { + // top is 0 on validation error (see pullsync.Sync), so this check + // prevents interval advancement when chunks were not successfully stored. + if top >= start { + if err := p.addPeerInterval(address, bin, start, top); err != nil { p.metrics.SyncWorkerErrCounter.Inc() p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address) return } - start = stored + 1 + start = top + 1 } } } diff --git a/pkg/pullsync/mock/pullsync.go b/pkg/pullsync/mock/pullsync.go index 53e73426141..74b8981e305 100644 --- a/pkg/pullsync/mock/pullsync.go +++ b/pkg/pullsync/mock/pullsync.go @@ -40,16 +40,11 @@ func toID(a swarm.Address, bin uint8, start uint64) string { return fmt.Sprintf("%s-%d-%d", a, bin, start) } -// SyncReply configures a single response from PullSyncMock.Sync. -// Stored overrides the stored return value; if zero, stored defaults to -// Topmost (matching successful-delivery behaviour). Set Stored explicitly -// when testing scenarios where stored must differ from Topmost. type SyncReply struct { Peer swarm.Address Bin uint8 Start uint64 Topmost uint64 - Stored uint64 // 0 means "default to Topmost"; set explicitly to test partial/failed delivery Count int } @@ -76,7 +71,7 @@ func NewPullSync(opts ...Option) *PullSyncMock { return s } -func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, stored uint64, count int, err error) { +func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { p.mtx.Lock() @@ -88,19 +83,16 @@ func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, p.replies[id] = p.replies[id][1:] p.syncCalls = append(p.syncCalls, reply) p.mtx.Unlock() - // mirror real Sync behaviour: stored defaults to Topmost on success - s := reply.Topmost - if reply.Stored != 0 { - s = reply.Stored - } + // mirror real Sync: topmost is 0 on error so the interval is not advanced + top := reply.Topmost if p.syncErr != nil { - s = 0 + top = 0 } - return reply.Topmost, s, reply.Count, p.syncErr + return top, reply.Count, p.syncErr } p.mtx.Unlock() <-ctx.Done() - return 0, 0, 0, ctx.Err() + return 0, 0, ctx.Err() } func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, uint64, error) { diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 0a55c491e20..8a4a7694e93 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -56,13 +56,15 @@ const ( // Interface is the PullSync interface. type Interface interface { // Sync syncs a batch of chunks starting at a start BinID. - // It returns the BinID of the highest offered chunk (topmost), the BinID - // of the highest chunk that was actually stored (stored; 0 on any delivery - // error), and the total number of chunks successfully stored. - // Callers must use stored — not topmost — to advance their sync interval, - // so that BinIDs whose chunks failed validation are not silently skipped. - // topmost is retained for overflow detection and future HIST-completion use. - Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, stored uint64, count int, err error) + // It returns the topmost BinID safe for interval advancement and the total + // number of chunks successfully stored. + // topmost equals offer.Topmost (capped at the server's historical cursor) + // when all delivered chunks passed validation. topmost is 0 when any chunk + // failed validation (invalid stamp, unsolicited, or structural error), so + // callers that use topmost to advance their interval will not skip + // unverified BinIDs. ErrOverwriteNewerChunk does not zero topmost because + // the chunk is already present in the reserve. + Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) // GetCursors retrieves all cursors from a downstream peer. GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error) } @@ -229,16 +231,17 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea } // Sync syncs a batch of chunks starting at a start BinID. -// topmost is the highest BinID offered by the peer (used for overflow detection). -// stored is the highest BinID that was successfully validated and stored -// (0 if any chunk failed validation); callers must advance their sync interval -// using stored, not topmost. -// count is the number of chunks successfully stored. -func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, stored uint64, count int, err error) { +// topmost equals offer.Topmost capped at the server's historical cursor +// (see collectAddrs). topmost is 0 when any delivered chunk failed validation +// (invalid stamp, unsolicited, or structural error), preventing the caller +// from silently skipping unverified BinIDs. +// ErrOverwriteNewerChunk does not zero topmost: the chunk is already present. +// count is the number of chunks successfully written to the reserve. +func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) if err != nil { - return 0, 0, 0, fmt.Errorf("new stream: %w", err) + return 0, 0, fmt.Errorf("new stream: %w", err) } defer func() { if err != nil { @@ -252,20 +255,18 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start rangeMsg := &pb.Get{Bin: int32(bin), Start: start} if err = w.WriteMsgWithContext(ctx, rangeMsg); err != nil { - return 0, 0, 0, fmt.Errorf("write get range: %w", err) + return 0, 0, fmt.Errorf("write get range: %w", err) } var offer pb.Offer if err = r.ReadMsgWithContext(ctx, &offer); err != nil { - return 0, 0, 0, fmt.Errorf("read offer: %w", err) + return 0, 0, fmt.Errorf("read offer: %w", err) } - // empty interval (no chunks present in interval). - // return the end of the requested range as topmost. - // stored equals topmost: an empty offer advances the interval to the peer's - // watermark without delivering any chunks, which is correct. + // empty interval: no chunks in range. + // return the peer's watermark so the caller can advance the interval. if len(offer.Chunks) == 0 { - return offer.Topmost, offer.Topmost, 0, nil + return offer.Topmost, 0, nil } topmost = offer.Topmost @@ -279,7 +280,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start bv, err := bitvector.New(bvLen) if err != nil { - return 0, 0, 0, fmt.Errorf("new bitvector: %w", err) + return 0, 0, fmt.Errorf("new bitvector: %w", err) } for i := 0; i < len(offer.Chunks); i++ { @@ -288,7 +289,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start batchID := offer.Chunks[i].BatchID stampHash := offer.Chunks[i].StampHash if len(addr) != swarm.HashSize { - return 0, 0, 0, fmt.Errorf("inconsistent hash length") + return 0, 0, fmt.Errorf("inconsistent hash length") } a := swarm.NewAddress(addr) @@ -302,7 +303,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start have, err = s.store.ReserveHas(a, batchID, stampHash) if err != nil { s.logger.Debug("storage has", "error", err) - return 0, 0, 0, err + return 0, 0, err } if !have { @@ -316,16 +317,19 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start wantMsg := &pb.Want{BitVector: bv.Bytes()} if err = w.WriteMsgWithContext(ctx, wantMsg); err != nil { - return 0, 0, 0, fmt.Errorf("write want: %w", err) + return 0, 0, fmt.Errorf("write want: %w", err) } chunksToPut := make([]swarm.Chunk, 0, ctr) - var chunkErr error + var ( + chunkErr error + hasValidationError bool // true when a chunk was not stored due to a validation failure + ) for ; ctr > 0; ctr-- { var delivery pb.Delivery if err = r.ReadMsgWithContext(ctx, &delivery); err != nil { - return 0, 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err)) + return 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err)) } addr := swarm.NewAddress(delivery.Address) @@ -340,11 +344,13 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start stamp := new(postage.Stamp) if err = stamp.UnmarshalBinary(delivery.Stamp); err != nil { chunkErr = errors.Join(chunkErr, err) + hasValidationError = true continue } stampHash, err := stamp.Hash() if err != nil { chunkErr = errors.Join(chunkErr, err) + hasValidationError = true continue } @@ -352,6 +358,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if _, ok := wantChunks[wantChunkID]; !ok { s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk, "peer_address", peer, "chunk_address", addr) chunkErr = errors.Join(chunkErr, ErrUnsolicitedChunk) + hasValidationError = true continue } @@ -361,6 +368,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if err != nil { s.logger.Debug("unverified stamp", "error", err, "peer_address", peer, "chunk_address", newChunk) chunkErr = errors.Join(chunkErr, err) + hasValidationError = true continue } @@ -370,6 +378,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start addr, err := chunk.Address() if err != nil { chunkErr = errors.Join(chunkErr, err) + hasValidationError = true continue } s.logger.Debug("sync gsoc", "peer_address", peer, "chunk_address", addr, "wrapped_chunk_address", chunk.WrappedChunk().Address()) @@ -377,6 +386,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } else { s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk) chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk) + hasValidationError = true s.metrics.ReceivedInvalidChunk.Inc() continue } @@ -396,23 +406,27 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if errors.Is(err, storage.ErrOverwriteNewerChunk) { s.logger.Debug("overwrite newer chunk", "error", err, "peer_address", peer, "chunk", c) chunkErr = errors.Join(chunkErr, err) + // ErrOverwriteNewerChunk is not a validation failure: the chunk is + // already present in the reserve with a newer stamp. It is safe to + // advance the interval past it. continue } - return 0, 0, 0, errors.Join(chunkErr, err) + return 0, 0, errors.Join(chunkErr, err) } chunksPut++ } } - // stored is the BinID callers should use to advance their sync interval. - // When any chunk failed validation or storage, stored is 0 so the interval - // does not advance past unverified BinIDs. - stored = topmost - if chunkErr != nil { - stored = 0 + // Zero topmost when any chunk failed validation (stamp, solicitation, + // or structural check). This prevents the caller from advancing its + // interval past BinIDs whose chunks were never stored. + // ErrOverwriteNewerChunk does not zero topmost: the chunk is already + // present in the reserve with a newer stamp, so advancement is safe. + if hasValidationError { + topmost = 0 } - return topmost, stored, chunksPut, chunkErr + return topmost, chunksPut, chunkErr } // makeOffer tries to assemble an offer for a given requested interval. @@ -443,6 +457,19 @@ type collectAddrsResult struct { // after which the function returns the collected slice of chunks. func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*storer.BinC, uint64, error) { v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (*collectAddrsResult, error) { + // Snapshot the server's historical cursor for this bin before subscribing. + // Any chunk with BinID beyond this cursor arrived live after the snapshot; + // capping topmost at the cursor prevents live chunks from inflating the + // interval the client records for this peer. + cursors, _, err := s.store.ReserveLastBinIDs() + if err != nil { + return nil, fmt.Errorf("reserve last bin IDs: %w", err) + } + var historicalCursor uint64 + if int(bin) < len(cursors) { + historicalCursor = cursors[bin] + } + var ( chs []*storer.BinC topmost uint64 @@ -492,6 +519,13 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* } } + // Cap topmost at the historical cursor. Live chunks (BinID > historicalCursor) + // are included in the offer so the client can store them, but Topmost must + // not advance the client's interval past the server's historical frontier. + if topmost > historicalCursor { + topmost = historicalCursor + } + return &collectAddrsResult{chs: chs, topmost: topmost}, nil }) if err != nil { diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index f843e3f083e..61f78f37f28 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -62,12 +62,12 @@ func TestIncoming_WantNone(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSync(t, recorder, 0, mock.WithChunks(chunks...)) ) - topmost, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -84,14 +84,14 @@ func TestIncoming_WantNone(t *testing.T) { func TestIncoming_ContextTimeout(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( - ps, _ = newPullSync(t, nil, 0, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 0, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{4}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSync(t, recorder, 0, mock.WithChunks(chunks...)) ) ctx, cancel := context.WithTimeout(context.Background(), 0) cancel() - _, _, _, err := psClient.Sync(ctx, swarm.ZeroAddress, 0, 0) + _, _, err := psClient.Sync(ctx, swarm.ZeroAddress, 0, 0) if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("wanted error %v, got %v", context.DeadlineExceeded, err) } @@ -102,12 +102,12 @@ func TestIncoming_WantOne(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSync(t, recorder, 0, mock.WithChunks(someChunks(1, 2, 3, 4)...)) ) - topmost, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -128,12 +128,12 @@ func TestIncoming_WantAll(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSync(t, recorder, 0) ) - topmost, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -190,12 +190,12 @@ func TestIncoming_WantErrors(t *testing.T) { var ( topMost = uint64(10) - ps, _ = newPullSync(t, nil, 20, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...)) + ps, _ = newPullSync(t, nil, 20, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSyncWithStamperValidator(t, recorder, 0, validStamp, mock.WithPutHook(putHook)) ) - topmost, _, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) for _, e := range []error{storage.ErrOverwriteNewerChunk, validStampErr, swarm.ErrInvalidChunk} { if !errors.Is(err, e) { t.Fatalf("expected error %v", err) @@ -206,8 +206,10 @@ func TestIncoming_WantErrors(t *testing.T) { t.Fatalf("got %d chunks but want %d", count, 3) } - if topmost != topMost { - t.Fatalf("got offer topmost %d but want %d", topmost, topMost) + // topmost must be 0: validation errors (validStampErr, ErrInvalidChunk) zero + // topmost so the caller cannot advance its interval past unverified BinIDs. + if topmost != 0 { + t.Fatalf("got topmost %d but want 0 (validation errors must zero topmost)", topmost) } haveChunks(t, clientDb, append(tChunks[:1], tChunks[3:5]...)...) @@ -225,12 +227,12 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { evil := swarm.NewChunk(evilAddr, evilData).WithStamp(stamp) var ( - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithEvilChunk(addrs[4], evil)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithEvilChunk(addrs[4], evil), mock.WithCursors([]uint64{4}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSync(t, recorder, 0) ) - _, _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if !errors.Is(err, pullsync.ErrUnsolicitedChunk) { t.Fatalf("expected err %v but got %v", pullsync.ErrUnsolicitedChunk, err) } @@ -242,12 +244,12 @@ func TestMissingChunk(t *testing.T) { var ( zeroChunk = swarm.NewChunk(swarm.ZeroAddress, nil) topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSync(t, recorder, 0) ) - topmost, _, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -311,7 +313,7 @@ func TestGetCursorsError(t *testing.T) { }) } -func TestSync_StampFailure_StoredIsZero(t *testing.T) { +func TestSync_StampFailure_TopmostIsZero(t *testing.T) { synctest.Test(t, func(t *testing.T) { tChunks := testingc.GenerateTestRandomChunks(3) @@ -334,24 +336,104 @@ func TestSync_StampFailure_StoredIsZero(t *testing.T) { return nil, stampErr } + cursor := uint64(len(tChunks)) // max BinID equals number of chunks + var ( - ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...)) + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...), mock.WithCursors([]uint64{cursor}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSyncWithStamperValidator(t, recorder, 0, validStamp) ) - topmost, stored, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if !errors.Is(err, stampErr) { t.Fatalf("expected stamp error but got %v", err) } - if stored != 0 { - t.Fatalf("expected stored=0 on stamp failure but got %d", stored) + // topmost must be 0: stamp validation failure must prevent interval advancement + if topmost != 0 { + t.Fatalf("expected topmost=0 on stamp failure but got %d", topmost) } if count != 0 { t.Fatalf("expected count=0 on stamp failure but got %d", count) } - if topmost != uint64(len(tChunks)) { - t.Fatalf("expected topmost=%d but got %d", len(tChunks), topmost) + }) +} + +// TestSync_LiveChunkTopCappedAtCursor verifies that a live chunk with a BinID +// far beyond the server's historical cursor does not inflate offer.Topmost. +// Without the server-side cap, the client would advance its interval to the +// live chunk's BinID, permanently skipping the historical range in between. +func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + liveChunk := testingc.GenerateTestRandomChunk() + stampHash, err := liveChunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + // The subscribe response contains only the live chunk at a high BinID. + // The server's historical cursor is set much lower. + const liveBinID = uint64(100) + const historicalCursor = uint64(10) + liveResult := []*storer.BinC{{ + Address: liveChunk.Address(), + BatchID: liveChunk.Stamp().BatchID(), + BinID: liveBinID, + StampHash: stampHash, + }} + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(liveResult, nil), mock.WithChunks(liveChunk), mock.WithCursors([]uint64{historicalCursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 1) + if err != nil { + t.Fatal(err) + } + if topmost != historicalCursor { + t.Fatalf("topmost: got %d, want %d (live BinID %d must not inflate topmost)", topmost, historicalCursor, liveBinID) + } + }) +} + +// TestSync_OverwriteNewerChunkDoesNotBlockInterval verifies that +// ErrOverwriteNewerChunk does not prevent interval advancement. +// The chunk is already present in the reserve with a newer stamp, so it is +// safe to advance past it even though the put returned an error. +func TestSync_OverwriteNewerChunkDoesNotBlockInterval(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + tChunk := testingc.GenerateTestRandomChunk() + stampHash, err := tChunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + tResult := []*storer.BinC{{ + Address: tChunk.Address(), + BatchID: tChunk.Stamp().BatchID(), + BinID: 1, + StampHash: stampHash, + }} + + putHook := func(swarm.Chunk) error { return storage.ErrOverwriteNewerChunk } + + var ( + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(tResult, nil), mock.WithChunks(tChunk), mock.WithCursors([]uint64{1}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSyncWithStamperValidator(t, recorder, 0, func(c swarm.Chunk) (swarm.Chunk, error) { return c, nil }, mock.WithPutHook(putHook)) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 1) + if !errors.Is(err, storage.ErrOverwriteNewerChunk) { + t.Fatalf("expected ErrOverwriteNewerChunk but got %v", err) + } + // ErrOverwriteNewerChunk must not zero topmost: the chunk is already in + // the reserve with a newer stamp, so interval advancement is safe. + if topmost != 1 { + t.Fatalf("topmost: got %d, want 1 (ErrOverwriteNewerChunk must not block interval)", topmost) + } + if count != 0 { + t.Fatalf("count: got %d, want 0 (chunk not stored due to overwrite)", count) } }) } From 2c7d7b7f685cfc23afeb2c516f15a652b368f1f6 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Sun, 5 Apr 2026 17:48:58 +0300 Subject: [PATCH 04/10] refactor(pullsync): replace hasValidationError flag with two-bucket error accumulators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hasValidationError bool was set at six separate sites and acted on in a deferred block at the end of Sync(). This is an outlier in the bee codebase, where the standard pattern is to use error accumulators whose nil-ness is the condition. Replace with two accumulators that express the existing semantic distinction structurally: chunkErr — stamp, solicitation, and structural failures. A non-nil chunkErr zeros topmost, preventing interval advancement past BinIDs whose chunks were never stored. overwriteErr — ErrOverwriteNewerChunk only. The chunk is already present in the reserve; advancing the interval past it is correct, so overwriteErr does not affect topmost. Both accumulators are joined on return, preserving the caller contract: all errors remain reachable via errors.Is. No interface change. No behaviour change. --- pkg/pullsync/pullsync.go | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 8a4a7694e93..658e48ad2bf 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -322,14 +322,20 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start chunksToPut := make([]swarm.Chunk, 0, ctr) + // chunkErr accumulates validation failures (stamp, solicitation, structural). + // A non-nil chunkErr zeros topmost, preventing interval advancement past + // BinIDs whose chunks were never stored. + // overwriteErr accumulates ErrOverwriteNewerChunk: the chunk is already + // present in the reserve, so advancing the interval past it is correct. + // Both are joined on return so callers can inspect individual errors via errors.Is. var ( - chunkErr error - hasValidationError bool // true when a chunk was not stored due to a validation failure + chunkErr error + overwriteErr error ) for ; ctr > 0; ctr-- { var delivery pb.Delivery if err = r.ReadMsgWithContext(ctx, &delivery); err != nil { - return 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err)) + return 0, 0, errors.Join(chunkErr, overwriteErr, fmt.Errorf("read delivery: %w", err)) } addr := swarm.NewAddress(delivery.Address) @@ -344,13 +350,11 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start stamp := new(postage.Stamp) if err = stamp.UnmarshalBinary(delivery.Stamp); err != nil { chunkErr = errors.Join(chunkErr, err) - hasValidationError = true continue } stampHash, err := stamp.Hash() if err != nil { chunkErr = errors.Join(chunkErr, err) - hasValidationError = true continue } @@ -358,7 +362,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if _, ok := wantChunks[wantChunkID]; !ok { s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk, "peer_address", peer, "chunk_address", addr) chunkErr = errors.Join(chunkErr, ErrUnsolicitedChunk) - hasValidationError = true continue } @@ -368,7 +371,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if err != nil { s.logger.Debug("unverified stamp", "error", err, "peer_address", peer, "chunk_address", newChunk) chunkErr = errors.Join(chunkErr, err) - hasValidationError = true continue } @@ -378,7 +380,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start addr, err := chunk.Address() if err != nil { chunkErr = errors.Join(chunkErr, err) - hasValidationError = true continue } s.logger.Debug("sync gsoc", "peer_address", peer, "chunk_address", addr, "wrapped_chunk_address", chunk.WrappedChunk().Address()) @@ -386,7 +387,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } else { s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk) chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk) - hasValidationError = true s.metrics.ReceivedInvalidChunk.Inc() continue } @@ -401,32 +401,22 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start for _, c := range chunksToPut { if err := s.store.ReservePutter().Put(ctx, c); err != nil { - // in case of these errors, no new items are added to the storage, so it - // is safe to continue with the next chunk if errors.Is(err, storage.ErrOverwriteNewerChunk) { s.logger.Debug("overwrite newer chunk", "error", err, "peer_address", peer, "chunk", c) - chunkErr = errors.Join(chunkErr, err) - // ErrOverwriteNewerChunk is not a validation failure: the chunk is - // already present in the reserve with a newer stamp. It is safe to - // advance the interval past it. + overwriteErr = errors.Join(overwriteErr, err) continue } - return 0, 0, errors.Join(chunkErr, err) + return 0, 0, errors.Join(chunkErr, overwriteErr, err) } chunksPut++ } } - // Zero topmost when any chunk failed validation (stamp, solicitation, - // or structural check). This prevents the caller from advancing its - // interval past BinIDs whose chunks were never stored. - // ErrOverwriteNewerChunk does not zero topmost: the chunk is already - // present in the reserve with a newer stamp, so advancement is safe. - if hasValidationError { + if chunkErr != nil { topmost = 0 } - return topmost, chunksPut, chunkErr + return topmost, chunksPut, errors.Join(chunkErr, overwriteErr) } // makeOffer tries to assemble an offer for a given requested interval. From e0c56cc4b2c2b00189e149451c92fd5f0c10bcbc Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Sun, 5 Apr 2026 18:04:32 +0300 Subject: [PATCH 05/10] fix(pullsync): return empty offer at gap boundary to prevent interval over-advancement When SubscribeBin returns a first chunk whose BinID is beyond the requested start, the server has no chunks at [start, firstBinID-1]. Previously the server included the chunk in the offer with Topmost=firstBinID, causing the client to advance its interval across BinIDs it never received. collectAddrs now detects this case: when the first chunk is within the historical range (BinID <= historicalCursor) and its BinID is beyond the requested start, the server returns an empty offer with Topmost=firstBinID-1. The client advances its interval to the gap boundary only, then issues a new Get from firstBinID and receives the chunk cleanly on the next round. The check is skipped for start=0 (the BinID namespace starts at 1; the puller always uses start>=1 in practice, but test helpers may pass 0). A new test, TestSync_HistoricalGapReturnsEmptyOfferAtBoundary, covers the gap-boundary case directly. --- pkg/pullsync/pullsync.go | 11 +++++++++ pkg/pullsync/pullsync_test.go | 43 +++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 658e48ad2bf..c3c7731d756 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -484,6 +484,17 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* break LOOP // The stream has been closed. } + // If the first chunk the server offers has a BinID beyond start + // and within the historical range, the server has no chunks at + // [start, c.BinID-1]. Return an empty offer with Topmost set to + // the gap boundary so the client advances its interval past only + // the BinIDs that genuinely do not exist on this server, then + // retries from c.BinID on the next round. + if len(chs) == 0 && start > 0 && c.BinID > start && c.BinID <= historicalCursor { + topmost = c.BinID - 1 + break LOOP + } + chs = append(chs, &storer.BinC{Address: c.Address, BatchID: c.BatchID, StampHash: c.StampHash}) if c.BinID > topmost { topmost = c.BinID diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 61f78f37f28..480785ea8a6 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -397,6 +397,49 @@ func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) { }) } +// TestSync_HistoricalGapReturnsEmptyOfferAtBoundary verifies that when the +// server's first available chunk has a BinID beyond the requested start, the +// server returns an empty offer with Topmost set to firstBinID-1. This lets +// the client advance its interval to the gap boundary without silently marking +// BinIDs that may exist on other peers as synced. +func TestSync_HistoricalGapReturnsEmptyOfferAtBoundary(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + chunk := testingc.GenerateTestRandomChunk() + stampHash, err := chunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + // Server holds one chunk at BinID 5; start=2 creates a gap at [2,4]. + const firstBinID = uint64(5) + const cursor = uint64(5) + result := []*storer.BinC{{ + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + BinID: firstBinID, + StampHash: stampHash, + }} + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(result, nil), mock.WithChunks(chunk), mock.WithCursors([]uint64{cursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 2) + if err != nil { + t.Fatal(err) + } + // Empty offer: gap boundary is firstBinID-1. + if topmost != firstBinID-1 { + t.Fatalf("topmost: got %d, want %d (gap boundary)", topmost, firstBinID-1) + } + if count != 0 { + t.Fatalf("count: got %d, want 0 (no chunks in gap)", count) + } + }) +} + // TestSync_OverwriteNewerChunkDoesNotBlockInterval verifies that // ErrOverwriteNewerChunk does not prevent interval advancement. // The chunk is already present in the reserve with a newer stamp, so it is From 4fd11e0d1dee26d6be1eda625ba0fee0fa67460e Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Sun, 5 Apr 2026 18:21:48 +0300 Subject: [PATCH 06/10] fix(pullsync): cap Topmost at contiguous frontier to prevent mid-offer gap advancement collectAddrs tracks the highest BinID reachable from start without a gap (contiguousEnd). When the server's store has internal gaps (e.g. BinIDs {3,7,11} with start=3), all matching chunks are still included in the offer for eager client-side storage, but Topmost is capped at contiguousEnd (3) so the client's sync interval does not advance past missing BinIDs. The cap only fires when contiguousEnd >= start (at least one historical chunk extended the frontier). This avoids capping when the offer contains only live chunks (BinID > historicalCursor), where topmost is already bounded by the existing historicalCursor cap. Adds TestSync_MidOfferGapCapsTopmostAtContiguousFrontier to cover this case. --- pkg/pullsync/pullsync.go | 29 ++++++++++++++++++---- pkg/pullsync/pullsync_test.go | 45 +++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index c3c7731d756..e2abc3842ea 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -461,11 +461,15 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* } var ( - chs []*storer.BinC - topmost uint64 - timer *time.Timer - timerC <-chan time.Time + chs []*storer.BinC + topmost uint64 + timer *time.Timer + timerC <-chan time.Time + contiguousEnd uint64 // highest BinID contiguous from start; only meaningful when start > 0 ) + if start > 0 { + contiguousEnd = start - 1 + } chC, unsub, errC := s.store.SubscribeBin(ctx, bin, start) defer func() { unsub() @@ -495,6 +499,13 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* break LOOP } + // Track the contiguous frontier: the highest BinID reachable + // from start without a gap. Used after the loop to cap Topmost + // so the client's interval does not advance past missing BinIDs. + if c.BinID == contiguousEnd+1 { + contiguousEnd = c.BinID + } + chs = append(chs, &storer.BinC{Address: c.Address, BatchID: c.BatchID, StampHash: c.StampHash}) if c.BinID > topmost { topmost = c.BinID @@ -527,6 +538,16 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* topmost = historicalCursor } + // Cap topmost at the contiguous frontier. All collected chunks are + // included in the offer for eager client-side storage, but Topmost is + // bounded to the highest BinID reachable from start without a gap. + // The client stores every chunk in one round trip; subsequent round + // trips advance the interval bookkeeping via leading-gap empty offers + // and empty Want bitvectors — no chunk data is retransmitted. + if start > 0 && len(chs) > 0 && contiguousEnd >= start && contiguousEnd < topmost { + topmost = contiguousEnd + } + return &collectAddrsResult{chs: chs, topmost: topmost}, nil }) if err != nil { diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 480785ea8a6..9c0d250ed24 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -440,6 +440,51 @@ func TestSync_HistoricalGapReturnsEmptyOfferAtBoundary(t *testing.T) { }) } +// TestSync_MidOfferGapCapsTopmostAtContiguousFrontier verifies that when the +// server's offer contains an internal gap (chunks at BinIDs {3,7,11} with +// start=3), Topmost is capped at the contiguous frontier (3) so the client's +// interval does not advance past the gap. All chunks are still included in +// the offer for eager storage — no chunk data is retransmitted on later rounds. +func TestSync_MidOfferGapCapsTopmostAtContiguousFrontier(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ch1 := testingc.GenerateTestRandomChunk() + ch2 := testingc.GenerateTestRandomChunk() + ch3 := testingc.GenerateTestRandomChunk() + + makeResult := func(c swarm.Chunk, binID uint64) *storer.BinC { + h, err := c.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + return &storer.BinC{Address: c.Address(), BatchID: c.Stamp().BatchID(), BinID: binID, StampHash: h} + } + + // BinIDs: 3, 7, 11 — gaps at [4,6] and [8,10]. + results := []*storer.BinC{makeResult(ch1, 3), makeResult(ch2, 7), makeResult(ch3, 11)} + const cursor = uint64(11) + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(results, nil), mock.WithChunks(ch1, ch2, ch3), mock.WithCursors([]uint64{cursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, db = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 3) + if err != nil { + t.Fatal(err) + } + // Topmost must be the contiguous frontier (3), not the max BinID (11). + if topmost != 3 { + t.Fatalf("topmost: got %d, want 3 (contiguous frontier)", topmost) + } + // All three chunks must be delivered and stored in this single round trip. + if count != 3 { + t.Fatalf("count: got %d, want 3 (all chunks delivered eagerly)", count) + } + haveChunks(t, db, ch1, ch2, ch3) + }) +} + // TestSync_OverwriteNewerChunkDoesNotBlockInterval verifies that // ErrOverwriteNewerChunk does not prevent interval advancement. // The chunk is already present in the reserve with a newer stamp, so it is From d5b6341746421eda0a5c06eee8f2730af1c69325 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Sun, 5 Apr 2026 21:52:39 +0300 Subject: [PATCH 07/10] chore(pullsync): remove cosmetic regressions Commits 74294965 and 4f9650f2 introduced four style regressions: - ErrUnsolicitedChunk wrapped in a single-element var () block; unwrap to a plain var declaration matching the rest of the package. - Blank lines after the opening brace of New(), handler(), Sync(), and makeOffer(); remove to match bee's conventional style. Also note: commit 4fd11e0d silently added s.logger.Debug("batch timeout timer triggered") to the collectAddrs timer case; that addition is intentional and remains. No behaviour change. --- pkg/pullsync/pullsync.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index e2abc3842ea..35fe5c61ad5 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -41,9 +41,7 @@ const ( cursorStreamName = "cursors" ) -var ( - ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk") -) +var ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk") const ( MaxCursor = math.MaxUint64 @@ -98,7 +96,6 @@ func New( logger log.Logger, maxPage uint64, ) *Syncer { - return &Syncer{ streamer: streamer, store: store, @@ -134,7 +131,6 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec { // handler handles an incoming request to sync an interval func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { - select { case <-s.quit: return nil @@ -238,7 +234,6 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea // ErrOverwriteNewerChunk does not zero topmost: the chunk is already present. // count is the number of chunks successfully written to the reserve. func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { - stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) if err != nil { return 0, 0, fmt.Errorf("new stream: %w", err) @@ -421,7 +416,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start // makeOffer tries to assemble an offer for a given requested interval. func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) { - addrs, top, err := s.collectAddrs(ctx, uint8(rn.Bin), rn.Start) if err != nil { return nil, err From 1a887407672dce2cbe1374568ceb710006582305 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Sun, 5 Apr 2026 22:34:34 +0300 Subject: [PATCH 08/10] fix(pullsync): document cursor boundary invariants and add tests --- pkg/pullsync/pullsync.go | 10 +++++ pkg/pullsync/pullsync_test.go | 81 +++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 35fe5c61ad5..3351402adcf 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -453,6 +453,11 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* if int(bin) < len(cursors) { historicalCursor = cursors[bin] } + // If bin is beyond the cursors slice (should not happen with a + // well-behaved storer that always returns swarm.MaxBins entries), + // historicalCursor stays 0. The cursor cap below then sets + // topmost=0 for any non-empty offer, so the client never advances + // its interval — a safe stall until the storer is consistent again. var ( chs []*storer.BinC @@ -538,6 +543,11 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* // The client stores every chunk in one round trip; subsequent round // trips advance the interval bookkeeping via leading-gap empty offers // and empty Want bitvectors — no chunk data is retransmitted. + // The start > 0 guard matches the leading-gap check above: BinIDs + // start at 1 in production and the puller always passes start >= 1. + // For start=0 (used only in test helpers) contiguousEnd is ambiguous — + // uint64 underflow prevents the start-1 initialisation — so the cap + // is skipped and topmost is left at the historical cursor. if start > 0 && len(chs) > 0 && contiguousEnd >= start && contiguousEnd < topmost { topmost = contiguousEnd } diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 9c0d250ed24..45e740f4e74 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -526,6 +526,87 @@ func TestSync_OverwriteNewerChunkDoesNotBlockInterval(t *testing.T) { }) } +// TestSync_Start0_SkipsGapDetection confirms that when start=0 the leading-gap +// check and the contiguous-frontier cap are both skipped (both are guarded by +// start > 0). Even though the server's first chunk is at BinID=5, the offer is +// non-empty and topmost equals the historical cursor. +func TestSync_Start0_SkipsGapDetection(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ch := testingc.GenerateTestRandomChunk() + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + const binID = uint64(5) + result := []*storer.BinC{{ + Address: ch.Address(), + BatchID: ch.Stamp().BatchID(), + BinID: binID, + StampHash: stampHash, + }} + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(result, nil), mock.WithChunks(ch), mock.WithCursors([]uint64{binID}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + if err != nil { + t.Fatal(err) + } + // Neither gap check fires for start=0; topmost must equal the cursor. + if topmost != binID { + t.Fatalf("topmost: got %d, want %d (gap detection must be skipped for start=0)", topmost, binID) + } + if count != 1 { + t.Fatalf("count: got %d, want 1", count) + } + }) +} + +// TestSync_BinBeyondCursors_StallsInterval verifies the safe-stall behaviour +// when bin is beyond the cursors slice returned by ReserveLastBinIDs. +// historicalCursor defaults to 0, so the cursor cap sets topmost=0 for any +// non-empty offer. The client stores the chunk but does not advance the +// interval (top=0 < start=1 in the puller guard). +func TestSync_BinBeyondCursors_StallsInterval(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ch := testingc.GenerateTestRandomChunk() + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + result := []*storer.BinC{{ + Address: ch.Address(), + BatchID: ch.Stamp().BatchID(), + BinID: 1, + StampHash: stampHash, + }} + + // Cursors slice has only one entry (bin=0). Syncing bin=1 leaves + // historicalCursor=0, triggering the safe-stall path. + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(result, nil), mock.WithChunks(ch), mock.WithCursors([]uint64{10}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 1, 1) + if err != nil { + t.Fatal(err) + } + // historicalCursor=0 for bin=1 → cursor cap fires → topmost=0. + if topmost != 0 { + t.Fatalf("topmost: got %d, want 0 (bin beyond cursors must stall interval)", topmost) + } + // The chunk is still stored; only interval advancement is suppressed. + if count != 1 { + t.Fatalf("count: got %d, want 1 (chunk must be stored despite topmost=0)", count) + } + }) +} + func haveChunks(t *testing.T, s *mock.ReserveStore, chunks ...swarm.Chunk) { t.Helper() for _, c := range chunks { From 1051de6e7a2d89fa917e04a4d93a17a0a53ce25b Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Sun, 5 Apr 2026 22:35:06 +0300 Subject: [PATCH 09/10] chore(pullsync): use upstream/downstream peer terminology in comments --- pkg/pullsync/pullsync.go | 74 +++++++++++++++++------------------ pkg/pullsync/pullsync_test.go | 26 ++++++------ 2 files changed, 50 insertions(+), 50 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 3351402adcf..4006bbd0ce2 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -56,12 +56,12 @@ type Interface interface { // Sync syncs a batch of chunks starting at a start BinID. // It returns the topmost BinID safe for interval advancement and the total // number of chunks successfully stored. - // topmost equals offer.Topmost (capped at the server's historical cursor) - // when all delivered chunks passed validation. topmost is 0 when any chunk - // failed validation (invalid stamp, unsolicited, or structural error), so - // callers that use topmost to advance their interval will not skip - // unverified BinIDs. ErrOverwriteNewerChunk does not zero topmost because - // the chunk is already present in the reserve. + // topmost equals offer.Topmost (capped at the downstream peer's historical + // cursor) when all delivered chunks passed validation. topmost is 0 when + // any chunk failed validation (invalid stamp, unsolicited, or structural + // error), so callers that use topmost to advance their interval will not + // skip unverified BinIDs. ErrOverwriteNewerChunk does not zero topmost + // because the chunk is already present in the reserve. Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) // GetCursors retrieves all cursors from a downstream peer. GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error) @@ -227,10 +227,10 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea } // Sync syncs a batch of chunks starting at a start BinID. -// topmost equals offer.Topmost capped at the server's historical cursor -// (see collectAddrs). topmost is 0 when any delivered chunk failed validation -// (invalid stamp, unsolicited, or structural error), preventing the caller -// from silently skipping unverified BinIDs. +// topmost equals offer.Topmost capped at the downstream peer's historical +// cursor (see collectAddrs). topmost is 0 when any delivered chunk failed +// validation (invalid stamp, unsolicited, or structural error), preventing +// the caller from silently skipping unverified BinIDs. // ErrOverwriteNewerChunk does not zero topmost: the chunk is already present. // count is the number of chunks successfully written to the reserve. func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { @@ -259,7 +259,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } // empty interval: no chunks in range. - // return the peer's watermark so the caller can advance the interval. + // return offer.Topmost so the caller can advance the interval. if len(offer.Chunks) == 0 { return offer.Topmost, 0, nil } @@ -324,7 +324,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start // present in the reserve, so advancing the interval past it is correct. // Both are joined on return so callers can inspect individual errors via errors.Is. var ( - chunkErr error + chunkErr error overwriteErr error ) for ; ctr > 0; ctr-- { @@ -441,10 +441,10 @@ type collectAddrsResult struct { // after which the function returns the collected slice of chunks. func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*storer.BinC, uint64, error) { v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (*collectAddrsResult, error) { - // Snapshot the server's historical cursor for this bin before subscribing. - // Any chunk with BinID beyond this cursor arrived live after the snapshot; - // capping topmost at the cursor prevents live chunks from inflating the - // interval the client records for this peer. + // Snapshot the downstream peer's historical cursor for this bin before + // subscribing. Any chunk with BinID beyond this cursor arrived live after + // the snapshot; capping topmost at the cursor prevents live chunks from + // inflating the interval the upstream peer records for this peer. cursors, _, err := s.store.ReserveLastBinIDs() if err != nil { return nil, fmt.Errorf("reserve last bin IDs: %w", err) @@ -456,14 +456,14 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* // If bin is beyond the cursors slice (should not happen with a // well-behaved storer that always returns swarm.MaxBins entries), // historicalCursor stays 0. The cursor cap below then sets - // topmost=0 for any non-empty offer, so the client never advances - // its interval — a safe stall until the storer is consistent again. + // topmost=0 for any non-empty offer, so the upstream peer never + // advances its interval — a safe stall until the storer is consistent again. var ( - chs []*storer.BinC - topmost uint64 - timer *time.Timer - timerC <-chan time.Time + chs []*storer.BinC + topmost uint64 + timer *time.Timer + timerC <-chan time.Time contiguousEnd uint64 // highest BinID contiguous from start; only meaningful when start > 0 ) if start > 0 { @@ -487,20 +487,20 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* break LOOP // The stream has been closed. } - // If the first chunk the server offers has a BinID beyond start - // and within the historical range, the server has no chunks at + // If the first chunk the downstream peer offers has a BinID beyond + // start and within the historical range, it has no chunks at // [start, c.BinID-1]. Return an empty offer with Topmost set to - // the gap boundary so the client advances its interval past only - // the BinIDs that genuinely do not exist on this server, then - // retries from c.BinID on the next round. + // the gap boundary so the upstream peer advances its interval past + // only the BinIDs that genuinely do not exist here, then retries + // from c.BinID on the next round. if len(chs) == 0 && start > 0 && c.BinID > start && c.BinID <= historicalCursor { topmost = c.BinID - 1 break LOOP } - // Track the contiguous frontier: the highest BinID reachable + // Track the contiguous Topmost: the highest BinID reachable // from start without a gap. Used after the loop to cap Topmost - // so the client's interval does not advance past missing BinIDs. + // so the upstream peer's interval does not advance past missing BinIDs. if c.BinID == contiguousEnd+1 { contiguousEnd = c.BinID } @@ -531,18 +531,18 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* } // Cap topmost at the historical cursor. Live chunks (BinID > historicalCursor) - // are included in the offer so the client can store them, but Topmost must - // not advance the client's interval past the server's historical frontier. + // are included in the offer so the upstream peer can store them, but Topmost + // must not advance the upstream peer's interval past the historical cursor. if topmost > historicalCursor { topmost = historicalCursor } - // Cap topmost at the contiguous frontier. All collected chunks are - // included in the offer for eager client-side storage, but Topmost is - // bounded to the highest BinID reachable from start without a gap. - // The client stores every chunk in one round trip; subsequent round - // trips advance the interval bookkeeping via leading-gap empty offers - // and empty Want bitvectors — no chunk data is retransmitted. + // Cap topmost at the contiguous Topmost. All collected chunks are + // included in the offer for eager storage by the upstream peer, but + // Topmost is bounded to the highest BinID reachable from start without + // a gap. The upstream peer stores every chunk in one round trip; + // subsequent round trips advance interval bookkeeping via leading-gap + // empty offers and empty Want bitvectors — no chunk data is retransmitted. // The start > 0 guard matches the leading-gap check above: BinIDs // start at 1 in production and the puller always passes start >= 1. // For start=0 (used only in test helpers) contiguousEnd is ambiguous — diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 45e740f4e74..5fc9c2e8e5c 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -359,9 +359,9 @@ func TestSync_StampFailure_TopmostIsZero(t *testing.T) { } // TestSync_LiveChunkTopCappedAtCursor verifies that a live chunk with a BinID -// far beyond the server's historical cursor does not inflate offer.Topmost. -// Without the server-side cap, the client would advance its interval to the -// live chunk's BinID, permanently skipping the historical range in between. +// far beyond the downstream peer's historical cursor does not inflate offer.Topmost. +// Without the cap, the upstream peer would advance its interval to the live +// chunk's BinID, permanently skipping the historical range in between. func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) { synctest.Test(t, func(t *testing.T) { liveChunk := testingc.GenerateTestRandomChunk() @@ -371,7 +371,7 @@ func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) { } // The subscribe response contains only the live chunk at a high BinID. - // The server's historical cursor is set much lower. + // The downstream peer's historical cursor is set much lower. const liveBinID = uint64(100) const historicalCursor = uint64(10) liveResult := []*storer.BinC{{ @@ -440,12 +440,12 @@ func TestSync_HistoricalGapReturnsEmptyOfferAtBoundary(t *testing.T) { }) } -// TestSync_MidOfferGapCapsTopmostAtContiguousFrontier verifies that when the +// TestSync_MidOfferGapCapsAtContiguousTopmost verifies that when the // server's offer contains an internal gap (chunks at BinIDs {3,7,11} with -// start=3), Topmost is capped at the contiguous frontier (3) so the client's +// start=3), Topmost is capped at the contiguous Topmost (3) so the client's // interval does not advance past the gap. All chunks are still included in // the offer for eager storage — no chunk data is retransmitted on later rounds. -func TestSync_MidOfferGapCapsTopmostAtContiguousFrontier(t *testing.T) { +func TestSync_MidOfferGapCapsAtContiguousTopmost(t *testing.T) { synctest.Test(t, func(t *testing.T) { ch1 := testingc.GenerateTestRandomChunk() ch2 := testingc.GenerateTestRandomChunk() @@ -464,18 +464,18 @@ func TestSync_MidOfferGapCapsTopmostAtContiguousFrontier(t *testing.T) { const cursor = uint64(11) var ( - ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(results, nil), mock.WithChunks(ch1, ch2, ch3), mock.WithCursors([]uint64{cursor}, 0)) - recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) - psClient, db = newPullSync(t, recorder, 0) + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(results, nil), mock.WithChunks(ch1, ch2, ch3), mock.WithCursors([]uint64{cursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, db = newPullSync(t, recorder, 0) ) topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 3) if err != nil { t.Fatal(err) } - // Topmost must be the contiguous frontier (3), not the max BinID (11). + // Topmost must be the contiguous Topmost (3), not the max BinID (11). if topmost != 3 { - t.Fatalf("topmost: got %d, want 3 (contiguous frontier)", topmost) + t.Fatalf("topmost: got %d, want 3 (contiguous Topmost)", topmost) } // All three chunks must be delivered and stored in this single round trip. if count != 3 { @@ -527,7 +527,7 @@ func TestSync_OverwriteNewerChunkDoesNotBlockInterval(t *testing.T) { } // TestSync_Start0_SkipsGapDetection confirms that when start=0 the leading-gap -// check and the contiguous-frontier cap are both skipped (both are guarded by +// check and the contiguous-Topmost cap are both skipped (both are guarded by // start > 0). Even though the server's first chunk is at BinID=5, the offer is // non-empty and topmost equals the historical cursor. func TestSync_Start0_SkipsGapDetection(t *testing.T) { From 36d909dd7dd958a7238e48e2ea364948d14cbc1b Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Mon, 6 Apr 2026 00:02:12 +0300 Subject: [PATCH 10/10] fix(pullsync): skip undeliverable chunks to preserve liveness Per-chunk validation failures (invalid stamp, unsolicited chunk, unknown or expired batch, invalid CAC/SOC structure) are now logged at debug level and skipped. They no longer affect topmost or the error return. Previously chunkErr accumulated these failures and zeroed topmost, stalling interval advancement indefinitely. Since each failure is a permanent property of the chunk data, retrying the same BinID yields the same result. With multi-peer historical sync each peer's interval is tracked independently, so a chunk one peer cannot deliver will be covered by another. Stream-level errors (read failure, non-overwrite put error) still abort the sync and are returned to the caller unchanged. --- pkg/puller/puller.go | 2 - pkg/pullsync/pullsync.go | 49 +++++----------- pkg/pullsync/pullsync_test.go | 102 ++-------------------------------- 3 files changed, 21 insertions(+), 132 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 29a25ad783a..e42cf61765e 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -360,8 +360,6 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) } - // top is 0 on validation error (see pullsync.Sync), so this check - // prevents interval advancement when chunks were not successfully stored. if top >= start { if err := p.addPeerInterval(address, bin, start, top); err != nil { p.metrics.SyncWorkerErrCounter.Inc() diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 4006bbd0ce2..0c05bf08bf6 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -56,12 +56,11 @@ type Interface interface { // Sync syncs a batch of chunks starting at a start BinID. // It returns the topmost BinID safe for interval advancement and the total // number of chunks successfully stored. - // topmost equals offer.Topmost (capped at the downstream peer's historical - // cursor) when all delivered chunks passed validation. topmost is 0 when - // any chunk failed validation (invalid stamp, unsolicited, or structural - // error), so callers that use topmost to advance their interval will not - // skip unverified BinIDs. ErrOverwriteNewerChunk does not zero topmost - // because the chunk is already present in the reserve. + // topmost equals offer.Topmost capped at the downstream peer's historical + // cursor (see collectAddrs). Per-chunk errors (invalid stamp, unsolicited + // chunk, unknown batch, invalid structure) are logged and skipped; they do + // not affect topmost or the return error. err is non-nil only for + // stream-level failures that abort the entire sync. Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) // GetCursors retrieves all cursors from a downstream peer. GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error) @@ -228,10 +227,10 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea // Sync syncs a batch of chunks starting at a start BinID. // topmost equals offer.Topmost capped at the downstream peer's historical -// cursor (see collectAddrs). topmost is 0 when any delivered chunk failed -// validation (invalid stamp, unsolicited, or structural error), preventing -// the caller from silently skipping unverified BinIDs. -// ErrOverwriteNewerChunk does not zero topmost: the chunk is already present. +// cursor (see collectAddrs). Per-chunk errors (invalid stamp, unsolicited +// chunk, unknown batch, invalid structure) are logged and skipped; they do +// not affect topmost or the return error. err is non-nil only for +// stream-level failures that abort the entire sync. // count is the number of chunks successfully written to the reserve. func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) @@ -317,20 +316,10 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start chunksToPut := make([]swarm.Chunk, 0, ctr) - // chunkErr accumulates validation failures (stamp, solicitation, structural). - // A non-nil chunkErr zeros topmost, preventing interval advancement past - // BinIDs whose chunks were never stored. - // overwriteErr accumulates ErrOverwriteNewerChunk: the chunk is already - // present in the reserve, so advancing the interval past it is correct. - // Both are joined on return so callers can inspect individual errors via errors.Is. - var ( - chunkErr error - overwriteErr error - ) for ; ctr > 0; ctr-- { var delivery pb.Delivery if err = r.ReadMsgWithContext(ctx, &delivery); err != nil { - return 0, 0, errors.Join(chunkErr, overwriteErr, fmt.Errorf("read delivery: %w", err)) + return 0, 0, fmt.Errorf("read delivery: %w", err) } addr := swarm.NewAddress(delivery.Address) @@ -344,19 +333,18 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start stamp := new(postage.Stamp) if err = stamp.UnmarshalBinary(delivery.Stamp); err != nil { - chunkErr = errors.Join(chunkErr, err) + s.logger.Debug("invalid stamp", "error", err, "peer_address", peer, "chunk_address", addr) continue } stampHash, err := stamp.Hash() if err != nil { - chunkErr = errors.Join(chunkErr, err) + s.logger.Debug("invalid stamp hash", "error", err, "peer_address", peer, "chunk_address", addr) continue } wantChunkID := addr.ByteString() + string(stamp.BatchID()) + string(stampHash) if _, ok := wantChunks[wantChunkID]; !ok { s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk, "peer_address", peer, "chunk_address", addr) - chunkErr = errors.Join(chunkErr, ErrUnsolicitedChunk) continue } @@ -365,7 +353,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start chunk, err := s.validStamp(newChunk.WithStamp(stamp)) if err != nil { s.logger.Debug("unverified stamp", "error", err, "peer_address", peer, "chunk_address", newChunk) - chunkErr = errors.Join(chunkErr, err) continue } @@ -374,14 +361,13 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } else if chunk, err := soc.FromChunk(chunk); err == nil { addr, err := chunk.Address() if err != nil { - chunkErr = errors.Join(chunkErr, err) + s.logger.Debug("invalid soc address", "error", err, "peer_address", peer) continue } s.logger.Debug("sync gsoc", "peer_address", peer, "chunk_address", addr, "wrapped_chunk_address", chunk.WrappedChunk().Address()) s.gsocHandler(chunk) } else { s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk) - chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk) s.metrics.ReceivedInvalidChunk.Inc() continue } @@ -398,20 +384,15 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if err := s.store.ReservePutter().Put(ctx, c); err != nil { if errors.Is(err, storage.ErrOverwriteNewerChunk) { s.logger.Debug("overwrite newer chunk", "error", err, "peer_address", peer, "chunk", c) - overwriteErr = errors.Join(overwriteErr, err) continue } - return 0, 0, errors.Join(chunkErr, overwriteErr, err) + return 0, 0, err } chunksPut++ } } - if chunkErr != nil { - topmost = 0 - } - - return topmost, chunksPut, errors.Join(chunkErr, overwriteErr) + return topmost, chunksPut, nil } // makeOffer tries to assemble an offer for a given requested interval. diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 5fc9c2e8e5c..1d8e477aacf 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -196,20 +196,16 @@ func TestIncoming_WantErrors(t *testing.T) { ) topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) - for _, e := range []error{storage.ErrOverwriteNewerChunk, validStampErr, swarm.ErrInvalidChunk} { - if !errors.Is(err, e) { - t.Fatalf("expected error %v", err) - } + if err != nil { + t.Fatal(err) } if count != 3 { t.Fatalf("got %d chunks but want %d", count, 3) } - // topmost must be 0: validation errors (validStampErr, ErrInvalidChunk) zero - // topmost so the caller cannot advance its interval past unverified BinIDs. - if topmost != 0 { - t.Fatalf("got topmost %d but want 0 (validation errors must zero topmost)", topmost) + if topmost != topMost { + t.Fatalf("got offer topmost %d but want %d", topmost, topMost) } haveChunks(t, clientDb, append(tChunks[:1], tChunks[3:5]...)...) @@ -233,8 +229,8 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { ) _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) - if !errors.Is(err, pullsync.ErrUnsolicitedChunk) { - t.Fatalf("expected err %v but got %v", pullsync.ErrUnsolicitedChunk, err) + if err != nil { + t.Fatal(err) } }) } @@ -313,51 +309,6 @@ func TestGetCursorsError(t *testing.T) { }) } -func TestSync_StampFailure_TopmostIsZero(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - tChunks := testingc.GenerateTestRandomChunks(3) - - tResults := make([]*storer.BinC, len(tChunks)) - for i, c := range tChunks { - stampHash, err := c.Stamp().Hash() - if err != nil { - t.Fatal(err) - } - tResults[i] = &storer.BinC{ - Address: c.Address(), - BatchID: c.Stamp().BatchID(), - BinID: uint64(i + 1), - StampHash: stampHash, - } - } - - stampErr := errors.New("stamp validation error") - validStamp := func(c swarm.Chunk) (swarm.Chunk, error) { - return nil, stampErr - } - - cursor := uint64(len(tChunks)) // max BinID equals number of chunks - - var ( - ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...), mock.WithCursors([]uint64{cursor}, 0)) - recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) - psClient, _ = newPullSyncWithStamperValidator(t, recorder, 0, validStamp) - ) - - topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) - if !errors.Is(err, stampErr) { - t.Fatalf("expected stamp error but got %v", err) - } - // topmost must be 0: stamp validation failure must prevent interval advancement - if topmost != 0 { - t.Fatalf("expected topmost=0 on stamp failure but got %d", topmost) - } - if count != 0 { - t.Fatalf("expected count=0 on stamp failure but got %d", count) - } - }) -} - // TestSync_LiveChunkTopCappedAtCursor verifies that a live chunk with a BinID // far beyond the downstream peer's historical cursor does not inflate offer.Topmost. // Without the cap, the upstream peer would advance its interval to the live @@ -485,47 +436,6 @@ func TestSync_MidOfferGapCapsAtContiguousTopmost(t *testing.T) { }) } -// TestSync_OverwriteNewerChunkDoesNotBlockInterval verifies that -// ErrOverwriteNewerChunk does not prevent interval advancement. -// The chunk is already present in the reserve with a newer stamp, so it is -// safe to advance past it even though the put returned an error. -func TestSync_OverwriteNewerChunkDoesNotBlockInterval(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - tChunk := testingc.GenerateTestRandomChunk() - stampHash, err := tChunk.Stamp().Hash() - if err != nil { - t.Fatal(err) - } - tResult := []*storer.BinC{{ - Address: tChunk.Address(), - BatchID: tChunk.Stamp().BatchID(), - BinID: 1, - StampHash: stampHash, - }} - - putHook := func(swarm.Chunk) error { return storage.ErrOverwriteNewerChunk } - - var ( - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(tResult, nil), mock.WithChunks(tChunk), mock.WithCursors([]uint64{1}, 0)) - recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) - psClient, _ = newPullSyncWithStamperValidator(t, recorder, 0, func(c swarm.Chunk) (swarm.Chunk, error) { return c, nil }, mock.WithPutHook(putHook)) - ) - - topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 1) - if !errors.Is(err, storage.ErrOverwriteNewerChunk) { - t.Fatalf("expected ErrOverwriteNewerChunk but got %v", err) - } - // ErrOverwriteNewerChunk must not zero topmost: the chunk is already in - // the reserve with a newer stamp, so interval advancement is safe. - if topmost != 1 { - t.Fatalf("topmost: got %d, want 1 (ErrOverwriteNewerChunk must not block interval)", topmost) - } - if count != 0 { - t.Fatalf("count: got %d, want 0 (chunk not stored due to overwrite)", count) - } - }) -} - // TestSync_Start0_SkipsGapDetection confirms that when start=0 the leading-gap // check and the contiguous-Topmost cap are both skipped (both are guarded by // start > 0). Even though the server's first chunk is at BinID=5, the offer is