Commit 0469e5e

feat(catalog): expose InformationSchemataBuilder as public API (#22499)
## Which issue does this PR close?

No linked issue. Happy to file one if reviewers prefer.

## Rationale for this change

Downstream catalog implementations that resolve schemas asynchronously cannot reuse `InformationSchemaProvider`: it enumerates schemas via `CatalogProvider::schema_names()`, which is synchronous, so an async-only catalog has to provide its own `information_schema.schemata` view. Today that requires either duplicating the column layout and the row-building logic, or reaching into private items. Exposing `InformationSchemataBuilder` and a `schemata_schema()` factory lets external crates emit byte-for-byte-compatible `schemata` batches without copy-pasting the contract.

## What changes are included in this PR?

- `pub fn schemata_schema() -> SchemaRef` extracts the column-layout factory. `InformationSchemata::new` now calls it instead of inlining the schema, so there is a single source of truth.
- `InformationSchemataBuilder` becomes `pub` (was private) and gains a `Default` impl and a public `new()`. `add_schemata` and `finish` are bumped to `pub`. The `add_schemata` body and its parameter types (`&str` / `Option<&str>`) are unchanged.
- `finish` now returns `Result<RecordBatch>` instead of panicking via an internal `.unwrap()`. The one internal caller (`PartitionStream::execute` for `InformationSchemata`) previously wrapped the call as `Ok(builder.finish())`; it is now just `builder.finish()`, since the inner expression produces the `Result` directly.

## Are these changes tested?

Yes. A new unit test, `schemata_builder_emits_canonical_schema_and_rows`, exercises the public API end-to-end via `Default::default()`, asserts that the produced batch's schema matches `schemata_schema()`, and verifies the null pattern for `schema_owner`, the three `default_character_set_*` columns, and `sql_path`.

The pre-existing internal users (`InformationSchemata::new`, `PartitionStream::execute`) continue to exercise the same code path through the unchanged `InformationSchemata::builder()` constructor.

## Are there any user-facing changes?

Yes. `datafusion-catalog` gains new public items: `schemata_schema` and `InformationSchemataBuilder` (with its `new` / `add_schemata` / `finish` methods and `Default` impl). No existing public API is broken. Because `finish` was private until now, its `Result<RecordBatch>` return type is new public surface, not a regression.
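
To make the row contract concrete, here is a minimal std-only sketch of the behaviour described above. It is a simplified stand-in, not the DataFusion API: `SchemataBatch`, `SchemataRowsBuilder`, `COLUMNS`, and `build_example` are hypothetical names, and plain `Vec<Option<String>>` columns stand in for Arrow's `StringBuilder` and `RecordBatch`. The column names, nullability flags, and null pattern are taken from the diff in this commit. The validation inside `finish` is an assumption about what a fallible `finish` might check, not what `RecordBatch::try_new` does.

```rust
// Std-only model of the `information_schema.schemata` row contract.
// All type and function names here are hypothetical stand-ins.

/// Column names and nullability, mirroring `schemata_schema()` in the diff.
const COLUMNS: [(&str, bool); 7] = [
    ("catalog_name", false),
    ("schema_name", false),
    ("schema_owner", true),
    ("default_character_set_catalog", true),
    ("default_character_set_schema", true),
    ("default_character_set_name", true),
    ("sql_path", true),
];

/// One nullable string column (stand-in for an Arrow `StringArray`).
type Column = Vec<Option<String>>;

/// Stand-in for a `RecordBatch` with the schemata layout.
#[derive(Debug, PartialEq)]
struct SchemataBatch {
    columns: Vec<Column>,
}

impl SchemataBatch {
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, Vec::len)
    }

    fn column_by_name(&self, name: &str) -> Option<&Column> {
        let idx = COLUMNS.iter().position(|(n, _)| *n == name)?;
        self.columns.get(idx)
    }
}

/// Stand-in for `InformationSchemataBuilder`.
#[derive(Debug, Default)]
struct SchemataRowsBuilder {
    columns: [Column; 7],
}

impl SchemataRowsBuilder {
    fn new() -> Self {
        Self::default()
    }

    /// Append one row; the last four columns are always null.
    fn add_schemata(&mut self, catalog_name: &str, schema_name: &str, schema_owner: Option<&str>) {
        self.columns[0].push(Some(catalog_name.to_owned()));
        self.columns[1].push(Some(schema_name.to_owned()));
        self.columns[2].push(schema_owner.map(str::to_owned));
        for col in &mut self.columns[3..] {
            col.push(None);
        }
    }

    /// Drain the builder into a batch, returning an error instead of panicking.
    fn finish(&mut self) -> Result<SchemataBatch, String> {
        let columns: Vec<Column> = self.columns.iter_mut().map(std::mem::take).collect();
        let rows = columns[0].len();
        for ((name, nullable), col) in COLUMNS.iter().zip(&columns) {
            if col.len() != rows {
                return Err(format!("column {name} has {} rows, expected {rows}", col.len()));
            }
            if !*nullable && col.iter().any(Option::is_none) {
                return Err(format!("non-nullable column {name} contains a null"));
            }
        }
        Ok(SchemataBatch { columns })
    }
}

/// Mirrors the rows used by the unit test in this commit.
fn build_example() -> Result<SchemataBatch, String> {
    let mut builder = SchemataRowsBuilder::new();
    builder.add_schemata("cat", "schema_one", Some("alice"));
    builder.add_schemata("cat", "schema_two", None);
    // `finish` already yields a `Result`, so no `Ok(...)` wrapper is needed.
    builder.finish()
}
```

Calling `build_example()` yields a two-row batch in which only `schema_owner` varies between a value and null. In this model, a caller that already returns a `Result` can return `builder.finish()` directly, which is the same shape as the `PartitionStream::execute` change in this commit.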
1 parent 7c39318 commit 0469e5e

1 file changed

Lines changed: 121 additions & 17 deletions

datafusion/catalog/src/information_schema.rs

@@ -967,18 +967,34 @@ struct InformationSchemata {
     config: InformationSchemaConfig,
 }

+/// The Arrow schema of [`information_schema.schemata`] rows.
+///
+/// Useful for downstream catalog implementations that want to declare a
+/// `TableProvider` for `schemata` before populating any rows via
+/// [`InformationSchemataBuilder`].
+///
+/// Columns and nullability match
+/// <https://www.postgresql.org/docs/current/infoschema-schemata.html>.
+///
+/// [`information_schema.schemata`]: https://www.postgresql.org/docs/current/infoschema-schemata.html
+pub fn schemata_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("catalog_name", DataType::Utf8, false),
+        Field::new("schema_name", DataType::Utf8, false),
+        Field::new("schema_owner", DataType::Utf8, true),
+        Field::new("default_character_set_catalog", DataType::Utf8, true),
+        Field::new("default_character_set_schema", DataType::Utf8, true),
+        Field::new("default_character_set_name", DataType::Utf8, true),
+        Field::new("sql_path", DataType::Utf8, true),
+    ]))
+}
+
 impl InformationSchemata {
     fn new(config: InformationSchemaConfig) -> Self {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("catalog_name", DataType::Utf8, false),
-            Field::new("schema_name", DataType::Utf8, false),
-            Field::new("schema_owner", DataType::Utf8, true),
-            Field::new("default_character_set_catalog", DataType::Utf8, true),
-            Field::new("default_character_set_schema", DataType::Utf8, true),
-            Field::new("default_character_set_name", DataType::Utf8, true),
-            Field::new("sql_path", DataType::Utf8, true),
-        ]));
-        Self { schema, config }
+        Self {
+            schema: schemata_schema(),
+            config,
+        }
     }

     fn builder(&self) -> InformationSchemataBuilder {
@@ -995,7 +1011,16 @@ impl InformationSchemata {
     }
 }

-struct InformationSchemataBuilder {
+/// Builder that produces [`RecordBatch`] values matching the schema of
+/// `information_schema.schemata` (see [`schemata_schema`]).
+///
+/// Intended for downstream catalog implementations that need to emit
+/// `schemata` rows from their own metadata source rather than going
+/// through DataFusion's `InformationSchemaProvider`, which enumerates
+/// schemas synchronously via `CatalogProviderList` and so is unsuitable
+/// for catalog backends that resolve asynchronously.
+#[derive(Debug)]
+pub struct InformationSchemataBuilder {
     schema: SchemaRef,
     catalog_name: StringBuilder,
     schema_name: StringBuilder,
@@ -1006,8 +1031,32 @@ struct InformationSchemataBuilder {
     sql_path: StringBuilder,
 }

+impl Default for InformationSchemataBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl InformationSchemataBuilder {
-    fn add_schemata(
+    /// Construct an empty builder.
+    pub fn new() -> Self {
+        Self {
+            schema: schemata_schema(),
+            catalog_name: StringBuilder::new(),
+            schema_name: StringBuilder::new(),
+            schema_owner: StringBuilder::new(),
+            default_character_set_catalog: StringBuilder::new(),
+            default_character_set_schema: StringBuilder::new(),
+            default_character_set_name: StringBuilder::new(),
+            sql_path: StringBuilder::new(),
+        }
+    }
+
+    /// Append one row to the builder. `schema_owner` is the optional SQL
+    /// schema owner; the three `default_character_set_*` columns and
+    /// `sql_path` are written as null (DataFusion does not model those
+    /// concepts; see the PostgreSQL docs link on [`schemata_schema`]).
+    pub fn add_schemata(
         &mut self,
         catalog_name: &str,
         schema_name: &str,
@@ -1019,15 +1068,19 @@ impl InformationSchemataBuilder {
             Some(owner) => self.schema_owner.append_value(owner),
             None => self.schema_owner.append_null(),
         }
-        // refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
-        // these rows apply to a feature that is not implemented in DataFusion
         self.default_character_set_catalog.append_null();
         self.default_character_set_schema.append_null();
         self.default_character_set_name.append_null();
         self.sql_path.append_null();
     }

-    fn finish(&mut self) -> RecordBatch {
+    /// Finalize the builder into a [`RecordBatch`].
+    ///
+    /// Returns an error only if Arrow buffer construction fails, which
+    /// the builder's column-count and type invariants make unreachable
+    /// under normal use. The `Result` return type preserves room to add
+    /// validation in the future without a breaking API change.
+    pub fn finish(&mut self) -> Result<RecordBatch> {
         RecordBatch::try_new(
             Arc::clone(&self.schema),
             vec![
@@ -1040,7 +1093,7 @@ impl InformationSchemataBuilder {
                 Arc::new(self.sql_path.finish()),
             ],
         )
-        .unwrap()
+        .map_err(DataFusionError::from)
     }
 }

@@ -1057,7 +1110,7 @@ impl PartitionStream for InformationSchemata {
             // TODO: Stream this
             futures::stream::once(async move {
                 config.make_schemata(&mut builder).await;
-                Ok(builder.finish())
+                builder.finish()
             }),
         ))
     }
@@ -1413,6 +1466,57 @@ impl PartitionStream for InformationSchemaParameters {
 mod tests {
     use super::*;
     use crate::CatalogProvider;
+    use arrow::array::Array;
+
+    #[test]
+    fn schemata_builder_emits_canonical_schema_and_rows() {
+        // Construct via `Default` so the test exercises both `new()` (via
+        // the `Default` impl) and the public column-layout contract.
+        let mut builder = InformationSchemataBuilder::default();
+        builder.add_schemata("cat", "schema_one", Some("alice"));
+        builder.add_schemata("cat", "schema_two", None);
+        let batch = builder.finish().expect("finish should not fail");
+
+        assert_eq!(batch.schema(), schemata_schema());
+        assert_eq!(batch.num_rows(), 2);
+
+        let col = |name: &str| {
+            batch
+                .column_by_name(name)
+                .unwrap_or_else(|| panic!("missing column {name}"))
+        };
+        let string_col = |name: &str| {
+            col(name)
+                .as_any()
+                .downcast_ref::<arrow::array::StringArray>()
+                .unwrap_or_else(|| panic!("{name} should be a StringArray"))
+        };
+
+        let catalog = string_col("catalog_name");
+        assert_eq!(catalog.value(0), "cat");
+        assert_eq!(catalog.value(1), "cat");
+
+        let schema = string_col("schema_name");
+        assert_eq!(schema.value(0), "schema_one");
+        assert_eq!(schema.value(1), "schema_two");
+
+        let owner = string_col("schema_owner");
+        assert_eq!(owner.value(0), "alice");
+        assert!(owner.is_null(1));
+
+        // The three character-set columns and sql_path are unconditionally
+        // null — they exist for SQL-standard column-layout compatibility.
+        for name in [
+            "default_character_set_catalog",
+            "default_character_set_schema",
+            "default_character_set_name",
+            "sql_path",
+        ] {
+            let c = string_col(name);
+            assert!(c.is_null(0), "{name} row 0 should be null");
+            assert!(c.is_null(1), "{name} row 1 should be null");
+        }
+    }

     #[tokio::test]
     async fn make_tables_uses_table_type() {
