Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testPrefixPattern() throws Exception {
}

@Test
public void testIotdbPattern() throws Exception {
public void testIoTDBPattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand All @@ -127,8 +127,6 @@ public void testIotdbPattern() throws Exception {
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.**.d1.s*");
// When path is set, pattern should be ignored
extractorAttributes.put("extractor.pattern", "root");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
Expand Down Expand Up @@ -158,7 +156,7 @@ public void testIotdbPattern() throws Exception {
}

@Test
public void testIotdbPatternWithLegacySyntax() throws Exception {
public void testIoTDBPatternWithLegacySyntax() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand Down Expand Up @@ -209,4 +207,324 @@ public void testIotdbPatternWithLegacySyntax() throws Exception {
expectedResSet);
}
}

@Test
public void testMultiplePrefixPatternHistoricalData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,1.0,1.0,");
expectedResSet.add("3,3.0,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
expectedResSet);
}
}

@Test
public void testMultipleIoTDBPatternHistoricalData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s, t) values (3, 3, 3)",
"insert into root.db3.d1(time, s) values (4, 4)"),
null);
awaitUntilFlush(senderEnv);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,null,1.0,1.0,null,");
expectedResSet.add("2,null,null,null,null,2.0,");
expectedResSet.add("3,3.0,3.0,null,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
expectedResSet);
}
}

@Test
public void testMultipleHybridPatternHistoricalData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.d1.*");
extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db2.d1(time, s) values (2, 2)",
"insert into root.db3.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,1.0,null,");
expectedResSet.add("2,null,null,2.0,");

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db.**,root.db2.**",
"Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
expectedResSet);
}
}

@Test
public void testMultiplePrefixPatternRealtimeData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.pattern", "root.db.d1.s, root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,1.0,1.0,");
expectedResSet.add("3,3.0,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
expectedResSet);
}
}

@Test
public void testMultipleIoTDBPatternRealtimeData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db.d2(time, s) values (2, 2)",
"insert into root.db2.d1(time, s, t) values (3, 3, 3)",
"insert into root.db3.d1(time, s) values (4, 4)"),
null);
awaitUntilFlush(senderEnv);

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,null,null,1.0,1.0,null,");
expectedResSet.add("2,null,null,null,null,2.0,");
expectedResSet.add("3,3.0,3.0,null,null,null,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db2.**,root.db.**",
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
expectedResSet);
}
}

@Test
public void testMultipleHybridPatternRealtimeData() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.path", "root.db.d1.*");
extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
extractorAttributes.put("extractor.inclusion", "data.insert");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s, s1) values (1, 1, 1)",
"insert into root.db2.d1(time, s) values (2, 2)",
"insert into root.db3.d1(time, s) values (3, 3)"),
null);
awaitUntilFlush(senderEnv);

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,1.0,null,");
expectedResSet.add("2,null,null,2.0,");

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.db.**,root.db2.**",
"Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
expectedResSet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -120,7 +121,7 @@ public PipeConfigRegionSnapshotEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final TreePattern treePattern,
final List<TreePattern> treePatterns,
final TablePattern tablePattern,
final String userId,
final String userName,
Expand All @@ -130,7 +131,7 @@ public PipeConfigRegionSnapshotEvent(
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
treePatterns,
tablePattern,
userId,
userName,
Expand Down Expand Up @@ -203,7 +204,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final TreePattern treePattern,
final List<TreePattern> treePatterns,
final TablePattern tablePattern,
final String userId,
final String userName,
Expand All @@ -219,7 +220,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName,
creationTime,
pipeTaskMeta,
treePattern,
treePatterns,
tablePattern,
userId,
userName,
Expand Down
Loading
Loading