Skip to content

Commit 98f8d66

Browse files
wezellclaudeswicken
authored
perf(cache): migrate H22 cache async commits to virtual threads (#35992)
## Proposed Changes Migrates the **H22 cache async commit executor** from a fixed pool of 5 platform threads to **virtual threads**. Async cache writes (`put`/`remove` against the embedded H2 store) are I/O-bound, so they're a natural fit for virtual threads now that we're on Java 25 — **JEP 491** means `synchronized` blocks in Hikari/H2 no longer pin the carrier thread, so the classic "virtual threads + JDBC pinning" concern does not apply. ### What changed (`H22Cache.java`) - `spawnNewThreadPool()` now returns `Executors.newThreadPerTaskExecutor(...)` with a named virtual-thread factory (`H22-ASYNC-COMMIT-*`). Removed the Guava `ThreadFactoryBuilder`, `ThreadPoolExecutor`, and `LinkedBlockingQueue`. - A shared `submitAsync(Runnable)` helper now backs both `putAsync` and `removeAsync`. ### Behavior preserved (not a behavior change) | Concern | How it's handled | |---|---| | **Backpressure** | The real backpressure was always the up-front `shouldAsync()` check, which falls back to a **synchronous commit on the caller**. Unchanged. The old `CallerRunsPolicy` was effectively dead code — the unbounded queue never rejected. | | **Backlog metric** | A virtual-thread-per-task executor has no shared queue, so `isAllocationWithinTolerance()` now reads an `AtomicInteger` in-flight counter instead of `asyncTaskQueue.size()`. Same semantics; the `@Ignore`d `H22CacheTest` still compiles. | | **DB concurrency** | A `Semaphore` (sized by the existing `cache_h22_async_threads` knob, default 5) acquired **inside** each task caps how many commits hit the Hikari pool at once — preventing connection-timeout storms and spurious shard rebuilds under burst load. Virtual threads block cheaply on the semaphore. | | **Shutdown** | In-flight counter is balanced on `RejectedExecutionException` at shutdown; executor shutdown path unchanged. | The obsolete `cache.h22.async.caller.runs.policy` config is no longer read. ## This fixes / relates to Part of #35991 (Java 25 Performance Improvements) — first child PR. ## Testing - `./mvnw compile -pl :dotcms-core` → **BUILD SUCCESS**. - No integration suite run locally (per repo guidance). Reviewer note: worth a sanity run of `H22CacheTest` and a write-heavy cache smoke test. ## Checklist - [x] Compiles (`BUILD SUCCESS`) - [x] No public API / behavior change intended - [ ] Reviewer to confirm cache behavior under load 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: Scott Wicken <1562170+swicken@users.noreply.github.com>
1 parent d27c1aa commit 98f8d66

1 file changed

Lines changed: 47 additions & 21 deletions

File tree

  • dotCMS/src/main/java/com/dotmarketing/business/cache/provider/h22

dotCMS/src/main/java/com/dotmarketing/business/cache/provider/h22/H22Cache.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import com.dotmarketing.util.UtilMethods;
1212
import com.github.benmanes.caffeine.cache.Cache;
1313
import com.github.benmanes.caffeine.cache.Caffeine;
14-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
1514
import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
1615
import io.vavr.control.Try;
1716
import java.io.BufferedInputStream;
@@ -34,12 +33,13 @@
3433
import java.util.Optional;
3534
import java.util.Set;
3635
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.LinkedBlockingQueue;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.RejectedExecutionException;
3838
import java.util.concurrent.Semaphore;
3939
import java.util.concurrent.ThreadFactory;
40-
import java.util.concurrent.ThreadPoolExecutor;
4140
import java.util.concurrent.TimeUnit;
4241
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.atomic.AtomicInteger;
4343
import org.apache.commons.io.comparator.LastModifiedFileComparator;
4444
import org.apache.commons.io.filefilter.DirectoryFileFilter;
4545

@@ -53,8 +53,13 @@ public class H22Cache extends CacheProvider {
5353
final boolean shouldAsync=Config.getBooleanProperty("cache_h22_async", true);
5454

5555

56-
final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("H22-ASYNC-COMMIT-%d").build();
57-
final private LinkedBlockingQueue<Runnable> asyncTaskQueue = new LinkedBlockingQueue<>();
56+
// Async cache commits run on virtual threads: they block cheaply on JDBC/disk I/O instead of
57+
// pinning a small pool of platform threads. numberOfAsyncThreads now sizes a Semaphore that caps
58+
// how many commits hit the H2/Hikari pool concurrently (embedded H2 writes don't scale linearly),
59+
// while inFlightTasks tracks the total async backlog that isAllocationWithinTolerance() reads.
60+
final ThreadFactory namedThreadFactory = Thread.ofVirtual().name("H22-ASYNC-COMMIT-", 0).factory();
61+
private final Semaphore dbWorkPermits = new Semaphore(Math.max(1, numberOfAsyncThreads));
62+
private final AtomicInteger inFlightTasks = new AtomicInteger(0);
5863
private ExecutorService executorService;
5964

6065

@@ -115,16 +120,10 @@ public boolean isDistributed() {
115120

116121

117122
private ExecutorService spawnNewThreadPool() {
118-
119-
if (Config.getBooleanProperty("cache.h22.async.caller.runs.policy", true)) {
120-
return new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10,
121-
TimeUnit.SECONDS, asyncTaskQueue, namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()
122-
);
123-
}
124-
125-
return new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10,
126-
TimeUnit.SECONDS, asyncTaskQueue, namedThreadFactory
127-
);
123+
// One virtual thread per submitted commit. Backpressure is handled up-front by shouldAsync()
124+
// (which falls back to running the commit synchronously on the caller), and dbWorkPermits caps
125+
// how many of these virtual threads touch the H2/Hikari pool at the same time.
126+
return Executors.newThreadPerTaskExecutor(namedThreadFactory);
128127
}
129128

130129

@@ -183,15 +182,43 @@ public void put(final String group, final String key, final Object content) {
183182

184183

185184
void putAsync(final Fqn fqn, final Object content) {
186-
187-
executorService.submit(()-> {
185+
submitAsync(() -> {
188186
try {
189187
// Add the given content to the group and for a given key
190188
doUpsert(fqn, (Serializable) content);
191189
} catch (Exception e) {
192190
handleError(e, fqn);
193191
}
194-
});
192+
});
193+
}
194+
195+
/**
196+
* Submits a cache commit to run on a virtual thread. The {@link #inFlightTasks} counter is bumped
197+
* before submission and cleared in a {@code finally} so {@link #isAllocationWithinTolerance()} sees
198+
* the real backlog, and the task acquires a {@link #dbWorkPermits} permit so only a bounded number
199+
* of commits hit the H2/Hikari pool at once.
200+
*/
201+
private void submitAsync(final Runnable dbTask) {
202+
inFlightTasks.incrementAndGet();
203+
try {
204+
executorService.submit(() -> {
205+
try {
206+
dbWorkPermits.acquire();
207+
try {
208+
dbTask.run();
209+
} finally {
210+
dbWorkPermits.release();
211+
}
212+
} catch (InterruptedException e) {
213+
Thread.currentThread().interrupt();
214+
} finally {
215+
inFlightTasks.decrementAndGet();
216+
}
217+
});
218+
} catch (RejectedExecutionException e) {
219+
// Executor is shutting down; keep the counter balanced.
220+
inFlightTasks.decrementAndGet();
221+
}
195222
}
196223

197224

@@ -291,7 +318,7 @@ public void remove(final String group, final String key) {
291318
* @return
292319
*/
293320
boolean isAllocationWithinTolerance() {
294-
final int size = asyncTaskQueue.size();
321+
final int size = inFlightTasks.get();
295322
final float allocation = (float) size / (float) asyncTaskQueueSize;
296323
Logger.debug(H22Cache.class,
297324
() -> " size is " + size + ", allocation is " + allocation + ", tolerance is :"
@@ -309,8 +336,7 @@ boolean shouldAsync() {
309336
}
310337

311338
void removeAsync(final Fqn fqn) {
312-
313-
executorService.submit(()-> {
339+
submitAsync(() -> {
314340
try {
315341
// Invalidates from Cache a key from a given group
316342
doDelete(fqn);

0 commit comments

Comments
 (0)