Skip to content

Commit 910756a

Browse files
Zone Map Pruning for Metrics
1 parent 32fbda2 commit 910756a

6 files changed

Lines changed: 459 additions & 39 deletions

File tree

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ bytes = { workspace = true }
1616
chrono = { workspace = true }
1717
futures = { workspace = true }
1818
prost = { workspace = true }
19+
regex = { workspace = true }
1920
serde_json = { workspace = true }
2021
tokio = { workspace = true }
2122
tonic = { workspace = true }

quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs

Lines changed: 140 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,35 @@
2020
//! OSS column names: `service`, `env`, `datacenter`, `region`, `host`
2121
//! (no `tag_` prefix — the parquet files use bare column names).
2222
23+
use std::collections::HashMap;
24+
2325
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
2426
use datafusion::scalar::ScalarValue;
2527

2628
/// Extracted filters for querying the metrics_splits table.
2729
///
2830
/// Split-level filters for metastore pruning.
2931
///
30-
/// Only metric name and time range are used — the only fields the metastore
31-
/// reliably populates today. Tag-based pruning will be added when the
32-
/// zonemap/bloom-filter mechanism lands.
32+
/// These fields are safe to pass to the metastore for coarse split discovery.
33+
/// More granular string filters are extracted separately for conservative
34+
/// client-side metadata pruning after the split metadata has been fetched.
3335
#[derive(Debug, Default, Clone)]
pub struct MetricsSplitQuery {
    /// Metric names extracted from the pushed-down filters, if any.
    ///
    /// NOTE(review): `None` presumably means "no metric-name restriction" —
    /// confirm against the metastore query builder.
    pub metric_names: Option<Vec<String>>,
    /// Start of the requested time range, if one was extracted.
    ///
    /// NOTE(review): unit and inclusivity are not visible here — confirm.
    pub time_range_start: Option<u64>,
    /// End of the requested time range, if one was extracted.
    ///
    /// NOTE(review): unit and inclusivity are not visible here — confirm.
    pub time_range_end: Option<u64>,
}
3941

42+
/// An equality/IN constraint on a single string-valued column.
///
/// Used for split metadata pruning. `values` is read as an OR list (the
/// column must equal one of them); when several predicates target the same
/// column, their value lists are intersected into a single filter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StringFilter {
    /// Name of the column the constraint applies to.
    pub column: String,
    /// Accepted values for `column`, combined with OR semantics.
    pub values: Vec<String>,
}
51+
4052
/// Analyzes pushed-down filter expressions and extracts split-level filters.
4153
///
4254
/// Returns a `MetricsSplitQuery` for Postgres pruning plus any remaining
@@ -54,6 +66,85 @@ pub fn extract_split_filters(filters: &[Expr]) -> (MetricsSplitQuery, Vec<Expr>)
5466
(query, remaining)
5567
}
5668

