Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class PipeDataRegionAssigner implements Closeable {

private final String dataRegionId;

private volatile int listenToTsFileSourceCount = 0;
private volatile int listenToInsertNodeSourceCount = 0;

private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();

public String getDataRegionId() {
Expand Down Expand Up @@ -194,12 +197,34 @@ private void assignToSource(
});
}

public void startAssignTo(final PipeRealtimeDataRegionSource extractor) {
matcher.register(extractor);
public synchronized void startAssignTo(final PipeRealtimeDataRegionSource source) {
matcher.register(source);
if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount++;
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount++;
}
logSourceAssignmentChange("registered", source);
}

public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource source) {
matcher.deregister(source);
if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount--;
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount--;
}
logSourceAssignmentChange("deregistered", source);
}

public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) {
matcher.deregister(extractor);
public boolean shouldListenToTsFile() {
return listenToTsFileSourceCount > 0;
}

public boolean shouldListenToInsertNode() {
return listenToInsertNodeSourceCount > 0;
}

public boolean notMoreSourceNeededToBeAssigned() {
Expand Down Expand Up @@ -236,4 +261,19 @@ public int getTsFileInsertionEventCount() {
public int getPipeHeartbeatEventCount() {
return eventCounter.getPipeHeartbeatEventCount();
}

private void logSourceAssignmentChange(
final String action, final PipeRealtimeDataRegionSource source) {
LOGGER.info(
"Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, insertNodeSourceCount={}).",
source.getPipeName(),
source.getCreationTime(),
action,
dataRegionId,
source.isNeedListenToTsFile(),
source.isNeedListenToInsertNode(),
matcher.getRegisterCount(),
listenToTsFileSourceCount,
listenToInsertNodeSourceCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* PipeInsertionEventListener is a singleton in each data node.
Expand All @@ -48,23 +47,20 @@ public class PipeInsertionDataNodeListener {
private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner =
new ConcurrentHashMap<>();

private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0);

//////////////////////////// start & stop ////////////////////////////

public synchronized void startListenAndAssign(
final String dataRegionId, final PipeRealtimeDataRegionSource source) {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId))
.startAssignTo(source);

if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount.incrementAndGet();
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount.incrementAndGet();
}
// Keep registration inside compute so the assigner is fully started before it becomes visible
// to concurrent listeners.
dataRegionId2Assigner.compute(
dataRegionId,
(id, assigner) -> {
final PipeDataRegionAssigner actualAssigner =
assigner == null ? new PipeDataRegionAssigner(dataRegionId) : assigner;
actualAssigner.startAssignTo(source);
return actualAssigner;
});
}

public synchronized void stopListenAndAssign(
Expand All @@ -79,13 +75,6 @@ public synchronized void stopListenAndAssign(

assigner.stopAssignTo(source);

if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount.decrementAndGet();
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount.decrementAndGet();
}

if (assigner.notMoreSourceNeededToBeAssigned()) {
// The removed assigner will is the same as the one referenced by the variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
Expand All @@ -104,14 +93,10 @@ public synchronized void stopListenAndAssign(

public void listenToTsFile(
final String dataRegionId, final TsFileResource tsFileResource, final boolean isLoaded) {
// We don't judge whether listenToTsFileSourceCount.get() == 0 here on purpose
// because sources may use tsfile events when some exceptions occur in the
// insert nodes listening process.

final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);

// only events from registered data region will be extracted
if (assigner == null) {
// only events from registered data region with tsfile listeners will be extracted
if (assigner == null || !assigner.shouldListenToTsFile()) {
return;
}

Expand All @@ -121,14 +106,10 @@ public void listenToTsFile(

public void listenToInsertNode(
final String dataRegionId, final InsertNode insertNode, final TsFileResource tsFileResource) {
if (listenToInsertNodeSourceCount.get() == 0) {
return;
}

final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);

// only events from registered data region will be extracted
if (assigner == null) {
// only events from registered data region with insert listeners will be extracted
if (assigner == null || !assigner.shouldListenToInsertNode()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
Expand All @@ -39,6 +40,7 @@
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
Expand All @@ -63,6 +65,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class PipeRealtimeExtractTest {
Expand Down Expand Up @@ -268,6 +271,52 @@ public void testRealtimeExtractProcess() {
}
}

@Test
public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws Exception {
try (final NoTsFileRealtimeDataRegionSource extractor =
new NoTsFileRealtimeDataRegionSource()) {
final PipeParameters parameters =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1);
}
});
final PipeTaskRuntimeConfiguration configuration =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(
"1",
1,
Integer.parseInt(dataRegion1),
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));

extractor.validate(new PipeParameterValidator(parameters));
extractor.customize(parameters, configuration);
extractor.start();

final File dataRegionDir =
new File(tsFileDir.getPath() + File.separator + dataRegion1 + File.separator + "0");
final boolean ignored = dataRegionDir.mkdirs();
final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile");
Assert.assertTrue(tsFile.createNewFile());

final TsFileResource resource = new TsFileResource(tsFile);
resource.updateStartTime(
new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR, device)), 0);
resource.close();

PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegion1, resource, false);

final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1);
while (System.currentTimeMillis() < deadline
&& extractor.getObservedTsFileEventCount() == 0) {
TimeUnit.MILLISECONDS.sleep(10);
}

Assert.assertEquals(0, extractor.getObservedTsFileEventCount());
}
}

private Future<?> write2DataRegion(
final int writeNum, final String dataRegionId, final int startNum) {
final File dataRegionDir =
Expand Down Expand Up @@ -351,4 +400,36 @@ private Future<?> listen(
}
});
}

private static class NoTsFileRealtimeDataRegionSource extends PipeRealtimeDataRegionSource {

private final AtomicInteger observedTsFileEventCount = new AtomicInteger(0);

@Override
public Event supply() {
return null;
}

@Override
protected void doExtract(final PipeRealtimeEvent event) {
if (event.getEvent() instanceof TsFileInsertionEvent) {
observedTsFileEventCount.incrementAndGet();
}
event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), false);
}

@Override
public boolean isNeedListenToTsFile() {
return false;
}

@Override
public boolean isNeedListenToInsertNode() {
return false;
}

private int getObservedTsFileEventCount() {
return observedTsFileEventCount.get();
}
}
}
Loading