Skip to content

Commit 3bc4bd9

Browse files
authored
Snowflake table wrapper (#31)
* Snowflake table wrapper * Fix tests * Fix tests
1 parent 2291b83 commit 3bc4bd9

7 files changed

Lines changed: 191 additions & 18 deletions

File tree

crates/catalog/src/catalogs/embucket/schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::block_in_new_runtime;
2+
use crate::snowflake_table::CaseInsensitiveTable;
23
use async_trait::async_trait;
34
use catalog_metastore::error as metastore_error;
45
use catalog_metastore::{Metastore, SchemaIdent, TableIdent};
@@ -92,8 +93,10 @@ impl SchemaProvider for EmbucketSchema {
9293
.await
9394
.map_err(|e| DataFusionError::External(Box::new(e)))?;
9495
let tabular = IcebergTabular::Table(iceberg_table);
95-
let table_provider: Arc<dyn TableProvider> =
96-
Arc::new(IcebergDataFusionTable::new(tabular, None, None, None));
96+
97+
let table_provider: Arc<dyn TableProvider> = Arc::new(CaseInsensitiveTable::new(
98+
Arc::new(IcebergDataFusionTable::new(tabular, None, None, None)),
99+
));
97100
Ok(Some(table_provider))
98101
}
99102
Ok(None) => Ok(None),

crates/catalog/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod df_error;
1010
pub mod error;
1111
pub mod information_schema;
1212
pub mod schema;
13+
pub mod snowflake_table;
1314
pub mod table;
1415

1516
#[cfg(test)]

crates/catalog/src/schema.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::df_error;
2+
use crate::snowflake_table::CaseInsensitiveTable;
23
use crate::table::{CachingTable, IcebergTableBuilder};
34
use async_trait::async_trait;
45
use dashmap::DashMap;
@@ -96,8 +97,9 @@ impl SchemaProvider for CachingSchema {
9697
.await
9798
.context(df_error::IcebergSnafu)?;
9899
let tabular = IcebergTabular::Table(iceberg_table);
99-
let table_provider: Arc<dyn TableProvider> =
100-
Arc::new(DataFusionTable::new(tabular, None, None, None));
100+
let table_provider: Arc<dyn TableProvider> = Arc::new(CaseInsensitiveTable::new(
101+
Arc::new(DataFusionTable::new(tabular, None, None, None)),
102+
));
101103
Ok::<Arc<dyn TableProvider>, DataFusionError>(table_provider)
102104
})?
103105
} else {
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
use async_trait::async_trait;
2+
use datafusion::arrow::datatypes::{Schema, SchemaRef};
3+
use datafusion::catalog::{Session, TableProvider};
4+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
5+
use datafusion_common::{Statistics, project_schema};
6+
use datafusion_expr::dml::InsertOp;
7+
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
8+
use datafusion_physical_plan::expressions::Column;
9+
use datafusion_physical_plan::projection::ProjectionExec;
10+
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
11+
use std::any::Any;
12+
use std::fmt::Debug;
13+
use std::sync::Arc;
14+
15+
/// A [`TableProvider`] adapter that rewrites column names to match Snowflake's
16+
/// case-insensitive semantics.
17+
///
18+
/// Snowflake stores schemas in uppercase, while queries are typically written
19+
/// in lowercase. `DataFusion` treats identifiers as case-sensitive, which causes
20+
/// mismatches between the logical projection expressions (lowercase) and the
21+
/// physical input schema (uppercase). This adapter:
22+
///
23+
/// - Rewrites column references in filter expressions to the original schema
24+
/// casing before delegating to the underlying table provider.
25+
/// - Wraps the produced physical plan in a projection that aliases columns to
26+
/// lowercase names, so the output schema matches the logical expectations.
27+
#[derive(Debug)]
28+
pub struct CaseInsensitiveTable {
29+
/// Wrapped provider that table operations are delegated to.
inner: Arc<dyn TableProvider>,
30+
/// Schema of `inner` as captured in `new`, with its original column-name casing.
original_schema: SchemaRef,
31+
/// Copy of `original_schema` with every column name ASCII-lowercased; this is
/// what `schema()` returns.
normalized_schema: SchemaRef,
32+
/// Computed once in `new`; when `false`, `scan` and
/// `supports_filters_pushdown` forward to `inner` without any rewriting.
requires_case_rewrite: bool,
33+
}
34+
35+
impl CaseInsensitiveTable {
36+
pub fn new(inner: Arc<dyn TableProvider>) -> Self {
37+
let original_schema = inner.schema();
38+
let requires_case_rewrite = original_schema
39+
.fields()
40+
.iter()
41+
.any(|field| field.name().eq(&field.name().to_ascii_uppercase()));
42+
let normalized_schema = Arc::new(Schema::new(
43+
original_schema
44+
.fields()
45+
.iter()
46+
.map(|field| {
47+
let mut cloned = field.as_ref().clone();
48+
cloned.set_name(field.name().to_ascii_lowercase());
49+
cloned
50+
})
51+
.collect::<Vec<_>>(),
52+
));
53+
Self {
54+
inner,
55+
original_schema,
56+
normalized_schema,
57+
requires_case_rewrite,
58+
}
59+
}
60+
61+
fn rewrite_expr(&self, expr: Expr) -> datafusion_common::Result<Expr> {
62+
expr.transform_up(|e| {
63+
if let Expr::Column(col) = &e {
64+
let lookup = self
65+
.original_schema
66+
.fields()
67+
.iter()
68+
.find(|field| field.name().eq_ignore_ascii_case(&col.name));
69+
70+
if let Some(field) = lookup {
71+
let mut updated = col.clone();
72+
updated.name.clone_from(field.name());
73+
return Ok(Transformed::yes(Expr::Column(updated)));
74+
}
75+
}
76+
Ok(Transformed::no(e))
77+
})
78+
.data()
79+
}
80+
}
81+
82+
#[async_trait]
83+
impl TableProvider for CaseInsensitiveTable {
84+
fn as_any(&self) -> &dyn Any {
85+
self.inner.as_any()
86+
}
87+
88+
fn schema(&self) -> SchemaRef {
89+
Arc::clone(&self.normalized_schema)
90+
}
91+
92+
fn table_type(&self) -> TableType {
93+
self.inner.table_type()
94+
}
95+
96+
#[allow(clippy::as_conversions)]
97+
async fn scan(
98+
&self,
99+
state: &dyn Session,
100+
projection: Option<&Vec<usize>>,
101+
filters: &[Expr],
102+
limit: Option<usize>,
103+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
104+
if !self.requires_case_rewrite {
105+
return self.inner.scan(state, projection, filters, limit).await;
106+
}
107+
108+
let rewritten_filters = filters
109+
.iter()
110+
.map(|expr| self.rewrite_expr(expr.clone()))
111+
.collect::<Result<Vec<_>, _>>()?;
112+
113+
let plan = self
114+
.inner
115+
.scan(state, projection, &rewritten_filters, limit)
116+
.await?;
117+
118+
let target_schema = if let Some(indices) = projection {
119+
project_schema(&self.normalized_schema, Some(indices))?
120+
} else {
121+
Arc::clone(&self.normalized_schema)
122+
};
123+
124+
let mut projection_exprs = Vec::with_capacity(plan.schema().fields().len());
125+
for (idx, field) in plan.schema().fields().iter().enumerate() {
126+
let target_name = target_schema.field(idx).name().clone();
127+
128+
projection_exprs.push((
129+
Arc::new(Column::new(field.name(), idx)) as Arc<dyn PhysicalExpr>,
130+
target_name,
131+
));
132+
}
133+
134+
let projected_plan = ProjectionExec::try_new(projection_exprs, plan)?;
135+
Ok(Arc::new(projected_plan))
136+
}
137+
138+
fn supports_filters_pushdown(
139+
&self,
140+
filters: &[&Expr],
141+
) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
142+
if !self.requires_case_rewrite {
143+
return self.inner.supports_filters_pushdown(filters);
144+
}
145+
146+
let rewritten = filters
147+
.iter()
148+
.map(|expr| self.rewrite_expr((*expr).clone()))
149+
.collect::<Result<Vec<_>, _>>()?;
150+
let rewritten_refs = rewritten.iter().collect::<Vec<_>>();
151+
self.inner.supports_filters_pushdown(&rewritten_refs)
152+
}
153+
154+
fn statistics(&self) -> Option<Statistics> {
155+
self.inner.statistics()
156+
}
157+
158+
#[allow(clippy::as_conversions)]
159+
async fn insert_into(
160+
&self,
161+
state: &dyn Session,
162+
input: Arc<dyn ExecutionPlan>,
163+
insert_op: InsertOp,
164+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
165+
self.inner.insert_into(state, input, insert_op).await
166+
}
167+
}

