Skip to content

Commit 697ff0e

Browse files
Support sorted series column for DF metrics (#6347)
* support sorted series column * respond to review
1 parent 5d24f57 commit 697ff0e

7 files changed

Lines changed: 441 additions & 30 deletions

File tree

quickwit/quickwit-datafusion/src/sources/metrics/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
pub(crate) mod factory;
2424
pub(crate) mod index_resolver;
2525
pub(crate) mod metastore_provider;
26+
pub(crate) mod optimizer;
2627
pub(crate) mod predicate;
2728
pub(crate) mod table_provider;
2829

@@ -46,6 +47,7 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient};
4647

4748
use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory};
4849
use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver};
50+
use self::optimizer::SortedSeriesStreamingAggregateRule;
4951
use self::table_provider::MetricsTableProvider;
5052

5153
/// Returns `true` when `err` wraps a [`MetastoreError::NotFound`].
@@ -210,7 +212,16 @@ impl QuickwitRuntimePlugin for MetricsDataSource {
210212
let factory: Arc<dyn TableProviderFactory> = Arc::new(MetricsTableProviderFactory::new(
211213
Arc::clone(&self.index_resolver),
212214
));
213-
QuickwitRuntimeRegistration::default().with_table_factory(METRICS_FILE_TYPE, factory)
215+
QuickwitRuntimeRegistration::default()
216+
.with_session_config_setter(|config| {
217+
config
218+
.options_mut()
219+
.optimizer
220+
.enable_round_robin_repartition = false;
221+
config.options_mut().optimizer.repartition_file_scans = false;
222+
})
223+
.with_physical_optimizer_rule(Arc::new(SortedSeriesStreamingAggregateRule))
224+
.with_table_factory(METRICS_FILE_TYPE, factory)
214225
}
215226
}
216227

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Physical rewrites for sorted-series metrics rollups.
16+
17+
use std::sync::Arc;
18+
19+
use datafusion::common::tree_node::{Transformed, TreeNode};
20+
use datafusion::config::ConfigOptions;
21+
use datafusion::error::Result as DFResult;
22+
use datafusion::physical_expr::expressions::Column;
23+
use datafusion::physical_expr::{LexOrdering, Partitioning};
24+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
25+
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
26+
use datafusion::physical_plan::repartition::RepartitionExec;
27+
use datafusion::physical_plan::sorts::sort::SortExec;
28+
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
29+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
30+
use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN;
31+
32+
/// Replaces the inner sorted-series hash repartition in rollup plans with a
33+
/// sort-preserving merge into a single final aggregate.
34+
///
35+
/// This keeps worker/file-local partial aggregation parallel, then lets the
36+
/// coordinator stitch ordered per-series partial rows without hash-shuffling
37+
/// those partials by `sorted_series`.
38+
#[derive(Debug, Default)]
39+
pub struct SortedSeriesStreamingAggregateRule;
40+
41+
impl PhysicalOptimizerRule for SortedSeriesStreamingAggregateRule {
42+
fn optimize(
43+
&self,
44+
plan: Arc<dyn ExecutionPlan>,
45+
_config: &ConfigOptions,
46+
) -> DFResult<Arc<dyn ExecutionPlan>> {
47+
let transformed = plan.transform_up(|plan| {
48+
if let Some(rewritten) = rewrite_sorted_series_final_aggregate(&plan)? {
49+
Ok(Transformed::yes(rewritten))
50+
} else {
51+
Ok(Transformed::no(plan))
52+
}
53+
})?;
54+
Ok(transformed.data)
55+
}
56+
57+
fn name(&self) -> &str {
58+
"sorted_series_streaming_aggregate"
59+
}
60+
61+
fn schema_check(&self) -> bool {
62+
true
63+
}
64+
}
65+
66+
fn rewrite_sorted_series_final_aggregate(
67+
plan: &Arc<dyn ExecutionPlan>,
68+
) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
69+
let Some(final_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
70+
return Ok(None);
71+
};
72+
if final_agg.mode() != &AggregateMode::FinalPartitioned
73+
|| !aggregate_groups_on_sorted_series(final_agg)
74+
{
75+
return Ok(None);
76+
}
77+
78+
let Some(sort) = final_agg.input().as_any().downcast_ref::<SortExec>() else {
79+
return Ok(None);
80+
};
81+
if !sort.preserve_partitioning()
82+
|| sort.fetch().is_some()
83+
|| !ordering_starts_with_sorted_series(sort.expr())
84+
{
85+
return Ok(None);
86+
}
87+
88+
let Some(repartition) = sort.input().as_any().downcast_ref::<RepartitionExec>() else {
89+
return Ok(None);
90+
};
91+
if !hash_partitioning_contains_sorted_series(repartition.partitioning()) {
92+
return Ok(None);
93+
}
94+
95+
let ordering = sort.expr().clone();
96+
let repartition_input = Arc::clone(repartition.input());
97+
let partition_sort: Arc<dyn ExecutionPlan> = if repartition_input
98+
.equivalence_properties()
99+
.ordering_satisfy(ordering.clone())?
100+
{
101+
repartition_input
102+
} else {
103+
Arc::new(
104+
SortExec::new(ordering.clone(), repartition_input).with_preserve_partitioning(true),
105+
)
106+
};
107+
let merged: Arc<dyn ExecutionPlan> =
108+
Arc::new(SortPreservingMergeExec::new(ordering, partition_sort));
109+
110+
let rewritten = AggregateExec::try_new(
111+
AggregateMode::Final,
112+
final_agg.group_expr().clone(),
113+
final_agg.aggr_expr().to_vec(),
114+
final_agg.filter_expr().to_vec(),
115+
merged,
116+
final_agg.input_schema(),
117+
)?
118+
.with_limit_options(final_agg.limit_options());
119+
120+
Ok(Some(Arc::new(rewritten)))
121+
}
122+
123+
fn aggregate_groups_on_sorted_series(aggregate: &AggregateExec) -> bool {
124+
aggregate
125+
.group_expr()
126+
.expr()
127+
.iter()
128+
.any(|(expr, alias)| alias == SORTED_SERIES_COLUMN || is_sorted_series_column(expr))
129+
}
130+
131+
fn hash_partitioning_contains_sorted_series(partitioning: &Partitioning) -> bool {
132+
let Partitioning::Hash(exprs, _) = partitioning else {
133+
return false;
134+
};
135+
exprs.iter().any(is_sorted_series_column)
136+
}
137+
138+
fn ordering_starts_with_sorted_series(ordering: &LexOrdering) -> bool {
139+
is_sorted_series_column(&ordering.first().expr)
140+
}
141+
142+
fn is_sorted_series_column(expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>) -> bool {
143+
match expr.as_any().downcast_ref::<Column>() {
144+
Some(column) => column.name() == SORTED_SERIES_COLUMN,
145+
None => false,
146+
}
147+
}

0 commit comments

Comments
 (0)