Skip to content

Commit c0e77c6

Browse files
authored
Improve transaction flow. (#93) (#113)
1 parent b99eb95 commit c0e77c6

3 files changed

Lines changed: 62 additions & 33 deletions

File tree

storm-cli/storm.mjs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ function renderFrame(now) {
333333
const mi = matrixIntensityAt(mx, y);
334334
if (hasBolt(x, y)) {
335335
if (mi >= 3) out += matrixColor(mi) + matrixGlyph(mx, y, now) + RESET;
336-
else if (mi > 0) out += matrixColor(mi) + '#' + RESET;
337336
else out += dbColor(y, false) + '#' + RESET;
338337
} else {
339338
out += dbColor(y, false) + '@' + RESET;

storm-kotlin/src/main/kotlin/st/orm/template/impl/CoroutineAwareConnectionProviderImpl.kt

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
3737
require(context is JdbcTransactionContext) { "Transaction context must be of type JdbcTransactionContext." }
3838
validateState()
3939
val connection = context.getConnection(dataSource)
40-
ConcurrencyDetector.beforeAccess(connection)
40+
ConcurrencyDetector.beforeAccess(connection, context)
4141
return connection
4242
}
4343
// If no programmatic transaction is active, obtain a new connection from the data source.
@@ -50,7 +50,7 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
5050
if (context.currentConnection() == connection) {
5151
// If this connection is the current transaction connection, do not close it. It will be closed when the
5252
// outermost transaction ends.
53-
ConcurrencyDetector.afterAccess(connection)
53+
ConcurrencyDetector.afterAccess(connection, context)
5454
return
5555
}
5656
}
@@ -111,8 +111,13 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
111111
}
112112

113113
/**
114-
* Detects concurrent access to transaction-scoped connections. Note that the same thread can access the same
115-
* connection multiple times.
114+
* Detects concurrent access to transaction-scoped connections.
115+
*
116+
* Ownership is tracked by transaction context identity rather than thread identity, because coroutines may resume
117+
* on a different virtual thread after suspension (especially with OpenTelemetry or other javaagent instrumentation
118+
* that wraps dispatched tasks).
119+
*
120+
* The same context can access the same connection multiple times (re-entrancy).
116121
*/
117122
object ConcurrencyDetector {
118123
private class ConnectionIdentity(connection: Connection, queue: ReferenceQueue<Connection>) : WeakReference<Connection>(connection, queue) {
@@ -121,7 +126,7 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
121126
override fun equals(other: Any?) = other is ConnectionIdentity && this.get() === other.get() && this.get() != null
122127
}
123128

124-
private data class Owner(var thread: Thread? = null, var depth: Int = 0)
129+
private data class Owner(var context: TransactionContext? = null, var depth: Int = 0)
125130
private val queue = ReferenceQueue<Connection>()
126131
private val owners = ConcurrentHashMap<ConnectionIdentity, Owner>()
127132

@@ -132,33 +137,31 @@ class CoroutineAwareConnectionProviderImpl : ConnectionProvider {
132137
}
133138
}
134139

