Skip to content

Commit 5b5c2d0

Browse files
vkuttypclaude
andcommitted
v2.5.10: add CosmoKvPipesConnectionPool — fix single-socket bottleneck
v2.5.9's bare CosmoKvPipesConnection serialises every Query/Execute behind its SemaphoreSlim(1,1) because the underlying socket is one bidirectional stream. Under marivil's binary-protocol stress run 2026-05-21, that surfaced as IMAP throughput collapsing 87% and 493 `sp_Users_GetByEmail` 30s-timeouts — every SQL call on the mail-server process was queueing behind one socket while the HTTP path got connection pooling for free via HttpClient. CosmoKvPipesConnectionPool : ISqlDatabase wraps N pooled sockets, shape mirrors MsSqlConnectionPool: - Channel<T> idle queue + Interlocked _opened counter for lazy growth - AcquireAsync: try idle, then grow up to MaxPoolSize, then wait - ReleaseAsync: return to idle, or discard if dropped - QueryStreamAsync holds the lease for the FULL enumeration New `MaxPoolSize` field on CosmoKvPipesConfiguration (default 16), parseable via `MaxPoolSize=N` in the connection string. 4 new xUnit tests (10/10 in the project now): basic round-trip, 32 concurrent SELECTs complete under 5s, MaxPoolSize=1 still correct (just slow — sanity check), Dispose closes all sockets. This is the API CosmoMailDb.Create should wire — the bare CosmoKvPipesConnection is now treated as a building block (still public for advanced callers who want to manage their own pool). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 84244ce commit 5b5c2d0

4 files changed

Lines changed: 385 additions & 5 deletions

File tree

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>2.5.9</Version>
10+
<Version>2.5.10</Version>
1111
</PropertyGroup>
1212
</Project>

src/CosmoSQLClient.CosmoKvPipes/CosmoKvPipesConfiguration.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ public sealed record CosmoKvPipesConfiguration
1919
/// <summary>Per-request timeout. Default 30 s.</summary>
2020
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(30);
2121

22+
/// <summary>
23+
/// Max concurrent sockets in the connection pool. Default 16 — the
24+
/// mail-server's typical SMTP/IMAP/EWS/EAS bursts fit comfortably,
25+
/// matching cosmokvd's default 32 auto-commit slots without over-
26+
/// committing. Set higher under heavy parallel SELECT load.
27+
/// </summary>
28+
public int MaxPoolSize { get; init; } = 16;
29+
2230
/// <summary>Absolute socket-file path parsed out of <see cref="Endpoint"/>.</summary>
2331
public string SocketPath
2432
{
@@ -33,7 +41,7 @@ public string SocketPath
3341
}
3442

3543
public string ConnectionString =>
36-
$"Endpoint={Endpoint};AuthToken={AuthToken};Timeout={(int)Timeout.TotalSeconds}";
44+
$"Endpoint={Endpoint};AuthToken={AuthToken};Timeout={(int)Timeout.TotalSeconds};MaxPoolSize={MaxPoolSize}";
3745

