Skip to content

Commit d39d22f

Browse files
authored
Support sink config key for pipe request slicing (#17858) (#17883)
* Support sink config key for pipe request slicing * Support processor output series aliases
1 parent 6a59cf4 commit d39d22f

5 files changed

Lines changed: 146 additions & 8 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long pipeMetaSyncerSync
508508
public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
509509
int pipeConnectorRequestSliceThresholdBytes) {
510510
setProperty(
511-
"pipe_connector_request_slice_threshold_bytes",
511+
"pipe_sink_request_slice_threshold_bytes",
512512
String.valueOf(pipeConnectorRequestSliceThresholdBytes));
513513

514514
return this;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
public class TwoStageCountProcessor implements PipeProcessor {
7575

7676
private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageCountProcessor.class);
77+
private static final String LEGACY_PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series";
7778

7879
private String pipeName;
7980
private long creationTime;
@@ -98,10 +99,17 @@ public class TwoStageCountProcessor implements PipeProcessor {
9899

99100
@Override
100101
public void validate(PipeParameterValidator validator) throws Exception {
101-
validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
102+
validator.validateSynonymAttributes(
103+
Collections.singletonList(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY),
104+
Collections.singletonList(LEGACY_PROCESSOR_OUTPUT_SERIES_KEY),
105+
true);
102106

103107
final String rawOutputSeries =
104-
validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
108+
validator
109+
.getParameters()
110+
.getStringByKeys(
111+
PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY,
112+
LEGACY_PROCESSOR_OUTPUT_SERIES_KEY);
105113
try {
106114
PathUtils.isLegalPath(rawOutputSeries);
107115
} catch (IllegalPathException e) {
@@ -119,8 +127,7 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
119127
regionId = runtimeEnvironment.getRegionId();
120128
pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
121129

122-
outputSeries =
123-
new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY));
130+
outputSeries = parseOutputSeries(parameters);
124131

125132
if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
126133
if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
@@ -152,6 +159,13 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
152159
twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
153160
}
154161

162+
static PartialPath parseOutputSeries(final PipeParameters parameters)
163+
throws IllegalPathException {
164+
return new PartialPath(
165+
parameters.getStringByKeys(
166+
PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY, LEGACY_PROCESSOR_OUTPUT_SERIES_KEY));
167+
}
168+
155169
@Override
156170
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
157171
throws Exception {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.processor.twostage.plugin;
21+
22+
import org.apache.iotdb.commons.path.PartialPath;
23+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
24+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
25+
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
29+
import java.util.Collections;
30+
31+
public class TwoStageCountProcessorTest {
32+
33+
@Test
34+
public void testOutputSeriesSupportsNewAndLegacyKeys() throws Exception {
35+
Assert.assertEquals(
36+
"root.db.d.s1", parseOutputSeries("processor.output.series", "root.db.d.s1").getFullPath());
37+
Assert.assertEquals(
38+
"root.db.d.s2", parseOutputSeries("processor.output-series", "root.db.d.s2").getFullPath());
39+
}
40+
41+
@Test
42+
public void testValidateOutputSeriesSupportsNewAndLegacyKeys() throws Exception {
43+
validateOutputSeries("processor.output.series", "root.db.d.s1");
44+
validateOutputSeries("processor.output-series", "root.db.d.s2");
45+
}
46+
47+
private PartialPath parseOutputSeries(final String key, final String value) throws Exception {
48+
return TwoStageCountProcessor.parseOutputSeries(
49+
new PipeParameters(Collections.singletonMap(key, value)));
50+
}
51+
52+
private void validateOutputSeries(final String key, final String value) throws Exception {
53+
new TwoStageCountProcessor()
54+
.validate(
55+
new PipeParameterValidator(new PipeParameters(Collections.singletonMap(key, value))));
56+
}
57+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,9 +445,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
445445

446446
config.setPipeSinkRequestSliceThresholdBytes(
447447
Integer.parseInt(
448-
properties.getProperty(
449-
"pipe_connector_request_slice_threshold_bytes",
450-
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
448+
Optional.ofNullable(properties.getProperty("pipe_sink_request_slice_threshold_bytes"))
449+
.orElse(
450+
properties.getProperty(
451+
"pipe_connector_request_slice_threshold_bytes",
452+
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes())))));
451453

452454
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
453455
Long.parseLong(
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.pipe.config;
21+
22+
import org.apache.iotdb.commons.conf.CommonConfig;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.commons.conf.TrimProperties;
25+
26+
import org.junit.After;
27+
import org.junit.Assert;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
public class PipeDescriptorTest {
32+
33+
private final CommonConfig config = CommonDescriptor.getInstance().getConfig();
34+
35+
private int originalRequestSliceThresholdBytes;
36+
37+
@Before
38+
public void setUp() {
39+
originalRequestSliceThresholdBytes = config.getPipeSinkRequestSliceThresholdBytes();
40+
}
41+
42+
@After
43+
public void tearDown() {
44+
config.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
45+
}
46+
47+
@Test
48+
public void testPipeRequestSliceThresholdSupportsSinkAndConnectorKeys() {
49+
final TrimProperties connectorProperties = new TrimProperties();
50+
connectorProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123");
51+
PipeDescriptor.loadPipeInternalConfig(config, connectorProperties);
52+
Assert.assertEquals(123, config.getPipeSinkRequestSliceThresholdBytes());
53+
54+
final TrimProperties sinkProperties = new TrimProperties();
55+
sinkProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456");
56+
PipeDescriptor.loadPipeInternalConfig(config, sinkProperties);
57+
Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
58+
59+
final TrimProperties bothProperties = new TrimProperties();
60+
bothProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123");
61+
bothProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456");
62+
PipeDescriptor.loadPipeInternalConfig(config, bothProperties);
63+
Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
64+
}
65+
}

0 commit comments

Comments
 (0)