Skip to content

Commit 3fbba9b

Browse files
committed
Pipe: Do not listen to tsFiles when no sources need (#17669)
* assigner * Update PipeDataRegionAssigner.java * fix (cherry picked from commit 906b86f)
1 parent ac09072 commit 3fbba9b

3 files changed

Lines changed: 139 additions & 37 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public class PipeDataRegionAssigner implements Closeable {
5656

5757
private final String dataRegionId;
5858

59+
private volatile int listenToTsFileSourceCount = 0;
60+
private volatile int listenToInsertNodeSourceCount = 0;
61+
5962
private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();
6063

6164
public String getDataRegionId() {
@@ -194,12 +197,34 @@ private void assignToSource(
194197
});
195198
}
196199

197-
public void startAssignTo(final PipeRealtimeDataRegionSource extractor) {
198-
matcher.register(extractor);
200+
public synchronized void startAssignTo(final PipeRealtimeDataRegionSource source) {
201+
matcher.register(source);
202+
if (source.isNeedListenToTsFile()) {
203+
listenToTsFileSourceCount++;
204+
}
205+
if (source.isNeedListenToInsertNode()) {
206+
listenToInsertNodeSourceCount++;
207+
}
208+
logSourceAssignmentChange("registered", source);
209+
}
210+
211+
public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource source) {
212+
matcher.deregister(source);
213+
if (source.isNeedListenToTsFile()) {
214+
listenToTsFileSourceCount--;
215+
}
216+
if (source.isNeedListenToInsertNode()) {
217+
listenToInsertNodeSourceCount--;
218+
}
219+
logSourceAssignmentChange("deregistered", source);
199220
}
200221

201-
public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) {
202-
matcher.deregister(extractor);
222+
public boolean shouldListenToTsFile() {
223+
return listenToTsFileSourceCount > 0;
224+
}
225+
226+
public boolean shouldListenToInsertNode() {
227+
return listenToInsertNodeSourceCount > 0;
203228
}
204229

205230
public boolean notMoreSourceNeededToBeAssigned() {
@@ -236,4 +261,19 @@ public int getTsFileInsertionEventCount() {
236261
public int getPipeHeartbeatEventCount() {
237262
return eventCounter.getPipeHeartbeatEventCount();
238263
}
264+
265+
private void logSourceAssignmentChange(
266+
final String action, final PipeRealtimeDataRegionSource source) {
267+
LOGGER.info(
268+
"Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, insertNodeSourceCount={}).",
269+
source.getPipeName(),
270+
source.getCreationTime(),
271+
action,
272+
dataRegionId,
273+
source.isNeedListenToTsFile(),
274+
source.isNeedListenToInsertNode(),
275+
matcher.getRegisterCount(),
276+
listenToTsFileSourceCount,
277+
listenToInsertNodeSourceCount);
278+
}
239279
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import java.util.concurrent.ConcurrentHashMap;
3232
import java.util.concurrent.ConcurrentMap;
33-
import java.util.concurrent.atomic.AtomicInteger;
3433

3534
/**
3635
* PipeInsertionEventListener is a singleton in each data node.
@@ -48,23 +47,20 @@ public class PipeInsertionDataNodeListener {
4847
private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner =
4948
new ConcurrentHashMap<>();
5049

51-
private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
52-
private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0);
53-
5450
//////////////////////////// start & stop ////////////////////////////
5551

5652
public synchronized void startListenAndAssign(
5753
final String dataRegionId, final PipeRealtimeDataRegionSource source) {
58-
dataRegionId2Assigner
59-
.computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId))
60-
.startAssignTo(source);
61-
62-
if (source.isNeedListenToTsFile()) {
63-
listenToTsFileSourceCount.incrementAndGet();
64-
}
65-
if (source.isNeedListenToInsertNode()) {
66-
listenToInsertNodeSourceCount.incrementAndGet();
67-
}
54+
// Keep registration inside compute so the assigner is fully started before it becomes visible
55+
// to concurrent listeners.
56+
dataRegionId2Assigner.compute(
57+
dataRegionId,
58+
(id, assigner) -> {
59+
final PipeDataRegionAssigner actualAssigner =
60+
assigner == null ? new PipeDataRegionAssigner(dataRegionId) : assigner;
61+
actualAssigner.startAssignTo(source);
62+
return actualAssigner;
63+
});
6864
}
6965

7066
public synchronized void stopListenAndAssign(
@@ -79,13 +75,6 @@ public synchronized void stopListenAndAssign(
7975

8076
assigner.stopAssignTo(source);
8177

82-
if (source.isNeedListenToTsFile()) {
83-
listenToTsFileSourceCount.decrementAndGet();
84-
}
85-
if (source.isNeedListenToInsertNode()) {
86-
listenToInsertNodeSourceCount.decrementAndGet();
87-
}
88-
8978
if (assigner.notMoreSourceNeededToBeAssigned()) {
9079
// The removed assigner will is the same as the one referenced by the variable `assigner`
9180
dataRegionId2Assigner.remove(dataRegionId);
@@ -104,14 +93,10 @@ public synchronized void stopListenAndAssign(
10493

10594
public void listenToTsFile(
10695
final String dataRegionId, final TsFileResource tsFileResource, final boolean isLoaded) {
107-
// We don't judge whether listenToTsFileSourceCount.get() == 0 here on purpose
108-
// because sources may use tsfile events when some exceptions occur in the
109-
// insert nodes listening process.
110-
11196
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
11297

113-
// only events from registered data region will be extracted
114-
if (assigner == null) {
98+
// only events from registered data region with tsfile listeners will be extracted
99+
if (assigner == null || !assigner.shouldListenToTsFile()) {
115100
return;
116101
}
117102

@@ -121,14 +106,10 @@ public void listenToTsFile(
121106

122107
public void listenToInsertNode(
123108
final String dataRegionId, final InsertNode insertNode, final TsFileResource tsFileResource) {
124-
if (listenToInsertNodeSourceCount.get() == 0) {
125-
return;
126-
}
127-
128109
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
129110

130-
// only events from registered data region will be extracted
131-
if (assigner == null) {
111+
// only events from registered data region with insert listeners will be extracted
112+
if (assigner == null || !assigner.shouldListenToInsertNode()) {
132113
return;
133114
}
134115

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
2828
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
2929
import org.apache.iotdb.commons.utils.FileUtils;
30+
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
3031
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
3132
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
3233
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
@@ -39,6 +40,7 @@
3940
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
4041
import org.apache.iotdb.pipe.api.event.Event;
4142
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
43+
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
4244

4345
import org.apache.tsfile.common.constant.TsFileConstant;
4446
import org.apache.tsfile.enums.TSDataType;
@@ -63,6 +65,7 @@
6365
import java.util.concurrent.TimeUnit;
6466
import java.util.concurrent.TimeoutException;
6567
import java.util.concurrent.atomic.AtomicBoolean;
68+
import java.util.concurrent.atomic.AtomicInteger;
6669
import java.util.function.Function;
6770

6871
public class PipeRealtimeExtractTest {
@@ -268,6 +271,52 @@ public void testRealtimeExtractProcess() {
268271
}
269272
}
270273

274+
@Test
275+
public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws Exception {
276+
try (final NoTsFileRealtimeDataRegionSource extractor =
277+
new NoTsFileRealtimeDataRegionSource()) {
278+
final PipeParameters parameters =
279+
new PipeParameters(
280+
new HashMap<String, String>() {
281+
{
282+
put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1);
283+
}
284+
});
285+
final PipeTaskRuntimeConfiguration configuration =
286+
new PipeTaskRuntimeConfiguration(
287+
new PipeTaskSourceRuntimeEnvironment(
288+
"1",
289+
1,
290+
Integer.parseInt(dataRegion1),
291+
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
292+
293+
extractor.validate(new PipeParameterValidator(parameters));
294+
extractor.customize(parameters, configuration);
295+
extractor.start();
296+
297+
final File dataRegionDir =
298+
new File(tsFileDir.getPath() + File.separator + dataRegion1 + File.separator + "0");
299+
final boolean ignored = dataRegionDir.mkdirs();
300+
final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile");
301+
Assert.assertTrue(tsFile.createNewFile());
302+
303+
final TsFileResource resource = new TsFileResource(tsFile);
304+
resource.updateStartTime(
305+
new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR, device)), 0);
306+
resource.close();
307+
308+
PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegion1, resource, false);
309+
310+
final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1);
311+
while (System.currentTimeMillis() < deadline
312+
&& extractor.getObservedTsFileEventCount() == 0) {
313+
TimeUnit.MILLISECONDS.sleep(10);
314+
}
315+
316+
Assert.assertEquals(0, extractor.getObservedTsFileEventCount());
317+
}
318+
}
319+
271320
private Future<?> write2DataRegion(
272321
final int writeNum, final String dataRegionId, final int startNum) {
273322
final File dataRegionDir =
@@ -351,4 +400,36 @@ private Future<?> listen(
351400
}
352401
});
353402
}
403+
404+
private static class NoTsFileRealtimeDataRegionSource extends PipeRealtimeDataRegionSource {
405+
406+
private final AtomicInteger observedTsFileEventCount = new AtomicInteger(0);
407+
408+
@Override
409+
public Event supply() {
410+
return null;
411+
}
412+
413+
@Override
414+
protected void doExtract(final PipeRealtimeEvent event) {
415+
if (event.getEvent() instanceof TsFileInsertionEvent) {
416+
observedTsFileEventCount.incrementAndGet();
417+
}
418+
event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), false);
419+
}
420+
421+
@Override
422+
public boolean isNeedListenToTsFile() {
423+
return false;
424+
}
425+
426+
@Override
427+
public boolean isNeedListenToInsertNode() {
428+
return false;
429+
}
430+
431+
private int getObservedTsFileEventCount() {
432+
return observedTsFileEventCount.get();
433+
}
434+
}
354435
}

0 commit comments

Comments
 (0)