135-
fun beforeAccess(connection: Connection) {
140+
fun beforeAccess(connection: Connection, context: TransactionContext) {
136141
reap()
137142
val key = ConnectionIdentity(connection, queue)
138143
val owner = owners.computeIfAbsent(key) { Owner() }
139-
val t = Thread.currentThread()
140144
synchronized(owner) {
141-
when (owner.thread) {
145+
when (owner.context) {
142146
null -> {
143-
owner.thread = t
147+
owner.context = context
144148
owner.depth = 1
145149
}
146-
t -> owner.depth++
150+
context -> owner.depth++
147151
else -> throw PersistenceException("Concurrent access on $connection.")
148152
}
149153
}
150154
}
151155

152-
fun afterAccess(connection: Connection) {
156+
fun afterAccess(connection: Connection, context: TransactionContext) {
153157
reap()
154158
val key = ConnectionIdentity(connection, queue)
155159
val owner = owners[key] ?: return
156-
val t = Thread.currentThread()
157160
var clear = false
158161
synchronized(owner) {
159-
if (owner.thread !== t) return
162+
if (owner.context !== context) return
160163
if (--owner.depth == 0) {
161-
owner.thread = null
164+
owner.context = null
162165
clear = true
163166
}
164167
}

storm-kotlin/src/test/kotlin/st/orm/template/ConnectionProviderTest.kt

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration
1313
import org.springframework.test.context.jdbc.Sql
1414
import org.springframework.test.context.junit.jupiter.SpringExtension
1515
import st.orm.PersistenceException
16+
import st.orm.core.spi.TransactionContext
1617
import st.orm.repository.countAll
1718
import st.orm.template.impl.CoroutineAwareConnectionProviderImpl
1819
import st.orm.template.model.City
@@ -31,8 +32,6 @@ open class ConnectionProviderTest(
3132
@Autowired val dataSource: DataSource,
3233
) {
3334

34-
// Connection acquisition without transaction
35-
3635
@Test
3736
fun `getConnection without transaction should return new connection`() {
3837
val provider = CoroutineAwareConnectionProviderImpl()
@@ -52,8 +51,6 @@ open class ConnectionProviderTest(
5251
connection.isClosed.shouldBeTrue()
5352
}
5453

55-
// Connection within transaction
56-
5754
@Test
5855
fun `getConnection within transaction should reuse transaction connection`(): Unit = runBlocking {
5956
transactionBlocking {
@@ -72,48 +69,77 @@ open class ConnectionProviderTest(
7269
}
7370
}
7471

75-
// ConcurrencyDetector
72+
private fun stubContext(): TransactionContext = object : TransactionContext {
73+
override fun entityCache(entityType: Class<out st.orm.Entity<*>>, retention: st.orm.core.spi.CacheRetention) = throw UnsupportedOperationException()
74+
override fun getEntityCache(entityType: Class<out st.orm.Entity<*>>) = throw UnsupportedOperationException()
75+
override fun findEntityCache(entityType: Class<out st.orm.Entity<*>>) = null
76+
override fun clearAllEntityCaches() {}
77+
override fun <T : Any?> getDecorator(resourceType: Class<T>): TransactionContext.Decorator<T> = TransactionContext.Decorator { it }
78+
}
7679

7780
@Test
78-
fun `ConcurrencyDetector beforeAccess and afterAccess on same thread should succeed`() {
81+
fun `ConcurrencyDetector beforeAccess and afterAccess with same context should succeed`() {
7982
val connection = dataSource.connection
83+
val context = stubContext()
8084
try {
81-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
82-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
85+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
86+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
8387
} finally {
8488
connection.close()
8589
}
8690
}
8791

8892
@Test
89-
fun `ConcurrencyDetector should allow nested access on same thread`() {
93+
fun `ConcurrencyDetector should allow nested access with same context`() {
9094
val connection = dataSource.connection
95+
val context = stubContext()
9196
try {
92-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
93-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
94-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
95-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
97+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
98+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
99+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
100+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
101+
} finally {
102+
connection.close()
103+
}
104+
}
105+
106+
@Test
107+
fun `ConcurrencyDetector should allow same context from different thread`() {
108+
val connection = dataSource.connection
109+
val context = stubContext()
110+
try {
111+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
112+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
113+
// Same context, different thread — simulates virtual thread migration.
114+
val thread = Thread {
115+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context)
116+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
117+
}
118+
thread.start()
119+
thread.join()
96120
} finally {
97121
connection.close()
98122
}
99123
}
100124

101125
@Test
102-
fun `ConcurrencyDetector should detect concurrent access from different threads`() {
126+
fun `ConcurrencyDetector should detect concurrent access from different contexts`() {
103127
val connection = dataSource.connection
128+
val context1 = stubContext()
129+
val context2 = stubContext()
104130
try {
105-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
131+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context1)
106132
var caughtException: Throwable? = null
107133
val thread = Thread {
108-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection)
134+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.beforeAccess(connection, context2)
109135
}
110136
thread.setUncaughtExceptionHandler { _, throwable -> caughtException = throwable }
111137
thread.start()
112138
thread.join()
113139
assertThrows<PersistenceException> {
114140
caughtException?.let { throw it }
115141
}
116-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
142+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context1)
117143
} finally {
118144
connection.close()
119145
}
@@ -122,9 +148,10 @@ open class ConnectionProviderTest(
122148
@Test
123149
fun `ConcurrencyDetector afterAccess on unknown connection should be no-op`() {
124150
val connection = dataSource.connection
151+
val context = stubContext()
125152
try {
126153
// afterAccess on a connection never registered should not throw
127-
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection)
154+
CoroutineAwareConnectionProviderImpl.ConcurrencyDetector.afterAccess(connection, context)
128155
} finally {
129156
connection.close()
130157
}

0 commit comments

Comments
 (0)