Skip to content

Commit 889c053

Browse files
committed
bitswap/httpnet: rework requestManager. Address comments. Docstrings.
1 parent 200bf49 commit 889c053

6 files changed

Lines changed: 138 additions & 99 deletions

File tree

bitswap/network/bsnet/ipfs_impl.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,6 @@ func (s *streamMessageSender) Reset() error {
155155
return nil
156156
}
157157

158-
// Close the stream
159-
func (s *streamMessageSender) Close() error {
160-
stream := s.stream.Load()
161-
if stream != nil {
162-
err := stream.Close()
163-
s.stream.Store(nil)
164-
return err
165-
}
166-
return nil
167-
}
168-
169158
// Indicates whether the peer supports HAVE / DONT_HAVE messages
170159
func (s *streamMessageSender) SupportsHave() bool {
171160
stream := s.stream.Load()

bitswap/network/httpnet/httpnet.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ var (
4747
DefaultMaxHTTPAddressesPerPeer = 10
4848
)
4949

50+
var pingCid = "bafkqaaa" // identity CID
51+
5052
// Option allows to configure the Network.
5153
type Option func(net *Network)
5254

@@ -182,7 +184,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
182184
opt(htnet)
183185
}
184186

185-
reqTracker := newRequestTracker(htnet.idleConnTimeout * 2)
187+
reqTracker := newRequestTracker()
186188
htnet.requestTracker = reqTracker
187189

188190
cooldownTracker := newCooldownTracker(DefaultMaxBackoff)
@@ -299,7 +301,6 @@ func (ht *Network) SendMessage(ctx context.Context, p peer.ID, msg bsmsg.BitSwap
299301
if err != nil {
300302
return err
301303
}
302-
defer sender.Close()
303304
return sender.SendMsg(ctx, msg)
304305
}
305306

