Skip to content

Commit 58ca1c6

Browse files
authored
Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006) (#16012)
* Update IoTDBPipePermissionIT.java * refacotr * revert=pom * fix-lock
1 parent 1a3a3ff commit 58ca1c6

2 files changed

Lines changed: 35 additions & 33 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -408,18 +408,18 @@ public Pair<Long, Double> getRemainingEventAndTime(
408408

409409
//////////////////////////// singleton ////////////////////////////
410410

411-
private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
411+
private static class PipeDataNodeSinglePipeMetricsHolder {
412412

413413
private static final PipeDataNodeSinglePipeMetrics INSTANCE =
414414
new PipeDataNodeSinglePipeMetrics();
415415

416-
private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
416+
private PipeDataNodeSinglePipeMetricsHolder() {
417417
// Empty constructor
418418
}
419419
}
420420

421421
public static PipeDataNodeSinglePipeMetrics getInstance() {
422-
return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
422+
return PipeDataNodeSinglePipeMetricsHolder.INSTANCE;
423423
}
424424

425425
private PipeDataNodeSinglePipeMetrics() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -92,30 +92,27 @@ private File increaseFileReference(
9292
throws IOException {
9393
// If the file is already a hardlink or copied file,
9494
// just increase reference count and return it
95-
segmentLock.lock(file);
96-
try {
97-
if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
98-
return file;
99-
}
100-
} finally {
101-
segmentLock.unlock(file);
95+
if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
96+
return file;
10297
}
10398

10499
// If the file is not a hardlink or copied file, check if there is a related hardlink or
105100
// copied file in pipe dir. if so, increase reference count and return it
106101
final File hardlinkOrCopiedFile =
107102
Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file;
108-
segmentLock.lock(hardlinkOrCopiedFile);
109-
try {
110-
if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
111-
return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
112-
}
113103

114-
// If the file is a tsfile, create a hardlink in pipe dir and will return it.
115-
// otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
116-
final File source = Objects.isNull(sourceFile) ? file : sourceFile;
104+
if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
105+
return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
106+
}
107+
108+
// If the file is a tsfile, create a hardlink in pipe dir and will return it.
109+
// otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
110+
final File source = Objects.isNull(sourceFile) ? file : sourceFile;
111+
final File resultFile;
117112

118-
final File resultFile =
113+
segmentLock.lock(hardlinkOrCopiedFile);
114+
try {
115+
resultFile =
119116
isTsFile
120117
? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
121118
: FileUtils.copyFile(source, hardlinkOrCopiedFile);
@@ -131,25 +128,29 @@ private File increaseFileReference(
131128
hardlinkOrCopiedFileToTsFilePublicResourceMap.put(
132129
resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
133130
}
134-
135-
increasePublicReference(resultFile, pipeName, isTsFile);
136-
137-
return resultFile;
138131
} finally {
139132
segmentLock.unlock(hardlinkOrCopiedFile);
140133
}
134+
increasePublicReference(resultFile, pipeName, isTsFile);
135+
return resultFile;
141136
}
142137

143138
private boolean increaseReferenceIfExists(
144139
final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException {
145-
final String path = file.getPath();
146-
final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
147-
if (resource != null) {
148-
resource.increaseReferenceCount();
149-
increasePublicReference(file, pipeName, isTsFile);
150-
return true;
140+
segmentLock.lock(file);
141+
try {
142+
final String path = file.getPath();
143+
final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
144+
if (resource != null) {
145+
resource.increaseReferenceCount();
146+
} else {
147+
return false;
148+
}
149+
} finally {
150+
segmentLock.unlock(file);
151151
}
152-
return false;
152+
increasePublicReference(file, pipeName, isTsFile);
153+
return true;
153154
}
154155

155156
private void increasePublicReference(
@@ -222,12 +223,13 @@ public void decreaseFileReference(
222223
if (resource != null && resource.decreaseReferenceCount()) {
223224
getResourceMap(pipeName).remove(filePath);
224225
}
225-
// Decrease the assigner's file to clear hard-link and memory cache
226-
// Note that it does not exist for historical files
227-
decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
228226
} finally {
229227
segmentLock.unlock(hardlinkOrCopiedFile);
230228
}
229+
230+
// Decrease the assigner's file to clear hard-link and memory cache
231+
// Note that it does not exist for historical files
232+
decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
231233
}
232234

233235
private void decreasePublicReferenceIfExists(final File file, final @Nullable String pipeName) {

0 commit comments

Comments
 (0)