1919
2020package org .apache .iotdb .db .tools .validate ;
2121
22+ import org .apache .iotdb .commons .consensus .index .impl .MinimumProgressIndex ;
2223import org .apache .iotdb .db .storageengine .dataregion .tsfile .TsFileResource ;
2324
2425import org .apache .tsfile .common .constant .TsFileConstant ;
3738import java .util .concurrent .atomic .AtomicInteger ;
3839import java .util .concurrent .atomic .AtomicLong ;
3940
40- public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
41+ public class TsFileResourcePipeStatisticsSetTool {
4142
4243 private static final Logger LOGGER =
43- org .slf4j .LoggerFactory .getLogger (
44- TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool .class );
44+ org .slf4j .LoggerFactory .getLogger (TsFileResourcePipeStatisticsSetTool .class );
4545
4646 private static final String USAGE =
47- "Usage: --expected true|false --dirs <dir1> <dir2> ...\n "
48- + " --expected: whether the TsFileResource is expected to be generated by pipe\n "
47+ "Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs <dir1> <dir2> ...\n "
48+ + " --isGeneratedByPipe: whether the TsFileResource is isGeneratedByPipe to be generated by pipe\n "
49+ + " --resetProgressIndex: whether to reset the TsFileResources' progressIndexes\n "
4950 + " --dirs: list of data directories to validate and repair" ;
5051
5152 private static final Set <File > dataDirs = new ConcurrentSkipListSet <>();
52- private static final AtomicBoolean expectedMark = new AtomicBoolean (true );
53+ private static AtomicBoolean isGeneratedByPipeMark = null ;
54+ private static boolean resetProgressIndex = false ;
5355
5456 private static final AtomicLong runtime = new AtomicLong (System .currentTimeMillis ());
5557
5658 private static final AtomicInteger totalTsFileNum = new AtomicInteger (0 );
57- private static final AtomicInteger toRepairTsFileNum = new AtomicInteger (0 );
59+ private static final AtomicInteger toResetFlagNum = new AtomicInteger (0 );
60+ private static final AtomicInteger toResetProgressIndexNum = new AtomicInteger (0 );
61+ private static final AtomicInteger changedNum = new AtomicInteger (0 );
5862
59- // Usage: --expected true|false --dirs <dir1> <dir2> ...
63+ // Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs <dir1> <dir2> ...
6064 public static void main (String [] args ) throws IOException {
6165 parseCommandLineArgs (args );
6266 final List <File > partitionDirs = findAllPartitionDirs ();
6367 partitionDirs .parallelStream ()
64- .forEach (
65- TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
66- ::validateAndRepairTsFileResourcesInPartition );
68+ .forEach (TsFileResourcePipeStatisticsSetTool ::validateAndRepairTsFileResourcesInPartition );
6769 printStatistics ();
6870 }
6971
@@ -74,14 +76,17 @@ private static void parseCommandLineArgs(final String[] args) {
7476 if (args .length == 0
7577 || argSet .contains ("--help" )
7678 || argSet .contains ("-h" )
77- || !(argSet .contains ("--expected" ) && argSet .contains ("--dirs" ))) {
79+ || !((argSet .contains ("--isGeneratedByPipe" ) || argSet .contains ("--resetProgressIndex" ))
80+ && argSet .contains ("--dirs" ))) {
7881 LOGGER .info (USAGE );
7982 System .exit (1 );
8083 }
8184
8285 for (int i = 0 ; i < args .length ; i ++) {
83- if ("--expected" .equals (args [i ]) && i + 1 < args .length ) {
84- expectedMark .set (Boolean .parseBoolean (args [++i ]));
86+ if ("--isGeneratedByPipe" .equals (args [i ]) && i + 1 < args .length ) {
87+ isGeneratedByPipeMark = new AtomicBoolean (Boolean .parseBoolean (args [++i ]));
88+ } else if ("--resetProgressIndex" .equals (args [i ])) {
89+ resetProgressIndex = true ;
8590 } else if ("--dirs" .equals (args [i ]) && i + 1 < args .length ) {
8691 i ++;
8792 while (i < args .length && !args [i ].startsWith ("--" )) {
@@ -102,7 +107,8 @@ private static void parseCommandLineArgs(final String[] args) {
102107 }
103108
104109 LOGGER .info ("------------------------------------------------------" );
105- LOGGER .info ("Expected mark: {}" , expectedMark .get ());
110+ LOGGER .info ("isGeneratedByPipe mark: {}" , isGeneratedByPipeMark );
111+ LOGGER .info ("resetProgressIndex: {}" , resetProgressIndex );
106112 LOGGER .info ("Data directories: " );
107113 for (File dir : dataDirs ) {
108114 LOGGER .info (" {}" , dir .getAbsolutePath ());
@@ -144,19 +150,20 @@ public static List<File> findLeafDirectories(File dir) {
144150 }
145151
146152 private static void validateAndRepairTsFileResourcesInPartition (final File partitionDir ) {
147- final AtomicInteger totalResources = new AtomicInteger ();
148- final AtomicInteger toRepairResources = new AtomicInteger ();
153+ final AtomicInteger totalTsFileResource = new AtomicInteger (0 );
154+ final AtomicInteger toResetFlagResource = new AtomicInteger (0 );
155+ final AtomicInteger toResetProgressIndexResource = new AtomicInteger (0 );
156+ final AtomicInteger changedResource = new AtomicInteger (0 );
149157
150158 try {
151159 final List <TsFileResource > resources =
152160 loadAllTsFileResources (Collections .singletonList (partitionDir ));
153- totalResources .addAndGet (resources .size ());
161+ totalTsFileResource .addAndGet (resources .size ());
154162
155163 for (final TsFileResource resource : resources ) {
156164 try {
157- if (validateAndRepairSingleTsFileResource (resource )) {
158- toRepairResources .incrementAndGet ();
159- }
165+ validateAndRepairSingleTsFileResource (
166+ resource , toResetFlagResource , toResetProgressIndexResource , changedResource );
160167 } catch (final Exception e ) {
161168 // Continue processing other resources even if one fails
162169 LOGGER .warn (
@@ -174,13 +181,17 @@ private static void validateAndRepairTsFileResourcesInPartition(final File parti
174181 e );
175182 }
176183
177- totalTsFileNum .addAndGet (totalResources .get ());
178- toRepairTsFileNum .addAndGet (toRepairResources .get ());
184+ totalTsFileNum .addAndGet (totalTsFileResource .get ());
185+ toResetFlagNum .addAndGet (toResetFlagResource .get ());
186+ toResetProgressIndexNum .addAndGet (toResetProgressIndexResource .get ());
187+ changedNum .addAndGet (changedResource .get ());
179188 LOGGER .info (
180- "TimePartition {} has {} total resources, {} to repair resources. Process completed." ,
189+ "TimePartition {} has {} total resources, {} to set isGeneratedByPipe resources, {} to reset progressIndex resources, {} changed resources. Process completed." ,
181190 partitionDir ,
182- totalResources .get (),
183- toRepairResources .get ());
191+ totalTsFileResource .get (),
192+ toResetFlagResource .get (),
193+ toResetProgressIndexResource .get (),
194+ changedResource .get ());
184195 }
185196
186197 private static List <TsFileResource > loadAllTsFileResources (List <File > timePartitionDirs )
@@ -217,47 +228,75 @@ private static List<TsFileResource> loadAllTsFileResources(List<File> timePartit
217228 * @param resource the TsFileResource to validate and repair
 * <p>Nothing is returned; what was found is reported through the counters passed in.
219230 */
220- private static boolean validateAndRepairSingleTsFileResource (TsFileResource resource ) {
221- if (resource .isGeneratedByPipe () == expectedMark .get ()) {
231+ private static void validateAndRepairSingleTsFileResource (
232+ final TsFileResource resource ,
233+ final AtomicInteger toResetFlagResource ,
234+ final AtomicInteger toResetProgressIndexResource ,
235+ final AtomicInteger changedResource ) {
236+ boolean skip = true ;
237+ if (Objects .nonNull (isGeneratedByPipeMark )
238+ && resource .isGeneratedByPipe () != isGeneratedByPipeMark .get ()) {
222239 // The resource is valid, no need to repair
223- return false ;
240+ LOGGER .info (
241+ "Repairing TsFileResource: {}, isGeneratedByPipe mark: {}, actual mark: {}" ,
242+ resource .getTsFile ().getAbsolutePath (),
243+ isGeneratedByPipeMark .get (),
244+ resource .isGeneratedByPipe ());
245+
246+ toResetFlagResource .getAndIncrement ();
247+ skip = false ;
224248 }
225249
226- LOGGER .info (
227- "Repairing TsFileResource: {}, expected mark: {}, actual mark: {}" ,
228- resource .getTsFile ().getAbsolutePath (),
229- expectedMark .get (),
230- resource .isGeneratedByPipe ());
250+ if (resetProgressIndex && resource .getProgressIndex () != MinimumProgressIndex .INSTANCE ) {
251+ // The resource is valid, no need to repair
252+ LOGGER .info (
253+ "Resetting TsFileResource:{} 's progressIndex to minimum, original progressIndex: {}" ,
254+ resource .getTsFile ().getAbsolutePath (),
255+ resource .getProgressIndex ());
256+
257+ toResetProgressIndexResource .getAndIncrement ();
258+ skip = false ;
259+ }
260+ if (skip ) {
261+ return ;
262+ }
263+ changedResource .getAndIncrement ();
231264
232265 try {
233266 repairSingleTsFileResource (resource );
234267
235268 LOGGER .info (
236269 "Marked TsFileResource as {} in resource: {}" ,
237- expectedMark .get (),
270+ isGeneratedByPipeMark .get (),
271+ resource .getTsFile ().getAbsolutePath ());
272+ LOGGER .info (
273+ "Reset TsFileResource:{} 's progressIndex to minimum." ,
238274 resource .getTsFile ().getAbsolutePath ());
239275 } catch (final Exception e ) {
240276 LOGGER .warn (
241- "ERROR: Failed to repair TsFileResource: {}, error: {}" ,
242- resource .getTsFile ().getAbsolutePath (),
243- e .getMessage ());
277+ "ERROR: Failed to repair TsFileResource: {}" , resource .getTsFile ().getAbsolutePath (), e );
244278 }
245-
246- return true ;
247279 }
248280
249281 private static void repairSingleTsFileResource (TsFileResource resource ) throws IOException {
250- resource .setGeneratedByPipe (expectedMark .get ());
282+ if (Objects .nonNull (isGeneratedByPipeMark )) {
283+ resource .setGeneratedByPipe (isGeneratedByPipeMark .get ());
284+ }
285+ if (resetProgressIndex ) {
286+ resource .setProgressIndex (MinimumProgressIndex .INSTANCE );
287+ }
251288 resource .serialize ();
252289 }
253290
254291 private static void printStatistics () {
255292 LOGGER .info ("------------------------------------------------------" );
256293 LOGGER .info ("Validation and repair completed. Statistics:" );
257294 LOGGER .info (
258- "Total time taken: {} ms, total TsFile resources: {}, repaired TsFile resources: {}" ,
295+ "Total time taken: {} ms, total TsFile resources: {}, set isGeneratedByPipe resources: {}, reset progressIndex resources: {}, changed resources: {}" ,
259296 System .currentTimeMillis () - runtime .get (),
260297 totalTsFileNum .get (),
261- toRepairTsFileNum .get ());
298+ toResetFlagNum .get (),
299+ toResetProgressIndexNum .get (),
300+ changedNum .get ());
262301 }
263302}
0 commit comments