Skip to content

Commit a1f66eb

Browse files
[fix](fe) Keep cached file systems alive while in use (#63677)
### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Hive file listing and Hive ACID state loading borrow `FileSystem` instances from `FileSystemCache` and keep using them while checking splitability, listing remote files, or loading ACID state. The previous cache implementation returned the raw cached `FileSystem`. When a cache entry was evicted or expired, the Caffeine removal listener closed that same instance immediately. If cache cleanup happened while another thread was still using the returned instance, the active Hive operation could observe a closed filesystem. This PR fixes the lifecycle in `FileSystemCache` instead of bypassing the cache at Hive call sites. Cached filesystems are now returned through leases backed by a holder with an active reference count. Cache eviction marks the holder as evicted, and the underlying filesystem is closed only after the last active lease is released. If the filesystem cache is disabled, the direct lease owns the newly created filesystem and closes it when released. Hive file listing and ACID paths now use try-with-resources to hold the lease for the whole filesystem usage window. ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [x] Unit Test - `./run-fe-ut.sh --run org.apache.doris.fs.FileSystemCacheTest,org.apache.doris.datasource.hive.HiveMetaStoreCacheTest` - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent 4a0c58b commit a1f66eb

3 files changed

Lines changed: 308 additions & 30 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -394,13 +394,13 @@ private FileCacheValue getFileCache(HMSExternalCatalog catalog,
394394

395395
FileSystemCache.FileSystemCacheKey fileSystemCacheKey = new FileSystemCache.FileSystemCacheKey(
396396
path.getFsIdentifier(), path.getStorageProperties());
397-
FileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
398-
.getFileSystem(fileSystemCacheKey);
399-
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, path.getNormalizedLocation()));
400397

401398
boolean isRecursiveDirectories = Boolean.valueOf(
402399
catalog.getProperties().getOrDefault("hive.recursive_directories", "true"));
403-
try {
400+
try (FileSystemCache.FileSystemLease fileSystemLease = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
401+
.getFileSystem(fileSystemCacheKey)) {
402+
FileSystem fs = fileSystemLease.fileSystem();
403+
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, path.getNormalizedLocation()));
404404
RemoteIterator<FileEntry> iterator = directoryLister.listFiles(fs, isRecursiveDirectories,
405405
table, path.getNormalizedLocation());
406406
boolean isLzoInputFormat = HiveUtil.isLzoInputFormat(inputFormat);
@@ -824,22 +824,24 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
824824
HMSExternalCatalog catalog = hmsCatalog(partition.getNameMapping().getCtlId());
825825
LocationPath locationPath = LocationPath.of(partition.getPath(),
826826
catalog.getCatalogProperty().getStoragePropertiesMap());
827-
FileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
828-
.getFileSystem(new FileSystemCache.FileSystemCacheKey(
829-
locationPath.getNormalizedLocation(),
830-
locationPath.getStorageProperties()));
827+
FileSystemCache.FileSystemCacheKey fileSystemCacheKey = new FileSystemCache.FileSystemCacheKey(
828+
locationPath.getNormalizedLocation(), locationPath.getStorageProperties());
831829
AuthenticationConfig authenticationConfig = AuthenticationConfig
832830
.getKerberosConfig(locationPath.getStorageProperties().getBackendConfigProperties());
833831
HadoopAuthenticator hadoopAuthenticator =
834832
HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
835833

