Skip to content

Commit 93bcde4

Browse files
Zone Map Pruning for Metrics
1 parent 149a8cb commit 93bcde4

7 files changed

Lines changed: 830 additions & 40 deletions

File tree

quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ rdkafka = { version = "0.39", default-features = false, features = [
214214
"zstd",
215215
] }
216216
regex = "1.12"
217+
regex-automata = "0.4"
217218
regex-syntax = "0.8"
218219
reqwest = { version = "0.12", default-features = false, features = [
219220
"json",

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ bytes = { workspace = true }
1616
chrono = { workspace = true }
1717
futures = { workspace = true }
1818
prost = { workspace = true }
19+
regex = { workspace = true }
20+
regex-automata = { workspace = true }
1921
serde_json = { workspace = true }
2022
tokio = { workspace = true }
2123
tonic = { workspace = true }

quickwit/quickwit-datafusion/src/sources/metrics/predicate.rs

Lines changed: 221 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,46 @@
2020
//! OSS column names: `service`, `env`, `datacenter`, `region`, `host`
2121
//! (no `tag_` prefix — the parquet files use bare column names).
2222
23-
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
23+
use std::collections::HashMap;
24+
25+
use datafusion::logical_expr::{BinaryExpr, Expr, Like, Operator};
2426
use datafusion::scalar::ScalarValue;
2527

2628
/// Extracted filters for querying the metrics_splits table.
2729
///
2830
/// Split-level filters for metastore pruning.
2931
///
30-
/// Only metric name and time range are used — the only fields the metastore
31-
/// reliably populates today. Tag-based pruning will be added when the
32-
/// zonemap/bloom-filter mechanism lands.
32+
/// These fields are safe to pass to the metastore for coarse split discovery.
33+
/// More granular string filters are extracted separately for conservative
34+
/// client-side metadata pruning after the split metadata has been fetched.
3335
#[derive(Clone, Debug, Default)]
pub struct MetricsSplitQuery {
    /// Metric names extracted from the pushed-down filters, if any were found.
    pub metric_names: Option<Vec<String>>,
    /// Start of the extracted time range, if any.
    /// NOTE(review): unit and inclusivity are not visible here — confirm
    /// against the extraction code.
    pub time_range_start: Option<u64>,
    /// End of the extracted time range, if any.
    /// NOTE(review): unit and inclusivity are not visible here — confirm
    /// against the extraction code.
    pub time_range_end: Option<u64>,
}
3941

42+
/// String equality / `IN`-list constraint on a single column.
///
/// Consumed by split metadata pruning. The entries of `values` are
/// alternatives (the column must equal one of them); when several predicates
/// target the same column their value sets are intersected into one filter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct StringFilter {
    /// Name of the constrained column.
    pub column: String,
    /// Accepted values, OR-ed together. Can end up empty when the intersected
    /// predicates have no value in common.
    pub values: Vec<String>,
}
51+
52+
/// Prefix constraint on a single string-valued column, derived from simple
/// `LIKE` predicates.
///
/// Every entry of `prefixes` stands for one `column LIKE 'prefix%'` predicate.
/// Any more general `LIKE` pattern is deliberately left out so that split
/// pruning stays conservative.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct StringPrefixFilter {
    /// Name of the constrained column.
    pub column: String,
    /// Literal prefixes, one per extracted `LIKE 'prefix%'` predicate on this
    /// column.
    pub prefixes: Vec<String>,
}
62+
4063
/// Analyzes pushed-down filter expressions and extracts split-level filters.
4164
///
4265
/// Returns a `MetricsSplitQuery` for Postgres pruning plus any remaining
@@ -54,6 +77,134 @@ pub fn extract_split_filters(filters: &[Expr]) -> (MetricsSplitQuery, Vec<Expr>)
5477
(query, remaining)
5578
}
5679

