Skip to content

Commit 45fec39

Browse files
iceberg handlers modularized, api locked for 0.3.0
1 parent e949a1f commit 45fec39

22 files changed

Lines changed: 2989 additions & 1951 deletions

pangolin/pangolin_api/src/asset_handlers.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use crate::auth::TenantId;
1313
use crate::authz::check_permission;
1414
use pangolin_core::permission::{PermissionScope, Action};
1515
use pangolin_core::user::UserSession;
16-
use crate::iceberg_handlers::{AppState, parse_table_identifier};
16+
use crate::iceberg::AppState;
17+
use crate::iceberg::parse_table_identifier;
1718
use utoipa::ToSchema;
1819

1920
#[derive(Deserialize, ToSchema)]
@@ -320,7 +321,7 @@ pub struct AssetSummary {
320321
pub name: String,
321322
pub namespace: Vec<String>,
322323
pub kind: AssetType,
323-
pub identifier: crate::iceberg_handlers::TableIdentifier,
324+
pub identifier: crate::iceberg::TableIdentifier,
324325
}
325326

326327
/// List all assets in a namespace (Iceberg tables + Generic assets)
@@ -406,7 +407,7 @@ pub async fn list_assets(
406407
name: asset.name.clone(),
407408
namespace: namespace_parts.clone(),
408409
kind: asset.kind,
409-
identifier: crate::iceberg_handlers::TableIdentifier {
410+
identifier: crate::iceberg::TableIdentifier {
410411
namespace: namespace_parts.clone(),
411412
name: asset.name,
412413
}
@@ -422,7 +423,7 @@ pub async fn list_assets(
422423
#[cfg(test)]
423424
mod tests {
424425
use super::*;
425-
use crate::iceberg_handlers::AppState;
426+
use crate::iceberg::AppState;
426427
use pangolin_core::user::UserSession;
427428
use pangolin_core::permission::{PermissionScope, Action};
428429
use crate::tenant::TenantId;

pangolin/pangolin_api/src/business_metadata_handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use pangolin_core::business_metadata::{BusinessMetadata, AccessRequest, RequestS
1212
use pangolin_core::user::{UserSession, UserRole};
1313
use pangolin_core::permission::{Action, PermissionScope};
1414
use uuid::Uuid;
15-
use crate::iceberg_handlers::AppState;
15+
use crate::iceberg::AppState;
1616
use utoipa::ToSchema;
1717

1818
#[derive(Deserialize, Serialize, ToSchema)]

pangolin/pangolin_api/src/dashboard_handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use axum::{
77
use serde::{Deserialize, Serialize};
88
use pangolin_store::CatalogStore;
99
use pangolin_core::user::{UserSession, UserRole};
10-
use crate::iceberg_handlers::AppState;
10+
use crate::iceberg::AppState;
1111
use crate::error::ApiError;
1212
use utoipa::ToSchema;
1313

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use axum::Json;
2+
use std::collections::HashMap;
3+
use super::types::CatalogConfig;
4+
5+
/// Get Iceberg Catalog Configuration
6+
#[utoipa::path(
7+
get,
8+
path = "/v1/config",
9+
tag = "Iceberg REST",
10+
responses(
11+
(status = 200, description = "Catalog configuration", body = CatalogConfig),
12+
)
13+
)]
14+
pub async fn get_iceberg_catalog_config_handler() -> Json<CatalogConfig> {
15+
// Return Iceberg REST catalog config
16+
// Use X-Iceberg-Access-Delegation header to enable credential vending
17+
let mut defaults = HashMap::new();
18+
19+
// This header tells PyIceberg to request credentials via the vend-credentials endpoint
20+
// PyIceberg will call POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/credentials
21+
defaults.insert("header.X-Iceberg-Access-Delegation".to_string(), "vended-credentials".to_string());
22+
23+
// Optionally provide S3 endpoint if using MinIO or custom S3
24+
if let Ok(endpoint) = std::env::var("S3_ENDPOINT") {
25+
defaults.insert("s3.endpoint".to_string(), endpoint);
26+
} else if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
27+
defaults.insert("s3.endpoint".to_string(), endpoint);
28+
}
29+
30+
// Add region if specified
31+
if let Ok(region) = std::env::var("AWS_REGION") {
32+
defaults.insert("s3.region".to_string(), region);
33+
}
34+
35+
Json(CatalogConfig {
36+
defaults,
37+
overrides: HashMap::new(),
38+
})
39+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use axum::{
2+
body::Body,
3+
http::{HeaderMap, Method, StatusCode},
4+
response::IntoResponse,
5+
Json,
6+
};
7+
use bytes::Bytes;
8+
use pangolin_store::CatalogStore;
9+
use pangolin_core::model::CatalogType;
10+
use std::sync::Arc;
11+
use crate::federated_proxy::FederatedCatalogProxy;
12+
13+
pub mod config;
14+
pub mod namespaces;
15+
pub mod tables;
16+
pub mod types;
17+
18+
// Re-export types for convenience
19+
pub use types::*;
20+
pub type AppState = std::sync::Arc<dyn pangolin_store::CatalogStore + Send + Sync>;
21+
22+
/// Helper function to check if a catalog is federated and forward the request if so
23+
pub async fn check_and_forward_if_federated(
24+
store: &Arc<dyn CatalogStore + Send + Sync>,
25+
tenant_id: uuid::Uuid,
26+
catalog_name: &str,
27+
method: Method,
28+
path: &str,
29+
body: Option<Bytes>,
30+
headers: HeaderMap,
31+
) -> Option<axum::response::Response> {
32+
// Get the catalog
33+
let catalog = match store.get_catalog(tenant_id, catalog_name.to_string()).await {
34+
Ok(Some(c)) => c,
35+
Ok(None) => return None, // Catalog not found, let handler deal with it
36+
Err(_) => return None,
37+
};
38+
39+
// Check if it's federated
40+
if catalog.catalog_type == CatalogType::Federated {
41+
if let Some(config) = catalog.federated_config {
42+
let proxy = FederatedCatalogProxy::new();
43+
match proxy.forward_request(&config, method, path, body, headers).await {
44+
Ok(response) => Some(response),
45+
Err(e) => Some((
46+
StatusCode::BAD_GATEWAY,
47+
Json(serde_json::json!({"error": format!("Federated catalog error: {}", e)})),
48+
).into_response()),
49+
}
50+
} else {
51+
Some((
52+
StatusCode::INTERNAL_SERVER_ERROR,
53+
Json(serde_json::json!({"error": "Federated catalog missing configuration"})),
54+
).into_response())
55+
}
56+
} else {
57+
None // Not federated, continue with local handling
58+
}
59+
}

0 commit comments

Comments
 (0)