836-
fileCacheValues.add(
837-
hadoopAuthenticator.doAs(() -> AcidUtil.getAcidState(
838-
fileSystem,
839-
partition,
840-
txnValidIds,
841-
catalog.getCatalogProperty().getStoragePropertiesMap(),
842-
isFullAcid)));
834+
try (FileSystemCache.FileSystemLease fileSystemLease =
835+
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getFileSystem(fileSystemCacheKey)) {
836+
FileSystem fileSystem = fileSystemLease.fileSystem();
837+
fileCacheValues.add(
838+
hadoopAuthenticator.doAs(() -> AcidUtil.getAcidState(
839+
fileSystem,
840+
partition,
841+
txnValidIds,
842+
catalog.getCatalogProperty().getStoragePropertiesMap(),
843+
isFullAcid)));
844+
}
843845
}
844846
} catch (Exception e) {
845847
throw new CacheException("Failed to get input splits %s", e, txnValidIds.toString());

fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java

Lines changed: 152 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,48 +23,185 @@
2323
import org.apache.doris.filesystem.FileSystem;
2424

2525
import com.github.benmanes.caffeine.cache.LoadingCache;
26+
import com.google.common.annotations.VisibleForTesting;
27+
import com.google.common.base.Preconditions;
2628
import org.apache.logging.log4j.LogManager;
2729
import org.apache.logging.log4j.Logger;
2830

2931
import java.io.IOException;
3032
import java.util.Objects;
3133
import java.util.OptionalLong;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.function.Function;
3236

3337
public class FileSystemCache {
3438

3539
private static final Logger LOG = LogManager.getLogger(FileSystemCache.class);
3640

37-
private final LoadingCache<FileSystemCacheKey, FileSystem> fileSystemCache;
41+
private final LoadingCache<FileSystemCacheKey, FileSystemHolder> fileSystemCache;
42+
private final Function<FileSystemCacheKey, FileSystem> loader;
3843

3944
public FileSystemCache() {
45+
this(
46+
Config.max_remote_file_system_cache_num,
47+
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
48+
FileSystemCache::loadFileSystem);
49+
}
50+
51+
@VisibleForTesting
52+
FileSystemCache(long maxSize, OptionalLong expireAfterAccessSec, Function<FileSystemCacheKey, FileSystem> loader) {
53+
this.loader = Objects.requireNonNull(loader, "loader");
54+
if (maxSize == 0) {
55+
fileSystemCache = null;
56+
return;
57+
}
4058
// no need to set refreshAfterWrite, because the FileSystem is created once and never changed
4159
CacheFactory fsCacheFactory = new CacheFactory(
42-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
60+
expireAfterAccessSec,
4361
OptionalLong.empty(),
44-
Config.max_remote_file_system_cache_num,
62+
maxSize,
4563
false,
4664
null);
47-
fileSystemCache = fsCacheFactory.buildCacheWithSyncRemovalListener(this::loadFileSystem, (key, fs, cause) -> {
48-
if (fs != null) {
49-
try {
50-
fs.close();
51-
} catch (IOException e) {
52-
LOG.warn("Failed to close evicted FileSystem for key: {}", key, e);
53-
}
54-
}
55-
});
65+
fileSystemCache = fsCacheFactory.buildCacheWithSyncRemovalListener(
66+
key -> new FileSystemHolder(key, loader.apply(key)), (key, holder, cause) -> {
67+
if (holder != null) {
68+
holder.markEvicted();
69+
}
70+
});
5671
}
5772

58-
private FileSystem loadFileSystem(FileSystemCacheKey key) {
73+
private static FileSystem loadFileSystem(FileSystemCacheKey key) {
5974
try {
6075
return FileSystemFactory.getFileSystem(key.properties);
6176
} catch (IOException e) {
6277
throw new RuntimeException("Failed to create filesystem for key: " + key, e);
6378
}
6479
}
6580

66-
public FileSystem getFileSystem(FileSystemCacheKey key) {
67-
return fileSystemCache.get(key);
81+
public FileSystemLease getFileSystem(FileSystemCacheKey key) {
82+
if (fileSystemCache == null) {
83+
return new DirectFileSystemLease(key, loader.apply(key));
84+
}
85+
while (true) {
86+
FileSystemHolder holder = fileSystemCache.get(key);
87+
FileSystemLease lease = holder.acquire();
88+
if (lease != null) {
89+
return lease;
90+
}
91+
fileSystemCache.asMap().remove(key, holder);
92+
}
93+
}
94+
95+
@VisibleForTesting
96+
void cleanUp() {
97+
if (fileSystemCache != null) {
98+
fileSystemCache.cleanUp();
99+
}
100+
}
101+
102+
private static final class FileSystemHolder {
103+
private final FileSystemCacheKey key;
104+
private final FileSystem fileSystem;
105+
private int referenceCount = 0;
106+
private boolean evicted = false;
107+
private boolean closed = false;
108+
109+
private FileSystemHolder(FileSystemCacheKey key, FileSystem fileSystem) {
110+
this.key = Objects.requireNonNull(key, "key");
111+
this.fileSystem = Objects.requireNonNull(fileSystem, "fileSystem");
112+
}
113+
114+
private synchronized FileSystemLease acquire() {
115+
if (evicted || closed) {
116+
return null;
117+
}
118+
referenceCount++;
119+
return new CachedFileSystemLease(this);
120+
}
121+
122+
private synchronized void release() {
123+
Preconditions.checkState(referenceCount > 0, "FileSystem lease has been released more than once");
124+
referenceCount--;
125+
closeIfIdle();
126+
}
127+
128+
private synchronized void markEvicted() {
129+
evicted = true;
130+
closeIfIdle();
131+
}
132+
133+
private void closeIfIdle() {
134+
if (!evicted || referenceCount != 0 || closed) {
135+
return;
136+
}
137+
closed = true;
138+
try {
139+
fileSystem.close();
140+
} catch (IOException e) {
141+
LOG.warn("Failed to close evicted FileSystem for key: {}", key, e);
142+
}
143+
}
144+
145+
private FileSystem fileSystem() {
146+
return fileSystem;
147+
}
148+
}
149+
150+
public interface FileSystemLease extends AutoCloseable {
151+
FileSystem fileSystem();
152+
153+
@Override
154+
void close();
155+
}
156+
157+
private static final class CachedFileSystemLease implements FileSystemLease {
158+
private final FileSystemHolder holder;
159+
private final AtomicBoolean closed = new AtomicBoolean(false);
160+
161+
private CachedFileSystemLease(FileSystemHolder holder) {
162+
this.holder = holder;
163+
}
164+
165+
@Override
166+
public FileSystem fileSystem() {
167+
return holder.fileSystem();
168+
}
169+
170+
@Override
171+
public void close() {
172+
if (!closed.compareAndSet(false, true)) {
173+
return;
174+
}
175+
holder.release();
176+
}
177+
}
178+
179+
private static final class DirectFileSystemLease implements FileSystemLease {
180+
private final FileSystemCacheKey key;
181+
private final FileSystem fileSystem;
182+
private final AtomicBoolean closed = new AtomicBoolean(false);
183+
184+
private DirectFileSystemLease(FileSystemCacheKey key, FileSystem fileSystem) {
185+
this.key = key;
186+
this.fileSystem = fileSystem;
187+
}
188+
189+
@Override
190+
public FileSystem fileSystem() {
191+
return fileSystem;
192+
}
193+
194+
@Override
195+
public void close() {
196+
if (!closed.compareAndSet(false, true)) {
197+
return;
198+
}
199+
try {
200+
fileSystem.close();
201+
} catch (IOException e) {
202+
LOG.warn("Failed to close uncached FileSystem for key: {}", key, e);
203+
}
204+
}
68205
}
69206

70207
public static class FileSystemCacheKey {
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.fs;
19+
20+
import org.apache.doris.datasource.property.storage.StorageProperties;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
import java.util.Collections;
26+
import java.util.OptionalLong;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
public class FileSystemCacheTest {
31+
32+
@Test
33+
public void testEvictedFileSystemClosesAfterLastLeaseIsReleased() {
34+
CountingFileSystem first = new CountingFileSystem();
35+
CountingFileSystem second = new CountingFileSystem();
36+
AtomicInteger loadCount = new AtomicInteger();
37+
FileSystemCache cache = new FileSystemCache(1L, OptionalLong.empty(),
38+
key -> loadCount.getAndIncrement() == 0 ? first : second);
39+
40+
FileSystemCache.FileSystemCacheKey firstKey = key("hdfs://ns1");
41+
FileSystemCache.FileSystemLease firstLease = cache.getFileSystem(firstKey);
42+
Assert.assertSame(first, firstLease.fileSystem());
43+
44+
FileSystemCache.FileSystemLease secondLease = cache.getFileSystem(key("hdfs://ns2"));
45+
cache.cleanUp();
46+
47+
Assert.assertEquals(0, first.getCloseCount());
48+
Assert.assertEquals(0, second.getCloseCount());
49+
50+
firstLease.close();
51+
Assert.assertEquals(1, first.getCloseCount());
52+
Assert.assertEquals(0, second.getCloseCount());
53+
54+
secondLease.close();
55+
Assert.assertEquals(0, second.getCloseCount());
56+
}
57+
58+
@Test
59+
public void testUncachedFileSystemClosesWhenLeaseIsReleased() {
60+
CountingFileSystem fileSystem = new CountingFileSystem();
61+
FileSystemCache cache = new FileSystemCache(0L, OptionalLong.empty(), key -> fileSystem);
62+
63+
FileSystemCache.FileSystemLease lease = cache.getFileSystem(key("hdfs://ns1"));
64+
Assert.assertSame(fileSystem, lease.fileSystem());
65+
Assert.assertEquals(0, fileSystem.getCloseCount());
66+
67+
lease.close();
68+
Assert.assertEquals(1, fileSystem.getCloseCount());
69+
}
70+
71+
@Test
72+
public void testLeaseCloseIsConcurrentIdempotent() throws InterruptedException {
73+
CountingFileSystem fileSystem = new CountingFileSystem();
74+
FileSystemCache cache = new FileSystemCache(1L, OptionalLong.empty(), key -> fileSystem);
75+
76+
FileSystemCache.FileSystemLease lease = cache.getFileSystem(key("hdfs://ns1"));
77+
FileSystemCache.FileSystemLease evictingLease = cache.getFileSystem(key("hdfs://ns2"));
78+
cache.cleanUp();
79+
80+
closeConcurrently(lease);
81+
Assert.assertEquals(1, fileSystem.getCloseCount());
82+
83+
evictingLease.close();
84+
}
85+
86+
@Test
87+
public void testUncachedLeaseCloseIsConcurrentIdempotent() throws InterruptedException {
88+
CountingFileSystem fileSystem = new CountingFileSystem();
89+
FileSystemCache cache = new FileSystemCache(0L, OptionalLong.empty(), key -> fileSystem);
90+
91+
closeConcurrently(cache.getFileSystem(key("hdfs://ns1")));
92+
93+
Assert.assertEquals(1, fileSystem.getCloseCount());
94+
}
95+
96+
private static void closeConcurrently(FileSystemCache.FileSystemLease lease) throws InterruptedException {
97+
CountDownLatch ready = new CountDownLatch(2);
98+
CountDownLatch start = new CountDownLatch(1);
99+
Thread first = closeThread(lease, ready, start);
100+
Thread second = closeThread(lease, ready, start);
101+
first.start();
102+
second.start();
103+
ready.await();
104+
start.countDown();
105+
first.join();
106+
second.join();
107+
}
108+
109+
private static Thread closeThread(FileSystemCache.FileSystemLease lease,
110+
CountDownLatch ready, CountDownLatch start) {
111+
return new Thread(() -> {
112+
ready.countDown();
113+
try {
114+
start.await();
115+
} catch (InterruptedException e) {
116+
throw new RuntimeException(e);
117+
}
118+
lease.close();
119+
});
120+
}
121+
122+
private static FileSystemCache.FileSystemCacheKey key(String fsIdent) {
123+
return new FileSystemCache.FileSystemCacheKey(fsIdent, StorageProperties.createPrimary(
124+
Collections.singletonMap(StorageProperties.FS_HDFS_SUPPORT, "true")));
125+
}
126+
127+
private static class CountingFileSystem extends MemoryFileSystem {
128+
private final AtomicInteger closeCount = new AtomicInteger();
129+
130+
@Override
131+
public void close() {
132+
closeCount.incrementAndGet();
133+
}
134+
135+
int getCloseCount() {
136+
return closeCount.get();
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)