@@ -1421,7 +1421,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
14211421 // Create the Shutdown message.
14221422 shutdown , err := negotiateChanCloser .ShutdownChan ()
14231423 if err != nil {
1424- p .activeChanCloses . Delete (chanID )
1424+ p .deleteActiveChanCloser (chanID , chanPoint )
14251425 shutdownInfoErr = err
14261426
14271427 return
@@ -1465,7 +1465,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
14651465 // Creating this here ensures that any shutdown messages sent
14661466 // will be automatically routed by the msg router.
14671467 if _ , err := p .initRbfChanCloser (lnChan ); err != nil {
1468- p .activeChanCloses . Delete (chanID )
1468+ p .deleteActiveChanCloser (chanID , chanPoint )
14691469
14701470 return nil , fmt .Errorf ("unable to init RBF chan " +
14711471 "closer during peer connect: %w" , err )
@@ -1762,6 +1762,10 @@ func (p *Brontide) Disconnect(reason error) {
17621762 // Stop the onion peer actor if one was spawned.
17631763 p .StopOnionActorIfExists ()
17641764
1765+ // Unregister any RBF close actors registered for channels of this
1766+ // peer so we don't leave stale entries in the actor system.
1767+ p .unregisterRbfCloseActors ()
1768+
17651769 // Ensure that the TCP connection is properly closed before continuing.
17661770 p .cfg .Conn .Close ()
17671771
@@ -1793,6 +1797,54 @@ func (p *Brontide) StopOnionActorIfExists() {
17931797 )
17941798}
17951799
1800+ // unregisterRbfCloseActor removes any RBF close actor registered for the
1801+ // given channel point from the actor system. This is idempotent and safe to
1802+ // call whether or not an actor was registered for the channel point.
1803+ func (p * Brontide ) unregisterRbfCloseActor (chanPoint wire.OutPoint ) {
1804+ if p .cfg .ActorSystem == nil {
1805+ return
1806+ }
1807+
1808+ actorKey := NewRbfCloserPeerServiceKey (chanPoint )
1809+ actorKey .UnregisterAll (p .cfg .ActorSystem )
1810+ }
1811+
1812+ // deleteActiveChanCloser removes the chan closer for the given channel ID and
1813+ // also unregisters any RBF close actor associated with the channel point from
1814+ // the actor system. Callers should prefer this over calling
1815+ // activeChanCloses.Delete directly so the actor registry stays in sync with
1816+ // the active closers map.
1817+ func (p * Brontide ) deleteActiveChanCloser (chanID lnwire.ChannelID ,
1818+ chanPoint wire.OutPoint ) {
1819+
1820+ p .activeChanCloses .Delete (chanID )
1821+ p .unregisterRbfCloseActor (chanPoint )
1822+ }
1823+
1824+ // unregisterRbfCloseActors removes any RBF close actors registered for this
1825+ // peer's active channels from the actor system. This should be called on
1826+ // disconnect so we don't leave stale RBF close actors for a peer that is no
1827+ // longer connected. This is idempotent and safe to call multiple times.
1828+ func (p * Brontide ) unregisterRbfCloseActors () {
1829+ if p .cfg .ActorSystem == nil {
1830+ return
1831+ }
1832+
1833+ p .activeChannels .Range (func (_ lnwire.ChannelID ,
1834+ channel * lnwallet.LightningChannel ) bool {
1835+
1836+ // Pending channels are tracked with a nil value in the map,
1837+ // so skip those as they have no channel point to look up.
1838+ if channel == nil {
1839+ return true
1840+ }
1841+
1842+ p .unregisterRbfCloseActor (channel .ChannelPoint ())
1843+
1844+ return true
1845+ })
1846+ }
1847+
17961848// readNextMessage reads, and returns the next message on the wire along with
17971849// any additional raw payload.
17981850func (p * Brontide ) readNextMessage () (lnwire.Message , error ) {
@@ -3679,7 +3731,7 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
36793731 shutdownMsg , err := chanCloser .ShutdownChan ()
36803732 if err != nil {
36813733 p .log .Errorf ("unable to create shutdown message: %v" , err )
3682- p .activeChanCloses . Delete (chanID )
3734+ p .deleteActiveChanCloser (chanID , c . FundingOutpoint )
36833735 return nil , err
36843736 }
36853737
@@ -3784,7 +3836,7 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose,
37843836 // back to its normal state.
37853837 defer channel .ResetState ()
37863838
3787- p .activeChanCloses . Delete (chanID )
3839+ p .deleteActiveChanCloser (chanID , channel . ChannelPoint () )
37883840
37893841 return fmt .Errorf ("unable to shutdown channel: %w" , err )
37903842 }
@@ -3955,7 +4007,9 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
39554007 chanID := lnwire .NewChanIDFromOutPoint (
39564008 * closeReq .ChanPoint ,
39574009 )
3958- p .activeChanCloses .Delete (chanID )
4010+ p .deleteActiveChanCloser (
4011+ chanID , * closeReq .ChanPoint ,
4012+ )
39594013
39604014 return
39614015 }
@@ -4029,7 +4083,9 @@ func (c *chanErrorReporter) ReportError(chanErr error) {
40294083 }
40304084
40314085 if _ , err := c .peer .initRbfChanCloser (lnChan ); err != nil {
4032- c .peer .activeChanCloses .Delete (c .chanID )
4086+ c .peer .deleteActiveChanCloser (
4087+ c .chanID , lnChan .ChannelPoint (),
4088+ )
40334089
40344090 c .peer .log .Errorf ("unable to init RBF chan closer after " +
40354091 "error case: %v" , err )
@@ -4233,8 +4289,30 @@ func (p *Brontide) initRbfChanCloser(
42334289 "close: %w" , err )
42344290 }
42354291
4292+ // We store the closer first so that any lookups that race with actor
4293+ // registration will find the chan closer already in place.
42364294 p .activeChanCloses .Store (chanID , makeRbfCloser (& chanCloser ))
42374295
4296+ // In addition to the message router, we'll register the state machine
4297+ // with the actor system.
4298+ if p .cfg .ActorSystem != nil {
4299+ p .log .Infof ("Registering RBF actor for channel %v" ,
4300+ channel .ChannelPoint ())
4301+
4302+ actorWrapper := newRbfCloseActor (
4303+ channel .ChannelPoint (), p , p .cfg .ActorSystem ,
4304+ )
4305+ if err := actorWrapper .registerActor (); err != nil {
4306+ chanCloser .Stop ()
4307+ p .deleteActiveChanCloser (
4308+ chanID , channel .ChannelPoint (),
4309+ )
4310+
4311+ return nil , fmt .Errorf ("unable to register RBF close " +
4312+ "actor: %w" , err )
4313+ }
4314+ }
4315+
42384316 // Now that we've created the rbf closer state machine, we'll launch a
42394317 // new goroutine to eventually send in the ChannelFlushed event once
42404318 // needed.
@@ -4666,9 +4744,12 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
46664744 chanPoint := chanCloser .Channel ().ChannelPoint ()
46674745 p .WipeChannel (& chanPoint )
46684746
4669- // Also clear the activeChanCloses map of this channel.
4747+ // Also clear the activeChanCloses map of this channel, and unregister
4748+ // any RBF close actor that was registered for this channel point.
4749+ //
4750+ // TODO(roasbeef): existing race.
46704751 cid := lnwire .NewChanIDFromOutPoint (chanPoint )
4671- p .activeChanCloses . Delete (cid ) // TODO(roasbeef): existing race
4752+ p .deleteActiveChanCloser (cid , chanPoint )
46724753
46734754 // Next, we'll launch a goroutine which will request to be notified by
46744755 // the ChainNotifier once the closure transaction obtains a single
@@ -5211,7 +5292,9 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
52115292 chanCloser .CloseRequest ().Err <- err
52125293 }
52135294
5214- p .activeChanCloses .Delete (msg .cid )
5295+ p .deleteActiveChanCloser (
5296+ msg .cid , chanCloser .Channel ().ChannelPoint (),
5297+ )
52155298
52165299 p .Disconnect (err )
52175300 }
@@ -5551,7 +5634,7 @@ func (p *Brontide) addActiveChannel(c *lnpeer.NewChannel) error {
55515634 // Creating this here ensures that any shutdown messages sent will be
55525635 // automatically routed by the msg router.
55535636 if _ , err := p .initRbfChanCloser (lnChan ); err != nil {
5554- p .activeChanCloses . Delete (chanID )
5637+ p .deleteActiveChanCloser (chanID , lnChan . ChannelPoint () )
55555638
55565639 return fmt .Errorf ("unable to init RBF chan closer for new " +
55575640 "chan: %w" , err )
@@ -5817,42 +5900,3 @@ func (p *Brontide) ChanHasRbfCoopCloser(chanPoint wire.OutPoint) bool {
58175900
58185901 return chanCloser .IsRight ()
58195902}
5820-
5821- // TriggerCoopCloseRbfBump given a chan ID, and the params needed to trigger a
5822- // new RBF co-op close update, a bump is attempted. A channel used for updates,
5823- // along with one used to o=communicate any errors is returned. If no chan
5824- // closer is found, then false is returned for the second argument.
5825- func (p * Brontide ) TriggerCoopCloseRbfBump (ctx context.Context ,
5826- chanPoint wire.OutPoint , feeRate chainfee.SatPerKWeight ,
5827- deliveryScript lnwire.DeliveryAddress ) (* CoopCloseUpdates , error ) {
5828-
5829- // If RBF coop close isn't permitted, then we'll an error.
5830- if ! p .rbfCoopCloseAllowed () {
5831- return nil , fmt .Errorf ("rbf coop close not enabled for " +
5832- "channel" )
5833- }
5834-
5835- closeUpdates := & CoopCloseUpdates {
5836- UpdateChan : make (chan interface {}, 1 ),
5837- ErrChan : make (chan error , 1 ),
5838- }
5839-
5840- // We'll re-use the existing switch struct here, even though we're
5841- // bypassing the switch entirely.
5842- closeReq := htlcswitch.ChanClose {
5843- CloseType : contractcourt .CloseRegular ,
5844- ChanPoint : & chanPoint ,
5845- TargetFeePerKw : feeRate ,
5846- DeliveryScript : deliveryScript ,
5847- Updates : closeUpdates .UpdateChan ,
5848- Err : closeUpdates .ErrChan ,
5849- Ctx : ctx ,
5850- }
5851-
5852- err := p .startRbfChanCloser (newRPCShutdownInit (& closeReq ), chanPoint )
5853- if err != nil {
5854- return nil , err
5855- }
5856-
5857- return closeUpdates , nil
5858- }
0 commit comments