Skip to content

Commit c74256e

Browse files
vkuttypclaude
andcommitted
v2.5.7: CosmoKvConnection allows concurrent reads, AsyncLocal active-access
Before: every QueryAsync and ExecuteAsync went through a single SemaphoreSlim(1, 1) at the CosmoKvConnection layer, so even embarrassingly-parallel SELECTs ran strictly one-at-a-time on a shared connection. The cosmokvd UDS rollout finding ("UDS savings invisible at marivil stress") traced back here: the per-call transport cost was dwarfed by queueing behind this single lock. Three changes: 1. New AsyncReaderWriterLock — counting semaphore with 256 slots; readers take 1, writers drain all. Mirrors the cosmokvd TxnRegistry pattern. Fairness caveat: under sustained read load a pending writer can wait indefinitely, but the mail-server workload (read-heavy, short bursts of writes) doesn't hit this in practice. Replace with a queue-fair lock if it ever bites. 2. Executor._kv → Executor._activeKv (AsyncLocal<IKvAccess?>). The single mutable field that SetActiveAccess was writing depended on the connection-level lock for race-freedom; AsyncLocal threads each statement's value through its own async flow so concurrent QueryAsync calls don't trample each other's routing. Shared _defaultExprCache moves to ConcurrentDictionary for the same concurrency reason. 3. CosmoKvConnection.QueryAsync now takes the READ lock; ExecuteAsync / BeginTransactionAsync / CommitAsync / RollbackAsync take the WRITE lock. Reads run in parallel up to the rwlock's reader capacity; writes still mutually exclude reads and other writers. 3 new xUnit tests in Phase29ConcurrentReadsTests verify: - 16 concurrent SELECTs measurably faster than serial (proves reads aren't serialised) - 32 concurrent SELECTs filtered by bucket each return their correct row count (proves AsyncLocal doesn't bleed across reads) - Reader observing COUNT(*) during concurrent inserts never sees a count-going-backwards (proves snapshot stability) All 269 CosmoKv driver tests pass (266 prior + 3 new). Companion deployment work (cosmokvd + mail-server) will follow once this package indexes on NuGet. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bdf8889 commit c74256e

5 files changed

Lines changed: 329 additions & 48 deletions

File tree

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>2.5.6</Version>
10+
<Version>2.5.7</Version>
1111
</PropertyGroup>
1212
</Project>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
namespace CosmoSQLClient.CosmoKv;
2+
3+
/// <summary>
4+
/// Minimal async reader-writer lock. Multiple readers can hold the
5+
/// lock concurrently; a writer acquires exclusively against all
6+
/// readers AND other writers. Implemented as a counting semaphore
7+
/// where readers take 1 slot and writers drain all slots — the same
8+
/// pattern <c>CosmoKvD.Services.TxnRegistry</c> uses for its
9+
/// auto-commit vs explicit-BEGIN coordination.
10+
///
11+
/// Fairness: under sustained read load a pending writer can wait
12+
/// indefinitely if readers keep arriving. The current mail-server
13+
/// workload is read-heavy with infrequent bursts of writes, and
14+
/// writes here are short (single SQL statement), so starvation
15+
/// hasn't been observed. If it becomes an issue, replace with a
16+
/// queue-fair RW lock at that point.
17+
/// </summary>
18+
internal sealed class AsyncReaderWriterLock
19+
{
20+
private readonly SemaphoreSlim _slots;
21+
private readonly int _capacity;
22+
23+
public AsyncReaderWriterLock(int readerCapacity = 256)
24+
{
25+
_capacity = readerCapacity;
26+
_slots = new SemaphoreSlim(readerCapacity, readerCapacity);
27+
}
28+
29+
/// <summary>Acquire one reader slot. Returns immediately if capacity is available.</summary>
30+
public Task AcquireReadAsync(CancellationToken ct) => _slots.WaitAsync(ct);
31+
32+
/// <summary>Release one reader slot previously acquired by <see cref="AcquireReadAsync"/>.</summary>
33+
public void ReleaseRead() => _slots.Release();
34+
35+
/// <summary>
36+
/// Acquire exclusive write access — drains every reader slot.
37+
/// Blocks until all currently-running readers have finished.
38+
/// </summary>
39+
public async Task AcquireWriteAsync(CancellationToken ct)
40+
{
41+
int acquired = 0;
42+
try
43+
{
44+
for (int i = 0; i < _capacity; i++)
45+
{
46+
await _slots.WaitAsync(ct).ConfigureAwait(false);
47+
acquired++;
48+
}
49+
}
50+
catch
51+
{
52+
// Release any slots we managed to grab before cancellation —
53+
// we'd otherwise wedge the lock for everyone else.
54+
for (int i = 0; i < acquired; i++) _slots.Release();
55+
throw;
56+
}
57+
}
58+
59+
/// <summary>Release exclusive write access — restores all reader slots.</summary>
60+
public void ReleaseWrite()
61+
{
62+
_slots.Release(_capacity);
63+
}
64+
}

src/CosmoSQLClient.CosmoKv/CosmoKvConnection.cs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@ public sealed class CosmoKvConnection : ISqlDatabase
3535
/// <summary>The active explicit transaction, or <c>null</c> for auto-commit.</summary>
3636
private CosmoKvTxn? _txn;
3737

38-
private readonly SemaphoreSlim _lock = new(1, 1);
38+
/// <summary>
39+
/// Reader-writer lock — multiple QueryAsync calls run concurrently
40+
/// (CosmoKv's MVCC already gives them snapshot isolation; the
41+
/// Executor's per-call state is now AsyncLocal so concurrent reads
42+
/// don't trample each other). ExecuteAsync and Begin/Commit/Rollback
43+
/// take the write lock for exclusive access — they mutate <see cref="_txn"/>
44+
/// and the underlying writer task. Replaces the prior single
45+
/// SemaphoreSlim(1,1) that serialised everything.
46+
/// </summary>
47+
private readonly AsyncReaderWriterLock _rwLock = new();
3948
private bool _disposed;
4049

4150
/// <summary>
@@ -173,17 +182,19 @@ public static Task<CosmoKvConnection> OpenAsync(
173182
public async Task<IReadOnlyList<SqlRow>> QueryAsync(
174183
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
175184
{
176-
await _lock.WaitAsync(ct);
185+
await _rwLock.AcquireReadAsync(ct).ConfigureAwait(false);
177186
try
178187
{
179188
EnsureOpen();
180189
var stmt = TSqlParser.Parse(sql);
181-
// Reads piggyback on the explicit txn for snapshot consistency,
182-
// or hit the Db directly otherwise (single-statement reads
183-
// don't need their own implicit txn — CosmoKv's iterator gives
184-
// sufficient consistency for an isolated read).
185-
if (_txn is not null)
186-
_executor!.SetActiveAccess(new TxnKvAccess(_txn));
190+
// Snapshot _txn under the read lock so a concurrent BEGIN
191+
// (which holds the write lock and excludes us) can't change
192+
// our view mid-execution. Once snapshotted, the per-call
193+
// access lives in the Executor's AsyncLocal slot, so
194+
// sibling readers don't see our routing.
195+
var snapshotTxn = _txn;
196+
if (snapshotTxn is not null)
197+
_executor!.SetActiveAccess(new TxnKvAccess(snapshotTxn));
187198
else
188199
_executor!.SetActiveAccess(null);
189200
var (cols, rows) = await _executor!.QueryAsync(stmt, parameters, ct);
@@ -192,7 +203,7 @@ public async Task<IReadOnlyList<SqlRow>> QueryAsync(
192203
finally
193204
{
194205
_executor?.SetActiveAccess(null);
195-
_lock.Release();
206+
_rwLock.ReleaseRead();
196207
}
197208
}
198209

@@ -220,7 +231,7 @@ public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
220231
public async Task<int> ExecuteAsync(
221232
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
222233
{
223-
await _lock.WaitAsync(ct);
234+
await _rwLock.AcquireWriteAsync(ct).ConfigureAwait(false);
224235
try
225236
{
226237
EnsureOpen();
@@ -272,7 +283,7 @@ public async Task<int> ExecuteAsync(
272283
finally
273284
{
274285
_executor?.SetActiveAccess(null);
275-
_lock.Release();
286+
_rwLock.ReleaseWrite();
276287
}
277288
}
278289

@@ -322,23 +333,23 @@ private static IReadOnlyList<SqlRow> MaterializeRows(
322333

323334
public async Task BeginTransactionAsync(CancellationToken ct = default)
324335
{
325-
await _lock.WaitAsync(ct);
336+
await _rwLock.AcquireWriteAsync(ct).ConfigureAwait(false);
326337
try { await BeginTransactionAsyncLocked(ct); }
327-
finally { _lock.Release(); }
338+
finally { _rwLock.ReleaseWrite(); }
328339
}
329340

330341
public async Task CommitAsync(CancellationToken ct = default)
331342
{
332-
await _lock.WaitAsync(ct);
343+
await _rwLock.AcquireWriteAsync(ct).ConfigureAwait(false);
333344
try { await CommitAsyncLocked(ct); }
334-
finally { _lock.Release(); }
345+
finally { _rwLock.ReleaseWrite(); }
335346
}
336347

337348
public async Task RollbackAsync(CancellationToken ct = default)
338349
{
339-
await _lock.WaitAsync(ct);
350+
await _rwLock.AcquireWriteAsync(ct).ConfigureAwait(false);
340351
try { await RollbackAsyncLocked(ct); }
341-
finally { _lock.Release(); }
352+
finally { _rwLock.ReleaseWrite(); }
342353
}
343354

344355
// ── Transaction internals (lock already held by caller) ─────────────────
@@ -407,7 +418,9 @@ public async ValueTask DisposeAsync()
407418
if (_disposed) return;
408419
_disposed = true;
409420
await CloseAsync();
410-
_lock.Dispose();
421+
// _rwLock holds a SemaphoreSlim; .NET will reclaim it on GC.
422+
// We don't expose a hard-disposal hook because outstanding async
423+
// operations might still be holding slots when DisposeAsync runs.
411424
}
412425

413426
private void EnsureOpen()

src/CosmoSQLClient.CosmoKv/Execution/Executor.cs

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,48 @@ internal sealed class Executor
2020
private readonly Catalog _catalog;
2121

2222
/// <summary>
23-
/// Active storage handle for the current statement. The connection
24-
/// swaps this to <see cref="TxnKvAccess"/> when a transaction (explicit
25-
/// or implicit-per-statement) is active, otherwise it routes through
26-
/// <see cref="DbKvAccess"/>. The connection's <c>SemaphoreSlim</c>
27-
/// serializes calls so this single-field handoff is race-free.
23+
/// Per-call active storage handle. Stored in <see cref="AsyncLocal{T}"/>
24+
/// so concurrent statement executions on the same Executor (which is
25+
/// shared across an entire CosmoKvConnection) don't trample each
26+
/// other's access routing. The connection sets this to a
27+
/// <see cref="TxnKvAccess"/> when running inside an explicit (or
28+
/// implicit-per-statement) transaction, and null otherwise; the
29+
/// getter <see cref="Kv"/> falls back to a shared
30+
/// <see cref="DbKvAccess"/> for auto-commit reads.
31+
///
32+
/// Pre-refactor this was a single mutable field, race-free only
33+
/// because CosmoKvConnection's <c>SemaphoreSlim</c> serialized every
34+
/// QueryAsync/ExecuteAsync call. Moving to AsyncLocal unblocks
35+
/// concurrent reads at the connection layer (the actual bottleneck
36+
/// per the UDS finding).
2837
/// </summary>
29-
private IKvAccess _kv;
38+
private readonly AsyncLocal<IKvAccess?> _activeKv = new();
39+
40+
/// <summary>
41+
/// Shared fresh-snapshot auto-commit access. Returned by <see cref="Kv"/>
42+
/// whenever no per-call <see cref="_activeKv"/> override is set.
43+
/// </summary>
44+
private readonly IKvAccess _defaultKv;
45+
46+
/// <summary>Effective storage handle for the in-flight statement.</summary>
47+
private IKvAccess Kv => _activeKv.Value ?? _defaultKv;
3048

3149
internal Executor(CosmoKvDb db, Catalog catalog)
3250
{
33-
_db = db;
34-
_catalog = catalog;
35-
_kv = new DbKvAccess(db);
51+
_db = db;
52+
_catalog = catalog;
53+
_defaultKv = new DbKvAccess(db);
3654
}
3755

3856
/// <summary>
39-
/// Swap the active <see cref="IKvAccess"/>. Pass <c>null</c> to revert
40-
/// to a fresh auto-committing <see cref="DbKvAccess"/>. Called by
41-
/// <c>CosmoKvConnection</c> before and after each statement.
57+
/// Swap the active <see cref="IKvAccess"/> for the current async flow.
58+
/// Pass <c>null</c> to fall back to the shared auto-commit
59+
/// <see cref="DbKvAccess"/>. Each async flow has its own AsyncLocal
60+
/// slot, so concurrent statements on the same Executor see independent
61+
/// values.
4262
/// </summary>
4363
public void SetActiveAccess(IKvAccess? kv)
44-
=> _kv = kv ?? new DbKvAccess(_db);
64+
=> _activeKv.Value = kv;
4565

4666
/// <summary>
4767
/// When <c>true</c>, INSERTs may carry explicit values for IDENTITY
@@ -53,8 +73,9 @@ public void SetActiveAccess(IKvAccess? kv)
5373

5474
// Cache of parsed DEFAULT expressions keyed by source SQL — the same
5575
// string fires once per row inserted, parser is cheap but constant
56-
// overhead adds up under bulk loads.
57-
private readonly Dictionary<string, Ast.Expression> _defaultExprCache = new();
76+
// overhead adds up under bulk loads. ConcurrentDictionary so the
77+
// cache is safe under the AsyncLocal-based concurrent-read model.
78+
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, Ast.Expression> _defaultExprCache = new();
5879

5980
/// <summary>
6081
/// Evaluate a column's <c>DefaultExpressionSql</c> (v2.4 non-literal
@@ -177,7 +198,7 @@ private async Task<int> ExecuteCreateIndex(CreateIndexStatement s, CancellationT
177198
{
178199
var enc = IndexKeyCodec.Encode(ProjectIndexedValues(row, colIdxs));
179200
var key = KvKeys.IndexEntry(table.Name, index.Name, enc, rowId);
180-
await _kv.SetAsync(key, Array.Empty<byte>(), ct);
201+
await Kv.SetAsync(key, Array.Empty<byte>(), ct);
181202
}
182203
return 0;
183204
}
@@ -343,7 +364,7 @@ SqlTypeFamily.TinyInt or SqlTypeFamily.SmallInt or SqlTypeFamily.Int
343364

344365
var key = KvKeys.Row(table.Name, rowId);
345366
var bytes = RowCodec.Encode(record);
346-
await _kv.SetAsync(key, bytes, ct);
367+
await Kv.SetAsync(key, bytes, ct);
347368
await WriteIndexEntriesAsync(table, record, rowId, ct);
348369
rowsInserted++;
349370

@@ -429,7 +450,7 @@ private async Task WriteIndexEntriesAsync(
429450
}
430451
var enc = IndexKeyCodec.Encode(vals);
431452
var key = KvKeys.IndexEntry(table.Name, idx.Name, enc, rowId);
432-
await _kv.SetAsync(key, Array.Empty<byte>(), ct);
453+
await Kv.SetAsync(key, Array.Empty<byte>(), ct);
433454
}
434455
}
435456

@@ -450,7 +471,7 @@ private async Task EnforceUniqueAsync(
450471
Buffer.BlockCopy(prefix, 0, fullPrefix, 0, prefix.Length);
451472
Buffer.BlockCopy(encWithTerm, 0, fullPrefix, prefix.Length, encWithTerm.Length);
452473

453-
await foreach (var item in _kv.IterateAsync(
474+
await foreach (var item in Kv.IterateAsync(
454475
new global::CosmoKv.IteratorOptions { Prefix = fullPrefix }, ct))
455476
{
456477
long otherRowId = KvKeys.ExtractRowId(item.Key);
@@ -471,7 +492,7 @@ private async Task DeleteIndexEntriesAsync(
471492
var colIdxs = ResolveIndexColumns(table, idx);
472493
var enc = IndexKeyCodec.Encode(ProjectIndexedValues(row, colIdxs));
473494
var key = KvKeys.IndexEntry(table.Name, idx.Name, enc, rowId);
474-
await _kv.DeleteAsync(key, ct);
495+
await Kv.DeleteAsync(key, ct);
475496
}
476497
}
477498

@@ -498,15 +519,15 @@ private static int[] ResolveColumnMapping(InsertStatement s, TableSchema t)
498519
private async Task<long> NextIdentityAsync(string tableName, long seed, long increment)
499520
{
500521
var key = KvKeys.SeqCounter(tableName);
501-
var existing = await _kv.GetAsync(key);
522+
var existing = await Kv.GetAsync(key);
502523
long next;
503524
if (existing is null || existing.Length == 0)
504525
next = seed;
505526
else
506527
next = BinaryPrimitives.ReadInt64BigEndian(existing) + increment;
507528
Span<byte> buf = stackalloc byte[8];
508529
BinaryPrimitives.WriteInt64BigEndian(buf, next);
509-
await _kv.SetAsync(key, buf.ToArray());
530+
await Kv.SetAsync(key, buf.ToArray());
510531
return next;
511532
}
512533

@@ -519,14 +540,14 @@ private async Task<long> NextIdentityAsync(string tableName, long seed, long inc
519540
private async Task BumpIdentityCounterAsync(string tableName, long supplied)
520541
{
521542
var key = KvKeys.SeqCounter(tableName);
522-
var existing = await _kv.GetAsync(key);
543+
var existing = await Kv.GetAsync(key);
523544
long current = (existing is null || existing.Length == 0)
524545
? 0
525546
: BinaryPrimitives.ReadInt64BigEndian(existing);
526547
if (supplied <= current) return;
527548
Span<byte> buf = stackalloc byte[8];
528549
BinaryPrimitives.WriteInt64BigEndian(buf, supplied);
529-
await _kv.SetAsync(key, buf.ToArray());
550+
await Kv.SetAsync(key, buf.ToArray());
530551
}
531552

532553
// ── SELECT (with Phase 2 WHERE/ORDER BY/TOP/OFFSET) ─────────────────────
@@ -1213,7 +1234,7 @@ private async IAsyncEnumerable<SqlValue[]> ScanRowsAsync(
12131234
{
12141235
var prefix = Encoding.UTF8.GetBytes(
12151236
$"{KvKeys.DataPrefix}{table.Name.ToLowerInvariant()}:");
1216-
await foreach (var item in _kv.IterateAsync(
1237+
await foreach (var item in Kv.IterateAsync(
12171238
new global::CosmoKv.IteratorOptions { Prefix = prefix }, ct))
12181239
{
12191240
byte[] bytes = await item.ReadValueAsync(ct);
@@ -1243,7 +1264,7 @@ private async IAsyncEnumerable<SqlValue[]> ScanRowsAsync(
12431264
// Prefix, so we walk the prefix and skip entries outside [lower,upper).
12441265
// Phase 4-bench may push this into the iterator if the entry count
12451266
// becomes the dominant cost.
1246-
await foreach (var item in _kv.IterateAsync(iterOpts, ct))
1267+
await foreach (var item in Kv.IterateAsync(iterOpts, ct))
12471268
{
12481269
var keyAfterPrefix = ((ReadOnlySpan<byte>)item.Key)[prefix.Length..];
12491270
// The keyAfterPrefix is `<encoded-value><rowid-be>`.
@@ -1258,7 +1279,7 @@ private async IAsyncEnumerable<SqlValue[]> ScanRowsAsync(
12581279
continue;
12591280
}
12601281
long rowId = KvKeys.ExtractRowId(item.Key);
1261-
var rowBytes = await _kv.GetAsync(KvKeys.Row(plan.Table.Name, rowId), ct);
1282+
var rowBytes = await Kv.GetAsync(KvKeys.Row(plan.Table.Name, rowId), ct);
12621283
if (rowBytes is null || rowBytes.Length == 0) continue;
12631284
yield return (RowCodec.Decode(rowBytes), rowId);
12641285
}
@@ -1349,7 +1370,7 @@ private async Task<UpdateResult> ExecuteUpdateCore(
13491370
foreach (var (rowId, oldRow, newRow) in pending)
13501371
{
13511372
await DeleteIndexEntriesAsync(table, oldRow, rowId, ct);
1352-
await _kv.SetAsync(KvKeys.Row(table.Name, rowId), RowCodec.Encode(newRow), ct);
1373+
await Kv.SetAsync(KvKeys.Row(table.Name, rowId), RowCodec.Encode(newRow), ct);
13531374
await WriteIndexEntriesAsync(table, newRow, rowId, ct);
13541375

13551376
if (outputPlan is not null)
@@ -1427,7 +1448,7 @@ private async Task<DeleteResult> ExecuteDeleteCore(
14271448
foreach (var (rowId, row) in victims)
14281449
{
14291450
await DeleteIndexEntriesAsync(table, row, rowId, ct);
1430-
await _kv.DeleteAsync(KvKeys.Row(table.Name, rowId), ct);
1451+
await Kv.DeleteAsync(KvKeys.Row(table.Name, rowId), ct);
14311452

14321453
if (outputPlan is not null)
14331454
{

0 commit comments

Comments
 (0)