@@ -44,8 +44,8 @@ use crate::client::{
4444} ;
4545use crate :: types:: {
4646 CatalogConfig , CommitTableRequest , CommitTableResponse , CreateTableRequest ,
47- ListNamespaceResponse , ListTableResponse , LoadTableResponse , NamespaceSerde ,
48- RegisterTableRequest , RenameTableRequest ,
47+ ListNamespaceResponse , ListTableResponse , LoadCredentialsResponse , LoadTableResponse ,
48+ NamespaceSerde , RegisterTableRequest , RenameTableRequest ,
4949} ;
5050
5151/// REST catalog URI
@@ -461,6 +461,108 @@ impl RestCatalog {
461461 pub async fn regenerate_token ( & self ) -> Result < ( ) > {
462462 self . context ( ) . await ?. client . regenerate_token ( ) . await
463463 }
464+
465+ /// The actual logic for loading table, that supports loading vended credentials if requested.
466+ async fn load_table_internal (
467+ & self ,
468+ table_ident : & TableIdent ,
469+ load_credentials : bool ,
470+ ) -> Result < Table > {
471+ let context = self . context ( ) . await ?;
472+
473+ let mut request_builder = context
474+ . client
475+ . request ( Method :: GET , context. config . table_endpoint ( table_ident) ) ;
476+
477+ if load_credentials {
478+ request_builder =
479+ request_builder. header ( "X-Iceberg-Access-Delegation" , "vended-credentials" ) ;
480+ }
481+
482+ let request = request_builder. build ( ) ?;
483+
484+ let http_response = context. client . query_catalog ( request) . await ?;
485+
486+ let response = match http_response. status ( ) {
487+ StatusCode :: OK | StatusCode :: NOT_MODIFIED => {
488+ deserialize_catalog_response :: < LoadTableResponse > ( http_response) . await ?
489+ }
490+ StatusCode :: NOT_FOUND => {
491+ return Err ( Error :: new (
492+ ErrorKind :: Unexpected ,
493+ "Tried to load a table that does not exist" ,
494+ ) ) ;
495+ }
496+ _ => return Err ( deserialize_unexpected_catalog_error ( http_response) . await ) ,
497+ } ;
498+
499+ // Build config with proper precedence, with each next config overriding previous one:
500+ // 1. response.config (server defaults)
501+ // 2. user_config.props (user configuration)
502+ // 3. storage_credentials (vended credentials - highest priority)
503+ let mut config: HashMap < String , String > = response
504+ . config
505+ . unwrap_or_default ( )
506+ . into_iter ( )
507+ . chain ( self . user_config . props . clone ( ) )
508+ . collect ( ) ;
509+
510+ // Per the OpenAPI spec: "Clients must first check whether the respective credentials
511+ // exist in the storage-credentials field before checking the config for credentials."
512+ // When vended-credentials header is set, credentials are returned in storage_credentials field.
513+ if let Some ( storage_credentials) = response. storage_credentials {
514+ for cred in storage_credentials {
515+ config. extend ( cred. config ) ;
516+ }
517+ }
518+
519+ let file_io = self
520+ . load_file_io ( response. metadata_location . as_deref ( ) , Some ( config) )
521+ . await ?;
522+
523+ let table_builder = Table :: builder ( )
524+ . identifier ( table_ident. clone ( ) )
525+ . file_io ( file_io)
526+ . metadata ( response. metadata ) ;
527+
528+ if let Some ( metadata_location) = response. metadata_location {
529+ table_builder. metadata_location ( metadata_location) . build ( )
530+ } else {
531+ table_builder. build ( )
532+ }
533+ }
534+
535+ /// Load vended credentials for a table from the catalog.
536+ pub async fn load_table_credentials (
537+ & self ,
538+ table_ident : & TableIdent ,
539+ ) -> Result < LoadCredentialsResponse > {
540+ let context = self . context ( ) . await ?;
541+
542+ let endpoint = format ! ( "{}/credentials" , context. config. table_endpoint( table_ident) ) ;
543+
544+ let request = context. client . request ( Method :: GET , endpoint) . build ( ) ?;
545+
546+ let http_response = context. client . query_catalog ( request) . await ?;
547+
548+ match http_response. status ( ) {
549+ StatusCode :: OK => deserialize_catalog_response ( http_response) . await ,
550+ StatusCode :: NOT_FOUND => Err ( Error :: new (
551+ ErrorKind :: Unexpected ,
552+ "Tried to load credentials for a table that does not exist" ,
553+ ) ) ,
554+ _ => Err ( deserialize_unexpected_catalog_error ( http_response) . await ) ,
555+ }
556+ }
557+
558+ /// Load a table with vended credentials from the catalog.
559+ ///
560+ /// This method loads the table and automatically fetches short-lived credentials
561+ /// for accessing the table's data files. The credentials are merged into the
562+ /// FileIO configuration.
563+ pub async fn load_table_with_credentials ( & self , table_ident : & TableIdent ) -> Result < Table > {
564+ self . load_table_internal ( table_ident, true ) . await
565+ }
464566}
465567
466568/// All requests and expected responses are derived from the REST catalog API spec:
@@ -754,49 +856,7 @@ impl Catalog for RestCatalog {
754856 /// server and the config provided when creating this `RestCatalog` instance, then the value
755857 /// provided locally to the `RestCatalog` will take precedence.
756858 async fn load_table ( & self , table_ident : & TableIdent ) -> Result < Table > {
757- let context = self . context ( ) . await ?;
758-
759- let request = context
760- . client
761- . request ( Method :: GET , context. config . table_endpoint ( table_ident) )
762- . build ( ) ?;
763-
764- let http_response = context. client . query_catalog ( request) . await ?;
765-
766- let response = match http_response. status ( ) {
767- StatusCode :: OK | StatusCode :: NOT_MODIFIED => {
768- deserialize_catalog_response :: < LoadTableResponse > ( http_response) . await ?
769- }
770- StatusCode :: NOT_FOUND => {
771- return Err ( Error :: new (
772- ErrorKind :: Unexpected ,
773- "Tried to load a table that does not exist" ,
774- ) ) ;
775- }
776- _ => return Err ( deserialize_unexpected_catalog_error ( http_response) . await ) ,
777- } ;
778-
779- let config = response
780- . config
781- . unwrap_or_default ( )
782- . into_iter ( )
783- . chain ( self . user_config . props . clone ( ) )
784- . collect ( ) ;
785-
786- let file_io = self
787- . load_file_io ( response. metadata_location . as_deref ( ) , Some ( config) )
788- . await ?;
789-
790- let table_builder = Table :: builder ( )
791- . identifier ( table_ident. clone ( ) )
792- . file_io ( file_io)
793- . metadata ( response. metadata ) ;
794-
795- if let Some ( metadata_location) = response. metadata_location {
796- table_builder. metadata_location ( metadata_location) . build ( )
797- } else {
798- table_builder. build ( )
799- }
859+ self . load_table_internal ( table_ident, false ) . await
800860 }
801861
802862 /// Drop a table from the catalog.
@@ -999,6 +1059,7 @@ mod tests {
9991059 use std:: sync:: Arc ;
10001060
10011061 use chrono:: { TimeZone , Utc } ;
1062+ use futures:: stream:: StreamExt ;
10021063 use iceberg:: spec:: {
10031064 FormatVersion , NestedField , NullOrder , Operation , PrimitiveType , Schema , Snapshot ,
10041065 SnapshotLog , SortDirection , SortField , SortOrder , Summary , Transform , Type ,
@@ -2764,4 +2825,141 @@ mod tests {
27642825 assert_eq ! ( err. message( ) , "Catalog uri is required" ) ;
27652826 }
27662827 }
2828+
2829+ #[ tokio:: test]
2830+ #[ ignore]
2831+ async fn test_load_table_credentials_integration ( ) {
2832+ use std:: env;
2833+
2834+ let client_id =
2835+ env:: var ( "POLARIS_USER" ) . expect ( "POLARIS_USER environment variable must be set" ) ;
2836+ let client_secret =
2837+ env:: var ( "POLARIS_SECRET" ) . expect ( "POLARIS_SECRET environment variable must be set" ) ;
2838+ let catalog_uri = env:: var ( "POLARIS_URI" )
2839+ . unwrap_or_else ( |_| "http://localhost:8181/api/catalog" . to_string ( ) ) ;
2840+
2841+ let mut props = HashMap :: new ( ) ;
2842+ props. insert (
2843+ "credential" . to_string ( ) ,
2844+ format ! ( "{}:{}" , client_id, client_secret) ,
2845+ ) ;
2846+ props. insert ( "scope" . to_string ( ) , "PRINCIPAL_ROLE:ALL" . to_string ( ) ) ;
2847+ props. insert (
2848+ "s3.endpoint" . to_string ( ) ,
2849+ "http://localhost:9000" . to_string ( ) ,
2850+ ) ;
2851+
2852+ let catalog = RestCatalog :: new (
2853+ RestCatalogConfig :: builder ( )
2854+ . uri ( catalog_uri)
2855+ . warehouse ( "warehouse" . to_string ( ) )
2856+ . props ( props)
2857+ . build ( ) ,
2858+ ) ;
2859+
2860+ let table_ident = TableIdent :: new (
2861+ NamespaceIdent :: new ( "tpch.sf01" . to_string ( ) ) ,
2862+ "nation" . to_string ( ) ,
2863+ ) ;
2864+
2865+ let credentials_result = catalog. load_table_credentials ( & table_ident) . await ;
2866+
2867+ match credentials_result {
2868+ Ok ( credentials) => {
2869+ println ! ( "Successfully loaded credentials" ) ;
2870+ println ! (
2871+ "Number of storage credentials: {}" ,
2872+ credentials. storage_credentials. len( )
2873+ ) ;
2874+ // println!("Full response: {:#?}", credentials);
2875+ assert ! ( !credentials. storage_credentials. is_empty( ) ) ;
2876+ }
2877+ Err ( e) => {
2878+ panic ! ( "Failed to load table credentials: {:?}" , e) ;
2879+ }
2880+ }
2881+
2882+ // Also test loading table with vended credentials
2883+ println ! ( "\n --- Testing load_table_with_credentials ---" ) ;
2884+ let table_result = catalog. load_table_with_credentials ( & table_ident) . await ;
2885+
2886+ match table_result {
2887+ Ok ( table) => {
2888+ println ! ( "Successfully loaded table with vended credentials" ) ;
2889+ println ! ( "Table identifier: {}" , table. identifier( ) ) ;
2890+ println ! ( "Metadata location: {:?}" , table. metadata_location( ) ) ;
2891+ println ! ( "FileIO configured with vended credentials" ) ;
2892+
2893+ // Scan the table and count rows
2894+ println ! ( "\n --- Scanning table ---" ) ;
2895+ let scan = table. scan ( ) . build ( ) . expect ( "Failed to build scan" ) ;
2896+ let mut row_count = 0 ;
2897+
2898+ let mut stream = scan
2899+ . to_arrow ( )
2900+ . await
2901+ . expect ( "Failed to create arrow stream" ) ;
2902+
2903+ while let Some ( batch_result) = stream. next ( ) . await {
2904+ match batch_result {
2905+ Ok ( batch) => {
2906+ row_count += batch. num_rows ( ) ;
2907+ println ! ( " Batch: {} rows" , batch. num_rows( ) ) ;
2908+ }
2909+ Err ( e) => {
2910+ panic ! ( "Failed to read batch: {:?}" , e) ;
2911+ }
2912+ }
2913+ }
2914+
2915+ println ! ( "Total rows scanned: {}" , row_count) ;
2916+ assert_eq ! ( row_count, 25 , "Expected 25 rows in nation table" ) ;
2917+ println ! ( "✓ Successfully verified 25 rows in table" ) ;
2918+ }
2919+ Err ( e) => {
2920+ panic ! ( "Failed to load table with vended credentials: {:?}" , e) ;
2921+ }
2922+ }
2923+
2924+ // Test loading table WITHOUT vended credentials and verify scan fails
2925+ println ! ( "\n --- Testing load_table WITHOUT vended credentials (should fail) ---" ) ;
2926+ let table_result_no_creds = catalog. load_table ( & table_ident) . await ;
2927+
2928+ match table_result_no_creds {
2929+ Ok ( table) => {
2930+ println ! ( "Successfully loaded table WITHOUT vended credentials" ) ;
2931+ println ! ( "Table identifier: {}" , table. identifier( ) ) ;
2932+ println ! ( "Metadata location: {:?}" , table. metadata_location( ) ) ;
2933+
2934+ // Try to scan the table - this should fail
2935+ println ! ( "\n --- Attempting to scan table without credentials ---" ) ;
2936+ let scan = table. scan ( ) . build ( ) . expect ( "Failed to build scan" ) ;
2937+
2938+ // Try to create arrow stream - this should fail when accessing manifest list
2939+ match scan. to_arrow ( ) . await {
2940+ Ok ( _stream) => {
2941+ panic ! (
2942+ "Stream creation succeeded without vended credentials - this should not happen!"
2943+ ) ;
2944+ }
2945+ Err ( e) => {
2946+ println ! ( "✓ Scan failed as expected without vended credentials" ) ;
2947+ println ! ( "Error: {}" , e) ;
2948+ // Verify it's a permission/authentication error
2949+ let error_msg = e. to_string ( ) ;
2950+ assert ! (
2951+ error_msg. contains( "PermissionDenied" )
2952+ && error_msg. contains( "InvalidAccessKeyId" )
2953+ && error_msg. contains( "403" ) ,
2954+ "Expected permission/authentication error, got: {}" ,
2955+ error_msg
2956+ ) ;
2957+ }
2958+ }
2959+ }
2960+ Err ( e) => {
2961+ panic ! ( "Failed to load table without vended credentials: {:?}" , e) ;
2962+ }
2963+ }
2964+ }
27672965}
0 commit comments