Skip to content

Commit af6737e

Browse files
support sorted series column
1 parent 5d24f57 commit af6737e

7 files changed

Lines changed: 422 additions & 30 deletions

File tree

quickwit/quickwit-datafusion/src/sources/metrics/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
pub(crate) mod factory;
2424
pub(crate) mod index_resolver;
2525
pub(crate) mod metastore_provider;
26+
pub(crate) mod optimizer;
2627
pub(crate) mod predicate;
2728
pub(crate) mod table_provider;
2829

@@ -46,6 +47,7 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient};
4647

4748
use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory};
4849
use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver};
50+
use self::optimizer::SortedSeriesStreamingAggregateRule;
4951
use self::table_provider::MetricsTableProvider;
5052

5153
/// Returns `true` when `err` wraps a [`MetastoreError::NotFound`].
@@ -210,7 +212,16 @@ impl QuickwitRuntimePlugin for MetricsDataSource {
210212
let factory: Arc<dyn TableProviderFactory> = Arc::new(MetricsTableProviderFactory::new(
211213
Arc::clone(&self.index_resolver),
212214
));
213-
QuickwitRuntimeRegistration::default().with_table_factory(METRICS_FILE_TYPE, factory)
215+
QuickwitRuntimeRegistration::default()
216+
.with_session_config_setter(|config| {
217+
config
218+
.options_mut()
219+
.optimizer
220+
.enable_round_robin_repartition = false;
221+
config.options_mut().optimizer.repartition_file_scans = false;
222+
})
223+
.with_physical_optimizer_rule(Arc::new(SortedSeriesStreamingAggregateRule))
224+
.with_table_factory(METRICS_FILE_TYPE, factory)
214225
}
215226
}
216227

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Physical rewrites for sorted-series metrics rollups.
16+
17+
use std::sync::Arc;
18+
19+
use datafusion::common::tree_node::{Transformed, TreeNode};
20+
use datafusion::config::ConfigOptions;
21+
use datafusion::error::Result as DFResult;
22+
use datafusion::physical_expr::expressions::Column;
23+
use datafusion::physical_expr::{LexOrdering, Partitioning};
24+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
25+
use datafusion::physical_plan::ExecutionPlan;
26+
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
27+
use datafusion::physical_plan::repartition::RepartitionExec;
28+
use datafusion::physical_plan::sorts::sort::SortExec;
29+
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
30+
use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN;
31+
32+
/// Replaces the inner sorted-series hash repartition in rollup plans with a
33+
/// sort-preserving merge into a single final aggregate.
34+
///
35+
/// This keeps worker/file-local partial aggregation parallel, then lets the
36+
/// coordinator stitch ordered per-series partial rows without hash-shuffling
37+
/// those partials by `sorted_series`.
38+
#[derive(Debug, Default)]
39+
pub struct SortedSeriesStreamingAggregateRule;
40+
41+
impl PhysicalOptimizerRule for SortedSeriesStreamingAggregateRule {
42+
fn optimize(
43+
&self,
44+
plan: Arc<dyn ExecutionPlan>,
45+
_config: &ConfigOptions,
46+
) -> DFResult<Arc<dyn ExecutionPlan>> {
47+
let transformed = plan.transform_up(|plan| {
48+
if let Some(rewritten) = rewrite_sorted_series_final_aggregate(&plan)? {
49+
Ok(Transformed::yes(rewritten))
50+
} else {
51+
Ok(Transformed::no(plan))
52+
}
53+
})?;
54+
Ok(transformed.data)
55+
}
56+
57+
fn name(&self) -> &str {
58+
"sorted_series_streaming_aggregate"
59+
}
60+
61+
fn schema_check(&self) -> bool {
62+
true
63+
}
64+
}
65+
66+
fn rewrite_sorted_series_final_aggregate(
67+
plan: &Arc<dyn ExecutionPlan>,
68+
) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
69+
let Some(final_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
70+
return Ok(None);
71+
};
72+
if final_agg.mode() != &AggregateMode::FinalPartitioned
73+
|| !aggregate_groups_on_sorted_series(final_agg)
74+
{
75+
return Ok(None);
76+
}
77+
78+
let Some(sort) = final_agg.input().as_any().downcast_ref::<SortExec>() else {
79+
return Ok(None);
80+
};
81+
if !sort.preserve_partitioning()
82+
|| sort.fetch().is_some()
83+
|| !ordering_starts_with_sorted_series(sort.expr())
84+
{
85+
return Ok(None);
86+
}
87+
88+
let Some(repartition) = sort.input().as_any().downcast_ref::<RepartitionExec>() else {
89+
return Ok(None);
90+
};
91+
if !hash_partitioning_contains_sorted_series(repartition.partitioning()) {
92+
return Ok(None);
93+
}
94+
95+
let ordering = sort.expr().clone();
96+
let partition_sort: Arc<dyn ExecutionPlan> = Arc::new(
97+
SortExec::new(ordering.clone(), Arc::clone(repartition.input()))
98+
.with_preserve_partitioning(true),
99+
);
100+
let merged: Arc<dyn ExecutionPlan> =
101+
Arc::new(SortPreservingMergeExec::new(ordering, partition_sort));
102+
103+
let rewritten = AggregateExec::try_new(
104+
AggregateMode::Final,
105+
final_agg.group_expr().clone(),
106+
final_agg.aggr_expr().to_vec(),
107+
final_agg.filter_expr().to_vec(),
108+
merged,
109+
final_agg.input_schema(),
110+
)?
111+
.with_limit_options(final_agg.limit_options());
112+
113+
Ok(Some(Arc::new(rewritten)))
114+
}
115+
116+
fn aggregate_groups_on_sorted_series(aggregate: &AggregateExec) -> bool {
117+
aggregate
118+
.group_expr()
119+
.expr()
120+
.iter()
121+
.any(|(expr, alias)| alias == SORTED_SERIES_COLUMN || is_sorted_series_column(expr))
122+
}
123+
124+
fn hash_partitioning_contains_sorted_series(partitioning: &Partitioning) -> bool {
125+
let Partitioning::Hash(exprs, _) = partitioning else {
126+
return false;
127+
};
128+
exprs.iter().any(is_sorted_series_column)
129+
}
130+
131+
fn ordering_starts_with_sorted_series(ordering: &LexOrdering) -> bool {
132+
is_sorted_series_column(&ordering.first().expr)
133+
}
134+
135+
fn is_sorted_series_column(expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>) -> bool {
136+
expr.as_any()
137+
.downcast_ref::<Column>()
138+
.is_some_and(|column| column.name() == SORTED_SERIES_COLUMN)
139+
}

0 commit comments

Comments
 (0)