Skip to content

Commit c290234

Browse files
committed
chore(pullsync): use upstream/downstream peer terminology in comments
1 parent 1a88740 commit c290234

2 files changed

Lines changed: 41 additions & 41 deletions

File tree

pkg/pullsync/pullsync.go

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ type Interface interface {
5656
// Sync syncs a batch of chunks starting at a start BinID.
5757
// It returns the topmost BinID safe for interval advancement and the total
5858
// number of chunks successfully stored.
59-
// topmost equals offer.Topmost (capped at the server's historical cursor)
60-
// when all delivered chunks passed validation. topmost is 0 when any chunk
61-
// failed validation (invalid stamp, unsolicited, or structural error), so
62-
// callers that use topmost to advance their interval will not skip
63-
// unverified BinIDs. ErrOverwriteNewerChunk does not zero topmost because
64-
// the chunk is already present in the reserve.
59+
// topmost equals offer.Topmost (capped at the downstream peer's historical
60+
// cursor) when all delivered chunks passed validation. topmost is 0 when
61+
// any chunk failed validation (invalid stamp, unsolicited, or structural
62+
// error), so callers that use topmost to advance their interval will not
63+
// skip unverified BinIDs. ErrOverwriteNewerChunk does not zero topmost
64+
// because the chunk is already present in the reserve.
6565
Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error)
6666
// GetCursors retrieves all cursors from a downstream peer.
6767
GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, uint64, error)
@@ -227,10 +227,10 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
227227
}
228228

