Skip to content

Commit d09785e

Browse files
authored
Support sink config key for pipe request slicing (#17858)
* Support sink config key for pipe request slicing * Support processor output series aliases
1 parent 4747d5f commit d09785e

5 files changed

Lines changed: 123 additions & 5 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long pipeMetaSyncerSync
538538
public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
539539
int pipeConnectorRequestSliceThresholdBytes) {
540540
setProperty(
541-
"pipe_connector_request_slice_threshold_bytes",
541+
"pipe_sink_request_slice_threshold_bytes",
542542
String.valueOf(pipeConnectorRequestSliceThresholdBytes));
543543

544544
return this;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2424
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2525
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
26+
import org.apache.iotdb.commons.exception.IllegalPathException;
2627
import org.apache.iotdb.commons.path.PartialPath;
2728
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2829
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -146,7 +147,7 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
146147
isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
147148
}
148149

149-
outputSeries = new PartialPath(parameters.getString(_PROCESSOR_OUTPUT_SERIES_KEY));
150+
outputSeries = parseOutputSeries(parameters);
150151

151152
if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
152153
if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
@@ -178,6 +179,12 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
178179
twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
179180
}
180181

182+
static PartialPath parseOutputSeries(final PipeParameters parameters)
183+
throws IllegalPathException {
184+
return new PartialPath(
185+
parameters.getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY, _PROCESSOR_OUTPUT_SERIES_KEY));
186+
}
187+
181188
@Override
182189
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
183190
throws Exception {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.processor.twostage.plugin;
21+
22+
import org.apache.iotdb.commons.path.PartialPath;
23+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
24+
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
28+
import java.util.Collections;
29+
30+
public class TwoStageCountProcessorTest {
31+
32+
@Test
33+
public void testOutputSeriesSupportsNewAndLegacyKeys() throws Exception {
34+
Assert.assertEquals(
35+
"root.db.d.s1", parseOutputSeries("processor.output.series", "root.db.d.s1").getFullPath());
36+
Assert.assertEquals(
37+
"root.db.d.s2", parseOutputSeries("processor.output-series", "root.db.d.s2").getFullPath());
38+
}
39+
40+
private PartialPath parseOutputSeries(final String key, final String value) throws Exception {
41+
return TwoStageCountProcessor.parseOutputSeries(
42+
new PipeParameters(Collections.singletonMap(key, value)));
43+
}
44+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
437437

438438
config.setPipeSinkRequestSliceThresholdBytes(
439439
Integer.parseInt(
440-
properties.getProperty(
441-
"pipe_connector_request_slice_threshold_bytes",
442-
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
440+
Optional.ofNullable(properties.getProperty("pipe_sink_request_slice_threshold_bytes"))
441+
.orElse(
442+
properties.getProperty(
443+
"pipe_connector_request_slice_threshold_bytes",
444+
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes())))));
443445

444446
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
445447
Long.parseLong(
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.pipe.config;
21+
22+
import org.apache.iotdb.commons.conf.CommonConfig;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.commons.conf.TrimProperties;
25+
26+
import org.junit.After;
27+
import org.junit.Assert;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
public class PipeDescriptorTest {
32+
33+
private final CommonConfig config = CommonDescriptor.getInstance().getConfig();
34+
35+
private int originalRequestSliceThresholdBytes;
36+
37+
@Before
38+
public void setUp() {
39+
originalRequestSliceThresholdBytes = config.getPipeSinkRequestSliceThresholdBytes();
40+
}
41+
42+
@After
43+
public void tearDown() {
44+
config.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
45+
}
46+
47+
@Test
48+
public void testPipeRequestSliceThresholdSupportsSinkAndConnectorKeys() {
49+
final TrimProperties connectorProperties = new TrimProperties();
50+
connectorProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123");
51+
PipeDescriptor.loadPipeInternalConfig(config, connectorProperties);
52+
Assert.assertEquals(123, config.getPipeSinkRequestSliceThresholdBytes());
53+
54+
final TrimProperties sinkProperties = new TrimProperties();
55+
sinkProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456");
56+
PipeDescriptor.loadPipeInternalConfig(config, sinkProperties);
57+
Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
58+
59+
final TrimProperties bothProperties = new TrimProperties();
60+
bothProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123");
61+
bothProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456");
62+
PipeDescriptor.loadPipeInternalConfig(config, bothProperties);
63+
Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
64+
}
65+
}

0 commit comments

Comments
 (0)