Skip to content

Commit 70ab54c

Browse files
committed
Insert a start_batch message during splices
In lightning/bolts#1160, we introduce a message that lets our peer know how many `commit_sig` messages they are about to receive, so that they can treat those messages as a single batch. This replaces our previous mechanism, which achieved the same result by adding an experimental batch TLV to every `commit_sig` message we sent. For now we do both: we keep inserting the experimental batch TLV, and we additionally send a `start_batch` message (carrying the same information) before the batch. Since `start_batch` is an odd message (type 127), peers that don't understand it will safely ignore it.
1 parent 252099f commit 70ab54c

7 files changed

Lines changed: 233 additions & 13 deletions

File tree

eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,10 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
208208

209209
case Event(msg: LightningMessage, d: ConnectedData) if sender() != d.transport => // if the message doesn't originate from the transport, it is an outgoing message
210210
msg match {
211-
case batch: CommitSigBatch => batch.messages.foreach(msg => d.transport forward msg)
211+
case batch: CommitSigBatch =>
212+
// We insert a start_batch message to let our peer know how many commit_sig they will receive.
213+
d.transport forward StartBatch.commitSigBatch(batch.channelId, batch.batchSize)
214+
batch.messages.foreach(msg => d.transport forward msg)
212215
case msg => d.transport forward msg
213216
}
214217
msg match {
@@ -349,7 +352,50 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
349352
// We immediately forward messages to the peer, unless they are part of a batch, in which case we wait to
350353
// receive the whole batch before forwarding.
351354
msg match {
355+
case msg: StartBatch =>
356+
if (!msg.messageType_opt.contains(132)) {
357+
log.debug("ignoring start_batch: we only support batching commit_sig messages")
358+
d.transport ! Warning(msg.channelId, "invalid start_batch message: we only support batching commit_sig messages")
359+
stay()
360+
} else if (msg.batchSize > 20) {
361+
log.debug("ignoring start_batch with batch_size = {} > 20", msg.batchSize)
362+
d.transport ! Warning(msg.channelId, "invalid start_batch message: batch_size must not be greater than 20")
363+
stay()
364+
} else {
365+
log.debug("starting commit_sig batch of size {} for channel_id={}", msg.batchSize, msg.channelId)
366+
d.commitSigBatch_opt match {
367+
case Some(pending) if pending.received.nonEmpty =>
368+
log.warning("starting batch with incomplete previous batch ({}/{} received)", pending.received.size, pending.batchSize)
369+
// This is a spec violation from our peer: this will likely lead to a force-close.
370+
d.transport ! Warning(msg.channelId, "invalid start_batch message: the previous batch is not done yet")
371+
d.peer ! CommitSigBatch(pending.received)
372+
case _ => ()
373+
}
374+
stay() using d.copy(commitSigBatch_opt = Some(PendingCommitSigBatch(msg.channelId, msg.batchSize, Nil)))
375+
}
376+
case msg: HasChannelId if d.commitSigBatch_opt.nonEmpty =>
377+
// We only support batches of commit_sig messages: other messages will simply be relayed individually.
378+
val pending = d.commitSigBatch_opt.get
379+
msg match {
380+
case msg: CommitSig if msg.channelId == pending.channelId =>
381+
val received1 = pending.received :+ msg
382+
if (received1.size == pending.batchSize) {
383+
log.debug("received last commit_sig in batch for channel_id={}", msg.channelId)
384+
d.peer ! CommitSigBatch(received1)
385+
stay() using d.copy(commitSigBatch_opt = None)
386+
} else {
387+
log.debug("received commit_sig {}/{} in batch for channel_id={}", received1.size, pending.batchSize, msg.channelId)
388+
stay() using d.copy(commitSigBatch_opt = Some(pending.copy(received = received1)))
389+
}
390+
case _ =>
391+
log.warning("received {} as part of a batch: we don't support batching that kind of messages", msg.getClass.getSimpleName)
392+
if (pending.received.nonEmpty) d.peer ! CommitSigBatch(pending.received)
393+
d.peer ! msg
394+
stay() using d.copy(commitSigBatch_opt = None)
395+
}
352396
case msg: CommitSig =>
397+
// We keep supporting the experimental version of splicing that older Phoenix wallets use.
398+
// Once we're confident that enough Phoenix users have upgraded, we should remove this branch.
353399
msg.tlvStream.get[CommitSigTlv.ExperimentalBatchTlv].map(_.size) match {
354400
case Some(batchSize) if batchSize > 25 =>
355401
log.warning("received legacy batch of commit_sig exceeding our threshold ({} > 25), processing messages individually", batchSize)
@@ -614,6 +660,7 @@ object PeerConnection {
614660
gossipTimestampFilter: Option[GossipTimestampFilter] = None,
615661
behavior: Behavior = Behavior(),
616662
expectedPong_opt: Option[ExpectedPong] = None,
663+
commitSigBatch_opt: Option[PendingCommitSigBatch] = None,
617664
legacyCommitSigBatch_opt: Option[PendingCommitSigBatch] = None,
618665
isPersistent: Boolean) extends Data with HasTransport
619666

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,13 @@ object ClosingSigTlv {
403403
)
404404
}
405405

406+
sealed trait StartBatchTlv extends Tlv
407+
408+
object StartBatchTlv {
409+
/** Type of [[LightningMessage]] that is included in the batch, when batching a single message type. */
410+
case class MessageType(tag: Int) extends StartBatchTlv
411+
412+
val startBatchTlvCodec: Codec[TlvStream[StartBatchTlv]] = tlvStream(discriminated[StartBatchTlv].by(varint)
413+
.typecase(UInt64(1), tlvField(uint16.as[MessageType]))
414+
)
415+
}

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ object CommitSigTlv {
108108

109109
/**
110110
* The experimental version of splicing included the number of [[CommitSig]] messages in the batch.
111-
* This TLV can be removed once Phoenix users have upgraded to the official version of splicing.
111+
* This TLV can be removed once Phoenix users have upgraded to the official version of splicing and use the [[StartBatch]] message.
112112
*/
113113
case class ExperimentalBatchTlv(size: Int) extends CommitSigTlv
114114

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,11 @@ object LightningMessageCodecs {
244244
("lockTime" | uint32) ::
245245
("tlvStream" | ClosingSigTlv.closingSigTlvCodec)).as[ClosingSig]
246246

247+
val startBatchCodec: Codec[StartBatch] = (
248+
("channelId" | bytes32) ::
249+
("batchSize" | uint16) ::
250+
("tlvStream" | StartBatchTlv.startBatchTlvCodec)).as[StartBatch]
251+
247252
val updateAddHtlcCodec: Codec[UpdateAddHtlc] = (
248253
("channelId" | bytes32) ::
249254
("id" | uint64overflow) ::
@@ -525,6 +530,7 @@ object LightningMessageCodecs {
525530
.typecase(72, txInitRbfCodec)
526531
.typecase(73, txAckRbfCodec)
527532
.typecase(74, txAbortCodec)
533+
.typecase(127, startBatchCodec)
528534
.typecase(128, updateAddHtlcCodec)
529535
.typecase(130, updateFulfillHtlcCodec)
530536
.typecase(131, updateFailHtlcCodec)

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,15 @@ case class ClosingSig(channelId: ByteVector32, closerScriptPubKey: ByteVector, c
478478
val nextCloseeNonce_opt: Option[IndividualNonce] = tlvStream.get[ClosingSigTlv.NextCloseeNonce].map(_.nonce)
479479
}
480480

481+
/** This message is used to indicate that the next [[batchSize]] messages form a single logical message. */
482+
case class StartBatch(channelId: ByteVector32, batchSize: Int, tlvStream: TlvStream[StartBatchTlv] = TlvStream.empty) extends ChannelMessage with HasChannelId {
483+
val messageType_opt: Option[Long] = tlvStream.get[StartBatchTlv.MessageType].map(_.tag)
484+
}
485+
486+
object StartBatch {
487+
def commitSigBatch(channelId: ByteVector32, batchSize: Int): StartBatch = StartBatch(channelId, batchSize, TlvStream(StartBatchTlv.MessageType(132)))
488+
}
489+
481490
case class UpdateAddHtlc(channelId: ByteVector32,
482491
id: Long,
483492
amountMsat: MilliSatoshi,

eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala

Lines changed: 142 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
346346
CommitSig(channelId, IndividualSignature(randomBytes64()), Nil),
347347
)
348348
probe.send(peerConnection, CommitSigBatch(commitSigs))
349+
// We insert a start_batch message.
350+
transport.expectMsg(StartBatch(channelId, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132))))
349351
commitSigs.foreach(commitSig => transport.expectMsg(commitSig))
350352
transport.expectNoMessage(100 millis)
351353
}
@@ -357,8 +359,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
357359
// We receive a batch of commit_sig messages from a first channel.
358360
val channelId1 = randomBytes32()
359361
val commitSigs1 = Seq(
360-
CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
361-
CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
362+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
363+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
362364
)
363365
transport.send(peerConnection, commitSigs1.head)
364366
transport.expectMsg(TransportHandler.ReadAck(commitSigs1.head))
@@ -370,9 +372,9 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
370372
// We receive a batch of commit_sig messages from a second channel.
371373
val channelId2 = randomBytes32()
372374
val commitSigs2 = Seq(
373-
CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3),
374-
CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3),
375-
CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3),
375+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(3))),
376+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(3))),
377+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(3))),
376378
)
377379
commitSigs2.dropRight(1).foreach(commitSig => {
378380
transport.send(peerConnection, commitSig)
@@ -385,8 +387,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
385387

386388
// We receive another batch of commit_sig messages from the first channel, with unrelated messages in the batch.
387389
val commitSigs3 = Seq(
388-
CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
389-
CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
390+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
391+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
390392
)
391393
transport.send(peerConnection, commitSigs3.head)
392394
transport.expectMsg(TransportHandler.ReadAck(commitSigs3.head))
@@ -406,9 +408,9 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
406408
// We start receiving a batch of commit_sig messages from the first channel, interleaved with a batch from the second
407409
// channel, which is not supported.
408410
val commitSigs4 = Seq(
409-
CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
410-
CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
411-
CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2),
411+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
412+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
413+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))),
412414
)
413415
transport.send(peerConnection, commitSigs4.head)
414416
transport.expectMsg(TransportHandler.ReadAck(commitSigs4.head))
@@ -421,14 +423,143 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
421423
peer.expectMsg(CommitSigBatch(commitSigs4.tail))
422424

