Skip to content

Commit 3c53111

Browse files
authored
Support session-time source configuration in DataFusion, and clear up precedence between config sources (#8575)
## Rational for this change Makes it possible to configure Vortex FileSource-based tables using `SET` statements on an appropriately configured DataFusion session. This is mostly useful for testing or interactive workflows. ## What changes are included in this PR? 1. Use DataFusion's `ExtensionOptions` and `ConfigExtension` APIs instead of `config_namespace` to get configuration handling for free. 2. Adds SLT based tests for table and session configuration. 3. Better document the order of precedence between the various sources of config, trying to mirror the built-in file formats. ## What APIs are changed? Are there any user-facing changes? For users of `VortexSource` and `VortexFormat`, some APIs might change slightly, and if they expose Vortex through an interactive interface, the config might change slightly. --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent bdbf6c4 commit 3c53111

4 files changed

Lines changed: 205 additions & 43 deletions

File tree

benchmarks/datafusion-bench/src/lib.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
122122
}
123123

124124
fn vortex_table_options() -> VortexTableOptions {
125-
VortexTableOptions {
126-
projection_pushdown: true,
127-
predicate_pushdown: true,
128-
..Default::default()
129-
}
125+
let mut opts = VortexTableOptions::default();
126+
127+
opts.predicate_pushdown = true;
128+
opts.predicate_pushdown = true;
129+
130+
opts
130131
}

vortex-datafusion/src/persistent/format.rs

Lines changed: 119 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ use datafusion_common::GetExt;
1515
use datafusion_common::Result as DFResult;
1616
use datafusion_common::ScalarValue as DFScalarValue;
1717
use datafusion_common::Statistics;
18+
use datafusion_common::config::ConfigExtension;
1819
use datafusion_common::config::ConfigField;
19-
use datafusion_common::config_namespace;
20+
use datafusion_common::extensions_options;
2021
use datafusion_common::internal_datafusion_err;
2122
use datafusion_common::not_impl_err;
2223
use datafusion_common::parsers::CompressionTypeVariant;
@@ -132,28 +133,62 @@ impl Debug for VortexFormat {
132133
}
133134
}
134135

