Skip to content

Commit 52610a8

Browse files
authored
chore: Document Comet runtime data debug (#4235)
1 parent 1f9fa62 commit 52610a8

5 files changed

Lines changed: 296 additions & 24 deletions

File tree

docs/source/contributor-guide/debugging.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,3 +220,73 @@ Example log output:
220220
```
221221

222222
When backtraces are enabled (see earlier section) then backtraces will be included for failed allocations.
223+
224+
### Dumping native stream output with the `debug` module
225+
226+
The `native/core/src/debug/` module ships small wrappers that print every
227+
`RecordBatch` (and, at expression granularity, every `ColumnarValue`) flowing
228+
through a node. They are gated behind `#[cfg(debug_assertions)]` in
229+
`native/core/src/lib.rs`, so they exist in every `cargo check` / `cargo test`
230+
/ `make core` build but are stripped from `make release`. CI builds them on
231+
every run, so the wrappers cannot bitrot.
232+
233+
Two types are exposed:
234+
235+
- `DebugExecutionDataStream` — wraps an `Arc<dyn ExecutionPlan>`, prints each batch it
236+
produces.
237+
- `DebugExecutionDataPhyExpr` — wraps an `Arc<dyn PhysicalExpr>`, prints the input
238+
batch and output `ColumnarValue` for every `evaluate()` call.
239+
240+
Output is `eprintln!` + `dbg!`, i.e. plain stderr of the executor process — the
241+
same place `rust-gdb` / `dbg!` output lands. It is not routed through `log4rs`,
242+
so `spark.comet.debug.enabled` and `COMET_LOG_LEVEL` do not affect it.
243+
244+
#### Example: dumping window operator output
245+
246+
In `PhysicalPlanner::create_plan` in `native/core/src/execution/planner.rs`,
247+
the `OpStruct::Window` arm builds a `BoundedWindowAggExec` and hands it to
248+
`SparkPlan::new`. Wrap the node between those two steps:
249+
250+
```rust
251+
let window_agg = Arc::new(BoundedWindowAggExec::try_new(
252+
window_expr?,
253+
Arc::clone(&child.native_plan),
254+
InputOrderMode::Sorted,
255+
!partition_exprs.is_empty(),
256+
)?);
257+
258+
#[cfg(debug_assertions)]
259+
let window_agg: Arc<dyn datafusion::physical_plan::ExecutionPlan> = {
260+
use crate::debug::DebugExecutionDataStream;
261+
Arc::new(DebugExecutionDataStream::new("window-output", window_agg))
262+
};
263+
264+
Ok((
265+
scans,
266+
shuffle_scans,
267+
Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg, vec![child])),
268+
))
269+
```
270+
271+
The `let` rebinding keeps the original `window_agg` typed concretely on release
272+
builds (where the `cfg` block is compiled out) and re-binds it to
273+
`Arc<dyn ExecutionPlan>` when the debug module is active. No annotation is
274+
needed on the first `let` because release builds use the original binding
275+
directly.
276+
277+
Rebuild with `make core` and run a window test, for example:
278+
279+
```sh
280+
./mvnw test -Dsuites="org.apache.comet.exec.CometWindowExecSuite window query with rangeBetween" -Dtest=none
281+
```
282+
283+
Sample stderr excerpt (abbreviated):
284+
285+
```text
286+
[comet-debug] DebugExecutionDataStream[window-output] execute(partition=0)
287+
[core/src/debug/debug_batch_stream.rs:30] batch = RecordBatch { columns: [...], row_count: 4 }
288+
[core/src/debug/debug_batch_stream.rs:37] col_idx = 0
289+
[core/src/debug/debug_batch_stream.rs:37] column = PrimitiveArray<Int32> [1, 1, 2, 2]
290+
[core/src/debug/debug_batch_stream.rs:37] column.nulls() = None
291+
...
292+
```
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::fmt;
20+
use std::hash::{Hash, Hasher};
21+
use std::sync::Arc;
22+
23+
use arrow::array::RecordBatch;
24+
use arrow::datatypes::{DataType, Schema};
25+
use datafusion::common::Result;
26+
use datafusion::execution::SendableRecordBatchStream;
27+
use datafusion::logical_expr::ColumnarValue;
28+
use datafusion::physical_expr::PhysicalExpr;
29+
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
30+
31+
/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
32+
pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
33+
use futures::StreamExt;
34+
let schema = stream.schema();
35+
let printing_stream = stream.map(|batch_result| {
36+
match &batch_result {
37+
Ok(batch) => {
38+
dbg!(batch, batch.schema());
39+
for (col_idx, column) in batch.columns().iter().enumerate() {
40+
dbg!(col_idx, column, column.nulls());
41+
}
42+
}
43+
Err(e) => {
44+
println!("batch error: {:?}", e);
45+
}
46+
}
47+
batch_result
48+
});
49+
Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream))
50+
}
51+
52+
/// `ExecutionPlan` wrapper that prints every batch produced by `inner`.
53+
#[derive(Debug)]
54+
pub struct DebugExecutionDataStream {
55+
label: String,
56+
inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
57+
}
58+
59+
impl DebugExecutionDataStream {
60+
pub fn new(
61+
label: impl Into<String>,
62+
inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
63+
) -> Self {
64+
Self {
65+
label: label.into(),
66+
inner,
67+
}
68+
}
69+
}
70+
71+
impl datafusion::physical_plan::DisplayAs for DebugExecutionDataStream {
72+
fn fmt_as(
73+
&self,
74+
_t: datafusion::physical_plan::DisplayFormatType,
75+
f: &mut std::fmt::Formatter,
76+
) -> std::fmt::Result {
77+
write!(f, "DebugExecutionDataStream[{}]", self.label)
78+
}
79+
}
80+
81+
impl datafusion::physical_plan::ExecutionPlan for DebugExecutionDataStream {
82+
fn name(&self) -> &str {
83+
"DebugExecutionDataStream"
84+
}
85+
fn as_any(&self) -> &dyn std::any::Any {
86+
self
87+
}
88+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
89+
self.inner.properties()
90+
}
91+
fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
92+
vec![&self.inner]
93+
}
94+
fn with_new_children(
95+
self: Arc<Self>,
96+
children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
97+
) -> datafusion::common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
98+
Ok(Arc::new(DebugExecutionDataStream::new(
99+
self.label.clone(),
100+
Arc::clone(&children[0]),
101+
)))
102+
}
103+
fn execute(
104+
&self,
105+
partition: usize,
106+
context: Arc<datafusion::execution::TaskContext>,
107+
) -> datafusion::common::Result<SendableRecordBatchStream> {
108+
eprintln!(
109+
"[comet-debug] DebugExecutionDataStream[{}] execute(partition={})",
110+
self.label, partition
111+
);
112+
let stream = self.inner.execute(partition, context)?;
113+
Ok(dbg_batch_stream(stream))
114+
}
115+
}
116+
117+
/// `PhysicalExpr` wrapper that prints every `evaluate()` call: input
118+
/// `RecordBatch` and the resulting `ColumnarValue`.
119+
#[derive(Debug)]
120+
pub struct DebugExecutionDataPhyExpr {
121+
label: String,
122+
inner: Arc<dyn PhysicalExpr>,
123+
}
124+
125+
impl DebugExecutionDataPhyExpr {
126+
pub fn new(label: impl Into<String>, inner: Arc<dyn PhysicalExpr>) -> Self {
127+
Self {
128+
label: label.into(),
129+
inner,
130+
}
131+
}
132+
}
133+
134+
impl fmt::Display for DebugExecutionDataPhyExpr {
135+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136+
write!(
137+
f,
138+
"DebugExecutionDataPhyExpr[{}]({})",
139+
self.label, self.inner
140+
)
141+
}
142+
}
143+
144+
impl PartialEq for DebugExecutionDataPhyExpr {
145+
fn eq(&self, other: &Self) -> bool {
146+
self.label == other.label && self.inner.eq(&other.inner)
147+
}
148+
}
149+
impl Eq for DebugExecutionDataPhyExpr {}
150+
impl Hash for DebugExecutionDataPhyExpr {
151+
fn hash<H: Hasher>(&self, state: &mut H) {
152+
self.label.hash(state);
153+
self.inner.hash(state);
154+
}
155+
}
156+
157+
impl PhysicalExpr for DebugExecutionDataPhyExpr {
158+
fn as_any(&self) -> &dyn Any {
159+
self
160+
}
161+
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
162+
self.inner.data_type(input_schema)
163+
}
164+
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
165+
self.inner.nullable(input_schema)
166+
}
167+
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
168+
eprintln!(
169+
"[comet-debug] DebugExecutionDataPhyExpr[{}].evaluate(rows={}, cols={})",
170+
self.label,
171+
batch.num_rows(),
172+
batch.num_columns()
173+
);
174+
dbg!(batch, batch.schema());
175+
let out = self.inner.evaluate(batch)?;
176+
match &out {
177+
ColumnarValue::Array(arr) => {
178+
dbg!(arr.len(), arr.nulls(), arr);
179+
}
180+
ColumnarValue::Scalar(s) => {
181+
dbg!(s);
182+
}
183+
}
184+
Ok(out)
185+
}
186+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
187+
vec![&self.inner]
188+
}
189+
fn with_new_children(
190+
self: Arc<Self>,
191+
children: Vec<Arc<dyn PhysicalExpr>>,
192+
) -> Result<Arc<dyn PhysicalExpr>> {
193+
Ok(Arc::new(DebugExecutionDataPhyExpr::new(
194+
self.label.clone(),
195+
Arc::clone(&children[0]),
196+
)))
197+
}
198+
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199+
self.inner.fmt_sql(f)
200+
}
201+
}