80+
/// Extract string equality/IN predicates that are safe to evaluate against
81+
/// exact split metadata or zonemap superset regexes.
82+
pub(crate) fn extract_string_filters(filters: &[Expr]) -> Vec<StringFilter> {
83+
let mut by_column: HashMap<String, Vec<String>> = HashMap::new();
84+
85+
for filter in filters {
86+
collect_string_filters(filter, &mut by_column);
87+
}
88+
89+
by_column
90+
.into_iter()
91+
.map(|(column, values)| StringFilter { column, values })
92+
.collect()
93+
}
94+
95+
/// Extract simple string prefix predicates of the form `column LIKE 'prefix%'`.
96+
pub(crate) fn extract_string_prefix_filters(filters: &[Expr]) -> Vec<StringPrefixFilter> {
97+
let mut by_column: HashMap<String, Vec<String>> = HashMap::new();
98+
99+
for filter in filters {
100+
collect_string_prefix_filters(filter, &mut by_column);
101+
}
102+
103+
by_column
104+
.into_iter()
105+
.map(|(column, prefixes)| StringPrefixFilter { column, prefixes })
106+
.collect()
107+
}
108+
109+
fn collect_string_filters(expr: &Expr, by_column: &mut HashMap<String, Vec<String>>) {
110+
match expr {
111+
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
112+
Operator::Eq => {
113+
let Some((column, value)) = string_eq_filter(left, right) else {
114+
return;
115+
};
116+
tighten_string_values(by_column, column, vec![value]);
117+
}
118+
Operator::And => {
119+
collect_string_filters(left, by_column);
120+
collect_string_filters(right, by_column);
121+
}
122+
_ => {}
123+
},
124+
Expr::InList(in_list) if !in_list.negated => {
125+
let Some(column) = column_name(&in_list.expr) else {
126+
return;
127+
};
128+
let values: Vec<String> = in_list.list.iter().filter_map(scalar_utf8).collect();
129+
if values.len() == in_list.list.len() {
130+
tighten_string_values(by_column, column, values);
131+
}
132+
}
133+
_ => {}
134+
}
135+
}
136+
137+
fn collect_string_prefix_filters(expr: &Expr, by_column: &mut HashMap<String, Vec<String>>) {
138+
match expr {
139+
Expr::BinaryExpr(BinaryExpr { left, op, right }) if *op == Operator::And => {
140+
collect_string_prefix_filters(left, by_column);
141+
collect_string_prefix_filters(right, by_column);
142+
}
143+
Expr::Like(like) => {
144+
let Some((column, prefix)) = string_prefix_filter(like) else {
145+
return;
146+
};
147+
by_column.entry(column).or_default().push(prefix);
148+
}
149+
_ => {}
150+
}
151+
}
152+
153+
fn string_eq_filter(left: &Expr, right: &Expr) -> Option<(String, String)> {
154+
match (column_name(left), scalar_utf8(right)) {
155+
(Some(column), Some(value)) => Some((column, value)),
156+
_ => match (scalar_utf8(left), column_name(right)) {
157+
(Some(value), Some(column)) => Some((column, value)),
158+
_ => None,
159+
},
160+
}
161+
}
162+
163+
/// Converts a `LIKE` expression into a `(column, prefix)` pair when it is a
/// plain prefix match.
///
/// Returns `None` for negated or case-insensitive predicates, for predicates
/// with an explicit escape character, when the left side is not a column,
/// when the pattern is not a string literal, or when the pattern is not of
/// the form `prefix%`.
fn string_prefix_filter(like: &Like) -> Option<(String, String)> {
    let is_plain_like = !(like.negated || like.case_insensitive) && like.escape_char.is_none();
    if !is_plain_like {
        return None;
    }

    let column = column_name(&like.expr)?;
    let pattern = scalar_utf8(&like.pattern)?;
    simple_like_prefix(&pattern).map(|prefix| (column, prefix))
}
173+
174+
/// Returns the literal prefix of a `LIKE` pattern of the form `prefix%`.
///
/// Yields `None` unless the pattern is a non-empty run of literal characters
/// followed by exactly one trailing `%`. The pattern is rejected when the
/// part before the trailing `%`:
/// - is empty (a bare `%` constrains nothing);
/// - contains another wildcard (`%` or `_`);
/// - contains a backslash. Arrow's LIKE kernel (used by DataFusion) treats
///   `\` as an escape character even without an `ESCAPE` clause, so a pattern
///   such as `100\%` matches the exact string `100%`, not values starting
///   with `100\`. Treating the backslash as literal prefix text could prune
///   splits that actually contain matching rows, so such patterns are skipped.
fn simple_like_prefix(pattern: &str) -> Option<String> {
    let prefix = pattern.strip_suffix('%')?;
    if prefix.is_empty() || prefix.contains(['%', '_', '\\']) {
        return None;
    }
    Some(prefix.to_string())
}
181+
182+
fn tighten_string_values(
183+
by_column: &mut HashMap<String, Vec<String>>,
184+
column: String,
185+
values: Vec<String>,
186+
) {
187+
let mut values = dedup_strings(values);
188+
match by_column.get_mut(&column) {
189+
Some(existing) => {
190+
existing.retain(|value| values.contains(value));
191+
}
192+
None => {
193+
by_column.insert(column, std::mem::take(&mut values));
194+
}
195+
}
196+
}
197+
198+
/// Removes duplicate strings while keeping the first occurrence of each value
/// in its original position.
///
/// Uses a linear scan per element, which is quadratic overall.
fn dedup_strings(values: Vec<String>) -> Vec<String> {
    let capacity = values.len();
    values
        .into_iter()
        .fold(Vec::<String>::with_capacity(capacity), |mut unique, candidate| {
            if unique.iter().all(|seen| *seen != candidate) {
                unique.push(candidate);
            }
            unique
        })
}
207+
57208
fn try_extract_filter(expr: &Expr, query: &mut MetricsSplitQuery) -> bool {
58209
match expr {
59210
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
@@ -214,6 +365,8 @@ fn scalar_utf8(expr: &Expr) -> Option<String> {
214365
Expr::Literal(ScalarValue::Utf8View(Some(s)), _) => Some(s.clone()),
215366
// DF auto-casts string literals to Dict(Int32, Utf8) to match dict-encoded columns
216367
Expr::Literal(ScalarValue::Dictionary(_, inner), _) => scalar_utf8_from_scalar(inner),
368+
Expr::Cast(datafusion::logical_expr::Cast { expr, .. })
369+
| Expr::TryCast(datafusion::logical_expr::TryCast { expr, .. }) => scalar_utf8(expr),
217370
_ => None,
218371
}
219372
}
@@ -279,6 +432,70 @@ mod tests {
279432
assert_eq!(remaining.len(), 2);
280433
}
281434

435+
#[test]
fn test_extract_string_filters_for_tags() {
    // One equality, one IN list, and a numeric comparison that must be ignored.
    let predicates = [
        col("service").eq(lit("web")),
        col("env").in_list(vec![lit("prod"), lit("staging")], false),
        col("value").gt(lit(42.0)),
    ];

    let expected = vec![
        StringFilter {
            column: String::from("env"),
            values: vec![String::from("prod"), String::from("staging")],
        },
        StringFilter {
            column: String::from("service"),
            values: vec![String::from("web")],
        },
    ];

    // The extractor returns columns in unspecified order; sort before comparing.
    let mut actual = extract_string_filters(&predicates);
    actual.sort_by(|a, b| a.column.cmp(&b.column));

    assert_eq!(actual, expected);
}
460+
461+
#[test]
fn test_extract_string_filters_intersects_repeated_column() {
    // `service IN ('web', 'api')` combined with `service = 'api'` must
    // collapse to the single shared value.
    let predicates = [
        col("service").in_list(vec![lit("web"), lit("api")], false),
        col("service").eq(lit("api")),
    ];

    let expected = StringFilter {
        column: String::from("service"),
        values: vec![String::from("api")],
    };

    assert_eq!(extract_string_filters(&predicates), vec![expected]);
}
478+
479+
#[test]
fn test_extract_string_prefix_filters_for_simple_like() {
    let predicates = [
        // Plain prefix pattern: the only one that should be extracted.
        col("host").like(lit("ID-07%")),
        // No trailing wildcard.
        col("env").like(lit("prod")),
        // Case-insensitive match.
        col("service").ilike(lit("web%")),
        // Contains a single-character wildcard.
        col("region").like(lit("us_%")),
    ];

    let expected = StringPrefixFilter {
        column: String::from("host"),
        prefixes: vec![String::from("ID-07")],
    };

    assert_eq!(extract_string_prefix_filters(&predicates), vec![expected]);
}
498+
282499
#[test]
283500
fn test_unknown_column_left_as_remaining() {
284501
let filters = vec![

0 commit comments

Comments
 (0)