Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
prost = { workspace = true }
regex = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Expand Down
143 changes: 140 additions & 3 deletions quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,35 @@
//! OSS column names: `service`, `env`, `datacenter`, `region`, `host`
//! (no `tag_` prefix — the parquet files use bare column names).

use std::collections::HashMap;

use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::scalar::ScalarValue;

/// Extracted filters for querying the metrics_splits table.
///
/// Split-level filters for metastore pruning.
///
/// Only metric name and time range are used — the only fields the metastore
/// reliably populates today. Tag-based pruning will be added when the
/// zonemap/bloom-filter mechanism lands.
/// These fields are safe to pass to the metastore for coarse split discovery.
/// More granular string filters are extracted separately for conservative
/// client-side metadata pruning after the split metadata has been fetched.
#[derive(Debug, Default, Clone)]
pub struct MetricsSplitQuery {
pub metric_names: Option<Vec<String>>,
pub time_range_start: Option<u64>,
pub time_range_end: Option<u64>,
}

/// Equality/IN filter over string-valued columns.
///
/// This is used for split metadata pruning. Values are interpreted as an OR
/// list for a column; multiple filters on the same column are intersected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StringFilter {
pub column: String,
pub values: Vec<String>,
}

/// Analyzes pushed-down filter expressions and extracts split-level filters.
///
/// Returns a `MetricsSplitQuery` for Postgres pruning plus any remaining
Expand All @@ -54,6 +66,85 @@ pub fn extract_split_filters(filters: &[Expr]) -> (MetricsSplitQuery, Vec<Expr>)
(query, remaining)
}

/// Extract string equality/IN predicates that are safe to evaluate against
/// exact split metadata or zonemap superset regexes.
pub(crate) fn extract_string_filters(filters: &[Expr]) -> Vec<StringFilter> {
let mut by_column: HashMap<String, Vec<String>> = HashMap::new();

for filter in filters {
collect_string_filters(filter, &mut by_column);
}

by_column
.into_iter()
.map(|(column, values)| StringFilter { column, values })
.collect()
}

fn collect_string_filters(expr: &Expr, by_column: &mut HashMap<String, Vec<String>>) {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::Eq => {
let Some((column, value)) = string_eq_filter(left, right) else {
return;
};
tighten_string_values(by_column, column, vec![value]);
}
Operator::And => {
collect_string_filters(left, by_column);
collect_string_filters(right, by_column);
}
_ => {}
},
Expr::InList(in_list) if !in_list.negated => {
let Some(column) = column_name(&in_list.expr) else {
return;
};
let values: Vec<String> = in_list.list.iter().filter_map(scalar_utf8).collect();
if values.len() == in_list.list.len() {
tighten_string_values(by_column, column, values);
}
}
_ => {}
}
}

fn string_eq_filter(left: &Expr, right: &Expr) -> Option<(String, String)> {
match (column_name(left), scalar_utf8(right)) {
(Some(column), Some(value)) => Some((column, value)),
_ => match (scalar_utf8(left), column_name(right)) {
(Some(value), Some(column)) => Some((column, value)),
_ => None,
},
}
}

fn tighten_string_values(
by_column: &mut HashMap<String, Vec<String>>,
column: String,
values: Vec<String>,
) {
let mut values = dedup_strings(values);
match by_column.get_mut(&column) {
Some(existing) => {
existing.retain(|value| values.contains(value));
}
None => {
by_column.insert(column, std::mem::take(&mut values));
}
}
}

fn dedup_strings(values: Vec<String>) -> Vec<String> {
let mut deduped = Vec::with_capacity(values.len());
for value in values {
if !deduped.contains(&value) {
deduped.push(value);
}
}
deduped
}

fn try_extract_filter(expr: &Expr, query: &mut MetricsSplitQuery) -> bool {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Expand Down Expand Up @@ -214,6 +305,8 @@ fn scalar_utf8(expr: &Expr) -> Option<String> {
Expr::Literal(ScalarValue::Utf8View(Some(s)), _) => Some(s.clone()),
// DF auto-casts string literals to Dict(Int32, Utf8) to match dict-encoded columns
Expr::Literal(ScalarValue::Dictionary(_, inner), _) => scalar_utf8_from_scalar(inner),
Expr::Cast(datafusion::logical_expr::Cast { expr, .. })
| Expr::TryCast(datafusion::logical_expr::TryCast { expr, .. }) => scalar_utf8(expr),
_ => None,
}
}
Expand Down Expand Up @@ -279,6 +372,50 @@ mod tests {
assert_eq!(remaining.len(), 2);
}

#[test]
fn test_extract_string_filters_for_tags() {
let filters = vec![
col("service").eq(lit("web")),
col("env").in_list(vec![lit("prod"), lit("staging")], false),
col("value").gt(lit(42.0)),
];

let mut string_filters = extract_string_filters(&filters);
string_filters.sort_by(|left, right| left.column.cmp(&right.column));

assert_eq!(
string_filters,
vec![
StringFilter {
column: "env".to_string(),
values: vec!["prod".to_string(), "staging".to_string()],
},
StringFilter {
column: "service".to_string(),
values: vec!["web".to_string()],
},
]
);
}

#[test]
fn test_extract_string_filters_intersects_repeated_column() {
let filters = vec![
col("service").in_list(vec![lit("web"), lit("api")], false),
col("service").eq(lit("api")),
];

let string_filters = extract_string_filters(&filters);

assert_eq!(
string_filters,
vec![StringFilter {
column: "service".to_string(),
values: vec!["api".to_string()],
}]
);
}

#[test]
fn test_unknown_column_left_as_remaining() {
let filters = vec![
Expand Down
Loading
Loading