diff --git a/go.mod b/go.mod index cf602914109..3e474dd7899 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/caddyserver/certmagic v0.21.6 github.com/coreos/go-semver v0.3.0 github.com/ethereum/go-ethereum v1.15.11 - github.com/ethersphere/batch-archive v0.0.5 + github.com/ethersphere/batch-archive v0.0.6 github.com/ethersphere/go-price-oracle-abi v0.6.9 github.com/ethersphere/go-storage-incentives-abi v0.9.4 github.com/ethersphere/go-sw3-abi v0.6.9 diff --git a/go.sum b/go.sum index fab9e1c2f69..c3653e0463e 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.15.11 h1:JK73WKeu0WC0O1eyX+mdQAVHUV+UR1a9VB/d github.com/ethereum/go-ethereum v1.15.11/go.mod h1:mf8YiHIb0GR4x4TipcvBUPxJLw1mFdmxzoDi11sDRoI= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/ethersphere/batch-archive v0.0.5 h1:SM3g7Tuge4KhOn+NKgPcg2Uz2p8a/MLKZvZpmkKCyU4= -github.com/ethersphere/batch-archive v0.0.5/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= +github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC438/FFZdONY= +github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 0bcdcfae9cb..e42cf61765e 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -360,7 +360,6 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) } - // pulled at least one chunk if top >= start { if err := p.addPeerInterval(address, bin, start, top); err != nil { p.metrics.SyncWorkerErrCounter.Inc() diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 29687138b9c..8d04c1b2864 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -500,13 +500,53 @@ func TestContinueSyncing(t *testing.T) { kad.Trigger() err := spinlock.Wait(time.Second, func() bool { - return len(pullsync.SyncCalls(addr)) == 1 + return len(pullsync.SyncCalls(addr)) >= 1 }) if err != nil { t.Fatal(err) } } +// TestIntervalDoesNotAdvanceOnSyncError verifies that a sync error does not cause +// the interval to be advanced: the interval store must remain empty after a sync +// that returns an error, even if offer.Topmost is non-zero. +func TestIntervalDoesNotAdvanceOnSyncError(t *testing.T) { + t.Parallel() + + addr := swarm.RandAddress(t) + + _, s, kad, pullsync := newPuller(t, opts{ + kad: []kadMock.Option{ + kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}), + }, + pullSync: []mockps.Option{ + mockps.WithCursors([]uint64{100}, 0), + mockps.WithSyncError(errors.New("stamp error")), + mockps.WithReplies( + mockps.SyncReply{Start: 1, Topmost: 100, Peer: addr}, + mockps.SyncReply{Start: 1, Topmost: 100, Peer: addr}, + ), + }, + bins: 1, + rs: resMock.NewReserve(resMock.WithRadius(0)), + }) + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + + // wait until at least one sync call was made + err := spinlock.Wait(time.Second, func() bool { + return len(pullsync.SyncCalls(addr)) >= 1 + }) + if err != nil { + t.Fatal(err) + } + + // interval must not have been advanced: it is initialized to [] (no ranges committed) + // and must remain [] since every sync call returned an error + checkIntervals(t, s, addr, "[]", 0) +} + func TestPeerGone(t *testing.T) { t.Parallel() diff --git a/pkg/pullsync/mock/pullsync.go b/pkg/pullsync/mock/pullsync.go index bb77934924f..74b8981e305 100644 --- a/pkg/pullsync/mock/pullsync.go +++ b/pkg/pullsync/mock/pullsync.go @@ -83,7 +83,12 @@ func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, p.replies[id] = p.replies[id][1:] p.syncCalls = append(p.syncCalls, reply) p.mtx.Unlock() - return reply.Topmost, reply.Count, p.syncErr + // mirror real Sync: topmost is 0 on error so the interval is not advanced + top := reply.Topmost + if p.syncErr != nil { + top = 0 + } + return top, reply.Count, p.syncErr } p.mtx.Unlock() <-ctx.Done() diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 436600a2f76..0c05bf08bf6 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -41,9 +41,7 @@ const ( cursorStreamName = "cursors" ) -var ( - ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk") -) +var ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk") const ( MaxCursor = math.MaxUint64 @@ -56,8 +54,13 @@ const ( // Interface is the PullSync interface. type Interface interface { // Sync syncs a batch of chunks starting at a start BinID. - // It returns the BinID of highest chunk that was synced from the given - // batch and the total number of chunks the downstream peer has sent. + // It returns the topmost BinID safe for interval advancement and the total + // number of chunks successfully stored. + // topmost equals offer.Topmost capped at the downstream peer's historical + // cursor (see collectAddrs). Per-chunk errors (invalid stamp, unsolicited + // chunk, unknown batch, invalid structure) are logged and skipped; they do + // not affect topmost or the return error. err is non-nil only for + // stream-level failures that abort the entire sync. Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) // GetCursors retrieves all cursors from a downstream peer. GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error) @@ -92,7 +95,6 @@ func New( logger log.Logger, maxPage uint64, ) *Syncer { - return &Syncer{ streamer: streamer, store: store, @@ -128,7 +130,6 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec { // handler handles an incoming request to sync an interval func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { - select { case <-s.quit: return nil @@ -225,10 +226,13 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea } // Sync syncs a batch of chunks starting at a start BinID. -// It returns the BinID of highest chunk that was synced from the given -// batch and the total number of chunks the downstream peer has sent. +// topmost equals offer.Topmost capped at the downstream peer's historical +// cursor (see collectAddrs). Per-chunk errors (invalid stamp, unsolicited +// chunk, unknown batch, invalid structure) are logged and skipped; they do +// not affect topmost or the return error. err is non-nil only for +// stream-level failures that abort the entire sync. +// count is the number of chunks successfully written to the reserve. func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) { - stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) if err != nil { return 0, 0, fmt.Errorf("new stream: %w", err) @@ -253,8 +257,8 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start return 0, 0, fmt.Errorf("read offer: %w", err) } - // empty interval (no chunks present in interval). - // return the end of the requested range as topmost. + // empty interval: no chunks in range. + // return offer.Topmost so the caller can advance the interval. if len(offer.Chunks) == 0 { return offer.Topmost, 0, nil } @@ -312,11 +316,10 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start chunksToPut := make([]swarm.Chunk, 0, ctr) - var chunkErr error for ; ctr > 0; ctr-- { var delivery pb.Delivery if err = r.ReadMsgWithContext(ctx, &delivery); err != nil { - return 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err)) + return 0, 0, fmt.Errorf("read delivery: %w", err) } addr := swarm.NewAddress(delivery.Address) @@ -330,19 +333,18 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start stamp := new(postage.Stamp) if err = stamp.UnmarshalBinary(delivery.Stamp); err != nil { - chunkErr = errors.Join(chunkErr, err) + s.logger.Debug("invalid stamp", "error", err, "peer_address", peer, "chunk_address", addr) continue } stampHash, err := stamp.Hash() if err != nil { - chunkErr = errors.Join(chunkErr, err) + s.logger.Debug("invalid stamp hash", "error", err, "peer_address", peer, "chunk_address", addr) continue } wantChunkID := addr.ByteString() + string(stamp.BatchID()) + string(stampHash) if _, ok := wantChunks[wantChunkID]; !ok { s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk, "peer_address", peer, "chunk_address", addr) - chunkErr = errors.Join(chunkErr, ErrUnsolicitedChunk) continue } @@ -351,7 +353,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start chunk, err := s.validStamp(newChunk.WithStamp(stamp)) if err != nil { s.logger.Debug("unverified stamp", "error", err, "peer_address", peer, "chunk_address", newChunk) - chunkErr = errors.Join(chunkErr, err) continue } @@ -360,14 +361,13 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } else if chunk, err := soc.FromChunk(chunk); err == nil { addr, err := chunk.Address() if err != nil { - chunkErr = errors.Join(chunkErr, err) + s.logger.Debug("invalid soc address", "error", err, "peer_address", peer) continue } s.logger.Debug("sync gsoc", "peer_address", peer, "chunk_address", addr, "wrapped_chunk_address", chunk.WrappedChunk().Address()) s.gsocHandler(chunk) } else { s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk) - chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk) s.metrics.ReceivedInvalidChunk.Inc() continue } @@ -382,25 +382,21 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start for _, c := range chunksToPut { if err := s.store.ReservePutter().Put(ctx, c); err != nil { - // in case of these errors, no new items are added to the storage, so it - // is safe to continue with the next chunk if errors.Is(err, storage.ErrOverwriteNewerChunk) { s.logger.Debug("overwrite newer chunk", "error", err, "peer_address", peer, "chunk", c) - chunkErr = errors.Join(chunkErr, err) continue } - return 0, 0, errors.Join(chunkErr, err) + return 0, 0, err } chunksPut++ } } - return topmost, chunksPut, chunkErr + return topmost, chunksPut, nil } // makeOffer tries to assemble an offer for a given requested interval. func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) { - addrs, top, err := s.collectAddrs(ctx, uint8(rn.Bin), rn.Start) if err != nil { return nil, err @@ -426,12 +422,34 @@ type collectAddrsResult struct { // after which the function returns the collected slice of chunks. func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*storer.BinC, uint64, error) { v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (*collectAddrsResult, error) { + // Snapshot the downstream peer's historical cursor for this bin before + // subscribing. Any chunk with BinID beyond this cursor arrived live after + // the snapshot; capping topmost at the cursor prevents live chunks from + // inflating the interval the upstream peer records for this peer. + cursors, _, err := s.store.ReserveLastBinIDs() + if err != nil { + return nil, fmt.Errorf("reserve last bin IDs: %w", err) + } + var historicalCursor uint64 + if int(bin) < len(cursors) { + historicalCursor = cursors[bin] + } + // If bin is beyond the cursors slice (should not happen with a + // well-behaved storer that always returns swarm.MaxBins entries), + // historicalCursor stays 0. The cursor cap below then sets + // topmost=0 for any non-empty offer, so the upstream peer never + // advances its interval — a safe stall until the storer is consistent again. + var ( - chs []*storer.BinC - topmost uint64 - timer *time.Timer - timerC <-chan time.Time + chs []*storer.BinC + topmost uint64 + timer *time.Timer + timerC <-chan time.Time + contiguousEnd uint64 // highest BinID contiguous from start; only meaningful when start > 0 ) + if start > 0 { + contiguousEnd = start - 1 + } chC, unsub, errC := s.store.SubscribeBin(ctx, bin, start) defer func() { unsub() @@ -450,6 +468,24 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* break LOOP // The stream has been closed. } + // If the first chunk the downstream peer offers has a BinID beyond + // start and within the historical range, it has no chunks at + // [start, c.BinID-1]. Return an empty offer with Topmost set to + // the gap boundary so the upstream peer advances its interval past + // only the BinIDs that genuinely do not exist here, then retries + // from c.BinID on the next round. + if len(chs) == 0 && start > 0 && c.BinID > start && c.BinID <= historicalCursor { + topmost = c.BinID - 1 + break LOOP + } + + // Track the contiguous Topmost: the highest BinID reachable + // from start without a gap. Used after the loop to cap Topmost + // so the upstream peer's interval does not advance past missing BinIDs. + if c.BinID == contiguousEnd+1 { + contiguousEnd = c.BinID + } + chs = append(chs, &storer.BinC{Address: c.Address, BatchID: c.BatchID, StampHash: c.StampHash}) if c.BinID > topmost { topmost = c.BinID @@ -475,6 +511,28 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]* } } + // Cap topmost at the historical cursor. Live chunks (BinID > historicalCursor) + // are included in the offer so the upstream peer can store them, but Topmost + // must not advance the upstream peer's interval past the historical cursor. + if topmost > historicalCursor { + topmost = historicalCursor + } + + // Cap topmost at the contiguous Topmost. All collected chunks are + // included in the offer for eager storage by the upstream peer, but + // Topmost is bounded to the highest BinID reachable from start without + // a gap. The upstream peer stores every chunk in one round trip; + // subsequent round trips advance interval bookkeeping via leading-gap + // empty offers and empty Want bitvectors — no chunk data is retransmitted. + // The start > 0 guard matches the leading-gap check above: BinIDs + // start at 1 in production and the puller always passes start >= 1. + // For start=0 (used only in test helpers) contiguousEnd is ambiguous — + // uint64 underflow prevents the start-1 initialisation — so the cap + // is skipped and topmost is left at the historical cursor. + if start > 0 && len(chs) > 0 && contiguousEnd >= start && contiguousEnd < topmost { + topmost = contiguousEnd + } + return &collectAddrsResult{chs: chs, topmost: topmost}, nil }) if err != nil { diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 03d6927c1da..1d8e477aacf 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -62,7 +62,7 @@ func TestIncoming_WantNone(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSync(t, recorder, 0, mock.WithChunks(chunks...)) ) @@ -84,7 +84,7 @@ func TestIncoming_WantNone(t *testing.T) { func TestIncoming_ContextTimeout(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( - ps, _ = newPullSync(t, nil, 0, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 0, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{4}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSync(t, recorder, 0, mock.WithChunks(chunks...)) ) @@ -102,7 +102,7 @@ func TestIncoming_WantOne(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSync(t, recorder, 0, mock.WithChunks(someChunks(1, 2, 3, 4)...)) ) @@ -128,7 +128,7 @@ func TestIncoming_WantAll(t *testing.T) { synctest.Test(t, func(t *testing.T) { var ( topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSync(t, recorder, 0) ) @@ -190,16 +190,14 @@ func TestIncoming_WantErrors(t *testing.T) { var ( topMost = uint64(10) - ps, _ = newPullSync(t, nil, 20, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...)) + ps, _ = newPullSync(t, nil, 20, mock.WithSubscribeResp(tResults, nil), mock.WithChunks(tChunks...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, clientDb = newPullSyncWithStamperValidator(t, recorder, 0, validStamp, mock.WithPutHook(putHook)) ) topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) - for _, e := range []error{storage.ErrOverwriteNewerChunk, validStampErr, swarm.ErrInvalidChunk} { - if !errors.Is(err, e) { - t.Fatalf("expected error %v", err) - } + if err != nil { + t.Fatal(err) } if count != 3 { @@ -225,14 +223,14 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { evil := swarm.NewChunk(evilAddr, evilData).WithStamp(stamp) var ( - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithEvilChunk(addrs[4], evil)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks(chunks...), mock.WithEvilChunk(addrs[4], evil), mock.WithCursors([]uint64{4}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSync(t, recorder, 0) ) _, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) - if !errors.Is(err, pullsync.ErrUnsolicitedChunk) { - t.Fatalf("expected err %v but got %v", pullsync.ErrUnsolicitedChunk, err) + if err != nil { + t.Fatal(err) } }) } @@ -242,7 +240,7 @@ func TestMissingChunk(t *testing.T) { var ( zeroChunk = swarm.NewChunk(swarm.ZeroAddress, nil) topMost = uint64(4) - ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...)) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...), mock.WithCursors([]uint64{topMost}, 0)) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) psClient, _ = newPullSync(t, recorder, 0) ) @@ -311,6 +309,214 @@ func TestGetCursorsError(t *testing.T) { }) } +// TestSync_LiveChunkTopCappedAtCursor verifies that a live chunk with a BinID +// far beyond the downstream peer's historical cursor does not inflate offer.Topmost. +// Without the cap, the upstream peer would advance its interval to the live +// chunk's BinID, permanently skipping the historical range in between. +func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + liveChunk := testingc.GenerateTestRandomChunk() + stampHash, err := liveChunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + // The subscribe response contains only the live chunk at a high BinID. + // The downstream peer's historical cursor is set much lower. + const liveBinID = uint64(100) + const historicalCursor = uint64(10) + liveResult := []*storer.BinC{{ + Address: liveChunk.Address(), + BatchID: liveChunk.Stamp().BatchID(), + BinID: liveBinID, + StampHash: stampHash, + }} + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(liveResult, nil), mock.WithChunks(liveChunk), mock.WithCursors([]uint64{historicalCursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 1) + if err != nil { + t.Fatal(err) + } + if topmost != historicalCursor { + t.Fatalf("topmost: got %d, want %d (live BinID %d must not inflate topmost)", topmost, historicalCursor, liveBinID) + } + }) +} + +// TestSync_HistoricalGapReturnsEmptyOfferAtBoundary verifies that when the +// server's first available chunk has a BinID beyond the requested start, the +// server returns an empty offer with Topmost set to firstBinID-1. This lets +// the client advance its interval to the gap boundary without silently marking +// BinIDs that may exist on other peers as synced. +func TestSync_HistoricalGapReturnsEmptyOfferAtBoundary(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + chunk := testingc.GenerateTestRandomChunk() + stampHash, err := chunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + // Server holds one chunk at BinID 5; start=2 creates a gap at [2,4]. + const firstBinID = uint64(5) + const cursor = uint64(5) + result := []*storer.BinC{{ + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + BinID: firstBinID, + StampHash: stampHash, + }} + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(result, nil), mock.WithChunks(chunk), mock.WithCursors([]uint64{cursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 2) + if err != nil { + t.Fatal(err) + } + // Empty offer: gap boundary is firstBinID-1. + if topmost != firstBinID-1 { + t.Fatalf("topmost: got %d, want %d (gap boundary)", topmost, firstBinID-1) + } + if count != 0 { + t.Fatalf("count: got %d, want 0 (no chunks in gap)", count) + } + }) +} + +// TestSync_MidOfferGapCapsAtContiguousTopmost verifies that when the +// server's offer contains an internal gap (chunks at BinIDs {3,7,11} with +// start=3), Topmost is capped at the contiguous Topmost (3) so the client's +// interval does not advance past the gap. All chunks are still included in +// the offer for eager storage — no chunk data is retransmitted on later rounds. +func TestSync_MidOfferGapCapsAtContiguousTopmost(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ch1 := testingc.GenerateTestRandomChunk() + ch2 := testingc.GenerateTestRandomChunk() + ch3 := testingc.GenerateTestRandomChunk() + + makeResult := func(c swarm.Chunk, binID uint64) *storer.BinC { + h, err := c.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + return &storer.BinC{Address: c.Address(), BatchID: c.Stamp().BatchID(), BinID: binID, StampHash: h} + } + + // BinIDs: 3, 7, 11 — gaps at [4,6] and [8,10]. + results := []*storer.BinC{makeResult(ch1, 3), makeResult(ch2, 7), makeResult(ch3, 11)} + const cursor = uint64(11) + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(results, nil), mock.WithChunks(ch1, ch2, ch3), mock.WithCursors([]uint64{cursor}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, db = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 3) + if err != nil { + t.Fatal(err) + } + // Topmost must be the contiguous Topmost (3), not the max BinID (11). + if topmost != 3 { + t.Fatalf("topmost: got %d, want 3 (contiguous Topmost)", topmost) + } + // All three chunks must be delivered and stored in this single round trip. + if count != 3 { + t.Fatalf("count: got %d, want 3 (all chunks delivered eagerly)", count) + } + haveChunks(t, db, ch1, ch2, ch3) + }) +} + +// TestSync_Start0_SkipsGapDetection confirms that when start=0 the leading-gap +// check and the contiguous-Topmost cap are both skipped (both are guarded by +// start > 0). Even though the server's first chunk is at BinID=5, the offer is +// non-empty and topmost equals the historical cursor. +func TestSync_Start0_SkipsGapDetection(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ch := testingc.GenerateTestRandomChunk() + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + const binID = uint64(5) + result := []*storer.BinC{{ + Address: ch.Address(), + BatchID: ch.Stamp().BatchID(), + BinID: binID, + StampHash: stampHash, + }} + + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(result, nil), mock.WithChunks(ch), mock.WithCursors([]uint64{binID}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + if err != nil { + t.Fatal(err) + } + // Neither gap check fires for start=0; topmost must equal the cursor. + if topmost != binID { + t.Fatalf("topmost: got %d, want %d (gap detection must be skipped for start=0)", topmost, binID) + } + if count != 1 { + t.Fatalf("count: got %d, want 1", count) + } + }) +} + +// TestSync_BinBeyondCursors_StallsInterval verifies the safe-stall behaviour +// when bin is beyond the cursors slice returned by ReserveLastBinIDs. +// historicalCursor defaults to 0, so the cursor cap sets topmost=0 for any +// non-empty offer. The client stores the chunk but does not advance the +// interval (top=0 < start=1 in the puller guard). +func TestSync_BinBeyondCursors_StallsInterval(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ch := testingc.GenerateTestRandomChunk() + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + result := []*storer.BinC{{ + Address: ch.Address(), + BatchID: ch.Stamp().BatchID(), + BinID: 1, + StampHash: stampHash, + }} + + // Cursors slice has only one entry (bin=0). Syncing bin=1 leaves + // historicalCursor=0, triggering the safe-stall path. + var ( + ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(result, nil), mock.WithChunks(ch), mock.WithCursors([]uint64{10}, 0)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 1, 1) + if err != nil { + t.Fatal(err) + } + // historicalCursor=0 for bin=1 → cursor cap fires → topmost=0. + if topmost != 0 { + t.Fatalf("topmost: got %d, want 0 (bin beyond cursors must stall interval)", topmost) + } + // The chunk is still stored; only interval advancement is suppressed. + if count != 1 { + t.Fatalf("count: got %d, want 1 (chunk must be stored despite topmost=0)", count) + } + }) +} + func haveChunks(t *testing.T, s *mock.ReserveStore, chunks ...swarm.Chunk) { t.Helper() for _, c := range chunks {