Skip to content

Commit 06d2766

Browse files
authored
Make some uts more stable (#17720)
1 parent 7dcad2b commit 06d2766

3 files changed

Lines changed: 74 additions & 118 deletions

File tree

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.tsfile.file.metadata.enums.TSEncoding;
5252
import org.apache.tsfile.read.reader.IPointReader;
5353
import org.apache.tsfile.write.schema.MeasurementSchema;
54+
import org.junit.BeforeClass;
5455
import org.junit.Test;
5556
import org.mockito.Mockito;
5657

@@ -62,6 +63,7 @@
6263
import java.util.List;
6364
import java.util.concurrent.Executor;
6465
import java.util.concurrent.ExecutorService;
66+
import java.util.concurrent.TimeUnit;
6567

6668
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
6769
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -72,8 +74,16 @@
7274

7375
public class FragmentInstanceExecutionTest {
7476

77+
@BeforeClass
78+
public static void setUpClass() {
79+
// Initialize DataNodeId before any test to avoid ExceptionInInitializerError when
80+
// Coordinator.<clinit> is triggered indirectly by async state-change listeners
81+
// (e.g., via QueryRelatedResourceMetricSet -> Coordinator -> QueryIdGenerator).
82+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
83+
}
84+
7585
@Test
76-
public void testFragmentInstanceExecution() {
86+
public void testFragmentInstanceExecution() throws InterruptedException {
7787
ExecutorService instanceNotificationExecutor =
7888
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
7989
try {
@@ -110,75 +120,84 @@ public void testFragmentInstanceExecution() {
110120
e.printStackTrace();
111121
fail(e.getMessage());
112122
} finally {
113-
instanceNotificationExecutor.shutdown();
123+
shutdownAndAwaitTermination(instanceNotificationExecutor);
114124
}
115125
}
116126

117127
@Test
118128
public void testTVListOwnerTransfer() throws InterruptedException {
119-
// Capture System.err to check for warning messages
129+
ExecutorService instanceNotificationExecutor =
130+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
131+
132+
// Capture System.out to check for warning messages. Set up the capture only after the
133+
// executor is created so that any thread-pool init logging does not pollute the captured
134+
// output. Capture must also be torn down before awaiting executor termination so any
135+
// late async log output from this test goes to the original stream, not the captured one.
120136
PrintStream systemOut = System.out;
121137
ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
122138
System.setOut(new PrintStream(logPrint));
123139

124140
try {
125-
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
126-
127-
ExecutorService instanceNotificationExecutor =
128-
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
129-
try {
130-
// TVList
131-
TVList tvList = buildTVList();
132-
133-
// FragmentInstance Context & Execution
134-
FragmentInstanceExecution execution1 =
135-
createFragmentInstanceExecution(1, instanceNotificationExecutor);
136-
FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext();
137-
fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
138-
tvList.getQueryContextSet().add(fragmentInstanceContext1);
139-
140-
FragmentInstanceExecution execution2 =
141-
createFragmentInstanceExecution(2, instanceNotificationExecutor);
142-
FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext();
143-
fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
144-
tvList.getQueryContextSet().add(fragmentInstanceContext2);
145-
146-
// mock flush's behavior
147-
fragmentInstanceContext1
148-
.getMemoryReservationContext()
149-
.reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize());
150-
tvList.setOwnerQuery(fragmentInstanceContext1);
151-
152-
fragmentInstanceContext1.decrementNumOfUnClosedDriver();
153-
fragmentInstanceContext2.decrementNumOfUnClosedDriver();
154-
155-
fragmentInstanceContext1.getStateMachine().finished();
156-
Thread.sleep(100);
157-
fragmentInstanceContext2.getStateMachine().finished();
158-
159-
assertTrue(execution1.getInstanceState().isDone());
160-
assertTrue(execution2.getInstanceState().isDone());
161-
Thread.sleep(100);
162-
} catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) {
163-
fail(e.getMessage());
164-
} finally {
165-
instanceNotificationExecutor.shutdown();
166-
}
141+
// TVList
142+
TVList tvList = buildTVList();
143+
144+
// FragmentInstance Context & Execution
145+
FragmentInstanceExecution execution1 =
146+
createFragmentInstanceExecution(1, instanceNotificationExecutor);
147+
FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext();
148+
fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
149+
tvList.getQueryContextSet().add(fragmentInstanceContext1);
150+
151+
FragmentInstanceExecution execution2 =
152+
createFragmentInstanceExecution(2, instanceNotificationExecutor);
153+
FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext();
154+
fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
155+
tvList.getQueryContextSet().add(fragmentInstanceContext2);
156+
157+
// mock flush's behavior
158+
fragmentInstanceContext1
159+
.getMemoryReservationContext()
160+
.reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize());
161+
tvList.setOwnerQuery(fragmentInstanceContext1);
162+
163+
fragmentInstanceContext1.decrementNumOfUnClosedDriver();
164+
fragmentInstanceContext2.decrementNumOfUnClosedDriver();
165+
166+
fragmentInstanceContext1.getStateMachine().finished();
167+
Thread.sleep(100);
168+
fragmentInstanceContext2.getStateMachine().finished();
169+
170+
assertTrue(execution1.getInstanceState().isDone());
171+
assertTrue(execution2.getInstanceState().isDone());
172+
} catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) {
173+
fail(e.getMessage());
167174
} finally {
168-
// Restore original System.out
175+
// Restore original System.out before waiting for the executor so that any late
176+
// async log output from listeners is written to the real stdout, not the captured buffer.
169177
System.setOut(systemOut);
178+
shutdownAndAwaitTermination(instanceNotificationExecutor);
170179

171180
// should not contain warn message: "The memory cost to be released is larger than the memory
172181
// cost of memory block"
173182
String capturedOutput = logPrint.toString();
174-
assertTrue(capturedOutput.isEmpty());
183+
assertFalse(
184+
"captured stdout should not contain memory-block release warning, but was:\n"
185+
+ capturedOutput,
186+
capturedOutput.contains(
187+
"The memory cost to be released is larger than the memory cost of memory block"));
175188
}
176189
}
177190

