Skip to content

Commit 6fdc59f

Browse files
authored
vertx_pool_queue_pending grows indefinitely when connection acquisition fails (#1626) (#1627)
See #1625 We must invoke dequeueMetric in all cases, not just in case of timeout. Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 67963cf commit 6fdc59f

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ public <R> void execute(CommandBase<R> cmd, Completable<R> handler, long timeout
243243
pooled.refresh();
244244
lease.recycle();
245245
});
246+
}, t -> {
247+
dequeueMetric(metric);
248+
return Future.failedFuture(t);
246249
}).onComplete(ar -> {
247250
if (ar.succeeded()) {
248251
handler.succeed(ar.result());
@@ -281,6 +284,7 @@ public void complete(Lease<PooledConnection> lease, Throwable failure) {
281284
handle(lease);
282285
}
283286
} else {
287+
dequeueMetric(metric);
284288
handler.fail(failure);
285289
}
286290
}

vertx-sql-client/src/test/java/io/vertx/tests/sqlclient/tck/MetricsTestBase.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,4 +340,65 @@ public void requestReset(Object requestMetric) {
340340
}));
341341
});
342342
}
343+
344+
@Test
345+
public void testGetConnectionFailure(TestContext ctx) throws Exception {
346+
testConnectionFailure(ctx, true);
347+
}
348+
349+
@Test
350+
public void testPooledConnectionFailure(TestContext ctx) throws Exception {
351+
testConnectionFailure(ctx, false);
352+
}
353+
354+
private void testConnectionFailure(TestContext ctx, boolean useGetConnection) throws Exception {
355+
AtomicInteger queueSize = new AtomicInteger();
356+
List<Object> enqueueMetrics = Collections.synchronizedList(new ArrayList<>());
357+
List<Object> dequeueMetrics = Collections.synchronizedList(new ArrayList<>());
358+
poolMetrics = new PoolMetrics() {
359+
@Override
360+
public Object enqueue() {
361+
Object metric = new Object();
362+
enqueueMetrics.add(metric);
363+
queueSize.incrementAndGet();
364+
return metric;
365+
}
366+
367+
@Override
368+
public void dequeue(Object taskMetric) {
369+
dequeueMetrics.add(taskMetric);
370+
queueSize.decrementAndGet();
371+
}
372+
373+
@Override
374+
public Object begin() {
375+
throw new IllegalStateException("Shouldn't be invoked");
376+
}
377+
378+
@Override
379+
public void end(Object usageMetric) {
380+
throw new IllegalStateException("Shouldn't be invoked");
381+
}
382+
};
383+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setName("the-pool");
384+
SqlConnectOptions connectOptions = connectOptions().setHost("does.not.exist.com");
385+
Pool pool = poolBuilder().with(poolOptions).using(vertx).connectingTo(connectOptions).build();
386+
int num = 16;
387+
List<Future<?>> futures = new ArrayList<>();
388+
for (int i = 0; i < num; i++) {
389+
Future<RowSet<Row>> future;
390+
if (useGetConnection) {
391+
future = pool.withConnection(sqlConn -> sqlConn.query("SELECT * FROM immutable WHERE id=1").execute());
392+
} else {
393+
future = pool.query("SELECT * FROM immutable WHERE id=1").execute();
394+
}
395+
futures.add(future);
396+
}
397+
Future.join(futures).otherwiseEmpty().await(20, SECONDS);
398+
ctx.assertEquals(0, queueSize.get());
399+
ctx.assertEquals(num, enqueueMetrics.size());
400+
ctx.assertEquals(enqueueMetrics, dequeueMetrics);
401+
ctx.assertEquals("sql", poolType);
402+
ctx.assertEquals("the-pool", poolName);
403+
}
343404
}

0 commit comments

Comments
 (0)