Skip to content

Commit 03b0289

Browse files
committed
Refactor min/max batch handling and simplify JSON logic
Collapse the repeated complex-type match arms in min_batch and max_batch into a single or-pattern arm each. Introduce a shared update_extreme helper that both the min and max accumulators call from update_batch. Simplify the ordered_keys function and factor the repeated pretty-print container logic into write_delimited_entries. Replace the duplicate json_to_string and json_key_to_string helpers in display.rs with a local to_json_string! macro, preserving behavior without adding dependencies.
1 parent df4078c commit 03b0289

File tree

2 files changed

+92
-81
lines changed

2 files changed

+92
-81
lines changed

datafusion/expr/src/logical_plan/display.rs

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -295,33 +295,69 @@ fn ordered_keys(map: &serde_json::Map<String, serde_json::Value>) -> Vec<&str> {
295295
return keys;
296296
}
297297

298-
let mut ordered_keys = Vec::with_capacity(keys.len());
298+
let mut remaining_keys = Vec::with_capacity(keys.len().saturating_sub(1));
299+
let mut has_plans_key = false;
300+
let mut has_output_key = false;
301+
for key in keys {
302+
match key {
303+
NODE_TYPE_KEY => {}
304+
PLANS_KEY => has_plans_key = true,
305+
OUTPUT_KEY => has_output_key = true,
306+
_ => remaining_keys.push(key),
307+
}
308+
}
309+
310+
let mut ordered_keys = Vec::with_capacity(map.len());
299311
ordered_keys.push(NODE_TYPE_KEY);
300-
ordered_keys.extend(
301-
keys.iter()
302-
.copied()
303-
.filter(|key| !matches!(*key, NODE_TYPE_KEY | PLANS_KEY | OUTPUT_KEY)),
304-
);
305-
ordered_keys.extend(
306-
[PLANS_KEY, OUTPUT_KEY]
307-
.into_iter()
308-
.filter(|key| map.contains_key(*key)),
309-
);
312+
ordered_keys.extend(remaining_keys);
313+
if has_plans_key {
314+
ordered_keys.push(PLANS_KEY);
315+
}
316+
if has_output_key {
317+
ordered_keys.push(OUTPUT_KEY);
318+
}
310319
ordered_keys
311320
}
312321

313-
fn json_to_string(value: &serde_json::Value) -> datafusion_common::Result<String> {
314-
serde_json::to_string(value).map_err(|e| DataFusionError::External(Box::new(e)))
315-
}
316-
317-
fn json_key_to_string(key: &str) -> datafusion_common::Result<String> {
318-
serde_json::to_string(key).map_err(|e| DataFusionError::External(Box::new(e)))
322+
macro_rules! to_json_string {
323+
($value:expr) => {
324+
serde_json::to_string($value).map_err(|e| DataFusionError::External(Box::new(e)))
325+
};
319326
}
320327

321328
fn push_indent(buf: &mut String, indent: usize) {
322329
buf.push_str(&" ".repeat(indent * 2));
323330
}
324331

332+
fn write_delimited_entries(
333+
buf: &mut String,
334+
indent: usize,
335+
open: char,
336+
close: char,
337+
len: usize,
338+
mut write_entry: impl FnMut(usize, &mut String) -> datafusion_common::Result<()>,
339+
) -> datafusion_common::Result<()> {
340+
if len == 0 {
341+
buf.push(open);
342+
buf.push(close);
343+
return Ok(());
344+
}
345+
346+
buf.push(open);
347+
buf.push('\n');
348+
for idx in 0..len {
349+
push_indent(buf, indent + 1);
350+
write_entry(idx, buf)?;
351+
if idx + 1 != len {
352+
buf.push(',');
353+
}
354+
buf.push('\n');
355+
}
356+
push_indent(buf, indent);
357+
buf.push(close);
358+
Ok(())
359+
}
360+
325361
fn write_ordered_json(
326362
value: &serde_json::Value,
327363
buf: &mut String,
@@ -332,55 +368,25 @@ fn write_ordered_json(
332368
| serde_json::Value::Bool(_)
333369
| serde_json::Value::Number(_)
334370
| serde_json::Value::String(_) => {
335-
buf.push_str(&json_to_string(value)?);
371+
buf.push_str(&to_json_string!(value)?);
336372
}
337373
serde_json::Value::Array(values) => {
338-
if values.is_empty() {
339-
buf.push_str("[]");
340-
return Ok(());
341-
}
342-
343-
buf.push('[');
344-
buf.push('\n');
345-
for (idx, value) in values.iter().enumerate() {
346-
push_indent(buf, indent + 1);
347-
write_ordered_json(value, buf, indent + 1)?;
348-
if idx + 1 != values.len() {
349-
buf.push(',');
350-
}
351-
buf.push('\n');
352-
}
353-
push_indent(buf, indent);
354-
buf.push(']');
374+
write_delimited_entries(buf, indent, '[', ']', values.len(), |idx, buf| {
375+
write_ordered_json(&values[idx], buf, indent + 1)
376+
})?;
355377
}
356378
serde_json::Value::Object(map) => {
357-
if map.is_empty() {
358-
buf.push_str("{}");
359-
return Ok(());
360-
}
361-
362-
buf.push('{');
363-
buf.push('\n');
364-
365379
let keys = ordered_keys(map);
366-
for (idx, key) in keys.iter().enumerate() {
380+
write_delimited_entries(buf, indent, '{', '}', keys.len(), |idx, buf| {
381+
let key = keys[idx];
367382
let value = map
368-
.get(*key)
383+
.get(key)
369384
.ok_or_else(|| internal_datafusion_err!("Missing key in object!"))?;
370385

371-
push_indent(buf, indent + 1);
372-
buf.push_str(&json_key_to_string(key)?);
386+
buf.push_str(&to_json_string!(key)?);
373387
buf.push_str(": ");
374-
write_ordered_json(value, buf, indent + 1)?;
375-
376-
if idx + 1 != keys.len() {
377-
buf.push(',');
378-
}
379-
buf.push('\n');
380-
}
381-
382-
push_indent(buf, indent);
383-
buf.push('}');
388+
write_ordered_json(value, buf, indent + 1)
389+
})?;
384390
}
385391
}
386392

datafusion/functions-aggregate-common/src/min_max.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,23 @@ fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValu
469469
ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(value))
470470
}
471471