229229
// Sync syncs a batch of chunks starting at a start BinID.
230-
// topmost equals offer.Topmost capped at the server's historical cursor
231-
// (see collectAddrs). topmost is 0 when any delivered chunk failed validation
232-
// (invalid stamp, unsolicited, or structural error), preventing the caller
233-
// from silently skipping unverified BinIDs.
230+
// topmost equals offer.Topmost capped at the downstream peer's historical
231+
// cursor (see collectAddrs). topmost is 0 when any delivered chunk failed
232+
// validation (invalid stamp, unsolicited, or structural error), preventing
233+
// the caller from silently skipping unverified BinIDs.
234234
// ErrOverwriteNewerChunk does not zero topmost: the chunk is already present.
235235
// count is the number of chunks successfully written to the reserve.
236236
func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) {
@@ -324,7 +324,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
324324
// present in the reserve, so advancing the interval past it is correct.
325325
// Both are joined on return so callers can inspect individual errors via errors.Is.
326326
var (
327-
chunkErr error
327+
chunkErr error
328328
overwriteErr error
329329
)
330330
for ; ctr > 0; ctr-- {
@@ -441,10 +441,10 @@ type collectAddrsResult struct {
441441
// after which the function returns the collected slice of chunks.
442442
func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*storer.BinC, uint64, error) {
443443
v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (*collectAddrsResult, error) {
444-
// Snapshot the server's historical cursor for this bin before subscribing.
445-
// Any chunk with BinID beyond this cursor arrived live after the snapshot;
446-
// capping topmost at the cursor prevents live chunks from inflating the
447-
// interval the client records for this peer.
444+
// Snapshot the downstream peer's historical cursor for this bin before
445+
// subscribing. Any chunk with BinID beyond this cursor arrived live after
446+
// the snapshot; capping topmost at the cursor prevents live chunks from
447+
// inflating the interval the upstream peer records for this peer.
448448
cursors, _, err := s.store.ReserveLastBinIDs()
449449
if err != nil {
450450
return nil, fmt.Errorf("reserve last bin IDs: %w", err)
@@ -456,14 +456,14 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
456456
// If bin is beyond the cursors slice (should not happen with a
457457
// well-behaved storer that always returns swarm.MaxBins entries),
458458
// historicalCursor stays 0. The cursor cap below then sets
459-
// topmost=0 for any non-empty offer, so the client never advances
460-
// its interval — a safe stall until the storer is consistent again.
459+
// topmost=0 for any non-empty offer, so the upstream peer never
460+
// advances its interval — a safe stall until the storer is consistent again.
461461

462462
var (
463-
chs []*storer.BinC
464-
topmost uint64
465-
timer *time.Timer
466-
timerC <-chan time.Time
463+
chs []*storer.BinC
464+
topmost uint64
465+
timer *time.Timer
466+
timerC <-chan time.Time
467467
contiguousEnd uint64 // highest BinID contiguous from start; only meaningful when start > 0
468468
)
469469
if start > 0 {
@@ -487,20 +487,20 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
487487
break LOOP // The stream has been closed.
488488
}
489489

490-
// If the first chunk the server offers has a BinID beyond start
491-
// and within the historical range, the server has no chunks at
490+
// If the first chunk the downstream peer offers has a BinID beyond
491+
// start and within the historical range, it has no chunks at
492492
// [start, c.BinID-1]. Return an empty offer with Topmost set to
493-
// the gap boundary so the client advances its interval past only
494-
// the BinIDs that genuinely do not exist on this server, then
495-
// retries from c.BinID on the next round.
493+
// the gap boundary so the upstream peer advances its interval past
494+
// only the BinIDs that genuinely do not exist here, then retries
495+
// from c.BinID on the next round.
496496
if len(chs) == 0 && start > 0 && c.BinID > start && c.BinID <= historicalCursor {
497497
topmost = c.BinID - 1
498498
break LOOP
499499
}
500500

501501
// Track the contiguous frontier: the highest BinID reachable
502502
// from start without a gap. Used after the loop to cap Topmost
503-
// so the client's interval does not advance past missing BinIDs.
503+
// so the upstream peer's interval does not advance past missing BinIDs.
504504
if c.BinID == contiguousEnd+1 {
505505
contiguousEnd = c.BinID
506506
}
@@ -531,18 +531,18 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
531531
}
532532

533533
// Cap topmost at the historical cursor. Live chunks (BinID > historicalCursor)
534-
// are included in the offer so the client can store them, but Topmost must
535-
// not advance the client's interval past the server's historical frontier.
534+
// are included in the offer so the upstream peer can store them, but Topmost
535+
// must not advance the upstream peer's interval past the historical frontier.
536536
if topmost > historicalCursor {
537537
topmost = historicalCursor
538538
}
539539

540540
// Cap topmost at the contiguous frontier. All collected chunks are
541-
// included in the offer for eager client-side storage, but Topmost is
542-
// bounded to the highest BinID reachable from start without a gap.
543-
// The client stores every chunk in one round trip; subsequent round
544-
// trips advance the interval bookkeeping via leading-gap empty offers
545-
// and empty Want bitvectors — no chunk data is retransmitted.
541+
// included in the offer for eager storage by the upstream peer, but
542+
// Topmost is bounded to the highest BinID reachable from start without
543+
// a gap. The upstream peer stores every chunk in one round trip;
544+
// subsequent round trips advance interval bookkeeping via leading-gap
545+
// empty offers and empty Want bitvectors — no chunk data is retransmitted.
546546
// The start > 0 guard matches the leading-gap check above: BinIDs
547547
// start at 1 in production and the puller always passes start >= 1.
548548
// For start=0 (used only in test helpers) contiguousEnd is ambiguous —

pkg/pullsync/pullsync_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -359,9 +359,9 @@ func TestSync_StampFailure_TopmostIsZero(t *testing.T) {
359359
}
360360

361361
// TestSync_LiveChunkTopCappedAtCursor verifies that a live chunk with a BinID
362-
// far beyond the server's historical cursor does not inflate offer.Topmost.
363-
// Without the server-side cap, the client would advance its interval to the
364-
// live chunk's BinID, permanently skipping the historical range in between.
362+
// far beyond the downstream peer's historical cursor does not inflate offer.Topmost.
363+
// Without the cap, the upstream peer would advance its interval to the live
364+
// chunk's BinID, permanently skipping the historical range in between.
365365
func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) {
366366
synctest.Test(t, func(t *testing.T) {
367367
liveChunk := testingc.GenerateTestRandomChunk()
@@ -371,7 +371,7 @@ func TestSync_LiveChunkTopCappedAtCursor(t *testing.T) {
371371
}
372372

373373
// The subscribe response contains only the live chunk at a high BinID.
374-
// The server's historical cursor is set much lower.
374+
// The downstream peer's historical cursor is set much lower.
375375
const liveBinID = uint64(100)
376376
const historicalCursor = uint64(10)
377377
liveResult := []*storer.BinC{{
@@ -464,9 +464,9 @@ func TestSync_MidOfferGapCapsTopmostAtContiguousFrontier(t *testing.T) {
464464
const cursor = uint64(11)
465465

466466
var (
467-
ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(results, nil), mock.WithChunks(ch1, ch2, ch3), mock.WithCursors([]uint64{cursor}, 0))
468-
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
469-
psClient, db = newPullSync(t, recorder, 0)
467+
ps, _ = newPullSync(t, nil, 10, mock.WithSubscribeResp(results, nil), mock.WithChunks(ch1, ch2, ch3), mock.WithCursors([]uint64{cursor}, 0))
468+
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
469+
psClient, db = newPullSync(t, recorder, 0)
470470
)
471471

472472
topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 3)

0 commit comments

Comments
 (0)