Skip to content

Commit 5b0c491

Browse files
committed
[Experiment] Adaptive filter pushdown
1 parent 9d92944 commit 5b0c491

39 files changed

Lines changed: 5686 additions & 633 deletions

File tree

Cargo.lock

Lines changed: 30 additions & 46 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,34 @@ url = "2.5.7"
204204
uuid = "1.23"
205205
zstd = { version = "0.13", default-features = false }
206206

207+
# Override arrow / parquet to the `adaptive-strategy-swap` branch on
208+
# pydantic's fork of arrow-rs, which adds the `swap_strategy` API on
209+
# `ParquetPushDecoder` that the in-decoder adaptive filter scheduling
210+
# depends on.
211+
#
212+
# The full set of arrow-rs workspace crates is listed so transitive
213+
# deps (e.g. `arrow-cast` pulled in via `arrow`) resolve to the patched
214+
# version and we don't link two copies into one binary.
215+
#
216+
# Branch: https://github.com/pydantic/arrow-rs/tree/adaptive-strategy-swap
217+
[patch.crates-io]
218+
arrow = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
219+
arrow-arith = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
220+
arrow-array = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
221+
arrow-buffer = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
222+
arrow-cast = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
223+
arrow-csv = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
224+
arrow-data = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
225+
arrow-flight = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
226+
arrow-ipc = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
227+
arrow-json = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
228+
arrow-ord = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
229+
arrow-row = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
230+
arrow-schema = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
231+
arrow-select = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
232+
arrow-string = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
233+
parquet = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
234+
207235
[workspace.lints.clippy]
208236
# Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml)
209237
large_futures = "warn"

datafusion-cli/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -616,9 +616,9 @@ mod tests {
616616
+-----------------------------------+-----------------+---------------------+------+------------------+
617617
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
618618
+-----------------------------------+-----------------+---------------------+------+------------------+
619-
| alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false |
620-
| alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true |
621-
| lz4_raw_compressed_larger.parquet | 380836 | 1339 | 2 | page_index=false |
619+
| alltypes_plain.parquet | 1851 | 8794 | 2 | page_index=false |
620+
| alltypes_tiny_pages.parquet | 454233 | 268970 | 2 | page_index=true |
621+
| lz4_raw_compressed_larger.parquet | 380836 | 1331 | 2 | page_index=false |
622622
+-----------------------------------+-----------------+---------------------+------+------------------+
623623
");
624624

@@ -647,9 +647,9 @@ mod tests {
647647
+-----------------------------------+-----------------+---------------------+------+------------------+
648648
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
649649
+-----------------------------------+-----------------+---------------------+------+------------------+
650-
| alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false |
651-
| alltypes_tiny_pages.parquet | 454233 | 269074 | 2 | page_index=true |
652-
| lz4_raw_compressed_larger.parquet | 380836 | 1339 | 3 | page_index=false |
650+
| alltypes_plain.parquet | 1851 | 8794 | 5 | page_index=false |
651+
| alltypes_tiny_pages.parquet | 454233 | 268970 | 2 | page_index=true |
652+
| lz4_raw_compressed_larger.parquet | 380836 | 1331 | 3 | page_index=false |
653653
+-----------------------------------+-----------------+---------------------+------+------------------+
654654
");
655655

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ pub async fn json_shredding() -> Result<()> {
9292
// Set up query execution
9393
let mut cfg = SessionConfig::new();
9494
cfg.options_mut().execution.parquet.pushdown_filters = true;
95+
// Force every filter to row-level so the example's
96+
// `pushdown_rows_pruned=1` assertion is deterministic. The default
97+
// adaptive scheduler keeps small-file filters on the post-scan path
98+
// (via the byte-ratio heuristic), where `pushdown_rows_pruned` stays
99+
// 0; setting `filter_pushdown_min_bytes_per_sec = 0` disables that
100+
// heuristic.
101+
cfg.options_mut()
102+
.execution
103+
.parquet
104+
.filter_pushdown_min_bytes_per_sec = 0.0;
95105
let ctx = SessionContext::new_with_config(cfg);
96106
ctx.runtime_env().register_object_store(
97107
ObjectStoreUrl::parse("memory://")?.as_ref(),

datafusion/common/src/config.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,6 +919,29 @@ config_namespace! {
919919
/// parquet reader setting. 0 means no caching.
920920
pub max_predicate_cache_size: Option<usize>, default = None
921921

922+
/// (reading) Minimum throughput, in bytes per second, that an adaptive
923+
/// row-level filter must sustain to remain at row-level. Filters that
924+
/// drop below this threshold (with statistical confidence — see
925+
/// `filter_confidence_z`) are demoted to post-scan, or dropped entirely
926+
/// if they were optional (e.g. a hash-join build-side dynamic filter).
927+
/// Set to `0` to force every filter to row-level (skip the threshold
928+
/// check); set to `f64::INFINITY` to keep every filter post-scan.
929+
pub filter_pushdown_min_bytes_per_sec: f64, default = 100.0 * 1024.0 * 1024.0
930+
931+
/// (reading) Initial-placement heuristic for adaptive filters: when a
932+
/// filter is first observed, place it at row-level if its column bytes
933+
/// are this fraction or less of the total projection's column bytes.
934+
/// Above this ratio, the filter starts as post-scan and only gets
935+
/// promoted later if measured throughput crosses
936+
/// `filter_pushdown_min_bytes_per_sec`.
937+
pub filter_collecting_byte_ratio_threshold: f64, default = 0.20
938+
939+
/// (reading) Z-score for the one-sided confidence interval the adaptive
940+
/// filter scheduler uses when promoting / demoting / dropping filters.
941+
/// Default `2.0` (≈ 97.7%) keeps strategy moves conservative; lower the
942+
/// value for snappier adaptation, raise it for more stable placements.
943+
pub filter_confidence_z: f64, default = 2.0
944+
922945
// The following options affect writing to parquet files
923946
// and map to parquet::file::properties::WriterProperties
924947

0 commit comments

Comments
 (0)