@@ -120,7 +120,6 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
120120 }
121121
122122 opts := []libp2p.Option {
123- libp2p .ShareTCPListener (),
124123 libp2p .ListenAddrStrings (listenAddrs ... ),
125124 security ,
126125 // Use dedicated peerstore instead the global DefaultPeerstore
@@ -374,16 +373,6 @@ func (s *Service) handleIncoming(stream network.Stream) {
374373 return
375374 }
376375
377- if i .FullNode {
378- err = s .addressbook .Put (i .BzzAddress .Overlay , * i .BzzAddress )
379- if err != nil {
380- s .logger .Debug ("stream handler: addressbook put error" , "peer_id" , peerID , "error" , err )
381- s .logger .Error (nil , "stream handler: unable to persist peer" , "peer_id" , peerID )
382- _ = s .Disconnect (i .BzzAddress .Overlay , "unable to persist peer in addressbook" )
383- return
384- }
385- }
386-
387376 peer := p2p.Peer {Address : overlay , FullNode : i .FullNode , EthereumAddress : i .BzzAddress .EthereumAddress }
388377
389378 s .protocolsmu .RLock ()
@@ -400,53 +389,25 @@ func (s *Service) handleIncoming(stream network.Stream) {
400389 s .protocolsmu .RUnlock ()
401390
402391 if s .notifier != nil {
403- if ! i .FullNode {
404- s .lightNodes .Connected (s .ctx , peer )
405- // light node announces explicitly
406- if err := s .notifier .Announce (s .ctx , peer .Address , i .FullNode ); err != nil {
407- s .logger .Debug ("stream handler: notifier.Announce failed" , "peer" , peer .Address , "error" , err )
408- }
392+ s .lightNodes .Connected (s .ctx , peer )
393+ // light node announces explicitly
394+ if err := s .notifier .Announce (s .ctx , peer .Address , i .FullNode ); err != nil {
395+ s .logger .Debug ("stream handler: notifier.Announce failed" , "peer" , peer .Address , "error" , err )
396+ }
409397
410- if s .lightNodes .Count () > s .lightNodeLimit {
411- // kick another node to fit this one in
412- p , err := s .lightNodes .RandomPeer (peer .Address )
413- if err != nil {
414- s .logger .Debug ("stream handler: can't find a peer slot for light node" , "error" , err )
415- _ = s .Disconnect (peer .Address , "unable to find peer slot for light node" )
416- return
417- } else {
418- loggerV1 .Debug ("stream handler: kicking away light node to make room for new node" , "old_peer" , p .String (), "new_peer" , peer .Address )
419-
420- _ = s .Disconnect (p , "kicking away light node to make room for peer" )
421- return
422- }
423- }
424- } else {
425- if err := s .notifier .Connected (s .ctx , peer , false ); err != nil {
426- s .logger .Debug ("stream handler: notifier.Connected: peer disconnected" , "peer" , i .BzzAddress .Overlay , "error" , err )
427- // note: this cannot be unit tested since the node
428- // waiting on handshakeStream.FullClose() on the other side
429- // might actually get a stream reset when we disconnect here
430- // resulting in a flaky response from the Connect method on
431- // the other side.
432- // that is why the Pick method has been added to the notifier
433- // interface, in addition to the possibility of deciding whether
434- // a peer connection is wanted prior to adding the peer to the
435- // peer registry and starting the protocols.
436- _ = s .Disconnect (overlay , "unable to signal connection notifier" )
398+ if s .lightNodes .Count () > s .lightNodeLimit {
399+ // kick another node to fit this one in
400+ p , err := s .lightNodes .RandomPeer (peer .Address )
401+ if err != nil {
402+ s .logger .Debug ("stream handler: can't find a peer slot for light node" , "error" , err )
403+ _ = s .Disconnect (peer .Address , "unable to find peer slot for light node" )
404+ return
405+ } else {
406+ loggerV1 .Debug ("stream handler: kicking away light node to make room for new node" , "old_peer" , p .String (), "new_peer" , peer .Address )
407+
408+ _ = s .Disconnect (p , "kicking away light node to make room for peer" )
437409 return
438410 }
439- // when a full node connects, we gossip about it to the
440- // light nodes so that they can also have a chance at building
441- // a solid topology.
442- _ = s .lightNodes .EachPeer (func (addr swarm.Address , _ uint8 ) (bool , bool , error ) {
443- go func (addressee , peer swarm.Address , fullnode bool ) {
444- if err := s .notifier .AnnounceTo (s .ctx , addressee , peer , fullnode ); err != nil {
445- s .logger .Debug ("stream handler: notifier.AnnounceTo failed" , "addressee" , addressee , "peer" , peer , "error" , err )
446- }
447- }(addr , peer .Address , i .FullNode )
448- return false , false , nil
449- })
450411 }
451412 }
452413
@@ -493,7 +454,6 @@ func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration, reaso
493454}
494455
495456func (s * Service ) Connect (ctx context.Context , addr ma.Multiaddr ) (address * bzz.Address , err error ) {
496- loggerV1 := s .logger .V (1 ).Register ()
497457
498458 defer func () {
499459 err = s .determineCurrentNetworkStatus (err )
@@ -534,86 +494,16 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
534494 }
535495
536496 handshakeStream := newStream (stream )
537- i , err : = s .handshakeService .Handshake (ctx , handshakeStream , stream .Conn ().RemoteMultiaddr (), stream .Conn ().RemotePeer ())
497+ _ , err = s .handshakeService .Handshake (ctx , handshakeStream , stream .Conn ().RemoteMultiaddr (), stream .Conn ().RemotePeer ())
538498 if err != nil {
539499 _ = handshakeStream .Reset ()
540500 _ = s .host .Network ().ClosePeer (info .ID )
541501 return nil , fmt .Errorf ("handshake: %w" , err )
542502 }
543503
544- if ! i .FullNode {
545- _ = handshakeStream .Reset ()
546- _ = s .host .Network ().ClosePeer (info .ID )
547- return nil , p2p .ErrDialLightNode
548- }
549-
550- overlay := i .BzzAddress .Overlay
551-
552- blocked , err := s .blocklist .Exists (overlay )
553- if err != nil {
554- s .logger .Debug ("blocklisting: exists failed" , "peer_id" , info .ID , "error" , err )
555- s .logger .Error (nil , "internal error while connecting with peer" , "peer_id" , info .ID )
556- _ = handshakeStream .Reset ()
557- _ = s .host .Network ().ClosePeer (info .ID )
558- return nil , err
559- }
560-
561- if blocked {
562- s .logger .Error (nil , "blocked connection to blocklisted peer" , "peer_id" , info .ID )
563- _ = handshakeStream .Reset ()
564- _ = s .host .Network ().ClosePeer (info .ID )
565- return nil , p2p .ErrPeerBlocklisted
566- }
567-
568- if exists := s .peers .addIfNotExists (stream .Conn (), overlay , i .FullNode ); exists {
569- if err := handshakeStream .FullClose (); err != nil {
570- _ = s .Disconnect (overlay , "failed closing handshake stream after connect" )
571- return nil , fmt .Errorf ("peer exists, full close: %w" , err )
572- }
573-
574- return i .BzzAddress , nil
575- }
576-
577- if err := handshakeStream .FullClose (); err != nil {
578- _ = s .Disconnect (overlay , "could not fully close handshake stream after connect" )
579- return nil , fmt .Errorf ("connect full close %w" , err )
580- }
581-
582- if i .FullNode {
583- err = s .addressbook .Put (overlay , * i .BzzAddress )
584- if err != nil {
585- _ = s .Disconnect (overlay , "failed storing peer in addressbook" )
586- return nil , fmt .Errorf ("storing bzz address: %w" , err )
587- }
588- }
589-
590- s .protocolsmu .RLock ()
591- for _ , tn := range s .protocols {
592- if tn .ConnectOut != nil {
593- if err := tn .ConnectOut (ctx , p2p.Peer {Address : overlay , FullNode : i .FullNode , EthereumAddress : i .BzzAddress .EthereumAddress }); err != nil {
594- s .logger .Debug ("connectOut: failed to connect" , "protocol" , tn .Name , "version" , tn .Version , "peer" , overlay , "error" , err )
595- _ = s .Disconnect (overlay , "failed to process outbound connection notifier" )
596- s .protocolsmu .RUnlock ()
597- return nil , fmt .Errorf ("connectOut: protocol: %s, version:%s: %w" , tn .Name , tn .Version , err )
598- }
599- }
600- }
601- s .protocolsmu .RUnlock ()
602-
603- if ! s .peers .Exists (overlay ) {
604- _ = s .Disconnect (overlay , "outbound peer does not exist" )
605- return nil , fmt .Errorf ("libp2p connect: peer %s does not exist %w" , overlay , p2p .ErrPeerNotFound )
606- }
607-
608- if s .reacher != nil {
609- s .reacher .Connected (overlay , i .BzzAddress .Underlay )
610- }
611-
612- peerUserAgent := appendSpace (s .peerUserAgent (ctx , info .ID ))
613-
614- loggerV1 .Debug ("successfully connected to peer (outbound)" , "addresses" , i .BzzAddress .ShortString (), "light" , i .LightString (), "user_agent" , peerUserAgent )
615- s .logger .Debug ("successfully connected to peer (outbound)" , "address" , i .BzzAddress .Overlay , "light" , i .LightString (), "user_agent" , peerUserAgent )
616- return i .BzzAddress , nil
504+ _ = handshakeStream .Reset ()
505+ _ = s .host .Network ().ClosePeer (info .ID )
506+ return nil , p2p .ErrDialLightNode
617507}
618508
619509func (s * Service ) Disconnect (overlay swarm.Address , reason string ) (err error ) {
0 commit comments