Skip to content

Commit 2b04150

Browse files
committed
gr
1 parent 6614bbd commit 2b04150

2 files changed

Lines changed: 52 additions & 1 deletion

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class SetTTLProcedure extends StateMachineProcedure<ConfigNodeProcedureEn
5858
private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class);
5959
// Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback.
6060
private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
61+
private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;
6162

6263
private SetTTLPlan plan;
6364
private long previousTTL = TTL_NOT_EXIST;
@@ -202,8 +203,10 @@ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws Proc
202203
private void restoreTTLOnConfigNode(
203204
final ConfigNodeProcedureEnv env, final String[] pathPattern, final long ttl)
204205
throws ProcedureException {
206+
// TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it.
205207
final SetTTLPlan rollbackPlan =
206208
new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl);
209+
// Database rollback restores the database path and db.** separately, so avoid auto-expansion.
207210
rollbackPlan.setDataBase(false);
208211
final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
209212
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -247,6 +250,9 @@ private void restoreTTLOnDataNodes(
247250
}
248251
}
249252

253+
/**
254+
* Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones.
255+
*/
250256
@Override
251257
protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState)
252258
throws IOException, InterruptedException, ProcedureException {
@@ -266,6 +272,8 @@ protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState
266272
LOGGER.error("Failed to rollback DataNode ttl cache.", e);
267273
if (rollbackFailure == null) {
268274
rollbackFailure = e;
275+
} else {
276+
rollbackFailure.addSuppressed(e);
269277
}
270278
}
271279
if (rollbackFailure != null) {
@@ -313,8 +321,9 @@ public void deserialize(ByteBuffer byteBuffer) {
313321
final int length = ReadWriteIOUtils.readInt(byteBuffer);
314322
final int position = byteBuffer.position();
315323
this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
324+
// The serialized plan buffer may include padding; skip to the actual payload end.
316325
byteBuffer.position(position + length);
317-
if (byteBuffer.remaining() >= 17) {
326+
if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) {
318327
this.previousTTLStateCaptured = byteBuffer.get() != 0;
319328
this.previousTTL = byteBuffer.getLong();
320329
this.previousDatabaseWildcardTTL = byteBuffer.getLong();

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@
3131
import org.apache.iotdb.confignode.manager.ConfigManager;
3232
import org.apache.iotdb.confignode.manager.TTLManager;
3333
import org.apache.iotdb.confignode.manager.node.NodeManager;
34+
import org.apache.iotdb.confignode.procedure.Procedure;
3435
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
36+
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
3537
import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState;
3638
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
39+
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
3740
import org.apache.iotdb.rpc.TSStatusCode;
3841

3942
import org.apache.tsfile.utils.PublicBAOS;
43+
import org.apache.tsfile.utils.ReadWriteIOUtils;
4044
import org.junit.Assert;
4145
import org.junit.Test;
4246
import org.mockito.Mockito;
@@ -111,6 +115,25 @@ public void serializeDeserializeTestWithCapturedRollbackState() throws Exception
111115
deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, false);
112116
}
113117

118+
@Test
119+
public void deserializeOldFormatWithoutRollbackStateTest() throws Exception {
120+
final SetTTLPlan setTTLPlan =
121+
new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L);
122+
setTTLPlan.setDataBase(true);
123+
124+
final PublicBAOS byteArrayOutputStream = new PublicBAOS();
125+
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
126+
writeOldFormatProcedure(outputStream, setTTLPlan);
127+
128+
final ByteBuffer buffer =
129+
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
130+
final SetTTLProcedure deserializedProcedure =
131+
(SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
132+
133+
assertSerializedProcedure(
134+
deserializedProcedure, "root.db", 2000L, true, false, Long.MIN_VALUE, Long.MIN_VALUE, false);
135+
}
136+
114137
@Test
115138
public void setConfigNodeTTLShouldNotWriteBeforePreviousStateIsCaptured() throws Exception {
116139
final SetTTLPlan setTTLPlan =
@@ -229,6 +252,25 @@ private void assertRequest(
229252
Assert.assertEquals(isDataBase, req.isDataBase);
230253
}
231254

255+
private void writeOldFormatProcedure(final DataOutputStream stream, final SetTTLPlan plan)
256+
throws IOException {
257+
stream.writeShort(ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
258+
// Procedure fields.
259+
stream.writeLong(Procedure.NO_PROC_ID);
260+
stream.writeInt(ProcedureState.INITIALIZING.ordinal());
261+
stream.writeLong(0L);
262+
stream.writeLong(0L);
263+
stream.writeLong(Procedure.NO_PROC_ID);
264+
stream.writeLong(Procedure.NO_TIMEOUT);
265+
stream.writeInt(-1); // no stack indexes
266+
stream.write((byte) 0); // no exception
267+
stream.writeInt(-1); // no result
268+
stream.write((byte) 0); // no lock
269+
// StateMachineProcedure fields.
270+
stream.writeInt(0); // no states
271+
ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
272+
}
273+
232274
private void assertSerializedProcedure(
233275
final SetTTLProcedure procedure,
234276
final String path,

0 commit comments

Comments
 (0)