2 changes: 1 addition & 1 deletion go.mod
@@ -11,7 +11,7 @@ require (
github.com/caddyserver/certmagic v0.21.6
github.com/coreos/go-semver v0.3.0
github.com/ethereum/go-ethereum v1.15.11
github.com/ethersphere/batch-archive v0.0.5
github.com/ethersphere/batch-archive v0.0.6
github.com/ethersphere/go-price-oracle-abi v0.6.9
github.com/ethersphere/go-storage-incentives-abi v0.9.4
github.com/ethersphere/go-sw3-abi v0.6.9
4 changes: 2 additions & 2 deletions go.sum
@@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.15.11 h1:JK73WKeu0WC0O1eyX+mdQAVHUV+UR1a9VB/d
github.com/ethereum/go-ethereum v1.15.11/go.mod h1:mf8YiHIb0GR4x4TipcvBUPxJLw1mFdmxzoDi11sDRoI=
github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8=
github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
github.com/ethersphere/batch-archive v0.0.5 h1:SM3g7Tuge4KhOn+NKgPcg2Uz2p8a/MLKZvZpmkKCyU4=
github.com/ethersphere/batch-archive v0.0.5/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC438/FFZdONY=
github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU=
github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0=
1 change: 0 additions & 1 deletion pkg/puller/puller.go
@@ -360,7 +360,6 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
}

// pulled at least one chunk
if top >= start {
if err := p.addPeerInterval(address, bin, start, top); err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
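The hunk above leaves the caller-side rule unchanged: the puller commits an interval only when the returned topmost reaches the requested start (`if top >= start`). Only the stale "pulled at least one chunk" comment is dropped, since topmost >= start no longer implies a chunk was actually transferred. What this PR changes is the value Sync reports: it now returns topmost 0 on a stream-level error (see the mock and pullsync.go changes below), so the check fails and the range is retried later. A minimal standalone sketch of that invariant; the helper name and the bare-function form are illustrative, not the actual puller API.

```go
package main

import "fmt"

// shouldAdvance restates the check shown in the puller.go hunk above
// (`if top >= start`); it is not the real puller API. Because BinIDs
// start at 1 and the puller always requests start >= 1, a topmost of 0
// can never satisfy it.
func shouldAdvance(start, top uint64) bool {
	return top >= start
}

func main() {
	// Stream-level sync error: Sync (and the updated mock) report topmost 0,
	// so no interval is committed and the range is retried later.
	fmt.Println(shouldAdvance(1, 0)) // false

	// Successful sync, or an empty offer for a genuinely missing range:
	// topmost comes back as offer.Topmost and the interval advances.
	fmt.Println(shouldAdvance(1, 100)) // true
}
```

TestIntervalDoesNotAdvanceOnSyncError in puller_test.go below exercises exactly this path.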
42 changes: 41 additions & 1 deletion pkg/puller/puller_test.go
@@ -500,13 +500,53 @@ func TestContinueSyncing(t *testing.T) {
kad.Trigger()

err := spinlock.Wait(time.Second, func() bool {
return len(pullsync.SyncCalls(addr)) == 1
return len(pullsync.SyncCalls(addr)) >= 1
})
if err != nil {
t.Fatal(err)
}
}

// TestIntervalDoesNotAdvanceOnSyncError verifies that a sync error does not cause
// the interval to be advanced: the interval store must remain empty after a sync
// that returns an error, even if offer.Topmost is non-zero.
func TestIntervalDoesNotAdvanceOnSyncError(t *testing.T) {
t.Parallel()

addr := swarm.RandAddress(t)

_, s, kad, pullsync := newPuller(t, opts{
kad: []kadMock.Option{
kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}),
},
pullSync: []mockps.Option{
mockps.WithCursors([]uint64{100}, 0),
mockps.WithSyncError(errors.New("stamp error")),
mockps.WithReplies(
mockps.SyncReply{Start: 1, Topmost: 100, Peer: addr},
mockps.SyncReply{Start: 1, Topmost: 100, Peer: addr},
),
},
bins: 1,
rs: resMock.NewReserve(resMock.WithRadius(0)),
})

time.Sleep(100 * time.Millisecond)
kad.Trigger()

// wait until at least one sync call was made
err := spinlock.Wait(time.Second, func() bool {
return len(pullsync.SyncCalls(addr)) >= 1
})
if err != nil {
t.Fatal(err)
}

// interval must not have been advanced: it is initialized to [] (no ranges committed)
// and must remain [] since every sync call returned an error
checkIntervals(t, s, addr, "[]", 0)
}

func TestPeerGone(t *testing.T) {
t.Parallel()

7 changes: 6 additions & 1 deletion pkg/pullsync/mock/pullsync.go
@@ -83,7 +83,12 @@ func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8,
p.replies[id] = p.replies[id][1:]
p.syncCalls = append(p.syncCalls, reply)
p.mtx.Unlock()
return reply.Topmost, reply.Count, p.syncErr
// mirror real Sync: topmost is 0 on error so the interval is not advanced
top := reply.Topmost
if p.syncErr != nil {
top = 0
}
return top, reply.Count, p.syncErr
}
p.mtx.Unlock()
<-ctx.Done()
118 changes: 88 additions & 30 deletions pkg/pullsync/pullsync.go
@@ -41,9 +41,7 @@ const (
cursorStreamName = "cursors"
)

var (
ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk")
)
var ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk")

const (
MaxCursor = math.MaxUint64
@@ -56,8 +54,13 @@ const (
// Interface is the PullSync interface.
type Interface interface {
// Sync syncs a batch of chunks starting at a start BinID.
// It returns the BinID of highest chunk that was synced from the given
// batch and the total number of chunks the downstream peer has sent.
// It returns the topmost BinID safe for interval advancement and the total
// number of chunks successfully stored.
// topmost equals offer.Topmost capped at the downstream peer's historical
// cursor (see collectAddrs). Per-chunk errors (invalid stamp, unsolicited
// chunk, unknown batch, invalid structure) are logged and skipped; they do
// not affect topmost or the return error. err is non-nil only for
// stream-level failures that abort the entire sync.
Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error)
// GetCursors retrieves all cursors from a downstream peer.
GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error)
@@ -92,7 +95,6 @@ func New(
logger log.Logger,
maxPage uint64,
) *Syncer {

return &Syncer{
streamer: streamer,
store: store,
@@ -128,7 +130,6 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec {

// handler handles an incoming request to sync an interval
func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {

select {
case <-s.quit:
return nil
Expand Down Expand Up @@ -225,10 +226,13 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
}

// Sync syncs a batch of chunks starting at a start BinID.
// It returns the BinID of highest chunk that was synced from the given
// batch and the total number of chunks the downstream peer has sent.
// topmost equals offer.Topmost capped at the downstream peer's historical
// cursor (see collectAddrs). Per-chunk errors (invalid stamp, unsolicited
// chunk, unknown batch, invalid structure) are logged and skipped; they do
// not affect topmost or the return error. err is non-nil only for
// stream-level failures that abort the entire sync.
// count is the number of chunks successfully written to the reserve.
func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) {

stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return 0, 0, fmt.Errorf("new stream: %w", err)
@@ -253,8 +257,8 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
return 0, 0, fmt.Errorf("read offer: %w", err)
}

// empty interval (no chunks present in interval).
// return the end of the requested range as topmost.
// empty interval: no chunks in range.
// return offer.Topmost so the caller can advance the interval.
if len(offer.Chunks) == 0 {
return offer.Topmost, 0, nil
}
@@ -312,11 +316,10 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start

chunksToPut := make([]swarm.Chunk, 0, ctr)

var chunkErr error
for ; ctr > 0; ctr-- {
var delivery pb.Delivery
if err = r.ReadMsgWithContext(ctx, &delivery); err != nil {
return 0, 0, errors.Join(chunkErr, fmt.Errorf("read delivery: %w", err))
return 0, 0, fmt.Errorf("read delivery: %w", err)
}

addr := swarm.NewAddress(delivery.Address)
@@ -330,19 +333,18 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start

stamp := new(postage.Stamp)
if err = stamp.UnmarshalBinary(delivery.Stamp); err != nil {
chunkErr = errors.Join(chunkErr, err)
s.logger.Debug("invalid stamp", "error", err, "peer_address", peer, "chunk_address", addr)
continue
}
stampHash, err := stamp.Hash()
if err != nil {
chunkErr = errors.Join(chunkErr, err)
s.logger.Debug("invalid stamp hash", "error", err, "peer_address", peer, "chunk_address", addr)
continue
}

wantChunkID := addr.ByteString() + string(stamp.BatchID()) + string(stampHash)
if _, ok := wantChunks[wantChunkID]; !ok {
s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk, "peer_address", peer, "chunk_address", addr)
chunkErr = errors.Join(chunkErr, ErrUnsolicitedChunk)
continue
}

@@ -351,7 +353,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
chunk, err := s.validStamp(newChunk.WithStamp(stamp))
if err != nil {
s.logger.Debug("unverified stamp", "error", err, "peer_address", peer, "chunk_address", newChunk)
chunkErr = errors.Join(chunkErr, err)
continue
}

@@ -360,14 +361,13 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
} else if chunk, err := soc.FromChunk(chunk); err == nil {
addr, err := chunk.Address()
if err != nil {
chunkErr = errors.Join(chunkErr, err)
s.logger.Debug("invalid soc address", "error", err, "peer_address", peer)
continue
}
s.logger.Debug("sync gsoc", "peer_address", peer, "chunk_address", addr, "wrapped_chunk_address", chunk.WrappedChunk().Address())
s.gsocHandler(chunk)
} else {
s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk)
chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk)
s.metrics.ReceivedInvalidChunk.Inc()
continue
}
@@ -382,25 +382,21 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start

for _, c := range chunksToPut {
if err := s.store.ReservePutter().Put(ctx, c); err != nil {
// in case of these errors, no new items are added to the storage, so it
// is safe to continue with the next chunk
if errors.Is(err, storage.ErrOverwriteNewerChunk) {
s.logger.Debug("overwrite newer chunk", "error", err, "peer_address", peer, "chunk", c)
chunkErr = errors.Join(chunkErr, err)
continue
}
return 0, 0, errors.Join(chunkErr, err)
return 0, 0, err
}
chunksPut++
}
}

return topmost, chunksPut, chunkErr
return topmost, chunksPut, nil
}

// makeOffer tries to assemble an offer for a given requested interval.
func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) {

addrs, top, err := s.collectAddrs(ctx, uint8(rn.Bin), rn.Start)
if err != nil {
return nil, err
@@ -426,12 +422,34 @@ type collectAddrsResult struct {
// after which the function returns the collected slice of chunks.
func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*storer.BinC, uint64, error) {
v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (*collectAddrsResult, error) {
// Snapshot the downstream peer's historical cursor for this bin before
// subscribing. Any chunk with BinID beyond this cursor arrived live after
// the snapshot; capping topmost at the cursor prevents live chunks from
// inflating the interval the upstream peer records for this peer.
cursors, _, err := s.store.ReserveLastBinIDs()
if err != nil {
return nil, fmt.Errorf("reserve last bin IDs: %w", err)
}
var historicalCursor uint64
if int(bin) < len(cursors) {
historicalCursor = cursors[bin]
}
// If bin is beyond the cursors slice (should not happen with a
// well-behaved storer that always returns swarm.MaxBins entries),
// historicalCursor stays 0. The cursor cap below then sets
// topmost=0 for any non-empty offer, so the upstream peer never
// advances its interval — a safe stall until the storer is consistent again.

var (
chs []*storer.BinC
topmost uint64
timer *time.Timer
timerC <-chan time.Time
chs []*storer.BinC
topmost uint64
timer *time.Timer
timerC <-chan time.Time
contiguousEnd uint64 // highest BinID contiguous from start; only meaningful when start > 0
)
if start > 0 {
contiguousEnd = start - 1
}
chC, unsub, errC := s.store.SubscribeBin(ctx, bin, start)
defer func() {
unsub()
@@ -450,6 +468,24 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
break LOOP // The stream has been closed.
}

// If the first chunk the downstream peer offers has a BinID beyond
// start and within the historical range, it has no chunks at
// [start, c.BinID-1]. Return an empty offer with Topmost set to
// the gap boundary so the upstream peer advances its interval past
// only the BinIDs that genuinely do not exist here, then retries
// from c.BinID on the next round.
if len(chs) == 0 && start > 0 && c.BinID > start && c.BinID <= historicalCursor {
topmost = c.BinID - 1
break LOOP
}

// Track the contiguous Topmost: the highest BinID reachable
// from start without a gap. Used after the loop to cap Topmost
// so the upstream peer's interval does not advance past missing BinIDs.
if c.BinID == contiguousEnd+1 {
contiguousEnd = c.BinID
}

chs = append(chs, &storer.BinC{Address: c.Address, BatchID: c.BatchID, StampHash: c.StampHash})
if c.BinID > topmost {
topmost = c.BinID
@@ -475,6 +511,28 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
}
}

// Cap topmost at the historical cursor. Live chunks (BinID > historicalCursor)
// are included in the offer so the upstream peer can store them, but Topmost
// must not advance the upstream peer's interval past the historical cursor.
if topmost > historicalCursor {
topmost = historicalCursor
}

// Cap topmost at the contiguous Topmost. All collected chunks are
// included in the offer for eager storage by the upstream peer, but
// Topmost is bounded to the highest BinID reachable from start without
// a gap. The upstream peer stores every chunk in one round trip;
// subsequent round trips advance interval bookkeeping via leading-gap
// empty offers and empty Want bitvectors — no chunk data is retransmitted.
// The start > 0 guard matches the leading-gap check above: BinIDs
// start at 1 in production and the puller always passes start >= 1.
// For start=0 (used only in test helpers) contiguousEnd is ambiguous —
// uint64 underflow prevents the start-1 initialisation — so the cap
// is skipped and topmost is left at the historical cursor.
if start > 0 && len(chs) > 0 && contiguousEnd >= start && contiguousEnd < topmost {
topmost = contiguousEnd
}

return &collectAddrsResult{chs: chs, topmost: topmost}, nil
})
if err != nil {
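Taken together, the collectAddrs comments above describe three rules for the Topmost a peer advertises: a leading gap yields an empty offer whose Topmost stops just before the first existing BinID, live chunks are offered but never push Topmost past the historical cursor snapshot, and Topmost is further capped at the highest BinID contiguous from start. The sketch below restates that arithmetic in a self-contained form; the standalone function, its name, and the plain slice input are illustrative only (the real code consumes a SubscribeBin stream with batch limits and timers), and the no-chunk and start=0 cases are not modelled.

```go
package main

import "fmt"

// offerTopmost restates the Topmost rules from the collectAddrs hunks above
// for a requested range starting at `start`, given the ordered BinIDs the
// peer has for that bin and its historical cursor snapshot. Illustrative
// sketch only; not the Syncer API.
func offerTopmost(binIDs []uint64, start, historicalCursor uint64) (topmost uint64, offered []uint64) {
	if len(binIDs) == 0 || start == 0 {
		return 0, nil // empty bins and start=0 are handled differently in the real code; not modelled here
	}

	// Leading gap: the first BinID lies beyond start but within the
	// historical range, so [start, first-1] genuinely does not exist here.
	// Return an empty offer whose Topmost lets the requesting peer skip it.
	if binIDs[0] > start && binIDs[0] <= historicalCursor {
		return binIDs[0] - 1, nil
	}

	contiguousEnd := start - 1
	for _, id := range binIDs {
		if id == contiguousEnd+1 {
			contiguousEnd = id
		}
		if id > topmost {
			topmost = id
		}
		offered = append(offered, id)
	}

	// Cap 1: live chunks (BinID > historicalCursor) are offered for storage
	// but must not advance the requesting peer's interval past the snapshot.
	if topmost > historicalCursor {
		topmost = historicalCursor
	}
	// Cap 2: never advance past a gap; stop at the highest BinID reachable
	// from start without missing BinIDs.
	if contiguousEnd >= start && contiguousEnd < topmost {
		topmost = contiguousEnd
	}
	return topmost, offered
}

func main() {
	// BinIDs 1, 2 and 4 are historical (cursor 5) and 7 is live: all four are
	// offered, but Topmost is capped at 2 because BinID 3 is missing.
	fmt.Println(offerTopmost([]uint64{1, 2, 4, 7}, 1, 5)) // 2 [1 2 4 7]

	// Nothing exists at BinIDs 3 and 4: empty offer with Topmost 4, so the
	// requesting peer advances past the gap and retries from 5 next round.
	fmt.Println(offerTopmost([]uint64{5, 6}, 3, 6)) // 4 []
}
```

As the in-code comments note, the requesting peer stores every offered chunk in the first round trip; later round trips only advance its interval bookkeeping, without retransmitting chunk data.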