From 4467cb501c3638d1760fede841ddba34b53d8b3e Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 2 Dec 2025 16:15:29 +0100 Subject: [PATCH] Pool queries should honor connection timeout (#1586) See #1232 If a connection timeout is defined in connect options, the pool uses it when users acquire a connection with pool.getConnection, but not when they execute a query with pool.query Signed-off-by: Thomas Segismont --- .../io/vertx/tests/pgclient/PgPoolTest.java | 14 ++++++ .../impl/pool/SqlConnectionPool.java | 44 ++++++++++++++----- .../sqlclient/internal/pool/PoolImpl.java | 7 ++- 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java index d32377f39..83cf43321 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgPoolTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collector; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; @@ -615,4 +616,17 @@ public void testConnectionClosedInHook(TestContext ctx) { })); })); } + + @Test + public void testPooledQueryTimeout(TestContext ctx) { + Async async = ctx.async(); + PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setConnectionTimeout(1).setConnectionTimeoutUnit(SECONDS); + Pool pool = createPool(options, poolOptions); + pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> { + pool.query("SELECT 1").execute().onComplete(ctx.asyncAssertFailure(t -> { + conn.close(); + async.complete(); + })); + })); + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index ac5b68f17..a3d7343ca 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -202,34 +202,54 @@ private void endMetric(Object metric) { } } + private static final Exception POOL_QUERY_TIMEOUT_EXCEPTION = new VertxException("Timeout waiting for connection", true); + // TODO : try optimize without promise - public void execute(CommandBase cmd, Completable handler) { + public void execute(CommandBase cmd, Completable handler, long timeout) { ContextInternal context = vertx.getOrCreateContext(); Promise> p = context.promise(); + long timerId; + if (timeout > 0) { + timerId = vertx.setTimer(timeout, t -> handler.fail(POOL_QUERY_TIMEOUT_EXCEPTION)); + } else { + timerId = -1; + } Object metric = enqueueMetric(); pool.acquire(context, 0, p); p.future().compose(lease -> { dequeueMetric(metric); PooledConnection pooled = lease.get(); - pooled.timerMetric = beginMetric(); - Connection conn = pooled.conn; - Future future; - if (afterAcquire != null) { - future = afterAcquire.apply(conn) - .compose(v -> Future.future(d -> pooled.schedule(cmd, d))) - .eventually(() -> beforeRecycle.apply(conn)); + if (timerId != -1 && !vertx.cancelTimer(timerId)) { + // We want to make sure the connection is released properly below + // But we don't want to record begin/end pool metrics + pooled.timerMetric = NO_METRICS; + future = Future.failedFuture(POOL_QUERY_TIMEOUT_EXCEPTION); } else { - PromiseInternal pp = context.promise(); - pooled.schedule(cmd, pp); - future = pp; + pooled.timerMetric = beginMetric(); + if (afterAcquire != null) { + Connection conn = pooled.conn; + future = afterAcquire.apply(conn) + .compose(v -> Future.future(d -> pooled.schedule(cmd, d))) + .eventually(() -> beforeRecycle.apply(conn)); + } else { + PromiseInternal pp = context.promise(); + pooled.schedule(cmd, pp); + future = pp; + } } return future.andThen(ar -> { endMetric(pooled.timerMetric); pooled.refresh(); lease.recycle(); }); - }).onComplete(handler); + }).onComplete(ar -> { + if (ar.succeeded()) { + handler.succeed(ar.result()); + } else if (!POOL_QUERY_TIMEOUT_EXCEPTION.equals(ar.cause())) { + handler.fail(ar.cause()); + } + }); } public void acquire(ContextInternal context, long timeout, Completable handler) { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java index 1a82ba721..65c6bf5b6 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java @@ -25,7 +25,10 @@ import io.vertx.core.internal.VertxInternal; import io.vertx.core.spi.metrics.PoolMetrics; import io.vertx.core.spi.metrics.VertxMetrics; -import io.vertx.sqlclient.*; +import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.TransactionRollbackException; import io.vertx.sqlclient.impl.TransactionPropagationLocal; import io.vertx.sqlclient.impl.pool.SqlConnectionPool; import io.vertx.sqlclient.internal.Connection; @@ -171,7 +174,7 @@ public Future getConnection() { @Override public void schedule(CommandBase cmd, Completable handler) { - pool.execute(cmd, handler); + pool.execute(cmd, handler, connectionTimeout); } private void acquire(ContextInternal context, long timeout, Completable completionHandler) {