Skip to content

Commit ab1bfb7

Browse files
committed
QUEUE-123 added tests to check for a stuck tailer condition
1 parent 4a9c058 commit ab1bfb7

1 file changed

Lines changed: 284 additions & 0 deletions

File tree

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/*
2+
* Copyright 2013-2025 chronicle.software; SPDX-License-Identifier: Apache-2.0
3+
*/
4+
package net.openhft.chronicle.queue;
5+
6+
import net.openhft.chronicle.bytes.Bytes;
7+
import net.openhft.chronicle.core.time.SetTimeProvider;
8+
import net.openhft.chronicle.queue.impl.single.InternalAppender;
9+
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
10+
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
11+
import net.openhft.chronicle.wire.DocumentContext;
12+
import org.junit.Test;
13+
14+
import java.io.File;
15+
import java.util.concurrent.TimeUnit;
16+
17+
import static net.openhft.chronicle.queue.rollcycles.TestRollCycles.TEST_HOURLY;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
20+
21+
/**
22+
* Plan 2 (deterministic, white-box) reproduction of the "tailer stuck in the previous cycle"
23+
* failure analysed in TAILER_STUCK_AFTER_ROLL_DURING_BACKFILL.md.
24+
* <p>
25+
* <b>Root cause (confirmed by these tests):</b> a streaming {@link ExcerptTailer} crosses from one
26+
* cycle to the next <em>only</em> when it reads an EOF marker. See
27+
* {@code StoreTailer.inACycle2(...)}: the {@code NONE} case (no data, no EOF) just returns - the
28+
* design assumes <i>"appender will always write (or recover) EOF"</i>. The {@code cycle <
29+
* queue.lastCycle()} test is in the random-access {@code moveToIndex}/{@code toEnd} path only; it
30+
* does <b>not</b> apply to sequential reads.
31+
* <p>
32+
* Therefore if cycle N is left <b>without an EOF</b> (e.g. the roll to N+1 happened via the
33+
* replication back-fill / suppressed-roll path that never seals N), a tailer parks at the tip of N
34+
* with {@code isPresent()==false} forever - blind to the data already in N+1 - until something
35+
* writes/recovers N's EOF (as a healthy forward roll on restart does). {@code lastCycle} is
36+
* irrelevant to this.
37+
* <p>
38+
* <b>These are red bug-repro tests.</b> The {@code tailerShouldRead*} tests assert the DESIRED
39+
* behaviour (a streaming tailer reads the data already on disk in N+1) and therefore <b>FAIL</b>
40+
* against the current code; a fix that seals/recovers N's EOF should turn them green. The remaining
41+
* tests ({@code tailerRecovers...}, {@code tailerCrosses...}, {@code restart_thenForwardRoll...},
42+
* {@code toEnd_alreadyReaches...}) assert behaviour that is already correct and pass.
43+
*/
44+
public class TailerStuckOnUnsealedCycleTest extends QueueTestCommon {
45+
46+
private SingleChronicleQueue build(SetTimeProvider tp) {
47+
return build(getTmpDir(), tp);
48+
}
49+
50+
private SingleChronicleQueue build(File dir, SetTimeProvider tp) {
51+
return SingleChronicleQueueBuilder.binary(dir)
52+
.timeProvider(tp)
53+
.rollCycle(TEST_HOURLY)
54+
.build();
55+
}
56+
57+
/**
58+
* N has data and is NOT sealed (no EOF); N+1 has data. Built with out-of-order (backward)
59+
* writes so the forward roll never seals N - mirroring the replication back-fill path that
60+
* leaves a cycle un-EOF'd.
61+
*/
62+
private void writeUnsealedNWithDataInNPlus1(SingleChronicleQueue q, ExcerptAppender appender) {
63+
int currentCycle = q.cycle();
64+
RollCycle rc = q.rollCycle();
65+
// write N+1 first, so rolling *back* to N suppresses the EOF and leaves N open
66+
InternalAppender internalAppender = (InternalAppender) appender;
67+
internalAppender.writeBytes(rc.toIndex(currentCycle + 1, 0), Bytes.from("nPlus1-0"));
68+
internalAppender.writeBytes(rc.toIndex(currentCycle, 0), Bytes.from("n-0"));
69+
internalAppender.writeBytes(rc.toIndex(currentCycle, 1), Bytes.from("n-1"));
70+
}
71+
72+
/** Read forward until the first non-present document; return how many entries were read. */
73+
private int drain(ExcerptTailer tailer) {
74+
int read = 0;
75+
while (true) {
76+
try (DocumentContext dc = tailer.readingDocument()) {
77+
if (!dc.isPresent())
78+
return read;
79+
read++;
80+
}
81+
}
82+
}
83+
84+
// ---- the failure --------------------------------------------------------------------------
85+
86+
@Test
87+
public void tailerShouldReadNewCycle_whenPreviousCycleHasNoEof() {
88+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
89+
try (SingleChronicleQueue q = build(tp);
90+
ExcerptAppender appender = q.createAppender()) {
91+
writeUnsealedNWithDataInNPlus1(q, appender);
92+
93+
try (ExcerptTailer tailer = q.createTailer()) {
94+
// DESIRED behaviour - FAILS against the current code, which reads only 2 and parks
95+
// at the un-EOF'd cycle N. A streaming tailer should read N's two entries AND cross
96+
// into N+1, instead of being stranded by the missing EOF on N.
97+
assertEquals("tailer should read N's entries and cross into N+1", 3, drain(tailer));
98+
}
99+
}
100+
}
101+
102+
@Test
103+
public void tailerShouldReadNewCycle_withPreExistingDataInASingleCycle() {
104+
File dir = getTmpDir();
105+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
106+
107+
// 1) a queue that ALREADY EXISTS on disk, with data in a single cycle file (no roll, no EOF)
108+
int preCycle;
109+
try (SingleChronicleQueue q = build(dir, tp);
110+
ExcerptAppender appender = q.createAppender()) {
111+
preCycle = q.cycle();
112+
appender.writeBytes(Bytes.from("preexisting-0"));
113+
appender.writeBytes(Bytes.from("preexisting-1"));
114+
}
115+
116+
// 2) reopen the existing queue and perform the same backward / back-fill construction,
117+
// at a later cycle
118+
try (SingleChronicleQueue q = build(dir, tp);
119+
ExcerptAppender appender = q.createAppender()) {
120+
tp.advanceMillis(TimeUnit.HOURS.toMillis(1));
121+
assertTrue("run 2 must be a later cycle than the pre-existing one", q.cycle() > preCycle);
122+
writeUnsealedNWithDataInNPlus1(q, appender);
123+
124+
try (ExcerptTailer tailer = q.createTailer()) {
125+
// DESIRED behaviour - FAILS against the current code, which reads only 4 and parks
126+
// at the un-EOF'd cycle N. The tailer should read the 2 pre-existing entries, N's 2
127+
// entries, AND cross into N+1.
128+
assertEquals("tailer should read pre-existing + N + N+1", 5, drain(tailer));
129+
130+
tailer.direction(TailerDirection.BACKWARD).toEnd();
131+
try (DocumentContext dc = tailer.readingDocument()) {
132+
assertTrue(dc.isPresent());
133+
assertEquals("n-1", dc.wire().bytes().toString());
134+
}
135+
}
136+
}
137+
}
138+
139+
// ---- recovery: writing/recovering N's EOF un-sticks the tailer ----------------------------
140+
141+
@Test
142+
public void tailerRecovers_oncePreviousCyclesAreSealed() {
143+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
144+
try (SingleChronicleQueue q = build(tp);
145+
ExcerptAppender appender = q.createAppender()) {
146+
writeUnsealedNWithDataInNPlus1(q, appender);
147+
148+
try (ExcerptTailer stuck = q.createTailer()) {
149+
assertEquals(2, drain(stuck));
150+
}
151+
152+
// a healthy forward roll (as happens on restart once wall-clock has moved on) seals the
153+
// earlier cycles - here rolling to N+2 writes the EOFs for both N and N+1
154+
tp.advanceMillis(TimeUnit.HOURS.toMillis(2));
155+
appender.writeBytes(Bytes.from("nPlus2-0"));
156+
157+
try (ExcerptTailer healed = q.createTailer()) {
158+
// N:0, N:1, N+1:0, N+2:0
159+
assertEquals("after the EOFs are written the tailer reads every cycle", 4, drain(healed));
160+
}
161+
}
162+
}
163+
164+
// ---- control: a normally-sealed queue crosses cycles fine ---------------------------------
165+
166+
@Test
167+
public void tailerCrosses_whenCyclesAreSealed() {
168+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
169+
try (SingleChronicleQueue q = build(tp);
170+
ExcerptAppender appender = q.createAppender()) {
171+
int n = q.cycle();
172+
RollCycle rc = q.rollCycle();
173+
// forward writes: the roll into N+1 seals N with an EOF
174+
InternalAppender internalAppender = (InternalAppender) appender;
175+
internalAppender.writeBytes(rc.toIndex(n, 0), Bytes.from("n-0"));
176+
internalAppender.writeBytes(rc.toIndex(n, 1), Bytes.from("n-1"));
177+
internalAppender.writeBytes(rc.toIndex(n + 1, 0), Bytes.from("nPlus1-0"));
178+
179+
try (ExcerptTailer tailer = q.createTailer()) {
180+
assertEquals("a sealed cycle N lets the tailer cross into N+1", 3, drain(tailer));
181+
}
182+
}
183+
}
184+
185+
// ---- normaliseEOFs() should make the new-cycle data readable ------------------------------
186+
187+
@Test
188+
public void tailerShouldRead_afterNormaliseEOFs() {
189+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
190+
try (SingleChronicleQueue q = build(tp);
191+
ExcerptAppender appender = q.createAppender()) {
192+
// the backward / back-fill construction leaves the appender positioned ON cycle N
193+
writeUnsealedNWithDataInNPlus1(q, appender);
194+
195+
appender.normaliseEOFs();
196+
197+
try (ExcerptTailer tailer = q.createTailer()) {
198+
// DESIRED behaviour - FAILS against the current code: normaliseEOFs0 only seals
199+
// cycles strictly below min(queue.cycle(), appender.cycle()), so with the appender on
200+
// N it never seals N and the tailer still reads only 2. normaliseEOFs() should make
201+
// N+1 reachable.
202+
assertEquals("after normaliseEOFs the tailer should read N+1", 3, drain(tailer));
203+
}
204+
}
205+
}
206+
207+
// ---- does the directory listing (lastCycle) explain it? -----------------------------------
208+
209+
@Test
210+
public void toEnd_alreadyReachesNPlus1_inTheStuckState() {
211+
// In the stuck state lastCycle() is naturally N+1 - creating N+1 bumped the shared,
212+
// monotonic table-store highestCycle. So the directory listing is NOT stale, and the
213+
// random-access toEnd() path already lands in N+1 even while a STREAMING tailer is stuck
214+
// in N. i.e. for this failure the heal is the missing EOF, not a directory-listing refresh.
215+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
216+
try (SingleChronicleQueue q = build(tp);
217+
ExcerptAppender appender = q.createAppender()) {
218+
int n = q.cycle();
219+
writeUnsealedNWithDataInNPlus1(q, appender);
220+
221+
assertEquals("lastCycle already knows about N+1", n + 1, q.lastCycle());
222+
223+
try (ExcerptTailer streaming = q.createTailer()) {
224+
assertEquals("streaming tailer is stuck in N", 2, drain(streaming));
225+
}
226+
try (ExcerptTailer end = q.createTailer()) {
227+
end.toEnd();
228+
assertEquals("toEnd() lands in N+1, not N", n + 1, q.rollCycle().toCycle(end.index()));
229+
}
230+
}
231+
}
232+
233+
// ---- how restart actually heals it --------------------------------------------------------
234+
235+
@Test
236+
public void tailerShouldRead_afterRestart() {
237+
File dir = getTmpDir();
238+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
239+
240+
// run 1: create the stuck state, then "shut down"
241+
try (SingleChronicleQueue q = build(dir, tp);
242+
ExcerptAppender appender = q.createAppender()) {
243+
int n = q.cycle();
244+
writeUnsealedNWithDataInNPlus1(q, appender);
245+
}
246+
247+
// run 2: reopen (no forward roll). DESIRED behaviour - FAILS against the current code:
248+
// re-opening alone does not seal N (the constructor positions on N but writes no EOF, and
249+
// there is no forward-rolling write), so the tailer still reads only 2. A restarted reader
250+
// should see every entry already on disk (N and N+1).
251+
try (SingleChronicleQueue q = build(dir, tp);
252+
ExcerptTailer t = q.createTailer()) {
253+
assertEquals("after restart the tailer should read every entry on disk", 3, drain(t));
254+
}
255+
}
256+
257+
@Test
258+
public void restart_thenForwardRoll_sealsN_andTailerRecovers() {
259+
File dir = getTmpDir();
260+
SetTimeProvider tp = new SetTimeProvider(TimeUnit.HOURS.toMillis(100));
261+
262+
int n;
263+
try (SingleChronicleQueue q = build(dir, tp);
264+
ExcerptAppender appender = q.createAppender()) {
265+
n = q.cycle();
266+
writeUnsealedNWithDataInNPlus1(q, appender);
267+
try (ExcerptTailer t = q.createTailer()) {
268+
assertEquals(2, drain(t));
269+
}
270+
}
271+
272+
// run 2: wall-clock has moved on while we were down; the first live write rolls forward and
273+
// rollCycleTo writes the EOF on N (and N+1) -> the tailer can now cross
274+
tp.advanceMillis(TimeUnit.HOURS.toMillis(5));
275+
try (SingleChronicleQueue q = build(dir, tp);
276+
ExcerptAppender appender = q.createAppender()) {
277+
appender.writeBytes(Bytes.from("live-after-restart")); // forward roll seals N (and N+1)
278+
try (ExcerptTailer t = q.createTailer()) {
279+
assertTrue("after the post-restart forward roll the tailer crosses past N",
280+
drain(t) >= 3);
281+
}
282+
}
283+
}
284+
}

0 commit comments

Comments
 (0)