Skip to content

Commit 7c34911

Browse files
LLDayaskalt
authored and committed
feat(physical-plan): introduce binder
1 parent 431445e commit 7c34911

11 files changed

Lines changed: 606 additions & 127 deletions

File tree

datafusion/core/benches/reset_plan_states.rs

Lines changed: 172 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::cell::OnceCell;
1819
use std::sync::{Arc, LazyLock};
1920

2021
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
22+
use criterion::measurement::WallTime;
2123
use criterion::{Criterion, criterion_group, criterion_main};
2224
use datafusion::prelude::SessionContext;
2325
use datafusion_catalog::MemTable;
26+
use datafusion_common::metadata::ScalarAndMetadata;
27+
use datafusion_common::{ParamValues, ScalarValue};
2428
use datafusion_physical_plan::ExecutionPlan;
29+
use datafusion_physical_plan::bind::Binder;
2530
use datafusion_physical_plan::displayable;
2631
use datafusion_physical_plan::execution_plan::reset_plan_states;
2732
use tokio::runtime::Runtime;
@@ -37,6 +42,42 @@ static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
3742
))
3843
});
3944

45+
/// Decides, slot by slot, whether a generated query should use a placeholder,
/// so that the requested percentage of slots ends up being placeholders.
///
/// Placeholders are spread evenly over the sequence of slots instead of being
/// packed at the start of every 100-slot window: after `n` calls to
/// [`PlaceholderGen::placeholder`] exactly `ceil(n * percent / 100)` of them
/// returned a placeholder. The requested share is therefore honoured even for
/// queries with fewer than 100 slots, and the very first slot is a placeholder
/// whenever the percentage is non-zero.
struct PlaceholderGen {
    /// Requested share of placeholder slots, in percent (capped at 100).
    placeholders_percent: usize,
    /// Running remainder of the percentage accumulator; always below 100
    /// between calls.
    credit: usize,
    /// Number of placeholders handed out so far.
    num_placeholders: usize,
}

impl PlaceholderGen {
    /// Creates a generator that turns `placeholders_percent` percent of the
    /// slots into placeholders. Values above 100 behave like 100.
    fn new(placeholders_percent: usize) -> Self {
        Self {
            placeholders_percent: placeholders_percent.min(100),
            // Starting one short of a full unit rounds the running total up,
            // which makes the first slot a placeholder for any non-zero percent.
            credit: 99,
            num_placeholders: 0,
        }
    }

    /// Advances to the next slot and returns `Some("$1")` when that slot must
    /// be a placeholder, `None` otherwise.
    fn placeholder(&mut self) -> Option<String> {
        self.credit += self.placeholders_percent;
        if self.credit < 100 {
            return None;
        }
        self.credit -= 100;
        self.num_placeholders += 1;
        Some("$1".to_owned())
    }

