diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java index e0b941ffc4772..4e2871067931b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java @@ -104,7 +104,7 @@ public void testPrefixPattern() throws Exception { } @Test - public void testIotdbPattern() throws Exception { + public void testIoTDBPattern() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -127,8 +127,6 @@ public void testIotdbPattern() throws Exception { final Map connectorAttributes = new HashMap<>(); extractorAttributes.put("extractor.path", "root.**.d1.s*"); - // When path is set, pattern should be ignored - extractorAttributes.put("extractor.pattern", "root"); extractorAttributes.put("extractor.inclusion", "data.insert"); connectorAttributes.put("connector", "iotdb-thrift-connector"); @@ -158,7 +156,7 @@ public void testIotdbPattern() throws Exception { } @Test - public void testIotdbPatternWithLegacySyntax() throws Exception { + public void testIoTDBPatternWithLegacySyntax() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -209,4 +207,324 @@ public void testIotdbPatternWithLegacySyntax() throws Exception { expectedResSet); } } + + @Test + public void testMultiplePrefixPatternHistoricalData() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s"); + extractorAttributes.put("extractor.inclusion", "data.insert"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s, s1) values (1, 1, 1)", + "insert into root.db.d2(time, s) values (2, 2)", + "insert into root.db2.d1(time, s) values (3, 3)"), + null); + awaitUntilFlush(senderEnv); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,null,1.0,1.0,"); + expectedResSet.add("3,3.0,null,null,"); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.db2.**,root.db.**", + "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,", + expectedResSet); + } + } + + @Test + public void testMultipleIoTDBPatternHistoricalData() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*"); + extractorAttributes.put("extractor.inclusion", "data.insert"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s, s1) values (1, 1, 1)", + "insert into root.db.d2(time, s) values (2, 2)", + "insert into root.db2.d1(time, s, t) values (3, 3, 3)", + "insert into root.db3.d1(time, s) values (4, 4)"), + null); + awaitUntilFlush(senderEnv); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,null,null,1.0,1.0,null,"); + expectedResSet.add("2,null,null,null,null,2.0,"); + expectedResSet.add("3,3.0,3.0,null,null,null,"); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.db2.**,root.db.**", + "Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,", + expectedResSet); + } + } + + @Test + public void testMultipleHybridPatternHistoricalData() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.path", "root.db.d1.*"); + extractorAttributes.put("extractor.pattern", "root.db2.d1.s"); + extractorAttributes.put("extractor.inclusion", "data.insert"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s, s1) values (1, 1, 1)", + "insert into root.db2.d1(time, s) values (2, 2)", + "insert into root.db3.d1(time, s) values (3, 3)"), + null); + awaitUntilFlush(senderEnv); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,1.0,null,"); + expectedResSet.add("2,null,null,2.0,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.db.**,root.db2.**", + "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,", + expectedResSet); + } + } + + @Test + public void testMultiplePrefixPatternRealtimeData() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s"); + extractorAttributes.put("extractor.inclusion", "data.insert"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s, s1) values (1, 1, 1)", + "insert into root.db.d2(time, s) values (2, 2)", + "insert into root.db2.d1(time, s) values (3, 3)"), + null); + awaitUntilFlush(senderEnv); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,null,1.0,1.0,"); + expectedResSet.add("3,3.0,null,null,"); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.db2.**,root.db.**", + "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,", + expectedResSet); + } + } + + @Test + public void testMultipleIoTDBPatternRealtimeData() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*"); + extractorAttributes.put("extractor.inclusion", "data.insert"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s, s1) values (1, 1, 1)", + "insert into root.db.d2(time, s) values (2, 2)", + "insert into root.db2.d1(time, s, t) values (3, 3, 3)", + "insert into root.db3.d1(time, s) values (4, 4)"), + null); + awaitUntilFlush(senderEnv); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,null,null,1.0,1.0,null,"); + expectedResSet.add("2,null,null,null,null,2.0,"); + expectedResSet.add("3,3.0,3.0,null,null,null,"); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.db2.**,root.db.**", + "Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,", + expectedResSet); + } + } + + @Test + public void testMultipleHybridPatternRealtimeData() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor.path", "root.db.d1.*"); + extractorAttributes.put("extractor.pattern", "root.db2.d1.s"); + extractorAttributes.put("extractor.inclusion", "data.insert"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s, s1) values (1, 1, 1)", + "insert into root.db2.d1(time, s) values (2, 2)", + "insert into root.db3.d1(time, s) values (3, 3)"), + null); + awaitUntilFlush(senderEnv); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,1.0,null,"); + expectedResSet.add("2,null,null,2.0,"); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.db.**,root.db2.**", + "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,", + expectedResSet); + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java index 9f0e6c64839e6..f5724eeed13b2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -120,7 +121,7 @@ public PipeConfigRegionSnapshotEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -130,7 +131,7 @@ public PipeConfigRegionSnapshotEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -203,7 +204,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -219,7 +220,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java index 861cc7931f672..ef3ca525d0dcd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; public class PipeConfigRegionWritePlanEvent extends PipeWritePlanEvent { @@ -50,7 +51,7 @@ public PipeConfigRegionWritePlanEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -61,7 +62,7 @@ public PipeConfigRegionWritePlanEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -80,7 +81,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -93,7 +94,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java index fc965624c1d86..314fe9705864a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java @@ -244,7 +244,8 @@ protected Optional trimRealtimeEventByPipePattern( final PipeWritePlanEvent event) { return parseConfigPlan( ((PipeConfigRegionWritePlanEvent) event).getConfigPhysicalPlan(), - treePattern, + // TODO: handle multiple patterns + (IoTDBTreePattern) treePatterns.get(0), tablePattern) .map( configPhysicalPlan -> @@ -289,7 +290,8 @@ protected boolean isTypeListened(final PipeWritePlanEvent event) { return isTypeListened( ((PipeConfigRegionWritePlanEvent) event).getConfigPhysicalPlan(), listenedTypeSet, - treePattern, + // TODO: handle multiple patterns + (IoTDBTreePattern) treePatterns.get(0), tablePattern); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index a4c9f0e8403cd..c815b1a7093b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -190,7 +190,7 @@ private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent) deleteDataEvent.getPipeName(), deleteDataEvent.getCreationTime(), deleteDataEvent.getPipeTaskMeta(), - deleteDataEvent.getTreePattern(), + deleteDataEvent.getTreePatterns(), deleteDataEvent.getTablePattern(), deleteDataEvent.getUserId(), deleteDataEvent.getUserName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java index 04db68aa3e816..854ae9518dcc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java @@ -27,6 +27,8 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.UserDefinedEvent; +import java.util.List; + public class UserDefinedEnrichedEvent extends EnrichedEvent { private final UserDefinedEvent userDefinedEvent; @@ -46,7 +48,7 @@ private UserDefinedEnrichedEvent( enrichedEvent.getPipeName(), enrichedEvent.getCreationTime(), enrichedEvent.getPipeTaskMeta(), - enrichedEvent.getTreePattern(), + enrichedEvent.getTreePatterns(), enrichedEvent.getTablePattern(), enrichedEvent.getUserId(), enrichedEvent.getUserName(), @@ -82,7 +84,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -94,7 +96,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java index 2641f522654df..9e568c2628340 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java @@ -28,6 +28,8 @@ import javax.validation.constraints.NotNull; +import java.util.List; + /** * The data model used to record the Event and the data model of the DataRegion corresponding to the * source data, so this type requires some specifications . @@ -61,7 +63,7 @@ protected PipeInsertionEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -77,7 +79,7 @@ protected PipeInsertionEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -97,7 +99,7 @@ protected PipeInsertionEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -111,7 +113,7 @@ protected PipeInsertionEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java index 46f07bd40feeb..202911a781515 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java @@ -39,6 +39,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.nio.ByteBuffer; +import java.util.List; import java.util.Optional; public class PipeDeleteDataNodeEvent extends EnrichedEvent implements SerializableEvent { @@ -62,7 +63,7 @@ public PipeDeleteDataNodeEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -73,7 +74,7 @@ public PipeDeleteDataNodeEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -126,7 +127,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -139,7 +140,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 3cacb5a9245ba..5a35cd8f62fc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Objects; public class PipeHeartbeatEvent extends EnrichedEvent { @@ -125,7 +126,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java index 711b473a0869f..d63aa6e4aa38b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -121,7 +122,7 @@ public PipeSchemaRegionSnapshotEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -131,7 +132,7 @@ public PipeSchemaRegionSnapshotEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -208,7 +209,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -224,7 +225,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java index bc04f11ff3edb..67ca23b13b405 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java @@ -30,6 +30,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.nio.ByteBuffer; +import java.util.List; public class PipeSchemaRegionWritePlanEvent extends PipeWritePlanEvent { @@ -49,7 +50,7 @@ public PipeSchemaRegionWritePlanEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -60,7 +61,7 @@ public PipeSchemaRegionWritePlanEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -79,7 +80,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -92,7 +93,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java index 42afe2e5b6c96..34755e8d4fc65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java @@ -36,6 +36,7 @@ import org.apache.tsfile.utils.RamUsageEstimator; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -57,7 +58,7 @@ public PipeStatementInsertionEvent( String pipeName, long creationTime, PipeTaskMeta pipeTaskMeta, - TreePattern treePattern, + List treePatterns, TablePattern tablePattern, String userId, String userName, @@ -70,7 +71,7 @@ public PipeStatementInsertionEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -132,7 +133,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( String pipeName, long creationTime, PipeTaskMeta pipeTaskMeta, - TreePattern treePattern, + List treePatterns, TablePattern tablePattern, String userId, String userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 63cae75fbbd13..cc833f971b304 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -124,7 +124,7 @@ private PipeInsertNodeTabletInsertionEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -136,7 +136,7 @@ private PipeInsertNodeTabletInsertionEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -236,7 +236,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -251,7 +251,7 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -363,7 +363,11 @@ public boolean mayEventPathsOverlappedWithPattern() { if (insertNode instanceof InsertRowNode || insertNode instanceof InsertTabletNode) { final PartialPath devicePartialPath = insertNode.getTargetPath(); return Objects.isNull(devicePartialPath) - || treePattern.mayOverlapWithDevice(devicePartialPath.getIDeviceIDAsFullDevice()); + || treePatterns.stream() + .anyMatch( + treePattern -> + treePattern.mayOverlapWithDevice( + devicePartialPath.getIDeviceIDAsFullDevice())); } if (insertNode instanceof InsertRowsNode) { @@ -372,8 +376,13 @@ public boolean mayEventPathsOverlappedWithPattern() { .anyMatch( insertRowNode -> Objects.isNull(insertRowNode.getTargetPath()) - || treePattern.mayOverlapWithDevice( - insertRowNode.getTargetPath().getIDeviceIDAsFullDevice())); + || treePatterns.stream() + .anyMatch( + treePattern -> + treePattern.mayOverlapWithDevice( + insertRowNode + .getTargetPath() + .getIDeviceIDAsFullDevice()))); } return true; @@ -446,14 +455,30 @@ private List initEventParsers() { switch (node.getType()) { case INSERT_ROW: case INSERT_TABLET: - eventParsers.add( - new TabletInsertionEventTreePatternParser(pipeTaskMeta, this, node, treePattern)); + if (treePatterns.isEmpty()) { + eventParsers.add( + new TabletInsertionEventTreePatternParser(pipeTaskMeta, this, node, null)); + } else { + treePatterns.forEach( + treePattern -> + eventParsers.add( + new TabletInsertionEventTreePatternParser( + pipeTaskMeta, this, node, treePattern))); + } break; case INSERT_ROWS: for (final InsertRowNode insertRowNode : ((InsertRowsNode) node).getInsertRowNodeList()) { - eventParsers.add( - new TabletInsertionEventTreePatternParser( - pipeTaskMeta, this, insertRowNode, treePattern)); + if (treePatterns.isEmpty()) { + eventParsers.add( + new TabletInsertionEventTreePatternParser( + pipeTaskMeta, this, insertRowNode, null)); + } else { + treePatterns.forEach( + treePattern -> + eventParsers.add( + new TabletInsertionEventTreePatternParser( + pipeTaskMeta, this, insertRowNode, treePattern))); + } } break; case RELATIONAL_INSERT_ROW: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index adcef5128f52d..4672ccb259ac6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -44,10 +44,14 @@ import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.record.Tablet; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.stream.Collectors; public class PipeRawTabletInsertionEvent extends PipeInsertionEvent implements TabletInsertionEvent, ReferenceTrackableEvent, AutoCloseable { @@ -64,7 +68,7 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent private final PipeTabletMemoryBlock allocatedMemoryBlock; - private TabletInsertionEventParser eventParser; + private List eventParsers; private volatile ProgressIndex overridingProgressIndex; @@ -80,7 +84,7 @@ private PipeRawTabletInsertionEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -92,7 +96,7 @@ private PipeRawTabletInsertionEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -218,7 +222,7 @@ public PipeRawTabletInsertionEvent(final Tablet tablet, final boolean isAligned) @TestOnly public PipeRawTabletInsertionEvent( - final Tablet tablet, final boolean isAligned, final TreePattern treePattern) { + final Tablet tablet, final boolean isAligned, final List treePatterns) { this( null, null, @@ -231,7 +235,7 @@ public PipeRawTabletInsertionEvent( null, 0, null, - treePattern, + treePatterns, null, null, null, @@ -276,7 +280,10 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa // Actually release the occupied memory. tablet = null; - eventParser = null; + if (Objects.nonNull(eventParsers)) { + eventParsers.clear(); + eventParsers = null; + } // Update metrics of the source event if (needToReport && shouldReportOnCommit && Objects.nonNull(pipeName)) { @@ -333,7 +340,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -353,7 +360,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -406,13 +413,19 @@ public EnrichedEvent getSourceEvent() { @Override public Iterable processRowByRow( final BiConsumer consumer) { - return initEventParser().processRowByRow(consumer); + return initEventParsers().stream() + .map(tabletInsertionEventParser -> tabletInsertionEventParser.processRowByRow(consumer)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); } @Override public Iterable processTablet( final BiConsumer consumer) { - return initEventParser().processTablet(consumer); + return initEventParsers().stream() + .map(tabletInsertionEventParser -> tabletInsertionEventParser.processTablet(consumer)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); } /////////////////////////// convertToTablet /////////////////////////// @@ -425,21 +438,31 @@ public Tablet convertToTablet() { if (!shouldParseTimeOrPattern()) { return tablet; } - return initEventParser().convertToTablet(); + // TODO: handle multiple patterns + return initEventParsers().get(0).convertToTablet(); } /////////////////////////// event parser /////////////////////////// - private TabletInsertionEventParser initEventParser() { - if (eventParser == null) { - eventParser = + private List initEventParsers() { + if (Objects.isNull(eventParsers)) { + eventParsers = tablet.getDeviceId().startsWith("root.") - ? new TabletInsertionEventTreePatternParser( - pipeTaskMeta, this, tablet, isAligned, treePattern) - : new TabletInsertionEventTablePatternParser( - pipeTaskMeta, this, tablet, isAligned, tablePattern); + ? (treePatterns.isEmpty() + ? Collections.singletonList( + new TabletInsertionEventTreePatternParser( + pipeTaskMeta, this, tablet, isAligned, null)) + : treePatterns.stream() + .map( + treePattern -> + new TabletInsertionEventTreePatternParser( + pipeTaskMeta, this, tablet, isAligned, treePattern)) + .collect(Collectors.toList())) + : Collections.singletonList( + new TabletInsertionEventTablePatternParser( + pipeTaskMeta, this, tablet, isAligned, tablePattern)); } - return eventParser; + return eventParsers; } public long count() { @@ -480,8 +503,8 @@ public static boolean isTabletEmpty(final Tablet tablet) { @Override public String toString() { return String.format( - "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s, sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s, eventParser=%s}", - tablet, isAligned, sourceEvent, needToReport, allocatedMemoryBlock, eventParser) + "PipeRawTabletInsertionEvent{tablet=%s, isAligned=%s, sourceEvent=%s, needToReport=%s, allocatedMemoryBlock=%s, eventParsers=%s}", + tablet, isAligned, sourceEvent, needToReport, allocatedMemoryBlock, eventParsers) + " - " + super.toString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java index 3b933b92b73b5..6691e7ce27958 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -105,7 +106,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java index bf3f2c97acb2f..bd44fa7b313f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java @@ -61,7 +61,7 @@ public PipeCompactedTsFileInsertionEvent( committerKey.getPipeName(), committerKey.getCreationTime(), anyOfOriginalEvents.getPipeTaskMeta(), - anyOfOriginalEvents.getTreePattern(), + anyOfOriginalEvents.getTreePatterns(), anyOfOriginalEvents.getTablePattern(), anyOfOriginalEvents.getUserId(), anyOfOriginalEvents.getUserName(), @@ -238,7 +238,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 3617b7347ad9c..9c8a4d9fbd091 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -54,15 +54,19 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; public class PipeTsFileInsertionEvent extends PipeInsertionEvent implements TsFileInsertionEvent, ReferenceTrackableEvent { @@ -84,7 +88,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent protected final boolean isGeneratedByPipeConsensus; protected final boolean isGeneratedByHistoricalExtractor; private final AtomicBoolean isClosed; - private final AtomicReference eventParser; + private final AtomicReference> eventParsers; // The point count of the TsFile. Used for metrics on PipeConsensus' receiver side. // May be updated after it is flushed. Should be negative if not set. @@ -133,7 +137,7 @@ public PipeTsFileInsertionEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -145,7 +149,7 @@ public PipeTsFileInsertionEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -211,7 +215,7 @@ public PipeTsFileInsertionEvent( // and can be sent. isClosed.set(resource.isClosed()); - this.eventParser = new AtomicReference<>(null); + this.eventParsers = new AtomicReference<>(Collections.emptyList()); addOnCommittedHook( () -> { @@ -403,7 +407,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -423,7 +427,7 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -503,7 +507,11 @@ public boolean mayEventPathsOverlappedWithPattern() { } try { - return getDeviceSet().stream().anyMatch(treePattern::mayOverlapWithDevice); + return getDeviceSet().stream() + .anyMatch( + device -> + treePatterns.stream() + .anyMatch(treePattern -> treePattern.mayOverlapWithDevice(device))); } catch (final Exception e) { LOGGER.info( "Pipe {}: failed to get devices from TsFile {}, extract it anyway", @@ -609,7 +617,43 @@ public Iterable toTabletInsertionEvents(final long timeout return Collections.emptyList(); } waitForResourceEnough4Parsing(timeoutMs); - return initEventParser().toTabletInsertionEvents(); + return () -> { + final Iterator parserIterator = initEventParsers().iterator(); + return new Iterator() { + private TsFileInsertionEventParser currentParser = null; + private Iterator currentEventIterator = Collections.emptyIterator(); + + private void closeCurrentParser() { + if (Objects.nonNull(currentParser)) { + currentParser.close(); + currentParser = null; + } + } + + @Override + public boolean hasNext() { + while (!currentEventIterator.hasNext() && parserIterator.hasNext()) { + closeCurrentParser(); + currentParser = parserIterator.next(); + currentEventIterator = currentParser.toTabletInsertionEvents().iterator(); + } + + if (!currentEventIterator.hasNext()) { + closeCurrentParser(); + } + + return currentEventIterator.hasNext(); + } + + @Override + public TabletInsertionEvent next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentEventIterator.next(); + } + }; + }; } catch (final Exception e) { close(); @@ -683,36 +727,47 @@ public boolean isGeneratedByHistoricalExtractor() { return isGeneratedByHistoricalExtractor; } - private TsFileInsertionEventParser initEventParser() { + private List initEventParsers() { try { - eventParser.compareAndSet( - null, - new TsFileInsertionEventParserProvider( - pipeName, - creationTime, - tsFile, - treePattern, - tablePattern, - startTime, - endTime, - pipeTaskMeta, - // Do not parse privilege if it should not be parsed - // To avoid renaming of the tsFile database - shouldParse4Privilege ? userName : null, - this) - .provide()); - return eventParser.get(); - } catch (final IOException e) { + eventParsers.compareAndSet( + Collections.emptyList(), + treePatterns.stream() + .map( + treePattern -> { + try { + // Call the method that throws a checked exception + return new TsFileInsertionEventParserProvider( + pipeName, + creationTime, + tsFile, + treePattern, + tablePattern, + startTime, + endTime, + pipeTaskMeta, + shouldParse4Privilege ? userName : null, + this) + .provide(); + } catch (final IOException e) { + // Wrap the checked exception in an unchecked one + throw new UncheckedIOException(e); + } + }) + .collect(Collectors.toList())); + return eventParsers.get(); + } catch (final UncheckedIOException e) { // Catch the unchecked wrapper close(); + // Unwrap the original IOException + final IOException cause = e.getCause(); final String errorMsg = String.format("Read TsFile %s error.", tsFile.getPath()); - LOGGER.warn(errorMsg, e); - throw new PipeException(errorMsg, e); + LOGGER.warn(errorMsg, cause); + throw new PipeException(errorMsg, cause); } } public long count(final boolean skipReportOnCommit) throws IOException { - AtomicLong count = new AtomicLong(); + final AtomicLong count = new AtomicLong(); if (shouldParseTime()) { try { @@ -730,19 +785,24 @@ public long count(final boolean skipReportOnCommit) throws IOException { } } - try (final TsFileInsertionPointCounter counter = - new TsFileInsertionPointCounter(tsFile, treePattern)) { - return counter.count(); + for (final TreePattern treePattern : treePatterns) { + try (final TsFileInsertionPointCounter counter = + new TsFileInsertionPointCounter(tsFile, treePattern)) { + count.addAndGet(counter.count()); + } } + return count.get(); } /** Release the resource of {@link TsFileInsertionEventParser}. */ @Override public void close() { - eventParser.getAndUpdate( - parser -> { - if (Objects.nonNull(parser)) { - parser.close(); + eventParsers.getAndUpdate( + parsers -> { + if (Objects.nonNull(parsers)) { + for (final TsFileInsertionEventParser parser : parsers) { + parser.close(); + } } return null; }); @@ -753,8 +813,8 @@ public void close() { @Override public String toString() { return String.format( - "PipeTsFileInsertionEvent{resource=%s, tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, eventParser=%s}", - resource, tsFile, isLoaded, isGeneratedByPipe, isClosed.get(), eventParser) + "PipeTsFileInsertionEvent{resource=%s, tsFile=%s, isLoaded=%s, isGeneratedByPipe=%s, isClosed=%s, eventParsers=%s}", + resource, tsFile, isLoaded, isGeneratedByPipe, isClosed.get(), eventParsers) + " - " + super.toString(); } @@ -785,7 +845,7 @@ public PipeEventResource eventResourceBuilder() { this.isWithMod, this.modFile, this.sharedModFile, - this.eventParser); + this.eventParsers); } private static class PipeTsFileInsertionEventResource extends PipeEventResource { @@ -794,7 +854,7 @@ private static class PipeTsFileInsertionEventResource extends PipeEventResource private final boolean isWithMod; private final File modFile; private final File sharedModFile; // unused now - private final AtomicReference eventParser; + private final AtomicReference> eventParsers; private final String pipeName; private PipeTsFileInsertionEventResource( @@ -805,14 +865,14 @@ private PipeTsFileInsertionEventResource( final boolean isWithMod, final File modFile, final File sharedModFile, - final AtomicReference eventParser) { + final AtomicReference> eventParsers) { super(isReleased, referenceCount); this.pipeName = pipeName; this.tsFile = tsFile; this.isWithMod = isWithMod; this.modFile = modFile; this.sharedModFile = sharedModFile; - this.eventParser = eventParser; + this.eventParsers = eventParsers; } @Override @@ -825,10 +885,12 @@ protected void finalizeResource() { } // close event parser - eventParser.getAndUpdate( - parser -> { - if (Objects.nonNull(parser)) { - parser.close(); + eventParsers.getAndUpdate( + parsers -> { + if (Objects.nonNull(parsers)) { + for (final TsFileInsertionEventParser parser : parsers) { + parser.close(); + } } return null; }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index 9ee89189e1384..c21837214355d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -29,6 +29,7 @@ import org.apache.tsfile.file.metadata.IDeviceID; +import java.util.List; import java.util.Map; /** @@ -67,7 +68,7 @@ public PipeRealtimeEvent( final TsFileEpoch tsFileEpoch, final Map device2Measurements, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -82,7 +83,7 @@ public PipeRealtimeEvent( event != null ? event.getPipeName() : null, event != null ? event.getCreationTime() : 0, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -199,7 +200,7 @@ public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -212,7 +213,7 @@ public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -225,7 +226,7 @@ public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( // If null is not passed, the field will not be GCed and may cause OOM. null, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/DataRegionListeningFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/DataRegionListeningFilter.java index e2a15a64480cd..6c24e6dbae611 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/DataRegionListeningFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/DataRegionListeningFilter.java @@ -32,6 +32,7 @@ import org.apache.tsfile.utils.Pair; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -78,9 +79,13 @@ public static boolean shouldDatabaseBeListened( } else { final String databaseTreeModel = databaseRawName.startsWith("root.") ? databaseRawName : "root." + databaseRawName; - final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); - return treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.mayOverlapWithDb(databaseTreeModel); + final List treePatterns = + TreePattern.parsePipePatternFromSourceParameters(parameters); + return treePatterns.stream() + .anyMatch( + treePattern -> + treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.mayOverlapWithDb(databaseTreeModel)); } } @@ -106,11 +111,15 @@ public static boolean shouldDataRegionBeListened( final String databaseTableModel = databaseRawName.startsWith("root.") ? databaseRawName.substring(5) : databaseRawName; - final TreePattern treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); + final List treePatterns = + TreePattern.parsePipePatternFromSourceParameters(parameters); final TablePattern tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); - return treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.mayOverlapWithDb(databaseTreeModel) + return treePatterns.stream() + .anyMatch( + treePattern -> + treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.mayOverlapWithDb(databaseTreeModel)) || tablePattern.isTableModelDataAllowedToBeCaptured() && tablePattern.matchesDatabase(databaseTableModel); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index b5fa3b2db40e6..fb672ae81ec2d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -236,7 +236,8 @@ public void validate(final PipeParameterValidator validator) throws Exception { EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE); // Validate tree pattern and table pattern - validatePattern(TreePattern.parsePipePatternFromSourceParameters(validator.getParameters())); + TreePattern.parsePipePatternFromSourceParameters(validator.getParameters()) + .forEach(this::validatePattern); // Validate extractor.history.enable and extractor.realtime.enable validator diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 868bb035c5ffa..d4f03d4670f4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -135,7 +135,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private int dataRegionId; - private TreePattern treePattern; + private List treePatterns; private TablePattern tablePattern; private boolean isModelDetected = false; @@ -325,7 +325,7 @@ public void customize( dataRegionId = environment.getRegionId(); - treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); + treePatterns = TreePattern.parsePipePatternFromSourceParameters(parameters); tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); final DataRegion dataRegion = @@ -338,7 +338,8 @@ public void customize( if (isTableModel) { isDbNameCoveredByPattern = tablePattern.coversDb(databaseName); } else { - isDbNameCoveredByPattern = treePattern.coversDb(databaseName); + isDbNameCoveredByPattern = + treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); } } } @@ -718,8 +719,11 @@ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource reso ? (tablePattern.isTableModelDataAllowedToBeCaptured() && tablePattern.matchesDatabase(resource.getDatabaseName()) && tablePattern.matchesTable(deviceID.getTableName())) - : (treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.mayOverlapWithDevice(deviceID)); + : (treePatterns.stream() + .anyMatch( + treePattern -> + treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.mayOverlapWithDevice(deviceID))); }); } @@ -734,8 +738,11 @@ private void detectModel(final TsFileResource resource, final IDeviceID deviceID isTableModel ? tablePattern.isTableModelDataAllowedToBeCaptured() && tablePattern.coversDb(databaseName) - : treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.coversDb(databaseName); + : treePatterns.stream() + .allMatch( + treePattern -> + treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.coversDb(databaseName)); } private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) { @@ -858,7 +865,7 @@ private Event supplyTsFileEvent(final TsFileResource resource) { pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -917,7 +924,7 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) { pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 08cc91b232da5..dd1da9bd379d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -98,7 +98,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { protected boolean shouldExtractInsertion; protected boolean shouldExtractDeletion; - protected TreePattern treePattern; + protected List treePatterns; protected TablePattern tablePattern; private boolean isDbNameCoveredByPattern = false; @@ -226,7 +226,7 @@ public void customize( pipeID = pipeName + "_" + creationTime; taskID = pipeName + "_" + dataRegionId + "_" + creationTime; - treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters); + treePatterns = TreePattern.parsePipePatternFromSourceParameters(parameters); tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); final DataRegion dataRegion = @@ -237,7 +237,8 @@ public void customize( if (PathUtils.isTableModelDatabase(databaseName)) { isDbNameCoveredByPattern = tablePattern.coversDb(databaseName); } else { - isDbNameCoveredByPattern = treePattern.coversDb(databaseName); + isDbNameCoveredByPattern = + treePatterns.stream().allMatch(treePattern -> treePattern.coversDb(databaseName)); } } } @@ -532,8 +533,8 @@ public final boolean shouldExtractDeletion() { return shouldExtractDeletion; } - public final TreePattern getTreePattern() { - return treePattern; + public final List getTreePatterns() { + return treePatterns; } public final TablePattern getTablePattern() { @@ -629,7 +630,7 @@ public String toString() { .add("pipeTaskMeta", pipeTaskMeta) .add("shouldExtractInsertion", shouldExtractInsertion) .add("shouldExtractDeletion", shouldExtractDeletion) - .add("treePattern", treePattern) + .add("treePatterns", treePatterns) .add("tablePattern", tablePattern) .add("isDbNameCoveredByPattern", isDbNameCoveredByPattern) .add("realtimeDataExtractionStartTime", realtimeDataExtractionStartTime) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 8ea973fbf0a3a..8cc7bab78f7ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -167,7 +167,7 @@ private void assignToExtractor( extractor.getPipeName(), extractor.getCreationTime(), extractor.getPipeTaskMeta(), - extractor.getTreePattern(), + extractor.getTreePatterns(), extractor.getTablePattern(), String.valueOf(extractor.getUserId()), extractor.getUserName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java index 6e675ed0fdd1a..9f1811f553b9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -230,22 +231,23 @@ protected void matchTreeModelEvent( return; } - final TreePattern pattern = source.getTreePattern(); - if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(device)) { - // The pattern can match all measurements of the device. - matchedSources.add(source); - } else { - for (final String measurement : measurements) { - // Ignore null measurement for partial insert - if (measurement == null) { - continue; - } - - if (pattern.matchesMeasurement(device, measurement)) { - matchedSources.add(source); - // There would be no more matched sources because the measurements are - // unique - break; + for (final TreePattern pattern : source.getTreePatterns()) { + if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(device)) { + // The pattern can match all measurements of the device. + matchedSources.add(source); + } else { + for (final String measurement : measurements) { + // Ignore null measurement for partial insert + if (measurement == null) { + continue; + } + + if (pattern.matchesMeasurement(device, measurement)) { + matchedSources.add(source); + // There would be no more matched sources because the measurements are + // unique + break; + } } } } @@ -262,10 +264,13 @@ protected Set filterSourcesByDevice(final IDeviceI continue; } - final TreePattern treePattern = source.getTreePattern(); - if (Objects.isNull(treePattern) - || (treePattern.isTreeModelDataAllowedToBeCaptured() - && treePattern.mayOverlapWithDevice(device))) { + final List treePatterns = source.getTreePatterns(); + if (Objects.isNull(treePatterns) + || treePatterns.stream() + .anyMatch( + treePattern -> + (treePattern.isTreeModelDataAllowedToBeCaptured() + && treePattern.mayOverlapWithDevice(device)))) { filteredSources.add(source); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java index 6dbb8e1110bca..1b47a194dff98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent; @@ -216,7 +217,10 @@ protected Optional trimRealtimeEventByPrivilege( protected Optional trimRealtimeEventByPipePattern( final PipeWritePlanEvent event) { return TREE_PATTERN_PARSE_VISITOR - .process(((PipeSchemaRegionWritePlanEvent) event).getPlanNode(), treePattern) + .process( + ((PipeSchemaRegionWritePlanEvent) event).getPlanNode(), + // TODO: handle multiple patterns + (IoTDBTreePattern) treePatterns.get(0)) .flatMap( planNode -> TABLE_PATTERN_PARSE_VISITOR diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 8516a9900e63e..af51fc2229471 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -40,6 +40,7 @@ import java.time.LocalDate; import java.util.Arrays; +import java.util.Collections; public class PipeTabletInsertionEventTest { @@ -280,14 +281,16 @@ public void convertToTabletForTest() { Assert.assertFalse(isAligned2); PipeRawTabletInsertionEvent event3 = - new PipeRawTabletInsertionEvent(tablet1, false, new PrefixTreePattern(pattern)); + new PipeRawTabletInsertionEvent( + tablet1, false, Collections.singletonList(new PrefixTreePattern(pattern))); Tablet tablet3 = event3.convertToTablet(); boolean isAligned3 = event3.isAligned(); Assert.assertEquals(tablet1, tablet3); Assert.assertFalse(isAligned3); PipeRawTabletInsertionEvent event4 = - new PipeRawTabletInsertionEvent(tablet2, false, new PrefixTreePattern(pattern)); + new PipeRawTabletInsertionEvent( + tablet2, false, Collections.singletonList(new PrefixTreePattern(pattern))); Tablet tablet4 = event4.convertToTablet(); boolean isAligned4 = event4.isAligned(); Assert.assertEquals(tablet2, tablet4); @@ -313,14 +316,16 @@ public void convertToAlignedTabletForTest() { Assert.assertTrue(isAligned2); PipeRawTabletInsertionEvent event3 = - new PipeRawTabletInsertionEvent(tablet1, true, new PrefixTreePattern(pattern)); + new PipeRawTabletInsertionEvent( + tablet1, true, Collections.singletonList(new PrefixTreePattern(pattern))); Tablet tablet3 = event3.convertToTablet(); boolean isAligned3 = event3.isAligned(); Assert.assertEquals(tablet1, tablet3); Assert.assertTrue(isAligned3); PipeRawTabletInsertionEvent event4 = - new PipeRawTabletInsertionEvent(tablet2, true, new PrefixTreePattern(pattern)); + new PipeRawTabletInsertionEvent( + tablet2, true, Collections.singletonList(new PrefixTreePattern(pattern))); Tablet tablet4 = event4.convertToTablet(); boolean isAligned4 = event4.isAligned(); Assert.assertEquals(tablet2, tablet4); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 61d5232a3d858..f6d08c71b9ab1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -181,7 +181,7 @@ public void testCachedMatcher() throws Exception { public static class PipeRealtimeDataRegionFakeSource extends PipeRealtimeDataRegionSource { public PipeRealtimeDataRegionFakeSource() { - treePattern = new PrefixTreePattern(null); + treePatterns = Collections.singletonList(new PrefixTreePattern(null)); } @Override @@ -201,13 +201,13 @@ protected void doExtract(final PipeRealtimeEvent event) { match[0] = match[0] || (k + TsFileConstant.PATH_SEPARATOR + s) - .startsWith(getTreePattern().getPattern()); + .startsWith(getTreePatterns().get(0).getPattern()); } } else { match[0] = match[0] - || (getTreePattern().getPattern().startsWith(k.toString()) - || k.toString().startsWith(getTreePattern().getPattern())); + || (getTreePatterns().get(0).getPattern().startsWith(k.toString()) + || k.toString().startsWith(getTreePatterns().get(0).getPattern())); } }); Assert.assertTrue(match[0]); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java index dd0558bc23805..fd1853fca1b74 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java @@ -82,6 +82,8 @@ public void testIoTDBDataRegionExtractorWithPattern() { Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root")); Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.`a-b`")); Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b")); + Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b,root.db1.`a,b`.**")); } public Exception testIoTDBDataRegionExtractorWithPattern(final String pattern) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index 7eef37ea3a618..33b78b85667ea 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -27,8 +27,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Objects; +import java.util.function.Function; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; @@ -68,48 +72,115 @@ public boolean isRoot() { * * @return The interpreted {@link TreePattern} which is not {@code null}. */ - public static TreePattern parsePipePatternFromSourceParameters( + public static List parsePipePatternFromSourceParameters( final PipeParameters sourceParameters) { final boolean isTreeModelDataAllowedToBeCaptured = isTreeModelDataAllowToBeCaptured(sourceParameters); final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); + final String pattern = + sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); - // 1. If "source.path" is specified, it will be interpreted as an IoTDB-style path, - // ignoring the other 2 parameters. - if (path != null) { - return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, path); + // 1. If both "source.path" and "source.pattern" are specified, their union will be used. + if (path != null && pattern != null) { + final List result = new ArrayList<>(); + // Parse "source.path" as IoTDB-style path. + result.addAll( + parseMultiplePatterns( + path, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p))); + // Parse "source.pattern" using the helper method. + result.addAll( + parsePatternsFromPatternParameter( + pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured)); + return result; } - final String pattern = - sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); + // 2. If only "source.path" is specified, it will be interpreted as an IoTDB-style path. + if (path != null) { + return parseMultiplePatterns( + path, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + } - // 2. Otherwise, If "source.pattern" is specified, it will be interpreted - // according to "source.pattern.format". + // 3. If only "source.pattern" is specified, parse it using the helper method. if (pattern != null) { - final String patternFormat = - sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, SOURCE_PATTERN_FORMAT_KEY); + return parsePatternsFromPatternParameter( + pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured); + } - // If "source.pattern.format" is not specified, use prefix format by default. - if (patternFormat == null) { - return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, pattern); - } + // 4. If neither "source.path" nor "source.pattern" is specified, + // this pipe source will match all data. + return Collections.singletonList( + new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, null)); + } + + /** + * A private helper method to parse a list of {@link TreePattern}s from the "pattern" parameter, + * considering its "format". + * + * @param pattern The pattern string to parse. + * @param sourceParameters The source parameters to read the format from. + * @param isTreeModelDataAllowedToBeCaptured A boolean flag passed to the TreePattern constructor. + * @return A list of parsed {@link TreePattern}s. + */ + private static List parsePatternsFromPatternParameter( + final String pattern, + final PipeParameters sourceParameters, + final boolean isTreeModelDataAllowedToBeCaptured) { + final String patternFormat = + sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, SOURCE_PATTERN_FORMAT_KEY); + + // If "source.pattern.format" is not specified, use prefix format by default. + if (patternFormat == null) { + return parseMultiplePatterns( + pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + } + + switch (patternFormat.toLowerCase()) { + case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE: + return parseMultiplePatterns( + pattern, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE: + return parseMultiplePatterns( + pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + default: + LOGGER.info( + "Unknown pattern format: {}, use prefix matching format by default.", patternFormat); + return parseMultiplePatterns( + pattern, p -> new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p)); + } + } + + private static List parseMultiplePatterns( + final String pattern, final Function patternSupplier) { + if (pattern.isEmpty()) { + return Collections.singletonList(patternSupplier.apply(pattern)); + } - switch (patternFormat.toLowerCase()) { - case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE: - return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, pattern); - case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE: - return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, pattern); - default: - LOGGER.info( - "Unknown pattern format: {}, use prefix matching format by default.", patternFormat); - return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, pattern); + final List patterns = new ArrayList<>(); + final StringBuilder currentPattern = new StringBuilder(); + boolean inBackticks = false; + + for (final char c : pattern.toCharArray()) { + if (c == '`') { + inBackticks = !inBackticks; + currentPattern.append(c); + } else if (c == ',' && !inBackticks) { + final String singlePattern = currentPattern.toString().trim(); + if (!singlePattern.isEmpty()) { + patterns.add(patternSupplier.apply(singlePattern)); + } + currentPattern.setLength(0); + } else { + currentPattern.append(c); } } - // 3. If neither "source.path" nor "source.pattern" is specified, - // this pipe source will match all data. - return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, null); + final String lastPattern = currentPattern.toString().trim(); + if (!lastPattern.isEmpty()) { + patterns.add(patternSupplier.apply(lastPattern)); + } + + return patterns; } public static boolean isTreeModelDataAllowToBeCaptured(final PipeParameters sourceParameters) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index afe0ebe22df5e..6120dcdc27d73 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -67,7 +67,7 @@ public abstract class EnrichedEvent implements Event { public static final long INITIAL_RETRY_INTERVAL_FOR_IOTV2 = 500L; protected long retryInterval = INITIAL_RETRY_INTERVAL_FOR_IOTV2; - protected final TreePattern treePattern; + protected final List treePatterns; protected final TablePattern tablePattern; protected final long startTime; @@ -87,7 +87,7 @@ protected EnrichedEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -101,7 +101,7 @@ protected EnrichedEvent( this.pipeName = pipeName; this.creationTime = creationTime; this.pipeTaskMeta = pipeTaskMeta; - this.treePattern = treePattern; + this.treePatterns = Objects.nonNull(treePatterns) ? treePatterns : Collections.emptyList(); this.tablePattern = tablePattern; this.userId = userId; this.userName = userName; @@ -111,9 +111,11 @@ protected EnrichedEvent( this.endTime = endTime; isPatternParsed = - (treePattern == null || treePattern.isRoot()) - && (tablePattern == null - || !tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()); + Objects.isNull(treePatterns) + || ((treePatterns.stream() + .allMatch(treePattern -> treePattern == null || treePattern.isRoot())) + && (tablePattern == null + || !tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern())); isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime; } @@ -327,11 +329,17 @@ public final boolean isDataRegionEvent() { */ // TODO: consider tablePattern public final String getTreePatternString() { - return treePattern != null ? treePattern.getPattern() : null; + // TODO: handle multiple patterns + return treePatterns.isEmpty() ? null : treePatterns.get(0).getPattern(); } public final TreePattern getTreePattern() { - return treePattern; + // TODO: handle multiple patterns + return treePatterns.isEmpty() ? null : treePatterns.get(0); + } + + public final List getTreePatterns() { + return treePatterns; } public final TablePattern getTablePattern() { @@ -390,7 +398,7 @@ public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressRepor final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -523,8 +531,8 @@ public String toString() { + commitId + "', replicateIndexForIoTV2=" + replicateIndexForIoTV2 - + ", treePattern='" - + treePattern + + ", treePatterns='" + + treePatterns + "', tablePattern='" + tablePattern + "', startTime=" @@ -558,8 +566,8 @@ public String coreReportMessage() { + commitId + "', replicateIndexForIoTV2=" + replicateIndexForIoTV2 - + ", treePattern='" - + treePattern + + ", treePatterns='" + + treePatterns + "', tablePattern='" + tablePattern + "', startTime=" diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java index ac3de4ca0efd7..dcf96404be444 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -38,7 +39,7 @@ protected PipeSnapshotEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -49,7 +50,7 @@ protected PipeSnapshotEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java index 6fbb143747bb0..6db7108be07ac 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import java.util.List; + public abstract class PipeWritePlanEvent extends EnrichedEvent implements SerializableEvent { protected boolean isGeneratedByPipe; @@ -34,7 +36,7 @@ protected PipeWritePlanEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -45,7 +47,7 @@ protected PipeWritePlanEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java index e6e69acac139c..4566682014af6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import java.util.List; + /** * {@link ProgressReportEvent} is an {@link EnrichedEvent} that is used only for progress report. It * is bind to a {@link ProgressIndex} and will be committed after all its preceding {@link @@ -74,7 +76,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java index bad0d1413ce69..e13c3900fc11e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java @@ -43,6 +43,7 @@ import org.apache.tsfile.utils.Pair; import java.io.IOException; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -53,7 +54,7 @@ @TableModel public abstract class IoTDBNonDataRegionSource extends IoTDBSource { - protected IoTDBTreePattern treePattern; + protected List treePatterns; protected TablePattern tablePattern; private List historicalEvents = new LinkedList<>(); @@ -76,8 +77,16 @@ public void customize( throws Exception { super.customize(parameters, configuration); - final TreePattern pattern = TreePattern.parsePipePatternFromSourceParameters(parameters); + final List patterns = TreePattern.parsePipePatternFromSourceParameters(parameters); + // TODO: support multiple path pattern + if (patterns.size() != 1) { + throw new IllegalArgumentException( + String.format( + "The path pattern parameters %s is not valid for the source. Only single path pattern is allowed.", + parameters)); + } + final TreePattern pattern = patterns.get(0); if (!(pattern instanceof IoTDBTreePattern && (((IoTDBTreePattern) pattern).isPrefix() || ((IoTDBTreePattern) pattern).isFullPath()))) { @@ -86,7 +95,7 @@ public void customize( "The path pattern %s is not valid for the source. Only prefix or full path is allowed.", pattern.getPattern())); } - treePattern = (IoTDBTreePattern) pattern; + treePatterns = Collections.singletonList((IoTDBTreePattern) pattern); tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters); } @@ -179,7 +188,7 @@ public EnrichedEvent supply() throws Exception { pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -242,7 +251,7 @@ public EnrichedEvent supply() throws Exception { pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java index 61b217c35d472..55beaaae164be 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeCommitQueueTest.java @@ -32,6 +32,7 @@ import org.junit.Test; import java.util.HashSet; +import java.util.List; import java.util.Set; public class PipeCommitQueueTest { @@ -90,7 +91,7 @@ protected TestEnrichedEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName, @@ -102,7 +103,7 @@ protected TestEnrichedEvent( pipeName, creationTime, pipeTaskMeta, - treePattern, + treePatterns, tablePattern, userId, userName, @@ -127,7 +128,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final TreePattern treePattern, + final List treePatterns, final TablePattern tablePattern, final String userId, final String userName,