diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 45b859ee8c1..85c53a712c2 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -8437,6 +8437,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-substrait", "futures", + "mini-moka", "object_store", "parquet", "prost 0.14.3", @@ -8449,6 +8450,8 @@ dependencies = [ "quickwit-proto", "quickwit-search", "quickwit-storage", + "regex", + "regex-automata", "serde_json", "tempfile", "tokio", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 04244aeee40..b88256869be 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -214,6 +214,7 @@ rdkafka = { version = "0.39", default-features = false, features = [ "zstd", ] } regex = "1.12" +regex-automata = "0.4" regex-syntax = "0.8" reqwest = { version = "0.12", default-features = false, features = [ "json", diff --git a/quickwit/quickwit-datafusion/Cargo.toml b/quickwit/quickwit-datafusion/Cargo.toml index 553d1cf5b52..380897f03a3 100644 --- a/quickwit/quickwit-datafusion/Cargo.toml +++ b/quickwit/quickwit-datafusion/Cargo.toml @@ -15,7 +15,10 @@ async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } futures = { workspace = true } +mini-moka = { workspace = true } prost = { workspace = true } +regex = { workspace = true } +regex-automata = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs b/quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs index 31a65ac1c5a..3df8aac392f 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs @@ -20,16 +20,18 @@ //! OSS column names: `service`, `env`, `datacenter`, `region`, `host` //! (no `tag_` prefix — the parquet files use bare column names). -use datafusion::logical_expr::{BinaryExpr, Expr, Operator}; +use std::collections::HashMap; + +use datafusion::logical_expr::{BinaryExpr, Expr, Like, Operator}; use datafusion::scalar::ScalarValue; /// Extracted filters for querying the metrics_splits table. /// /// Split-level filters for metastore pruning. /// -/// Only metric name and time range are used — the only fields the metastore -/// reliably populates today. Tag-based pruning will be added when the -/// zonemap/bloom-filter mechanism lands. +/// These fields are safe to pass to the metastore for coarse split discovery. +/// More granular string filters are extracted separately for conservative +/// client-side metadata pruning after the split metadata has been fetched. #[derive(Debug, Default, Clone)] pub struct MetricsSplitQuery { pub metric_names: Option>, @@ -37,6 +39,27 @@ pub struct MetricsSplitQuery { pub time_range_end: Option, } +/// Equality/IN filter over string-valued columns. +/// +/// This is used for split metadata pruning. Values are interpreted as an OR +/// list for a column; multiple filters on the same column are intersected. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct StringFilter { + pub column: String, + pub values: Vec, +} + +/// Prefix filter over string-valued columns extracted from simple LIKE +/// predicates. +/// +/// Each prefix represents `column LIKE 'prefix%'`. More general LIKE patterns +/// are intentionally ignored because split pruning needs to stay conservative. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct StringPrefixFilter { + pub column: String, + pub prefixes: Vec, +} + /// Analyzes pushed-down filter expressions and extracts split-level filters. /// /// Returns a `MetricsSplitQuery` for Postgres pruning plus any remaining @@ -54,6 +77,134 @@ pub fn extract_split_filters(filters: &[Expr]) -> (MetricsSplitQuery, Vec) (query, remaining) } +/// Extract string equality/IN predicates that are safe to evaluate against +/// exact split metadata or zonemap superset regexes. +pub(crate) fn extract_string_filters(filters: &[Expr]) -> Vec { + let mut by_column: HashMap> = HashMap::new(); + + for filter in filters { + collect_string_filters(filter, &mut by_column); + } + + by_column + .into_iter() + .map(|(column, values)| StringFilter { column, values }) + .collect() +} + +/// Extract simple string prefix predicates of the form `column LIKE 'prefix%'`. +pub(crate) fn extract_string_prefix_filters(filters: &[Expr]) -> Vec { + let mut by_column: HashMap> = HashMap::new(); + + for filter in filters { + collect_string_prefix_filters(filter, &mut by_column); + } + + by_column + .into_iter() + .map(|(column, prefixes)| StringPrefixFilter { column, prefixes }) + .collect() +} + +fn collect_string_filters(expr: &Expr, by_column: &mut HashMap>) { + match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Operator::Eq => { + let Some((column, value)) = string_eq_filter(left, right) else { + return; + }; + tighten_string_values(by_column, column, vec![value]); + } + Operator::And => { + collect_string_filters(left, by_column); + collect_string_filters(right, by_column); + } + _ => {} + }, + Expr::InList(in_list) if !in_list.negated => { + let Some(column) = column_name(&in_list.expr) else { + return; + }; + let values: Vec = in_list.list.iter().filter_map(scalar_utf8).collect(); + if values.len() == in_list.list.len() { + tighten_string_values(by_column, column, values); + } + } + _ => {} + } +} + +fn collect_string_prefix_filters(expr: &Expr, by_column: &mut HashMap>) { + match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) if *op == Operator::And => { + collect_string_prefix_filters(left, by_column); + collect_string_prefix_filters(right, by_column); + } + Expr::Like(like) => { + let Some((column, prefix)) = string_prefix_filter(like) else { + return; + }; + by_column.entry(column).or_default().push(prefix); + } + _ => {} + } +} + +fn string_eq_filter(left: &Expr, right: &Expr) -> Option<(String, String)> { + match (column_name(left), scalar_utf8(right)) { + (Some(column), Some(value)) => Some((column, value)), + _ => match (scalar_utf8(left), column_name(right)) { + (Some(value), Some(column)) => Some((column, value)), + _ => None, + }, + } +} + +fn string_prefix_filter(like: &Like) -> Option<(String, String)> { + if like.negated || like.case_insensitive || like.escape_char.is_some() { + return None; + } + + let column = column_name(&like.expr)?; + let pattern = scalar_utf8(&like.pattern)?; + let prefix = simple_like_prefix(&pattern)?; + Some((column, prefix)) +} + +fn simple_like_prefix(pattern: &str) -> Option { + let prefix = pattern.strip_suffix('%')?; + if prefix.is_empty() || prefix.contains(['%', '_']) { + return None; + } + Some(prefix.to_string()) +} + +fn tighten_string_values( + by_column: &mut HashMap>, + column: String, + values: Vec, +) { + let mut values = dedup_strings(values); + match by_column.get_mut(&column) { + Some(existing) => { + existing.retain(|value| values.contains(value)); + } + None => { + by_column.insert(column, std::mem::take(&mut values)); + } + } +} + +fn dedup_strings(values: Vec) -> Vec { + let mut deduped = Vec::with_capacity(values.len()); + for value in values { + if !deduped.contains(&value) { + deduped.push(value); + } + } + deduped +} + fn try_extract_filter(expr: &Expr, query: &mut MetricsSplitQuery) -> bool { match expr { Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { @@ -214,6 +365,8 @@ fn scalar_utf8(expr: &Expr) -> Option { Expr::Literal(ScalarValue::Utf8View(Some(s)), _) => Some(s.clone()), // DF auto-casts string literals to Dict(Int32, Utf8) to match dict-encoded columns Expr::Literal(ScalarValue::Dictionary(_, inner), _) => scalar_utf8_from_scalar(inner), + Expr::Cast(datafusion::logical_expr::Cast { expr, .. }) + | Expr::TryCast(datafusion::logical_expr::TryCast { expr, .. }) => scalar_utf8(expr), _ => None, } } @@ -279,6 +432,70 @@ mod tests { assert_eq!(remaining.len(), 2); } + #[test] + fn test_extract_string_filters_for_tags() { + let filters = vec![ + col("service").eq(lit("web")), + col("env").in_list(vec![lit("prod"), lit("staging")], false), + col("value").gt(lit(42.0)), + ]; + + let mut string_filters = extract_string_filters(&filters); + string_filters.sort_by(|left, right| left.column.cmp(&right.column)); + + assert_eq!( + string_filters, + vec![ + StringFilter { + column: "env".to_string(), + values: vec!["prod".to_string(), "staging".to_string()], + }, + StringFilter { + column: "service".to_string(), + values: vec!["web".to_string()], + }, + ] + ); + } + + #[test] + fn test_extract_string_filters_intersects_repeated_column() { + let filters = vec![ + col("service").in_list(vec![lit("web"), lit("api")], false), + col("service").eq(lit("api")), + ]; + + let string_filters = extract_string_filters(&filters); + + assert_eq!( + string_filters, + vec![StringFilter { + column: "service".to_string(), + values: vec!["api".to_string()], + }] + ); + } + + #[test] + fn test_extract_string_prefix_filters_for_simple_like() { + let filters = vec![ + col("host").like(lit("ID-07%")), + col("env").like(lit("prod")), + col("service").ilike(lit("web%")), + col("region").like(lit("us_%")), + ]; + + let prefix_filters = extract_string_prefix_filters(&filters); + + assert_eq!( + prefix_filters, + vec![StringPrefixFilter { + column: "host".to_string(), + prefixes: vec!["ID-07".to_string()], + }] + ); + } + #[test] fn test_unknown_column_left_as_remaining() { let filters = vec![ diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs index e4d870696a1..12eb350b68e 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs @@ -18,8 +18,9 @@ //! and returns a standard `ParquetSource`-backed `DataSourceExec`. use std::any::Any; +use std::collections::{HashSet, VecDeque}; use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use arrow::compute::SortOptions; use arrow::datatypes::SchemaRef; @@ -38,10 +39,14 @@ use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource_parquet::source::ParquetSource; use datafusion_physical_plan::expressions::Column; +use mini_moka::sync::Cache; use quickwit_common::uri::Uri; use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN; use quickwit_parquet_engine::split::ParquetSplitMetadata; use quickwit_parquet_engine::table_config::ProductType; +use regex::{Regex, RegexBuilder}; +use regex_automata::dfa::{Automaton, dense}; +use regex_automata::{Anchored, Input}; use tracing::debug; use super::predicate; @@ -56,6 +61,64 @@ const METRICS_SORT_ORDER: &[&str] = &[ "timeseries_id", "timestamp_secs", ]; +// Per-regex compile-time cap for dense DFA determinization. This limits how +// much memory we are willing to spend building one prefix-pruning automaton for +// a split zonemap regex. If a pathological regex exceeds this limit, DFA +// compilation fails and pruning falls back to conservative keep-split behavior. +const ZONEMAP_DFA_SIZE_LIMIT_BYTES: usize = 1_000_000; +const ZONEMAP_REGEX_CACHE_MAX_ENTRIES: usize = 4096; +// Cache up to roughly 256 worst-case DFAs. The cache is weighted by the actual +// compiled DFA memory usage, so small/common zonemap regexes occupy less of the +// budget than regexes close to `ZONEMAP_DFA_SIZE_LIMIT_BYTES`. +const ZONEMAP_DFA_CACHE_MAX_BYTES: u64 = 256 * ZONEMAP_DFA_SIZE_LIMIT_BYTES as u64; +// Account for the regex key and cache bookkeeping when weighting entries. This +// also keeps invalid regex entries bounded even though they do not store a DFA. +const ZONEMAP_CACHE_ENTRY_OVERHEAD_BYTES: usize = 128; + +type DenseDfa = dense::DFA>; + +static ZONEMAP_DFA_CONFIG: LazyLock = LazyLock::new(|| { + dense::Config::new() + .dfa_size_limit(Some(ZONEMAP_DFA_SIZE_LIMIT_BYTES)) + .determinize_size_limit(Some(ZONEMAP_DFA_SIZE_LIMIT_BYTES)) +}); +static ZONEMAP_SYNTAX_CONFIG: LazyLock = + LazyLock::new(|| regex_automata::util::syntax::Config::new().dot_matches_new_line(true)); +static ZONEMAP_REGEX_CACHE: LazyLock>> = + LazyLock::new(|| Cache::new(ZONEMAP_REGEX_CACHE_MAX_ENTRIES as u64)); +static ZONEMAP_DFA_CACHE: LazyLock>> = LazyLock::new(|| { + Cache::builder() + .max_capacity(ZONEMAP_DFA_CACHE_MAX_BYTES) + .weigher(zonemap_dfa_cache_weight) + .build() +}); + +#[derive(Clone)] +enum CachedCompiled { + Valid(Arc), + Invalid, +} + +impl CachedCompiled { + fn valid(&self) -> Option> { + match self { + Self::Valid(compiled) => Some(Arc::clone(compiled)), + Self::Invalid => None, + } + } +} + +fn zonemap_dfa_cache_weight(regex: &String, compiled: &CachedCompiled) -> u32 { + let compiled_size = match compiled { + CachedCompiled::Valid(dfa) => dfa.memory_usage(), + CachedCompiled::Invalid => 0, + }; + regex + .len() + .saturating_add(ZONEMAP_CACHE_ENTRY_OVERHEAD_BYTES) + .saturating_add(compiled_size) + .min(u32::MAX as usize) as u32 +} /// Provides split metadata for a metrics index. #[async_trait] @@ -159,7 +222,10 @@ impl TableProvider for MetricsTableProvider { &self, filters: &[&Expr], ) -> DFResult> { - Ok(filters.iter().map(|expr| classify_filter(expr)).collect()) + Ok(filters + .iter() + .map(|expr| classify_filter(expr, &self.schema)) + .collect()) } async fn scan( @@ -180,8 +246,15 @@ impl TableProvider for MetricsTableProvider { ); let splits = self.split_provider.list_splits(&split_query).await?; + let num_splits_before_metadata_pruning = splits.len(); + let splits = prune_splits_with_metadata(splits, filters); - debug!(num_splits = splits.len(), "found matching splits"); + debug!( + num_splits = splits.len(), + num_pruned_by_metadata = + num_splits_before_metadata_pruning.saturating_sub(splits.len()), + "found matching splits" + ); // The `ObjectStore` for this URL is lazily built by // `QuickwitObjectStoreRegistry` on first read — see @@ -244,39 +317,265 @@ impl TableProvider for MetricsTableProvider { } } -fn classify_filter(expr: &Expr) -> TableProviderFilterPushDown { +fn classify_filter(expr: &Expr, schema: &SchemaRef) -> TableProviderFilterPushDown { match expr { - Expr::BinaryExpr(binary) => { - if let Some(col_name) = - column_name_from_expr(&binary.left).or_else(|| column_name_from_expr(&binary.right)) + Expr::BinaryExpr(binary) if binary.op == datafusion::logical_expr::Operator::And => { + let left = classify_filter(&binary.left, schema); + let right = classify_filter(&binary.right, schema); + if left == TableProviderFilterPushDown::Inexact + || right == TableProviderFilterPushDown::Inexact { - // OSS uses bare column names (no tag_ prefix) - match col_name.as_str() { - "metric_name" | "timestamp_secs" => TableProviderFilterPushDown::Inexact, - _ => TableProviderFilterPushDown::Unsupported, - } + TableProviderFilterPushDown::Inexact } else { TableProviderFilterPushDown::Unsupported } } + Expr::BinaryExpr(binary) => { + let column_name = column_name_from_expr(&binary.left) + .or_else(|| column_name_from_expr(&binary.right)); + classify_declared_column_filter(column_name, schema) + } Expr::InList(in_list) => { - if let Some(col_name) = column_name_from_expr(&in_list.expr) { - match col_name.as_str() { - "metric_name" => TableProviderFilterPushDown::Inexact, - _ => TableProviderFilterPushDown::Unsupported, - } - } else { - TableProviderFilterPushDown::Unsupported - } + classify_declared_column_filter(column_name_from_expr(&in_list.expr), schema) + } + Expr::Like(like) if !like.negated => { + classify_declared_column_filter(column_name_from_expr(&like.expr), schema) } _ => TableProviderFilterPushDown::Unsupported, } } +fn classify_declared_column_filter( + column_name: Option, + schema: &SchemaRef, +) -> TableProviderFilterPushDown { + if column_name + .as_deref() + .is_some_and(|column_name| schema.index_of(column_name).is_ok()) + { + TableProviderFilterPushDown::Inexact + } else { + TableProviderFilterPushDown::Unsupported + } +} + fn column_name_from_expr(expr: &Expr) -> Option { predicate::column_name(expr) } +fn prune_splits_with_metadata( + splits: Vec, + filters: &[Expr], +) -> Vec { + let string_filters = predicate::extract_string_filters(filters); + let string_prefix_filters = predicate::extract_string_prefix_filters(filters); + if string_filters.is_empty() && string_prefix_filters.is_empty() { + return splits; + } + + splits + .into_iter() + .filter(|split| { + split_may_match_string_filters(split, &string_filters) + && split_may_match_string_prefix_filters(split, &string_prefix_filters) + }) + .collect() +} + +fn split_may_match_string_filters( + split: &ParquetSplitMetadata, + string_filters: &[predicate::StringFilter], +) -> bool { + string_filters + .iter() + .all(|string_filter| split_may_match_string_filter(split, string_filter)) +} + +fn split_may_match_string_filter( + split: &ParquetSplitMetadata, + string_filter: &predicate::StringFilter, +) -> bool { + if string_filter.values.is_empty() { + return false; + } + + if string_filter.column == "metric_name" && !split.metric_names.is_empty() { + return string_filter + .values + .iter() + .any(|value| split.metric_names.contains(value)); + } + + if let Some(split_values) = split.low_cardinality_tags.get(&string_filter.column) { + return string_filter + .values + .iter() + .any(|value| split_values.contains(value)); + } + + if let Some(superset_regex) = split.zonemap_regexes.get(&string_filter.column) { + return zonemap_may_match_any_value(superset_regex, &string_filter.values); + } + + true +} + +fn split_may_match_string_prefix_filters( + split: &ParquetSplitMetadata, + string_prefix_filters: &[predicate::StringPrefixFilter], +) -> bool { + string_prefix_filters.iter().all(|string_prefix_filter| { + split_may_match_string_prefix_filter(split, string_prefix_filter) + }) +} + +fn split_may_match_string_prefix_filter( + split: &ParquetSplitMetadata, + string_prefix_filter: &predicate::StringPrefixFilter, +) -> bool { + if string_prefix_filter.prefixes.is_empty() { + return false; + } + + if string_prefix_filter.column == "metric_name" && !split.metric_names.is_empty() { + return string_prefix_filter.prefixes.iter().any(|prefix| { + split + .metric_names + .iter() + .any(|metric_name| metric_name.starts_with(prefix)) + }); + } + + if let Some(split_values) = split.low_cardinality_tags.get(&string_prefix_filter.column) { + return string_prefix_filter + .prefixes + .iter() + .any(|prefix| split_values.iter().any(|value| value.starts_with(prefix))); + } + + if let Some(superset_regex) = split.zonemap_regexes.get(&string_prefix_filter.column) { + return zonemap_may_match_any_prefix(superset_regex, &string_prefix_filter.prefixes); + } + + true +} + +fn zonemap_may_match_any_value(superset_regex: &str, values: &[String]) -> bool { + let Some(regex) = cached_zonemap_regex(superset_regex) else { + return true; + }; + values.iter().any(|value| regex.is_match(value)) +} + +fn zonemap_may_match_any_prefix(superset_regex: &str, prefixes: &[String]) -> bool { + let Some(dfa) = cached_zonemap_dfa(superset_regex) else { + return true; + }; + + prefixes.iter().any( + |prefix| match zonemap_dfa_may_match_prefix(dfa.as_ref(), prefix) { + Ok(may_match) => may_match, + Err(error) => { + debug!( + %error, + superset_regex, + prefix, + "failed to evaluate split zonemap regex prefix" + ); + true + } + }, + ) +} + +fn cached_zonemap_regex(superset_regex: &str) -> Option> { + let cache_key = superset_regex.to_string(); + if let Some(cached) = ZONEMAP_REGEX_CACHE.get(&cache_key) { + return cached.valid(); + } + + let compiled = match RegexBuilder::new(superset_regex) + .dot_matches_new_line(true) + .build() + { + Ok(regex) => CachedCompiled::Valid(Arc::new(regex)), + Err(error) => { + debug!( + %error, + superset_regex, + "ignoring invalid split zonemap regex" + ); + CachedCompiled::Invalid + } + }; + let result = compiled.valid(); + ZONEMAP_REGEX_CACHE.insert(cache_key, compiled); + result +} + +fn cached_zonemap_dfa(superset_regex: &str) -> Option> { + let cache_key = superset_regex.to_string(); + if let Some(cached) = ZONEMAP_DFA_CACHE.get(&cache_key) { + return cached.valid(); + } + + let compiled = match dense::Builder::new() + .configure(ZONEMAP_DFA_CONFIG.clone()) + .syntax(ZONEMAP_SYNTAX_CONFIG.clone()) + .build(superset_regex) + { + Ok(dfa) => CachedCompiled::Valid(Arc::new(dfa)), + Err(error) => { + debug!( + %error, + superset_regex, + "ignoring invalid split zonemap regex" + ); + CachedCompiled::Invalid + } + }; + let result = compiled.valid(); + ZONEMAP_DFA_CACHE.insert(cache_key, compiled); + result +} + +fn zonemap_dfa_may_match_prefix( + dfa: &A, + prefix: &str, +) -> Result { + let empty = Input::new("").anchored(Anchored::Yes); + let mut state = dfa.start_state_forward(&empty)?; + for byte in prefix.bytes() { + state = dfa.next_state(state, byte); + if dfa.is_dead_state(state) { + return Ok(false); + } + } + + let mut visited = HashSet::new(); + let mut queue = VecDeque::new(); + visited.insert(state); + queue.push_back(state); + + while let Some(state) = queue.pop_front() { + let end_state = dfa.next_eoi_state(state); + if dfa.is_match_state(end_state) { + return Ok(true); + } + if dfa.is_dead_state(state) { + continue; + } + for byte in 0..=u8::MAX { + let next_state = dfa.next_state(state, byte); + if !dfa.is_dead_state(next_state) && visited.insert(next_state) { + queue.push_back(next_state); + } + } + } + + Ok(false) +} + fn sort_expr( schema: &SchemaRef, col_name: &str, @@ -348,7 +647,11 @@ fn splits_have_default_metrics_sort(splits: &[ParquetSplitMetadata]) -> bool { #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field, Schema}; - use quickwit_parquet_engine::split::TimeRange; + use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; + use datafusion::prelude::*; + use quickwit_parquet_engine::split::{ + ParquetSplitId, TAG_ENV, TAG_HOST, TAG_SERVICE, TimeRange, + }; use super::*; @@ -366,7 +669,7 @@ mod tests { .map(|expr| { expr.expr .as_any() - .downcast_ref::() + .downcast_ref::() .expect("metrics ordering should contain column expressions") .name() .to_string() @@ -390,6 +693,70 @@ mod tests { .build() } + fn test_split(split_id: &str) -> ParquetSplitMetadata { + ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new(split_id)) + .index_uid("idx:00000000000000000000000000") + .time_range(TimeRange::new(100, 200)) + .num_rows(10) + .size_bytes(1024) + .add_metric_name("cpu.usage") + .build() + } + + fn test_split_with_low_cardinality_tag( + split_id: &str, + tag_key: &str, + tag_value: &str, + ) -> ParquetSplitMetadata { + ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new(split_id)) + .index_uid("idx:00000000000000000000000000") + .time_range(TimeRange::new(100, 200)) + .num_rows(10) + .size_bytes(1024) + .add_metric_name("cpu.usage") + .add_low_cardinality_tag(tag_key, tag_value) + .build() + } + + fn test_split_with_zonemap( + split_id: &str, + column: &str, + superset_regex: &str, + ) -> ParquetSplitMetadata { + let mut split = test_split(split_id); + split + .zonemap_regexes + .insert(column.to_string(), superset_regex.to_string()); + split + } + + fn metadata_pruned_split_ids( + splits: Vec, + filters: Vec, + ) -> Vec { + prune_splits_with_metadata(splits, &filters) + .into_iter() + .map(|split| split.split_id.as_str().to_string()) + .collect() + } + + fn assert_metadata_pruned_split_ids( + splits: Vec, + filters: Vec, + expected_split_ids: &[&str], + ) { + assert_eq!( + metadata_pruned_split_ids(splits, filters), + expected_names(expected_split_ids) + ); + } + + fn assert_metadata_prunes_all(splits: Vec, filters: Vec) { + assert_metadata_pruned_split_ids(splits, filters, &[]); + } + #[test] fn metrics_output_ordering_stops_at_first_missing_sort_key() { let schema = schema_with_columns(&["metric_name", "service", "timestamp_secs"]); @@ -476,4 +843,296 @@ mod tests { "default metrics sort metadata should enable the advertised ordering" ); } + + #[test] + fn metadata_pruning_uses_low_cardinality_tags() { + assert_metadata_pruned_split_ids( + vec![ + test_split_with_low_cardinality_tag("web", TAG_SERVICE, "web"), + test_split_with_low_cardinality_tag("api", TAG_SERVICE, "api"), + ], + vec![col(TAG_SERVICE).eq(lit("web"))], + &["web"], + ); + } + + #[test] + fn metadata_pruning_treats_low_cardinality_in_list_as_any_match() { + assert_metadata_pruned_split_ids( + vec![ + test_split_with_low_cardinality_tag("web", TAG_SERVICE, "web"), + test_split_with_low_cardinality_tag("api", TAG_SERVICE, "api"), + test_split_with_low_cardinality_tag("db", TAG_SERVICE, "db"), + ], + vec![col(TAG_SERVICE).in_list(vec![lit("web"), lit("api")], false)], + &["web", "api"], + ); + } + + #[test] + fn metadata_pruning_uses_zonemap_regexes() { + let mut prod_split = test_split("prod"); + prod_split + .zonemap_regexes + .insert(TAG_ENV.to_string(), "^prod$".to_string()); + let mut staging_split = test_split("staging"); + staging_split + .zonemap_regexes + .insert(TAG_ENV.to_string(), "^staging$".to_string()); + + assert_metadata_pruned_split_ids( + vec![prod_split, staging_split], + vec![col(TAG_ENV).eq(lit("prod"))], + &["prod"], + ); + } + + #[test] + fn metadata_pruning_uses_zonemap_regexes_for_declared_custom_columns() { + let mut matching_split = test_split("matching"); + matching_split.zonemap_regexes.insert( + "availability_zone".to_string(), + "^us\\-east\\-1a$".to_string(), + ); + let mut nonmatching_split = test_split("nonmatching"); + nonmatching_split.zonemap_regexes.insert( + "availability_zone".to_string(), + "^us\\-east\\-1b$".to_string(), + ); + + assert_metadata_pruned_split_ids( + vec![matching_split, nonmatching_split], + vec![col("availability_zone").eq(lit("us-east-1a"))], + &["matching"], + ); + } + + #[test] + fn metadata_pruning_matches_zonemap_equality_case_sensitively() { + let exact = test_split_with_zonemap("exact", TAG_ENV, "^v1$"); + + assert_metadata_pruned_split_ids( + vec![exact.clone()], + vec![col(TAG_ENV).eq(lit("v1"))], + &["exact"], + ); + assert_metadata_prunes_all(vec![exact.clone()], vec![col(TAG_ENV).eq(lit("V1"))]); + assert_metadata_prunes_all(vec![exact], vec![col(TAG_ENV).eq(lit("v2"))]); + } + + #[test] + fn metadata_pruning_keeps_zonemap_superset_equality_matches() { + let superset = test_split_with_zonemap("superset", TAG_ENV, "^v.*$"); + + assert_metadata_pruned_split_ids( + vec![superset.clone()], + vec![col(TAG_ENV).eq(lit("v1"))], + &["superset"], + ); + assert_metadata_pruned_split_ids( + vec![superset.clone()], + vec![col(TAG_ENV).eq(lit("v2"))], + &["superset"], + ); + assert_metadata_prunes_all(vec![superset], vec![col(TAG_ENV).eq(lit("w3"))]); + } + + #[test] + fn metadata_pruning_requires_all_zonemap_conjuncts() { + let mut multi_column = test_split_with_zonemap("multi-column", TAG_ENV, "^v.*$"); + multi_column + .zonemap_regexes + .insert(TAG_HOST.to_string(), "^x$".to_string()); + assert_metadata_pruned_split_ids( + vec![multi_column.clone()], + vec![col(TAG_ENV).eq(lit("v1")).and(col(TAG_HOST).eq(lit("x")))], + &["multi-column"], + ); + assert_metadata_prunes_all( + vec![multi_column.clone()], + vec![col(TAG_ENV).eq(lit("w3")).and(col(TAG_HOST).eq(lit("x")))], + ); + assert_metadata_prunes_all( + vec![multi_column], + vec![col(TAG_ENV).eq(lit("v1")).and(col(TAG_HOST).eq(lit("y")))], + ); + } + + #[test] + fn metadata_pruning_keeps_or_predicates_conservative() { + let mut multi_column = test_split_with_zonemap("multi-column", TAG_ENV, "^v.*$"); + multi_column + .zonemap_regexes + .insert(TAG_HOST.to_string(), "^x$".to_string()); + + assert_metadata_pruned_split_ids( + vec![multi_column], + vec![col(TAG_ENV).eq(lit("w3")).or(col(TAG_HOST).eq(lit("y")))], + &["multi-column"], + ); + } + + #[test] + fn metadata_pruning_treats_zonemap_in_list_as_any_match() { + let exact = test_split_with_zonemap("exact", TAG_ENV, "^v1$"); + assert_metadata_pruned_split_ids( + vec![exact.clone()], + vec![col(TAG_ENV).in_list(vec![lit("a"), lit("v1"), lit("c")], false)], + &["exact"], + ); + assert_metadata_prunes_all( + vec![exact], + vec![col(TAG_ENV).in_list(vec![lit("a"), lit("V1"), lit("c")], false)], + ); + + let superset = test_split_with_zonemap("superset", TAG_ENV, "^v.*$"); + assert_metadata_pruned_split_ids( + vec![superset.clone()], + vec![col(TAG_ENV).in_list(vec![lit("v1"), lit("v2"), lit("abc")], false)], + &["superset"], + ); + assert_metadata_prunes_all( + vec![superset], + vec![col(TAG_ENV).in_list(vec![lit("a"), lit("b"), lit("c")], false)], + ); + } + + #[test] + fn metadata_pruning_keeps_unsupported_string_predicates_conservative() { + let split = test_split_with_zonemap("prod", TAG_ENV, "^prod$"); + + assert_metadata_pruned_split_ids( + vec![split.clone()], + vec![col(TAG_ENV).gt(lit("prod"))], + &["prod"], + ); + assert_metadata_pruned_split_ids( + vec![split.clone()], + vec![col(TAG_ENV).eq(lit(1i64))], + &["prod"], + ); + assert_metadata_pruned_split_ids( + vec![split.clone()], + vec![col(TAG_ENV).ilike(lit("PROD%"))], + &["prod"], + ); + assert_metadata_pruned_split_ids( + vec![split], + vec![col(TAG_ENV).not_like(lit("prod%"))], + &["prod"], + ); + } + + #[test] + fn metadata_pruning_uses_low_cardinality_tag_prefixes() { + assert_metadata_pruned_split_ids( + vec![ + test_split_with_low_cardinality_tag("host-07", TAG_HOST, "ID-0701"), + test_split_with_low_cardinality_tag("host-08", TAG_HOST, "ID-0801"), + ], + vec![col(TAG_HOST).like(lit("ID-07%"))], + &["host-07"], + ); + } + + #[test] + fn metadata_pruning_prunes_zonemap_regexes_for_like_prefixes() { + let mut host_07_split = test_split("host-07"); + host_07_split + .zonemap_regexes + .insert(TAG_HOST.to_string(), "^ID\\-0701$".to_string()); + let mut host_08_split = test_split("host-08"); + host_08_split + .zonemap_regexes + .insert(TAG_HOST.to_string(), "^ID\\-0801$".to_string()); + + assert_metadata_pruned_split_ids( + vec![host_07_split, host_08_split], + vec![col(TAG_HOST).like(lit("ID-07%"))], + &["host-07"], + ); + } + + #[test] + fn zonemap_prefix_matching_accepts_possible_suffixes() { + assert!(zonemap_may_match_any_prefix( + "^[I][\\s\\S]+$", + &["ID-07".to_string()] + )); + assert!(!zonemap_may_match_any_prefix( + "^host\\-[\\s\\S]+$", + &["ID-07".to_string()] + )); + } + + #[test] + fn metadata_pruning_evaluates_legacy_dotall_zonemap_regexes() { + assert!(zonemap_may_match_any_value( + "^foo.+$", + &["foo\nbar".to_string()] + )); + assert!(zonemap_may_match_any_prefix( + "^foo.+$", + &["foo\n".to_string()] + )); + } + + #[test] + fn classify_filter_pushes_down_like_for_metrics_tags() { + let schema = schema_with_columns(&[TAG_HOST]); + assert_eq!( + classify_filter(&col(TAG_HOST).like(lit("ID-07%")), &schema), + TableProviderFilterPushDown::Inexact + ); + } + + #[test] + fn classify_filter_uses_declared_schema_columns() { + let schema = schema_with_columns(&["metric_name", "availability_zone"]); + + assert_eq!( + classify_filter(&col("availability_zone").eq(lit("us-east-1a")), &schema), + TableProviderFilterPushDown::Inexact + ); + assert_eq!( + classify_filter( + &col("availability_zone").in_list(vec![lit("us-east-1a")], false), + &schema + ), + TableProviderFilterPushDown::Inexact + ); + assert_eq!( + classify_filter(&col("availability_zone").like(lit("us-east-%")), &schema), + TableProviderFilterPushDown::Inexact + ); + assert_eq!( + classify_filter(&col("undeclared_tag").eq(lit("value")), &schema), + TableProviderFilterPushDown::Unsupported + ); + } + + #[test] + fn metadata_pruning_keeps_splits_without_relevant_metadata() { + let split = test_split("unknown"); + + assert_metadata_pruned_split_ids( + vec![split], + vec![col(TAG_ENV).eq(lit("prod"))], + &["unknown"], + ); + } + + #[test] + fn metadata_pruning_keeps_splits_with_invalid_zonemap_regex() { + let mut split = test_split("invalid-regex"); + split + .zonemap_regexes + .insert(TAG_ENV.to_string(), "(".to_string()); + + assert_metadata_pruned_split_ids( + vec![split], + vec![col(TAG_ENV).eq(lit("prod"))], + &["invalid-regex"], + ); + } } diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index 910f4d79db1..d6041125f84 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -32,7 +32,7 @@ mod common; mod metrics_splits; use common::{TestSandbox, create_metrics_index}; -use metrics_splits::publish_split; +use metrics_splits::{publish_split, publish_split_with_tag_metadata}; // ── Setup ────────────────────────────────────────────────────────── @@ -480,6 +480,176 @@ async fn test_in_list_tag_filter_returns_all_matching_rows() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_writer_zonemap_metadata_pruning_skips_unreadable_nonmatching_split() { + use quickwit_datafusion::test_utils::make_batch_with_tags; + + let sandbox = start_sandbox().await; + let metastore = sandbox.metastore.clone(); + let data_dir = &sandbox.data_dir; + let builder = session_builder(&sandbox); + + let index_uid = + create_metrics_index(&metastore, "test-tag-metadata-prune", data_dir.path()).await; + // Publish without exact low-cardinality tag metadata. If this query succeeds + // after deleting staging.parquet, pruning came from writer-generated + // zonemap_regexes rather than exact tag sets. + publish_split_with_tag_metadata( + &metastore, + &index_uid, + data_dir.path(), + "prod", + &make_batch_with_tags( + "cpu.usage", + &[100, 200], + &[1.0, 2.0], + Some("web"), + Some("prod"), + None, + None, + None, + ), + false, + ) + .await; + publish_split_with_tag_metadata( + &metastore, + &index_uid, + data_dir.path(), + "staging", + &make_batch_with_tags( + "cpu.usage", + &[100, 200], + &[10.0, 20.0], + Some("web"), + Some("staging"), + None, + None, + None, + ), + false, + ) + .await; + + std::fs::remove_file(data_dir.path().join("staging.parquet")) + .expect("remove nonmatching staging parquet file"); + + let sql = r#" + CREATE OR REPLACE EXTERNAL TABLE "test-tag-metadata-prune" ( + metric_name VARCHAR NOT NULL, metric_type TINYINT, + timestamp_secs BIGINT NOT NULL, value DOUBLE NOT NULL, service VARCHAR, env VARCHAR + ) STORED AS metrics LOCATION 'test-tag-metadata-prune'; + SELECT COUNT(*) AS cnt, SUM(value) AS total + FROM "test-tag-metadata-prune" + WHERE env = 'prod'"#; + let batches = run_sql(&builder, sql).await; + assert_eq!(total_rows(&batches), 1); + let cnt = batches[0] + .column_by_name("cnt") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert_eq!(cnt, 2); + let total = batches[0] + .column_by_name("total") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert!( + (total - 3.0).abs() < 0.01, + "expected only prod split values to be scanned" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_writer_zonemap_prefix_like_pruning_skips_unreadable_nonmatching_split() { + use quickwit_datafusion::test_utils::make_batch_with_tags; + + let sandbox = start_sandbox().await; + let metastore = sandbox.metastore.clone(); + let data_dir = &sandbox.data_dir; + let builder = session_builder(&sandbox); + + let index_uid = + create_metrics_index(&metastore, "test-tag-prefix-like-prune", data_dir.path()).await; + // Publish without exact low-cardinality tag metadata. If this query succeeds + // after deleting host-08.parquet, pruning came from writer-generated + // zonemap_regexes rather than exact tag sets. + publish_split_with_tag_metadata( + &metastore, + &index_uid, + data_dir.path(), + "host-07", + &make_batch_with_tags( + "cpu.usage", + &[100, 200], + &[1.0, 2.0], + None, + None, + None, + None, + Some("ID-0701"), + ), + false, + ) + .await; + publish_split_with_tag_metadata( + &metastore, + &index_uid, + data_dir.path(), + "host-08", + &make_batch_with_tags( + "cpu.usage", + &[100, 200], + &[10.0, 20.0], + None, + None, + None, + None, + Some("ID-0801"), + ), + false, + ) + .await; + + std::fs::remove_file(data_dir.path().join("host-08.parquet")) + .expect("remove nonmatching host-08 parquet file"); + + let sql = r#" + CREATE OR REPLACE EXTERNAL TABLE "test-tag-prefix-like-prune" ( + metric_name VARCHAR NOT NULL, metric_type TINYINT, + timestamp_secs BIGINT NOT NULL, value DOUBLE NOT NULL, host VARCHAR + ) STORED AS metrics LOCATION 'test-tag-prefix-like-prune'; + SELECT COUNT(*) AS cnt, SUM(value) AS total + FROM "test-tag-prefix-like-prune" + WHERE host LIKE 'ID-07%'"#; + let batches = run_sql(&builder, sql).await; + assert_eq!(total_rows(&batches), 1); + let cnt = batches[0] + .column_by_name("cnt") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert_eq!(cnt, 2); + let total = batches[0] + .column_by_name("total") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert!( + (total - 3.0).abs() < 0.01, + "expected only host-07 split values to be scanned" + ); +} + /// Demonstrates the `sum:metric{filter} by {groups}.rollup(agg, interval)` pattern /// over wide-format parquet data — no context/points JOIN needed. /// @@ -680,8 +850,9 @@ async fn test_rollup_nested_aggregation() { "expected scan/partial stage parallelism to stay split-bounded:\n{plan_str}" ); assert!( - plan_str.contains("file_groups={4 groups"), - "expected one scan partition per split, not byte-range split partitions:\n{plan_str}" + plan_str.contains("file_groups={3 groups"), + "expected one scan partition per matching split after metadata pruning, not byte-range \ + split partitions:\n{plan_str}" ); let batches = df.collect().await.unwrap(); diff --git a/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs b/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs index 5537fabf4ff..532af78722d 100644 --- a/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs +++ b/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs @@ -38,7 +38,18 @@ pub async fn publish_split( split_name: &str, batch: &RecordBatch, ) { - let (parquet_bytes, _) = + publish_split_with_tag_metadata(metastore, index_uid, data_dir, split_name, batch, true).await; +} + +pub(crate) async fn publish_split_with_tag_metadata( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + data_dir: &std::path::Path, + split_name: &str, + batch: &RecordBatch, + include_low_cardinality_tags: bool, +) { + let (parquet_bytes, (row_keys_proto, zonemap_regexes)) = ParquetWriter::new(ParquetWriterConfig::default(), &TableConfig::default()) .unwrap() .write_to_bytes(batch, None) @@ -92,33 +103,41 @@ pub async fn publish_split( for name in &metric_names { builder = builder.add_metric_name(name.clone()); } + if let Some(row_keys_proto) = row_keys_proto { + builder = builder.row_keys_proto(row_keys_proto); + } + for (column, regex) in zonemap_regexes { + builder = builder.add_zonemap_regex(column, regex); + } - for tag_col in &["service", "env", "datacenter", "region", "host"] { - if let Ok(col_idx) = batch_schema.index_of(tag_col) { - let col = batch.column(col_idx); - let values: HashSet = if let Some(dict) = - col.as_any() - .downcast_ref::>() - { - let keys = dict - .keys() - .as_any() - .downcast_ref::() - .unwrap(); - let vals = dict - .values() - .as_any() - .downcast_ref::() - .unwrap(); - (0..batch.num_rows()) - .filter(|i| !keys.is_null(*i)) - .map(|i| vals.value(keys.value(i) as usize).to_string()) - .collect() - } else { - HashSet::new() - }; - for v in values { - builder = builder.add_low_cardinality_tag(tag_col.to_string(), v); + if include_low_cardinality_tags { + for tag_col in &["service", "env", "datacenter", "region", "host"] { + if let Ok(col_idx) = batch_schema.index_of(tag_col) { + let col = batch.column(col_idx); + let values: HashSet = if let Some(dict) = + col.as_any() + .downcast_ref::>() + { + let keys = dict + .keys() + .as_any() + .downcast_ref::() + .unwrap(); + let vals = dict + .values() + .as_any() + .downcast_ref::() + .unwrap(); + (0..batch.num_rows()) + .filter(|i| !keys.is_null(*i)) + .map(|i| vals.value(keys.value(i) as usize).to_string()) + .collect() + } else { + HashSet::new() + }; + for v in values { + builder = builder.add_low_cardinality_tag(tag_col.to_string(), v); + } } } }