Skip to content

Commit cee5fbb

Browse files
authored
Pipe: Fixed the bug that the split historical pipe's enable-send-tsfile-limit cannot be configured (#17598)
1 parent 70ba5fc commit cee5fbb

2 files changed

Lines changed: 27 additions & 9 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,21 @@ public void testSingleEnv() throws Exception {
140140
"insert into root.test.device(time, field) values(0,1),(1,2)",
141141
"delete from root.test.device.* where time == 0",
142142
String.format(
143-
"create pipe a2b with source ('inclusion'='all') with sink ('node-urls'='%s')",
143+
"create pipe a2b with source ('inclusion'='all') with sink "
144+
+ "('node-urls'='%s', 'enable-send-tsfile-limit'='false')",
144145
receiverDataNode.getIpAndPortString())));
145146

147+
try (final SyncConfigNodeIServiceClient client =
148+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
149+
final List<TShowPipeInfo> showPipeResult =
150+
client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
151+
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
152+
Assert.assertTrue(
153+
showPipeResult.stream()
154+
.filter(i -> Objects.equals(i.id, "a2b_history"))
155+
.anyMatch(i -> i.pipeConnector.contains("enable-send-tsfile-limit=false")));
156+
}
157+
146158
TestUtils.assertDataEventuallyOnEnv(
147159
receiverEnv,
148160
"select * from root.test.device",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2260,6 +2260,19 @@ public SettableFuture<ConfigTaskResult> createPipe(
22602260
}
22612261

22622262
// 2. Send request to create the historical data synchronization pipeline
2263+
final Map<String, String> historySinkAttributes =
2264+
sinkPipeParameters.hasAnyAttributes(
2265+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
2266+
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT)
2267+
? createPipeStatement.getSinkAttributes()
2268+
: sinkPipeParameters
2269+
.addOrReplaceEquivalentAttributesWithClone(
2270+
new PipeParameters(
2271+
Collections.singletonMap(
2272+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
2273+
Boolean.TRUE.toString())))
2274+
.getAttribute();
2275+
22632276
final TCreatePipeReq historyReq =
22642277
new TCreatePipeReq()
22652278
// Append suffix to the pipeline name for historical data
@@ -2292,14 +2305,7 @@ public SettableFuture<ConfigTaskResult> createPipe(
22922305
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
22932306
.getAttribute())
22942307
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
2295-
.setConnectorAttributes(
2296-
sinkPipeParameters
2297-
.addOrReplaceEquivalentAttributesWithClone(
2298-
new PipeParameters(
2299-
Collections.singletonMap(
2300-
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
2301-
Boolean.TRUE.toString())))
2302-
.getAttribute());
2308+
.setConnectorAttributes(historySinkAttributes);
23032309

23042310
final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
23052311
// If creation fails, immediately return with exception

0 commit comments

Comments
 (0)