diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java
index c28765e631..d09a889fc5 100644
--- a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java
+++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java
@@ -505,7 +505,8 @@ private ScanResult linearScan0(@NotNull final Wire wire,
fromKnownIndex = lastIndex;
}
- bytes.readPositionUnlimited(knownAddress);
+ bytes.readLimitToCapacity();
+ bytes.readPosition(knownAddress);
for (long i = fromKnownIndex; ; i++) {
try {
@@ -565,10 +566,26 @@ long linearScanByPosition(@NotNull final Wire wire,
final long indexOfNext,
final long startAddress,
boolean inclusive) throws EOFException {
+ return linearScanByPosition(wire, toPosition, indexOfNext, startAddress, inclusive, "linearScan by position");
+ }
+
+ /**
+ * Variant that lets the caller supply a description for the perf log so different scan
+ * origins (fast path from {@code writePosition}, fall-through from indexed anchor, etc.)
+ * are distinguishable in logs -- important because the whole point of the
+ * {@code MAX_VALUE} fast path is to avoid the brute-force fall-through scan, and
+ * we want to be able to verify that from log output rather than guess.
+ */
+ long linearScanByPosition(@NotNull final Wire wire,
+ final long toPosition,
+ final long indexOfNext,
+ final long startAddress,
+ boolean inclusive,
+ String desc) throws EOFException {
long start = REPORT_LINEAR_SCAN ? System.nanoTime() : 0;
long index = linearScanByPosition0(wire, toPosition, indexOfNext, startAddress, inclusive);
if (REPORT_LINEAR_SCAN) {
- printLinearScanTime(index, startAddress, start, "linearScan by position");
+ printLinearScanTime(index, startAddress, start, desc);
}
return index;
}
@@ -664,12 +681,13 @@ long linearScanByPosition0(@NotNull final Wire wire,
* @return The starting index for the scan.
*/
private long calculateInitialValue(long toPosition, long indexOfNext, long startAddress, Bytes> bytes, long lastAddress, long lastIndex) {
+ bytes.readLimit(bytes.capacity());
if (lastAddress > 0 && toPosition == lastAddress
&& lastIndex != Sequence.NOT_FOUND && lastIndex != Sequence.NOT_FOUND_RETRY) {
- bytes.readPositionUnlimited(toPosition);
+ bytes.readPosition(toPosition);
return lastIndex - 1;
} else {
- bytes.readPositionUnlimited(startAddress);
+ bytes.readPosition(startAddress);
return indexOfNext - 1;
}
}
@@ -709,6 +727,44 @@ public long nextEntryToBeIndexed() {
return nextEntryToBeIndexed.getVolatileValue();
}
+ long sequenceForMaxPosition(@NotNull ExcerptContext ec,
+ boolean inclusive) throws StreamCorruptedException {
+ int retry = 0;
+ for (; retry < 128; retry++) {
+ long lastWritePos = writePosition.getVolatileValue();
+ long latestSeq = sequence.getSequence(lastWritePos);
+ if (latestSeq >= 0) {
+ if (retry > 0) {
+ Jvm.perf().on(getClass(),
+ "sequenceForPosition(MAX_VALUE) tracker-read retried" + " " + retry + " times");
+ }
+ try {
+ return linearScanByPosition(ec.wireForIndex(), Long.MAX_VALUE, latestSeq, lastWritePos, inclusive,
+ "linearScan from writePosition (sequenceForPosition fast path)");
+ } catch (EOFException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ if (latestSeq == Sequence.NOT_FOUND)
+ break;
+
+ // NOT_FOUND_RETRY: writer raced. Spin-retry for the first couple of iterations
+ // (the race window is sub-microsecond); after that yield to let the writer make
+ // progress instead of starving them with our volatile reads.
+ if (retry > 2)
+ Thread.yield();
+ }
+ if (retry >= 128) {
+ // Retry budget exhausted in a row of NOT_FOUND_RETRY -- surface this clearly:
+ // it means we're about to do a brute-force indexed-anchor scan instead of the
+ // efficient writePos-anchored fast-path scan. The log makes a regression visible.
+ Jvm.perf().on(getClass(),
+ "sequenceForPosition(MAX_VALUE) tracker-read retry loop exhausted" + " after " + retry +
+ " attempts; falling through to indexed lookup");
+ }
+ return sequenceForPosition(ec, Long.MAX_VALUE, inclusive);
+ }
+
/**
* Returns the sequence number for a given position in the wire.
* If an exact match is found for the position, the corresponding index is returned;
@@ -773,7 +829,7 @@ long sequenceForPosition(@NotNull ExcerptContext ec,
}
try {
// Perform a linear scan if no exact match is found.
- return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive);
+ return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive, "linearScan from indexed anchor (sequenceForPosition fall-through)");
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
@@ -976,13 +1032,16 @@ public long lastSequenceNumber(@NotNull ExcerptContext ec)
break;
try {
Wire wireForIndex = ec.wireForIndex();
- return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true);
+ return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true,
+ "linearScan from writePosition (lastSequenceNumber tail-check)");
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
}
}
+ // The write-position retry budget has already been spent above. Fall through
+ // directly to the indexed lookup instead of restarting the MAX_VALUE fast path.
return sequenceForPosition(ec, Long.MAX_VALUE, false);
}
diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java
index e8c123f345..08f7165032 100644
--- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java
+++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java
@@ -377,7 +377,9 @@ public MappedBytes bytes() {
public long sequenceForPosition(@NotNull final ExcerptContext ec, final long position, boolean inclusive) throws StreamCorruptedException {
throwExceptionIfClosed();
- return indexing.sequenceForPosition(ec, position, inclusive);
+ return position == Long.MAX_VALUE
+ ? indexing.sequenceForMaxPosition(ec, inclusive)
+ : indexing.sequenceForPosition(ec, position, inclusive);
}
/**
diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java
index 9e117f9640..daf1305192 100644
--- a/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java
+++ b/src/test/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMainTest.java
@@ -43,7 +43,8 @@ public void setUp() {
@After
public void tearDown() {
- System.setSecurityManager(null);
+ if (Jvm.majorVersion() < 17)
+ System.setSecurityManager(null);
}
@Test
diff --git a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java
index 35d49d73b9..3f71fe7fbb 100644
--- a/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java
+++ b/src/test/java/net/openhft/chronicle/queue/ChronicleRollingIssueTest.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
import static net.openhft.chronicle.queue.rollcycles.TestRollCycles.TEST_SECONDLY;
@@ -39,9 +40,10 @@ public void tearDown() {
IOTools.deleteDirWithFiles(path);
}
- @Test
+ @Test(timeout = 180_000L)
public void test() throws InterruptedException {
- int threads = Math.min(64, Runtime.getRuntime().availableProcessors() * 4) - 1;
+ int processors = Runtime.getRuntime().availableProcessors();
+ int threads = Math.max(2, Math.min(64, processors * 2) - 1);
int messages = 100;
AtomicInteger count = new AtomicInteger();
@@ -55,12 +57,12 @@ public void test() throws InterruptedException {
.rollCycle(TEST_SECONDLY).build();
ExcerptAppender appender = writeQueue.createAppender()) {
for (int i = 0; i < messages; i++) {
- long millis = System.currentTimeMillis() % 100;
- if (millis > 1 && millis < 99) {
- Jvm.pause(99 - millis);
- }
Map map = new HashMap<>();
map.put("key", Thread.currentThread().getName() + " - " + i);
+
+ long nanos = System.nanoTime() % 100_000_000;
+ // should cause them all to try to wake and write at the same time
+ LockSupport.parkNanos(100_000_000 - nanos);
appender.writeMap(map);
count.incrementAndGet();
}
diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java
index f956db4aca..d17ff8b20b 100644
--- a/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java
+++ b/src/test/java/net/openhft/chronicle/queue/impl/single/AppenderFileHandleLeakTest.java
@@ -89,7 +89,7 @@ public void setUp() {
queuePath = getTmpDir();
}
- @Test
+ @Test(timeout = 180_000L)
public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws InterruptedException, TimeoutException, ExecutionException {
finishedNormally = false;
try (ChronicleQueue queue = createQueue(SYSTEM_TIME_PROVIDER)) {
@@ -125,7 +125,7 @@ public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() thr
finishedNormally = true;
}
- @Test
+ @Test(timeout = 180_000L)
public void tailerResourcesCanBeReleasedManually() throws Exception {
FlakyTestRunner.builder(this::tailerResourcesCanBeReleasedManually0).build().run();
}