Commit f0f9918

vkuttyp and claude committed
v2.5.11: replace CosmoKvConnection._lock with Microsoft.VisualStudio.Threading.AsyncReaderWriterLock

The single SemaphoreSlim(1,1) on CosmoKvConnection has been the gating bottleneck behind every transport optimisation this session: UDS, binary protocol, client connection pool all invisible against it because the server-side serialised everything anyway.

Two prior v2.5.7 attempts at rolling our own RW lock failed: drain-N-slots starved writers under sustained read load, writer-priority gate hit subtle timing races under xUnit parallelism. The Microsoft.VisualStudio.Threading.AsyncReaderWriterLock is the same primitive Visual Studio's project-model uses; bounded fairness + cooperative cancellation already correct.

Wiring:
- QueryAsync → ReadLockAsync (concurrent)
- ExecuteAsync → WriteLockAsync (exclusive)
- BeginTransactionAsync → WriteLockAsync
- CommitAsync → WriteLockAsync
- RollbackAsync → WriteLockAsync

The Executor-side AsyncLocal<IKvAccess?> from v2.5.8 stays in place; it's the necessary per-call isolation for concurrent reads on the shared executor. The ConcurrentDictionary<,> swap for the parser cache stays too.

3 new xUnit tests in Phase30AsyncRWLockTests verify:
1. 32 concurrent SELECTs measurably faster than serial (<70% of serial time): the observable that proves the read lock is non-exclusive.
2. Reader observing COUNT(*) during concurrent INSERTs never sees count decrease: write/read isolation.
3. Writer not starved under 8 hot reader tasks; the v2.5.7 drain-N attempt hung this test indefinitely.

All 269 CosmoKv driver tests pass (was 266; +3 new). Pipes pool tests still 10/10.

Once this lands on marivil, the binary-protocol + pool work from v2.5.10 should finally become a measurable win: the client-side parallelism now actually maps to server-side parallel execution.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
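
The read/write wiring described in the commit message can be illustrated with a minimal sketch. It assumes the Microsoft.VisualStudio.Threading NuGet package is referenced; `GuardedStore`, `CountAsync` and `AppendAsync` are hypothetical names used only for illustration and are not part of CosmoKvConnection. Reads take the shared lock and may overlap, while writes take the exclusive lock.

```csharp
// Sketch only. Assumes the Microsoft.VisualStudio.Threading NuGet package is referenced.
// GuardedStore is a hypothetical stand-in, not the real CosmoKvConnection.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Threading;

var store = new GuardedStore();
await store.AppendAsync("row-1");

// Several readers may hold the read lock at the same time.
int[] counts = await Task.WhenAll(
    Enumerable.Range(0, 4).Select(_ => store.CountAsync()));
Console.WriteLine(string.Join(",", counts));

public sealed class GuardedStore
{
    private readonly AsyncReaderWriterLock _rwLock = new();
    private readonly List<string> _rows = new();

    // Read path: shared lock, analogous to QueryAsync -> ReadLockAsync.
    public async Task<int> CountAsync(CancellationToken ct = default)
    {
        using (await _rwLock.ReadLockAsync(ct))
        {
            return _rows.Count;
        }
    }

    // Write path: exclusive lock, analogous to ExecuteAsync -> WriteLockAsync.
    public async Task AppendAsync(string row, CancellationToken ct = default)
    {
        using (await _rwLock.WriteLockAsync(ct))
        {
            _rows.Add(row);
        }
    }
}
```

The `using (await ...)` form releases the lock when the block exits, including on exceptions, which is why the explicit `try`/`finally` plus `Release()` pairs disappear from the methods in this commit.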
1 parent 5b5c2d0 commit f0f9918

4 files changed

Lines changed: 288 additions & 85 deletions

Directory.Build.props

Lines changed: 1 addition & 1 deletion
@@ -7,6 +7,6 @@
     <Authors>vkuttyp</Authors>
     <PackageLicenseExpression>MIT</PackageLicenseExpression>
     <RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
-    <Version>2.5.10</Version>
+    <Version>2.5.11</Version>
   </PropertyGroup>
 </Project>

src/CosmoSQLClient.CosmoKv/CosmoKvConnection.cs

Lines changed: 89 additions & 84 deletions
@@ -36,18 +36,22 @@ public sealed class CosmoKvConnection : ISqlDatabase
     private CosmoKvTxn? _txn;

     /// <summary>
-    /// Connection-level serialisation lock. The v2.5.7 attempt at an
-    /// AsyncReaderWriterLock here regressed marivil's stress run badly —
-    /// the "drain N slots" writer pattern starved under sustained read
-    /// load (audit/RelayWorker writes blocked indefinitely), and a
-    /// "writer-priority gate" rewrite hit subtler timing issues under
-    /// the .NET test suite's parallel-test load. Reverted to the single
-    /// SemaphoreSlim until a properly-engineered async RW lock can be
-    /// designed; the <see cref="Execution.Executor"/>-side AsyncLocal
-    /// change stays (it's harmless under serial access and enables the
-    /// eventual RW-lock work without another driver-side refactor).
+    /// Battle-tested writer-priority RW lock from Microsoft.VisualStudio.
+    /// Threading. Multiple readers run concurrently (snapshot-isolated via
+    /// MVCC); ExecuteAsync / Begin / Commit / Rollback acquire the write
+    /// lock for exclusive access. Pre-v2.5.11 was a single SemaphoreSlim
+    /// (everything serial), which was the gating bottleneck behind every
+    /// transport optimisation this session (UDS, binary protocol, client
+    /// pool — all invisible against this lock).
+    ///
+    /// Two prior v2.5.7 attempts at rolling our own RW lock failed —
+    /// the drain-N-slots writer pattern starved under sustained read
+    /// load and a writer-priority gate rewrite hit subtle timing issues
+    /// under xUnit's parallel harness. The MS lock primitive handles
+    /// both correctly (it's the same one Visual Studio's project system
+    /// uses for cross-thread project model access).
     /// </summary>
-    private readonly SemaphoreSlim _lock = new(1, 1);
+    private readonly Microsoft.VisualStudio.Threading.AsyncReaderWriterLock _rwLock = new();
     private bool _disposed;

     /// <summary>
@@ -185,28 +189,29 @@ public static Task<CosmoKvConnection> OpenAsync(
     public async Task<IReadOnlyList<SqlRow>> QueryAsync(
         string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
     {
-        await _lock.WaitAsync(ct);
-        try
+        using (await _rwLock.ReadLockAsync(ct))
         {
             EnsureOpen();
             var stmt = TSqlParser.Parse(sql);
             // Snapshot _txn under the read lock so a concurrent BEGIN
-            // (which holds the write lock and excludes us) can't change
-            // our view mid-execution. Once snapshotted, the per-call
-            // access lives in the Executor's AsyncLocal slot, so
-            // sibling readers don't see our routing.
+            // (which would hold the write lock and exclude us) can't
+            // change our view mid-execution. The per-call access lives
+            // in the Executor's AsyncLocal slot, so sibling readers
+            // don't see our routing.
             var snapshotTxn = _txn;
-            if (snapshotTxn is not null)
-                _executor!.SetActiveAccess(new TxnKvAccess(snapshotTxn));
-            else
-                _executor!.SetActiveAccess(null);
-            var (cols, rows) = await _executor!.QueryAsync(stmt, parameters, ct);
-            return MaterializeRows(cols, rows);
-        }
-        finally
-        {
-            _executor?.SetActiveAccess(null);
-            _lock.Release();
+            try
+            {
+                if (snapshotTxn is not null)
+                    _executor!.SetActiveAccess(new TxnKvAccess(snapshotTxn));
+                else
+                    _executor!.SetActiveAccess(null);
+                var (cols, rows) = await _executor!.QueryAsync(stmt, parameters, ct);
+                return MaterializeRows(cols, rows);
+            }
+            finally
+            {
+                _executor?.SetActiveAccess(null);
+            }
         }
     }

@@ -234,60 +239,61 @@ public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
     public async Task<int> ExecuteAsync(
         string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
     {
-        await _lock.WaitAsync(ct);
-        try
+        using (await _rwLock.WriteLockAsync(ct))
         {
-            EnsureOpen();
-            var stmt = TSqlParser.Parse(sql);
-
-            // SQL-level transaction commands — intercept before the executor.
-            switch (stmt)
-            {
-                case BeginTransactionStatement:
-                    await BeginTransactionAsyncLocked(ct);
-                    return 0;
-                case CommitTransactionStatement:
-                    await CommitAsyncLocked(ct);
-                    return 0;
-                case RollbackTransactionStatement:
-                    await RollbackAsyncLocked(ct);
-                    return 0;
-            }
-
-            if (_txn is not null)
-            {
-                // Inside an explicit transaction — buffer into it.
-                _executor!.SetActiveAccess(new TxnKvAccess(_txn));
-                return await _executor!.ExecuteAsync(stmt, parameters, ct);
-            }
-
-            // Implicit per-statement transaction so multi-key writes (row +
-            // index entries + IDENTITY counter) are atomic. CosmoKv's writer
-            // batches the buffer into one append, so the cost of the wrap
-            // is small.
-            var t = _db!.BeginTransaction(update: true);
             try
             {
-                _executor!.SetActiveAccess(new TxnKvAccess(t));
-                int n = await _executor!.ExecuteAsync(stmt, parameters, ct);
-                await t.CommitAsync(ct);
-                return n;
-            }
-            catch (global::CosmoKv.ConflictException ce)
-            {
-                throw new SqlTransactionConflictException(
-                    "Implicit transaction failed due to a concurrent write conflict.", ce);
+                EnsureOpen();
+                var stmt = TSqlParser.Parse(sql);
+
+                // SQL-level transaction commands — intercept before the executor.
+                switch (stmt)
+                {
+                    case BeginTransactionStatement:
+                        await BeginTransactionAsyncLocked(ct);
+                        return 0;
+                    case CommitTransactionStatement:
+                        await CommitAsyncLocked(ct);
+                        return 0;
+                    case RollbackTransactionStatement:
+                        await RollbackAsyncLocked(ct);
+                        return 0;
+                }
+
+                if (_txn is not null)
+                {
+                    // Inside an explicit transaction — buffer into it.
+                    _executor!.SetActiveAccess(new TxnKvAccess(_txn));
+                    return await _executor!.ExecuteAsync(stmt, parameters, ct);
+                }
+
+                // Implicit per-statement transaction so multi-key writes (row +
+                // index entries + IDENTITY counter) are atomic. CosmoKv's writer
+                // batches the buffer into one append, so the cost of the wrap
+                // is small.
+                var t = _db!.BeginTransaction(update: true);
+                try
+                {
+                    _executor!.SetActiveAccess(new TxnKvAccess(t));
+                    int n = await _executor!.ExecuteAsync(stmt, parameters, ct);
+                    await t.CommitAsync(ct);
+                    return n;
+                }
+                catch (global::CosmoKv.ConflictException ce)
+                {
+                    throw new SqlTransactionConflictException(
+                        "Implicit transaction failed due to a concurrent write conflict.", ce);
+                }
+                finally
+                {
+                    await t.DisposeAsync();
+                }
             }
             finally
             {
-                await t.DisposeAsync();
+                _executor?.SetActiveAccess(null);
             }
         }
-        finally
-        {
-            _executor?.SetActiveAccess(null);
-            _lock.Release();
-        }
     }

     public async Task<SqlDataTable> QueryTableAsync(
@@ -336,23 +342,20 @@ private static IReadOnlyList<SqlRow> MaterializeRows(

     public async Task BeginTransactionAsync(CancellationToken ct = default)
     {
-        await _lock.WaitAsync(ct);
-        try { await BeginTransactionAsyncLocked(ct); }
-        finally { _lock.Release(); }
+        using (await _rwLock.WriteLockAsync(ct))
+            await BeginTransactionAsyncLocked(ct);
     }

     public async Task CommitAsync(CancellationToken ct = default)
     {
-        await _lock.WaitAsync(ct);
-        try { await CommitAsyncLocked(ct); }
-        finally { _lock.Release(); }
+        using (await _rwLock.WriteLockAsync(ct))
+            await CommitAsyncLocked(ct);
     }

     public async Task RollbackAsync(CancellationToken ct = default)
     {
-        await _lock.WaitAsync(ct);
-        try { await RollbackAsyncLocked(ct); }
-        finally { _lock.Release(); }
+        using (await _rwLock.WriteLockAsync(ct))
+            await RollbackAsyncLocked(ct);
     }

     // ── Transaction internals (lock already held by caller) ─────────────────
@@ -421,7 +424,9 @@ public async ValueTask DisposeAsync()
         if (_disposed) return;
         _disposed = true;
         await CloseAsync();
-        _lock.Dispose();
+        // MS AsyncReaderWriterLock implements IDisposable internally only
+        // via its CompleteAsync API; GC handles cleanup of pending nodes
+        // once no references remain. No explicit dispose call needed.
     }

     private void EnsureOpen()
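
The per-call isolation that the QueryAsync comment attributes to the Executor's AsyncLocal slot can be demonstrated with the base class library alone. In this minimal sketch, `AsyncLocalDemo`, `Slot`, `RunReaderAsync` and `AllIsolatedAsync` are hypothetical names, and a string tag stands in for the `IKvAccess` routing object. It is meant only to show why concurrent readers sharing one executor do not overwrite each other's value.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Console.WriteLine(await AsyncLocalDemo.AllIsolatedAsync(32));

public static class AsyncLocalDemo
{
    // Hypothetical per-call slot; plays the role AsyncLocal<IKvAccess?> plays in the Executor.
    private static readonly AsyncLocal<string?> Slot = new();

    public static async Task<bool> RunReaderAsync(string tag)
    {
        Slot.Value = tag;          // written inside this async flow only
        await Task.Delay(10);      // yield so sibling readers interleave
        return Slot.Value == tag;  // our value flowed across the await unchanged
    }

    public static async Task<bool> AllIsolatedAsync(int readers)
    {
        bool[] results = await Task.WhenAll(
            Enumerable.Range(0, readers).Select(i => RunReaderAsync($"reader-{i}")));
        // Writes made by the child flows do not propagate back to the caller's slot.
        return results.All(ok => ok) && Slot.Value is null;
    }
}
```

A plain instance field on the shared executor would be overwritten by whichever reader wrote last, which is the hazard once the read lock admits several callers at once.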

src/CosmoSQLClient.CosmoKv/CosmoSQLClient.CosmoKv.csproj

Lines changed: 6 additions & 0 deletions
@@ -6,6 +6,12 @@

   <ItemGroup>
     <PackageReference Include="CosmoKv" Version="2.1.9" />
+    <!-- Microsoft.VisualStudio.Threading.AsyncReaderWriterLock — battle-
+         tested writer-priority RW lock for the CosmoKvConnection
+         hot path. The v2.5.7 attempt at rolling our own failed under
+         writer starvation; this primitive ships with cooperative
+         cancellation and bounded fairness already correct. -->
+    <PackageReference Include="Microsoft.VisualStudio.Threading" Version="17.14.15" />
   </ItemGroup>

   <PropertyGroup>
