From 252099f3ac3adcb9fa7a2ed62cfe5485bf58ded1 Mon Sep 17 00:00:00 2001 From: t-bast Date: Mon, 2 Mar 2026 16:04:31 +0100 Subject: [PATCH 1/2] Add `funding_txid` to `commit_sig` In https://github.com/lightning/bolts/pull/1160 we add a TLV field to `commit_sig` messages to let the receiver know to which `funding_txid` this signature applies. This is more resilient than relying on the order of the `commit_sig` messages in the batch. This is an odd TLV, so we can start writing it right now without creating compatibility issues. We also slightly refactor existing code to make it easier to introduce a backwards-compat layer when migrating to the official splicing. We also increase the default number of RBF attempts allowed. --- eclair-core/src/main/resources/reference.conf | 2 +- .../fr/acinq/eclair/channel/Commitments.scala | 12 ++++--- .../channel/fund/InteractiveTxBuilder.scala | 2 +- .../fr/acinq/eclair/io/PeerConnection.scala | 2 +- .../eclair/wire/protocol/ChannelTlv.scala | 8 +++++ .../acinq/eclair/wire/protocol/HtlcTlv.scala | 24 ++++++++++---- .../wire/protocol/LightningMessageTypes.scala | 24 ++++++++------ .../channel/InteractiveTxBuilderSpec.scala | 2 +- .../states/e/NormalSplicesStateSpec.scala | 32 +++++++++++++------ .../acinq/eclair/io/PeerConnectionSpec.scala | 22 ++++++------- .../protocol/LightningMessageCodecsSpec.scala | 8 ++--- 11 files changed, 88 insertions(+), 50 deletions(-) diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index ba318dbedf..e452e9be2a 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -109,7 +109,7 @@ eclair { funding { // Each RBF attempt adds more data that we need to store and process, so we want to limit our peers to a reasonable use of RBF. remote-rbf-limits { - max-attempts = 5 // maximum number of RBF attempts our peer is allowed to make + max-attempts = 10 // maximum number of RBF attempts our peer is allowed to make attempt-delta-blocks = 6 // minimum number of blocks between RBF attempts } // Duration after which we abort a channel creation. If our peer seems unresponsive and doesn't complete the diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index 901f45ec85..dd18fca801 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -210,14 +210,14 @@ case class RemoteCommit(index: Long, spec: CommitmentSpec, txId: TxId, remotePer commitmentFormat match { case _: SegwitV0CommitmentFormat => val sig = remoteCommitTx.sign(fundingKey, remoteFundingPubKey) - Right(CommitSig(channelParams.channelId, sig, htlcSigs.toList, batchSize)) + Right(CommitSig(channelParams.channelId, commitInput.outPoint.txid, sig, htlcSigs.toList, batchSize)) case _: SimpleTaprootChannelCommitmentFormat => remoteNonce_opt match { case Some(remoteNonce) => val localNonce = NonceGenerator.signingNonce(fundingKey.publicKey, remoteFundingPubKey, commitInput.outPoint.txid) remoteCommitTx.partialSign(fundingKey, remoteFundingPubKey, localNonce, Seq(localNonce.publicNonce, remoteNonce)) match { case Left(_) => Left(InvalidCommitNonce(channelParams.channelId, commitInput.outPoint.txid, index)) - case Right(psig) => Right(CommitSig(channelParams.channelId, psig, htlcSigs.toList, batchSize)) + case Right(psig) => Right(CommitSig(channelParams.channelId, commitInput.outPoint.txid, psig, htlcSigs.toList, batchSize)) } case None => Left(MissingCommitNonce(channelParams.channelId, commitInput.outPoint.txid, index)) } @@ -650,7 +650,7 @@ case class Commitment(fundingTxIndex: Long, case None => return Left(MissingCommitNonce(params.channelId, fundingTxId, remoteCommit.index + 1)) } } - val commitSig = CommitSig(params.channelId, sig, htlcSigs.toList, batchSize) + val commitSig = CommitSig(params.channelId, fundingTxId, sig, htlcSigs.toList, batchSize) val nextRemoteCommit = RemoteCommit(remoteCommit.index + 1, spec, remoteCommitTx.tx.txid, remoteNextPerCommitmentPoint) Right((copy(nextRemoteCommit_opt = Some(nextRemoteCommit)), commitSig)) } @@ -1089,9 +1089,11 @@ case class Commitments(channelParams: ChannelParams, case _: CommitSig if active.size > 1 => return Left(CommitSigCountMismatch(channelId, active.size, 1)) case commitSig: CommitSig => Seq(commitSig) } - // Signatures are sent in order (most recent first), calling `zip` will drop trailing sigs that are for deactivated/pruned commitments. val commitKeys = LocalCommitmentKeys(channelParams, channelKeys, localCommitIndex + 1) - val active1 = active.zip(sigs).map { case (commitment, commit) => + val active1 = active.zipWithIndex.map { case (commitment, idx) => + // If the funding_txid isn't provided, we assume that signatures are sent in order (most recent first). + // This matches the behavior of peers who only support the experimental version of splicing. + val commit = sigs.find(_.fundingTxId_opt.contains(commitment.fundingTxId)).getOrElse(sigs(idx)) commitment.receiveCommit(channelParams, channelKeys, commitKeys, changes, commit) match { case Left(f) => return Left(f) case Right(commitment1) => commitment1 diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index bd48a66a6f..5edd4564d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -949,7 +949,7 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon case Right(localSigOfRemoteTx) => val htlcSignatures = sortedHtlcTxs.map(_.localSig(remoteCommitmentKeys)).toList log.info(s"built remote commit number=${purpose.remoteCommitIndex} toLocalMsat=${remoteSpec.toLocal.toLong} toRemoteMsat=${remoteSpec.toRemote.toLong} htlc_in={} htlc_out={} feeratePerKw=${remoteSpec.commitTxFeerate} txid=${remoteCommitTx.tx.txid} fundingTxId=${fundingTx.txid}", remoteSpec.htlcs.collect(DirectedHtlc.outgoing).map(_.id).mkString(","), remoteSpec.htlcs.collect(DirectedHtlc.incoming).map(_.id).mkString(",")) - val localCommitSig = CommitSig(fundingParams.channelId, localSigOfRemoteTx, htlcSignatures, batchSize = 1) + val localCommitSig = CommitSig(fundingParams.channelId, fundingTx.txid, localSigOfRemoteTx, htlcSignatures, batchSize = 1) val localCommit = UnsignedLocalCommit(purpose.localCommitIndex, localSpec, localCommitTx.tx.txid) val remoteCommit = RemoteCommit(purpose.remoteCommitIndex, remoteSpec, remoteCommitTx.tx.txid, purpose.remotePerCommitmentPoint) signFundingTx(completeTx, remoteFundingNonce_opt, remoteCommitNonces_opt.map(_.nextCommitNonce), localCommitSig, localCommit, remoteCommit) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index 2ee93d0f2f..55ae36b8b9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -350,7 +350,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A // receive the whole batch before forwarding. msg match { case msg: CommitSig => - msg.tlvStream.get[CommitSigTlv.BatchTlv].map(_.size) match { + msg.tlvStream.get[CommitSigTlv.ExperimentalBatchTlv].map(_.size) match { case Some(batchSize) if batchSize > 25 => log.warning("received legacy batch of commit_sig exceeding our threshold ({} > 25), processing messages individually", batchSize) // We don't want peers to be able to exhaust our memory by sending batches of dummy messages that we keep in RAM. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala index ac0f921e50..521ddf8833 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala @@ -255,8 +255,16 @@ sealed trait ChannelReestablishTlv extends Tlv object ChannelReestablishTlv { + /** + * When disconnected in the middle of an interactive-tx session, this field is used to request a retransmission of + * [[TxSignatures]] for the given [[txId]]. + */ case class NextFundingTlv(txId: TxId) extends ChannelReestablishTlv + + /** The txid of the last [[ChannelReady]] or [[SpliceLocked]] message received before disconnecting, if any. */ case class YourLastFundingLockedTlv(txId: TxId) extends ChannelReestablishTlv + + /** The txid of our latest outgoing [[ChannelReady]] or [[SpliceLocked]] for this channel. */ case class MyCurrentFundingLockedTlv(txId: TxId) extends ChannelReestablishTlv /** diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala index e42c216713..c63271138b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala @@ -96,12 +96,23 @@ sealed trait CommitSigTlv extends Tlv object CommitSigTlv { - /** @param size the number of [[CommitSig]] messages in the batch */ - case class BatchTlv(size: Int) extends CommitSigTlv + /** + * While a splice is ongoing and not locked, we have multiple valid commitments. + * We send one [[CommitSig]] message for each valid commitment: this field maps it to the corresponding funding transaction. + * + * @param txId the funding transaction spent by this commitment. + */ + case class FundingTx(txId: TxId) extends CommitSigTlv - object BatchTlv { - val codec: Codec[BatchTlv] = tlvField(tu16) - } + private val fundingTxTlv: Codec[FundingTx] = tlvField(txIdAsHash) + + /** + * The experimental version of splicing included the number of [[CommitSig]] messages in the batch. + * This TLV can be removed once Phoenix users have upgraded to the official version of splicing. + */ + case class ExperimentalBatchTlv(size: Int) extends CommitSigTlv + + private val experimentalBatchTlv: Codec[ExperimentalBatchTlv] = tlvField(tu16) /** Partial signature signature for the current commitment transaction, along with the signing nonce used (when using taproot channels). */ case class PartialSignatureWithNonceTlv(partialSigWithNonce: PartialSignatureWithNonce) extends CommitSigTlv @@ -111,8 +122,9 @@ object CommitSigTlv { } val commitSigTlvCodec: Codec[TlvStream[CommitSigTlv]] = tlvStream(discriminated[CommitSigTlv].by(varint) + .typecase(UInt64(1), fundingTxTlv) .typecase(UInt64(2), PartialSignatureWithNonceTlv.codec) - .typecase(UInt64(0x47010005), BatchTlv.codec) + .typecase(UInt64(0x47010005), experimentalBatchTlv) ) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala index f4b60ccc30..bce1f33ce2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala @@ -102,7 +102,10 @@ case class TxAddInput(channelId: ByteVector32, object TxAddInput { def apply(channelId: ByteVector32, serialId: UInt64, sharedInput: OutPoint, sequence: Long): TxAddInput = { - TxAddInput(channelId, serialId, None, sharedInput.index, sequence, TlvStream(TxAddInputTlv.SharedInputTxId(sharedInput.txid))) + val tlvs = Set[TxAddInputTlv]( + TxAddInputTlv.SharedInputTxId(sharedInput.txid), + ) + TxAddInput(channelId, serialId, None, sharedInput.index, sequence, TlvStream(tlvs)) } } @@ -146,12 +149,11 @@ case class TxSignatures(channelId: ByteVector32, object TxSignatures { def apply(channelId: ByteVector32, tx: Transaction, witnesses: Seq[ScriptWitness], previousFundingSig_opt: Option[ChannelSpendSignature]): TxSignatures = { - val tlvs: Set[TxSignaturesTlv] = Set( - previousFundingSig_opt.map { - case IndividualSignature(sig) => TxSignaturesTlv.PreviousFundingTxSig(sig) - case partialSig: PartialSignatureWithNonce => TxSignaturesTlv.PreviousFundingTxPartialSig(partialSig) - } - ).flatten + val tlvs: Set[TxSignaturesTlv] = previousFundingSig_opt match { + case Some(IndividualSignature(sig)) => Set(TxSignaturesTlv.PreviousFundingTxSig(sig)) + case Some(partialSig: PartialSignatureWithNonce) => Set(TxSignaturesTlv.PreviousFundingTxPartialSig(partialSig)) + case None => Set.empty + } TxSignatures(channelId, tx.txid, witnesses, TlvStream(tlvs)) } } @@ -545,19 +547,21 @@ case class CommitSig(channelId: ByteVector32, signature: IndividualSignature, htlcSignatures: List[ByteVector64], tlvStream: TlvStream[CommitSigTlv] = TlvStream.empty) extends CommitSigs { + val fundingTxId_opt: Option[TxId] = tlvStream.get[CommitSigTlv.FundingTx].map(_.txId) val partialSignature_opt: Option[PartialSignatureWithNonce] = tlvStream.get[CommitSigTlv.PartialSignatureWithNonceTlv].map(_.partialSigWithNonce) val sigOrPartialSig: ChannelSpendSignature = partialSignature_opt.getOrElse(signature) } object CommitSig { - def apply(channelId: ByteVector32, signature: ChannelSpendSignature, htlcSignatures: List[ByteVector64], batchSize: Int): CommitSig = { + def apply(channelId: ByteVector32, fundingTxId: TxId, signature: ChannelSpendSignature, htlcSignatures: List[ByteVector64], batchSize: Int): CommitSig = { val (individualSig, partialSig_opt) = signature match { case sig: IndividualSignature => (sig, None) case psig: PartialSignatureWithNonce => (IndividualSignature(ByteVector64.Zeroes), Some(psig)) } val tlvs = Set( - if (batchSize > 1) Some(CommitSigTlv.BatchTlv(batchSize)) else None, - partialSig_opt.map(CommitSigTlv.PartialSignatureWithNonceTlv(_)) + Some(CommitSigTlv.FundingTx(fundingTxId)), + partialSig_opt.map(CommitSigTlv.PartialSignatureWithNonceTlv(_)), + if (batchSize > 1) Some(CommitSigTlv.ExperimentalBatchTlv(batchSize)) else None, ).flatten[CommitSigTlv] CommitSig(channelId, individualSig, htlcSignatures, TlvStream(tlvs)) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala index b8d7df7b83..def27c0db4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/InteractiveTxBuilderSpec.scala @@ -3030,7 +3030,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit bob ! ReceiveMessage(alice2bob.expectMsgType[SendMessage].msg.asInstanceOf[TxComplete]) // Alice <-- commit_sig --- Bob val successA1 = alice2bob.expectMsgType[Succeeded] - val invalidCommitSig = CommitSig(params.channelId, PartialSignatureWithNonce(randomBytes32(), txCompleteBob.commitNonces_opt.get.commitNonce), Nil, batchSize = 1) + val invalidCommitSig = CommitSig(params.channelId, successA1.signingSession.fundingTxId, PartialSignatureWithNonce(randomBytes32(), txCompleteBob.commitNonces_opt.get.commitNonce), Nil, batchSize = 1) val Left(error) = successA1.signingSession.receiveCommitSig(params.channelParamsA, params.channelKeysA, invalidCommitSig, params.nodeParamsA.currentBlockHeight)(akka.event.NoLogging) assert(error.isInstanceOf[InvalidCommitmentSignature]) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index 17e2aebdcb..dfab1676eb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -1650,16 +1650,18 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice ! CMD_SIGN() val sigsA = alice2bob.expectMsgType[CommitSigBatch] assert(sigsA.batchSize == 2) + assert(sigsA.messages.flatMap(_.fundingTxId_opt).toSet == alice.commitments.active.map(_.fundingTxId).toSet) alice2bob.forward(bob, sigsA) bob2alice.expectMsgType[RevokeAndAck] bob2alice.forward(alice) val sigsB = bob2alice.expectMsgType[CommitSigBatch] assert(sigsB.batchSize == 2) + assert(sigsB.messages.flatMap(_.fundingTxId_opt).toSet == alice.commitments.active.map(_.fundingTxId).toSet) bob2alice.forward(alice, sigsB) alice2bob.expectMsgType[RevokeAndAck] alice2bob.forward(bob) - awaitCond(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.active.forall(_.localCommit.spec.htlcs.size == 1)) - awaitCond(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.active.forall(_.localCommit.spec.htlcs.size == 1)) + awaitCond(alice.commitments.active.forall(_.localCommit.spec.htlcs.size == 1)) + awaitCond(bob.commitments.active.forall(_.localCommit.spec.htlcs.size == 1)) } test("recv CMD_ADD_HTLC with multiple commitments (missing nonces)", Tag(ChannelStateTestsTags.OptionSimpleTaproot)) { f => @@ -1707,11 +1709,13 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice2bob.forward(bob) val sigsA = alice2bob.expectMsgType[CommitSigBatch] assert(sigsA.batchSize == 2) + assert(sigsA.messages.flatMap(_.fundingTxId_opt).toSet == alice.commitments.active.map(_.fundingTxId).toSet) alice2bob.forward(bob, sigsA) assert(bob2alice.expectMsgType[RevokeAndAck].nextCommitNonces.size == 2) bob2alice.forward(alice) val sigsB = bob2alice.expectMsgType[CommitSigBatch] assert(sigsB.batchSize == 2) + assert(sigsB.messages.flatMap(_.fundingTxId_opt).toSet == alice.commitments.active.map(_.fundingTxId).toSet) bob2alice.forward(alice, sigsB) assert(alice2bob.expectMsgType[RevokeAndAck].nextCommitNonces.size == 2) alice2bob.forward(bob) @@ -1840,7 +1844,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice2bob.forward(bob, commitSigsAlice) bob2alice.expectMsgType[RevokeAndAck] bob2alice.forward(alice) - bob2alice.expectMsgType[CommitSig] + assert(bob2alice.expectMsgType[CommitSig].fundingTxId_opt.contains(spliceTx2.txid)) bob2alice.forward(alice) alice2bob.expectMsgType[RevokeAndAck] alice2bob.forward(bob) @@ -1944,10 +1948,12 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val bobCommitIndex = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommitIndex val sender = initiateSpliceWithoutSigs(f, spliceIn_opt = Some(SpliceIn(500_000 sat)), spliceOut_opt = Some(SpliceOut(100_000 sat, defaultSpliceOutScriptPubKey))) - alice2bob.expectMsgType[CommitSig] // Bob doesn't receive Alice's commit_sig - bob2alice.expectMsgType[CommitSig] // Alice doesn't receive Bob's commit_sig + val commitSigA = alice2bob.expectMsgType[CommitSig] // Bob doesn't receive Alice's commit_sig + val commitSigB = bob2alice.expectMsgType[CommitSig] // Alice doesn't receive Bob's commit_sig awaitCond(alice.stateData.asInstanceOf[DATA_NORMAL].spliceStatus.isInstanceOf[SpliceStatus.SpliceWaitingForSigs]) val spliceStatus = alice.stateData.asInstanceOf[DATA_NORMAL].spliceStatus.asInstanceOf[SpliceStatus.SpliceWaitingForSigs] + assert(commitSigA.fundingTxId_opt.contains(spliceStatus.signingSession.fundingTx.txId)) + assert(commitSigB.fundingTxId_opt.contains(spliceStatus.signingSession.fundingTx.txId)) disconnect(f) @@ -1960,7 +1966,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice2bob.forward(bob, channelReestablishAlice1) bob2alice.forward(alice, channelReestablishBob1.copy(nextLocalCommitmentNumber = bobCommitIndex + 1)) // In that case Alice won't retransmit commit_sig and the splice won't complete since they haven't exchanged tx_signatures. - bob2alice.expectMsgType[CommitSig] + assert(bob2alice.expectMsgType[CommitSig].fundingTxId_opt.contains(spliceStatus.signingSession.fundingTx.txId)) bob2alice.forward(alice) alice2bob.expectNoMessage(100 millis) assert(alice.stateData.asInstanceOf[DATA_NORMAL].spliceStatus.isInstanceOf[SpliceStatus.SpliceWaitingForSigs]) @@ -1981,7 +1987,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(channelReestablishBob2.nextLocalCommitmentNumber == bobCommitIndex) // Alice retransmits commit_sig and both retransmit tx_signatures. - alice2bob.expectMsgType[CommitSig] + assert(alice2bob.expectMsgType[CommitSig].fundingTxId_opt.contains(spliceStatus.signingSession.fundingTx.txId)) alice2bob.forward(bob) bob2alice.expectMsgType[TxSignatures] bob2alice.forward(alice) @@ -1990,6 +1996,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik sender.expectMsgType[RES_SPLICE] val spliceTx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.signedTx_opt.get + assert(spliceTx.txid == spliceStatus.signingSession.fundingTx.txId) alice2blockchain.expectWatchFundingConfirmed(spliceTx.txid) bob2blockchain.expectWatchFundingConfirmed(spliceTx.txid) alice ! WatchFundingConfirmedTriggered(BlockHeight(42), 0, spliceTx) @@ -2854,11 +2861,14 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice2bob.expectMsgType[TxSignatures] // Bob doesn't receive Alice's tx_signatures awaitCond(alice.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NoSplice) + val fundingTxIds = alice.commitments.active.map(_.fundingTxId).toSet + assert(fundingTxIds.size == 2) val (_, cmd) = makeCmdAdd(25_000_000 msat, bob.nodeParams.nodeId, bob.nodeParams.currentBlockHeight) alice ! cmd.copy(commit = true) alice2bob.expectMsgType[UpdateAddHtlc] // Bob doesn't receive Alice's update_add_htlc inside(alice2bob.expectMsgType[CommitSigBatch]) { batch => // Bob doesn't receive Alice's commit_sigs assert(batch.batchSize == 2) + assert(batch.messages.flatMap(_.fundingTxId_opt).toSet == fundingTxIds) } alice2bob.expectNoMessage(100 millis) bob2alice.expectNoMessage(100 millis) @@ -2874,12 +2884,14 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice2bob.forward(bob) inside(alice2bob.expectMsgType[CommitSigBatch]) { batch => assert(batch.batchSize == 2) + assert(batch.messages.flatMap(_.fundingTxId_opt).toSet == fundingTxIds) alice2bob.forward(bob) } bob2alice.expectMsgType[RevokeAndAck] bob2alice.forward(alice) inside(bob2alice.expectMsgType[CommitSigBatch]) { batch => assert(batch.batchSize == 2) + assert(batch.messages.flatMap(_.fundingTxId_opt).toSet == fundingTxIds) bob2alice.forward(alice) } alice2bob.expectMsgType[RevokeAndAck] @@ -2932,12 +2944,12 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val sender = TestProbe() alice ! CMD_SIGN(Some(sender.ref)) sender.expectMsgType[RES_SUCCESS[CMD_SIGN]] - alice2bob.expectMsgType[CommitSig] + assert(alice2bob.expectMsgType[CommitSig].fundingTxId_opt.contains(fundingTx.txid)) alice2bob.forward(bob) bob2alice.forward(alice, bobSpliceLocked) bob2alice.expectMsgType[RevokeAndAck] bob2alice.forward(alice) - bob2alice.expectMsgType[CommitSig] + assert(bob2alice.expectMsgType[CommitSig].fundingTxId_opt.contains(fundingTx.txid)) bob2alice.forward(alice) alice2bob.expectMsgType[RevokeAndAck] alice2bob.forward(bob) @@ -2992,7 +3004,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(bob.commitments.active.size == 1) alice2bob.expectMsgType[UpdateAddHtlc] alice2bob.forward(bob) - assert(alice2bob.expectMsgType[CommitSig].tlvStream.get[CommitSigTlv.BatchTlv].isEmpty) + assert(alice2bob.expectMsgType[CommitSig].tlvStream.get[CommitSigTlv.ExperimentalBatchTlv].isEmpty) alice2bob.forward(bob) bob2alice.expectMsgType[RevokeAndAck] bob2alice.forward(alice) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala index 26aaccb9e0..0c0c8de527 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala @@ -357,8 +357,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We receive a batch of commit_sig messages from a first channel. val channelId1 = randomBytes32() val commitSigs1 = Seq( - CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), ) transport.send(peerConnection, commitSigs1.head) transport.expectMsg(TransportHandler.ReadAck(commitSigs1.head)) @@ -370,9 +370,9 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We receive a batch of commit_sig messages from a second channel. val channelId2 = randomBytes32() val commitSigs2 = Seq( - CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, batchSize = 3), - CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, batchSize = 3), - CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3), ) commitSigs2.dropRight(1).foreach(commitSig => { transport.send(peerConnection, commitSig) @@ -385,8 +385,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We receive another batch of commit_sig messages from the first channel, with unrelated messages in the batch. val commitSigs3 = Seq( - CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), ) transport.send(peerConnection, commitSigs3.head) transport.expectMsg(TransportHandler.ReadAck(commitSigs3.head)) @@ -406,9 +406,9 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We start receiving a batch of commit_sig messages from the first channel, interleaved with a batch from the second // channel, which is not supported. val commitSigs4 = Seq( - CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), ) transport.send(peerConnection, commitSigs4.head) transport.expectMsg(TransportHandler.ReadAck(commitSigs4.head)) @@ -421,7 +421,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi peer.expectMsg(CommitSigBatch(commitSigs4.tail)) // We receive a batch that exceeds our threshold: we process them individually. - val invalidCommitSigs = (0 until 30).map(_ => CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, batchSize = 30)) + val invalidCommitSigs = (0 until 30).map(_ => CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 30)) invalidCommitSigs.foreach(commitSig => { transport.send(peerConnection, commitSig) transport.expectMsg(TransportHandler.ReadAck(commitSig)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala index baaaf8c34b..366ede4d54 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala @@ -168,7 +168,7 @@ class LightningMessageCodecsSpec extends AnyFunSuite { hex"0088" ++ channelId ++ hex"0001020304050607 0809aabbccddeeff" ++ key.value ++ point.value ++ hex"fe47010000 07 bbbbbbbbbbbbbb" -> ChannelReestablish(channelId, 0x01020304050607L, 0x0809aabbccddeeffL, key, point, TlvStream[ChannelReestablishTlv](Set.empty[ChannelReestablishTlv], Set(GenericTlv(tlvTag, hex"bbbbbbbbbbbbbb")))), hex"0084" ++ channelId ++ signature ++ hex"0000" -> CommitSig(channelId, IndividualSignature(signature), Nil), - hex"0084" ++ channelId ++ ByteVector64.Zeroes ++ hex"0000" ++ hex"02 62" ++ partialSig ++ nonce.data -> CommitSig(channelId, PartialSignatureWithNonce(partialSig, nonce), Nil, batchSize = 1), + hex"0084" ++ channelId ++ ByteVector64.Zeroes ++ hex"0000" ++ hex"01 20" ++ txId.value.reverse ++ hex"02 62" ++ partialSig ++ nonce.data -> CommitSig(channelId, txId, PartialSignatureWithNonce(partialSig, nonce), Nil, batchSize = 1), hex"0084" ++ channelId ++ signature ++ hex"0000 fe47010000 00" -> CommitSig(channelId, IndividualSignature(signature), Nil, TlvStream[CommitSigTlv](Set.empty[CommitSigTlv], Set(GenericTlv(tlvTag, ByteVector.empty)))), hex"0084" ++ channelId ++ signature ++ hex"0000 fe47010000 07 cccccccccccccc" -> CommitSig(channelId, IndividualSignature(signature), Nil, TlvStream[CommitSigTlv](Set.empty[CommitSigTlv], Set(GenericTlv(tlvTag, hex"cccccccccccccc")))), @@ -696,9 +696,9 @@ class LightningMessageCodecsSpec extends AnyFunSuite { test("encode/decode commit_sig batch") { val channelId = randomBytes32() val batch = CommitSigBatch(Seq( - CommitSig(channelId, ChannelSpendSignature.IndividualSignature(randomBytes64()), Nil, batchSize = 3), - CommitSig(channelId, ChannelSpendSignature.IndividualSignature(randomBytes64()), Nil, batchSize = 3), - CommitSig(channelId, ChannelSpendSignature.IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId, randomTxId(), ChannelSpendSignature.IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId, randomTxId(), ChannelSpendSignature.IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId, randomTxId(), ChannelSpendSignature.IndividualSignature(randomBytes64()), Nil, batchSize = 3), )) val encoded = lightningMessageCodec.encode(batch).require val decoded = lightningMessageCodec.decode(encoded).require.value From 70ab54c80d7b27f9d8485d98986e797231c79cbc Mon Sep 17 00:00:00 2001 From: t-bast Date: Mon, 2 Mar 2026 16:29:51 +0100 Subject: [PATCH 2/2] Insert a `start_batch` message during splices In https://github.com/lightning/bolts/pull/1160, we introduce a message to let our peer know how many `commit_sig` messages they will receive and treat them as a batch. This replaces our previous version that did something similar, but by adding a batch TLV in every `commit_sig` message we send. We currently do both: we keep inserting the experimental batch TLV, and we start by sending a `start_batch` message (with the same information). Since it is an odd message (127), it should be safely ignored if our peer doesn't understand it. --- .../fr/acinq/eclair/io/PeerConnection.scala | 49 +++++- .../eclair/wire/protocol/ChannelTlv.scala | 10 ++ .../acinq/eclair/wire/protocol/HtlcTlv.scala | 2 +- .../protocol/LightningMessageCodecs.scala | 6 + .../wire/protocol/LightningMessageTypes.scala | 9 ++ .../acinq/eclair/io/PeerConnectionSpec.scala | 153 ++++++++++++++++-- .../protocol/LightningMessageCodecsSpec.scala | 17 ++ 7 files changed, 233 insertions(+), 13 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index 55ae36b8b9..78cbab37bc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -208,7 +208,10 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A case Event(msg: LightningMessage, d: ConnectedData) if sender() != d.transport => // if the message doesn't originate from the transport, it is an outgoing message msg match { - case batch: CommitSigBatch => batch.messages.foreach(msg => d.transport forward msg) + case batch: CommitSigBatch => + // We insert a start_batch message to let our peer know how many commit_sig they will receive. + d.transport forward StartBatch.commitSigBatch(batch.channelId, batch.batchSize) + batch.messages.foreach(msg => d.transport forward msg) case msg => d.transport forward msg } msg match { @@ -349,7 +352,50 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A // We immediately forward messages to the peer, unless they are part of a batch, in which case we wait to // receive the whole batch before forwarding. msg match { + case msg: StartBatch => + if (!msg.messageType_opt.contains(132)) { + log.debug("ignoring start_batch: we only support batching commit_sig messages") + d.transport ! Warning(msg.channelId, "invalid start_batch message: we only support batching commit_sig messages") + stay() + } else if (msg.batchSize > 20) { + log.debug("ignoring start_batch with batch_size = {} > 20", msg.batchSize) + d.transport ! Warning(msg.channelId, "invalid start_batch message: batch_size must not be greater than 20") + stay() + } else { + log.debug("starting commit_sig batch of size {} for channel_id={}", msg.batchSize, msg.channelId) + d.commitSigBatch_opt match { + case Some(pending) if pending.received.nonEmpty => + log.warning("starting batch with incomplete previous batch ({}/{} received)", pending.received.size, pending.batchSize) + // This is a spec violation from our peer: this will likely lead to a force-close. + d.transport ! Warning(msg.channelId, "invalid start_batch message: the previous batch is not done yet") + d.peer ! CommitSigBatch(pending.received) + case _ => () + } + stay() using d.copy(commitSigBatch_opt = Some(PendingCommitSigBatch(msg.channelId, msg.batchSize, Nil))) + } + case msg: HasChannelId if d.commitSigBatch_opt.nonEmpty => + // We only support batches of commit_sig messages: other messages will simply be relayed individually. + val pending = d.commitSigBatch_opt.get + msg match { + case msg: CommitSig if msg.channelId == pending.channelId => + val received1 = pending.received :+ msg + if (received1.size == pending.batchSize) { + log.debug("received last commit_sig in batch for channel_id={}", msg.channelId) + d.peer ! CommitSigBatch(received1) + stay() using d.copy(commitSigBatch_opt = None) + } else { + log.debug("received commit_sig {}/{} in batch for channel_id={}", received1.size, pending.batchSize, msg.channelId) + stay() using d.copy(commitSigBatch_opt = Some(pending.copy(received = received1))) + } + case _ => + log.warning("received {} as part of a batch: we don't support batching that kind of messages", msg.getClass.getSimpleName) + if (pending.received.nonEmpty) d.peer ! CommitSigBatch(pending.received) + d.peer ! msg + stay() using d.copy(commitSigBatch_opt = None) + } case msg: CommitSig => + // We keep supporting the experimental version of splicing that older Phoenix wallets use. + // Once we're confident that enough Phoenix users have upgraded, we should remove this branch. msg.tlvStream.get[CommitSigTlv.ExperimentalBatchTlv].map(_.size) match { case Some(batchSize) if batchSize > 25 => log.warning("received legacy batch of commit_sig exceeding our threshold ({} > 25), processing messages individually", batchSize) @@ -614,6 +660,7 @@ object PeerConnection { gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None, + commitSigBatch_opt: Option[PendingCommitSigBatch] = None, legacyCommitSigBatch_opt: Option[PendingCommitSigBatch] = None, isPersistent: Boolean) extends Data with HasTransport diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala index 521ddf8833..9aad296841 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/ChannelTlv.scala @@ -403,3 +403,13 @@ object ClosingSigTlv { ) } +sealed trait StartBatchTlv extends Tlv + +object StartBatchTlv { + /** Type of [[LightningMessage]] that is included in the batch, when batching a single message type. */ + case class MessageType(tag: Int) extends StartBatchTlv + + val startBatchTlvCodec: Codec[TlvStream[StartBatchTlv]] = tlvStream(discriminated[StartBatchTlv].by(varint) + .typecase(UInt64(1), tlvField(uint16.as[MessageType])) + ) +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala index c63271138b..42a733354e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/HtlcTlv.scala @@ -108,7 +108,7 @@ object CommitSigTlv { /** * The experimental version of splicing included the number of [[CommitSig]] messages in the batch. - * This TLV can be removed once Phoenix users have upgraded to the official version of splicing. + * This TLV can be removed once Phoenix users have upgraded to the official version of splicing and use the [[StartBatch]] message. */ case class ExperimentalBatchTlv(size: Int) extends CommitSigTlv diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala index 181268e474..f36c6d7b1f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala @@ -244,6 +244,11 @@ object LightningMessageCodecs { ("lockTime" | uint32) :: ("tlvStream" | ClosingSigTlv.closingSigTlvCodec)).as[ClosingSig] + val startBatchCodec: Codec[StartBatch] = ( + ("channelId" | bytes32) :: + ("batchSize" | uint16) :: + ("tlvStream" | StartBatchTlv.startBatchTlvCodec)).as[StartBatch] + val updateAddHtlcCodec: Codec[UpdateAddHtlc] = ( ("channelId" | bytes32) :: ("id" | uint64overflow) :: @@ -525,6 +530,7 @@ object LightningMessageCodecs { .typecase(72, txInitRbfCodec) .typecase(73, txAckRbfCodec) .typecase(74, txAbortCodec) + .typecase(127, startBatchCodec) .typecase(128, updateAddHtlcCodec) .typecase(130, updateFulfillHtlcCodec) .typecase(131, updateFailHtlcCodec) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala index bce1f33ce2..0ceb57534e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala @@ -478,6 +478,15 @@ case class ClosingSig(channelId: ByteVector32, closerScriptPubKey: ByteVector, c val nextCloseeNonce_opt: Option[IndividualNonce] = tlvStream.get[ClosingSigTlv.NextCloseeNonce].map(_.nonce) } +/** This message is used to indicate that the next [[batchSize]] messages form a single logical message. */ +case class StartBatch(channelId: ByteVector32, batchSize: Int, tlvStream: TlvStream[StartBatchTlv] = TlvStream.empty) extends ChannelMessage with HasChannelId { + val messageType_opt: Option[Long] = tlvStream.get[StartBatchTlv.MessageType].map(_.tag) +} + +object StartBatch { + def commitSigBatch(channelId: ByteVector32, batchSize: Int): StartBatch = StartBatch(channelId, batchSize, TlvStream(StartBatchTlv.MessageType(132))) +} + case class UpdateAddHtlc(channelId: ByteVector32, id: Long, amountMsat: MilliSatoshi, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala index 0c0c8de527..1250462270 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala @@ -346,6 +346,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi CommitSig(channelId, IndividualSignature(randomBytes64()), Nil), ) probe.send(peerConnection, CommitSigBatch(commitSigs)) + // We insert a start_batch message. + transport.expectMsg(StartBatch(channelId, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132)))) commitSigs.foreach(commitSig => transport.expectMsg(commitSig)) transport.expectNoMessage(100 millis) } @@ -357,8 +359,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We receive a batch of commit_sig messages from a first channel. val channelId1 = randomBytes32() val commitSigs1 = Seq( - CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), ) transport.send(peerConnection, commitSigs1.head) transport.expectMsg(TransportHandler.ReadAck(commitSigs1.head)) @@ -370,9 +372,9 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We receive a batch of commit_sig messages from a second channel. val channelId2 = randomBytes32() val commitSigs2 = Seq( - CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3), - CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3), - CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 3), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(3))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(3))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(3))), ) commitSigs2.dropRight(1).foreach(commitSig => { transport.send(peerConnection, commitSig) @@ -385,8 +387,8 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We receive another batch of commit_sig messages from the first channel, with unrelated messages in the batch. val commitSigs3 = Seq( - CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), ) transport.send(peerConnection, commitSigs3.head) transport.expectMsg(TransportHandler.ReadAck(commitSigs3.head)) @@ -406,9 +408,9 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi // We start receiving a batch of commit_sig messages from the first channel, interleaved with a batch from the second // channel, which is not supported. val commitSigs4 = Seq( - CommitSig(channelId1, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), - CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 2), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(2))), ) transport.send(peerConnection, commitSigs4.head) transport.expectMsg(TransportHandler.ReadAck(commitSigs4.head)) @@ -421,7 +423,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi peer.expectMsg(CommitSigBatch(commitSigs4.tail)) // We receive a batch that exceeds our threshold: we process them individually. - val invalidCommitSigs = (0 until 30).map(_ => CommitSig(channelId2, randomTxId(), IndividualSignature(randomBytes64()), Nil, batchSize = 30)) + val invalidCommitSigs = (0 until 30).map(_ => CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.ExperimentalBatchTlv(30)))) invalidCommitSigs.foreach(commitSig => { transport.send(peerConnection, commitSig) transport.expectMsg(TransportHandler.ReadAck(commitSig)) @@ -429,6 +431,135 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi }) } + test("receive batch of commit_sig messages") { f => + import f._ + connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer) + + // A first channel has a pending splice. + val channelId1 = randomBytes32() + val startBatch1 = StartBatch(channelId1, batchSize = 2, TlvStream(StartBatchTlv.MessageType(132))) + val commitSigs1 = Seq( + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + ) + transport.send(peerConnection, startBatch1) + transport.expectMsg(TransportHandler.ReadAck(startBatch1)) + transport.send(peerConnection, commitSigs1(0)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs1(0))) + peer.expectNoMessage(100 millis) + transport.send(peerConnection, commitSigs1(1)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs1(1))) + peer.expectMsg(CommitSigBatch(commitSigs1)) + + // Another channel has 2 pending splices. + val channelId2 = randomBytes32() + val startBatch2 = StartBatch(channelId2, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132))) + val commitSigs2 = Seq( + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + ) + transport.send(peerConnection, startBatch2) + transport.expectMsg(TransportHandler.ReadAck(startBatch2)) + transport.send(peerConnection, commitSigs2(0)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs2(0))) + transport.send(peerConnection, commitSigs2(1)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs2(1))) + peer.expectNoMessage(100 millis) + transport.send(peerConnection, commitSigs2(2)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs2(2))) + peer.expectMsg(CommitSigBatch(commitSigs2)) + + // We receive another batch of commit_sig messages for the first channel. + val startBatch3 = StartBatch(channelId1, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132))) + val commitSigs3 = Seq( + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + ) + transport.send(peerConnection, startBatch3) + transport.expectMsg(TransportHandler.ReadAck(startBatch3)) + transport.send(peerConnection, commitSigs3(0)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs3(0))) + transport.send(peerConnection, commitSigs3(1)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs3(1))) + peer.expectNoMessage(100 millis) + transport.send(peerConnection, commitSigs3(2)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs3(2))) + peer.expectMsg(CommitSigBatch(commitSigs3)) + + // We reject batches that mix unrelated channels. + val startBatch4 = StartBatch(channelId1, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132))) + val commitSigs4 = Seq( + CommitSig(channelId1, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + CommitSig(channelId2, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))), + ) + transport.send(peerConnection, startBatch4) + transport.expectMsg(TransportHandler.ReadAck(startBatch4)) + transport.send(peerConnection, commitSigs4(0)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs4(0))) + peer.expectNoMessage(100 millis) + transport.send(peerConnection, commitSigs4(1)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs4(1))) + peer.expectMsg(CommitSigBatch(commitSigs4.take(1))) + peer.expectMsg(commitSigs4(1)) + peer.expectNoMessage(100 millis) + transport.send(peerConnection, commitSigs4(2)) + transport.expectMsg(TransportHandler.ReadAck(commitSigs4(2))) + peer.expectMsg(commitSigs4(2)) + peer.expectNoMessage(100 millis) + } + + test("receive unsupported batch of channel messages") { f => + import f._ + connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer) + + // We receive a batch of splice_locked messages: we forward them individually. + val channelId = randomBytes32() + val startBatch1 = StartBatch(channelId, batchSize = 2) // note that start_batch doesn't indicate the message type + val spliceLocked1 = SpliceLocked(channelId, randomTxId()) + val spliceLocked2 = SpliceLocked(channelId, randomTxId()) + transport.send(peerConnection, startBatch1) + transport.expectMsg(TransportHandler.ReadAck(startBatch1)) + transport.expectMsgType[Warning] + transport.send(peerConnection, spliceLocked1) + transport.expectMsg(TransportHandler.ReadAck(spliceLocked1)) + peer.expectMsg(spliceLocked1) + transport.send(peerConnection, spliceLocked2) + transport.expectMsg(TransportHandler.ReadAck(spliceLocked2)) + peer.expectMsg(spliceLocked2) + + // We receive a batch containing commit_sig and an unrelated message. + val startBatch2 = StartBatch(channelId, batchSize = 3, TlvStream(StartBatchTlv.MessageType(132))) + val commitSig1 = CommitSig(channelId, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))) + val commitSig2 = CommitSig(channelId, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))) + val spliceLocked3 = SpliceLocked(channelId, randomTxId()) + transport.send(peerConnection, startBatch2) + transport.expectMsg(TransportHandler.ReadAck(startBatch2)) + transport.send(peerConnection, commitSig1) + transport.expectMsg(TransportHandler.ReadAck(commitSig1)) + transport.send(peerConnection, commitSig2) + transport.expectMsg(TransportHandler.ReadAck(commitSig2)) + peer.expectNoMessage(100 millis) + transport.send(peerConnection, spliceLocked3) + transport.expectMsg(TransportHandler.ReadAck(spliceLocked3)) + peer.expectMsg(CommitSigBatch(commitSig1 :: commitSig2 :: Nil)) + peer.expectMsg(spliceLocked3) + peer.expectNoMessage(100 millis) + + // We receive a batch exceeding 20 elements: we relay messages individually. + val startBatch3 = StartBatch(channelId, batchSize = 21, TlvStream(StartBatchTlv.MessageType(132))) + val commitSig3 = CommitSig(channelId, IndividualSignature(randomBytes64()), Nil, TlvStream(CommitSigTlv.FundingTx(randomTxId()))) + transport.send(peerConnection, startBatch3) + transport.expectMsg(TransportHandler.ReadAck(startBatch3)) + transport.expectMsgType[Warning] + transport.send(peerConnection, commitSig3) + transport.expectMsg(TransportHandler.ReadAck(commitSig3)) + peer.expectMsg(commitSig3) + peer.expectNoMessage(100 millis) + } + test("react to peer's bad behavior") { f => import f._ val probe = TestProbe() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala index 366ede4d54..88ed2d15af 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala @@ -441,6 +441,23 @@ class LightningMessageCodecsSpec extends AnyFunSuite { } } + test("encode/decode start_batch message") { + val channelId = ByteVector32(hex"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + val testCases = Seq( + StartBatch(channelId, 1) -> hex"007f aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 0001", + StartBatch(channelId, 7) -> hex"007f aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 0007", + StartBatch.commitSigBatch(channelId, 7) -> hex"007f aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 0007 01020084", + StartBatch(channelId, 7, TlvStream(StartBatchTlv.MessageType(57331))) -> hex"007f aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 0007 0102dff3", + StartBatch(channelId, 32000) -> hex"007f aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 7d00", + ) + testCases.foreach { case (msg, bin) => + val decoded = lightningMessageCodec.decode(bin.bits).require.value + assert(decoded == msg) + val encoded = lightningMessageCodec.encode(msg).require.bytes + assert(encoded == bin) + } + } + test("encode/decode closing_signed") { val defaultSig = ByteVector64(hex"01010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101") val testCases = Seq(