135-
config_namespace! {
136+
extensions_options! {
136137
/// Options to configure [`VortexFormat`] and [`VortexSource`].
137138
///
138-
/// These options are usually set on a [`VortexFormatFactory`] and inherited
139-
/// by the `VortexFormat` / `VortexSource` instances created for individual
140-
/// tables.
139+
/// The API follows DataFusion's built-in Parquet and JSON format factories:
140+
/// a format factory may carry customized defaults, the session may carry
141+
/// format defaults, and `CREATE EXTERNAL TABLE ... OPTIONS(...)` can
142+
/// override individual fields for one table.
143+
///
144+
/// [`FileFormatFactory::create`] builds the `VortexTableOptions` copied into
145+
/// each [`VortexFormat`] as follows:
146+
///
147+
/// 1. If the factory has explicit options from
148+
/// [`VortexFormatFactory::with_options`] or
149+
/// [`VortexFormatFactory::new_with_options`], start from that complete
150+
/// `VortexTableOptions` value. This matches
151+
/// [`ParquetFormatFactory::new_with_options`] and
152+
/// [`JsonFormatFactory::new_with_options`]: factory options replace
153+
/// session defaults; they are not merged with them field-by-field.
154+
/// 2. If the factory does not have explicit options, read the session's
155+
/// `vortex` extension at the time `create` is called. This is the value
156+
/// changed by `SET vortex.<option> = ...`.
157+
/// 3. If the session has no `vortex` extension, start from
158+
/// `VortexTableOptions::default()`.
159+
/// 4. Apply table `OPTIONS(...)` last. Each option overwrites only its
160+
/// matching field, so per-table settings can override either the factory
161+
/// options or the session/default value.
162+
///
163+
/// In SQL, session settings use the `vortex.` prefix. Table options use the
164+
/// field names directly, the same style as Parquet or JSON table options:
165+
///
166+
/// ```text
167+
/// SET vortex.predicate_pushdown = false;
168+
///
169+
/// CREATE EXTERNAL TABLE t (x BIGINT)
170+
/// STORED AS vortex
171+
/// LOCATION 's3://bucket/path/'
172+
/// OPTIONS(predicate_pushdown 'true');
173+
/// ```
141174
///
142175
/// # Example
143176
///
144177
/// ```rust
145178
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
146179
///
147-
/// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
148-
/// projection_pushdown: true,
149-
/// predicate_pushdown: true,
150-
/// scan_concurrency: Some(8),
151-
/// ..Default::default()
152-
/// });
180+
/// let mut options = VortexTableOptions::default();
181+
/// options.predicate_pushdown = true;
182+
/// options.projection_pushdown = true;
183+
/// options.scan_concurrency = Some(8);
184+
///
185+
/// let factory = VortexFormatFactory::new().with_options(options);
153186
/// # let _ = factory;
154187
/// ```
155188
///
156189
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
190+
/// [`ParquetFormatFactory::new_with_options`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/parquet/struct.ParquetFormatFactory.html#method.new_with_options
191+
/// [`JsonFormatFactory::new_with_options`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/json/struct.JsonFormatFactory.html#method.new_with_options
157192
pub struct VortexTableOptions {
158193
/// The number of bytes to read when parsing a file footer.
159194
///
@@ -165,33 +200,46 @@ config_namespace! {
165200
/// When enabled, projection expressions may be partially evaluated during
166201
/// the scan. When disabled, Vortex reads only the referenced columns and
167202
/// all expressions are evaluated after the scan.
203+
///
204+
/// Disabled by default.
168205
pub projection_pushdown: bool, default = false
169206
/// Whether to enable predicate pushdown into the underlying Vortex scan.
170207
///
171208
/// When enabled, supported filters are evaluated during the scan. When
172209
/// disabled, DataFusion evaluates filters after the scan, while
173210
/// `VortexSource` can still use the full predicate for file pruning.
211+
///
212+
/// Enabled by default.
174213
pub predicate_pushdown: bool, default = true
175214
/// The intra-partition scan concurrency, controlling the number of row splits to process
176215
/// concurrently per-thread within each file.
177216
///
178217
/// This does not affect the overall parallelism
179218
/// across partitions, which is controlled by DataFusion's execution configuration.
219+
///
220+
/// Leave as `None` to use Vortex's scan default. Override per session
221+
/// with `SET vortex.scan_concurrency = <n>`, or per table with
222+
/// `OPTIONS(scan_concurrency '<n>')`.
180223
pub scan_concurrency: Option<usize>, default = None
181224
}
182225
}
183226

184-
impl Eq for VortexTableOptions {}
227+
impl ConfigExtension for VortexTableOptions {
228+
const PREFIX: &'static str = "vortex";
229+
}
185230

186231
/// Registration entry point for the file-backed Vortex integration.
187232
///
188233
/// `VortexFormatFactory` is the type most applications use. Register it with a
189234
/// DataFusion session, and DataFusion will create [`VortexFormat`] values for
190235
/// `CREATE EXTERNAL TABLE`, [`ListingTable`], and URL-table scans.
191236
///
192-
/// The factory stores a [`VortexSession`] and default [`VortexTableOptions`].
193-
/// Those defaults are copied into the formats and sources created for each
194-
/// table.
237+
/// The factory stores a [`VortexSession`] and optional factory-level
238+
/// [`VortexTableOptions`]. When options are set on the factory they act like
239+
/// customized format defaults, matching DataFusion's Parquet and JSON factory
240+
/// APIs. Otherwise, `VortexFormatFactory::create` uses the session's `vortex`
241+
/// options. In both cases, table `OPTIONS(...)` are applied last for the table
242+
/// being created.
195243
///
196244
/// # Example
197245
///
@@ -203,11 +251,11 @@ impl Eq for VortexTableOptions {}
203251
/// use datafusion_common::GetExt;
204252
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
205253
///
206-
/// let factory = Arc::new(VortexFormatFactory::new().with_options(VortexTableOptions {
207-
/// projection_pushdown: true,
208-
/// predicate_pushdown: true,
209-
/// ..Default::default()
210-
/// }));
254+
/// let mut options = VortexTableOptions::default();
255+
/// options.predicate_pushdown = true;
256+
/// options.projection_pushdown = true;
257+
///
258+
/// let factory = Arc::new(VortexFormatFactory::new().with_options(options));
211259
///
212260
/// let mut state_builder = SessionStateBuilder::new()
213261
/// .with_default_features()
@@ -235,7 +283,12 @@ impl GetExt for VortexFormatFactory {
235283
}
236284

237285
impl VortexFormatFactory {
238-
/// Creates a factory with a default [`VortexSession`] and default options.
286+
/// Creates a factory with a default [`VortexSession`] and no factory-level
287+
/// options.
288+
///
289+
/// Formats created by this factory start from the session's `vortex`
290+
/// options, or from [`VortexTableOptions::default`] if the session does not
291+
/// contain them. Table-level `OPTIONS(...)` are still applied last.
239292
#[expect(
240293
clippy::new_without_default,
241294
reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
@@ -247,33 +300,37 @@ impl VortexFormatFactory {
247300
}
248301
}
249302

250-
/// Creates a factory with an explicit session and default options.
303+
/// Creates a factory with an explicit session and factory-level options.
251304
///
252-
/// The supplied options become the baseline for every [`VortexFormat`]
253-
/// created by this factory. DataFusion may still override them with
254-
/// table-level options passed into [`FileFormatFactory::create`].
305+
/// The supplied options become the complete starting value for every
306+
/// [`VortexFormat`] created by this factory. Session `SET vortex.*` values
307+
/// are ignored for these formats, matching DataFusion's built-in
308+
/// `new_with_options` factories. Table-level `OPTIONS(...)` are still
309+
/// applied last.
255310
pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
256311
Self {
257312
session,
258313
options: Some(options),
259314
}
260315
}
261316

262-
/// Overrides the default options for this factory.
317+
/// Sets factory-level options.
263318
///
264-
/// This is the usual way to turn on features such as projection pushdown for
265-
/// every table created through the factory.
319+
/// This is the usual way to customize Vortex defaults for every table
320+
/// created through the factory. These options replace, rather than merge
321+
/// with, session `SET vortex.*` values. Table-level `OPTIONS(...)` are still
322+
/// applied last.
266323
///
267324
/// # Example
268325
///
269326
/// ```rust
270327
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
271328
///
272-
/// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
273-
/// projection_pushdown: true,
274-
/// predicate_pushdown: true,
275-
/// ..Default::default()
276-
/// });
329+
/// let mut options = VortexTableOptions::default();
330+
/// options.predicate_pushdown = true;
331+
/// options.projection_pushdown = true;
332+
///
333+
/// let factory = VortexFormatFactory::new().with_options(options);
277334
/// # let _ = factory;
278335
/// ```
279336
pub fn with_options(mut self, options: VortexTableOptions) -> Self {
@@ -286,13 +343,32 @@ impl FileFormatFactory for VortexFormatFactory {
286343
#[expect(clippy::disallowed_types, reason = "required by trait signature")]
287344
fn create(
288345
&self,
289-
_state: &dyn Session,
346+
state: &dyn Session,
290347
format_options: &std::collections::HashMap<String, String>,
291348
) -> DFResult<Arc<dyn FileFormat>> {
292-
let mut opts = self.options.clone().unwrap_or_default();
349+
// This mirrors DataFusion's Parquet/JSON file-format factories:
350+
//
351+
// 1. Factory options are a complete customized default when present.
352+
// 2. Without factory options, use the session's `vortex` extension
353+
// (`SET vortex.* = ...`), falling back to built-in defaults.
354+
// 3. Table-level `CREATE EXTERNAL TABLE ... OPTIONS(...)` values apply
355+
// last. DataFusion prefixes file-format options with `format.`
356+
// before passing them to this factory; SQL users write the field
357+
// name directly, e.g. `OPTIONS(predicate_pushdown 'false')`.
358+
let mut opts = self
359+
.options
360+
.clone()
361+
.or_else(|| {
362+
state
363+
.config_options()
364+
.extensions
365+
.get::<VortexTableOptions>()
366+
.cloned()
367+
})
368+
.unwrap_or_default();
293369
for (key, value) in format_options {
294370
if let Some(key) = key.strip_prefix("format.") {
295-
opts.set(key, value)?;
371+
ConfigField::set(&mut opts, key, value)?;
296372
} else {
297373
tracing::trace!("Ignoring option '{key}'");
298374
}
@@ -698,7 +774,7 @@ mod tests {
698774
#[test]
699775
fn format_plumbs_footer_initial_read_size() {
700776
let mut opts = VortexTableOptions::default();
701-
opts.set("footer_initial_read_size_bytes", "12345").unwrap();
777+
ConfigField::set(&mut opts, "footer_initial_read_size_bytes", "12345").unwrap();
702778

703779
let format = VortexFormat::new_with_options(VortexSession::default(), opts);
704780
assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
@@ -720,7 +796,12 @@ mod tests {
720796
.downcast_ref::<VortexSource>()
721797
.ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?;
722798

723-
assert_eq!(source.options(), &opts);
799+
assert_eq!(
800+
source.options().projection_pushdown,
801+
opts.projection_pushdown
802+
);
803+
assert_eq!(source.options().predicate_pushdown, opts.predicate_pushdown);
804+
assert_eq!(source.options().scan_concurrency, opts.scan_concurrency);
724805
Ok(())
725806
}
726807
}

vortex-sqllogictest/bin/sqllogictests-runner.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::sync::LazyLock;
1010
use datafusion::common::GetExt;
1111
use datafusion::datasource::provider::DefaultTableFactory;
1212
use datafusion::execution::SessionStateBuilder;
13+
use datafusion::prelude::SessionConfig;
1314
use datafusion::prelude::SessionContext;
1415
use datafusion_sqllogictest::DataFusion;
1516
use datafusion_sqllogictest::df_value_validator;
@@ -21,6 +22,7 @@ use sqllogictest::harness::Failed;
2122
use sqllogictest::harness::Trial;
2223
use sqllogictest::strict_column_validator;
2324
use vortex_datafusion::VortexFormatFactory;
25+
use vortex_datafusion::VortexTableOptions;
2426
use vortex_sqllogictest::duckdb::DuckDB;
2527
use vortex_sqllogictest::duckdb::duckdb_validator;
2628
use vortex_sqllogictest::normalize::PathNormalizing;
@@ -61,8 +63,10 @@ fn drive_datafusion(path: &Path, work_dir: &Path, mode: Mode) -> anyhow::Result<
6163

6264
let rt = build_runtime()?;
6365
rt.block_on(async {
66+
let config = SessionConfig::default().with_option_extension(VortexTableOptions::default());
6467
let factory = Arc::new(VortexFormatFactory::new());
6568
let session_state_builder = SessionStateBuilder::new()
69+
.with_config(config)
6670
.with_default_features()
6771
.with_table_factory(
6872
factory.get_ext().to_uppercase(),
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
# SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
# Example-style coverage for DataFusion Vortex table options.
5+
#
6+
# Session settings provide the baseline for tables created through
7+
# VortexFormatFactory::create. Per-table OPTIONS override that baseline for the
8+
# table being created.
9+
10+
include ../setup.slt.no
11+
12+
statement ok
13+
SET vortex.predicate_pushdown = false;
14+
15+
statement ok
16+
CREATE EXTERNAL TABLE session_disabled_pushdown (
17+
x BIGINT NOT NULL,
18+
y BIGINT NOT NULL
19+
)
20+
STORED AS vortex
21+
LOCATION '${WORK_DIR}/session_disabled_pushdown/';
22+
23+
statement ok
24+
INSERT INTO session_disabled_pushdown VALUES
25+
(1, 10),
26+
(2, 20),
27+
(3, 30);
28+
29+
query TT
30+
EXPLAIN SELECT y FROM session_disabled_pushdown WHERE x > 1 ORDER BY y;
31+
----
32+
logical_plan
33+
01)Sort: session_disabled_pushdown.y ASC NULLS LAST
34+
02)--Projection: session_disabled_pushdown.y
35+
03)----Filter: session_disabled_pushdown.x > Int64(1)
36+
04)------TableScan: session_disabled_pushdown projection=[x, y], partial_filters=[session_disabled_pushdown.x > Int64(1)]
37+
physical_plan
38+
01)SortExec: expr=[y@0 ASC NULLS LAST], preserve_partitioning=[false]
39+
02)--FilterExec: x@0 > 1, projection=[y@1]
40+
03)----DataSourceExec: file_groups={<slt:ignore>}, projection=[x, y], file_type=vortex
41+
42+
statement ok
43+
CREATE EXTERNAL TABLE table_enabled_pushdown (
44+
x BIGINT NOT NULL,
45+
y BIGINT NOT NULL
46+
)
47+
STORED AS vortex
48+
LOCATION '${WORK_DIR}/table_enabled_pushdown/'
49+
OPTIONS(predicate_pushdown 'true');
50+
51+
statement ok
52+
INSERT INTO table_enabled_pushdown VALUES
53+
(1, 10),
54+
(2, 20),
55+
(3, 30);
56+
57+
query TT
58+
EXPLAIN SELECT y FROM table_enabled_pushdown WHERE x > 1 ORDER BY y;
59+
----
60+
logical_plan
61+
01)Sort: table_enabled_pushdown.y ASC NULLS LAST
62+
02)--Projection: table_enabled_pushdown.y
63+
03)----Filter: table_enabled_pushdown.x > Int64(1)
64+
04)------TableScan: table_enabled_pushdown projection=[x, y], partial_filters=[table_enabled_pushdown.x > Int64(1)]
65+
physical_plan
66+
01)SortExec: expr=[y@0 ASC NULLS LAST], preserve_partitioning=[false]
67+
02)--DataSourceExec: file_groups={<slt:ignore>}, projection=[y], file_type=vortex, predicate: x@0 > 1
68+
69+
query I
70+
SELECT y FROM table_enabled_pushdown WHERE x > 1 ORDER BY y;
71+
----
72+
20
73+
30
74+
75+
statement ok
76+
SET vortex.predicate_pushdown = true;

0 commit comments

Comments
 (0)