3846
public static CosmoKvPipesConfiguration Parse(string connectionString)
3947
{
@@ -64,11 +72,16 @@ public static CosmoKvPipesConfiguration Parse(string connectionString)
6472
if (Get("Timeout") is string t && int.TryParse(t, out var sec) && sec > 0)
6573
timeout = TimeSpan.FromSeconds(sec);
6674

75+
int maxPool = 16;
76+
if (Get("MaxPoolSize", "Pooling") is string p && int.TryParse(p, out var pp) && pp > 0)
77+
maxPool = pp;
78+
6779
return new CosmoKvPipesConfiguration
6880
{
69-
Endpoint = endpoint,
70-
AuthToken = token,
71-
Timeout = timeout,
81+
Endpoint = endpoint,
82+
AuthToken = token,
83+
Timeout = timeout,
84+
MaxPoolSize = maxPool,
7285
};
7386
}
7487
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
using System.Runtime.CompilerServices;
2+
using System.Threading.Channels;
3+
using CosmoSQLClient.Core;
4+
5+
namespace CosmoSQLClient.CosmoKvPipes;
6+
7+
/// <summary>
8+
/// Connection pool for <see cref="CosmoKvPipesConnection"/>. Each pooled
9+
/// socket is independent — N concurrent SQL calls run over N sockets
10+
/// rather than queueing behind one in-process lock. This is the same
11+
/// shape <c>MsSqlConnectionPool</c> uses for its underlying connections.
12+
/// <para>
13+
/// Why: a single <c>CosmoKvPipesConnection</c> serialises every Query /
14+
/// Execute behind its <c>SemaphoreSlim(1, 1)</c> because the underlying
15+
/// socket is one bidirectional stream — interleaved frames from
16+
/// concurrent callers would corrupt the protocol. Without a pool, ALL
17+
/// SQL traffic on a mail-server process queues behind one connection,
18+
/// which marivil's binary-protocol stress run on 2026-05-21 surfaced
19+
/// as 493 <c>sp_Users_GetByEmail</c> 30 s-timeouts and IMAP throughput
20+
/// collapsing 87% vs the HTTP-over-UDS baseline.
21+
/// </para>
22+
/// <para>
23+
/// Implementation: <see cref="Channel{T}"/>-based idle queue + a count
24+
/// of "opened so far," same lazy-grow strategy <c>MsSqlConnectionPool</c>
25+
/// uses. Connections are created only when needed up to
26+
/// <see cref="MaxConnections"/>, then any caller arriving when the pool
27+
/// is saturated waits on the idle channel for a release.
28+
/// </para>
29+
/// </summary>
30+
public sealed class CosmoKvPipesConnectionPool : ISqlDatabase
31+
{
32+
private readonly CosmoKvPipesConfiguration _config;
33+
private readonly Channel<CosmoKvPipesConnection> _idle;
34+
private int _opened;
35+
private bool _disposed;
36+
37+
public int MaxConnections { get; }
38+
39+
public bool IsOpen => !_disposed;
40+
41+
public CosmoKvPipesConnectionPool(CosmoKvPipesConfiguration config, int maxConnections = 16)
42+
{
43+
_config = config;
44+
MaxConnections = Math.Max(1, maxConnections);
45+
_idle = Channel.CreateUnbounded<CosmoKvPipesConnection>(
46+
new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
47+
}
48+
49+
public CosmoKvPipesConnectionPool(string connectionString, int maxConnections = 16)
50+
: this(CosmoKvPipesConfiguration.Parse(connectionString), maxConnections) { }
51+
52+
// ── ISqlDatabase delegation ─────────────────────────────────────────────
53+
54+
public Task<IReadOnlyList<SqlRow>> QueryAsync(
55+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
56+
=> WithConnAsync(c => c.QueryAsync(sql, parameters, ct), ct);
57+
58+
public Task<IReadOnlyList<T>> QueryAsync<T>(
59+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
60+
where T : new()
61+
=> WithConnAsync(c => c.QueryAsync<T>(sql, parameters, ct), ct);
62+
63+
public Task<int> ExecuteAsync(
64+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
65+
=> WithConnAsync(c => c.ExecuteAsync(sql, parameters, ct), ct);
66+
67+
public Task<SqlDataTable> QueryTableAsync(
68+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
69+
=> WithConnAsync(c => c.QueryTableAsync(sql, parameters, ct), ct);
70+
71+
/// <summary>
72+
/// Streams query results. The lease is held for the FULL enumeration
73+
/// duration — closing the enumerator returns the connection. Don't
74+
/// hold the enumerator past your immediate consumption loop or you'll
75+
/// starve other callers of a slot.
76+
/// </summary>
77+
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
78+
string sql, IReadOnlyList<SqlParameter>? parameters = null,
79+
[EnumeratorCancellation] CancellationToken ct = default)
80+
{
81+
var conn = await AcquireAsync(ct).ConfigureAwait(false);
82+
try
83+
{
84+
await foreach (var row in conn.QueryStreamAsync(sql, parameters, ct).ConfigureAwait(false))
85+
yield return row;
86+
}
87+
finally
88+
{
89+
await ReleaseAsync(conn).ConfigureAwait(false);
90+
}
91+
}
92+
93+
public async IAsyncEnumerable<System.Text.Json.JsonElement> QueryJsonStreamAsync(
94+
string sql, IReadOnlyList<SqlParameter>? parameters = null, int jsonColumnIndex = 0,
95+
[EnumeratorCancellation] CancellationToken ct = default)
96+
{
97+
var conn = await AcquireAsync(ct).ConfigureAwait(false);
98+
try
99+
{
100+
await foreach (var elem in conn.QueryJsonStreamAsync(sql, parameters, jsonColumnIndex, ct).ConfigureAwait(false))
101+
yield return elem;
102+
}
103+
finally
104+
{
105+
await ReleaseAsync(conn).ConfigureAwait(false);
106+
}
107+
}
108+
109+
public Task BeginTransactionAsync(CancellationToken ct = default)
110+
=> throw new NotSupportedException(
111+
"Explicit transactions are not supported on the CosmoKvPipes binary protocol. " +
112+
"Use CosmoSQLClient.CosmoKvHttp for BEGIN/COMMIT/ROLLBACK semantics.");
113+
114+
public Task CommitAsync(CancellationToken ct = default) => BeginTransactionAsync(ct);
115+
public Task RollbackAsync(CancellationToken ct = default) => BeginTransactionAsync(ct);
116+
117+
public async Task CloseAsync()
118+
{
119+
if (_disposed) return;
120+
_idle.Writer.TryComplete();
121+
await foreach (var c in _idle.Reader.ReadAllAsync().ConfigureAwait(false))
122+
{
123+
try { await c.DisposeAsync().ConfigureAwait(false); } catch { /* best-effort */ }
124+
}
125+
}
126+
127+
public async ValueTask DisposeAsync()
128+
{
129+
if (_disposed) return;
130+
_disposed = true;
131+
await CloseAsync().ConfigureAwait(false);
132+
}
133+
134+
// ── Pool internals ──────────────────────────────────────────────────────
135+
136+
private async Task<T> WithConnAsync<T>(Func<CosmoKvPipesConnection, Task<T>> op, CancellationToken ct)
137+
{
138+
var conn = await AcquireAsync(ct).ConfigureAwait(false);
139+
try { return await op(conn).ConfigureAwait(false); }
140+
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
141+
}
142+
143+
private async Task<CosmoKvPipesConnection> AcquireAsync(CancellationToken ct)
144+
{
145+
if (_disposed) throw new ObjectDisposedException(nameof(CosmoKvPipesConnectionPool));
146+
147+
// Fast path: pick up an idle connection if there is one.
148+
if (_idle.Reader.TryRead(out var idle) && idle.IsOpen)
149+
return idle;
150+
151+
// No idle. If we haven't grown to MaxConnections yet, lazily open
152+
// a new one. Increment-and-check-then-decrement-on-fail is the
153+
// racing-safe form (multiple acquirers may briefly observe the
154+
// count past max; the decrement on failure keeps the bookkeeping
155+
// consistent).
156+
int n = Interlocked.Increment(ref _opened);
157+
if (n <= MaxConnections)
158+
{
159+
try { return await CosmoKvPipesConnection.OpenAsync(_config, ct).ConfigureAwait(false); }
160+
catch { Interlocked.Decrement(ref _opened); throw; }
161+
}
162+
Interlocked.Decrement(ref _opened);
163+
164+
// Pool is saturated — wait for a release on the idle channel.
165+
// Loop because a released-but-dead connection should be discarded
166+
// and the wait resumed.
167+
while (true)
168+
{
169+
var pooled = await _idle.Reader.ReadAsync(ct).ConfigureAwait(false);
170+
if (pooled.IsOpen) return pooled;
171+
await DiscardAsync(pooled).ConfigureAwait(false);
172+
}
173+
}
174+
175+
private async ValueTask ReleaseAsync(CosmoKvPipesConnection conn)
176+
{
177+
if (!conn.IsOpen || _disposed)
178+
{
179+
await DiscardAsync(conn).ConfigureAwait(false);
180+
return;
181+
}
182+
if (!_idle.Writer.TryWrite(conn))
183+
{
184+
// Channel was completed mid-release (Dispose racing with the
185+
// last call). Discard the connection to avoid leaking it.
186+
await DiscardAsync(conn).ConfigureAwait(false);
187+
}
188+
}
189+
190+
private async ValueTask DiscardAsync(CosmoKvPipesConnection conn)
191+
{
192+
Interlocked.Decrement(ref _opened);
193+
try { await conn.DisposeAsync().ConfigureAwait(false); } catch { /* best-effort */ }
194+
}
195+
}

0 commit comments

Comments
 (0)