@@ -56,6 +56,11 @@ public class PipeTsFileResourceManager {
5656 hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap <>();
5757 private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock ();
5858
59+ public File increaseFileReference (
60+ final File file , final boolean isTsFile , final @ Nullable String pipeName ) throws IOException {
61+ return increaseFileReference (file , isTsFile , pipeName , null );
62+ }
63+
5964 /**
6065 * Given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
6166 * hardlink or copied file, and return the hardlink or copied file.
@@ -72,16 +77,24 @@ public class PipeTsFileResourceManager {
7277 * @param file tsfile, resource file or mod file. can be original file or hardlink/copy of
7378 * original file
7479 * @param isTsFile {@code true} to create hardlink, {@code false} to copy file
80+ * @param pipeName Nonnull if the pipe is from historical or assigner -> extractors, null if is
81+ * dataRegion -> assigner
82+ * @param sourceFile for inner use, historical extractor will use this to create hardlink from
83+ * pipe tsFile -> common tsFile
7584 * @return the hardlink or copied file
7685 * @throws IOException when create hardlink or copy file failed
7786 */
78- public File increaseFileReference (
79- final File file , final boolean isTsFile , final @ Nullable String pipeName ) throws IOException {
87+ private File increaseFileReference (
88+ final File file ,
89+ final boolean isTsFile ,
90+ final @ Nullable String pipeName ,
91+ final @ Nullable File sourceFile )
92+ throws IOException {
8093 // If the file is already a hardlink or copied file,
8194 // just increase reference count and return it
8295 segmentLock .lock (file );
8396 try {
84- if (increaseReferenceIfExists (file , pipeName )) {
97+ if (increaseReferenceIfExists (file , pipeName , isTsFile )) {
8598 return file ;
8699 }
87100 } finally {
@@ -90,19 +103,22 @@ public File increaseFileReference(
90103
91104 // If the file is not a hardlink or copied file, check if there is a related hardlink or
92105 // copied file in pipe dir. if so, increase reference count and return it
93- final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir (file , pipeName );
106+ final File hardlinkOrCopiedFile =
107+ Objects .isNull (sourceFile ) ? getHardlinkOrCopiedFileInPipeDir (file , pipeName ) : file ;
94108 segmentLock .lock (hardlinkOrCopiedFile );
95109 try {
96- if (increaseReferenceIfExists (hardlinkOrCopiedFile , pipeName )) {
110+ if (increaseReferenceIfExists (hardlinkOrCopiedFile , pipeName , isTsFile )) {
97111 return getResourceMap (pipeName ).get (hardlinkOrCopiedFile .getPath ()).getFile ();
98112 }
99113
100114 // If the file is a tsfile, create a hardlink in pipe dir and will return it.
101115 // otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
116+ final File source = Objects .isNull (sourceFile ) ? file : sourceFile ;
117+
102118 final File resultFile =
103119 isTsFile
104- ? FileUtils .createHardLink (file , hardlinkOrCopiedFile )
105- : FileUtils .copyFile (file , hardlinkOrCopiedFile );
120+ ? FileUtils .createHardLink (source , hardlinkOrCopiedFile )
121+ : FileUtils .copyFile (source , hardlinkOrCopiedFile );
106122
107123 // If the file is not a hardlink or copied file, and there is no related hardlink or copied
108124 // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
@@ -116,42 +132,34 @@ public File increaseFileReference(
116132 resultFile .getPath (), new PipeTsFilePublicResource (resultFile ));
117133 }
118134
119- increasePublicReference (resultFile , pipeName );
135+ increasePublicReference (resultFile , pipeName , isTsFile );
120136
121137 return resultFile ;
122138 } finally {
123139 segmentLock .unlock (hardlinkOrCopiedFile );
124140 }
125141 }
126142
127- private boolean increaseReferenceIfExists (final File file , final @ Nullable String pipeName ) {
143+ private boolean increaseReferenceIfExists (
144+ final File file , final @ Nullable String pipeName , final boolean isTsFile ) throws IOException {
128145 final String path = file .getPath ();
129146 final PipeTsFileResource resource = getResourceMap (pipeName ).get (path );
130147 if (resource != null ) {
131148 resource .increaseReferenceCount ();
132- increasePublicReference (file , pipeName );
149+ increasePublicReference (file , pipeName , isTsFile );
133150 return true ;
134151 }
135152 return false ;
136153 }
137154
138- private void increasePublicReference (final File file , final String pipeName ) {
155+ private void increasePublicReference (
156+ final File file , final String pipeName , final boolean isTsFile ) throws IOException {
139157 if (Objects .isNull (pipeName )) {
140158 return ;
141159 }
142160 // Increase the assigner's file to avoid hard-link or memory cache cleaning
143161 // Note that it does not exist for historical files
144- final String path = getCommonFilePath (file );
145- hardlinkOrCopiedFileToTsFilePublicResourceMap .compute (
146- path ,
147- (k , v ) -> {
148- if (Objects .isNull (v )) {
149- return new PipeTsFilePublicResource (new File (path ));
150- } else {
151- v .increaseReferenceCount ();
152- return v ;
153- }
154- });
162+ increaseFileReference (new File (getCommonFilePath (file )), isTsFile , null , file );
155163 }
156164
157165 public static File getHardlinkOrCopiedFileInPipeDir (
@@ -228,13 +236,7 @@ private void decreasePublicReferenceIfExists(final File file, final @Nullable St
228236 }
229237 // Increase the assigner's file to avoid hard-link or memory cache cleaning
230238 // Note that it does not exist for historical files
231- final String commonFilePath = getCommonFilePath (file );
232- if (hardlinkOrCopiedFileToTsFilePublicResourceMap .containsKey (commonFilePath )
233- && hardlinkOrCopiedFileToTsFilePublicResourceMap
234- .get (commonFilePath )
235- .decreaseReferenceCount ()) {
236- hardlinkOrCopiedFileToPipeTsFileResourceMap .remove (commonFilePath );
237- }
239+ decreaseFileReference (new File (getCommonFilePath (file )), null );
238240 }
239241
240242 // Warning: Shall not be called by the assigner
0 commit comments