Skip to content

Commit a93669b

Browse files
committed
Implement deterministic pgjson serialization
Add deterministic JSON serialization that maintains consistent key ordering regardless of map ordering. Introduce ordered key logic in display.rs, along with a new pretty-writer. Update root pgjson emission to use the ordered writer and add a regression test to verify key sequence for Values plan output.
1 parent a85804b commit a93669b

File tree

2 files changed

+128
-6
lines changed

2 files changed

+128
-6
lines changed

datafusion/expr/src/logical_plan/display.rs

Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,107 @@ pub struct PgJsonVisitor<'a, 'b> {
283283
parent_ids: Vec<u32>,
284284
}
285285

286+
fn ordered_keys(map: &serde_json::Map<String, serde_json::Value>) -> Vec<&str> {
287+
let mut ordered_keys = vec![];
288+
289+
// pgjson node objects are always emitted as:
290+
// Node Type, node-specific keys, Plans, Output
291+
if map.contains_key("Node Type") {
292+
ordered_keys.push("Node Type");
293+
294+
let mut middle_keys = map
295+
.keys()
296+
.map(String::as_str)
297+
.filter(|k| *k != "Node Type" && *k != "Plans" && *k != "Output")
298+
.collect::<Vec<_>>();
299+
middle_keys.sort_unstable();
300+
ordered_keys.extend(middle_keys);
301+
302+
if map.contains_key("Plans") {
303+
ordered_keys.push("Plans");
304+
}
305+
306+
if map.contains_key("Output") {
307+
ordered_keys.push("Output");
308+
}
309+
} else {
310+
let mut keys = map.keys().map(String::as_str).collect::<Vec<_>>();
311+
keys.sort_unstable();
312+
ordered_keys.extend(keys);
313+
}
314+
315+
ordered_keys
316+
}
317+
318+
fn write_ordered_json(
319+
value: &serde_json::Value,
320+
buf: &mut String,
321+
indent: usize,
322+
) -> datafusion_common::Result<()> {
323+
match value {
324+
serde_json::Value::Null
325+
| serde_json::Value::Bool(_)
326+
| serde_json::Value::Number(_)
327+
| serde_json::Value::String(_) => {
328+
let scalar = serde_json::to_string(value)
329+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
330+
buf.push_str(&scalar);
331+
}
332+
serde_json::Value::Array(values) => {
333+
if values.is_empty() {
334+
buf.push_str("[]");
335+
return Ok(());
336+
}
337+
338+
buf.push('[');
339+
buf.push('\n');
340+
for (idx, value) in values.iter().enumerate() {
341+
buf.push_str(&" ".repeat((indent + 1) * 2));
342+
write_ordered_json(value, buf, indent + 1)?;
343+
if idx + 1 != values.len() {
344+
buf.push(',');
345+
}
346+
buf.push('\n');
347+
}
348+
buf.push_str(&" ".repeat(indent * 2));
349+
buf.push(']');
350+
}
351+
serde_json::Value::Object(map) => {
352+
if map.is_empty() {
353+
buf.push_str("{}");
354+
return Ok(());
355+
}
356+
357+
buf.push('{');
358+
buf.push('\n');
359+
360+
let keys = ordered_keys(map);
361+
for (idx, key) in keys.iter().enumerate() {
362+
let value = map
363+
.get(*key)
364+
.ok_or_else(|| internal_datafusion_err!("Missing key in object!"))?;
365+
366+
buf.push_str(&" ".repeat((indent + 1) * 2));
367+
let escaped_key = serde_json::to_string(key)
368+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
369+
buf.push_str(&escaped_key);
370+
buf.push_str(": ");
371+
write_ordered_json(value, buf, indent + 1)?;
372+
373+
if idx + 1 != keys.len() {
374+
buf.push(',');
375+
}
376+
buf.push('\n');
377+
}
378+
379+
buf.push_str(&" ".repeat(indent * 2));
380+
buf.push('}');
381+
}
382+
}
383+
384+
Ok(())
385+
}
386+
286387
impl<'a, 'b> PgJsonVisitor<'a, 'b> {
287388
pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
288389
Self {
@@ -692,12 +793,9 @@ impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
692793
} else {
693794
// This is the root node
694795
let plan = serde_json::json!([{"Plan": current_node}]);
695-
write!(
696-
self.f,
697-
"{}",
698-
serde_json::to_string_pretty(&plan)
699-
.map_err(|e| DataFusionError::External(Box::new(e)))?
700-
)?;
796+
let mut plan_str = String::new();
797+
write_ordered_json(&plan, &mut plan_str, 0)?;
798+
write!(self.f, "{plan_str}")?;
701799
}
702800

703801
Ok(TreeNodeRecursion::Continue)

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4593,6 +4593,30 @@ mod tests {
45934593
Ok(())
45944594
}
45954595

4596+
#[test]
4597+
fn test_display_pg_json_values_field_order() -> Result<()> {
4598+
let plan = LogicalPlanBuilder::values(vec![vec![lit(1_i64)]])?.build()?;
4599+
4600+
let output = plan.display_pg_json().to_string();
4601+
let node_type_pos = output
4602+
.find("\"Node Type\": \"Values\"")
4603+
.expect("expected Node Type key in pgjson output");
4604+
let values_pos = output
4605+
.find("\"Values\": \"(Int64(1))\"")
4606+
.expect("expected Values key in pgjson output");
4607+
let plans_pos = output
4608+
.find("\"Plans\": []")
4609+
.expect("expected Plans key in pgjson output");
4610+
let output_pos = output
4611+
.find("\"Output\": [")
4612+
.expect("expected Output key in pgjson output");
4613+
4614+
assert!(node_type_pos < values_pos);
4615+
assert!(values_pos < plans_pos);
4616+
assert!(plans_pos < output_pos);
4617+
Ok(())
4618+
}
4619+
45964620
/// Tests for the Visitor trait and walking logical plan nodes
45974621
#[derive(Debug, Default)]
45984622
struct OkVisitor {

0 commit comments

Comments
 (0)