178-
@Test
179-
public void testTVListCloneForQuery() {
180-
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
191+
private static void shutdownAndAwaitTermination(ExecutorService executor)
192+
throws InterruptedException {
193+
executor.shutdown();
194+
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
195+
executor.shutdownNow();
196+
}
197+
}
181198

199+
@Test
200+
public void testTVListCloneForQuery() throws InterruptedException {
182201
ExecutorService instanceNotificationExecutor =
183202
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
184203

@@ -218,8 +237,7 @@ public void testTVListCloneForQuery() {
218237
Collections.emptyMap()));
219238
ReadOnlyMemChunk readOnlyMemChunk1 =
220239
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
221-
ReadOnlyMemChunk readOnlyMemChunk2 =
222-
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
240+
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
223241

224242
IPointReader pointReader = readOnlyMemChunk1.getPointReader();
225243
while (pointReader.hasNextTimeValuePair()) {
@@ -234,7 +252,7 @@ public void testTVListCloneForQuery() {
234252
| IllegalArgumentException e) {
235253
fail(e.getMessage());
236254
} finally {
237-
instanceNotificationExecutor.shutdown();
255+
shutdownAndAwaitTermination(instanceNotificationExecutor);
238256
}
239257
}
240258

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ public void singleDeviceViewOperatorTest() {
174174
int total = 0;
175175
while (singleDeviceViewOperator.isBlocked().isDone() && singleDeviceViewOperator.hasNext()) {
176176
TsBlock tsBlock = singleDeviceViewOperator.next();
177+
if (tsBlock == null || tsBlock.isEmpty()) {
178+
continue;
179+
}
177180
assertEquals(4, tsBlock.getValueColumnCount());
178181
total += tsBlock.getPositionCount();
179182
for (int i = 0; i < tsBlock.getPositionCount(); i++) {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
2121

22-
import org.apache.iotdb.commons.conf.IoTDBConstant;
2322
import org.apache.iotdb.commons.exception.MetadataException;
2423
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2524
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -40,9 +39,6 @@
4039
import org.junit.Test;
4140

4241
import java.io.IOException;
43-
import java.util.ArrayList;
44-
import java.util.Collections;
45-
import java.util.List;
4642
import java.util.concurrent.CountDownLatch;
4743
import java.util.concurrent.TimeUnit;
4844

@@ -64,67 +60,6 @@ public void tearDown() throws IOException, StorageEngineException {
6460
.setCompactionReadThroughputRate(compactionReadThroughputPerSec);
6561
}
6662

67-
@Test
68-
public void testManyAlignedDeviceTsFile() throws IOException, InterruptedException {
69-
List<String> deviceNames = new ArrayList<>();
70-
for (int i = 0; i < 100000; i++) {
71-
deviceNames.add("d" + i);
72-
}
73-
TsFileResource resource = createEmptyFileAndResource(true);
74-
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
75-
for (String device : deviceNames) {
76-
writer.startChunkGroup(device);
77-
writer.generateSimpleAlignedSeriesToCurrentDevice(
78-
Collections.singletonList("s0"),
79-
new TimeRange[] {new TimeRange(1, 2)},
80-
TSEncoding.PLAIN,
81-
CompressionType.LZ4);
82-
writer.endChunkGroup();
83-
}
84-
writer.endFile();
85-
}
86-
seqResources.add(resource);
87-
tsFileManager.add(resource, true);
88-
long tsFileSize = resource.getTsFileSize();
89-
Thread thread =
90-
new Thread(
91-
() -> {
92-
InnerSpaceCompactionTask task =
93-
new InnerSpaceCompactionTask(
94-
0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0);
95-
task.start();
96-
});
97-
thread.start();
98-
thread.join(TimeUnit.SECONDS.toMillis(30 + tsFileSize / IoTDBConstant.MB));
99-
}
100-
101-
@Test
102-
public void testManyNotAlignedDeviceTsFile() throws IOException {
103-
List<String> deviceNames = new ArrayList<>();
104-
for (int i = 0; i < 100000; i++) {
105-
deviceNames.add("d" + i);
106-
}
107-
TsFileResource resource = createEmptyFileAndResource(true);
108-
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
109-
for (String device : deviceNames) {
110-
writer.startChunkGroup(device);
111-
writer.generateSimpleNonAlignedSeriesToCurrentDevice(
112-
"s0", new TimeRange[] {new TimeRange(1, 2)}, TSEncoding.PLAIN, CompressionType.LZ4);
113-
writer.endChunkGroup();
114-
}
115-
writer.endFile();
116-
}
117-
seqResources.add(resource);
118-
tsFileManager.add(resource, true);
119-
long tsFileSize = resource.getTsFileSize();
120-
InnerSpaceCompactionTask task =
121-
new InnerSpaceCompactionTask(
122-
0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0);
123-
Assert.assertTrue(task.start());
124-
Assert.assertTrue(
125-
TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) > task.getTimeCost());
126-
}
127-
12863
@Test
12964
public void testReadRateLimit() throws IOException, InterruptedException {
13065
int compactionReadOperationPerSec =

0 commit comments

Comments
 (0)