69+
/// Extract string equality/IN predicates that are safe to evaluate against
70+
/// exact split metadata or zonemap superset regexes.
71+
pub(crate) fn extract_string_filters(filters: &[Expr]) -> Vec<StringFilter> {
72+
let mut by_column: HashMap<String, Vec<String>> = HashMap::new();
73+
74+
for filter in filters {
75+
collect_string_filters(filter, &mut by_column);
76+
}
77+
78+
by_column
79+
.into_iter()
80+
.map(|(column, values)| StringFilter { column, values })
81+
.collect()
82+
}
83+
84+
fn collect_string_filters(expr: &Expr, by_column: &mut HashMap<String, Vec<String>>) {
85+
match expr {
86+
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
87+
Operator::Eq => {
88+
let Some((column, value)) = string_eq_filter(left, right) else {
89+
return;
90+
};
91+
tighten_string_values(by_column, column, vec![value]);
92+
}
93+
Operator::And => {
94+
collect_string_filters(left, by_column);
95+
collect_string_filters(right, by_column);
96+
}
97+
_ => {}
98+
},
99+
Expr::InList(in_list) if !in_list.negated => {
100+
let Some(column) = column_name(&in_list.expr) else {
101+
return;
102+
};
103+
let values: Vec<String> = in_list.list.iter().filter_map(scalar_utf8).collect();
104+
if values.len() == in_list.list.len() {
105+
tighten_string_values(by_column, column, values);
106+
}
107+
}
108+
_ => {}
109+
}
110+
}
111+
112+
fn string_eq_filter(left: &Expr, right: &Expr) -> Option<(String, String)> {
113+
match (column_name(left), scalar_utf8(right)) {
114+
(Some(column), Some(value)) => Some((column, value)),
115+
_ => match (scalar_utf8(left), column_name(right)) {
116+
(Some(value), Some(column)) => Some((column, value)),
117+
_ => None,
118+
},
119+
}
120+
}
121+
122+
fn tighten_string_values(
123+
by_column: &mut HashMap<String, Vec<String>>,
124+
column: String,
125+
values: Vec<String>,
126+
) {
127+
let mut values = dedup_strings(values);
128+
match by_column.get_mut(&column) {
129+
Some(existing) => {
130+
existing.retain(|value| values.contains(value));
131+
}
132+
None => {
133+
by_column.insert(column, std::mem::take(&mut values));
134+
}
135+
}
136+
}
137+
138+
/// Removes duplicate strings, keeping the first occurrence of each value and
/// the relative order of the survivors.
///
/// Runs in a single hashed pass rather than a `Vec::contains` scan per
/// element, so long IN lists do not cost quadratic time. No string is cloned.
fn dedup_strings(values: Vec<String>) -> Vec<String> {
    // First pass borrows the strings to mark which positions are first
    // occurrences; the borrow ends with this block so `values` can be moved.
    let is_first_occurrence: Vec<bool> = {
        let mut seen = std::collections::HashSet::with_capacity(values.len());
        values
            .iter()
            .map(|value| seen.insert(value.as_str()))
            .collect()
    };

    values
        .into_iter()
        .zip(is_first_occurrence)
        .filter_map(|(value, keep)| keep.then_some(value))
        .collect()
}
147+
57148
fn try_extract_filter(expr: &Expr, query: &mut MetricsSplitQuery) -> bool {
58149
match expr {
59150
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
@@ -214,6 +305,8 @@ fn scalar_utf8(expr: &Expr) -> Option<String> {
214305
Expr::Literal(ScalarValue::Utf8View(Some(s)), _) => Some(s.clone()),
215306
// DF auto-casts string literals to Dict(Int32, Utf8) to match dict-encoded columns
216307
Expr::Literal(ScalarValue::Dictionary(_, inner), _) => scalar_utf8_from_scalar(inner),
308+
Expr::Cast(datafusion::logical_expr::Cast { expr, .. })
309+
| Expr::TryCast(datafusion::logical_expr::TryCast { expr, .. }) => scalar_utf8(expr),
217310
_ => None,
218311
}
219312
}
@@ -279,6 +372,50 @@ mod tests {
279372
assert_eq!(remaining.len(), 2);
280373
}
281374

375+
#[test]
fn test_extract_string_filters_for_tags() {
    // One equality, one IN list, and a numeric comparison that is not a
    // string equality/IN predicate and therefore yields no filter.
    let service_eq = col("service").eq(lit("web"));
    let env_in = col("env").in_list(vec![lit("prod"), lit("staging")], false);
    let value_gt = col("value").gt(lit(42.0));

    let mut extracted = extract_string_filters(&[service_eq, env_in, value_gt]);
    // Normalize the order so the assertion does not depend on it.
    extracted.sort_by(|a, b| a.column.cmp(&b.column));

    let expected = vec![
        StringFilter {
            column: "env".to_string(),
            values: vec!["prod".to_string(), "staging".to_string()],
        },
        StringFilter {
            column: "service".to_string(),
            values: vec!["web".to_string()],
        },
    ];
    assert_eq!(extracted, expected);
}
400+
401+
#[test]
fn test_extract_string_filters_intersects_repeated_column() {
    // Two predicates on the same column: only the value accepted by both
    // (`api`) may survive.
    let in_web_or_api = col("service").in_list(vec![lit("web"), lit("api")], false);
    let eq_api = col("service").eq(lit("api"));

    let extracted = extract_string_filters(&[in_web_or_api, eq_api]);

    let expected = vec![StringFilter {
        column: "service".to_string(),
        values: vec!["api".to_string()],
    }];
    assert_eq!(extracted, expected);
}
418+
282419
#[test]
283420
fn test_unknown_column_left_as_remaining() {
284421
let filters = vec![

0 commit comments

Comments
 (0)