Skip to content

Commit 7c565fe

Browse files
fix
1 parent 18bd320 commit 7c565fe

8 files changed

Lines changed: 58 additions & 129 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON)
4545
option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON)
4646
option(ICEBERG_BUILD_REST "Build rest catalog client" ON)
4747
option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF)
48+
option(ICEBERG_S3 "Build with S3 support" ON)
4849
option(ICEBERG_ENABLE_ASAN "Enable Address Sanitizer" OFF)
4950
option(ICEBERG_ENABLE_UBSAN "Enable Undefined Behavior Sanitizer" OFF)
5051

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ function(resolve_arrow_dependency)
102102
# Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
103103
set(ARROW_IPC ON)
104104
set(ARROW_FILESYSTEM ON)
105-
set(ARROW_S3 ON)
105+
set(ARROW_S3 ${ICEBERG_S3})
106106
set(ARROW_JSON ON)
107107
set(ARROW_PARQUET ON)
108108
set(ARROW_SIMD_LEVEL "NONE")

src/iceberg/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,15 @@ if(ICEBERG_BUILD_BUNDLE)
244244
OUTPUTS
245245
ICEBERG_BUNDLE_LIBRARIES)
246246

247+
if(ICEBERG_S3)
248+
foreach(target iceberg_bundle_static iceberg_bundle_shared)
249+
if(TARGET ${target})
250+
target_compile_definitions(${target} PUBLIC
251+
"$<BUILD_INTERFACE:ICEBERG_S3_ENABLED=1>")
252+
endif()
253+
endforeach()
254+
endif()
255+
247256
add_subdirectory(arrow)
248257
add_subdirectory(avro)
249258
add_subdirectory(parquet)

src/iceberg/arrow/arrow_fs_file_io.cc

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,24 @@
2525
#include "iceberg/arrow/arrow_file_io.h"
2626
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2727
#include "iceberg/arrow/arrow_status_internal.h"
28+
#include "iceberg/util/macros.h"
2829

2930
namespace iceberg::arrow {
3031

32+
Result<std::string> ArrowFileSystemFileIO::ResolvePath(
33+
const std::string& file_location) {
34+
if (file_location.find("://") != std::string::npos) {
35+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, arrow_fs_->PathFromUri(file_location));
36+
return path;
37+
}
38+
return file_location;
39+
}
40+
3141
/// \brief Read the content of the file at the given location.
3242
Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& file_location,
3343
std::optional<size_t> length) {
34-
::arrow::fs::FileInfo file_info(file_location);
44+
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
45+
::arrow::fs::FileInfo file_info(path);
3546
if (length.has_value()) {
3647
file_info.set_size(length.value());
3748
}
@@ -47,6 +58,10 @@ Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& file_loca
4758
ICEBERG_ARROW_ASSIGN_OR_RETURN(
4859
auto read_bytes,
4960
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
61+
if (read_bytes == 0) {
62+
return IOError("Unexpected EOF reading {}: got {} of {} bytes", file_location,
63+
offset, file_size);
64+
}
5065
remain -= read_bytes;
5166
offset += read_bytes;
5267
}
@@ -57,7 +72,8 @@ Result<std::string> ArrowFileSystemFileIO::ReadFile(const std::string& file_loca
5772
/// \brief Write the given content to the file at the given location.
5873
Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
5974
std::string_view content) {
60-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location));
75+
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
76+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path));
6177
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
6278
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
6379
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
@@ -66,7 +82,8 @@ Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
6682

6783
/// \brief Delete a file at the given location.
6884
Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
69-
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location));
85+
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
86+
ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path));
7087
return {};
7188
}
7289

src/iceberg/arrow/arrow_fs_file_io_internal.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
5656
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }
5757

5858
private:
59+
/// \brief Resolve a file location to a filesystem path.
60+
Result<std::string> ResolvePath(const std::string& file_location);
61+
5962
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
6063
};
6164

src/iceberg/arrow/arrow_s3_file_io.cc

Lines changed: 22 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717
* under the License.
1818
*/
1919

20-
#include <cstdlib>
2120
#include <mutex>
22-
#include <string_view>
21+
#include <stdexcept>
2322

