|
| 1 | +use crate::{ |
| 2 | + apis::{ |
| 3 | + self, |
| 4 | + catalog_api_api::{self, NamespaceExistsError}, |
| 5 | + configuration::Configuration, |
| 6 | + }, |
| 7 | + models::{self, StorageCredential}, |
| 8 | +}; |
1 | 9 | use async_trait::async_trait; |
2 | 10 | use futures::{FutureExt, TryFutureExt}; |
3 | 11 | /** |
@@ -25,21 +33,13 @@ use iceberg_rust::{ |
25 | 33 | table::Table, |
26 | 34 | view::View, |
27 | 35 | }; |
28 | | -use object_store::{aws::AmazonS3Builder, ObjectStore}; |
| 36 | +use object_store::{aws::AmazonS3Builder, ObjectStore, ObjectStoreScheme}; |
29 | 37 | use std::{ |
30 | 38 | collections::HashMap, |
31 | 39 | path::Path, |
32 | 40 | sync::{Arc, RwLock}, |
33 | 41 | }; |
34 | | - |
35 | | -use crate::{ |
36 | | - apis::{ |
37 | | - self, |
38 | | - catalog_api_api::{self, NamespaceExistsError}, |
39 | | - configuration::Configuration, |
40 | | - }, |
41 | | - models::{self, StorageCredential}, |
42 | | -}; |
| 42 | +use url::Url; |
43 | 43 |
|
44 | 44 | #[derive(Debug)] |
45 | 45 | pub struct RestCatalog { |
@@ -287,7 +287,7 @@ impl Catalog for RestCatalog { |
287 | 287 | self.name.as_deref(), |
288 | 288 | &identifier.namespace().to_string(), |
289 | 289 | identifier.name(), |
290 | | - None, |
| 290 | + Some("vended-credentials"), |
291 | 291 | None, |
292 | 292 | ) |
293 | 293 | .await |
@@ -613,43 +613,59 @@ const CLIENT_REGION: &str = "client.region"; |
613 | 613 | const AWS_ACCESS_KEY_ID: &str = "s3.access-key-id"; |
614 | 614 | const AWS_SECRET_ACCESS_KEY: &str = "s3.secret-access-key"; |
615 | 615 | const AWS_SESSION_TOKEN: &str = "s3.session-token"; |
| 616 | +const AWS_REGION: &str = "s3.region"; |
| 617 | +const AWS_ENDPOINT: &str = "s3.endpoint"; |
| 618 | +const AWS_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous"; |
616 | 619 |
|
617 | 620 | fn object_store_from_response( |
618 | 621 | response: &models::LoadTableResult, |
619 | 622 | ) -> Result<Option<Arc<dyn ObjectStore>>, Error> { |
620 | 623 | let config = match (&response.storage_credentials, &response.config) { |
621 | | - (Some(credentials), _) => Some(&credentials[0].config), |
622 | | - (None, Some(config)) => Some(config), |
623 | | - (None, None) => None, |
624 | | - }; |
625 | | - |
626 | | - let Some(config) = config else { |
627 | | - return Ok(None); |
| 624 | + (Some(credentials), Some(config)) => { |
| 625 | + // Enrich credentials with other options that might only be found in the config (e.g. |
| 626 | + // a custom endpoint) |
| 627 | + let mut options = credentials[0].config.clone(); |
| 628 | + options.extend(config.clone()); |
| 629 | + options |
| 630 | + } |
| 631 | + (Some(credentials), None) => credentials[0].config.clone(), |
| 632 | + (None, Some(config)) => config.clone(), |
| 633 | + (None, None) => return Ok(None), |
628 | 634 | }; |
629 | 635 |
|
630 | | - let region = config.get(CLIENT_REGION); |
631 | | - if config.contains_key(AWS_ACCESS_KEY_ID) { |
632 | | - let access_key_id = config.get(AWS_ACCESS_KEY_ID); |
633 | | - let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY); |
634 | | - let session_token = config.get(AWS_SESSION_TOKEN); |
635 | | - let mut builder = AmazonS3Builder::new(); |
| 636 | + let url = Url::parse(&response.metadata.location)?; |
| 637 | + match ObjectStoreScheme::parse(&url) { |
| 638 | + Ok((ObjectStoreScheme::AmazonS3, _path)) => { |
| 639 | + let access_key_id = config.get(AWS_ACCESS_KEY_ID); |
| 640 | + let secret_access_key = config.get(AWS_SECRET_ACCESS_KEY); |
| 641 | + let session_token = config.get(AWS_SESSION_TOKEN); |
| 642 | + let region = config.get(CLIENT_REGION).or(config.get(AWS_REGION)); |
| 643 | + let endpoint = config.get(AWS_ENDPOINT); |
| 644 | + let allow_anonymous = config.get(AWS_ALLOW_ANONYMOUS).is_some_and(|s| s == "true"); |
| 645 | + let mut builder = AmazonS3Builder::new().with_url(&response.metadata.location); |
| 646 | + |
| 647 | + if let Some(region) = region { |
| 648 | + builder = builder.with_region(region) |
| 649 | + } |
| 650 | + if let Some(access_key_id) = access_key_id { |
| 651 | + builder = builder.with_access_key_id(access_key_id) |
| 652 | + } |
| 653 | + if let Some(secret_access_key) = secret_access_key { |
| 654 | + builder = builder.with_secret_access_key(secret_access_key) |
| 655 | + } |
| 656 | + if let Some(session_token) = session_token { |
| 657 | + builder = builder.with_token(session_token) |
| 658 | + } |
| 659 | + if let Some(endpoint) = endpoint { |
| 660 | + builder = builder.with_endpoint(endpoint) |
| 661 | + } |
| 662 | + if allow_anonymous { |
| 663 | + builder = builder.with_skip_signature(true) |
| 664 | + } |
636 | 665 |
|
637 | | - if let Some(region) = region { |
638 | | - builder = builder.with_region(region) |
639 | | - } |
640 | | - if let Some(access_key_id) = access_key_id { |
641 | | - builder = builder.with_access_key_id(access_key_id) |
| 666 | + Ok(Some(Arc::new(builder.build()?))) |
642 | 667 | } |
643 | | - if let Some(secret_access_key) = secret_access_key { |
644 | | - builder = builder.with_secret_access_key(secret_access_key) |
645 | | - } |
646 | | - if let Some(session_token) = session_token { |
647 | | - builder = builder.with_token(session_token) |
648 | | - } |
649 | | - |
650 | | - Ok(Some(Arc::new(builder.build()?))) |
651 | | - } else { |
652 | | - Ok(None) |
| 668 | + _ => Ok(None), |
653 | 669 | } |
654 | 670 | } |
655 | 671 |
|
|
0 commit comments