-
Notifications
You must be signed in to change notification settings - Fork 567
Expand file tree
/
Copy pathChronicleRollingIssueTest.java
More file actions
116 lines (106 loc) · 4.13 KB
/
ChronicleRollingIssueTest.java
File metadata and controls
116 lines (106 loc) · 4.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
* Copyright 2013-2025 chronicle.software; SPDX-License-Identifier: Apache-2.0
*/
package net.openhft.chronicle.queue;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.annotation.RequiredForClient;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.core.util.Time;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static net.openhft.chronicle.queue.rollcycles.TestRollCycles.TEST_SECONDLY;
/**
 * Concurrency test that has several appender threads write to one queue directory using the
 * {@code TEST_SECONDLY} roll cycle while a single tailer reads every message back.
 *
 * <p>Each appender thread opens its own {@link ChronicleQueue} on the shared path, so writers
 * and the reader all go through separate queue instances over the same files.
 */
@RequiredForClient
public class ChronicleRollingIssueTest extends QueueTestCommon {
    /** Longest time, in milliseconds, the reader keeps polling before the test fails. */
    private static final long TIMEOUT_MS = 60_000;

    /** Queue directory for the current test run; assigned in {@link #threadDump()}. */
    private String path;

    /**
     * Runs the inherited set-up and then picks a fresh queue directory under
     * {@code OS.getTarget()}, named after this class plus {@code Time.uniqueId()}.
     */
    @Override
    @Before
    public void threadDump() {
        super.threadDump();
        path = OS.getTarget() + "/" + getClass().getSimpleName() + "-" + Time.uniqueId();
    }

    /** Deletes the queue directory used by the test. */
    @After
    public void tearDown() {
        IOTools.deleteDirWithFiles(path);
    }

    /**
     * Starts {@code min(64, 4 * availableProcessors) - 1} appender threads, each writing 100
     * single-entry maps with a pause of 0-63 ms before every write, and reads them all back
     * with one tailer.
     *
     * @throws AssertionError       if not every message has been read within {@link #TIMEOUT_MS}
     * @throws InterruptedException if interrupted while joining the appender threads
     */
    @Test
    public void test() throws InterruptedException {
        final int threads = Math.min(64, Runtime.getRuntime().availableProcessors() * 4) - 1;
        final int messages = 100;
        // Only reported in the failure message, to show how far the writers got.
        final AtomicInteger count = new AtomicInteger();
        final StoreFileListener storeFileListener = (cycle, file) -> {
        };

        final Runnable appendRunnable = () -> {
            try (ChronicleQueue writeQueue = openQueue(storeFileListener);
                 ExcerptAppender appender = writeQueue.createAppender()) {
                for (int i = 0; i < messages; i++) {
                    // Pause length is taken from the low six bits of the clock: 0..63 ms.
                    long millis = System.currentTimeMillis() & 63;
                    Jvm.pause(millis);
                    Map<String, Object> map = new HashMap<>();
                    map.put("key", Thread.currentThread().getName() + " - " + i);
                    appender.writeMap(map);
                    count.incrementAndGet();
                }
            }
        };

        final List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(appendRunnable, "appender-" + i);
            thread.start();
            threadList.add(thread);
        }

        final long start = System.currentTimeMillis();
        final int expected = threads * messages;
        // The tailer is managed alongside the queue so it is closed (before the queue) on
        // both the success and the failure path, mirroring how the appender is handled.
        try (ChronicleQueue queue = openQueue(storeFileListener);
             ExcerptTailer tailer = queue.createTailer()) {
            int read = 0;
            // Polls without pausing; a null map means nothing was available on this attempt.
            while (read < expected) {
                Map<String, Object> map = tailer.readMap();
                if (map != null) {
                    read++;
                }
                if (System.currentTimeMillis() > start + TIMEOUT_MS) {
                    throw new AssertionError("Wrote: " + count
                            + " read: " + read
                            + " index: " + Long.toHexString(tailer.index()));
                }
            }
        } finally {
            // Stop any writer that is still running and give each up to a second to exit.
            for (Thread thread : threadList) {
                thread.interrupt();
                thread.join(1000);
            }
            try {
                IOTools.deleteDirWithFiles(path, 2);
            } catch (IORuntimeException e) {
                // Best-effort clean-up: a failure here is logged, not turned into a test failure.
                Jvm.debug().on(ChronicleRollingIssueTest.class, "Failed to clean up test directory", e);
            }
        }
    }

    /** Opens a queue on {@link #path} with the test block size and the {@code TEST_SECONDLY} roll cycle. */
    private ChronicleQueue openQueue(StoreFileListener storeFileListener) {
        return ChronicleQueue
                .singleBuilder(path)
                .testBlockSize()
                .storeFileListener(storeFileListener)
                .rollCycle(TEST_SECONDLY)
                .build();
    }
}