Skip to content

Commit 7372101

Browse files
authored
[To dev/1.3] Pipe: CN adds logic to check if Pipe is out of memory #16119 (#16120)
* Pipe: CN adds logic to check if Pipe is out of memory * update AbstractOperatePipeProcedureV2 * update AbstractOperatePipeProcedureV2 * add it
1 parent 01dcf32 commit 7372101

10 files changed

Lines changed: 169 additions & 8 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,12 @@ public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
543543
return this;
544544
}
545545

546+
@Override
547+
public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
548+
setProperty("datanode_memory_proportion", datanodeMemoryProportion);
549+
return this;
550+
}
551+
546552
// For part of the log directory
547553
public String getClusterConfigStr() {
548554
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,4 +559,10 @@ public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
559559
cnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
560560
return this;
561561
}
562+
563+
public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
564+
dnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
565+
cnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
566+
return this;
567+
}
562568
}

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,4 +386,9 @@ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
386386
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
387387
return this;
388388
}
389+
390+
@Override
391+
public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
392+
return this;
393+
}
389394
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,6 @@ CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
175175
default CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
176176
return this;
177177
}
178+
179+
CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion);
178180
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.it.autocreate;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
24+
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
25+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
26+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
27+
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
28+
import org.apache.iotdb.rpc.TSStatusCode;
29+
30+
import org.junit.Assert;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
import org.junit.experimental.categories.Category;
34+
import org.junit.runner.RunWith;
35+
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
39+
@RunWith(IoTDBTestRunner.class)
40+
@Category({MultiClusterIT2AutoCreateSchema.class})
41+
public class IoTDBPipeMemoryIT extends AbstractPipeDualAutoIT {
42+
43+
@Override
44+
@Before
45+
public void setUp() {
46+
super.setUp();
47+
}
48+
49+
@Override
50+
protected void setupConfig() {
51+
super.setupConfig();
52+
senderEnv
53+
.getConfig()
54+
.getCommonConfig()
55+
.setPipeMemoryManagementEnabled(true)
56+
.setIsPipeEnableMemoryCheck(true)
57+
.setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
58+
receiverEnv
59+
.getConfig()
60+
.getCommonConfig()
61+
.setPipeMemoryManagementEnabled(true)
62+
.setIsPipeEnableMemoryCheck(true)
63+
.setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
64+
}
65+
66+
@Test
67+
public void testCreatePipeMemoryManage() {
68+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
69+
final String receiverIp = receiverDataNode.getIp();
70+
final int receiverPort = receiverDataNode.getPort();
71+
72+
try (final SyncConfigNodeIServiceClient client =
73+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
74+
final Map<String, String> extractorAttributes = new HashMap<>();
75+
final Map<String, String> processorAttributes = new HashMap<>();
76+
final Map<String, String> connectorAttributes = new HashMap<>();
77+
extractorAttributes.put("user", "root");
78+
79+
connectorAttributes.put("connector", "iotdb-thrift-connector");
80+
connectorAttributes.put("connector.batch.enable", "false");
81+
connectorAttributes.put("connector.ip", receiverIp);
82+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
83+
84+
final TSStatus status =
85+
client.createPipe(
86+
new TCreatePipeReq("p1", connectorAttributes)
87+
.setExtractorAttributes(extractorAttributes)
88+
.setProcessorAttributes(processorAttributes));
89+
90+
Assert.assertNotEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
91+
Assert.assertNotNull(status.getMessage());
92+
Assert.assertTrue(status.getMessage().contains("Not enough memory for pipe."));
93+
94+
} catch (Exception e) {
95+
Assert.fail(e.getMessage());
96+
}
97+
}
98+
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ public enum TSStatusCode {
261261
PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
262262
PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
263263
PIPE_PUSH_META_TIMEOUT(1813),
264+
PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
264265

265266
// Subscription
266267
SUBSCRIPTION_VERSION_ERROR(1900),

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,11 +424,33 @@ public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
424424
public static String parsePushPipeMetaExceptionForPipe(
425425
final String pipeName, final Map<Integer, TPushPipeMetaResp> respMap) {
426426
final StringBuilder exceptionMessageBuilder = new StringBuilder();
427+
final StringBuilder enoughMemoryMessageBuilder = new StringBuilder();
427428

428429
for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry : respMap.entrySet()) {
429430
final int dataNodeId = respEntry.getKey();
430431
final TPushPipeMetaResp resp = respEntry.getValue();
431432

433+
if (resp.getStatus().getCode()
434+
== TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()) {
435+
exceptionMessageBuilder.append(String.format("DataNodeId: %s,", dataNodeId));
436+
resp.getExceptionMessages()
437+
.forEach(
438+
message -> {
439+
// Ignore the timeStamp for simplicity
440+
if (pipeName == null) {
441+
enoughMemoryMessageBuilder.append(
442+
String.format(
443+
"PipeName: %s, Message: %s",
444+
message.getPipeName(), message.getMessage()));
445+
} else if (pipeName.equals(message.getPipeName())) {
446+
enoughMemoryMessageBuilder.append(
447+
String.format("Message: %s", message.getMessage()));
448+
}
449+
});
450+
enoughMemoryMessageBuilder.append(".");
451+
continue;
452+
}
453+
432454
if (resp.getStatus().getCode() == TSStatusCode.PIPE_PUSH_META_TIMEOUT.getStatusCode()) {
433455
exceptionMessageBuilder.append(
434456
String.format(
@@ -472,6 +494,12 @@ public static String parsePushPipeMetaExceptionForPipe(
472494
}
473495
}
474496
}
497+
498+
final String enoughMemoryMessage = enoughMemoryMessageBuilder.toString();
499+
if (!enoughMemoryMessage.isEmpty()) {
500+
throw new PipeException(enoughMemoryMessage);
501+
}
502+
475503
return exceptionMessageBuilder.toString();
476504
}
477505

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,8 @@ protected void calculateMemoryUsage(
692692
if (freeMemorySizeInBytes < needMemory + reservedMemorySizeInBytes) {
693693
final String message =
694694
String.format(
695-
"Not enough memory for pipe. Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes",
695+
"%s Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes",
696+
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
696697
needMemory,
697698
freeMemorySizeInBytes,
698699
freeMemorySizeInBytes,
@@ -739,8 +740,10 @@ private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameter
739740
if (remainingMemory < PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
740741
final String message =
741742
String.format(
742-
"Not enough memory for pipe. Need Floating memory: %d bytes, free Floating memory: %d bytes",
743-
PipeConfig.getInstance().PipeInsertNodeQueueMemory(), remainingMemory);
743+
"%s Need Floating memory: %d bytes, free Floating memory: %d bytes",
744+
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
745+
PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
746+
remainingMemory);
744747
LOGGER.warn(message);
745748
throw new PipeException(message);
746749
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.iotdb.commons.path.PathDeserializeUtil;
6060
import org.apache.iotdb.commons.path.PathPatternTree;
6161
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
62+
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
6263
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
6364
import org.apache.iotdb.commons.schema.SchemaConstant;
6465
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
@@ -1124,12 +1125,22 @@ public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) {
11241125
} else {
11251126
throw new Exception("Invalid TPushSinglePipeMetaReq");
11261127
}
1127-
return exceptionMessage == null
1128-
? new TPushPipeMetaResp()
1129-
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
1130-
: new TPushPipeMetaResp()
1131-
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
1128+
if (exceptionMessage != null) {
1129+
if (exceptionMessage.message != null
1130+
&& exceptionMessage.message.contains(PipeTaskAgent.MESSAGE_PIPE_NOT_ENOUGH_MEMORY)) {
1131+
return new TPushPipeMetaResp()
1132+
.setStatus(
1133+
new TSStatus(TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()))
11321134
.setExceptionMessages(Collections.singletonList(exceptionMessage));
1135+
}
1136+
1137+
return new TPushPipeMetaResp()
1138+
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
1139+
.setExceptionMessages(Collections.singletonList(exceptionMessage));
1140+
}
1141+
1142+
return new TPushPipeMetaResp()
1143+
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
11331144
} catch (Exception e) {
11341145
LOGGER.error("Error occurred when pushing single pipe meta", e);
11351146
return new TPushPipeMetaResp()

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public abstract class PipeTaskAgent {
7979

8080
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
8181

82+
public static final String MESSAGE_PIPE_NOT_ENOUGH_MEMORY = "Not enough memory for pipe.";
8283
protected static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
8384
protected static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
8485

0 commit comments

Comments
 (0)