diff --git a/README.adoc b/README.adoc index 0a690b25ed..527a5c699b 100644 --- a/README.adoc +++ b/README.adoc @@ -239,6 +239,23 @@ See the respective `package-info.java` files for details. == Chronicle Queue Versions and Remarkable Changes +=== `sequenceForPosition(MAX_VALUE)` now starts from `writePosition` + +`ExcerptTailer.lastIndex()` (via `SCQIndexing.sequenceForPosition(Long.MAX_VALUE, ...)`) +previously always performed an indexed lookup first and then walked forward from the latest +indexed slot -- up to `indexSpacing` entries before `writePosition`. The new fast path consults +the in-memory `Sequence` tracker directly: when the tracker reports a consistent +`(writePosition, sequence)` pair the call walks forward from `writePosition` only. + +This is functionally equivalent: the recovery walk past `writePosition` (used to detect orphan +or in-progress writes -- see `PartialUpdateTest`) is preserved, so partial-write recovery is +unchanged. The win is that we no longer scan the up-to-`indexSpacing` entries between the +latest indexed slot and `writePosition`, which were known-published and therefore redundant. + +The tracker is read with up to 128 retries on `NOT_FOUND_RETRY` (writer mid-update) before +falling through to the original indexed-lookup path; `Jvm.perf()` logs whenever any retry was +needed, so the rare race is observable. + === Changes from Version 4 to Version 5 In Chronicle Queue v5 tailers are now read-only, in Chronicle Queue v4 we had the concept of lazy indexing, where appenders would not write indexes but instead the indexing could be done by the tailer. diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java index c28765e631..c256c1f770 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java @@ -72,6 +72,33 @@ class SCQIndexing extends AbstractCloseable implements Indexing, Demarshallable, // visible for testing int linearScanCount; int linearScanByPositionCount; + /** Maximum {@code NOT_FOUND_RETRY} attempts the {@code MAX_VALUE} fast path will spin + * before falling through to the indexed-lookup recovery path. Non-final and package-local + * so tests can shrink the budget and exercise exhaustion without forcing 128 stub calls + * + 125 {@link Thread#yield()} invocations per test run. */ + static int SEQUENCE_TRACKER_RETRY_BUDGET = 128; + /** Spin this many iterations before {@link Thread#yield()}-ing in the retry loop; the + * writer race window is sub-microsecond so the first few retries should not yield. */ + private static final int SEQUENCE_TRACKER_RETRY_YIELD_AFTER = 2; + // Public log markers emitted on the lastIndex() fast path. Tests / observers attach a perf + // exception handler (see Jvm.setPerfExceptionHandler) and count these messages to assert + // that retries are rare under contention and that the path never falls through to the + // brute-force indexed-anchor scan during a healthy run -- avoiding any per-instance counter + // fields whose only purpose is observability. + static final String LOG_TRACKER_RETRY_PREFIX = + "sequenceForPosition(MAX_VALUE) tracker-read retried"; + static final String LOG_TRACKER_BRUTE_FORCE_FALLTHROUGH = + "sequenceForPosition(MAX_VALUE) tracker-read retry loop exhausted"; + // Description tags passed to printLinearScanTime so each call site is distinguishable in + // perf logs (and so tests can match on a single source of truth rather than copying the + // string). The whole point of the fast path is to take SCAN_LABEL_FAST_PATH (or + // SCAN_LABEL_TAIL_CHECK), not SCAN_LABEL_FALL_THROUGH. + static final String SCAN_LABEL_FAST_PATH = + "linearScan from writePosition (sequenceForPosition fast path)"; + static final String SCAN_LABEL_FALL_THROUGH = + "linearScan from indexed anchor (sequenceForPosition fall-through)"; + static final String SCAN_LABEL_TAIL_CHECK = + "linearScan from writePosition (lastSequenceNumber tail-check)"; Collection closeables = new ArrayList<>(); private long lastScannedIndex = -1; @@ -505,7 +532,8 @@ private ScanResult linearScan0(@NotNull final Wire wire, fromKnownIndex = lastIndex; } - bytes.readPositionUnlimited(knownAddress); + bytes.readLimitToCapacity(); + bytes.readPosition(knownAddress); for (long i = fromKnownIndex; ; i++) { try { @@ -565,10 +593,26 @@ long linearScanByPosition(@NotNull final Wire wire, final long indexOfNext, final long startAddress, boolean inclusive) throws EOFException { + return linearScanByPosition(wire, toPosition, indexOfNext, startAddress, inclusive, "linearScan by position"); + } + + /** + * Variant that lets the caller supply a description for the perf log so different scan + * origins (fast path from {@code writePosition}, fall-through from indexed anchor, etc.) + * are distinguishable in logs -- important because the whole point of the + * {@code MAX_VALUE} fast path is to avoid the brute-force fall-through scan, and + * we want to be able to verify that from log output rather than guess. + */ + long linearScanByPosition(@NotNull final Wire wire, + final long toPosition, + final long indexOfNext, + final long startAddress, + boolean inclusive, + String desc) throws EOFException { long start = REPORT_LINEAR_SCAN ? System.nanoTime() : 0; long index = linearScanByPosition0(wire, toPosition, indexOfNext, startAddress, inclusive); if (REPORT_LINEAR_SCAN) { - printLinearScanTime(index, startAddress, start, "linearScan by position"); + printLinearScanTime(index, startAddress, start, desc); } return index; } @@ -664,12 +708,13 @@ long linearScanByPosition0(@NotNull final Wire wire, * @return The starting index for the scan. */ private long calculateInitialValue(long toPosition, long indexOfNext, long startAddress, Bytes bytes, long lastAddress, long lastIndex) { + bytes.readLimit(bytes.capacity()); if (lastAddress > 0 && toPosition == lastAddress && lastIndex != Sequence.NOT_FOUND && lastIndex != Sequence.NOT_FOUND_RETRY) { - bytes.readPositionUnlimited(toPosition); + bytes.readPosition(toPosition); return lastIndex - 1; } else { - bytes.readPositionUnlimited(startAddress); + bytes.readPosition(startAddress); return indexOfNext - 1; } } @@ -723,6 +768,49 @@ public long nextEntryToBeIndexed() { long sequenceForPosition(@NotNull ExcerptContext ec, final long position, boolean inclusive) throws StreamCorruptedException { + // Fast path for lastIndex() -- historically the dominant slow caller. Writers commit + // (writePosition, sequence) atomically into a single TwoLongValue. When the two halves + // agree we have the sequence at writePosition directly, so we can skip the indexed + // lookup (which walks back to the latest-indexed-anchor -- up to indexSpacing entries + // before writePos). We still walk forward from writePosition to recover any + // partial-write orphans / in-progress writes, matching the semantics of the indexed + // path. Falls through on writer race (NOT_FOUND_RETRY) and on tracker-uninitialised + // (NOT_FOUND), preserving the original behaviour when the optimisation can't be satisfied. + if (position == Long.MAX_VALUE) { + int retry = 0; + for (; retry < SEQUENCE_TRACKER_RETRY_BUDGET; retry++) { + long lastWritePos = writePosition.getVolatileValue(); + long latestSeq = sequence.getSequence(lastWritePos); + if (latestSeq >= 0) { + if (retry > 0) { + Jvm.perf().on(getClass(), + LOG_TRACKER_RETRY_PREFIX + " " + retry + " times"); + } + try { + return linearScanByPosition(ec.wireForIndex(), Long.MAX_VALUE, latestSeq, lastWritePos, inclusive, + SCAN_LABEL_FAST_PATH); + } catch (EOFException e) { + throw new UncheckedIOException(e); + } + } + if (latestSeq == Sequence.NOT_FOUND) + break; + + // NOT_FOUND_RETRY: writer raced. Spin-retry for the first couple of iterations + // (the race window is sub-microsecond); after that yield to let the writer make + // progress instead of starving them with our volatile reads. + if (retry > SEQUENCE_TRACKER_RETRY_YIELD_AFTER) + Thread.yield(); + } + if (retry >= SEQUENCE_TRACKER_RETRY_BUDGET) { + // Retry budget exhausted in a row of NOT_FOUND_RETRY -- surface this clearly: + // it means we're about to do a brute-force indexed-anchor scan instead of the + // efficient writePos-anchored fast-path scan. The log makes a regression visible. + Jvm.perf().on(getClass(), + LOG_TRACKER_BRUTE_FORCE_FALLTHROUGH + " after " + retry + + " attempts; falling through to indexed lookup"); + } + } long indexOfNext = 0; long lastKnownAddress = 0; @NotNull Wire wire = ec.wireForIndex(); @@ -773,7 +861,7 @@ long sequenceForPosition(@NotNull ExcerptContext ec, } try { // Perform a linear scan if no exact match is found. - return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive); + return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive, SCAN_LABEL_FALL_THROUGH); } catch (EOFException e) { throw new UncheckedIOException(e); } @@ -976,7 +1064,8 @@ public long lastSequenceNumber(@NotNull ExcerptContext ec) break; try { Wire wireForIndex = ec.wireForIndex(); - return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true); + return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true, + SCAN_LABEL_TAIL_CHECK); } catch (EOFException e) { throw new UncheckedIOException(e); } diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java index 35d49d73b9..3511ee57a1 100644 --- a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java @@ -55,10 +55,8 @@ public void test() throws InterruptedException { .rollCycle(TEST_SECONDLY).build(); ExcerptAppender appender = writeQueue.createAppender()) { for (int i = 0; i < messages; i++) { - long millis = System.currentTimeMillis() % 100; - if (millis > 1 && millis < 99) { - Jvm.pause(99 - millis); - } + long millis = System.currentTimeMillis() & 63; + Jvm.pause(millis); Map map = new HashMap<>(); map.put("key", Thread.currentThread().getName() + " - " + i); appender.writeMap(map); diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/LastIndexFastPathStress.java b/src/test/java/net/openhft/chronicle/queue/impl/single/LastIndexFastPathStress.java new file mode 100644 index 0000000000..5bb7f806d3 --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/LastIndexFastPathStress.java @@ -0,0 +1,438 @@ +/* + * Copyright 2013-2025 chronicle.software; SPDX-License-Identifier: Apache-2.0 + */ +package net.openhft.chronicle.queue.impl.single; + +import com.sun.management.GarbageCollectionNotificationInfo; +import net.openhft.chronicle.bytes.Bytes; +import net.openhft.chronicle.core.Jvm; +import net.openhft.chronicle.core.StackTrace; +import net.openhft.chronicle.core.io.IOTools; +import net.openhft.chronicle.core.onoes.ExceptionHandler; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.queue.TailerDirection; +import net.openhft.chronicle.threads.NamedThreadFactory; +import net.openhft.chronicle.wire.DocumentContext; + +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReference; + +/** + * On-demand stress harness for the {@code sequenceForPosition(MAX_VALUE)} fast path in + * {@link SCQIndexing}. Not a JUnit test -- run from the command line with + * + *
{@code
+ *   mvn -Dcheckstyle.skip=true test-compile
+ *   java -cp target/test-classes:target/classes:$(cat classpath.txt) \
+ *        net.openhft.chronicle.queue.impl.single.LastIndexFastPathStress
+ * }
+ * + * Or via maven exec: + * + *
{@code
+ *   mvn test-compile exec:java \
+ *     -Dexec.mainClass=net.openhft.chronicle.queue.impl.single.LastIndexFastPathStress \
+ *     -Dexec.classpathScope=test
+ * }
+ * + *

