Skip to content

Commit f72b3ee

Browse files
authored
Fixed the bug that create attribute does not support attribute.None & Pipe: Reset tablet pipeDataStructureTabletSizeInBytes to 16MB & Enable stopping exception restart by manual stop pipe (#17588)
* stop pipe * 16 * fix * legacy
1 parent dd8631f commit f72b3ee

13 files changed

Lines changed: 343 additions & 25 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
8181
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
8282
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
83+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
8384
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
8485
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
8586
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -504,6 +505,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
504505
case SetPipeStatusV2:
505506
plan = new SetPipeStatusPlanV2();
506507
break;
508+
case SetPipeStatusWithStoppedByRuntimeExceptionV2:
509+
plan = new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2();
510+
break;
507511
case DropPipeV2:
508512
plan = new DropPipePlanV2();
509513
break;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ public enum ConfigPhysicalPlanType {
295295
ShowPipeV2((short) 1503),
296296
AlterPipeV2((short) 1504),
297297
OperateMultiplePipesV2((short) 1505),
298+
SetPipeStatusWithStoppedByRuntimeExceptionV2((short) 1506),
298299

299300
/** Pipe Runtime. */
300301
PipeHandleLeaderChange((short) 1600),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
21+
22+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
23+
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
24+
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
25+
26+
import org.apache.tsfile.utils.ReadWriteIOUtils;
27+
28+
import java.io.DataOutputStream;
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.Objects;
32+
33+
public class SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 extends ConfigPhysicalPlan {
34+
35+
private String pipeName;
36+
private PipeStatus status;
37+
private boolean stoppedByRuntimeException;
38+
39+
public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2() {
40+
super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2);
41+
}
42+
43+
public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
44+
final String pipeName, final PipeStatus status, final boolean stoppedByRuntimeException) {
45+
super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2);
46+
this.pipeName = pipeName;
47+
this.status = status;
48+
this.stoppedByRuntimeException = stoppedByRuntimeException;
49+
}
50+
51+
public String getPipeName() {
52+
return pipeName;
53+
}
54+
55+
public PipeStatus getPipeStatus() {
56+
return status;
57+
}
58+
59+
public boolean isStoppedByRuntimeException() {
60+
return stoppedByRuntimeException;
61+
}
62+
63+
@Override
64+
protected void serializeImpl(final DataOutputStream stream) throws IOException {
65+
stream.writeShort(getType().getPlanType());
66+
ReadWriteIOUtils.write(pipeName, stream);
67+
ReadWriteIOUtils.write(status.getType(), stream);
68+
ReadWriteIOUtils.write(stoppedByRuntimeException, stream);
69+
}
70+
71+
@Override
72+
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
73+
pipeName = ReadWriteIOUtils.readString(buffer);
74+
status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
75+
stoppedByRuntimeException = ReadWriteIOUtils.readBool(buffer);
76+
}
77+
78+
@Override
79+
public boolean equals(final Object obj) {
80+
if (this == obj) {
81+
return true;
82+
}
83+
if (obj == null || getClass() != obj.getClass()) {
84+
return false;
85+
}
86+
final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 that =
87+
(SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) obj;
88+
return stoppedByRuntimeException == that.stoppedByRuntimeException
89+
&& pipeName.equals(that.pipeName)
90+
&& status.equals(that.status);
91+
}
92+
93+
@Override
94+
public int hashCode() {
95+
return Objects.hash(pipeName, status, stoppedByRuntimeException);
96+
}
97+
98+
@Override
99+
public String toString() {
100+
return "SetPipeStatusWithStoppedByRuntimeExceptionPlanV2{"
101+
+ "pipeName='"
102+
+ pipeName
103+
+ "', status='"
104+
+ status
105+
+ "', stoppedByRuntimeException='"
106+
+ stoppedByRuntimeException
107+
+ "'}";
108+
}
109+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -158,24 +158,13 @@ public TSStatus startPipe(TStartPipeReq req) {
158158

159159
/** Caller should ensure that the method is called in the lock {@link #lock()}. */
160160
private TSStatus stopPipe(String pipeName) {
161-
final boolean isStoppedByRuntimeException = pipeTaskInfo.isStoppedByRuntimeException(pipeName);
162161
final TSStatus status;
163162
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
164163
status = configManager.getProcedureManager().stopConsensusPipe(pipeName);
165164
} else {
166165
status = configManager.getProcedureManager().stopPipe(pipeName);
167166
}
168-
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
169-
if (isStoppedByRuntimeException) {
170-
// Even if the return status is success, it doesn't imply the success of the
171-
// `executeFromOperateOnDataNodes` phase of stopping pipe. However, we still need to set
172-
// `isStoppedByRuntimeException` to false to avoid auto-restart. Meanwhile,
173-
// `isStoppedByRuntimeException` does not need to be synchronized with DNs.
174-
LOGGER.info("Pipe {} has stopped manually, stop its auto restart process.", pipeName);
175-
pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName);
176-
configManager.getProcedureManager().pipeHandleMetaChange(true, false);
177-
}
178-
} else {
167+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
179168
LOGGER.warn("Failed to stop pipe {}. Result status: {}.", pipeName, status);
180169
}
181170
return status;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
105105
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
106106
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
107+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
107108
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
108109
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
109110
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -621,6 +622,9 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
621622
return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan);
622623
case SetPipeStatusV2:
623624
return pipeInfo.setPipeStatus((SetPipeStatusPlanV2) physicalPlan);
625+
case SetPipeStatusWithStoppedByRuntimeExceptionV2:
626+
return pipeInfo.setPipeStatusWithStoppedByRuntimeException(
627+
(SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) physicalPlan);
624628
case DropPipeV2:
625629
return pipeInfo.dropPipe((DropPipePlanV2) physicalPlan);
626630
case AlterPipeV2:

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
3030
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
3131
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
32+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
3233
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
3334
import org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
3435
import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
@@ -124,6 +125,25 @@ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {
124125
}
125126
}
126127

