diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java index c28765e631..d09a889fc5 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java @@ -505,7 +505,8 @@ private ScanResult linearScan0(@NotNull final Wire wire, fromKnownIndex = lastIndex; } - bytes.readPositionUnlimited(knownAddress); + bytes.readLimitToCapacity(); + bytes.readPosition(knownAddress); for (long i = fromKnownIndex; ; i++) { try { @@ -565,10 +566,26 @@ long linearScanByPosition(@NotNull final Wire wire, final long indexOfNext, final long startAddress, boolean inclusive) throws EOFException { + return linearScanByPosition(wire, toPosition, indexOfNext, startAddress, inclusive, "linearScan by position"); + } + + /** + * Variant that lets the caller supply a description for the perf log so different scan + * origins (fast path from {@code writePosition}, fall-through from indexed anchor, etc.) + * are distinguishable in logs -- important because the whole point of the + * {@code MAX_VALUE} fast path is to avoid the brute-force fall-through scan, and + * we want to be able to verify that from log output rather than guess. + */ + long linearScanByPosition(@NotNull final Wire wire, + final long toPosition, + final long indexOfNext, + final long startAddress, + boolean inclusive, + String desc) throws EOFException { long start = REPORT_LINEAR_SCAN ? System.nanoTime() : 0; long index = linearScanByPosition0(wire, toPosition, indexOfNext, startAddress, inclusive); if (REPORT_LINEAR_SCAN) { - printLinearScanTime(index, startAddress, start, "linearScan by position"); + printLinearScanTime(index, startAddress, start, desc); } return index; } @@ -664,12 +681,13 @@ long linearScanByPosition0(@NotNull final Wire wire, * @return The starting index for the scan. 
*/ private long calculateInitialValue(long toPosition, long indexOfNext, long startAddress, Bytes bytes, long lastAddress, long lastIndex) { + bytes.readLimit(bytes.capacity()); if (lastAddress > 0 && toPosition == lastAddress && lastIndex != Sequence.NOT_FOUND && lastIndex != Sequence.NOT_FOUND_RETRY) { - bytes.readPositionUnlimited(toPosition); + bytes.readPosition(toPosition); return lastIndex - 1; } else { - bytes.readPositionUnlimited(startAddress); + bytes.readPosition(startAddress); return indexOfNext - 1; } } @@ -709,6 +727,44 @@ public long nextEntryToBeIndexed() { return nextEntryToBeIndexed.getVolatileValue(); } + long sequenceForMaxPosition(@NotNull ExcerptContext ec, + boolean inclusive) throws StreamCorruptedException { + int retry = 0; + for (; retry < 128; retry++) { + long lastWritePos = writePosition.getVolatileValue(); + long latestSeq = sequence.getSequence(lastWritePos); + if (latestSeq >= 0) { + if (retry > 0) { + Jvm.perf().on(getClass(), + "sequenceForPosition(MAX_VALUE) tracker-read retried" + " " + retry + " times"); + } + try { + return linearScanByPosition(ec.wireForIndex(), Long.MAX_VALUE, latestSeq, lastWritePos, inclusive, + "linearScan from writePosition (sequenceForPosition fast path)"); + } catch (EOFException e) { + throw new UncheckedIOException(e); + } + } + if (latestSeq == Sequence.NOT_FOUND) + break; + + // NOT_FOUND_RETRY: the writer raced with this read. Spin-retry for the first three + // iterations (the race window is sub-microsecond); after that, yield so the writer can + // make progress instead of being starved by our volatile reads. + if (retry > 2) + Thread.yield(); + } + if (retry >= 128) { + // Retry budget exhausted by consecutive NOT_FOUND_RETRY results -- surface this clearly: + // it means we're about to do a brute-force indexed-anchor scan instead of the + // efficient writePos-anchored fast-path scan. The log makes a regression visible. 
+ Jvm.perf().on(getClass(), + "sequenceForPosition(MAX_VALUE) tracker-read retry loop exhausted" + " after " + retry + + " attempts; falling through to indexed lookup"); + } + return sequenceForPosition(ec, Long.MAX_VALUE, inclusive); + } + /** * Returns the sequence number for a given position in the wire. * If an exact match is found for the position, the corresponding index is returned; @@ -773,7 +829,7 @@ long sequenceForPosition(@NotNull ExcerptContext ec, } try { // Perform a linear scan if no exact match is found. - return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive); + return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive, "linearScan from indexed anchor (sequenceForPosition fall-through)"); } catch (EOFException e) { throw new UncheckedIOException(e); } @@ -976,13 +1032,16 @@ public long lastSequenceNumber(@NotNull ExcerptContext ec) break; try { Wire wireForIndex = ec.wireForIndex(); - return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true); + return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true, + "linearScan from writePosition (lastSequenceNumber tail-check)"); } catch (EOFException e) { throw new UncheckedIOException(e); } } } + // The write-position retry budget has already been spent above. Fall through + // directly to the indexed lookup instead of restarting the MAX_VALUE fast path. 
return sequenceForPosition(ec, Long.MAX_VALUE, false); } diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java index e8c123f345..08f7165032 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java @@ -377,7 +377,9 @@ public MappedBytes bytes() { public long sequenceForPosition(@NotNull final ExcerptContext ec, final long position, boolean inclusive) throws StreamCorruptedException { throwExceptionIfClosed(); - return indexing.sequenceForPosition(ec, position, inclusive); + return position == Long.MAX_VALUE + ? indexing.sequenceForMaxPosition(ec, inclusive) + : indexing.sequenceForPosition(ec, position, inclusive); } /** diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java index 9e117f9640..daf1305192 100644 --- a/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java @@ -43,7 +43,8 @@ public void setUp() { @After public void tearDown() { - System.setSecurityManager(null); + if (Jvm.majorVersion() < 17) + System.setSecurityManager(null); } @Test diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java index 35d49d73b9..3f71fe7fbb 100644 --- a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import static 
net.openhft.chronicle.queue.rollcycles.TestRollCycles.TEST_SECONDLY; @@ -39,9 +40,10 @@ public void tearDown() { IOTools.deleteDirWithFiles(path); } - @Test + @Test(timeout = 180_000L) public void test() throws InterruptedException { - int threads = Math.min(64, Runtime.getRuntime().availableProcessors() * 4) - 1; + int processors = Runtime.getRuntime().availableProcessors(); + int threads = Math.max(2, Math.min(64, processors * 2) - 1); int messages = 100; AtomicInteger count = new AtomicInteger(); @@ -55,12 +57,12 @@ public void test() throws InterruptedException { .rollCycle(TEST_SECONDLY).build(); ExcerptAppender appender = writeQueue.createAppender()) { for (int i = 0; i < messages; i++) { - long millis = System.currentTimeMillis() % 100; - if (millis > 1 && millis < 99) { - Jvm.pause(99 - millis); - } Map map = new HashMap<>(); map.put("key", Thread.currentThread().getName() + " - " + i); + + long nanos = System.nanoTime() % 100_000_000; + // should cause them all to try to wake and write at the same time + LockSupport.parkNanos(100_000_000 - nanos); appender.writeMap(map); count.incrementAndGet(); } diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java index f956db4aca..d17ff8b20b 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java @@ -89,7 +89,7 @@ public void setUp() { queuePath = getTmpDir(); } - @Test + @Test(timeout = 180_000L) public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws InterruptedException, TimeoutException, ExecutionException { finishedNormally = false; try (ChronicleQueue queue = createQueue(SYSTEM_TIME_PROVIDER)) { @@ -125,7 +125,7 @@ public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() thr finishedNormally = true; } - 
@Test + @Test(timeout = 180_000L) public void tailerResourcesCanBeReleasedManually() throws Exception { FlakyTestRunner.builder(this::tailerResourcesCanBeReleasedManually0).build().run(); }