native/core/src/debug/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub mod debug_batch_stream;
19+
20+
pub use debug_batch_stream::{
21+
dbg_batch_stream, DebugExecutionDataPhyExpr, DebugExecutionDataStream,
22+
};

native/core/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ use errors::{try_unwrap_or_throw, CometError, CometResult};
6969
pub mod common;
7070
pub mod execution;
7171
pub mod parquet;
72+
// this module is for non release only. Intended for debugging/profiling purposes
73+
#[cfg(debug_assertions)]
74+
pub mod debug;
7275

7376
#[cfg(all(
7477
not(target_env = "msvc"),

native/core/src/parquet/parquet_exec.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@ use datafusion::datasource::physical_plan::{
2828
};
2929
use datafusion::datasource::source::DataSourceExec;
3030
use datafusion::execution::object_store::ObjectStoreUrl;
31-
use datafusion::execution::SendableRecordBatchStream;
3231
use datafusion::physical_expr::expressions::{BinaryExpr, Column};
3332
use datafusion::physical_expr::PhysicalExpr;
3433
use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
35-
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
3634
use datafusion::prelude::SessionContext;
3735
use datafusion::scalar::ScalarValue;
3836
use datafusion_comet_spark_expr::EvalMode;
@@ -217,25 +215,3 @@ fn get_options(
217215

218216
(table_parquet_options, spark_parquet_options)
219217
}
220-
221-
/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
222-
/// Returns a new `SendableRecordBatchStream` that yields the same batches.
223-
pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
224-
use futures::StreamExt;
225-
let schema = stream.schema();
226-
let printing_stream = stream.map(|batch_result| {
227-
match &batch_result {
228-
Ok(batch) => {
229-
dbg!(batch, batch.schema());
230-
for (col_idx, column) in batch.columns().iter().enumerate() {
231-
dbg!(col_idx, column, column.nulls());
232-
}
233-
}
234-
Err(e) => {
235-
println!("batch error: {:?}", e);
236-
}
237-
}
238-
batch_result
239-
});
240-
Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream))
241-
}

0 commit comments

Comments
 (0)