128+
public TSStatus setPipeStatusWithStoppedByRuntimeException(
129+
final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) {
130+
try {
131+
pipeTaskInfo.setPipeStatusWithStoppedByRuntimeException(plan);
132+
133+
PipeConfigNodeAgent.task()
134+
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
135+
PipeTemporaryMetaInCoordinatorMetrics.getInstance()
136+
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
137+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
138+
} catch (final Exception e) {
139+
LOGGER.error("Failed to set pipe status with stopped-by-runtime-exception flag", e);
140+
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
141+
.setMessage(
142+
"Failed to set pipe status with stopped-by-runtime-exception flag, because "
143+
+ e.getMessage());
144+
}
145+
}
146+
127147
public TSStatus dropPipe(final DropPipePlanV2 plan) {
128148
try {
129149
final Optional<PipeMeta> pipeMetaBeforeDrop =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
4949
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
5050
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
51+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
5152
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
5253
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
5354
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
@@ -523,6 +524,25 @@ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {
523524
}
524525
}
525526

527+
public TSStatus setPipeStatusWithStoppedByRuntimeException(
528+
final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) {
529+
acquireWriteLock();
530+
try {
531+
pipeMetaKeeper
532+
.getPipeMeta(plan.getPipeName())
533+
.getRuntimeMeta()
534+
.getStatus()
535+
.set(plan.getPipeStatus());
536+
pipeMetaKeeper
537+
.getPipeMeta(plan.getPipeName())
538+
.getRuntimeMeta()
539+
.setIsStoppedByRuntimeException(plan.isStoppedByRuntimeException());
540+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
541+
} finally {
542+
releaseWriteLock();
543+
}
544+
}
545+
526546
public TSStatus dropPipe(final DropPipePlanV2 plan) {
527547
acquireWriteLock();
528548
try {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
24-
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
24+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
2525
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
2626
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
2727
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -44,6 +44,7 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
4444
private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedureV2.class);
4545

4646
private String pipeName;
47+
private boolean isStoppedByRuntimeExceptionBeforeStop;
4748

4849
public StopPipeProcedureV2() {
4950
super();
@@ -71,7 +72,8 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeEx
7172
@Override
7273
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException {
7374
LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
74-
// Do nothing
75+
isStoppedByRuntimeExceptionBeforeStop =
76+
pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
7577
}
7678

7779
@Override
@@ -83,7 +85,9 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) thro
8385
response =
8486
env.getConfigManager()
8587
.getConsensusManager()
86-
.write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
88+
.write(
89+
new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
90+
pipeName, PipeStatus.STOPPED, false));
8791
} catch (ConsensusException e) {
8892
LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
8993
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -128,7 +132,9 @@ public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
128132
response =
129133
env.getConfigManager()
130134
.getConsensusManager()
131-
.write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
135+
.write(
136+
new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
137+
pipeName, PipeStatus.RUNNING, isStoppedByRuntimeExceptionBeforeStop));
132138
} catch (ConsensusException e) {
133139
LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
134140
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -159,12 +165,16 @@ public void serialize(DataOutputStream stream) throws IOException {
159165
stream.writeShort(ProcedureType.STOP_PIPE_PROCEDURE_V2.getTypeCode());
160166
super.serialize(stream);
161167
ReadWriteIOUtils.write(pipeName, stream);
168+
ReadWriteIOUtils.write(isStoppedByRuntimeExceptionBeforeStop, stream);
162169
}
163170

164171
@Override
165172
public void deserialize(ByteBuffer byteBuffer) {
166173
super.deserialize(byteBuffer);
167174
pipeName = ReadWriteIOUtils.readString(byteBuffer);
175+
// Legacy persisted procedures do not carry this field.
176+
isStoppedByRuntimeExceptionBeforeStop =
177+
byteBuffer.hasRemaining() && ReadWriteIOUtils.readBool(byteBuffer);
168178
}
169179

170180
@Override
@@ -179,11 +189,17 @@ public boolean equals(Object o) {
179189
return getProcId() == that.getProcId()
180190
&& getCurrentState().equals(that.getCurrentState())
181191
&& getCycles() == that.getCycles()
192+
&& isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop
182193
&& pipeName.equals(that.pipeName);
183194
}
184195

185196
@Override
186197
public int hashCode() {
187-
return Objects.hash(getProcId(), getCurrentState(), getCycles(), pipeName);
198+
return Objects.hash(
199+
getProcId(),
200+
getCurrentState(),
201+
getCycles(),
202+
pipeName,
203+
isStoppedByRuntimeExceptionBeforeStop);
188204
}
189205
}

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
114114
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
115115
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
116+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
116117
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
117118
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
118119
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -987,6 +988,28 @@ public void SetPipeStatusPlanV2Test() throws IOException {
987988
Assert.assertEquals(setPipeStatusPlanV2.getPipeStatus(), setPipeStatusPlanV21.getPipeStatus());
988989
}
989990