2423
#include <arrow/filesystem/filesystem.h>
25-
#include <arrow/filesystem/localfs.h>
26-
#if __has_include(<arrow/filesystem/s3fs.h>)
24+
#ifdef ICEBERG_S3_ENABLED
2725
#include <arrow/filesystem/s3fs.h>
2826
#define ICEBERG_ARROW_HAS_S3 1
2927
#else
@@ -40,18 +38,13 @@ namespace iceberg::arrow {
4038

4139
namespace {
4240

43-
bool IsS3Uri(std::string_view uri) { return uri.rfind("s3://", 0) == 0; }
44-
4541
Status EnsureS3Initialized() {
4642
#if ICEBERG_ARROW_HAS_S3
4743
static std::once_flag init_flag;
4844
static ::arrow::Status init_status = ::arrow::Status::OK();
4945
std::call_once(init_flag, []() {
5046
::arrow::fs::S3GlobalOptions options;
5147
init_status = ::arrow::fs::InitializeS3(options);
52-
if (init_status.ok()) {
53-
std::atexit([]() { (void)::arrow::fs::FinalizeS3(); });
54-
}
5548
});
5649
if (!init_status.ok()) {
5750
return std::unexpected<Error>{
@@ -69,7 +62,7 @@ Status EnsureS3Initialized() {
6962
///
7063
/// \param properties The configuration properties map.
7164
/// \return Configured S3Options.
72-
::arrow::fs::S3Options ConfigureS3Options(
65+
Result<::arrow::fs::S3Options> ConfigureS3Options(
7366
const std::unordered_map<std::string, std::string>& properties) {
7467
::arrow::fs::S3Options options;
7568

@@ -102,11 +95,9 @@ ::arrow::fs::S3Options ConfigureS3Options(
10295
options.endpoint_override = endpoint_it->second;
10396
}
10497

105-
// Configure path-style access (needed for MinIO)
10698
auto path_style_it = properties.find(S3Properties::kPathStyleAccess);
107-
if (path_style_it != properties.end()) {
108-
// Arrow's S3 path-style is controlled via endpoint scheme
109-
// For path-style access, we need to ensure the endpoint is properly configured
99+
if (path_style_it != properties.end() && path_style_it->second == "true") {
100+
options.force_virtual_addressing = false;
110101
}
111102

112103
// Configure SSL
@@ -118,117 +109,45 @@ ::arrow::fs::S3Options ConfigureS3Options(
118109
// Configure timeouts
119110
auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs);
120111
if (connect_timeout_it != properties.end()) {
121-
options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0;
112+
try {
113+
options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0;
114+
} catch (const std::exception& e) {
115+
return InvalidArgument("Invalid {}: '{}' ({})", S3Properties::kConnectTimeoutMs,
116+
connect_timeout_it->second, e.what());
117+
}
122118
}
123119

124120
auto socket_timeout_it = properties.find(S3Properties::kSocketTimeoutMs);
125121
if (socket_timeout_it != properties.end()) {
126-
options.request_timeout = std::stod(socket_timeout_it->second) / 1000.0;
122+
try {
123+
options.request_timeout = std::stod(socket_timeout_it->second) / 1000.0;
124+
} catch (const std::exception& e) {
125+
return InvalidArgument("Invalid {}: '{}' ({})", S3Properties::kSocketTimeoutMs,
126+
socket_timeout_it->second, e.what());
127+
}
127128
}
128129

129130
return options;
130131
}
131-
132-
/// \brief Create an S3 FileSystem with the given options.
133-
///
134-
/// \param options The S3Options to use.
135-
/// \return A shared_ptr to the S3FileSystem, or an error.
136-
Result<std::shared_ptr<::arrow::fs::FileSystem>> MakeS3FileSystem(
137-
const ::arrow::fs::S3Options& options) {
138-
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
139-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
140-
return fs;
141-
}
142132
#endif
143133

144-
Result<std::shared_ptr<::arrow::fs::FileSystem>> ResolveFileSystemFromUri(
145-
const std::string& uri, std::string* out_path) {
146-
if (IsS3Uri(uri)) {
147-
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
148-
}
149-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::FileSystemFromUri(uri, out_path));
150-
return fs;
151-
}
152-
153-
/// \brief ArrowUriFileIO resolves FileSystem from URI for each operation.
154-
///
155-
/// This implementation is thread-safe as it creates a new FileSystem instance
156-
/// for each operation. However, it may be less efficient than caching the
157-
/// FileSystem. S3 initialization is done once per process.
158-
class ArrowUriFileIO : public FileIO {
159-
public:
160-
Result<std::string> ReadFile(const std::string& file_location,
161-
std::optional<size_t> length) override {
162-
std::string path;
163-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
164-
::arrow::fs::FileInfo file_info(path);
165-
if (length.has_value()) {
166-
file_info.set_size(length.value());
167-
}
168-
std::string content;
169-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenInputFile(file_info));
170-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
171-
172-
content.resize(file_size);
173-
size_t remain = file_size;
174-
size_t offset = 0;
175-
while (remain > 0) {
176-
size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
177-
ICEBERG_ARROW_ASSIGN_OR_RETURN(
178-
auto read_bytes,
179-
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
180-
remain -= read_bytes;
181-
offset += read_bytes;
182-
}
183-
184-
return content;
185-
}
186-
187-
Status WriteFile(const std::string& file_location,
188-
std::string_view content) override {
189-
std::string path;
190-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
191-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenOutputStream(path));
192-
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
193-
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
194-
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
195-
return {};
196-
}
197-
198-
Status DeleteFile(const std::string& file_location) override {
199-
std::string path;
200-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
201-
ICEBERG_ARROW_RETURN_NOT_OK(fs->DeleteFile(path));
202-
return {};
203-
}
204-
};
205-
206134
} // namespace
207135

208136
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
209137
const std::string& uri,
210138
const std::unordered_map<std::string, std::string>& properties) {
211-
if (!IsS3Uri(uri)) {
139+
if (!uri.starts_with("s3://")) {
212140
return InvalidArgument("S3 URI must start with s3://");
213141
}
214142
#if !ICEBERG_ARROW_HAS_S3
215143
return NotImplemented("Arrow S3 support is not enabled");
216144
#else
217-
// If properties are empty, use the simple URI-based resolution
218-
if (properties.empty()) {
219-
// Validate that S3 can be initialized and the URI is valid
220-
std::string path;
221-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(uri, &path));
222-
(void)path;
223-
(void)fs;
224-
return std::make_unique<ArrowUriFileIO>();
225-
}
145+
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
226146

227-
// Create S3FileSystem with explicit configuration
228-
auto options = ConfigureS3Options(properties);
229-
ICEBERG_ASSIGN_OR_RAISE(auto fs, MakeS3FileSystem(options));
147+
// Configure S3 options from properties (uses default credentials if empty)
148+
ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties));
149+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
230150

231-
// Return ArrowFileSystemFileIO with the configured S3 filesystem
232151
return std::make_unique<ArrowFileSystemFileIO>(std::move(fs));
233152
#endif
234153
}

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
194194
impl_name = io_impl->second;
195195
} else {
196196
// Use default based on warehouse URI scheme
197-
if (warehouse.rfind("s3://", 0) == 0) {
197+
if (warehouse.starts_with("s3://")) {
198198
impl_name = FileIORegistry::kArrowS3FileIO;
199199
} else {
200200
impl_name = FileIORegistry::kArrowLocalFileIO;

src/iceberg/test/arrow_s3_file_io_test.cc

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@
2121
#include <string>
2222
#include <unordered_map>
2323

24-
#if __has_include(<arrow/filesystem/s3fs.h>)
25-
#include <arrow/filesystem/s3fs.h>
26-
#endif
2724
#include <gtest/gtest.h>
2825

2926
#include "iceberg/arrow/arrow_file_io.h"
@@ -32,22 +29,13 @@
3229

3330
namespace iceberg::arrow {
3431

35-
#if __has_include(<arrow/filesystem/s3fs.h>)
36-
namespace {
37-
class ArrowS3Environment final : public ::testing::Environment {
38-
public:
39-
void TearDown() override { (void)::arrow::fs::FinalizeS3(); }
40-
};
41-
} // namespace
42-
#endif
43-
4432
TEST(ArrowS3FileIOTest, RejectsNonS3Uri) {
4533
auto result = MakeS3FileIO("file:///tmp/not-s3");
4634
EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
4735
EXPECT_THAT(result, HasErrorMessage("s3://"));
4836
}
4937

50-
#if __has_include(<arrow/filesystem/s3fs.h>)
38+
#ifdef ICEBERG_S3_ENABLED
5139
TEST(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) {
5240
auto result = MakeS3FileIO("s3://bucket/path");
5341
if (!result.has_value()) {
@@ -216,11 +204,3 @@ TEST(ArrowS3FileIOTest, MakeS3FileIOWithTimeouts) {
216204
}
217205

218206
} // namespace iceberg::arrow
219-
220-
#if __has_include(<arrow/filesystem/s3fs.h>)
221-
int main(int argc, char** argv) {
222-
::testing::InitGoogleTest(&argc, argv);
223-
::testing::AddGlobalTestEnvironment(new iceberg::arrow::ArrowS3Environment());
224-
return RUN_ALL_TESTS();
225-
}
226-
#endif

0 commit comments

Comments
 (0)