Skip to content

Commit 2706001

Browse files
authored
Pipe: Do not wait for sealed tsFiles close at realtime (#17671)
1 parent 0d382d8 commit 2706001

2 files changed

Lines changed: 98 additions & 6 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
9191
protected final boolean isGeneratedByPipe;
9292
protected final boolean isGeneratedByIoTConsensusV2;
9393
protected final boolean isGeneratedByHistoricalExtractor;
94+
// Realtime TsFile events are created after TsFileProcessor#endFile(), so the file is already
95+
// immutable even if TsFileResource status is still UNCLOSED.
96+
private final boolean isTsFileSealed;
9497
private final AtomicBoolean isClosed;
9598
private final AtomicReference<TsFileInsertionEventParser> eventParser;
9699

@@ -130,7 +133,8 @@ public PipeTsFileInsertionEvent(
130133
null,
131134
true,
132135
Long.MIN_VALUE,
133-
Long.MAX_VALUE);
136+
Long.MAX_VALUE,
137+
true);
134138
}
135139

136140
public PipeTsFileInsertionEvent(
@@ -153,6 +157,50 @@ public PipeTsFileInsertionEvent(
153157
final boolean skipIfNoPrivileges,
154158
final long startTime,
155159
final long endTime) {
160+
this(
161+
isTableModelEvent,
162+
databaseNameFromDataRegion,
163+
resource,
164+
tsFile,
165+
isWithMod,
166+
isLoaded,
167+
isGeneratedByHistoricalExtractor,
168+
tableNames,
169+
pipeName,
170+
creationTime,
171+
pipeTaskMeta,
172+
treePattern,
173+
tablePattern,
174+
userId,
175+
userName,
176+
cliHostname,
177+
skipIfNoPrivileges,
178+
startTime,
179+
endTime,
180+
false);
181+
}
182+
183+
private PipeTsFileInsertionEvent(
184+
final Boolean isTableModelEvent,
185+
final String databaseNameFromDataRegion,
186+
final TsFileResource resource,
187+
final File tsFile,
188+
final boolean isWithMod,
189+
final boolean isLoaded,
190+
final boolean isGeneratedByHistoricalExtractor,
191+
final Set<String> tableNames,
192+
final String pipeName,
193+
final long creationTime,
194+
final PipeTaskMeta pipeTaskMeta,
195+
final TreePattern treePattern,
196+
final TablePattern tablePattern,
197+
final String userId,
198+
final String userName,
199+
final String cliHostname,
200+
final boolean skipIfNoPrivileges,
201+
final long startTime,
202+
final long endTime,
203+
final boolean isTsFileSealed) {
156204
super(
157205
pipeName,
158206
creationTime,
@@ -186,6 +234,7 @@ public PipeTsFileInsertionEvent(
186234
this.isGeneratedByPipe = resource.isGeneratedByPipe();
187235
this.isGeneratedByIoTConsensusV2 = resource.isGeneratedByIoTConsensusV2();
188236
this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
237+
this.isTsFileSealed = isTsFileSealed;
189238
this.tableNames = tableNames;
190239

191240
isClosed = new AtomicBoolean(resource.isClosed());
@@ -242,6 +291,10 @@ public boolean waitForTsFileClose() throws InterruptedException {
242291
return true;
243292
}
244293

294+
if (isTsFileSealed) {
295+
return !resource.isEmpty();
296+
}
297+
245298
if (!isClosed.get()) {
246299
isClosed.set(resource.isClosed());
247300

@@ -452,7 +505,8 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
452505
cliHostname,
453506
skipIfNoPrivileges,
454507
startTime,
455-
endTime)
508+
endTime,
509+
isTsFileSealed)
456510
.bindTsFileDedupScopeID(tsFileDedupScopeID);
457511
}
458512

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,18 +242,56 @@ public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws Exceptio
242242
}
243243
}
244244

245+
@Test(timeout = 5000)
246+
public void testRealtimeEventCanSkipWaitingForClosedStatusAfterTsFileSealed() throws Exception {
247+
final File tempDir = Files.createTempDirectory("pipeTsFileSealed").toFile();
248+
249+
try {
250+
final TsFileResource resource =
251+
createNonEmptyTsFileResource(tempDir, "realtime.tsfile", 1L, 1);
252+
Assert.assertFalse(resource.isClosed());
253+
Assert.assertFalse(resource.isEmpty());
254+
255+
final PipeTsFileInsertionEvent sourceEvent =
256+
new PipeTsFileInsertionEvent(false, "root.db", resource, false);
257+
Assert.assertTrue(sourceEvent.waitForTsFileClose());
258+
259+
final PipeTsFileInsertionEvent copiedEvent =
260+
sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
261+
"pipe", 1L, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE);
262+
Assert.assertTrue(copiedEvent.waitForTsFileClose());
263+
264+
copiedEvent.close();
265+
sourceEvent.close();
266+
} finally {
267+
FileUtils.deleteFileOrDirectory(tempDir);
268+
}
269+
}
270+
245271
private TsFileResource createSpyTsFileResource(
246272
final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId)
247273
throws IOException {
274+
final TsFileResource resource =
275+
createNonEmptyTsFileResource(tempDir, fileName, flushOrderId, dataRegionId);
276+
final TsFileResource spyResource = Mockito.spy(resource);
277+
Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId();
278+
return spyResource;
279+
}
280+
281+
private TsFileResource createNonEmptyTsFileResource(
282+
final File tempDir, final String fileName, final long flushOrderId, final int dataRegionId)
283+
throws IOException {
248284
final File file = new File(tempDir, fileName);
249285
Assert.assertTrue(file.createNewFile());
250286

251287
final TsFileResource resource = new TsFileResource(file);
252288
resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
253-
254-
final TsFileResource spyResource = Mockito.spy(resource);
255-
Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId();
256-
return spyResource;
289+
final ITimeIndex timeIndex = new ArrayDeviceTimeIndex();
290+
final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.db.d" + dataRegionId);
291+
timeIndex.putStartTime(deviceID, 1);
292+
timeIndex.putEndTime(deviceID, 1);
293+
resource.setTimeIndex(timeIndex);
294+
return resource;
257295
}
258296

259297
static class TestAccessControl implements AccessControl {

0 commit comments

Comments
 (0)