991+
@Test
992+
public void SetPipeStatusWithStoppedByRuntimeExceptionPlanV2Test() throws IOException {
993+
final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2
994+
setPipeStatusWithStoppedByRuntimeExceptionPlanV2 =
995+
new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
996+
"pipe", org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.STOPPED, true);
997+
final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2
998+
setPipeStatusWithStoppedByRuntimeExceptionPlanV21 =
999+
(SetPipeStatusWithStoppedByRuntimeExceptionPlanV2)
1000+
ConfigPhysicalPlan.Factory.create(
1001+
setPipeStatusWithStoppedByRuntimeExceptionPlanV2.serializeToByteBuffer());
1002+
Assert.assertEquals(
1003+
setPipeStatusWithStoppedByRuntimeExceptionPlanV2.getPipeName(),
1004+
setPipeStatusWithStoppedByRuntimeExceptionPlanV21.getPipeName());
1005+
Assert.assertEquals(
1006+
setPipeStatusWithStoppedByRuntimeExceptionPlanV2.getPipeStatus(),
1007+
setPipeStatusWithStoppedByRuntimeExceptionPlanV21.getPipeStatus());
1008+
Assert.assertEquals(
1009+
setPipeStatusWithStoppedByRuntimeExceptionPlanV2.isStoppedByRuntimeException(),
1010+
setPipeStatusWithStoppedByRuntimeExceptionPlanV21.isStoppedByRuntimeException());
1011+
}
1012+
9901013
@Test
9911014
public void DropPipePlanV2Test() throws IOException {
9921015
final DropPipePlanV2 dropPipePlanV2 = new DropPipePlanV2("demo");
@@ -1028,7 +1051,6 @@ public void OperateMultiplePipesPlanV2Test() throws IOException {
10281051
final SetPipeStatusPlanV2 setPipeStatusPlanV2 =
10291052
new SetPipeStatusPlanV2(
10301053
"testSet", org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.RUNNING);
1031-
10321054
final List<ConfigPhysicalPlan> subPlans = new ArrayList<>();
10331055
subPlans.add(createPipePlanV2);
10341056
subPlans.add(alterPipePlanV2);

0 commit comments

Comments
 (0)