472+
fn update_extreme(
473+
extreme: &mut ScalarValue,
474+
values: &[ArrayRef],
475+
batch_extreme: fn(&ArrayRef) -> Result<ScalarValue>,
476+
is_max: bool,
477+
) -> Result<()> {
478+
let delta = batch_extreme(&values[0])?;
479+
let current = &*extreme;
480+
let next: Result<ScalarValue, DataFusionError> = if is_max {
481+
min_max!(current, &delta, max)
482+
} else {
483+
min_max!(current, &delta, min)
484+
};
485+
*extreme = next?;
486+
Ok(())
487+
}
488+
472489
/// An accumulator to compute the maximum value
473490
#[derive(Debug, Clone)]
474491
pub struct MaxAccumulator {
@@ -486,12 +503,7 @@ impl MaxAccumulator {
486503

487504
impl Accumulator for MaxAccumulator {
488505
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
489-
let values = &values[0];
490-
let delta = &max_batch(values)?;
491-
let new_max: Result<ScalarValue, DataFusionError> =
492-
min_max!(&self.max, delta, max);
493-
self.max = new_max?;
494-
Ok(())
506+
update_extreme(&mut self.max, values, max_batch, true)
495507
}
496508

497509
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -531,12 +543,7 @@ impl Accumulator for MinAccumulator {
531543
}
532544

533545
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
534-
let values = &values[0];
535-
let delta = &min_batch(values)?;
536-
let new_min: Result<ScalarValue, DataFusionError> =
537-
min_max!(&self.min, delta, min);
538-
self.min = new_min?;
539-
Ok(())
546+
update_extreme(&mut self.min, values, min_batch, false)
540547
}
541548

542549
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -806,13 +813,11 @@ pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
806813
min_binary_view
807814
)
808815
}
809-
DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
810-
DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
811-
DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
812-
DataType::FixedSizeList(_, _) => {
813-
min_max_batch_generic(values, Ordering::Greater)?
814-
}
815-
DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?,
816+
DataType::Struct(_)
817+
| DataType::List(_)
818+
| DataType::LargeList(_)
819+
| DataType::FixedSizeList(_, _)
820+
| DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?,
816821
_ => min_max_batch!(values, min),
817822
})
818823
}
@@ -868,11 +873,11 @@ pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
868873
let value = value.map(|e| e.to_vec());
869874
ScalarValue::FixedSizeBinary(*size, value)
870875
}
871-
DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
872-
DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
873-
DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
874-
DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
875-
DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?,
876+
DataType::Struct(_)
877+
| DataType::List(_)
878+
| DataType::LargeList(_)
879+
| DataType::FixedSizeList(_, _)
880+
| DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?,
876881
_ => min_max_batch!(values, max),
877882
})
878883
}

0 commit comments

Comments
 (0)