Skip to content

Commit b8dd5b6

Browse files
committed
[Experiment] Adaptive filter pushdown
1 parent 9d92944 commit b8dd5b6

33 files changed

Lines changed: 5538 additions & 626 deletions

File tree

Cargo.lock

Lines changed: 30 additions & 46 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,34 @@ url = "2.5.7"
204204
uuid = "1.23"
205205
zstd = { version = "0.13", default-features = false }
206206

207+
# Override arrow / parquet to the `adaptive-strategy-swap` branch on
208+
# pydantic's fork of arrow-rs, which adds the `swap_strategy` API on
209+
# `ParquetPushDecoder` that the in-decoder adaptive filter scheduling
210+
# depends on.
211+
#
212+
# The full set of arrow-rs workspace crates is listed so transitive
213+
# deps (e.g. `arrow-cast` pulled in via `arrow`) resolve to the patched
214+
# version and we don't link two copies into one binary.
215+
#
216+
# Branch: https://github.com/pydantic/arrow-rs/tree/adaptive-strategy-swap
217+
[patch.crates-io]
218+
arrow = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
219+
arrow-arith = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
220+
arrow-array = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
221+
arrow-buffer = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
222+
arrow-cast = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
223+
arrow-csv = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
224+
arrow-data = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
225+
arrow-flight = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
226+
arrow-ipc = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
227+
arrow-json = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
228+
arrow-ord = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
229+
arrow-row = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
230+
arrow-schema = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
231+
arrow-select = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
232+
arrow-string = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
233+
parquet = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
234+
207235
[workspace.lints.clippy]
208236
# Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml)
209237
large_futures = "warn"

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ pub async fn json_shredding() -> Result<()> {
9292
// Set up query execution
9393
let mut cfg = SessionConfig::new();
9494
cfg.options_mut().execution.parquet.pushdown_filters = true;
95+
// Force every filter to row-level so the example's
96+
// `pushdown_rows_pruned=1` assertion is deterministic. The default
97+
// adaptive scheduler keeps small-file filters on the post-scan path
98+
// (via the byte-ratio heuristic), where `pushdown_rows_pruned` stays
99+
// 0; setting `filter_pushdown_min_bytes_per_sec = 0` disables that
100+
// heuristic.
101+
cfg.options_mut()
102+
.execution
103+
.parquet
104+
.filter_pushdown_min_bytes_per_sec = 0.0;
95105
let ctx = SessionContext::new_with_config(cfg);
96106
ctx.runtime_env().register_object_store(
97107
ObjectStoreUrl::parse("memory://")?.as_ref(),

datafusion/common/src/config.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,29 @@ config_namespace! {
919919
/// parquet reader setting. 0 means no caching.
920920
pub max_predicate_cache_size: Option<usize>, default = None
921921

922+
/// (reading) Minimum throughput, in bytes per second, that an adaptive
923+
/// row-level filter must sustain to remain at row-level. Filters that
924+
/// drop below this threshold (with statistical confidence — see
925+
/// `filter_confidence_z`) are demoted to post-scan, or dropped entirely
926+
/// if they were optional (e.g. a hash-join build-side dynamic filter).
927+
/// Set to `0` to force every filter to row-level (skip the threshold
928+
/// check); set to `f64::INFINITY` to keep every filter post-scan.
929+
pub filter_pushdown_min_bytes_per_sec: f64, default = 100.0 * 1024.0 * 1024.0
930+
931+
/// (reading) Initial-placement heuristic for adaptive filters: when a
932+
/// filter is first observed, place it at row-level if its column bytes
933+
/// are this fraction or less of the total projection's column bytes.
934+
/// Above this ratio, the filter starts as post-scan and only gets
935+
/// promoted later if measured throughput crosses
936+
/// `filter_pushdown_min_bytes_per_sec`.
937+
pub filter_collecting_byte_ratio_threshold: f64, default = 0.20
938+
939+
/// (reading) Z-score for the one-sided confidence interval the adaptive
940+
/// filter scheduler uses when promoting / demoting / dropping filters.
941+
/// Default `2.0` (≈ 97.5%) keeps strategy moves conservative; lower the
942+
/// value for snappier adaptation, raise it for more stable placements.
943+
pub filter_confidence_z: f64, default = 2.0
944+
922945
// The following options affect writing to parquet files
923946
// and map to parquet::file::properties::WriterProperties
924947

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
157157
}
158158

159159
if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
160-
builder =
161-
builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
160+
builder = builder
161+
.set_column_bloom_filter_max_ndv(path.clone(), bloom_filter_ndv);
162162
}
163163
}
164164

@@ -210,6 +210,10 @@ impl ParquetOptions {
210210
coerce_int96: _, // not used for writer props
211211
skip_arrow_metadata: _,
212212
max_predicate_cache_size: _,
213+
// Read-time adaptive filter knobs; not used for writer props.
214+
filter_pushdown_min_bytes_per_sec: _,
215+
filter_collecting_byte_ratio_threshold: _,
216+
filter_confidence_z: _,
213217
} = self;
214218

215219
let mut builder = WriterProperties::builder()
@@ -234,7 +238,7 @@ impl ParquetOptions {
234238
builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
235239
};
236240
if let Some(bloom_filter_ndv) = bloom_filter_ndv {
237-
builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv);
241+
builder = builder.set_bloom_filter_max_ndv(*bloom_filter_ndv);
238242
};
239243
if let Some(dictionary_enabled) = dictionary_enabled {
240244
builder = builder.set_dictionary_enabled(*dictionary_enabled);
@@ -483,6 +487,10 @@ mod tests {
483487
skip_arrow_metadata: defaults.skip_arrow_metadata,
484488
coerce_int96: None,
485489
max_predicate_cache_size: defaults.max_predicate_cache_size,
490+
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
491+
filter_collecting_byte_ratio_threshold: defaults
492+
.filter_collecting_byte_ratio_threshold,
493+
filter_confidence_z: defaults.filter_confidence_z,
486494
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
487495
}
488496
}
@@ -600,6 +608,11 @@ mod tests {
600608
binary_as_string: global_options_defaults.binary_as_string,
601609
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
602610
coerce_int96: None,
611+
filter_pushdown_min_bytes_per_sec: global_options_defaults
612+
.filter_pushdown_min_bytes_per_sec,
613+
filter_collecting_byte_ratio_threshold: global_options_defaults
614+
.filter_collecting_byte_ratio_threshold,
615+
filter_confidence_z: global_options_defaults.filter_confidence_z,
603616
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
604617
CdcOptions {
605618
min_chunk_size: c.min_chunk_size,
@@ -900,7 +913,7 @@ mod tests {
900913
// the WriterProperties::default, with only ndv set
901914
let default_writer_props = WriterProperties::builder()
902915
.set_bloom_filter_enabled(true)
903-
.set_bloom_filter_ndv(42)
916+
.set_bloom_filter_max_ndv(42)
904917
.build();
905918

906919
assert_eq!(

0 commit comments

Comments
 (0)