Runs for 120 seconds with one writer per ~4 cores and two readers per core. Prints a + * one-line summary plus a per-phase max-latency breakdown, watchdog stuck-event totals, + * GC summary, and slow-scan kind breakdown. Optional {@code -Dscq.fastpath.stress.delay.ns=N} + * inserts a symmetric busy-wait in both writer and reader loops to widen race windows. + * + *

When JFR is available, each run dumps an allocation profile to + * {@code /tmp/LastIndexFastPathStress-*.jfr} -- inspect with + * {@code jfr print --events 'ObjectAllocation*' }. JFR is loaded reflectively so this + * diagnostic harness still compiles on build agents whose JDK image omits {@code jdk.jfr}. + */ +public final class LastIndexFastPathStress { + + private static final long DURATION_NANOS = TimeUnit.SECONDS.toNanos(120); + private static final long WATCHDOG_THRESHOLD_MS = 25; + private static final long WATCHDOG_POLL_MS = 5; + + private LastIndexFastPathStress() { + } + + public static void main(String[] args) throws Exception { + Path tmpDir = Files.createTempDirectory("LastIndexFastPathStress"); + boolean originalResourceTracing = Jvm.isResourceTracing(); + // Resource tracing captures a Throwable on every reference op, dominating any close- + // phase tail and skewing the GC profile. Off for stress so we measure the real path. + Jvm.setResourceTracing(false); + try { + run(tmpDir); + } finally { + Jvm.setResourceTracing(originalResourceTracing); + IOTools.deleteDirWithFiles(tmpDir.toFile()); + } + } + + private static void run(Path tmpDir) throws Exception { + // Scale by available CPU so contention behaviour is comparable across hosts: many more + // readers than writers so some readers stall on each tick, amplifying the chance that + // a reader observes a writer mid-update (writePosition published, sequence half not + // yet stored), exercising the NOT_FOUND_RETRY path. + final int cores = Runtime.getRuntime().availableProcessors(); + final int writerCount = Math.min(4, (cores + 3) / 4); + final int readerCount = 2 * cores; + // Optional symmetric per-iteration delay (ns) for both writers and readers. + final long writerDelayNs = Long.getLong("scq.fastpath.stress.delay.ns", 0L); + final long readerDelayNs = writerDelayNs; + + // NamedThreadFactory creates CleaningThread instances, so per-thread CleaningThreadLocal + // cleanup hooks fire deterministically when each worker exits at the end of the run -- + // closing every SCQIndexing holder it touched. A plain Executors.defaultThreadFactory + // would skip that cleanup and leak per-thread off-heap holders until the SCQIndexing + // itself is GC'd. + ExecutorService exec = Executors.newFixedThreadPool(writerCount + readerCount, + new NamedThreadFactory("scq-fastpath", true)); + + // Keep the warm appender alive for the lifetime of the run so the cycle's store (and + // therefore its single SCQIndexing instance) stays cached. If we let it go, captured + // counters could refer to a released instance while workers use a freshly-opened one. + try (ChronicleQueue queue = ChronicleQueue.singleBuilder(tmpDir.toFile()) + .rollCycle(RollCycles.FAST_HOURLY) + .build(); + ExcerptAppender warm = queue.createAppender()) { + + for (int i = 0; i < 100; i++) { + try (DocumentContext dc = warm.writingDocument()) { + dc.wire().bytes().writeInt(i); + } + } + + final AtomicLong totalGcEvents = new AtomicLong(); + final AtomicLong totalGcMillis = new AtomicLong(); + final AtomicLong maxGcMillis = new AtomicLong(); + final long testStartMillis = System.currentTimeMillis(); + final NotificationListener gcListener = (notification, handback) -> { + if (!notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) + return; + GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from( + (CompositeData) notification.getUserData()); + long durMs = info.getGcInfo().getDuration(); + totalGcEvents.incrementAndGet(); + totalGcMillis.addAndGet(durMs); + maxGcMillis.accumulateAndGet(durMs, Math::max); + if (durMs >= 10) + System.out.printf("[gc] +%dms %s %s in %dms (cause=%s)%n", + System.currentTimeMillis() - testStartMillis, + info.getGcAction(), info.getGcName(), durMs, info.getGcCause()); + }; + final List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean bean : gcBeans) + ((NotificationEmitter) bean).addNotificationListener(gcListener, null, null); + + // JFR allocation profile, when the runtime has jdk.jfr available. Some CI JDK + // images omit it, so keep this diagnostic optional and avoid a compile-time link. + final AllocationRecording allocRecording = startAllocationRecording(); + + // Recording perf handler: silently aggregate "Took X us to linearScan ..." lines and + // the fast-path retry/exhaustion log markers so the test reports retry counts and + // brute-force fall-through counts WITHOUT SCQIndexing carrying observability-only + // counter fields. Each retry log line carries the retry count -- parse and sum it + // so we get the same total via observation rather than instrumentation. + final ConcurrentMap perfCounts = new ConcurrentHashMap<>(); + final AtomicLong retryAttemptsTotal = new AtomicLong(); + final AtomicLong bruteForceFallThroughs = new AtomicLong(); + final ExceptionHandler recorder = (logger, message, throwable) -> { + if (message == null) + return; + perfCounts.computeIfAbsent(message, k -> new AtomicLong()).incrementAndGet(); + if (message.startsWith(SCQIndexing.LOG_TRACKER_RETRY_PREFIX)) { + retryAttemptsTotal.addAndGet(parseTrailingRetryCount(message)); + } else if (message.startsWith(SCQIndexing.LOG_TRACKER_BRUTE_FORCE_FALLTHROUGH)) { + bruteForceFallThroughs.incrementAndGet(); + } + }; + Jvm.setPerfExceptionHandler(recorder); + + final AtomicBoolean stop = new AtomicBoolean(false); + final AtomicLong totalWrites = new AtomicLong(); + final AtomicLong totalLastIndexCalls = new AtomicLong(); + final AtomicLong sumLatencyNanos = new AtomicLong(); + final AtomicLong maxLatencyNanos = new AtomicLong(); + final AtomicLong maxToEndNanos = new AtomicLong(); + final AtomicLong maxReadNanos = new AtomicLong(); + final AtomicLongArray readerIterationStartNanos = new AtomicLongArray(readerCount); + final Thread[] readerThreads = new Thread[readerCount]; + final AtomicLong stuckEventCount = new AtomicLong(); + final AtomicLong maxStuckMillis = new AtomicLong(); + final AtomicReference failure = new AtomicReference<>(); + final CyclicBarrier barrier = new CyclicBarrier(writerCount + readerCount); + final List> futures = new ArrayList<>(); + + for (int w = 0; w < writerCount; w++) { + final int wid = w; + futures.add(exec.submit(() -> { + try (ExcerptAppender ap = queue.createAppender()) { + barrier.await(); + long count = 0; + while (!stop.get()) { + try (DocumentContext dc = ap.writingDocument()) { + Bytes bytes = dc.wire().bytes(); + bytes.writeInt(wid); + bytes.writeLong(count++); + } + totalWrites.incrementAndGet(); + if (writerDelayNs > 0) + busyWaitNanos(writerDelayNs); + } + } catch (Throwable t) { + failure.compareAndSet(null, t); + } + })); + } + for (int r = 0; r < readerCount; r++) { + final int rid = r; + futures.add(exec.submit(() -> { + readerThreads[rid] = Thread.currentThread(); + // ONE tailer per reader for the entire run -- allocations should scale with + // reader count, not iteration count. + try (ExcerptTailer tailer = queue.createTailer() + .direction(TailerDirection.BACKWARD)) { + barrier.await(); + while (!stop.get()) { + long t0 = System.nanoTime(); + readerIterationStartNanos.lazySet(rid, t0); + tailer.toEnd(); + long t1 = System.nanoTime(); + try (DocumentContext dc = tailer.readingDocument()) { + if (dc.isPresent() && !dc.isMetaData()) + dc.index(); + } + long t2 = System.nanoTime(); + maxToEndNanos.accumulateAndGet(t1 - t0, Math::max); + maxReadNanos.accumulateAndGet(t2 - t1, Math::max); + long elapsed = t2 - t0; + totalLastIndexCalls.incrementAndGet(); + sumLatencyNanos.addAndGet(elapsed); + maxLatencyNanos.accumulateAndGet(elapsed, Math::max); + readerIterationStartNanos.lazySet(rid, 0L); + if (readerDelayNs > 0) + busyWaitNanos(readerDelayNs); + } + } catch (Throwable t) { + failure.compareAndSet(null, t); + } + })); + } + + // Watchdog: every 10ms check per-reader iteration-start. Snapshot the stack of any + // reader stuck > 50ms. StackTrace.forThread captures the trace as a Throwable so it + // formats with a familiar exception layout including the thread name. + Thread watchdog = new Thread(() -> { + long[] lastReportedStart = new long[readerCount]; + while (!stop.get()) { + long now = System.nanoTime(); + for (int i = 0; i < readerCount; i++) { + long start = readerIterationStartNanos.get(i); + if (start == 0L) + continue; + long elapsedMs = (now - start) / 1_000_000L; + if (elapsedMs >= WATCHDOG_THRESHOLD_MS && start != lastReportedStart[i]) { + Thread t = readerThreads[i]; + if (t == null) + continue; + stuckEventCount.incrementAndGet(); + maxStuckMillis.accumulateAndGet(elapsedMs, Math::max); + StackTrace snapshot = StackTrace.forThread(t); + System.out.printf("[watchdog] +%dms reader %d stuck for %dms%n", + System.currentTimeMillis() - testStartMillis, i, elapsedMs); + snapshot.printStackTrace(System.out); + lastReportedStart[i] = start; + } + } + try { + Thread.sleep(WATCHDOG_POLL_MS); + } catch (InterruptedException ie) { + return; + } + } + }, "scq-fastpath-watchdog"); + watchdog.setDaemon(true); + watchdog.start(); + + long deadline = System.nanoTime() + DURATION_NANOS; + while (System.nanoTime() < deadline && failure.get() == null) { + Thread.sleep(500); + } + stop.set(true); + watchdog.interrupt(); + for (GarbageCollectorMXBean bean : gcBeans) { + try { + ((NotificationEmitter) bean).removeNotificationListener(gcListener); + } catch (Exception ignored) { + // best-effort + } + } + for (Future f : futures) f.get(15, TimeUnit.SECONDS); + + Path jfrPath = allocRecording.stopAndDump(); + allocRecording.close(); + Jvm.resetExceptionHandlers(); + + if (failure.get() != null) { + System.out.println("FAILURE: " + failure.get()); + failure.get().printStackTrace(System.out); + System.exit(1); + } + + long calls = totalLastIndexCalls.get(); + long retries = retryAttemptsTotal.get(); + long bruteForceFallThroughCount = bruteForceFallThroughs.get(); + long avgLatencyNs = calls == 0 ? 0 : sumLatencyNanos.get() / calls; + long maxLatencyNs = maxLatencyNanos.get(); + double retryPerCall = calls == 0 ? 0 : (double) retries / calls; + + long efficientFastPathScans = countCapturedMessages(perfCounts, SCQIndexing.SCAN_LABEL_FAST_PATH); + long efficientTailChecks = countCapturedMessages(perfCounts, SCQIndexing.SCAN_LABEL_TAIL_CHECK); + long bruteForceScans = countCapturedMessages(perfCounts, SCQIndexing.SCAN_LABEL_FALL_THROUGH); + + System.out.printf("LastIndexFastPathStress: cores=%d writers=%d readers=%d delayNs=%d writes=%d lastIndexCalls=%d retries=%d retries/call=%.6f avgLatency=%dns maxLatency=%.2fms%n", + cores, writerCount, readerCount, writerDelayNs, + totalWrites.get(), calls, retries, retryPerCall, avgLatencyNs, maxLatencyNs / 1e6); + System.out.printf("LastIndexFastPathStress per-phase max: toEnd=%.2fms read=%.2fms%n", + maxToEndNanos.get() / 1e6, maxReadNanos.get() / 1e6); + System.out.printf("LastIndexFastPathStress watchdog: stuckEvents=%d maxStuckMs=%d (threshold=%dms, poll=%dms)%n", + stuckEventCount.get(), maxStuckMillis.get(), WATCHDOG_THRESHOLD_MS, WATCHDOG_POLL_MS); + System.out.printf("LastIndexFastPathStress gc: events=%d totalMs=%d maxPauseMs=%d%n", + totalGcEvents.get(), totalGcMillis.get(), maxGcMillis.get()); + System.out.printf("LastIndexFastPathStress slow-scan breakdown: bruteForceFallThroughs=%d efficientFastPath=%d efficientTailCheck=%d bruteForceScans=%d%n", + bruteForceFallThroughCount, efficientFastPathScans, efficientTailChecks, bruteForceScans); + if (jfrPath != null) + System.out.println("LastIndexFastPathStress allocation profile: " + jfrPath + + " (try: jfr print --events 'ObjectAllocation*' " + jfrPath + " | head -200)"); + else + System.out.println("LastIndexFastPathStress allocation profile: unavailable (jdk.jfr not present)"); + } finally { + Jvm.resetExceptionHandlers(); + exec.shutdownNow(); + exec.awaitTermination(5, TimeUnit.SECONDS); + } + } + + private static long countCapturedMessages(ConcurrentMap perfCounts, String fragment) { + return perfCounts.entrySet().stream() + .filter(e -> e.getKey().contains(fragment)) + .mapToLong(e -> e.getValue().get()) + .sum(); + } + + /** Extract the retry count from a "{prefix} N times" log line. The prefix is fixed + * ({@link SCQIndexing#LOG_TRACKER_RETRY_PREFIX}); the trailing ' times' is dropped and + * the remainder parsed as an int. Returns 0 if the format is unexpected so a single + * malformed line can't poison the totals. */ + private static long parseTrailingRetryCount(String message) { + int prefixLen = SCQIndexing.LOG_TRACKER_RETRY_PREFIX.length(); + int end = message.lastIndexOf(" times"); + if (end <= prefixLen + 1) + return 0; + try { + return Long.parseLong(message.substring(prefixLen + 1, end)); + } catch (NumberFormatException nfe) { + return 0; + } + } + + /** Tight nanosecond busy-wait. Used for the optional symmetric writer/reader delay knob; + * default is 0 so this is never called. */ + private static void busyWaitNanos(long nanos) { + long deadline = System.nanoTime() + nanos; + while (System.nanoTime() < deadline) { + // spin + } + } + + private static AllocationRecording startAllocationRecording() { + try { + Class recordingClass = Class.forName("jdk.jfr.Recording"); + Object recording = recordingClass.getConstructor().newInstance(); + recordingClass.getMethod("setName", String.class).invoke(recording, "LastIndexFastPath-allocations"); + enableAllocationEvent(recordingClass, recording, "jdk.ObjectAllocationInNewTLAB"); + enableAllocationEvent(recordingClass, recording, "jdk.ObjectAllocationOutsideTLAB"); + recordingClass.getMethod("start").invoke(recording); + return new JfrAllocationRecording(recordingClass, recording); + } catch (Exception | LinkageError e) { + return NoopAllocationRecording.INSTANCE; + } + } + + private static void enableAllocationEvent(Class recordingClass, Object recording, String eventName) throws Exception { + Object eventSettings = recordingClass.getMethod("enable", String.class).invoke(recording, eventName); + eventSettings.getClass().getMethod("withStackTrace").invoke(eventSettings); + } + + private interface AllocationRecording { + Path stopAndDump(); + + void close(); + } + + private enum NoopAllocationRecording implements AllocationRecording { + INSTANCE; + + @Override + public Path stopAndDump() { + return null; + } + + @Override + public void close() { + // no-op + } + } + + private static final class JfrAllocationRecording implements AllocationRecording { + private final Class recordingClass; + private final Object recording; + + private JfrAllocationRecording(Class recordingClass, Object recording) { + this.recordingClass = recordingClass; + this.recording = recording; + } + + @Override + public Path stopAndDump() { + try { + recordingClass.getMethod("stop").invoke(recording); + Path jfrPath = Files.createTempFile("LastIndexFastPathStress-", ".jfr"); + recordingClass.getMethod("dump", Path.class).invoke(recording, jfrPath); + return jfrPath; + } catch (Exception | LinkageError e) { + return null; + } + } + + @Override + public void close() { + try { + recordingClass.getMethod("close").invoke(recording); + } catch (Exception | LinkageError ignored) { + // best-effort + } + } + } +} diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/SequenceForPositionRetryExhaustionTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/SequenceForPositionRetryExhaustionTest.java new file mode 100644 index 0000000000..ad39adbe90 --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/SequenceForPositionRetryExhaustionTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2013-2025 chronicle.software; SPDX-License-Identifier: Apache-2.0 + */ +package net.openhft.chronicle.queue.impl.single; + +import net.openhft.chronicle.core.Jvm; +import net.openhft.chronicle.core.onoes.ExceptionKey; +import net.openhft.chronicle.wire.Sequence; +import org.junit.jupiter.api.Test; + +import java.io.StreamCorruptedException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Drives the {@code sequenceForPosition(MAX_VALUE)} fast path's retry loop to completion by + * injecting a {@link Sequence} that always returns {@link Sequence#NOT_FOUND_RETRY}. Verifies + * that: + *

    + *
  1. the loop spins up to the configured retry budget, then logs a {@code retry loop + * exhausted} warning; + *
  2. the call falls through cleanly to the original indexed-lookup path and still produces + * the correct sequence -- i.e. exhaustion is recoverable, not an error. + *
+ * + *

The test temporarily shrinks {@link SCQIndexing#SEQUENCE_TRACKER_RETRY_BUDGET} so the loop + * exits in a handful of iterations rather than the production-default 128: each retry invokes + * the production-side {@code Thread.yield()} (after retry > 2), which would otherwise mean + * >120 OS scheduler hits per test run. + * + *

Injection is via direct write to the package-private {@link SCQIndexing#sequence} field; + * this is why the test lives in the {@code single} package. + */ +class SequenceForPositionRetryExhaustionTest extends IndexingTestCommon { + + private static final int TEST_RETRY_BUDGET = 8; + + @Test + void retryExhaustionLogsAndFallsThroughToIndexedLookup() throws StreamCorruptedException { + // One write is enough -- we only need a non-empty queue so the indexed-lookup + // fall-through has something to scan. + appender.writeText("entry"); + + SingleChronicleQueueStore store = appender.store; + SCQIndexing indexing = store.indexing; + long expectedSeq = queue.rollCycle().toSequenceNumber(appender.lastIndexAppended()); + + int originalBudget = SCQIndexing.SEQUENCE_TRACKER_RETRY_BUDGET; + SCQIndexing.SEQUENCE_TRACKER_RETRY_BUDGET = TEST_RETRY_BUDGET; + Sequence original = indexing.sequence; + AlwaysRetrySequence stuck = new AlwaysRetrySequence(); + indexing.sequence = stuck; + + Map recorded = Jvm.recordExceptions(); + try { + long observed = indexing.sequenceForPosition(appender, Long.MAX_VALUE, false); + assertEquals(expectedSeq, observed, + "After retry exhaustion, the indexed-lookup fall-through must still produce the latest sequence"); + } finally { + indexing.sequence = original; + SCQIndexing.SEQUENCE_TRACKER_RETRY_BUDGET = originalBudget; + Jvm.resetExceptionHandlers(); + } + + assertTrue(stuck.calls.get() >= TEST_RETRY_BUDGET, + "The fast path should have exercised the full retry budget; observed " + + stuck.calls.get() + " calls"); + + boolean exhaustionLogged = recorded.keySet().stream() + .anyMatch(k -> k.message() != null && k.message().contains("tracker-read retry loop exhausted")); + assertTrue(exhaustionLogged, + "Expected a 'tracker-read retry loop exhausted' log entry; observed messages: " + recorded.keySet()); + } + + /** + * Minimal {@link Sequence} stub that always reports {@link Sequence#NOT_FOUND_RETRY}, + * counting invocations so the test can assert the retry budget was fully exercised. + */ + private static final class AlwaysRetrySequence implements Sequence { + final AtomicInteger calls = new AtomicInteger(); + + @Override + public long getSequence(long forWritePosition) { + calls.incrementAndGet(); + return Sequence.NOT_FOUND_RETRY; + } + + @Override + public void setSequence(long sequence, long position) { + // not used in this test + } + + @Override + public long toIndex(long headerNumber, long sequence) { + return 0; + } + + @Override + public long toSequenceNumber(long index) { + return 0; + } + } +}