Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ private ScanResult linearScan0(@NotNull final Wire wire,
fromKnownIndex = lastIndex;
}

bytes.readPositionUnlimited(knownAddress);
bytes.readLimitToCapacity();
bytes.readPosition(knownAddress);

for (long i = fromKnownIndex; ; i++) {
try {
Expand Down Expand Up @@ -565,10 +566,26 @@ long linearScanByPosition(@NotNull final Wire wire,
final long indexOfNext,
final long startAddress,
boolean inclusive) throws EOFException {
return linearScanByPosition(wire, toPosition, indexOfNext, startAddress, inclusive, "linearScan by position");
}

/**
* Variant that lets the caller supply a description for the perf log so different scan
* origins (fast path from {@code writePosition}, fall-through from indexed anchor, etc.)
* are distinguishable in logs -- important because the whole point of the
* {@code MAX_VALUE} fast path is to <em>avoid</em> the brute-force fall-through scan, and
* we want to be able to verify that from log output rather than guess.
*/
long linearScanByPosition(@NotNull final Wire wire,
final long toPosition,
final long indexOfNext,
final long startAddress,
boolean inclusive,
String desc) throws EOFException {
long start = REPORT_LINEAR_SCAN ? System.nanoTime() : 0;
long index = linearScanByPosition0(wire, toPosition, indexOfNext, startAddress, inclusive);
if (REPORT_LINEAR_SCAN) {
printLinearScanTime(index, startAddress, start, "linearScan by position");
printLinearScanTime(index, startAddress, start, desc);
}
return index;
}
Expand Down Expand Up @@ -664,12 +681,13 @@ long linearScanByPosition0(@NotNull final Wire wire,
* @return The starting index for the scan.
*/
private long calculateInitialValue(long toPosition, long indexOfNext, long startAddress, Bytes<?> bytes, long lastAddress, long lastIndex) {
bytes.readLimit(bytes.capacity());
if (lastAddress > 0 && toPosition == lastAddress
&& lastIndex != Sequence.NOT_FOUND && lastIndex != Sequence.NOT_FOUND_RETRY) {
bytes.readPositionUnlimited(toPosition);
bytes.readPosition(toPosition);
return lastIndex - 1;
} else {
bytes.readPositionUnlimited(startAddress);
bytes.readPosition(startAddress);
return indexOfNext - 1;
}
}
Expand Down Expand Up @@ -709,6 +727,44 @@ public long nextEntryToBeIndexed() {
return nextEntryToBeIndexed.getVolatileValue();
}

long sequenceForMaxPosition(@NotNull ExcerptContext ec,
boolean inclusive) throws StreamCorruptedException {
int retry = 0;
for (; retry < 128; retry++) {
long lastWritePos = writePosition.getVolatileValue();
long latestSeq = sequence.getSequence(lastWritePos);
if (latestSeq >= 0) {
if (retry > 0) {
Jvm.perf().on(getClass(),
"sequenceForPosition(MAX_VALUE) tracker-read retried" + " " + retry + " times");
}
try {
return linearScanByPosition(ec.wireForIndex(), Long.MAX_VALUE, latestSeq, lastWritePos, inclusive,
"linearScan from writePosition (sequenceForPosition fast path)");
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
}
if (latestSeq == Sequence.NOT_FOUND)
break;

// NOT_FOUND_RETRY: writer raced. Spin-retry for the first couple of iterations
// (the race window is sub-microsecond); after that yield to let the writer make
// progress instead of starving them with our volatile reads.
if (retry > 2)
Thread.yield();
}
if (retry >= 128) {
// Retry budget exhausted in a row of NOT_FOUND_RETRY -- surface this clearly:
// it means we're about to do a brute-force indexed-anchor scan instead of the
// efficient writePos-anchored fast-path scan. The log makes a regression visible.
Jvm.perf().on(getClass(),
"sequenceForPosition(MAX_VALUE) tracker-read retry loop exhausted" + " after " + retry +
" attempts; falling through to indexed lookup");
}
return sequenceForPosition(ec, Long.MAX_VALUE, inclusive);
}

/**
* Returns the sequence number for a given position in the wire.
* If an exact match is found for the position, the corresponding index is returned;
Expand Down Expand Up @@ -773,7 +829,7 @@ long sequenceForPosition(@NotNull ExcerptContext ec,
}
try {
// Perform a linear scan if no exact match is found.
return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive);
return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive, "linearScan from indexed anchor (sequenceForPosition fall-through)");
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -976,13 +1032,16 @@ public long lastSequenceNumber(@NotNull ExcerptContext ec)
break;
try {
Wire wireForIndex = ec.wireForIndex();
return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true);
return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true,
"linearScan from writePosition (lastSequenceNumber tail-check)");
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
}
}

// The write-position retry budget has already been spent above. Fall through
// directly to the indexed lookup instead of restarting the MAX_VALUE fast path.
return sequenceForPosition(ec, Long.MAX_VALUE, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,9 @@ public MappedBytes bytes() {
public long sequenceForPosition(@NotNull final ExcerptContext ec, final long position, boolean inclusive) throws StreamCorruptedException {
throwExceptionIfClosed();

return indexing.sequenceForPosition(ec, position, inclusive);
return position == Long.MAX_VALUE
? indexing.sequenceForMaxPosition(ec, inclusive)
: indexing.sequenceForPosition(ec, position, inclusive);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void setUp() {

@After
public void tearDown() {
System.setSecurityManager(null);
if (Jvm.majorVersion() < 17)
System.setSecurityManager(null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import static net.openhft.chronicle.queue.rollcycles.TestRollCycles.TEST_SECONDLY;

Expand All @@ -39,9 +40,10 @@ public void tearDown() {
IOTools.deleteDirWithFiles(path);
}

@Test
@Test(timeout = 180_000L)
public void test() throws InterruptedException {
int threads = Math.min(64, Runtime.getRuntime().availableProcessors() * 4) - 1;
int processors = Runtime.getRuntime().availableProcessors();
int threads = Math.max(2, Math.min(64, processors * 2) - 1);
int messages = 100;

AtomicInteger count = new AtomicInteger();
Expand All @@ -55,12 +57,12 @@ public void test() throws InterruptedException {
.rollCycle(TEST_SECONDLY).build();
ExcerptAppender appender = writeQueue.createAppender()) {
for (int i = 0; i < messages; i++) {
long millis = System.currentTimeMillis() % 100;
if (millis > 1 && millis < 99) {
Jvm.pause(99 - millis);
}
Map<String, Object> map = new HashMap<>();
map.put("key", Thread.currentThread().getName() + " - " + i);

long nanos = System.nanoTime() % 100_000_000;
// should cause them all to try to wake and write at the same time
LockSupport.parkNanos(100_000_000 - nanos);
appender.writeMap(map);
count.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void setUp() {
queuePath = getTmpDir();
}

@Test
@Test(timeout = 180_000L)
public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws InterruptedException, TimeoutException, ExecutionException {
finishedNormally = false;
try (ChronicleQueue queue = createQueue(SYSTEM_TIME_PROVIDER)) {
Expand Down Expand Up @@ -125,7 +125,7 @@ public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() thr
finishedNormally = true;
}

@Test
@Test(timeout = 180_000L)
public void tailerResourcesCanBeReleasedManually() throws Exception {
FlakyTestRunner.builder(this::tailerResourcesCanBeReleasedManually0).build().run();
}
Expand Down