From 45e4b7793992a275ba6634393a26ca721f9a2600 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 1 May 2026 14:04:06 +0100 Subject: [PATCH 1/4] Optimize last-position sequence lookup --- .../queue/impl/single/SCQIndexing.java | 71 +++++++++++++++++-- .../single/SingleChronicleQueueStore.java | 4 +- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java index c28765e631..d09a889fc5 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java @@ -505,7 +505,8 @@ private ScanResult linearScan0(@NotNull final Wire wire, fromKnownIndex = lastIndex; } - bytes.readPositionUnlimited(knownAddress); + bytes.readLimitToCapacity(); + bytes.readPosition(knownAddress); for (long i = fromKnownIndex; ; i++) { try { @@ -565,10 +566,26 @@ long linearScanByPosition(@NotNull final Wire wire, final long indexOfNext, final long startAddress, boolean inclusive) throws EOFException { + return linearScanByPosition(wire, toPosition, indexOfNext, startAddress, inclusive, "linearScan by position"); + } + + /** + * Variant that lets the caller supply a description for the perf log so different scan + * origins (fast path from {@code writePosition}, fall-through from indexed anchor, etc.) + * are distinguishable in logs -- important because the whole point of the + * {@code MAX_VALUE} fast path is to avoid the brute-force fall-through scan, and + * we want to be able to verify that from log output rather than guess. + */ + long linearScanByPosition(@NotNull final Wire wire, + final long toPosition, + final long indexOfNext, + final long startAddress, + boolean inclusive, + String desc) throws EOFException { long start = REPORT_LINEAR_SCAN ? System.nanoTime() : 0; long index = linearScanByPosition0(wire, toPosition, indexOfNext, startAddress, inclusive); if (REPORT_LINEAR_SCAN) { - printLinearScanTime(index, startAddress, start, "linearScan by position"); + printLinearScanTime(index, startAddress, start, desc); } return index; } @@ -664,12 +681,13 @@ long linearScanByPosition0(@NotNull final Wire wire, * @return The starting index for the scan. */ private long calculateInitialValue(long toPosition, long indexOfNext, long startAddress, Bytes bytes, long lastAddress, long lastIndex) { + bytes.readLimit(bytes.capacity()); if (lastAddress > 0 && toPosition == lastAddress && lastIndex != Sequence.NOT_FOUND && lastIndex != Sequence.NOT_FOUND_RETRY) { - bytes.readPositionUnlimited(toPosition); + bytes.readPosition(toPosition); return lastIndex - 1; } else { - bytes.readPositionUnlimited(startAddress); + bytes.readPosition(startAddress); return indexOfNext - 1; } } @@ -709,6 +727,44 @@ public long nextEntryToBeIndexed() { return nextEntryToBeIndexed.getVolatileValue(); } + long sequenceForMaxPosition(@NotNull ExcerptContext ec, + boolean inclusive) throws StreamCorruptedException { + int retry = 0; + for (; retry < 128; retry++) { + long lastWritePos = writePosition.getVolatileValue(); + long latestSeq = sequence.getSequence(lastWritePos); + if (latestSeq >= 0) { + if (retry > 0) { + Jvm.perf().on(getClass(), + "sequenceForPosition(MAX_VALUE) tracker-read retried" + " " + retry + " times"); + } + try { + return linearScanByPosition(ec.wireForIndex(), Long.MAX_VALUE, latestSeq, lastWritePos, inclusive, + "linearScan from writePosition (sequenceForPosition fast path)"); + } catch (EOFException e) { + throw new UncheckedIOException(e); + } + } + if (latestSeq == Sequence.NOT_FOUND) + break; + + // NOT_FOUND_RETRY: writer raced. Spin-retry for the first couple of iterations + // (the race window is sub-microsecond); after that yield to let the writer make + // progress instead of starving them with our volatile reads. + if (retry > 2) + Thread.yield(); + } + if (retry >= 128) { + // Retry budget exhausted in a row of NOT_FOUND_RETRY -- surface this clearly: + // it means we're about to do a brute-force indexed-anchor scan instead of the + // efficient writePos-anchored fast-path scan. The log makes a regression visible. + Jvm.perf().on(getClass(), + "sequenceForPosition(MAX_VALUE) tracker-read retry loop exhausted" + " after " + retry + + " attempts; falling through to indexed lookup"); + } + return sequenceForPosition(ec, Long.MAX_VALUE, inclusive); + } + /** * Returns the sequence number for a given position in the wire. * If an exact match is found for the position, the corresponding index is returned; @@ -773,7 +829,7 @@ long sequenceForPosition(@NotNull ExcerptContext ec, } try { // Perform a linear scan if no exact match is found. - return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive); + return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive, "linearScan from indexed anchor (sequenceForPosition fall-through)"); } catch (EOFException e) { throw new UncheckedIOException(e); } @@ -976,13 +1032,16 @@ public long lastSequenceNumber(@NotNull ExcerptContext ec) break; try { Wire wireForIndex = ec.wireForIndex(); - return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true); + return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true, + "linearScan from writePosition (lastSequenceNumber tail-check)"); } catch (EOFException e) { throw new UncheckedIOException(e); } } } + // The write-position retry budget has already been spent above. Fall through + // directly to the indexed lookup instead of restarting the MAX_VALUE fast path. return sequenceForPosition(ec, Long.MAX_VALUE, false); } diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java index e8c123f345..08f7165032 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java @@ -377,7 +377,9 @@ public MappedBytes bytes() { public long sequenceForPosition(@NotNull final ExcerptContext ec, final long position, boolean inclusive) throws StreamCorruptedException { throwExceptionIfClosed(); - return indexing.sequenceForPosition(ec, position, inclusive); + return position == Long.MAX_VALUE + ? indexing.sequenceForMaxPosition(ec, inclusive) + : indexing.sequenceForPosition(ec, position, inclusive); } /** From 9989fc589acd889b882c8aa5b9869c50542bb3e2 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 1 May 2026 14:04:21 +0100 Subject: [PATCH 2/4] Bound queue lock pausing --- .../single/SingleChronicleQueueBuilder.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java index 4b38277198..1a048c67e0 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java @@ -69,6 +69,16 @@ public class SingleChronicleQueueBuilder extends SelfDescribingMarshallable impl private static final Constructor ENTERPRISE_QUEUE_CONSTRUCTOR; private static final WireStoreFactory storeFactory = SingleChronicleQueueBuilder::createStore; private static final Supplier TIMING_PAUSER_SUPPLIER = DefaultPauserSupplier.INSTANCE; + /** + * Maximum park duration (ms) for the writeLock/appendLock pauser. + */ + private static final int WRITE_LOCK_PAUSER_MAX_MS = Jvm.getInteger("chronicle.queue.writeLockPauserMaxMs", 5); + /** + * Pauser supplier used by {@link #writeLock()} / {@link #appendLock()}. Uses + * {@link Pauser#balancedUpToMillis(int)} + */ + private static final Supplier WRITE_LOCK_PAUSER_SUPPLIER = + () -> Pauser.balancedUpToMillis(WRITE_LOCK_PAUSER_MAX_MS); static { CLASS_ALIASES.addAlias(WireType.class); @@ -711,7 +721,9 @@ public SingleChronicleQueueBuilder createAppenderConditionCreator(Function yield -> parkNanos, capped at ~5ms) + return readOnly() ? new ReadOnlyWriteLock() + : new TableStoreWriteLock(metaStore, WRITE_LOCK_PAUSER_SUPPLIER, timeoutMS() * 3 / 2); } /** @@ -1594,7 +1606,10 @@ public SingleChronicleQueueBuilder setAllNullFields(@Nullable SingleChronicleQue * @return the append lock for the queue */ public WriteLock appendLock() { - return readOnly() ? WriteLock.NO_OP : new AppendLock(metaStore, pauserSupplier(), timeoutMS() * 3 / 2); + // Same balanced-pauser rationale as writeLock(): the lock holder is in another thread, + // tight spinning is pure CPU burn under contention. + return readOnly() ? WriteLock.NO_OP + : new AppendLock(metaStore, WRITE_LOCK_PAUSER_SUPPLIER, timeoutMS() * 3 / 2); } /** From 5bac48cf7425cc9effab1f6d33d0f97c607e35f2 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 1 May 2026 14:04:35 +0100 Subject: [PATCH 3/4] Harden contention regression tests --- .../queue/ChronicleHistoryReaderMainTest.java | 3 ++- .../chronicle/queue/ChronicleRollingIssueTest.java | 14 ++++++++------ .../impl/single/AppenderFileHandleLeakTest.java | 4 ++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java index 9e117f9640..daf1305192 100644 --- a/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java @@ -43,7 +43,8 @@ public void setUp() { @After public void tearDown() { - System.setSecurityManager(null); + if (Jvm.majorVersion() < 17) + System.setSecurityManager(null); } @Test diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java index 35d49d73b9..3f71fe7fbb 100644 --- a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import static net.openhft.chronicle.queue.rollcycles.TestRollCycles.TEST_SECONDLY; @@ -39,9 +40,10 @@ public void tearDown() { IOTools.deleteDirWithFiles(path); } - @Test + @Test(timeout = 180_000L) public void test() throws InterruptedException { - int threads = Math.min(64, Runtime.getRuntime().availableProcessors() * 4) - 1; + int processors = Runtime.getRuntime().availableProcessors(); + int threads = Math.max(2, Math.min(64, processors * 2) - 1); int messages = 100; AtomicInteger count = new AtomicInteger(); @@ -55,12 +57,12 @@ public void test() throws InterruptedException { .rollCycle(TEST_SECONDLY).build(); ExcerptAppender appender = writeQueue.createAppender()) { for (int i = 0; i < messages; i++) { - long millis = System.currentTimeMillis() % 100; - if (millis > 1 && millis < 99) { - Jvm.pause(99 - millis); - } Map map = new HashMap<>(); map.put("key", Thread.currentThread().getName() + " - " + i); + + long nanos = System.nanoTime() % 100_000_000; + // should cause them all to try to wake and write at the same time + LockSupport.parkNanos(100_000_000 - nanos); appender.writeMap(map); count.incrementAndGet(); } diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java index f956db4aca..d17ff8b20b 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java @@ -89,7 +89,7 @@ public void setUp() { queuePath = getTmpDir(); } - @Test + @Test(timeout = 180_000L) public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws InterruptedException, TimeoutException, ExecutionException { finishedNormally = false; try (ChronicleQueue queue = createQueue(SYSTEM_TIME_PROVIDER)) { @@ -125,7 +125,7 @@ public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() thr finishedNormally = true; } - @Test + @Test(timeout = 180_000L) public void tailerResourcesCanBeReleasedManually() throws Exception { FlakyTestRunner.builder(this::tailerResourcesCanBeReleasedManually0).build().run(); } From 75bed112e415d381c4630f692f8afc9404507c85 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Wed, 13 May 2026 10:06:03 +0100 Subject: [PATCH 4/4] Refactor writeLock and appendLock to use pauserSupplier directly --- .../single/SingleChronicleQueueBuilder.java | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java index 1a048c67e0..4b38277198 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java @@ -69,16 +69,6 @@ public class SingleChronicleQueueBuilder extends SelfDescribingMarshallable impl private static final Constructor ENTERPRISE_QUEUE_CONSTRUCTOR; private static final WireStoreFactory storeFactory = SingleChronicleQueueBuilder::createStore; private static final Supplier TIMING_PAUSER_SUPPLIER = DefaultPauserSupplier.INSTANCE; - /** - * Maximum park duration (ms) for the writeLock/appendLock pauser. - */ - private static final int WRITE_LOCK_PAUSER_MAX_MS = Jvm.getInteger("chronicle.queue.writeLockPauserMaxMs", 5); - /** - * Pauser supplier used by {@link #writeLock()} / {@link #appendLock()}. Uses - * {@link Pauser#balancedUpToMillis(int)} - */ - private static final Supplier WRITE_LOCK_PAUSER_SUPPLIER = - () -> Pauser.balancedUpToMillis(WRITE_LOCK_PAUSER_MAX_MS); static { CLASS_ALIASES.addAlias(WireType.class); @@ -721,9 +711,7 @@ public SingleChronicleQueueBuilder createAppenderConditionCreator(Function yield -> parkNanos, capped at ~5ms) - return readOnly() ? new ReadOnlyWriteLock() - : new TableStoreWriteLock(metaStore, WRITE_LOCK_PAUSER_SUPPLIER, timeoutMS() * 3 / 2); + return readOnly() ? new ReadOnlyWriteLock() : new TableStoreWriteLock(metaStore, pauserSupplier(), timeoutMS() * 3 / 2); } /** @@ -1606,10 +1594,7 @@ public SingleChronicleQueueBuilder setAllNullFields(@Nullable SingleChronicleQue * @return the append lock for the queue */ public WriteLock appendLock() { - // Same balanced-pauser rationale as writeLock(): the lock holder is in another thread, - // tight spinning is pure CPU burn under contention. - return readOnly() ? WriteLock.NO_OP - : new AppendLock(metaStore, WRITE_LOCK_PAUSER_SUPPLIER, timeoutMS() * 3 / 2); + return readOnly() ? WriteLock.NO_OP : new AppendLock(metaStore, pauserSupplier(), timeoutMS() * 3 / 2); } /**