Skip to content

Commit f2eca6b

Browse files
authored
Pool queries should honor connection timeout (#1586) (#1587)
See #1232 If a connection timeout is defined in connect options, the pool uses it when users acquire a connection with pool.getConnection, but not when they execute a query with pool.query Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 68369c2 commit f2eca6b

3 files changed

Lines changed: 51 additions & 14 deletions

File tree

vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.atomic.AtomicReference;
4343
import java.util.stream.Collector;
4444

45+
import static java.util.concurrent.TimeUnit.SECONDS;
4546
import static java.util.stream.Collectors.mapping;
4647
import static java.util.stream.Collectors.toList;
4748

@@ -615,4 +616,17 @@ public void testConnectionClosedInHook(TestContext ctx) {
615616
}));
616617
}));
617618
}
619+
620+
@Test
621+
public void testPooledQueryTimeout(TestContext ctx) {
622+
Async async = ctx.async();
623+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setConnectionTimeout(1).setConnectionTimeoutUnit(SECONDS);
624+
Pool pool = createPool(options, poolOptions);
625+
pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> {
626+
pool.query("SELECT 1").execute().onComplete(ctx.asyncAssertFailure(t -> {
627+
conn.close();
628+
async.complete();
629+
}));
630+
}));
631+
}
618632
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -202,34 +202,54 @@ private void endMetric(Object metric) {
202202
}
203203
}
204204

205+
private static final Exception POOL_QUERY_TIMEOUT_EXCEPTION = new VertxException("Timeout waiting for connection", true);
206+
205207
// TODO : try optimize without promise
206-
public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
208+
public <R> void execute(CommandBase<R> cmd, Completable<R> handler, long timeout) {
207209
ContextInternal context = vertx.getOrCreateContext();
208210
Promise<Lease<PooledConnection>> p = context.promise();
211+
long timerId;
212+
if (timeout > 0) {
213+
timerId = vertx.setTimer(timeout, t -> handler.fail(POOL_QUERY_TIMEOUT_EXCEPTION));
214+
} else {
215+
timerId = -1;
216+
}
209217
Object metric = enqueueMetric();
210218
pool.acquire(context, 0, p);
211219
p.future().compose(lease -> {
212220
dequeueMetric(metric);
213221
PooledConnection pooled = lease.get();
214-
pooled.timerMetric = beginMetric();
215-
Connection conn = pooled.conn;
216-
217222
Future<R> future;
218-
if (afterAcquire != null) {
219-
future = afterAcquire.apply(conn)
220-
.compose(v -> Future.<R>future(d -> pooled.schedule(cmd, d)))
221-
.eventually(() -> beforeRecycle.apply(conn));
223+
if (timerId != -1 && !vertx.cancelTimer(timerId)) {
224+
// We want to make sure the connection is released properly below
225+
// But we don't want to record begin/end pool metrics
226+
pooled.timerMetric = NO_METRICS;
227+
future = Future.failedFuture(POOL_QUERY_TIMEOUT_EXCEPTION);
222228
} else {
223-
PromiseInternal<R> pp = context.promise();
224-
pooled.schedule(cmd, pp);
225-
future = pp;
229+
pooled.timerMetric = beginMetric();
230+
if (afterAcquire != null) {
231+
Connection conn = pooled.conn;
232+
future = afterAcquire.apply(conn)
233+
.compose(v -> Future.<R>future(d -> pooled.schedule(cmd, d)))
234+
.eventually(() -> beforeRecycle.apply(conn));
235+
} else {
236+
PromiseInternal<R> pp = context.promise();
237+
pooled.schedule(cmd, pp);
238+
future = pp;
239+
}
226240
}
227241
return future.andThen(ar -> {
228242
endMetric(pooled.timerMetric);
229243
pooled.refresh();
230244
lease.recycle();
231245
});
232-
}).onComplete(handler);
246+
}).onComplete(ar -> {
247+
if (ar.succeeded()) {
248+
handler.succeed(ar.result());
249+
} else if (!POOL_QUERY_TIMEOUT_EXCEPTION.equals(ar.cause())) {
250+
handler.fail(ar.cause());
251+
}
252+
});
233253
}
234254

235255
public void acquire(ContextInternal context, long timeout, Completable<PooledConnection> handler) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import io.vertx.core.internal.VertxInternal;
2626
import io.vertx.core.spi.metrics.PoolMetrics;
2727
import io.vertx.core.spi.metrics.VertxMetrics;
28-
import io.vertx.sqlclient.*;
28+
import io.vertx.sqlclient.Pool;
29+
import io.vertx.sqlclient.PoolOptions;
30+
import io.vertx.sqlclient.SqlConnection;
31+
import io.vertx.sqlclient.TransactionRollbackException;
2932
import io.vertx.sqlclient.impl.TransactionPropagationLocal;
3033
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3134
import io.vertx.sqlclient.internal.Connection;
@@ -171,7 +174,7 @@ public Future<SqlConnection> getConnection() {
171174

172175
@Override
173176
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
174-
pool.execute(cmd, handler);
177+
pool.execute(cmd, handler, connectionTimeout);
175178
}
176179

177180
private void acquire(ContextInternal context, long timeout, Completable<SqlConnectionPool.PooledConnection> completionHandler) {

0 commit comments

Comments
 (0)