Skip to content

Commit ccd33b1

Browse files
authored
[core] Make HadoopFileIO determine isObjectStore based on the path (#6393)
1 parent 9b6d43d commit ccd33b1

8 files changed

Lines changed: 64 additions & 32 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.fs.RemoteIterator;
2828
import org.apache.paimon.fs.SeekableInputStream;
2929
import org.apache.paimon.hadoop.SerializableConfiguration;
30+
import org.apache.paimon.utils.FileIOUtils;
3031
import org.apache.paimon.utils.FunctionWithException;
3132
import org.apache.paimon.utils.Pair;
3233
import org.apache.paimon.utils.ReflectionUtils;
@@ -43,6 +44,7 @@
4344
import java.lang.reflect.Method;
4445
import java.net.URI;
4546
import java.nio.charset.StandardCharsets;
47+
import java.util.Locale;
4648
import java.util.Map;
4749
import java.util.concurrent.ConcurrentHashMap;
4850
import java.util.concurrent.atomic.AtomicReference;
@@ -58,15 +60,21 @@ public class HadoopFileIO implements FileIO {
5860

5961
protected transient volatile Map<Pair<String, String>, FileSystem> fsMap;
6062

63+
private final Path path;
64+
65+
/**
 * Creates a {@code HadoopFileIO} bound to the given path.
 *
 * <p>The path is only stored here. {@code isObjectStore()} later reads its URI scheme, and
 * {@code setFileSystem(FileSystem)} uses it as the path handed to {@code getFileSystem}.
 *
 * <p>NOTE(review): {@code isObjectStore()} calls {@code path.toUri().getScheme()} and
 * lowercases the result without a null check, so a scheme-less path would presumably fail
 * there with a NullPointerException - confirm whether callers can pass one.
 *
 * @param path the path this instance is associated with; not validated by the constructor
 */
public HadoopFileIO(Path path) {
66+
this.path = path;
67+
}
68+
6169
@VisibleForTesting
62-
public void setFileSystem(Path path, FileSystem fs) throws IOException {
63-
org.apache.hadoop.fs.Path hadoopPath = path(path);
64-
getFileSystem(hadoopPath, p -> fs);
70+
public void setFileSystem(FileSystem fs) throws IOException {
71+
getFileSystem(path(path), p -> fs);
6572
}
6673

6774
@Override
6875
public boolean isObjectStore() {
69-
return false;
76+
String scheme = path.toUri().getScheme().toLowerCase(Locale.US);
77+
return FileIOUtils.isObjectStore(scheme);
7078
}
7179

7280
@Override

paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ public String getScheme() {
3333

3434
@Override
3535
public HadoopFileIO load(Path path) {
36-
return new HadoopFileIO();
36+
return new HadoopFileIO(path);
3737
}
3838
}

paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ public String getScheme() {
3333

3434
@Override
3535
public HadoopFileIO load(Path path) {
36-
return new HadoopFileIO();
36+
return new HadoopFileIO(path);
3737
}
3838
}

paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,4 +382,27 @@ public static void deleteDirectoryQuietly(File directory) {
382382
} catch (Exception ignored) {
383383
}
384384
}
385+
386+
/**
 * Classifies a URI scheme as an object store / file server versus a regular file system.
 *
 * <p>Matching is a case-sensitive prefix test ({@code startsWith}) against lowercase
 * literals, so variants such as {@code s3a}, {@code wasbs}, {@code abfss} or {@code https}
 * match their shorter entry. The callers visible in this change lowercase the scheme with
 * {@code Locale.US} before calling; an upper-case scheme passed directly would not match.
 *
 * <p>NOTE(review): because this is a prefix test, any other scheme that merely begins with
 * one of these literals is also classified as an object store - confirm that is intended.
 *
 * @param scheme the URI scheme to classify; must be non-null, otherwise the first
 *     {@code startsWith} call throws a NullPointerException
 * @return {@code true} for the listed object-store and file-server prefixes, {@code false}
 *     for every other scheme
 */
public static boolean isObjectStore(String scheme) {
387+
if (scheme.startsWith("s3")
388+
|| scheme.startsWith("emr")
389+
|| scheme.startsWith("oss")
390+
|| scheme.startsWith("wasb")
391+
|| scheme.startsWith("abfs")
392+
|| scheme.startsWith("gs")
393+
|| scheme.startsWith("cosn")) {
394+
// Amazon S3, Aliyun OSS, Azure Blob Storage (wasb/abfs) or Google Cloud Storage.
395+
// NOTE(review): "emr" and "cosn" are also matched above but are not named here - confirm which stores they cover.
396+
return true;
397+
} else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
398+
// File servers rather than file systems.
399+
// They might actually be consistent, but there are currently no hard guarantees
400+
// to rely on, so they are treated like object stores.
401+
return true;
402+
} else {
403+
// The remainder should include hdfs, kosmos, ceph, ...
404+
// This also includes federated HDFS (viewfs).
405+
return false;
406+
}
407+
}
385408
}

paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class HadoopLocalFileIOBehaviorTest extends FileIOBehaviorTestBase {
3838
protected FileIO getFileSystem() throws Exception {
3939
org.apache.hadoop.fs.FileSystem fs = new RawLocalFileSystem();
4040
fs.initialize(URI.create("file:///"), new Configuration());
41-
HadoopFileIO fileIO = new HadoopFileIO();
42-
fileIO.setFileSystem(getBasePath(), fs);
41+
HadoopFileIO fileIO = new HadoopFileIO(getBasePath());
42+
fileIO.setFileSystem(fs);
4343
return fileIO;
4444
}
4545

paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import org.junit.jupiter.api.Test;
2929
import org.junit.jupiter.api.io.TempDir;
3030

31+
import java.io.ByteArrayOutputStream;
3132
import java.io.File;
3233
import java.io.IOException;
34+
import java.io.ObjectOutputStream;
3335

3436
import static org.assertj.core.api.Assertions.assertThat;
3537
import static org.assertj.core.api.Assumptions.assumeThat;
38+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3639

3740
/** Behavior tests for HDFS. */
3841
class HdfsBehaviorTest extends FileIOBehaviorTestBase {
@@ -62,8 +65,8 @@ static void createHDFS(@TempDir File tmp) throws Exception {
6265
org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
6366

6467
basePath = new Path(hdfs.getUri().toString() + "/tests");
65-
fs = new HadoopFileIO();
66-
fs.setFileSystem(basePath, hdfs);
68+
fs = new HadoopFileIO(basePath);
69+
fs.setFileSystem(hdfs);
6770
}
6871

6972
@AfterAll
@@ -102,4 +105,20 @@ public void testAtomicWrite() throws IOException {
102105
public void testAtomicWriteMultipleThreads() throws InterruptedException {
103106
FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(), randomName()), fs);
104107
}
108+
109+
/**
 * Checks that the shared {@code HadoopFileIO}, which {@code createHDFS} builds over the
 * mini-cluster base path, reports that it is not an object store.
 */
@Test
110+
public void testIsObjectStore() {
111+
assertThat(fs.isObjectStore()).isEqualTo(false);
112+
}
113+
114+
/**
 * Checks that the shared {@code HadoopFileIO} can be written with Java serialization
 * without throwing.
 *
 * <p>Only the write side is exercised: the object is serialized into an in-memory buffer
 * and the bytes are never read back.
 */
@Test
115+
public void testSerializable() {
116+
assertDoesNotThrow(
117+
() -> {
118+
// Both streams are closed by try-with-resources; the buffer content is discarded.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
119+
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
120+
oos.writeObject(fs);
121+
}
122+
});
123+
}
105124
}

paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.fs.hadoop;
2020

2121
import org.apache.paimon.catalog.CatalogContext;
22+
import org.apache.paimon.fs.Path;
2223
import org.apache.paimon.options.Options;
2324

2425
import org.junit.jupiter.api.Test;
@@ -41,7 +42,7 @@ public void test() throws Exception {
4142
options.set("security.kerberos.login.principal", "test-user");
4243
options.set("security.kerberos.login.keytab", keytabFile.getAbsolutePath());
4344

44-
HadoopFileIO fileIO = new HadoopFileIO();
45+
HadoopFileIO fileIO = new HadoopFileIO(new Path("file:///tmp/test"));
4546
fileIO.configure(CatalogContext.create(options));
4647
assertThat(fileIO.getFileSystem(new org.apache.hadoop.fs.Path("file:///tmp/test")))
4748
.isInstanceOf(HadoopSecuredFileSystem.class);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.fs.Path;
2525
import org.apache.paimon.fs.PositionOutputStream;
2626
import org.apache.paimon.fs.SeekableInputStream;
27+
import org.apache.paimon.utils.FileIOUtils;
2728

2829
import org.apache.flink.core.fs.FSDataInputStream;
2930
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -50,27 +51,7 @@ public boolean isObjectStore() {
5051
try {
5152
FileSystem fs = path.getFileSystem();
5253
String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);
53-
54-
if (scheme.startsWith("s3")
55-
|| scheme.startsWith("emr")
56-
|| scheme.startsWith("oss")
57-
|| scheme.startsWith("wasb")
58-
|| scheme.startsWith("abfs")
59-
|| scheme.startsWith("gs")
60-
|| scheme.startsWith("cosn")) {
61-
// the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
62-
// or Google Cloud Storage
63-
return true;
64-
} else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
65-
// file servers instead of file systems
66-
// they might actually be consistent, but we have no hard guarantees
67-
// currently to rely on that
68-
return true;
69-
} else {
70-
// the remainder should include hdfs, kosmos, ceph, ...
71-
// this also includes federated HDFS (viewfs).
72-
return false;
73-
}
54+
return FileIOUtils.isObjectStore(scheme);
7455
} catch (IOException e) {
7556
throw new UncheckedIOException(e);
7657
}

0 commit comments

Comments
 (0)