Skip to content

Commit 736cdf7

Browse files
Pipe: Add a tool for validating and repairing isGeneratedByPipe mark in tsfile resources (#15934) (#15945)
1 parent 393aba2 commit 736cdf7

3 files changed

Lines changed: 373 additions & 0 deletions

File tree

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
@REM
2+
@REM Licensed to the Apache Software Foundation (ASF) under one
3+
@REM or more contributor license agreements. See the NOTICE file
4+
@REM distributed with this work for additional information
5+
@REM regarding copyright ownership. The ASF licenses this file
6+
@REM to you under the Apache License, Version 2.0 (the
7+
@REM "License"); you may not use this file except in compliance
8+
@REM with the License. You may obtain a copy of the License at
9+
@REM
10+
@REM http://www.apache.org/licenses/LICENSE-2.0
11+
@REM
12+
@REM Unless required by applicable law or agreed to in writing,
13+
@REM software distributed under the License is distributed on an
14+
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
@REM KIND, either express or implied. See the License for the
16+
@REM specific language governing permissions and limitations
17+
@REM under the License.
18+
@REM
19+
20+
@echo off
@REM Launcher for the tool that validates and repairs the isGeneratedByPipe mark
@REM in TsFile resources. All command-line arguments are forwarded unchanged to
@REM the Java main class. Requires JAVA_HOME to be set.
echo ````````````````````````````````````````````````````````````````````````
echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
echo ````````````````````````````````````````````````````````````````````````

if "%OS%" == "Windows_NT" setlocal

@REM Default IOTDB_HOME to the directory two levels above this script.
@REM The path is quoted so that installations under a path with spaces work.
pushd "%~dp0..\.."
if NOT DEFINED IOTDB_HOME set "IOTDB_HOME=%CD%"
popd

if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
if NOT DEFINED JAVA_HOME goto :err

@REM -----------------------------------------------------------------------------
@REM ***** CLASSPATH library setting *****
@REM Ensure that any user defined CLASSPATH variables are not used on startup.
@REM The value is stored WITHOUT embedded quotes; it is quoted exactly once
@REM where it is passed to java below.
set "CLASSPATH=%IOTDB_HOME%\lib\*"

@REM -----------------------------------------------------------------------------
"%JAVA_HOME%\bin\java" -cp "%CLASSPATH%" %MAIN_CLASS% %*
@REM Remember the tool's exit code so it can be returned to the caller.
set EXIT_CODE=%ERRORLEVEL%

goto finally

:err
echo JAVA_HOME environment variable must be set!
set EXIT_CODE=1
pause

@REM -----------------------------------------------------------------------------
:finally

@REM %EXIT_CODE% is expanded when this line is parsed, i.e. before ENDLOCAL
@REM discards the local variable, so the value survives.
ENDLOCAL & exit /b %EXIT_CODE%
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
# Launcher for the tool that validates and repairs the isGeneratedByPipe mark
# in TsFile resources. All command-line arguments are forwarded to the Java
# main class; the script exits with the tool's exit code.
echo ------------------------------------------------------------------------------------
echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
echo ------------------------------------------------------------------------------------

source "$(dirname "$0")/../../sbin/iotdb-common.sh"
# get_iotdb_include and checkAllVariables are defined in iotdb-common.sh
VARS=$(get_iotdb_include "$*")
checkAllVariables
# NOTE(review): presumably checkAllVariables derives IOTDB_HOME from this
# script's location (one level too deep for a script under tools/<dir>/), hence
# the extra "/.." -- confirm against iotdb-common.sh, in particular for the
# case where the user has exported IOTDB_HOME explicitly.
export IOTDB_HOME="${IOTDB_HOME}/.."
eval set -- "$VARS"

# Locate the java executable: prefer JAVA_HOME, otherwise rely on PATH.
if [ -n "$JAVA_HOME" ]; then
  for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
    if [ -x "$java" ]; then
      JAVA="$java"
      break
    fi
  done
else
  JAVA=java
fi

# JAVA_HOME was set but contains no executable java (and nothing else provided
# JAVA): fail with a clear message instead of trying to execute an empty command.
if [ -z "$JAVA" ]; then
  echo "Unable to find an executable java under JAVA_HOME (${JAVA_HOME}). Please check JAVA_HOME." >&2
  exit 1
fi

# Build the classpath from every jar in lib/. The glob prefix is quoted so that
# an IOTDB_HOME containing spaces is handled, and no leading ':' is emitted
# (an empty classpath entry would add the current directory to the classpath).
CLASSPATH=""
for f in "${IOTDB_HOME}"/lib/*.jar; do
  # Skip the unexpanded pattern when lib/ contains no jars.
  [ -e "$f" ] || continue
  CLASSPATH="${CLASSPATH:+${CLASSPATH}:}${f}"
done

MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool

"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.tools.validate;
21+
22+
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
23+
24+
import org.apache.tsfile.common.constant.TsFileConstant;
25+
import org.slf4j.Logger;
26+
27+
import java.io.File;
28+
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Objects;
34+
import java.util.Set;
35+
import java.util.concurrent.ConcurrentSkipListSet;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.concurrent.atomic.AtomicLong;
39+
40+
public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
41+
42+
private static final Logger LOGGER =
43+
org.slf4j.LoggerFactory.getLogger(
44+
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.class);
45+
46+
private static final String USAGE =
47+
"Usage: --expected true|false --dirs <dir1> <dir2> ...\n"
48+
+ " --expected: whether the TsFileResource is expected to be generated by pipe\n"
49+
+ " --dirs: list of data directories to validate and repair";
50+
51+
private static final Set<File> dataDirs = new ConcurrentSkipListSet<>();
52+
private static final AtomicBoolean expectedMark = new AtomicBoolean(true);
53+
54+
private static final AtomicLong runtime = new AtomicLong(System.currentTimeMillis());
55+
56+
private static final AtomicInteger totalTsFileNum = new AtomicInteger(0);
57+
private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0);
58+
59+
// Usage: --expected true|false --dirs <dir1> <dir2> ...
60+
public static void main(String[] args) throws IOException {
61+
parseCommandLineArgs(args);
62+
final List<File> partitionDirs = findAllPartitionDirs();
63+
partitionDirs.parallelStream()
64+
.forEach(
65+
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
66+
::validateAndRepairTsFileResourcesInPartition);
67+
printStatistics();
68+
}
69+
70+
private static void parseCommandLineArgs(final String[] args) {
71+
final Set<String> argSet =
72+
new ConcurrentSkipListSet<>(
73+
args.length > 0 ? Arrays.asList(args) : Collections.emptyList());
74+
if (args.length == 0
75+
|| argSet.contains("--help")
76+
|| argSet.contains("-h")
77+
|| !(argSet.contains("--expected") && argSet.contains("--dirs"))) {
78+
LOGGER.info(USAGE);
79+
System.exit(1);
80+
}
81+
82+
for (int i = 0; i < args.length; i++) {
83+
if ("--expected".equals(args[i]) && i + 1 < args.length) {
84+
expectedMark.set(Boolean.parseBoolean(args[++i]));
85+
} else if ("--dirs".equals(args[i]) && i + 1 < args.length) {
86+
i++;
87+
while (i < args.length && !args[i].startsWith("--")) {
88+
dataDirs.add(new File(args[i++]));
89+
}
90+
i--;
91+
} else {
92+
LOGGER.info("Unknown argument: {}", args[i]);
93+
LOGGER.info(USAGE);
94+
// Exit if an unknown argument is encountered
95+
System.exit(1);
96+
}
97+
}
98+
99+
if (dataDirs.isEmpty()) {
100+
LOGGER.info("No data directories provided. Please specify with --dirs <dir1> <dir2> ...");
101+
System.exit(1);
102+
}
103+
104+
LOGGER.info("------------------------------------------------------");
105+
LOGGER.info("Expected mark: {}", expectedMark.get());
106+
LOGGER.info("Data directories: ");
107+
for (File dir : dataDirs) {
108+
LOGGER.info(" {}", dir.getAbsolutePath());
109+
}
110+
LOGGER.info("------------------------------------------------------");
111+
}
112+
113+
private static List<File> findAllPartitionDirs() {
114+
final List<File> partitionDirs = new ArrayList<>();
115+
for (final File dataDir : dataDirs) {
116+
if (dataDir.exists() && dataDir.isDirectory()) {
117+
partitionDirs.addAll(findLeafDirectories(dataDir));
118+
}
119+
}
120+
return partitionDirs;
121+
}
122+
123+
public static List<File> findLeafDirectories(File dir) {
124+
List<File> leafDirectories = new ArrayList<>();
125+
126+
File[] files = dir.listFiles();
127+
128+
if (files == null || files.length == 0) {
129+
leafDirectories.add(dir);
130+
return leafDirectories;
131+
}
132+
133+
for (File file : files) {
134+
if (file.isDirectory()) {
135+
leafDirectories.addAll(findLeafDirectories(file));
136+
}
137+
}
138+
139+
if (leafDirectories.isEmpty()) {
140+
leafDirectories.add(dir);
141+
}
142+
143+
return leafDirectories;
144+
}
145+
146+
private static void validateAndRepairTsFileResourcesInPartition(final File partitionDir) {
147+
final AtomicInteger totalResources = new AtomicInteger();
148+
final AtomicInteger toRepairResources = new AtomicInteger();
149+
150+
try {
151+
final List<TsFileResource> resources =
152+
loadAllTsFileResources(Collections.singletonList(partitionDir));
153+
totalResources.addAndGet(resources.size());
154+
155+
for (final TsFileResource resource : resources) {
156+
try {
157+
if (validateAndRepairSingleTsFileResource(resource)) {
158+
toRepairResources.incrementAndGet();
159+
}
160+
} catch (final Exception e) {
161+
// Continue processing other resources even if one fails
162+
LOGGER.warn(
163+
"Error validating or repairing resource {}: {}",
164+
resource.getTsFile().getAbsolutePath(),
165+
e.getMessage(),
166+
e);
167+
}
168+
}
169+
} catch (final Exception e) {
170+
LOGGER.warn(
171+
"Error loading resources from partition {}: {}",
172+
partitionDir.getAbsolutePath(),
173+
e.getMessage(),
174+
e);
175+
}
176+
177+
totalTsFileNum.addAndGet(totalResources.get());
178+
toRepairTsFileNum.addAndGet(toRepairResources.get());
179+
LOGGER.info(
180+
"TimePartition {} has {} total resources, {} to repair resources. Process completed.",
181+
partitionDir,
182+
totalResources.get(),
183+
toRepairResources.get());
184+
}
185+
186+
private static List<TsFileResource> loadAllTsFileResources(List<File> timePartitionDirs)
187+
throws IOException {
188+
final List<TsFileResource> resources = new ArrayList<>();
189+
190+
for (final File timePartitionDir : timePartitionDirs) {
191+
for (final File tsfile : Objects.requireNonNull(timePartitionDir.listFiles())) {
192+
final String filePath = tsfile.getAbsolutePath();
193+
if (!filePath.endsWith(TsFileConstant.TSFILE_SUFFIX) || !tsfile.isFile()) {
194+
continue;
195+
}
196+
String resourcePath = tsfile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX;
197+
198+
if (!new File(resourcePath).exists()) {
199+
LOGGER.info(
200+
"{} is skipped because resource file is not exist.", tsfile.getAbsolutePath());
201+
continue;
202+
}
203+
204+
TsFileResource resource = new TsFileResource(tsfile);
205+
resource.deserialize();
206+
resource.close();
207+
resources.add(resource);
208+
}
209+
}
210+
211+
return resources;
212+
}
213+
214+
/**
215+
* Validates and repairs a single TsFileResource.
216+
*
217+
* @param resource the TsFileResource to validate and repair
218+
* @return true if the resource needs to be repaired and false if it is valid
219+
*/
220+
private static boolean validateAndRepairSingleTsFileResource(TsFileResource resource) {
221+
if (resource.isGeneratedByPipe() == expectedMark.get()) {
222+
// The resource is valid, no need to repair
223+
return false;
224+
}
225+
226+
LOGGER.info(
227+
"Repairing TsFileResource: {}, expected mark: {}, actual mark: {}",
228+
resource.getTsFile().getAbsolutePath(),
229+
expectedMark.get(),
230+
resource.isGeneratedByPipe());
231+
232+
try {
233+
repairSingleTsFileResource(resource);
234+
235+
LOGGER.info(
236+
"Marked TsFileResource as {} in resource: {}",
237+
expectedMark.get(),
238+
resource.getTsFile().getAbsolutePath());
239+
} catch (final Exception e) {
240+
LOGGER.warn(
241+
"ERROR: Failed to repair TsFileResource: {}, error: {}",
242+
resource.getTsFile().getAbsolutePath(),
243+
e.getMessage());
244+
}
245+
246+
return true;
247+
}
248+
249+
private static void repairSingleTsFileResource(TsFileResource resource) throws IOException {
250+
resource.setGeneratedByPipe(expectedMark.get());
251+
resource.serialize();
252+
}
253+
254+
private static void printStatistics() {
255+
LOGGER.info("------------------------------------------------------");
256+
LOGGER.info("Validation and repair completed. Statistics:");
257+
LOGGER.info(
258+
"Total time taken: {} ms, total TsFile resources: {}, repaired TsFile resources: {}",
259+
System.currentTimeMillis() - runtime.get(),
260+
totalTsFileNum.get(),
261+
toRepairTsFileNum.get());
262+
}
263+
}

0 commit comments

Comments
 (0)