Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,23 @@ See the respective `package-info.java` files for details.

== Chronicle Queue Versions and Remarkable Changes

=== `sequenceForPosition(MAX_VALUE)` now starts from `writePosition`

`ExcerptTailer.lastIndex()` (via `SCQIndexing.sequenceForPosition(Long.MAX_VALUE, ...)`)
previously always performed an indexed lookup first and then walked forward from the latest
indexed slot -- up to `indexSpacing` entries before `writePosition`. The new fast path consults
the in-memory `Sequence` tracker directly: when the tracker reports a consistent
`(writePosition, sequence)` pair the call walks forward from `writePosition` only.

This is functionally equivalent: the recovery walk past `writePosition` (used to detect orphan
or in-progress writes -- see `PartialUpdateTest`) is preserved, so partial-write recovery is
unchanged. The win is that we no longer scan the up-to-`indexSpacing` entries between the
latest indexed slot and `writePosition`, which were known-published and therefore redundant.

The tracker is read up to 128 times while it reports `NOT_FOUND_RETRY` (writer mid-update) before
falling through to the original indexed-lookup path; a `NOT_FOUND` result falls through
immediately. `Jvm.perf()` logs whenever any retry was needed, so the rare race is observable.

=== Changes from Version 4 to Version 5

In Chronicle Queue v5 tailers are now read-only, in Chronicle Queue v4 we had the concept of lazy indexing, where appenders would not write indexes but instead the indexing could be done by the tailer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,33 @@
// visible for testing
int linearScanCount;
int linearScanByPositionCount;
/** Maximum {@code NOT_FOUND_RETRY} attempts the {@code MAX_VALUE} fast path will spin
* before falling through to the indexed-lookup recovery path. Non-final and package-local
* so tests can shrink the budget and exercise exhaustion without forcing 128 stub calls
* + 125 {@link Thread#yield()} invocations per test run. */
static int SEQUENCE_TRACKER_RETRY_BUDGET = 128;

Check warning on line 79 in src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this field "SEQUENCE_TRACKER_RETRY_BUDGET" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=OpenHFT_Chronicle-Queue&issues=AZ3e14eT9joHD3qlKVLL&open=AZ3e14eT9joHD3qlKVLL&pullRequest=1705
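/** Highest zero-based retry index that still spins without {@link Thread#yield()}-ing in the
* retry loop, i.e. the first {@code SEQUENCE_TRACKER_RETRY_YIELD_AFTER + 1} attempts spin; the
* writer race window is sub-microsecond so the first few retries should not yield. */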
private static final int SEQUENCE_TRACKER_RETRY_YIELD_AFTER = 2;
// Package-private log markers emitted on the lastIndex() fast path. Tests / observers attach a
// perf exception handler (see Jvm.setPerfExceptionHandler) and count these messages to assert
// that retries are rare under contention and that the path never falls through to the
// brute-force indexed-anchor scan during a healthy run -- avoiding any per-instance counter
// fields whose only purpose is observability.
static final String LOG_TRACKER_RETRY_PREFIX =
"sequenceForPosition(MAX_VALUE) tracker-read retried";
static final String LOG_TRACKER_BRUTE_FORCE_FALLTHROUGH =
"sequenceForPosition(MAX_VALUE) tracker-read retry loop exhausted";
// Description tags passed to printLinearScanTime so each call site is distinguishable in
// perf logs (and so tests can match on a single source of truth rather than copying the
// string). The whole point of the fast path is to take SCAN_LABEL_FAST_PATH (or
// SCAN_LABEL_TAIL_CHECK), not SCAN_LABEL_FALL_THROUGH.
static final String SCAN_LABEL_FAST_PATH =
"linearScan from writePosition (sequenceForPosition fast path)";
static final String SCAN_LABEL_FALL_THROUGH =
"linearScan from indexed anchor (sequenceForPosition fall-through)";
static final String SCAN_LABEL_TAIL_CHECK =
"linearScan from writePosition (lastSequenceNumber tail-check)";
Collection<Closeable> closeables = new ArrayList<>();
private long lastScannedIndex = -1;

Expand Down Expand Up @@ -505,7 +532,8 @@
fromKnownIndex = lastIndex;
}

bytes.readPositionUnlimited(knownAddress);
bytes.readLimitToCapacity();
bytes.readPosition(knownAddress);

for (long i = fromKnownIndex; ; i++) {
try {
Expand Down Expand Up @@ -565,10 +593,26 @@
final long indexOfNext,
final long startAddress,
boolean inclusive) throws EOFException {
return linearScanByPosition(wire, toPosition, indexOfNext, startAddress, inclusive, "linearScan by position");
}

/**
* Variant that lets the caller supply a description for the perf log so different scan
* origins (fast path from {@code writePosition}, fall-through from indexed anchor, etc.)
* are distinguishable in logs -- important because the whole point of the
* {@code MAX_VALUE} fast path is to <em>avoid</em> the brute-force fall-through scan, and
* we want to be able to verify that from log output rather than guess.
*/
long linearScanByPosition(@NotNull final Wire wire,
final long toPosition,
final long indexOfNext,
final long startAddress,
boolean inclusive,
String desc) throws EOFException {
long start = REPORT_LINEAR_SCAN ? System.nanoTime() : 0;
long index = linearScanByPosition0(wire, toPosition, indexOfNext, startAddress, inclusive);
if (REPORT_LINEAR_SCAN) {
printLinearScanTime(index, startAddress, start, "linearScan by position");
printLinearScanTime(index, startAddress, start, desc);
}
return index;
}
Expand Down Expand Up @@ -664,12 +708,13 @@
* @return The starting index for the scan.
*/
private long calculateInitialValue(long toPosition, long indexOfNext, long startAddress, Bytes<?> bytes, long lastAddress, long lastIndex) {
bytes.readLimit(bytes.capacity());
if (lastAddress > 0 && toPosition == lastAddress
&& lastIndex != Sequence.NOT_FOUND && lastIndex != Sequence.NOT_FOUND_RETRY) {
bytes.readPositionUnlimited(toPosition);
bytes.readPosition(toPosition);
return lastIndex - 1;
} else {
bytes.readPositionUnlimited(startAddress);
bytes.readPosition(startAddress);
return indexOfNext - 1;
}
}
Expand Down Expand Up @@ -723,6 +768,49 @@
long sequenceForPosition(@NotNull ExcerptContext ec,
final long position,
boolean inclusive) throws StreamCorruptedException {
// Fast path for lastIndex() -- historically the dominant slow caller. Writers commit
// (writePosition, sequence) atomically into a single TwoLongValue. When the two halves
// agree we have the sequence at writePosition directly, so we can skip the indexed
// lookup (which walks back to the latest-indexed-anchor -- up to indexSpacing entries
// before writePos). We still walk forward from writePosition to recover any
// partial-write orphans / in-progress writes, matching the semantics of the indexed
// path. Falls through on writer race (NOT_FOUND_RETRY) and on tracker-uninitialised
// (NOT_FOUND), preserving the original behaviour when the optimisation can't be satisfied.
if (position == Long.MAX_VALUE) {
int retry = 0;
for (; retry < SEQUENCE_TRACKER_RETRY_BUDGET; retry++) {
long lastWritePos = writePosition.getVolatileValue();
long latestSeq = sequence.getSequence(lastWritePos);
if (latestSeq >= 0) {
if (retry > 0) {
Jvm.perf().on(getClass(),
LOG_TRACKER_RETRY_PREFIX + " " + retry + " times");
}
try {
return linearScanByPosition(ec.wireForIndex(), Long.MAX_VALUE, latestSeq, lastWritePos, inclusive,
SCAN_LABEL_FAST_PATH);
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
}
if (latestSeq == Sequence.NOT_FOUND)
break;

// NOT_FOUND_RETRY: writer raced. Spin-retry for the first couple of iterations
// (the race window is sub-microsecond); after that yield to let the writer make
// progress instead of starving them with our volatile reads.
if (retry > SEQUENCE_TRACKER_RETRY_YIELD_AFTER)
Thread.yield();
}
if (retry >= SEQUENCE_TRACKER_RETRY_BUDGET) {
// Retry budget exhausted in a row of NOT_FOUND_RETRY -- surface this clearly:
// it means we're about to do a brute-force indexed-anchor scan instead of the
// efficient writePos-anchored fast-path scan. The log makes a regression visible.
Jvm.perf().on(getClass(),
LOG_TRACKER_BRUTE_FORCE_FALLTHROUGH + " after " + retry +
" attempts; falling through to indexed lookup");
}
}
long indexOfNext = 0;
long lastKnownAddress = 0;
@NotNull Wire wire = ec.wireForIndex();
Expand Down Expand Up @@ -773,7 +861,7 @@
}
try {
// Perform a linear scan if no exact match is found.
return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive);
return linearScanByPosition(wire, position, indexOfNext, lastKnownAddress, inclusive, SCAN_LABEL_FALL_THROUGH);
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -976,7 +1064,8 @@
break;
try {
Wire wireForIndex = ec.wireForIndex();
return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true);
return wireForIndex == null ? sequence : linearScanByPosition(wireForIndex, Long.MAX_VALUE, sequence, address, true,
SCAN_LABEL_TAIL_CHECK);
} catch (EOFException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ public void test() throws InterruptedException {
.rollCycle(TEST_SECONDLY).build();
ExcerptAppender appender = writeQueue.createAppender()) {
for (int i = 0; i < messages; i++) {
long millis = System.currentTimeMillis() % 100;
if (millis > 1 && millis < 99) {
Jvm.pause(99 - millis);
}
long millis = System.currentTimeMillis() & 63;
Jvm.pause(millis);
Map<String, Object> map = new HashMap<>();
map.put("key", Thread.currentThread().getName() + " - " + i);
appender.writeMap(map);
Expand Down
Loading