Skip to content

Commit 376162a

Browse files
authored
Pipe: Fixed the hard-link lock problem & Some pipe CIs on master (#16006)
* Update IoTDBPipePermissionIT.java * refacotr * revert=pom * fix-lock
1 parent eab1b82 commit 376162a

3 files changed

Lines changed: 40 additions & 34 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,11 @@ public void testSourcePermission() {
207207
return;
208208
}
209209

210-
TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test");
210+
try {
211+
TableModelUtils.createDataBaseAndTable(receiverEnv, "test", "test");
212+
} catch (final Exception ignore) {
213+
// Ignore because the db/table may be transferred because sender user may see these
214+
}
211215

212216
// Exception, block here
213217
TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,18 +404,18 @@ public Pair<Long, Double> getRemainingEventAndTime(
404404

405405
//////////////////////////// singleton ////////////////////////////
406406

407-
private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
407+
private static class PipeDataNodeSinglePipeMetricsHolder {
408408

409409
private static final PipeDataNodeSinglePipeMetrics INSTANCE =
410410
new PipeDataNodeSinglePipeMetrics();
411411

412-
private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
412+
private PipeDataNodeSinglePipeMetricsHolder() {
413413
// Empty constructor
414414
}
415415
}
416416

417417
public static PipeDataNodeSinglePipeMetrics getInstance() {
418-
return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
418+
return PipeDataNodeSinglePipeMetricsHolder.INSTANCE;
419419
}
420420

421421
private PipeDataNodeSinglePipeMetrics() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -91,30 +91,27 @@ private File increaseFileReference(
9191
throws IOException {
9292
// If the file is already a hardlink or copied file,
9393
// just increase reference count and return it
94-
segmentLock.lock(file);
95-
try {
96-
if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
97-
return file;
98-
}
99-
} finally {
100-
segmentLock.unlock(file);
94+
if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
95+
return file;
10196
}
10297

10398
// If the file is not a hardlink or copied file, check if there is a related hardlink or
10499
// copied file in pipe dir. if so, increase reference count and return it
105100
final File hardlinkOrCopiedFile =
106101
Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file;
107-
segmentLock.lock(hardlinkOrCopiedFile);
108-
try {
109-
if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
110-
return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
111-
}
112102

113-
// If the file is a tsfile, create a hardlink in pipe dir and will return it.
114-
// otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
115-
final File source = Objects.isNull(sourceFile) ? file : sourceFile;
103+
if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
104+
return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
105+
}
106+
107+
// If the file is a tsfile, create a hardlink in pipe dir and will return it.
108+
// otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
109+
final File source = Objects.isNull(sourceFile) ? file : sourceFile;
110+
final File resultFile;
116111

117-
final File resultFile =
112+
segmentLock.lock(hardlinkOrCopiedFile);
113+
try {
114+
resultFile =
118115
isTsFile
119116
? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
120117
: FileUtils.copyFile(source, hardlinkOrCopiedFile);
@@ -130,25 +127,29 @@ private File increaseFileReference(
130127
hardlinkOrCopiedFileToTsFilePublicResourceMap.put(
131128
resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
132129
}
133-
134-
increasePublicReference(resultFile, pipeName, isTsFile);
135-
136-
return resultFile;
137130
} finally {
138131
segmentLock.unlock(hardlinkOrCopiedFile);
139132
}
133+
increasePublicReference(resultFile, pipeName, isTsFile);
134+
return resultFile;
140135
}
141136

142137
private boolean increaseReferenceIfExists(
143138
final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException {
144-
final String path = file.getPath();
145-
final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
146-
if (resource != null) {
147-
resource.increaseReferenceCount();
148-
increasePublicReference(file, pipeName, isTsFile);
149-
return true;
139+
segmentLock.lock(file);
140+
try {
141+
final String path = file.getPath();
142+
final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
143+
if (resource != null) {
144+
resource.increaseReferenceCount();
145+
} else {
146+
return false;
147+
}
148+
} finally {
149+
segmentLock.unlock(file);
150150
}
151-
return false;
151+
increasePublicReference(file, pipeName, isTsFile);
152+
return true;
152153
}
153154

154155
private void increasePublicReference(
@@ -221,12 +222,13 @@ public void decreaseFileReference(
221222
if (resource != null && resource.decreaseReferenceCount()) {
222223
getResourceMap(pipeName).remove(filePath);
223224
}
224-
// Decrease the assigner's file to clear hard-link and memory cache
225-
// Note that it does not exist for historical files
226-
decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
227225
} finally {
228226
segmentLock.unlock(hardlinkOrCopiedFile);
229227
}
228+
229+
// Decrease the assigner's file to clear hard-link and memory cache
230+
// Note that it does not exist for historical files
231+
decreasePublicReferenceIfExists(hardlinkOrCopiedFile, pipeName);
230232
}
231233

232234
private void decreasePublicReferenceIfExists(final File file, final @Nullable String pipeName) {

0 commit comments

Comments
 (0)