1919
2020#include < cstdlib>
2121#include < mutex>
22- #include < stdexcept >
22+ #include < string >
2323
2424#include < arrow/filesystem/filesystem.h>
2525#ifdef ICEBERG_S3_ENABLED
3232#include " iceberg/arrow/arrow_file_io.h"
3333#include " iceberg/arrow/arrow_fs_file_io_internal.h"
3434#include " iceberg/arrow/arrow_status_internal.h"
35- #include " iceberg/arrow/s3_properties.h"
35+ #include " iceberg/arrow/s3/ s3_properties.h"
3636#include " iceberg/util/macros.h"
37+ #include " iceberg/util/string_util.h"
3738
3839namespace iceberg ::arrow {
3940
@@ -44,7 +45,7 @@ Status EnsureS3Initialized() {
4445 static std::once_flag init_flag;
4546 static ::arrow::Status init_status = ::arrow::Status::OK ();
4647 std::call_once (init_flag, []() {
47- ::arrow::fs::S3GlobalOptions options ;
48+ auto options = ::arrow::fs::S3GlobalOptions::Defaults () ;
4849 init_status = ::arrow::fs::InitializeS3 (options);
4950 });
5051 if (!init_status.ok ()) {
@@ -53,7 +54,7 @@ Status EnsureS3Initialized() {
5354 }
5455 return {};
5556#else
56- return NotImplemented (" Arrow S3 support is not enabled" );
57+ return NotSupported (" Arrow S3 support is not enabled" );
5758#endif
5859}
5960
@@ -67,9 +68,9 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
6768 ::arrow::fs::S3Options options;
6869
6970 // Configure credentials
70- auto access_key_it = properties.find (S3Properties::kAccessKeyId );
71- auto secret_key_it = properties.find (S3Properties::kSecretAccessKey );
72- auto session_token_it = properties.find (S3Properties::kSessionToken );
71+ auto access_key_it = properties.find (std::string ( S3Properties::kAccessKeyId ) );
72+ auto secret_key_it = properties.find (std::string ( S3Properties::kSecretAccessKey ) );
73+ auto session_token_it = properties.find (std::string ( S3Properties::kSessionToken ) );
7374
7475 if (access_key_it != properties.end () && secret_key_it != properties.end ()) {
7576 if (session_token_it != properties.end ()) {
@@ -84,13 +85,13 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
8485 }
8586
8687 // Configure region
87- auto region_it = properties.find (S3Properties::kRegion );
88+ auto region_it = properties.find (std::string ( S3Properties::kRegion ) );
8889 if (region_it != properties.end ()) {
8990 options.region = region_it->second ;
9091 }
9192
9293 // Configure endpoint (for MinIO, LocalStack, etc.)
93- auto endpoint_it = properties.find (S3Properties::kEndpoint );
94+ auto endpoint_it = properties.find (std::string ( S3Properties::kEndpoint ) );
9495 if (endpoint_it != properties.end ()) {
9596 options.endpoint_override = endpoint_it->second ;
9697 } else {
@@ -106,36 +107,30 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
106107 }
107108 }
108109
109- auto path_style_it = properties.find (S3Properties::kPathStyleAccess );
110+ auto path_style_it = properties.find (std::string ( S3Properties::kPathStyleAccess ) );
110111 if (path_style_it != properties.end () && path_style_it->second == " true" ) {
111112 options.force_virtual_addressing = false ;
112113 }
113114
114115 // Configure SSL
115- auto ssl_it = properties.find (S3Properties::kSslEnabled );
116+ auto ssl_it = properties.find (std::string ( S3Properties::kSslEnabled ) );
116117 if (ssl_it != properties.end () && ssl_it->second == " false" ) {
117118 options.scheme = " http" ;
118119 }
119120
120121 // Configure timeouts
121- auto connect_timeout_it = properties.find (S3Properties::kConnectTimeoutMs );
122+ auto connect_timeout_it = properties.find (std::string ( S3Properties::kConnectTimeoutMs ) );
122123 if (connect_timeout_it != properties.end ()) {
123- try {
124- options.connect_timeout = std::stod (connect_timeout_it->second ) / 1000.0 ;
125- } catch (const std::exception& e) {
126- return InvalidArgument (" Invalid {}: '{}' ({})" , S3Properties::kConnectTimeoutMs ,
127- connect_timeout_it->second , e.what ());
128- }
124+ ICEBERG_ASSIGN_OR_RAISE (auto timeout_ms,
125+ StringUtils::ParseNumber<double >(connect_timeout_it->second ));
126+ options.connect_timeout = timeout_ms / 1000.0 ;
129127 }
130128
131- auto socket_timeout_it = properties.find (S3Properties::kSocketTimeoutMs );
129+ auto socket_timeout_it = properties.find (std::string ( S3Properties::kSocketTimeoutMs ) );
132130 if (socket_timeout_it != properties.end ()) {
133- try {
134- options.request_timeout = std::stod (socket_timeout_it->second ) / 1000.0 ;
135- } catch (const std::exception& e) {
136- return InvalidArgument (" Invalid {}: '{}' ({})" , S3Properties::kSocketTimeoutMs ,
137- socket_timeout_it->second , e.what ());
138- }
131+ ICEBERG_ASSIGN_OR_RAISE (auto timeout_ms,
132+ StringUtils::ParseNumber<double >(socket_timeout_it->second ));
133+ options.request_timeout = timeout_ms / 1000.0 ;
139134 }
140135
141136 return options;
@@ -145,13 +140,9 @@ Result<::arrow::fs::S3Options> ConfigureS3Options(
145140} // namespace
146141
147142Result<std::unique_ptr<FileIO>> MakeS3FileIO (
148- const std::string& uri,
149143 const std::unordered_map<std::string, std::string>& properties) {
150- if (!uri.starts_with (" s3://" )) {
151- return InvalidArgument (" S3 URI must start with s3://" );
152- }
153144#if !ICEBERG_ARROW_HAS_S3
154- return NotImplemented (" Arrow S3 support is not enabled" );
145+ return NotSupported (" Arrow S3 support is not enabled" );
155146#else
156147 ICEBERG_RETURN_UNEXPECTED (EnsureS3Initialized ());
157148
@@ -163,4 +154,14 @@ Result<std::unique_ptr<FileIO>> MakeS3FileIO(
163154#endif
164155}
165156
157+ Status FinalizeS3 () {
158+ #if ICEBERG_ARROW_HAS_S3
159+ auto status = ::arrow::fs::FinalizeS3 ();
160+ ICEBERG_ARROW_RETURN_NOT_OK (status);
161+ return {};
162+ #else
163+ return NotSupported (" Arrow S3 support is not enabled" );
164+ #endif
165+ }
166+
166167} // namespace iceberg::arrow
0 commit comments