2222import org .apache .iotdb .common .rpc .thrift .TDataNodeLocation ;
2323import org .apache .iotdb .common .rpc .thrift .TSStatus ;
2424import org .apache .iotdb .common .rpc .thrift .TSetTTLReq ;
25+ import org .apache .iotdb .commons .conf .IoTDBConstant ;
2526import org .apache .iotdb .commons .exception .IoTDBException ;
2627import org .apache .iotdb .commons .exception .MetadataException ;
28+ import org .apache .iotdb .commons .schema .ttl .TTLCache ;
2729import org .apache .iotdb .confignode .client .async .CnToDnAsyncRequestType ;
2830import org .apache .iotdb .confignode .client .async .CnToDnInternalServiceAsyncRequestManager ;
2931import org .apache .iotdb .confignode .client .async .handlers .DataNodeAsyncRequestContext ;
4547import java .io .DataOutputStream ;
4648import java .io .IOException ;
4749import java .nio .ByteBuffer ;
50+ import java .util .Arrays ;
4851import java .util .Collections ;
4952import java .util .Map ;
5053import java .util .Objects ;
5154
5255public class SetTTLProcedure extends StateMachineProcedure <ConfigNodeProcedureEnv , SetTTLState > {
5356 private static final Logger LOGGER = LoggerFactory .getLogger (SetTTLProcedure .class );
57+ // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback.
58+ private static final long TTL_NOT_EXIST = Long .MIN_VALUE ;
59+ private static final int ROLLBACK_STATE_BYTES = Byte .BYTES + Long .BYTES * 2 ;
5460
5561 private SetTTLPlan plan ;
62+ private long previousTTL = TTL_NOT_EXIST ;
63+ private long previousDatabaseWildcardTTL = TTL_NOT_EXIST ;
64+ private boolean previousTTLStateCaptured = false ;
5665
5766 public SetTTLProcedure (final boolean isGeneratedByPipe ) {
5867 super (isGeneratedByPipe );
@@ -69,6 +78,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state)
6978 long startTime = System .currentTimeMillis ();
7079 try {
7180 switch (state ) {
81+ case CAPTURE_PREVIOUS_TTL :
82+ capturePreviousTTLState (env );
83+ setNextState (SetTTLState .SET_CONFIGNODE_TTL );
84+ return Flow .HAS_MORE_STATE ;
7285 case SET_CONFIGNODE_TTL :
7386 setConfigNodeTTL (env );
7487 return Flow .HAS_MORE_STATE ;
@@ -83,18 +96,13 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state)
8396 }
8497 }
8598
86- private void setConfigNodeTTL (ConfigNodeProcedureEnv env ) {
87- TSStatus res ;
88- try {
89- res =
90- env .getConfigManager ()
91- .getConsensusManager ()
92- .write (isGeneratedByPipe ? new PipeEnrichedPlan (this .plan ) : this .plan );
93- } catch (ConsensusException e ) {
94- LOGGER .warn ("Failed in the write API executing the consensus layer due to: " , e );
95- res = new TSStatus (TSStatusCode .EXECUTE_STATEMENT_ERROR .getStatusCode ());
96- res .setMessage (e .getMessage ());
99+ void setConfigNodeTTL (final ConfigNodeProcedureEnv env ) {
100+ if (!previousTTLStateCaptured ) {
101+ capturePreviousTTLState (env );
102+ setNextState (SetTTLState .SET_CONFIGNODE_TTL );
103+ return ;
97104 }
105+ final TSStatus res = writeConfigNodePlan (env , plan );
98106 if (res .code != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
99107 LOGGER .info ("Failed to execute plan {} because {}" , plan , res .message );
100108 setFailure (new ProcedureException (new IoTDBException (res )));
@@ -103,34 +111,175 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
103111 }
104112 }
105113
106- private void updateDataNodeTTL (ConfigNodeProcedureEnv env ) {
107- Map <Integer , TDataNodeLocation > dataNodeLocationMap =
114+ void updateDataNodeTTL (final ConfigNodeProcedureEnv env ) {
115+ final Map <Integer , TDataNodeLocation > dataNodeLocationMap =
108116 env .getConfigManager ().getNodeManager ().getRegisteredDataNodeLocations ();
109- DataNodeAsyncRequestContext <TSetTTLReq , TSStatus > clientHandler =
110- new DataNodeAsyncRequestContext <>(
111- CnToDnAsyncRequestType .SET_TTL ,
112- new TSetTTLReq (
113- Collections .singletonList (String .join ("." , plan .getPathPattern ())),
114- plan .getTTL (),
115- plan .isDataBase ()),
116- dataNodeLocationMap );
117+ final DataNodeAsyncRequestContext <TSetTTLReq , TSStatus > clientHandler =
118+ sendTTLRequest (
119+ dataNodeLocationMap ,
120+ buildSetTTLReq (plan .getPathPattern (), plan .getTTL (), plan .isDataBase ()));
121+ if (hasFailedDataNode (clientHandler )) {
122+ LOGGER .error ("Failed to update ttl cache of dataNode." );
123+ setFailure (new ProcedureException (new MetadataException ("Update dataNode ttl cache failed" )));
124+ }
125+ }
126+
127+ private void capturePreviousTTLState (final ConfigNodeProcedureEnv env ) {
128+ if (previousTTLStateCaptured ) {
129+ return ;
130+ }
131+ previousTTL = getTTLOrDefault (env , plan .getPathPattern ());
132+ if (plan .isDataBase ()) {
133+ previousDatabaseWildcardTTL =
134+ getTTLOrDefault (env , getDatabaseWildcardPathPattern (plan .getPathPattern ()));
135+ }
136+ previousTTLStateCaptured = true ;
137+ }
138+
139+ TSStatus writeConfigNodePlan (final ConfigNodeProcedureEnv env , final SetTTLPlan setTTLPlan ) {
140+ try {
141+ return env .getConfigManager ()
142+ .getConsensusManager ()
143+ .write (isGeneratedByPipe ? new PipeEnrichedPlan (setTTLPlan ) : setTTLPlan );
144+ } catch (ConsensusException e ) {
145+ LOGGER .warn ("Failed in the write API executing the consensus layer due to: " , e );
146+ final TSStatus res = new TSStatus (TSStatusCode .EXECUTE_STATEMENT_ERROR .getStatusCode ());
147+ res .setMessage (e .getMessage ());
148+ return res ;
149+ }
150+ }
151+
152+ DataNodeAsyncRequestContext <TSetTTLReq , TSStatus > sendTTLRequest (
153+ final Map <Integer , TDataNodeLocation > dataNodeLocationMap , final TSetTTLReq req ) {
154+ final DataNodeAsyncRequestContext <TSetTTLReq , TSStatus > clientHandler =
155+ new DataNodeAsyncRequestContext <>(CnToDnAsyncRequestType .SET_TTL , req , dataNodeLocationMap );
117156 CnToDnInternalServiceAsyncRequestManager .getInstance ().sendAsyncRequestWithRetry (clientHandler );
118- Map <Integer , TSStatus > statusMap = clientHandler .getResponseMap ();
119- for (TSStatus status : statusMap .values ()) {
120- // all dataNodes must clear the related schemaengine cache
157+ return clientHandler ;
158+ }
159+
160+ private TSetTTLReq buildSetTTLReq (
161+ final String [] pathPattern , final long ttl , final boolean isDataBase ) {
162+ return new TSetTTLReq (
163+ Collections .singletonList (String .join ("." , pathPattern )), ttl , isDataBase );
164+ }
165+
166+ private boolean hasFailedDataNode (
167+ final DataNodeAsyncRequestContext <TSetTTLReq , TSStatus > clientHandler ) {
168+ if (!clientHandler .getRequestIndices ().isEmpty ()) {
169+ return true ;
170+ }
171+ for (TSStatus status : clientHandler .getResponseMap ().values ()) {
121172 if (status .getCode () != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
122- LOGGER .error ("Failed to update ttl cache of dataNode." );
123- setFailure (
124- new ProcedureException (new MetadataException ("Update dataNode ttl cache failed" )));
125- return ;
173+ return true ;
126174 }
127175 }
176+ return false ;
128177 }
129178
179+ private long getTTLOrDefault (final ConfigNodeProcedureEnv env , final String [] pathPattern ) {
180+ final long ttl = env .getConfigManager ().getTTLManager ().getTTL (pathPattern );
181+ return ttl == TTLCache .NULL_TTL ? TTL_NOT_EXIST : ttl ;
182+ }
183+
184+ private String [] getDatabaseWildcardPathPattern (final String [] pathPattern ) {
185+ final String [] pathNodes = Arrays .copyOf (pathPattern , pathPattern .length + 1 );
186+ pathNodes [pathNodes .length - 1 ] = IoTDBConstant .MULTI_LEVEL_PATH_WILDCARD ;
187+ return pathNodes ;
188+ }
189+
190+ private void rollbackConfigNodeTTL (final ConfigNodeProcedureEnv env ) throws ProcedureException {
191+ restoreTTLOnConfigNode (env , plan .getPathPattern (), previousTTL );
192+ if (plan .isDataBase ()) {
193+ restoreTTLOnConfigNode (
194+ env , getDatabaseWildcardPathPattern (plan .getPathPattern ()), previousDatabaseWildcardTTL );
195+ }
196+ }
197+
198+ private void restoreTTLOnConfigNode (
199+ final ConfigNodeProcedureEnv env , final String [] pathPattern , final long ttl )
200+ throws ProcedureException {
201+ // TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it.
202+ final SetTTLPlan rollbackPlan =
203+ new SetTTLPlan (pathPattern , ttl == TTL_NOT_EXIST ? TTLCache .NULL_TTL : ttl );
204+ // Database rollback restores the database path and db.** separately, so avoid auto-expansion.
205+ rollbackPlan .setDataBase (false );
206+ final TSStatus status = writeConfigNodePlan (env , rollbackPlan );
207+ if (status .getCode () != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
208+ throw new ProcedureException (
209+ new MetadataException (
210+ "Rollback ConfigNode ttl failed for "
211+ + String .join ("." , pathPattern )
212+ + ": "
213+ + status .getMessage ()));
214+ }
215+ }
216+
217+ private void rollbackDataNodeTTL (final ConfigNodeProcedureEnv env ) throws ProcedureException {
218+ final Map <Integer , TDataNodeLocation > dataNodeLocationMap =
219+ env .getConfigManager ().getNodeManager ().getRegisteredDataNodeLocations ();
220+ restoreTTLOnDataNodes (dataNodeLocationMap , plan .getPathPattern (), previousTTL );
221+ if (plan .isDataBase ()) {
222+ restoreTTLOnDataNodes (
223+ dataNodeLocationMap ,
224+ getDatabaseWildcardPathPattern (plan .getPathPattern ()),
225+ previousDatabaseWildcardTTL );
226+ }
227+ }
228+
229+ private void restoreTTLOnDataNodes (
230+ final Map <Integer , TDataNodeLocation > dataNodeLocationMap ,
231+ final String [] pathPattern ,
232+ final long ttl )
233+ throws ProcedureException {
234+ if (dataNodeLocationMap .isEmpty ()) {
235+ return ;
236+ }
237+ final DataNodeAsyncRequestContext <TSetTTLReq , TSStatus > clientHandler =
238+ sendTTLRequest (
239+ dataNodeLocationMap ,
240+ buildSetTTLReq (pathPattern , ttl == TTL_NOT_EXIST ? TTLCache .NULL_TTL : ttl , false ));
241+ if (hasFailedDataNode (clientHandler )) {
242+ throw new ProcedureException (
243+ new MetadataException (
244+ "Rollback dataNode ttl cache failed for " + String .join ("." , pathPattern )));
245+ }
246+ }
247+
248+ /**
249+ * Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones.
250+ */
130251 @ Override
131- protected void rollbackState (
132- ConfigNodeProcedureEnv configNodeProcedureEnv , SetTTLState setTTLState )
133- throws IOException , InterruptedException , ProcedureException {}
252+ protected void rollbackState (final ConfigNodeProcedureEnv env , final SetTTLState setTTLState )
253+ throws IOException , InterruptedException , ProcedureException {
254+ if (setTTLState != SetTTLState .UPDATE_DATANODE_CACHE || !previousTTLStateCaptured ) {
255+ return ;
256+ }
257+ ProcedureException rollbackFailure = null ;
258+ try {
259+ rollbackConfigNodeTTL (env );
260+ } catch (ProcedureException e ) {
261+ LOGGER .error ("Failed to rollback ConfigNode ttl state." , e );
262+ rollbackFailure = e ;
263+ }
264+ try {
265+ rollbackDataNodeTTL (env );
266+ } catch (ProcedureException e ) {
267+ LOGGER .error ("Failed to rollback DataNode ttl cache." , e );
268+ if (rollbackFailure == null ) {
269+ rollbackFailure = e ;
270+ } else {
271+ rollbackFailure .addSuppressed (e );
272+ }
273+ }
274+ if (rollbackFailure != null ) {
275+ throw rollbackFailure ;
276+ }
277+ }
278+
279+ @ Override
280+ protected boolean isRollbackSupported (final SetTTLState state ) {
281+ return state == SetTTLState .UPDATE_DATANODE_CACHE ;
282+ }
134283
135284 @ Override
136285 protected SetTTLState getState (int stateId ) {
@@ -144,7 +293,7 @@ protected int getStateId(SetTTLState setTTLState) {
144293
145294 @ Override
146295 protected SetTTLState getInitialState () {
147- return SetTTLState .SET_CONFIGNODE_TTL ;
296+ return SetTTLState .CAPTURE_PREVIOUS_TTL ;
148297 }
149298
150299 @ Override
@@ -155,14 +304,25 @@ public void serialize(DataOutputStream stream) throws IOException {
155304 : ProcedureType .SET_TTL_PROCEDURE .getTypeCode ());
156305 super .serialize (stream );
157306 ReadWriteIOUtils .write (plan .serializeToByteBuffer (), stream );
307+ stream .writeBoolean (previousTTLStateCaptured );
308+ stream .writeLong (previousTTL );
309+ stream .writeLong (previousDatabaseWildcardTTL );
158310 }
159311
160312 @ Override
161313 public void deserialize (ByteBuffer byteBuffer ) {
162314 super .deserialize (byteBuffer );
163315 try {
164- ReadWriteIOUtils .readInt (byteBuffer );
316+ final int length = ReadWriteIOUtils .readInt (byteBuffer );
317+ final int position = byteBuffer .position ();
165318 this .plan = (SetTTLPlan ) ConfigPhysicalPlan .Factory .create (byteBuffer );
319+ // The serialized plan buffer may include padding; skip to the actual payload end.
320+ byteBuffer .position (position + length );
321+ if (byteBuffer .remaining () >= ROLLBACK_STATE_BYTES ) {
322+ this .previousTTLStateCaptured = byteBuffer .get () != 0 ;
323+ this .previousTTL = byteBuffer .getLong ();
324+ this .previousDatabaseWildcardTTL = byteBuffer .getLong ();
325+ }
166326 } catch (IOException e ) {
167327 LOGGER .error ("IO error when deserialize setTTL plan." , e );
168328 }
@@ -176,12 +336,21 @@ public boolean equals(Object o) {
176336 if (o == null || getClass () != o .getClass ()) {
177337 return false ;
178338 }
179- return this .plan .equals (((SetTTLProcedure ) o ).plan )
180- && this .isGeneratedByPipe == (((SetTTLProcedure ) o ).isGeneratedByPipe );
339+ final SetTTLProcedure that = (SetTTLProcedure ) o ;
340+ return this .isGeneratedByPipe == that .isGeneratedByPipe
341+ && this .previousTTL == that .previousTTL
342+ && this .previousDatabaseWildcardTTL == that .previousDatabaseWildcardTTL
343+ && this .previousTTLStateCaptured == that .previousTTLStateCaptured
344+ && this .plan .equals (that .plan );
181345 }
182346
183347 @ Override
184348 public int hashCode () {
185- return Objects .hash (plan , isGeneratedByPipe );
349+ return Objects .hash (
350+ plan ,
351+ isGeneratedByPipe ,
352+ previousTTL ,
353+ previousDatabaseWildcardTTL ,
354+ previousTTLStateCaptured );
186355 }
187356}
0 commit comments