Skip to content

Commit 5e35f8b

Browse files
committed
fix(Postgres): pool retries once on a fresh connection when a pooled one is dead
After a postgres restart the pool used to keep handing out dead connections forever — every subsequent QueryAsync/ExecuteAsync threw SqlException 'terminating connection due to administrator command' or 'server closed the connection' and the only fix was to restart the consuming process. Wrap the public ISqlDatabase methods with a single transparent retry: on the first failure, if the connection is unhealthy (closed, broken socket, IOException, or a SqlException whose message indicates server termination), discard the connection without releasing it to the pool, acquire a fresh one, and retry the operation once. Streaming methods (QueryStreamAsync / QueryJsonStreamAsync) are left as-is because re-running mid-stream is messy; they'll surface the error and rely on the caller to start a new enumeration, which will get a fresh connection from the now-cleaned pool. Retries are at-least-once; non-idempotent statements may be applied twice if the connection dies after the server commit but before the client received the response. For typical row-by-row metadata writes this is acceptable. Bump version to 1.9.49. Closes #1.
1 parent b25fb8b commit 5e35f8b

2 files changed

Lines changed: 77 additions & 33 deletions

File tree

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>1.9.48</Version>
10+
<Version>1.9.49</Version>
1111
</PropertyGroup>
1212
</Project>

src/CosmoSQLClient.Postgres/PostgresConnectionPool.cs

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,60 +70,104 @@ public async Task ReleaseAsync(PostgresConnection conn)
7070
Interlocked.Decrement(ref _count);
7171
}
7272

73-
// ── ISqlDatabase ───────────────────────────────────────────────────────────
74-
75-
public async Task<IReadOnlyList<SqlRow>> QueryAsync(
76-
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
73+
private async Task DiscardAsync(PostgresConnection conn)
7774
{
78-
var conn = await AcquireAsync(ct).ConfigureAwait(false);
79-
try { return await conn.QueryAsync(sql, parameters, ct).ConfigureAwait(false); }
80-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
75+
Interlocked.Decrement(ref _count);
76+
try { await conn.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
8177
}
8278

83-
public async Task<IReadOnlyList<T>> QueryAsync<T>(
84-
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
85-
where T : new()
79+
private static bool IsConnectionDead(Exception ex, PostgresConnection conn)
8680
{
87-
var conn = await AcquireAsync(ct).ConfigureAwait(false);
88-
try { return await conn.QueryAsync<T>(sql, parameters, ct).ConfigureAwait(false); }
89-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
81+
if (!conn.IsOpen) return true;
82+
if (ex is SqlException sex && sex.Kind == SqlErrorKind.ConnectionError) return true;
83+
if (ex is System.IO.IOException) return true;
84+
if (ex is System.Net.Sockets.SocketException) return true;
85+
if (ex is System.IO.EndOfStreamException) return true;
86+
if (ex is ObjectDisposedException) return true;
87+
if (ex is SqlException qex)
88+
{
89+
var msg = qex.Message ?? string.Empty;
90+
if (msg.Contains("terminating connection", StringComparison.OrdinalIgnoreCase) ||
91+
msg.Contains("server closed the connection", StringComparison.OrdinalIgnoreCase) ||
92+
msg.Contains("connection was forcibly closed", StringComparison.OrdinalIgnoreCase) ||
93+
msg.Contains("the database system is shutting down", StringComparison.OrdinalIgnoreCase))
94+
return true;
95+
}
96+
return false;
9097
}
9198

92-
public async Task<int> ExecuteAsync(
93-
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
99+
/// <summary>
100+
/// Acquire → run → release with one transparent retry on a fresh connection
101+
/// when the first attempt fails because the pooled connection died (e.g. after
102+
/// a backend restart). Retries are at-least-once; non-idempotent statements may
103+
/// be applied twice if the failure happens after server-side commit.
104+
/// </summary>
105+
private async Task<T> WithRetryAsync<T>(Func<PostgresConnection, Task<T>> op, CancellationToken ct)
94106
{
95107
var conn = await AcquireAsync(ct).ConfigureAwait(false);
96-
try { return await conn.ExecuteAsync(sql, parameters, ct).ConfigureAwait(false); }
97-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
108+
try
109+
{
110+
try
111+
{
112+
return await op(conn).ConfigureAwait(false);
113+
}
114+
catch (Exception ex) when (IsConnectionDead(ex, conn))
115+
{
116+
await DiscardAsync(conn).ConfigureAwait(false);
117+
conn = await AcquireAsync(ct).ConfigureAwait(false);
118+
return await op(conn).ConfigureAwait(false);
119+
}
120+
}
121+
finally
122+
{
123+
await ReleaseAsync(conn).ConfigureAwait(false);
124+
}
98125
}
99126

100-
public async Task<SqlDataTable> QueryTableAsync(
127+
// ── ISqlDatabase ───────────────────────────────────────────────────────────
128+
129+
public Task<IReadOnlyList<SqlRow>> QueryAsync(
101130
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
102-
{
103-
var conn = await AcquireAsync(ct).ConfigureAwait(false);
104-
try { return await conn.QueryTableAsync(sql, parameters, ct).ConfigureAwait(false); }
105-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
106-
}
131+
=> WithRetryAsync(c => c.QueryAsync(sql, parameters, ct), ct);
132+
133+
public Task<IReadOnlyList<T>> QueryAsync<T>(
134+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
135+
where T : new()
136+
=> WithRetryAsync(c => c.QueryAsync<T>(sql, parameters, ct), ct);
137+
138+
public Task<int> ExecuteAsync(
139+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
140+
=> WithRetryAsync(c => c.ExecuteAsync(sql, parameters, ct), ct);
141+
142+
public Task<SqlDataTable> QueryTableAsync(
143+
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
144+
=> WithRetryAsync(c => c.QueryTableAsync(sql, parameters, ct), ct);
107145

108146
public async Task BeginTransactionAsync(CancellationToken ct = default)
109147
{
110-
var conn = await AcquireAsync(ct).ConfigureAwait(false);
111-
try { await ((ISqlDatabase)conn).BeginTransactionAsync(ct).ConfigureAwait(false); }
112-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
148+
await WithRetryAsync<int>(async c =>
149+
{
150+
await ((ISqlDatabase)c).BeginTransactionAsync(ct).ConfigureAwait(false);
151+
return 0;
152+
}, ct).ConfigureAwait(false);
113153
}
114154

115155
public async Task CommitAsync(CancellationToken ct = default)
116156
{
117-
var conn = await AcquireAsync(ct).ConfigureAwait(false);
118-
try { await ((ISqlDatabase)conn).CommitAsync(ct).ConfigureAwait(false); }
119-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
157+
await WithRetryAsync<int>(async c =>
158+
{
159+
await ((ISqlDatabase)c).CommitAsync(ct).ConfigureAwait(false);
160+
return 0;
161+
}, ct).ConfigureAwait(false);
120162
}
121163

122164
public async Task RollbackAsync(CancellationToken ct = default)
123165
{
124-
var conn = await AcquireAsync(ct).ConfigureAwait(false);
125-
try { await ((ISqlDatabase)conn).RollbackAsync(ct).ConfigureAwait(false); }
126-
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
166+
await WithRetryAsync<int>(async c =>
167+
{
168+
await ((ISqlDatabase)c).RollbackAsync(ct).ConfigureAwait(false);
169+
return 0;
170+
}, ct).ConfigureAwait(false);
127171
}
128172

129173
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(

0 commit comments

Comments
 (0)