423425
// We receive a batch that exceeds our threshold: we process them individually.
424-
val invalidCommitSigs = (0 until 30).map(_ => CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 30))
426+
val invalidCommitSigs = (0 until 30).map(_ => CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(30))))
425427
invalidCommitSigs.foreach(commitSig => {
426428
transport.send(peerConnection, commitSig)
427429
transport.expectMsg(TransportHandler.ReadAck(commitSig))
428430
peer.expectMsg(commitSig)
429431
})
430432
}
431433

434+
test("receive batch of commit_sig messages") { f =>
435+
import f._
436+
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer)
437+
438+
// A first channel has a pending splice.
439+
val channelId1 = randomBytes32()
440+
val startBatch1 = StartBatch(channelId1, batchSize = 2, TlvStream(StartBatchTlv.MessageType(132)))
441+
val commitSigs1 = Seq(
442+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
443+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
444+
)
445+
transport.send(peerConnection, startBatch1)
446+
transport.expectMsg(TransportHandler.ReadAck(startBatch1))
447+
transport.send(peerConnection, commitSigs1(0))
448+
transport.expectMsg(TransportHandler.ReadAck(commitSigs1(0)))
449+
peer.expectNoMessage(100 millis)
450+
transport.send(peerConnection, commitSigs1(1))
451+
transport.expectMsg(TransportHandler.ReadAck(commitSigs1(1)))
452+
peer.expectMsg(CommitSigBatch(commitSigs1))
453+
454+
// Another channel has 2 pending splices.
455+
val channelId2 = randomBytes32()
456+
val startBatch2 = StartBatch(channelId2, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132)))
457+
val commitSigs2 = Seq(
458+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
459+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
460+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
461+
)
462+
transport.send(peerConnection, startBatch2)
463+
transport.expectMsg(TransportHandler.ReadAck(startBatch2))
464+
transport.send(peerConnection, commitSigs2(0))
465+
transport.expectMsg(TransportHandler.ReadAck(commitSigs2(0)))
466+
transport.send(peerConnection, commitSigs2(1))
467+
transport.expectMsg(TransportHandler.ReadAck(commitSigs2(1)))
468+
peer.expectNoMessage(100 millis)
469+
transport.send(peerConnection, commitSigs2(2))
470+
transport.expectMsg(TransportHandler.ReadAck(commitSigs2(2)))
471+
peer.expectMsg(CommitSigBatch(commitSigs2))
472+
473+
// We receive another batch of commit_sig messages for the first channel.
474+
val startBatch3 = StartBatch(channelId1, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132)))
475+
val commitSigs3 = Seq(
476+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
477+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
478+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
479+
)
480+
transport.send(peerConnection, startBatch3)
481+
transport.expectMsg(TransportHandler.ReadAck(startBatch3))
482+
transport.send(peerConnection, commitSigs3(0))
483+
transport.expectMsg(TransportHandler.ReadAck(commitSigs3(0)))
484+
transport.send(peerConnection, commitSigs3(1))
485+
transport.expectMsg(TransportHandler.ReadAck(commitSigs3(1)))
486+
peer.expectNoMessage(100 millis)
487+
transport.send(peerConnection, commitSigs3(2))
488+
transport.expectMsg(TransportHandler.ReadAck(commitSigs3(2)))
489+
peer.expectMsg(CommitSigBatch(commitSigs3))
490+
491+
// We reject batches that mix unrelated channels.
492+
val startBatch4 = StartBatch(channelId1, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132)))
493+
val commitSigs4 = Seq(
494+
CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
495+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
496+
CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))),
497+
)
498+
transport.send(peerConnection, startBatch4)
499+
transport.expectMsg(TransportHandler.ReadAck(startBatch4))
500+
transport.send(peerConnection, commitSigs4(0))
501+
transport.expectMsg(TransportHandler.ReadAck(commitSigs4(0)))
502+
peer.expectNoMessage(100 millis)
503+
transport.send(peerConnection, commitSigs4(1))
504+
transport.expectMsg(TransportHandler.ReadAck(commitSigs4(1)))
505+
peer.expectMsg(CommitSigBatch(commitSigs4.take(1)))
506+
peer.expectMsg(commitSigs4(1))
507+
peer.expectNoMessage(100 millis)
508+
transport.send(peerConnection, commitSigs4(2))
509+
transport.expectMsg(TransportHandler.ReadAck(commitSigs4(2)))
510+
peer.expectMsg(commitSigs4(2))
511+
peer.expectNoMessage(100 millis)
512+
}
513+
514+
test("receive unsupported batch of channel messages") { f =>
515+
import f._
516+
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer)
517+
518+
// We receive a batch of splice_locked messages: we forward them individually.
519+
val channelId = randomBytes32()
520+
val startBatch1 = StartBatch(channelId, batchSize = 2) // note that start_batch doesn't indicate the message type
521+
val spliceLocked1 = SpliceLocked(channelId, randomTxId())
522+
val spliceLocked2 = SpliceLocked(channelId, randomTxId())
523+
transport.send(peerConnection, startBatch1)
524+
transport.expectMsg(TransportHandler.ReadAck(startBatch1))
525+
transport.expectMsgType[Warning]
526+
transport.send(peerConnection, spliceLocked1)
527+
transport.expectMsg(TransportHandler.ReadAck(spliceLocked1))
528+
peer.expectMsg(spliceLocked1)
529+
transport.send(peerConnection, spliceLocked2)
530+
transport.expectMsg(TransportHandler.ReadAck(spliceLocked2))
531+
peer.expectMsg(spliceLocked2)
532+
533+
// We receive a batch containing commit_sig and an unrelated message.
534+
val startBatch2 = StartBatch(channelId, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132)))
535+
val commitSig1 = CommitSig(channelId, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId())))
536+
val commitSig2 = CommitSig(channelId, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId())))
537+
val spliceLocked3 = SpliceLocked(channelId, randomTxId())
538+
transport.send(peerConnection, startBatch2)
539+
transport.expectMsg(TransportHandler.ReadAck(startBatch2))
540+
transport.send(peerConnection, commitSig1)
541+
transport.expectMsg(TransportHandler.ReadAck(commitSig1))
542+
transport.send(peerConnection, commitSig2)
543+
transport.expectMsg(TransportHandler.ReadAck(commitSig2))
544+
peer.expectNoMessage(100 millis)
545+
transport.send(peerConnection, spliceLocked3)
546+
transport.expectMsg(TransportHandler.ReadAck(spliceLocked3))
547+
peer.expectMsg(CommitSigBatch(commitSig1 :: commitSig2 :: Nil))
548+
peer.expectMsg(spliceLocked3)
549+
peer.expectNoMessage(100 millis)
550+
551+
// We receive a batch exceeding 20 elements: we relay messages individually.
552+
val startBatch3 = StartBatch(channelId, batchSize = 21, TlvStream(StartBatchTlv.MessageType(132)))
553+
val commitSig3 = CommitSig(channelId, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId())))
554+
transport.send(peerConnection, startBatch3)
555+
transport.expectMsg(TransportHandler.ReadAck(startBatch3))
556+
transport.expectMsgType[Warning]
557+
transport.send(peerConnection, commitSig3)
558+
transport.expectMsg(TransportHandler.ReadAck(commitSig3))
559+
peer.expectMsg(commitSig3)
560+
peer.expectNoMessage(100 millis)
561+
}
562+
432563
test("react to peer's bad behavior") { f =>
433564
import f._
434565
val probe = TestProbe()

0 commit comments

Comments
 (0)