Skip to content

Commit 2d90d3e

Browse files
committed
fix(Postgres): validate-on-checkout, mirror MsSql pool behaviour
The 1.9.49 retry-on-bad-conn was insufficient: after a postgres restart the pool typically holds many dead connections, so the first op fails (caught), AcquireAsync hands out ANOTHER dead one for the retry, and that second op also fails. Result: every request still errors with 'terminating connection due to administrator command', and the only fix is to restart the consuming process. Verified against a live restart on a production VPS — all six post-restart probes returned HTTP 500. Add validate-on-checkout, matching the MsSqlConnectionPool design: AcquireAsync now loops over idle connections, runs a quick SELECT 1 liveness probe, discards any that fail, and only returns a connection that successfully responded. If the pool is empty after discards, a fresh connection is opened. PostgresConnection.IsOpen alone is not sufficient — it stays true even after the backend kills the socket. Combined with the WithRetryAsync wrapper from 1.9.49, the pool now self-heals after a backend restart with zero caller intervention. Bump version to 1.9.51.
1 parent 12ede72 commit 2d90d3e

2 files changed

Lines changed: 49 additions & 17 deletions

File tree

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>1.9.50</Version>
10+
<Version>1.9.51</Version>
1111
</PropertyGroup>
1212
</Project>

src/CosmoSQLClient.Postgres/PostgresConnectionPool.cs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ public sealed class PostgresConnectionPool : ISqlDatabase, IDisposable, IAsyncDi
1111
private readonly PostgresConfiguration _config;
1212
private readonly int _maxConnections;
1313
private readonly Channel<PostgresConnection> _idle;
14+
private readonly Func<PostgresConnection, CancellationToken, ValueTask<bool>> _validateConnection;
1415
private int _count;
1516
private bool _disposed;
1617

17-
public PostgresConnectionPool(PostgresConfiguration config, int maxConnections = 10)
18+
public PostgresConnectionPool(
19+
PostgresConfiguration config,
20+
int maxConnections = 10,
21+
Func<PostgresConnection, CancellationToken, ValueTask<bool>>? validateConnection = null)
1822
{
1923
_config = config;
2024
_maxConnections = maxConnections;
@@ -24,35 +28,63 @@ public PostgresConnectionPool(PostgresConfiguration config, int maxConnections =
2428
SingleReader = false,
2529
SingleWriter = false,
2630
});
31+
_validateConnection = validateConnection ?? DefaultValidateConnectionAsync;
2732
}
2833

2934
public bool IsOpen => !_disposed;
3035

36+
private static async ValueTask<bool> DefaultValidateConnectionAsync(PostgresConnection conn, CancellationToken ct)
37+
{
38+
if (!conn.IsOpen) return false;
39+
try
40+
{
41+
await conn.QueryAsync("SELECT 1", ct: ct).ConfigureAwait(false);
42+
return true;
43+
}
44+
catch (OperationCanceledException) { throw; }
45+
catch { return false; }
46+
}
47+
3148
// ── Pool acquire/release ───────────────────────────────────────────────────
3249

3350
public async Task<PostgresConnection> AcquireAsync(CancellationToken ct = default)
3451
{
35-
if (_idle.Reader.TryRead(out var conn))
36-
return conn;
37-
38-
int current = Interlocked.Increment(ref _count);
39-
if (current <= _maxConnections)
52+
while (true)
4053
{
41-
try
54+
if (_idle.Reader.TryRead(out var idleConn))
4255
{
43-
var newConn = new PostgresConnection(_config);
44-
await newConn.OpenAsync(ct).ConfigureAwait(false);
45-
return newConn;
56+
if (await _validateConnection(idleConn, ct).ConfigureAwait(false))
57+
return idleConn;
58+
59+
try { await idleConn.DisposeAsync().ConfigureAwait(false); } catch { }
60+
Interlocked.Decrement(ref _count);
61+
continue;
4662
}
47-
catch
63+
64+
int current = Interlocked.Increment(ref _count);
65+
if (current <= _maxConnections)
4866
{
49-
Interlocked.Decrement(ref _count);
50-
throw;
67+
try
68+
{
69+
var newConn = new PostgresConnection(_config);
70+
await newConn.OpenAsync(ct).ConfigureAwait(false);
71+
return newConn;
72+
}
73+
catch
74+
{
75+
Interlocked.Decrement(ref _count);
76+
throw;
77+
}
5178
}
52-
}
5379

54-
Interlocked.Decrement(ref _count);
55-
return await _idle.Reader.ReadAsync(ct).ConfigureAwait(false);
80+
Interlocked.Decrement(ref _count);
81+
var pooledConn = await _idle.Reader.ReadAsync(ct).ConfigureAwait(false);
82+
if (await _validateConnection(pooledConn, ct).ConfigureAwait(false))
83+
return pooledConn;
84+
85+
try { await pooledConn.DisposeAsync().ConfigureAwait(false); } catch { }
86+
Interlocked.Decrement(ref _count);
87+
}
5688
}
5789

5890
public async Task ReleaseAsync(PostgresConnection conn)

0 commit comments

Comments
 (0)