Skip to content

Commit af9e4b5

Browse files
committed
Merge remote-tracking branch 'upstream/main' into asolimando/22958-statistics-from-inputs
2 parents d4701d5 + 46b508e commit af9e4b5

120 files changed

Lines changed: 8313 additions & 3945 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/README.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,15 @@ Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark foc
558558

559559
Sort integration benchmark runs whole table sort queries on TPCH `lineitem` table, with different characteristics. For example, different number of sort keys, different sort key cardinality, different number of payload columns, etc.
560560

561-
If the TPCH tables have been converted as sorted on their first column (see [Sorted Conversion](#sorted-conversion)), you can use the `--sorted` flag to indicate that the input data is pre-sorted, allowing DataFusion to leverage that order during query execution.
561+
The `--sorted` flag does not sort or rewrite the input files. It declares that the `lineitem` Parquet input is already sorted ascending by its first column (`l_orderkey`). DataFusion can then leverage that ordering during query execution.
562+
563+
To generate the expected TPC-H SF=1 Parquet input for this benchmark, run:
564+
565+
```bash
566+
./bench.sh data tpch
567+
```
568+
569+
For the `lineitem` table used by `sort-tpch`, this uses `tpchgen-cli` to generate Parquet data that is already ordered by `l_orderkey`. If you use a different input directory, only pass `--sorted` when the `lineitem` files already have that ordering.
562570

563571
Additionally, an optional `--limit` flag is available for the sort benchmark. When specified, this flag appends a `LIMIT n` clause to the SQL query, effectively converting the query into a TopK query. Combining the `--sorted` and `--limit` options enables benchmarking of TopK queries on pre-sorted inputs.
564572

@@ -578,7 +586,7 @@ See [`sort_tpch.rs`](src/sort_tpch.rs) for more details.
578586
cargo run --release --bin dfbench -- sort-tpch -p './datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
579587
```
580588

581-
3. Run all queries as TopK queries on presorted data:
589+
3. Run all queries as TopK queries on already sorted data:
582590

583591
```bash
584592
cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 -p './datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
@@ -598,6 +606,14 @@ In addition, topk_tpch is available from the bench.sh script:
598606
./bench.sh run topk_tpch
599607
```
600608

609+
To benchmark TopK queries on TPC-H `lineitem` input ordered by `l_orderkey`, use:
610+
611+
```bash
612+
./bench.sh run topk_sorted_tpch
613+
```
614+
615+
This runs `dfbench sort-tpch --sorted --limit 100` through the benchmark script, using `--sorted` to declare the existing `l_orderkey` ordering.
616+
601617
## IMDB
602618

603619
Run Join Order Benchmark (JOB) on IMDB dataset.

benchmarks/bench.sh

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ tpcds: TPCDS inspired benchmark on Scale Factor (SF) 1 (~1GB),
9999
sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPC-H dataset (SF=1)
100100
sort_tpch10: Benchmark of sorting speed for end-to-end sort queries on TPC-H dataset (SF=10)
101101
topk_tpch: Benchmark of top-k (sorting with limit) queries on TPC-H dataset (SF=1)
102+
topk_sorted_tpch: Benchmark of top-k queries on TPC-H lineitem ordered by l_orderkey (SF=1)
102103
push_down_topk: Benchmark of ORDER BY ... LIMIT over outer joins on TPC-H dataset (SF=1) — exercises pushing TopK through a join
103104
external_aggr: External aggregation benchmark on TPC-H dataset (SF=1)
104105
wide_schema: Small-projection queries on a wide synthetic dataset (1024 cols × 256 files) — measures per-file metadata overhead
@@ -346,7 +347,7 @@ main() {
346347
# same data as for tpch10
347348
data_tpch "10" "parquet"
348349
;;
349-
topk_tpch)
350+
topk_tpch|topk_sorted_tpch)
350351
# same data as for tpch
351352
data_tpch "1" "parquet"
352353
;;
@@ -577,6 +578,9 @@ main() {
577578
topk_tpch)
578579
run_topk_tpch
579580
;;
581+
topk_sorted_tpch)
582+
run_topk_sorted_tpch
583+
;;
580584
push_down_topk)
581585
run_push_down_topk
582586
;;
@@ -1506,6 +1510,16 @@ run_topk_tpch() {
15061510
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} ${LATENCY_ARG}
15071511
}
15081512

1513+
# Runs the sorted sort tpch integration benchmark with limit 100 (topk)
1514+
run_topk_sorted_tpch() {
1515+
TPCH_DIR="${DATA_DIR}/tpch_sf1"
1516+
RESULTS_FILE="${RESULTS_DIR}/run_topk_sorted_tpch.json"
1517+
echo "RESULTS_FILE: ${RESULTS_FILE}"
1518+
echo "Running sorted topk tpch benchmark..."
1519+
1520+
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --sorted --limit 100 ${QUERY_ARG} ${LATENCY_ARG}
1521+
}
1522+
15091523
# Runs the nlj benchmark
15101524
run_nlj() {
15111525
RESULTS_FILE="${RESULTS_DIR}/nlj.json"
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
subgroup correlation
2+
3+
template sql_benchmarks/predicate_eval/predicate_eval.benchmark.template
4+
SUBGROUP=correlation
5+
QPAD=73
6+
DATASET=corrproxy
7+
NAME=correlation_q73_redundant_proxy
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
-- Correlated-proxy dataset: a cheap integer predicate that is a perfect proxy
2+
-- for three string predicates, plus one independent string predicate.
3+
--
4+
-- c0 = 1 for ~30% of rows (cheap proxy)
5+
-- s1, s2, s3 each contain a marker exactly where c0 = 1 (correlated)
6+
-- s4 contains a marker for an independent ~30% (independent)
7+
--
8+
-- The four string columns are deliberately *identical in shape*: same width,
9+
-- the same single marker at the same offset, each matched by an equally cheap
10+
-- regex with the same ~30% marginal selectivity. Marginally the four regex
11+
-- predicates are therefore indistinguishable -- same cost, same selectivity, in
12+
-- every position -- so neither a marginal cost/selectivity estimator nor
13+
-- runtime timing can prefer one over another. Only their *conditional*
14+
-- behaviour behind the proxy differs: after `c0 = 1`, the s1/s2/s3 regexes keep
15+
-- every survivor (each re-tests the proxy's condition) while the s4 regex still
16+
-- discards ~70%. Only joint statistics can see that; an independence assumption
17+
-- prices all four regexes identically in every position.
18+
--
19+
-- PRED_FILL sets the filler width on each side of the marker (a non-matching
20+
-- `regexp_like` must scan the whole value), and PRED_ROWS sizes the table.
21+
CREATE TABLE t AS
22+
WITH base AS (
23+
SELECT
24+
-- The cheap proxy and the independent control share one definition each, so
25+
-- the perfect-proxy / independence invariants can't drift apart silently.
26+
(value * 7) % 100 < 30 AS proxy, -- ~30%, drives c0 and s1/s2/s3
27+
(value * 13) % 100 < 30 AS indep -- ~30%, independent of proxy, drives s4
28+
FROM generate_series(1, ${PRED_ROWS:-1000000})
29+
)
30+
SELECT
31+
CASE WHEN proxy THEN 1 ELSE 0 END AS c0,
32+
repeat('q', ${PRED_FILL:-30})
33+
|| CASE WHEN proxy THEN 'aaa' ELSE 'zzz' END
34+
|| repeat('q', ${PRED_FILL:-30}) AS s1,
35+
repeat('q', ${PRED_FILL:-30})
36+
|| CASE WHEN proxy THEN 'ccc' ELSE 'zzz' END
37+
|| repeat('q', ${PRED_FILL:-30}) AS s2,
38+
repeat('q', ${PRED_FILL:-30})
39+
|| CASE WHEN proxy THEN 'ddd' ELSE 'zzz' END
40+
|| repeat('q', ${PRED_FILL:-30}) AS s3,
41+
repeat('q', ${PRED_FILL:-30})
42+
|| CASE WHEN indep THEN 'bbb' ELSE 'zzz' END
43+
|| repeat('q', ${PRED_FILL:-30}) AS s4
44+
FROM base;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Hidden: `c0 = 1` is a perfect proxy for the s1/s2/s3 regexes -- after the
2+
-- cheap proxy, each of those keeps every survivor while the equally selective
3+
-- (~30%) s4 regex still discards ~70%. The optimal order is [c0, s4, s1/s2/s3]
4+
-- (one informative regex on 30% of rows, the three redundant ones on 9%), but
5+
-- the four regexes are marginally identical -- same width, same marker offset,
6+
-- same cost, same selectivity -- so ranking them takes their *joint*
7+
-- distribution with the proxy. Written with the redundant regexes first,
8+
-- grouped with their proxy, as an author naturally would.
9+
SELECT count(*) FROM t
10+
WHERE c0 = 1
11+
AND regexp_like(s1, 'a.a')
12+
AND regexp_like(s2, 'c.c')
13+
AND regexp_like(s3, 'd.d')
14+
AND regexp_like(s4, 'b.b');

benchmarks/src/sort_tpch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ pub struct RunOpt {
6464
#[arg(short = 'm', long = "mem-table")]
6565
mem_table: bool,
6666

67-
/// Mark the first column of each table as sorted in ascending order.
68-
/// The tables should have been created with the `--sort` option for this to have any effect.
67+
/// Declare that the first column of the input table is already sorted in ascending order.
68+
/// This flag only attaches ordering metadata; it does not sort the input files.
6969
#[arg(short = 't', long = "sorted")]
7070
sorted: bool,
7171

datafusion-cli/src/main.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,8 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> {
335335
if batch_size == 0 {
336336
return config_err!("batch_size must be greater than 0");
337337
}
338-
config_options.execution.batch_size = batch_size;
338+
config_options.execution.batch_size =
339+
datafusion_common::config::ConfigNonZeroUsize::try_new(batch_size)?;
339340
};
340341

341342
// use easier to understand "tree" mode by default
@@ -641,9 +642,9 @@ mod tests {
641642
+-----------------------------------+-----------------+---------------------+------+------------------+
642643
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
643644
+-----------------------------------+-----------------+---------------------+------+------------------+
644-
| alltypes_plain.parquet | 1851 | 8794 | 2 | page_index=false |
645+
| alltypes_plain.parquet | 1851 | 8794 | 1 | page_index=false |
645646
| alltypes_tiny_pages.parquet | 454233 | 268970 | 2 | page_index=true |
646-
| lz4_raw_compressed_larger.parquet | 380836 | 1331 | 2 | page_index=false |
647+
| lz4_raw_compressed_larger.parquet | 380836 | 1331 | 1 | page_index=false |
647648
+-----------------------------------+-----------------+---------------------+------+------------------+
648649
");
649650

@@ -672,9 +673,9 @@ mod tests {
672673
+-----------------------------------+-----------------+---------------------+------+------------------+
673674
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
674675
+-----------------------------------+-----------------+---------------------+------+------------------+
675-
| alltypes_plain.parquet | 1851 | 8794 | 5 | page_index=false |
676+
| alltypes_plain.parquet | 1851 | 8794 | 4 | page_index=false |
676677
| alltypes_tiny_pages.parquet | 454233 | 268970 | 2 | page_index=true |
677-
| lz4_raw_compressed_larger.parquet | 380836 | 1331 | 3 | page_index=false |
678+
| lz4_raw_compressed_larger.parquet | 380836 | 1331 | 2 | page_index=false |
678679
+-----------------------------------+-----------------+---------------------+------+------------------+
679680
");
680681

datafusion-cli/src/object_storage/stdin.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ impl StdinUtils {
9999
format!("{}:///{object_name}", Self::SCHEME)
100100
}
101101

102-
/// Returns the object store backing the `stdin://` scheme, reading and
103-
/// buffering standard input on first use and reusing that buffer for any
104-
/// subsequent `stdin://` table created in the same session.
102+
/// Returns the object store backing the `stdin://` scheme, buffering all of
103+
/// standard input when the store is first constructed and reusing that
104+
/// buffer for any subsequent `stdin://` table created in the same session.
105105
///
106106
/// stdin is a one-shot stream: it can only be read once. The object store
107107
/// registry keys by scheme/authority, so every `stdin://` URL maps to the
@@ -268,15 +268,26 @@ mod tests {
268268
// stdin can only be read once, so a second `stdin://` table must reuse
269269
// the store buffered by the first instead of re-reading (now-empty)
270270
// stdin and overwriting it.
271+
//
272+
// The very first read happens inside `get_or_create` -> `object_store`,
273+
// which consumes the real process stdin and so cannot be driven from a
274+
// unit test. Seed the registry with the store that first read would have
275+
// produced (as the first `CREATE EXTERNAL TABLE` does), then drive the
276+
// lookup through `get_or_create` and assert it hands back that exact
277+
// store rather than rebuilding it.
271278
let url = Url::parse("stdin:///stdin.csv").unwrap();
272-
let store =
273-
StdinUtils::in_memory_object_store(&url, b"a\n1\n2\n".to_vec()).await?;
279+
let path = ObjectStorePath::from_url_path(url.path())?;
280+
let buffered: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
281+
buffered.put(&path, b"a\n1\n2\n".to_vec().into()).await?;
274282

275283
let ctx = SessionContext::new();
276-
ctx.register_object_store(&url, store);
284+
ctx.register_object_store(&url, Arc::clone(&buffered));
277285

278286
let reused = StdinUtils::get_or_create(&ctx.state(), &url).await?;
279-
let path = ObjectStorePath::from_url_path(url.path())?;
287+
assert!(
288+
Arc::ptr_eq(&buffered, &reused),
289+
"get_or_create must reuse the registered stdin store, not rebuild it"
290+
);
280291
let bytes = reused.get(&path).await?.bytes().await?;
281292
assert_eq!(bytes.as_ref(), b"a\n1\n2\n");
282293
Ok(())

datafusion/catalog-listing/src/table.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3434
use datafusion_datasource::{
3535
ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics,
3636
};
37-
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
38-
use datafusion_execution::cache::cache_manager::TableScopedPath;
37+
use datafusion_execution::cache::cache_manager::{FileStatisticsCache, TableScopedPath};
3938
use datafusion_expr::dml::InsertOp;
4039
use datafusion_expr::execution_props::ExecutionProps;
4140
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
@@ -264,6 +263,21 @@ impl ListingTable {
264263
self
265264
}
266265

266+
fn statistics_cache(
267+
&self,
268+
has_table_reference: bool,
269+
) -> Option<&Arc<FileStatisticsCache>> {
270+
let shared_cache = self.collected_statistics.as_ref()?;
271+
if has_table_reference || self.schema_source == SchemaSource::Inferred {
272+
Some(shared_cache)
273+
} else {
274+
// Anonymous specified-schema reads can use the same file path with
275+
// different logical schemas. File statistics are schema-dependent,
276+
// so avoid reusing stats computed for a different read schema.
277+
None
278+
}
279+
}
280+
267281
/// Specify the SQL definition for this table, if any
268282
pub fn with_definition(mut self, definition: Option<String>) -> Self {
269283
self.definition = definition;
@@ -807,7 +821,7 @@ impl ListingTable {
807821
let meta = &part_file.object_meta;
808822

809823
// Check cache first - if we have valid cached statistics and ordering
810-
if let Some(cache) = &self.collected_statistics
824+
if let Some(cache) = self.statistics_cache(path.table.is_some())
811825
&& let Some(cached) = cache.get(&path)
812826
&& cached.is_valid_for(meta)
813827
{
@@ -825,7 +839,7 @@ impl ListingTable {
825839
let statistics = Arc::new(file_meta.statistics);
826840

827841
// Store in cache
828-
if let Some(cache) = &self.collected_statistics {
842+
if let Some(cache) = self.statistics_cache(path.table.is_some()) {
829843
cache.put(
830844
&path,
831845
CachedFileMetadata::new(

0 commit comments

Comments
 (0)