-
Notifications
You must be signed in to change notification settings - Fork 388
Expand file tree
/
Copy pathpullsync.go
More file actions
161 lines (133 loc) · 3.58 KB
/
Copy pathpullsync.go
File metadata and controls
161 lines (133 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethersphere/bee/v2/pkg/pullsync"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
var _ pullsync.Interface = (*PullSyncMock)(nil)
func WithSyncError(err error) Option {
return optionFunc(func(p *PullSyncMock) {
p.syncErr = err
})
}
func WithCursors(v []uint64, e uint64) Option {
return optionFunc(func(p *PullSyncMock) {
p.cursors = v
p.epoch = e
})
}
func WithReplies(replies ...SyncReply) Option {
return optionFunc(func(p *PullSyncMock) {
for _, r := range replies {
p.replies[toID(r.Peer, r.Bin, r.Start)] = append(p.replies[toID(r.Peer, r.Bin, r.Start)], r)
}
})
}
// WithSyncCancelDelay adds an artificial delay after context cancellation in
// the blocking Sync path (when no reply is configured), simulating slow TCP
// teardown latency.
func WithSyncCancelDelay(d time.Duration) Option {
return optionFunc(func(p *PullSyncMock) {
p.cancelDelay = d
})
}
func toID(a swarm.Address, bin uint8, start uint64) string {
return fmt.Sprintf("%s-%d-%d", a, bin, start)
}
type SyncReply struct {
Peer swarm.Address
Bin uint8
Start uint64
Topmost uint64
Count int
}
type PullSyncMock struct {
mtx sync.Mutex
syncCalls []SyncReply
totalSyncCalls int // every Sync() entry, including blocking ones
syncErr error
cursors []uint64
epoch uint64
getCursorsPeers []swarm.Address
replies map[string][]SyncReply
cancelDelay time.Duration // extra delay after ctx cancellation in blocking path
quit chan struct{}
}
func NewPullSync(opts ...Option) *PullSyncMock {
s := &PullSyncMock{
quit: make(chan struct{}),
replies: make(map[string][]SyncReply),
}
for _, v := range opts {
v.apply(s)
}
return s
}
func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) {
p.mtx.Lock()
p.totalSyncCalls++
id := toID(peer, bin, start)
replies := p.replies[id]
if len(replies) > 0 {
reply := replies[0]
p.replies[id] = p.replies[id][1:]
p.syncCalls = append(p.syncCalls, reply)
p.mtx.Unlock()
return reply.Topmost, reply.Count, p.syncErr
}
cancelDelay := p.cancelDelay
p.mtx.Unlock()
<-ctx.Done()
if cancelDelay > 0 {
time.Sleep(cancelDelay)
}
return 0, 0, ctx.Err()
}
// TotalSyncCalls returns the total number of Sync() invocations, including blocking ones.
func (p *PullSyncMock) TotalSyncCalls() int {
p.mtx.Lock()
defer p.mtx.Unlock()
return p.totalSyncCalls
}
func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, uint64, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.getCursorsPeers = append(p.getCursorsPeers, peer)
return p.cursors, p.epoch, nil
}
func (p *PullSyncMock) ResetCalls(peer swarm.Address) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.syncCalls = nil
}
func (p *PullSyncMock) SyncCalls(peer swarm.Address) (res []SyncReply) {
p.mtx.Lock()
defer p.mtx.Unlock()
for _, v := range p.syncCalls {
if v.Peer.Equal(peer) {
res = append(res, v)
}
}
return res
}
func (p *PullSyncMock) CursorsCalls(peer swarm.Address) bool {
p.mtx.Lock()
defer p.mtx.Unlock()
return swarm.ContainsAddress(p.getCursorsPeers, peer)
}
func (p *PullSyncMock) SetEpoch(epoch uint64) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.epoch = epoch
}
type Option interface {
apply(*PullSyncMock)
}
type optionFunc func(*PullSyncMock)
func (f optionFunc) apply(r *PullSyncMock) { f(r) }