crates/executor/src/tests/sql/physical_optimizer/list_field_metadata.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ use crate::test_query;
33
test_query!(
44
list_field_metadata_array,
55
"SELECT * FROM array",
6-
setup_queries = ["CREATE TABLE array as (SELECT [1,2]::ARRAY)"],
6+
setup_queries = ["CREATE TABLE array as (SELECT [1,2]::ARRAY as test)"],
77
snapshot_path = "list_field_metadata"
88
);
99

1010
test_query!(
1111
list_field_metadata_literal,
1212
"SELECT * FROM array",
13-
setup_queries = ["CREATE TABLE array as (SELECT 'A'::ARRAY)"],
13+
setup_queries = ["CREATE TABLE array as (SELECT 'A'::ARRAY as test)"],
1414
snapshot_path = "list_field_metadata"
1515
);
1616

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
---
22
source: crates/executor/src/tests/sql/physical_optimizer/list_field_metadata.rs
33
description: "\"SELECT * FROM array\""
4-
info: "Setup queries: CREATE TABLE array as (SELECT [1,2]::ARRAY)"
4+
info: "Setup queries: CREATE TABLE array as (SELECT [1,2]::ARRAY as test)"
55
---
66
Ok(
77
[
8-
"+------------------------------------+",
9-
"| array_construct(Int64(1),Int64(2)) |",
10-
"+------------------------------------+",
11-
"| [1, 2] |",
12-
"+------------------------------------+",
8+
"+--------+",
9+
"| test |",
10+
"+--------+",
11+
"| [1, 2] |",
12+
"+--------+",
1313
],
1414
)
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
---
22
source: crates/executor/src/tests/sql/physical_optimizer/list_field_metadata.rs
33
description: "\"SELECT * FROM array\""
4-
info: "Setup queries: CREATE TABLE array as (SELECT 'A'::ARRAY)"
4+
info: "Setup queries: CREATE TABLE array as (SELECT 'A'::ARRAY as test)"
55
---
66
Ok(
77
[
8-
"+-----------+",
9-
"| Utf8(\"A\") |",
10-
"+-----------+",
11-
"| [A] |",
12-
"+-----------+",
8+
"+------+",
9+
"| test |",
10+
"+------+",
11+
"| [A] |",
12+
"+------+",
1313
],
1414
)

0 commit comments

Comments
 (0)