Skip to content

Commit e156d8d

Browse files
authored
Merge pull request #1017 from synonymdev/fix/cjit-channel-false-match
fix: match cjit entry to channel by funding tx
2 parents 65db306 + c5ad4fa commit e156d8d

7 files changed

Lines changed: 316 additions & 8 deletions

File tree

AGENTS.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ fun updateState(action: Action) {
153153

154154
```kotlin
155155
suspend fun getData(): Result<Data> = withContext(Dispatchers.IO) {
156-
runCatching {
156+
runSuspendCatching {
157157
apiService.fetchData()
158158
}.onFailure {
159159
Logger.error("Failed", it, context = TAG)
@@ -196,6 +196,8 @@ suspend fun getData(): Result<Data> = withContext(Dispatchers.IO) {
196196
- ALWAYS log errors at the final handling layer where the error is acted upon, not in intermediate layers that just propagate it
197197
- ALWAYS use the Result API instead of try-catch
198198
- NEVER wrap methods returning `Result<T>` in try-catch
199+
- ALWAYS use `runSuspendCatching` (from `ext/Coroutines.kt`) instead of `runCatching` when the block calls suspend functions or runs in a coroutine — it re-throws `CancellationException` so structured-concurrency cancellation is preserved; plain `runCatching` catches `Throwable` and swallows it. NEVER log a `CancellationException` as an error
200+
- EXCEPTION: when a `TimeoutCancellationException` from `withTimeout` must be treated as a retriable failure, use `runCatching` with an explicit `if (it is CancellationException && it !is TimeoutCancellationException) throw it` guard (e.g. `BlocktankRepo.refreshCjitEntries`)
199201
- PREFER to use `it` instead of explicit named parameters in lambdas e.g. `fn().onSuccess { log(it) }.onFailure { log(it) }`
200202
- NEVER inject ViewModels as dependencies - Only android activities and composable functions can use viewmodels
201203
- ALWAYS co-locate screen-specific ViewModels in the same package as their screen; only place ViewModels in `viewmodels/` when shared across multiple screens

app/src/main/java/to/bitkit/domain/commands/NotifyChannelReadyHandler.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineDispatcher
44
import kotlinx.coroutines.withContext
55
import to.bitkit.di.IoDispatcher
66
import to.bitkit.ext.amountOnClose
7+
import to.bitkit.ext.runSuspendCatching
78
import to.bitkit.models.NewTransactionSheetDetails
89
import to.bitkit.models.NewTransactionSheetDirection
910
import to.bitkit.models.NewTransactionSheetType
@@ -30,17 +31,17 @@ class NotifyChannelReadyHandler @Inject constructor(
3031
suspend operator fun invoke(
3132
command: NotifyChannelReady.Command,
3233
): Result<NotifyChannelReady.Result> = withContext(ioDispatcher) {
33-
runCatching {
34+
runSuspendCatching {
3435
val channel = lightningRepo.getChannels()
3536
?.find { it.channelId == command.event.channelId }
36-
?: return@runCatching NotifyChannelReady.Result.Skip
37+
?: return@runSuspendCatching NotifyChannelReady.Result.Skip
3738

3839
val cjitEntry = blocktankRepo.getCjitEntry(channel)
39-
?: return@runCatching NotifyChannelReady.Result.Skip
40+
?: return@runSuspendCatching NotifyChannelReady.Result.Skip
4041

4142
val inserted = activityRepo.insertActivityFromCjit(cjitEntry = cjitEntry, channel = channel)
4243
.getOrDefault(false)
43-
if (!inserted) return@runCatching NotifyChannelReady.Result.Duplicate
44+
if (!inserted) return@runSuspendCatching NotifyChannelReady.Result.Duplicate
4445

4546
val sats = channel.amountOnClose.toLong()
4647

app/src/main/java/to/bitkit/ext/Coroutines.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
11
package to.bitkit.ext
22

3+
import kotlinx.coroutines.CancellationException
34
import kotlinx.coroutines.Job
45
import to.bitkit.utils.Logger
56

7+
@Suppress("TooGenericExceptionCaught")
8+
suspend inline fun <R> runSuspendCatching(block: () -> R): Result<R> =
9+
try {
10+
Result.success(block())
11+
} catch (c: CancellationException) {
12+
throw c
13+
} catch (e: Throwable) {
14+
Result.failure(e)
15+
}
16+
617
fun Job.logCompletion(name: String = "") = invokeOnCompletion { err ->
718
if (err != null) {
819
Logger.verbose("Coroutine '$name' error: ${err.message}")

app/src/main/java/to/bitkit/repositories/BlocktankRepo.kt

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package to.bitkit.repositories
22

33
import androidx.compose.runtime.Stable
44
import com.synonym.bitkitcore.BtOrderState2
5+
import com.synonym.bitkitcore.CJitStateEnum
56
import com.synonym.bitkitcore.ChannelLiquidityOptions
67
import com.synonym.bitkitcore.ChannelLiquidityParams
78
import com.synonym.bitkitcore.CreateCjitOptions
@@ -18,10 +19,12 @@ import com.synonym.bitkitcore.giftPay
1819
import kotlinx.collections.immutable.ImmutableList
1920
import kotlinx.collections.immutable.persistentListOf
2021
import kotlinx.collections.immutable.toImmutableList
22+
import kotlinx.coroutines.CancellationException
2123
import kotlinx.coroutines.CoroutineDispatcher
2224
import kotlinx.coroutines.CoroutineScope
2325
import kotlinx.coroutines.CoroutineStart
2426
import kotlinx.coroutines.SupervisorJob
27+
import kotlinx.coroutines.TimeoutCancellationException
2528
import kotlinx.coroutines.async
2629
import kotlinx.coroutines.coroutineScope
2730
import kotlinx.coroutines.currentCoroutineContext
@@ -41,6 +44,7 @@ import kotlinx.coroutines.flow.update
4144
import kotlinx.coroutines.isActive
4245
import kotlinx.coroutines.launch
4346
import kotlinx.coroutines.withContext
47+
import kotlinx.coroutines.withTimeout
4448
import kotlinx.coroutines.withTimeoutOrNull
4549
import org.lightningdevkit.ldknode.Bolt11Invoice
4650
import org.lightningdevkit.ldknode.ChannelDetails
@@ -125,10 +129,48 @@ class BlocktankRepo @Inject constructor(
125129
}
126130

127131
suspend fun getCjitEntry(channel: ChannelDetails): IcJitEntry? = withContext(bgDispatcher) {
128-
return@withContext _blocktankState.value.cjitEntries.firstOrNull { order ->
129-
order.channelSizeSat == channel.channelValueSats &&
130-
order.lspNode.pubkey == channel.counterpartyNodeId
132+
val fundingTxo = channel.fundingTxo ?: return@withContext null
133+
134+
fun List<IcJitEntry>.matching(): IcJitEntry? = firstOrNull { entry ->
135+
val fundingTx = entry.channel?.fundingTx ?: return@firstOrNull false
136+
fundingTx.id == fundingTxo.txid && fundingTx.vout == fundingTxo.vout.toULong()
137+
}
138+
139+
val cached = _blocktankState.value.cjitEntries
140+
cached.matching()?.let { return@withContext it }
141+
142+
// A ChannelReady can only be a CJIT if a live cached entry is still awaiting its channel; otherwise skip
143+
// the server round-trip so a non-CJIT transfer confirmation isn't delayed by a slow Blocktank API.
144+
val hasPendingCjit = cached.any {
145+
it.channel == null && it.state != CJitStateEnum.EXPIRED && it.state != CJitStateEnum.FAILED
146+
}
147+
if (cached.isNotEmpty() && !hasPendingCjit) return@withContext null
148+
149+
// Match against the freshly fetched list so a concurrent refreshOrders() can't clobber state before we read.
150+
return@withContext refreshCjitEntries().matching()
151+
}
152+
153+
private suspend fun refreshCjitEntries(): List<IcJitEntry> {
154+
repeat(CJIT_REFRESH_ATTEMPTS) { attempt ->
155+
val entries = runCatching {
156+
withTimeout(CJIT_REFRESH_TIMEOUT) {
157+
coreService.blocktank.cjitEntries(refresh = true)
158+
}
159+
}.mapCatching { entries ->
160+
_blocktankState.update { it.copy(cjitEntries = entries.toImmutableList()) }
161+
entries
162+
}.getOrElse {
163+
if (it is CancellationException && it !is TimeoutCancellationException) throw it
164+
if (attempt == CJIT_REFRESH_ATTEMPTS - 1) {
165+
Logger.warn("Failed to refresh CJIT entries; using cached state", it, context = TAG)
166+
}
167+
null
168+
}
169+
170+
entries?.let { return it }
171+
delay(CJIT_REFRESH_RETRY_DELAY)
131172
}
173+
return _blocktankState.value.cjitEntries
132174
}
133175

134176
suspend fun refreshInfo() = withContext(bgDispatcher) {
@@ -553,6 +595,9 @@ class BlocktankRepo @Inject constructor(
553595
private const val PEER_CONNECTION_DELAY_MS = 2_000L
554596
private val TIMEOUT_GIFT_CODE = 30.seconds
555597
private val GIFT_PAYMENT_RECEIVE_TIMEOUT = 45.seconds
598+
private const val CJIT_REFRESH_ATTEMPTS = 3
599+
private val CJIT_REFRESH_TIMEOUT = 5.seconds
600+
private val CJIT_REFRESH_RETRY_DELAY = 1.seconds
556601
}
557602
}
558603

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package to.bitkit.ext
2+
3+
import kotlinx.coroutines.CancellationException
4+
import org.junit.Test
5+
import to.bitkit.test.BaseUnitTest
6+
import kotlin.test.assertEquals
7+
import kotlin.test.assertFailsWith
8+
import kotlin.test.assertTrue
9+
10+
class CoroutinesTest : BaseUnitTest() {
11+
12+
@Test
13+
fun `runSuspendCatching wraps a successful result`() = test {
14+
val result = runSuspendCatching { 42 }
15+
16+
assertEquals(42, result.getOrNull())
17+
}
18+
19+
@Test
20+
fun `runSuspendCatching wraps a thrown exception as failure`() = test {
21+
val error = IllegalStateException("boom")
22+
23+
val result = runSuspendCatching { throw error }
24+
25+
assertTrue(result.isFailure)
26+
assertEquals(error, result.exceptionOrNull())
27+
}
28+
29+
@Test
30+
fun `runSuspendCatching re-throws CancellationException`() = test {
31+
assertFailsWith<CancellationException> {
32+
runSuspendCatching { throw CancellationException("cancelled") }
33+
}
34+
}
35+
}

0 commit comments

Comments
 (0)