@@ -350,12 +351,9 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
350351
// This allows re-using the connections that we are about to open next
351352
// time with the client. We call peer.Connected()
352353
// on success.
353-
//
354-
// TODO: Decide whether we want to connect to all, or just try until
355-
// we find a working one.
356354
var workingAddrs []multiaddr.Multiaddr
357355
for i, u := range urls {
358-
req, err := ht.buildRequest(ctx, p.ID, u, "GET", "bafyaabakaieac")
356+
req, err := ht.buildRequest(ctx, u, "GET", pingCid)
359357
if err != nil {
360358
log.Debug(err)
361359
return err
@@ -371,6 +369,11 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
371369
}
372370
continue
373371
}
372+
373+
if resp.Proto != "HTTP/2.0" {
374+
log.Warnf("%s://%q is not using HTTP/2 (%s)", req.URL.Scheme, req.URL.Host, resp.Proto)
375+
}
376+
374377
if resp.StatusCode >= 500 { // 5xx
375378
// We made a proper request and got a 5xx back.
376379
// We cannot consider this a working connection.
@@ -449,7 +452,7 @@ func (ht *Network) Stats() network.Stats {
449452
}
450453

451454
// buildRequests sets up common settings for making a requests.
452-
func (ht *Network) buildRequest(ctx context.Context, pid peer.ID, u network.ParsedURL, method string, cid string) (*http.Request, error) {
455+
func (ht *Network) buildRequest(ctx context.Context, u network.ParsedURL, method string, cid string) (*http.Request, error) {
453456
// copy url
454457
sendURL, _ := url.Parse(u.URL.String())
455458
sendURL.RawQuery = "format=raw"

bitswap/network/httpnet/msg_sender.go

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ var (
3232
// headers and must be at least 50ms.
3333
DefaultSendErrorBackoff = time.Second
3434

35+
// how many errors we tolerate before we give up making requests for a
36+
// block, when the error is client-side: 404, 401 etc. It makes no
37+
// sense to retry such requests, therefore 1.
3538
defaultMaxClientErrors = 1
3639
)
3740

@@ -114,6 +117,10 @@ func (sender *httpMsgSender) sortURLS() {
114117
func (sender *httpMsgSender) bestURL() (*senderURL, error) {
115118
sender.sortURLS()
116119
first := sender.urls[0]
120+
121+
// A nil URL and no error signals that we don't need to abort, but
122+
// cannot keep retrying the same. client errors will be reset for next
123+
// block in the wantlist.
117124
if first.clientErrors >= defaultMaxClientErrors {
118125
return nil, nil
119126
}
@@ -126,6 +133,7 @@ func (sender *httpMsgSender) bestURL() (*senderURL, error) {
126133
}
127134

128135
// resetClientErrors sets all clientErrors on urls to 0.
136+
// It is called every time we move to the next item in a wantlist.
129137
func (sender *httpMsgSender) resetClientErrors() {
130138
for i := range sender.urls {
131139
sender.urls[i].clientErrors = 0
@@ -188,7 +196,7 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
188196

189197
ctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout)
190198
defer cancel()
191-
req, err := sender.ht.buildRequest(ctx, sender.peer, u.ParsedURL, method, entry.Cid.String())
199+
req, err := sender.ht.buildRequest(ctx, u.ParsedURL, method, entry.Cid.String())
192200
if err != nil {
193201
return &senderError{
194202
Type: typeFatal,
@@ -223,10 +231,6 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
223231
return serr
224232
}
225233

226-
if resp.Proto != "HTTP/2.0" {
227-
log.Warnf("%s://%q is not using HTTP/2 (%s)", req.URL.Scheme, req.URL.Host, resp.Proto)
228-
}
229-
230234
limReader := &io.LimitedReader{
231235
R: resp.Body,
232236
N: sender.ht.maxBlockSize,
@@ -301,7 +305,7 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
301305
return nil
302306
case http.StatusTooManyRequests,
303307
http.StatusServiceUnavailable:
304-
err := fmt.Errorf("%q -> %d: %s", req.URL, resp.StatusCode, string(body))
308+
err := fmt.Errorf("%q -> %d: %q", req.URL, resp.StatusCode, string(body))
305309
log.Error(err)
306310
retryAfter := resp.Header.Get("Retry-After")
307311
cooldownUntil, ok := parseRetryAfter(retryAfter)
@@ -320,7 +324,7 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
320324
// Tolerance for server errors per url is low. If after waiting etc.
321325
// it fails MaxRetries, we will fully disconnect.
322326
default:
323-
err := fmt.Errorf("%q -> %d: %s", req.URL, resp.StatusCode, string(body))
327+
err := fmt.Errorf("%q -> %d: %q", req.URL, resp.StatusCode, string(body))
324328
log.Error(err)
325329
sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff)
326330
return &senderError{
@@ -376,11 +380,17 @@ func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessa
376380
// This allows us to react when cancels arrive to wantlists
377381
// that we are going through.
378382
entryCtxs := make([]context.Context, len(wantlist))
383+
entryCancels := make([]context.CancelFunc, len(wantlist))
384+
nop := func() {}
379385
for i, entry := range wantlist {
380386
if entry.Cancel {
381387
entryCtxs[i] = ctx
388+
entryCancels[i] = nop
382389
}
383-
entryCtxs[i] = sender.ht.requestTracker.requestContext(ctx, entry.Cid)
390+
// The TTL here is just for auto-cleaning the request context
391+
// from the request tracker. It is set in a way that ensure that the request
392+
// has run
393+
entryCtxs[i], entryCancels[i] = sender.ht.requestTracker.requestContext(ctx, entry.Cid)
384394
}
385395

386396
WANTLIST_LOOP:
@@ -399,8 +409,12 @@ WANTLIST_LOOP:
399409
// we disconnect this peer
400410
// and avoid it for the time being.
401411
if err != nil {
402-
defer sender.ht.DisconnectFrom(ctx, sender.peer)
403-
break WANTLIST_LOOP
412+
// notify new blocks before disconnecting. As
413+
// disconnecting may trigger cleanups or
414+
// something.
415+
sender.notifyReceivers(bsresp)
416+
sender.ht.DisconnectFrom(ctx, sender.peer)
417+
return nil
404418
}
405419

406420
// we move to next block, no good urls
@@ -452,44 +466,45 @@ WANTLIST_LOOP:
452466
sender.resetClientErrors()
453467
}
454468

455-
lb := len(bsresp.Blocks())
456-
lh := len(bsresp.Haves())
457-
ldh := len(bsresp.DontHaves())
458-
if lb+lh+ldh > 0 {
459-
// send what we got ReceiveMessage and return
460-
go func(receivers []network.Receiver, p peer.ID, msg bsmsg.BitSwapMessage) {
461-
// TODO: do not hang if closing
462-
for i, recv := range receivers {
463-
log.Debugf("ReceiveMessage from %s#%d. Blocks: %d. Haves: %d", p, i, lb, lh)
464-
recv.ReceiveMessage(
465-
context.Background(), // todo: which context?
466-
p,
467-
msg,
468-
)
469-
}
470-
}(sender.ht.receivers, sender.peer, bsresp)
471-
} else {
472-
// If we did not manage to obtain anything, log errors
473-
sendErrors(err)
469+
// clean up request tracker
470+
for _, cancel := range entryCancels {
471+
cancel()
474472
}
473+
sender.ht.requestTracker.cleanEmptyRequests(wantlist)
474+
475+
go sender.notifyReceivers(bsresp)
476+
// This just logs errors apparently.
477+
sendErrors(err)
475478

476479
// We never return error. Whatever happened, we will be cooling down
477480
// urls etc. but we don't need to disconnect or report that "peer is
478-
// down" for the moment.
479-
// TODO: improve logic?
481+
// down" for the moment, as we disconnect manually on error.
480482
return nil
481483
}
482484

483-
// Close closes the message sender, aborting any ongoing operations.
484-
func (sender *httpMsgSender) Close() error {
485-
sender.closeOnce.Do(func() {
486-
close(sender.closing)
487-
})
488-
return nil
485+
func (sender *httpMsgSender) notifyReceivers(bsresp bsmsg.BitSwapMessage) {
486+
lb := len(bsresp.Blocks())
487+
lh := len(bsresp.Haves())
488+
ldh := len(bsresp.DontHaves())
489+
if lb+lh+ldh == 0 { // nothing to do
490+
return
491+
}
492+
493+
for i, recv := range sender.ht.receivers {
494+
log.Debugf("ReceiveMessage from %s#%d. Blocks: %d. Haves: %d", sender.peer, i, lb, lh)
495+
recv.ReceiveMessage(
496+
context.Background(), // todo: which context?
497+
sender.peer,
498+
bsresp,
499+
)
500+
}
489501
}
490502

491503
// Reset resets the sender (currently noop)
492504
func (sender *httpMsgSender) Reset() error {
505+
sender.closeOnce.Do(func() {
506+
close(sender.closing)
507+
})
493508
return nil
494509
}
495510

0 commit comments

Comments
 (0)