Skip to content

Commit 8b4ac09

Browse files
authored
feat: add get clustering columns in transactions (delta-io#1693)
## What changes are proposed in this pull request? Stacked on top of delta-io#1707 - Add support for clustering columns in statistics schema generation - Per the Delta protocol, writers MUST write per-file statistics for clustering columns Contains no optimizations in this PR ## How was this change tested? New unit tests
1 parent bcaf921 commit 8b4ac09

8 files changed

Lines changed: 524 additions & 44 deletions

File tree

kernel/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
152152
"env-filter",
153153
"fmt",
154154
] }
155+
rstest = "0.23"
155156

156157
[[bench]]
157158
name = "metadata_bench"

kernel/src/clustering.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//! Clustering column support for Delta tables.
2+
//!
3+
//! This module provides functionality for reading clustering columns from domain metadata.
4+
//! Per the Delta protocol, writers MUST write per-file statistics for clustering columns.
5+
//!
6+
//! Clustering columns are stored in domain metadata under the `delta.clustering` domain
7+
//! as a JSON object with a `clusteringColumns` field containing an array of column paths,
8+
//! where each path is an array of field names (to handle nested columns).
9+
10+
use serde::Deserialize;
11+
12+
use crate::actions::domain_metadata::domain_metadata_configuration;
13+
use crate::expressions::ColumnName;
14+
use crate::log_segment::LogSegment;
15+
use crate::{DeltaResult, Engine};
16+
17+
/// Domain metadata structure for clustering columns.
18+
///
19+
/// This is deserialized from the JSON configuration stored in the
20+
/// `delta.clustering` domain metadata. Each clustering column is represented
21+
/// as an array of field names to support nested columns.
22+
///
23+
/// The column names are physical names. If column mapping is enabled, these will be
24+
/// the physical column identifiers (e.g., `col-uuid`); otherwise, they match the logical names.
25+
///
26+
/// Example JSON:
27+
/// ```json
28+
/// {"clusteringColumns": [["col1"], ["user", "address", "city"]]}
29+
/// ```
30+
#[derive(Debug, Deserialize)]
31+
#[serde(rename_all = "camelCase")]
32+
struct ClusteringDomainMetadata {
33+
clustering_columns: Vec<Vec<String>>,
34+
}
35+
36+
/// The domain name for clustering metadata.
37+
const CLUSTERING_DOMAIN_NAME: &str = "delta.clustering";
38+
39+
/// Parses clustering columns from a JSON configuration string.
40+
///
41+
/// Returns `Ok(columns)` if the configuration is valid, or an error if malformed.
42+
fn parse_clustering_columns(json_str: &str) -> DeltaResult<Vec<ColumnName>> {
43+
let metadata: ClusteringDomainMetadata = serde_json::from_str(json_str)?;
44+
Ok(metadata
45+
.clustering_columns
46+
.into_iter()
47+
.map(ColumnName::new)
48+
.collect())
49+
}
50+
51+
/// Reads clustering columns from the log segment's domain metadata.
52+
///
53+
/// This function performs a log scan to find the clustering domain metadata.
54+
/// Callers should first check if the `ClusteredTable` feature is enabled via
55+
/// the protocol before calling this function to avoid unnecessary I/O.
56+
/// See [`Snapshot::get_clustering_columns`] which performs this check.
57+
///
58+
/// Returns `Ok(Some(columns))` if clustering domain metadata exists,
59+
/// `Ok(None)` if no clustering domain metadata is found, or an error if the
60+
/// metadata is malformed.
61+
///
62+
/// [`Snapshot::get_clustering_columns`]: crate::snapshot::Snapshot::get_clustering_columns
63+
pub(crate) fn get_clustering_columns(
64+
log_segment: &LogSegment,
65+
engine: &dyn Engine,
66+
) -> DeltaResult<Option<Vec<ColumnName>>> {
67+
let config = domain_metadata_configuration(log_segment, CLUSTERING_DOMAIN_NAME, engine)?;
68+
match config {
69+
Some(json_str) => Ok(Some(parse_clustering_columns(&json_str)?)),
70+
None => Ok(None),
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
78+
#[rstest::rstest]
79+
#[case::simple(
80+
r#"{"clusteringColumns": [["col1"], ["col2"]]}"#,
81+
vec![vec!["col1"], vec!["col2"]]
82+
)]
83+
#[case::empty(
84+
r#"{"clusteringColumns": []}"#,
85+
vec![]
86+
)]
87+
#[case::nested(
88+
r#"{"clusteringColumns": [["id"], ["user", "address", "city"], ["a", "b", "c", "d", "e"]]}"#,
89+
vec![vec!["id"], vec!["user", "address", "city"], vec!["a", "b", "c", "d", "e"]]
90+
)]
91+
#[case::special_characters(
92+
r#"{"clusteringColumns": [["col.with.dot"], ["`backticks`", "nested"]]}"#,
93+
vec![vec!["col.with.dot"], vec!["`backticks`", "nested"]]
94+
)]
95+
#[case::tolerates_unknown_fields(
96+
r#"{"clusteringColumns": [["col1"]], "foo": "bar", "futureField": 123}"#,
97+
vec![vec!["col1"]]
98+
)]
99+
fn test_parse_clustering_columns(#[case] json: &str, #[case] expected: Vec<Vec<&str>>) {
100+
let columns = parse_clustering_columns(json).unwrap();
101+
let expected_cols: Vec<ColumnName> = expected.into_iter().map(ColumnName::new).collect();
102+
assert_eq!(columns, expected_cols);
103+
}
104+
}

kernel/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ pub use log_path::LogPath;
109109

110110
mod row_tracking;
111111

112+
pub(crate) mod clustering;
113+
112114
mod arrow_compat;
113115
#[cfg(any(feature = "arrow-56", feature = "arrow-57"))]
114116
pub use arrow_compat::*;

0 commit comments

Comments
 (0)