    /// Advances to the next slot and returns either the placeholder or, when
    /// the slot is not a placeholder, the value produced by `f`.
    fn placeholder_or(&mut self, f: impl FnOnce() -> String) -> String {
        self.placeholder().unwrap_or_else(f)
    }
}
80+
4081
/// Returns the name of the `i`-th column used in the generated queries,
/// i.e. `x_` followed by the index.
fn col_name(i: usize) -> String {
    let index = i.to_string();
    ["x_", index.as_str()].concat()
}
@@ -47,7 +88,7 @@ fn aggr_name(i: usize) -> String {
4788

4889
fn physical_plan(
4990
ctx: &SessionContext,
50-
rt: &Runtime,
91+
rt: &tokio::runtime::Handle,
5192
sql: &str,
5293
) -> Arc<dyn ExecutionPlan> {
5394
rt.block_on(async {
@@ -60,15 +101,16 @@ fn physical_plan(
60101
})
61102
}
62103

63-
/// Builds a conjunction of `len` equality comparisons:
/// `lhs_0 = rhs_0 AND lhs_1 = rhs_1 AND ...`.
///
/// `comparee` is called once per index, in ascending order from `0` to
/// `len - 1`, and returns the `(lhs, rhs)` operands of that comparison.
/// Returns an empty string when `len` is zero.
fn predicate(mut comparee: impl FnMut(usize) -> (String, String), len: usize) -> String {
    (0..len)
        .map(|i| {
            let (lhs, rhs) = comparee(i);
            format!("{lhs} = {rhs}")
        })
        .collect::<Vec<_>>()
        .join(" AND ")
}
@@ -83,23 +125,46 @@ fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
83125
///
84126
/// Where `p1` and `p2` are some long predicates.
85127
///
86-
fn query1() -> String {
128+
fn query0(placeholders_percent: usize) -> (String, usize) {
129+
let mut plg = PlaceholderGen::new(placeholders_percent);
87130
let mut query = String::new();
88131
query.push_str("SELECT ");
89132
for i in 0..NUM_FIELDS {
90133
if i > 0 {
91134
query.push_str(", ");
92135
}
93136
query.push_str("AVG(");
94-
query.push_str(&col_name(i));
137+
138+
if let Some(placeholder) = plg.placeholder() {
139+
query.push_str(&format!("{}+{}", placeholder, col_name(i)));
140+
} else {
141+
query.push_str(&col_name(i));
142+
}
143+
95144
query.push_str(") AS ");
96145
query.push_str(&aggr_name(i));
97146
}
98147
query.push_str(" FROM t WHERE ");
99-
query.push_str(&predicate(col_name, PREDICATE_LEN));
148+
query.push_str(&predicate(
149+
|i| {
150+
(
151+
plg.placeholder_or(|| col_name(i)),
152+
plg.placeholder_or(|| col_name(i + 1)),
153+
)
154+
},
155+
PREDICATE_LEN,
156+
));
100157
query.push_str(" HAVING ");
101-
query.push_str(&predicate(aggr_name, PREDICATE_LEN));
102-
query
158+
query.push_str(&predicate(
159+
|i| {
160+
(
161+
plg.placeholder_or(|| aggr_name(i)),
162+
plg.placeholder_or(|| aggr_name(i + 1)),
163+
)
164+
},
165+
PREDICATE_LEN,
166+
));
167+
(query, plg.num_placeholders)
103168
}
104169

105170
/// Returns a typical plan for the query like:
@@ -109,27 +174,35 @@ fn query1() -> String {
109174
/// WHERE p1
110175
/// ```
111176
///
112-
fn query2() -> String {
177+
fn query1(placeholders_percent: usize) -> (String, usize) {
178+
let mut plg = PlaceholderGen::new(placeholders_percent);
113179
let mut query = String::new();
114180
query.push_str("SELECT ");
115181
for i in (0..NUM_FIELDS).step_by(2) {
116182
if i > 0 {
117183
query.push_str(", ");
118184
}
119-
if (i / 2) % 2 == 0 {
120-
query.push_str(&format!("t.{}", col_name(i)));
185+
let col = if (i / 2) % 2 == 0 {
186+
format!("t.{}", col_name(i))
121187
} else {
122-
query.push_str(&format!("v.{}", col_name(i)));
123-
}
188+
format!("v.{}", col_name(i))
189+
};
190+
let add = plg.placeholder_or(|| "1".to_owned());
191+
let proj = format!("{col} + {add}");
192+
query.push_str(&proj);
124193
}
125194
query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");
126195

127-
fn qualified_name(i: usize) -> String {
128-
format!("t.{}", col_name(i))
129-
}
130-
131-
query.push_str(&predicate(qualified_name, PREDICATE_LEN));
132-
query
196+
query.push_str(&predicate(
197+
|i| {
198+
(
199+
plg.placeholder_or(|| format!("t.{}", col_name(i))),
200+
plg.placeholder_or(|| i.to_string()),
201+
)
202+
},
203+
PREDICATE_LEN,
204+
));
205+
(query, plg.num_placeholders)
133206
}
134207

135208
/// Returns a typical plan for the query like:
@@ -139,7 +212,8 @@ fn query2() -> String {
139212
/// WHERE p
140213
/// ```
141214
///
142-
fn query3() -> String {
215+
fn query2(placeholders_percent: usize) -> (String, usize) {
216+
let mut plg = PlaceholderGen::new(placeholders_percent);
143217
let mut query = String::new();
144218
query.push_str("SELECT ");
145219

@@ -150,24 +224,23 @@ fn query3() -> String {
150224
}
151225
query.push_str(&col_name(i * 2));
152226
query.push_str(" + ");
153-
query.push_str(&col_name(i * 2 + 1));
227+
query.push_str(&plg.placeholder_or(|| col_name(i * 2 + 1)));
154228
}
155229

156230
query.push_str(" FROM t WHERE ");
157-
query.push_str(&predicate(col_name, PREDICATE_LEN));
158-
query
231+
query.push_str(&predicate(
232+
|i| {
233+
(
234+
plg.placeholder_or(|| col_name(i)),
235+
plg.placeholder_or(|| i.to_string()),
236+
)
237+
},
238+
PREDICATE_LEN,
239+
));
240+
(query, plg.num_placeholders)
159241
}
160242

161-
fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
162-
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
163-
}
164-
165-
/// Benchmark is intended to measure overhead of actions, required to perform
166-
/// making an independent instance of the execution plan to re-execute it, avoiding
167-
/// re-planning stage.
168-
fn bench_reset_plan_states(c: &mut Criterion) {
169-
env_logger::init();
170-
243+
fn init() -> (SessionContext, Runtime) {
171244
let rt = Runtime::new().unwrap();
172245
let ctx = SessionContext::new();
173246
ctx.register_table(
@@ -181,20 +254,73 @@ fn bench_reset_plan_states(c: &mut Criterion) {
181254
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
182255
)
183256
.unwrap();
257+
(ctx, rt)
258+
}
184259

185-
macro_rules! bench_query {
186-
($query_producer: expr) => {{
187-
let sql = $query_producer();
188-
let plan = physical_plan(&ctx, &rt, &sql);
189-
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
190-
move |b| run_reset_states(b, &plan)
191-
}};
192-
}
260+
/// Benchmark is intended to measure overhead of actions, required to perform
261+
/// making an independent instance of the execution plan to re-execute it, avoiding
262+
/// re-planning stage.
263+
fn bench_reset(
264+
g: &mut criterion::BenchmarkGroup<'_, WallTime>,
265+
query_fn: impl FnOnce() -> String,
266+
) {
267+
let (ctx, rt) = init();
268+
let query = query_fn();
269+
let rt = rt.handle();
270+
let plan: OnceCell<Arc<dyn ExecutionPlan>> = OnceCell::new();
271+
g.bench_function("reset", |b| {
272+
let plan = plan.get_or_init(|| {
273+
log::info!("sql:\n{}\n\n", query);
274+
let plan = physical_plan(&ctx, rt, &query);
275+
log::info!("plan:\n{}", displayable(plan.as_ref()).indent(true));
276+
plan
277+
});
278+
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()))
279+
});
280+
}
281+
282+
/// The same as [`bench_reset`] for placeholdered plans.
283+
/// `placeholders_percent` is a percent of placeholders that must be used in generated queries.
284+
fn bench_bind(
285+
g: &mut criterion::BenchmarkGroup<'_, WallTime>,
286+
placeholders_percent: usize,
287+
query_fn: impl FnOnce(usize) -> (String, usize),
288+
) {
289+
let (ctx, rt) = init();
290+
let params = ParamValues::List(vec![ScalarAndMetadata::new(
291+
ScalarValue::Int64(Some(42)),
292+
None,
293+
)]);
294+
let (query, num_placeholders) = query_fn(placeholders_percent);
295+
let rt = rt.handle();
296+
let binder: OnceCell<Binder> = OnceCell::new();
297+
g.bench_function(format!("{num_placeholders}_placeholders"), move |b| {
298+
let binder = binder.get_or_init(|| {
299+
log::info!("sql:\n{}\n\n", query);
300+
let plan = physical_plan(&ctx, rt, &query);
301+
log::info!("plan:\n{}", displayable(plan.as_ref()).indent(true));
302+
plan.into()
303+
});
304+
b.iter(|| std::hint::black_box(binder.bind(Some(&params))))
305+
});
306+
}
193307

194-
c.bench_function("query1", bench_query!(query1));
195-
c.bench_function("query2", bench_query!(query2));
196-
c.bench_function("query3", bench_query!(query3));
308+
fn criterion_benchmark(c: &mut Criterion) {
309+
env_logger::init();
310+
311+
for (query_idx, query_fn) in [query0, query1, query2].iter().enumerate() {
312+
{
313+
let mut g = c.benchmark_group(format!("reset_query{query_idx}"));
314+
bench_reset(&mut g, || query_fn(0).0);
315+
}
316+
{
317+
let mut g = c.benchmark_group(format!("bind_query{query_idx}"));
318+
for placeholders_percent in [0, 1, 10, 50, 100] {
319+
bench_bind(&mut g, placeholders_percent, query_fn);
320+
}
321+
}
322+
}
197323
}
198324

199-
criterion_group!(benches, bench_reset_plan_states);
325+
criterion_group!(benches, criterion_benchmark);
200326
criterion_main!(benches);

0 commit comments

Comments
 (0)