Skip to content

Commit d1e1480

Browse files
authored
test: relax bytesRead ratio assertion for Spark 4.1+ (#4197)
1 parent da187f2 commit d1e1480

2 files changed

Lines changed: 48 additions & 15 deletions

File tree

docs/source/user-guide/latest/metrics.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,26 @@ Here is a guide to some of the native metrics.
6464
| `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. |
6565
| `mempool_time` | Time interacting with memory pool. |
6666
| `write_time` | Time spent writing bytes to disk. |
67+
68+
## Task-Level Input Metrics on Spark 4.1+
69+
70+
Comet's native scans set `inputMetrics.bytesRead` to the actual file IO performed by the
71+
DataFusion parquet reader (`bytes_scanned`). This is the truthful number you would see at the
72+
filesystem layer.
73+
74+
Spark 4.1 changed its own parquet reader to pre-open the `SeekableInputStream` and read the file
75+
footer outside the `FileScanRDD.compute()` thread. Spark's `inputMetrics.bytesRead` is updated
76+
from a Hadoop FileSystem thread-local byte counter that only captures reads on the
77+
`compute()` thread, so reads serviced by the pre-opened stream's internal buffer go uncounted.
78+
The under-count is largest when the file fits in the pre-fetched buffer (tiny files, unit test
79+
sizes) and shrinks as files grow large enough that subsequent row-group reads cross the buffer
80+
and trigger fresh FS reads on the `compute()` thread.
81+
82+
This is purely an observability difference: `inputMetrics.bytesRead` is reported to listeners
83+
and the Spark UI but is not consumed by the planner, the optimizer, or AQE, so the discrepancy
84+
does not affect query plans, partitioning, or correctness. Records read (`recordsRead`) is
85+
unaffected and remains exactly equal between Comet and Spark.
86+
87+
If you compare Comet's `bytesRead` against vanilla Spark's on Spark 4.1+ (via the Spark UI or
88+
the REST API), expect Comet's number to be substantially larger for small files, and closer to
89+
Spark's for large files. Comet's value reflects what the storage layer actually delivered.

spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
192192
}
193193

194194
test("native_datafusion scan reports task-level input metrics matching Spark") {
195-
assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
196195
val totalRows = 10000
197196
withTempPath { dir =>
198197
spark
@@ -230,15 +229,11 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
230229
// (e.g. footer reads, page headers, buffering granularity).
231230
assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes")
232231
assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes")
233-
val ratio = cometBytes.toDouble / sparkBytes.toDouble
234-
assert(
235-
ratio >= 0.7 && ratio <= 1.3,
236-
s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
232+
assertCometBytesReadInRange(cometBytes, sparkBytes)
237233
}
238234
}
239235

240236
test("input metrics aggregate across multiple native scans in a join") {
241-
assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
242237
withTempPath { dir1 =>
243238
withTempPath { dir2 =>
244239
// Create two separate parquet tables
@@ -283,16 +278,34 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
283278
assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")
284279

285280
// Both sides should contribute to the total bytes
286-
val ratio = cometBytes.toDouble / sparkBytes.toDouble
287-
assert(
288-
ratio >= 0.7 && ratio <= 1.3,
289-
s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
281+
assertCometBytesReadInRange(cometBytes, sparkBytes)
290282
}
291283
}
292284
}
293285

286+
/**
287+
* Compare Comet's `bytesRead` against Spark's baseline. On Spark <= 4.0 the two readers report
288+
* the same Hadoop-FS thread-local byte count, so we keep a tight 0.7-1.3 band. Spark 4.1
289+
* pre-opens the parquet `SeekableInputStream` outside the FileScanRDD `compute()` thread, so
290+
* its `getFSBytesReadOnThreadCallback` no longer captures most of the parquet IO and
291+
* `inputMetrics.bytesRead` is now a small fraction of the actual file IO. Comet (via
292+
* DataFusion's `bytes_scanned`) still reports actual bytes, so the only safe cross-version
293+
* invariant on 4.1+ is that Comet >= Spark and both are positive.
294+
*/
295+
private def assertCometBytesReadInRange(cometBytes: Long, sparkBytes: Long): Unit = {
296+
if (isSpark41Plus) {
297+
assert(
298+
cometBytes >= sparkBytes,
299+
s"Comet bytesRead should be >= Spark bytesRead on 4.1+: comet=$cometBytes, spark=$sparkBytes")
300+
} else {
301+
val ratio = cometBytes.toDouble / sparkBytes.toDouble
302+
assert(
303+
ratio >= 0.7 && ratio <= 1.3,
304+
s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
305+
}
306+
}
307+
294308
test("input metrics aggregate across multiple native scans in a union") {
295-
assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
296309
withTempPath { dir1 =>
297310
withTempPath { dir2 =>
298311
spark
@@ -335,10 +348,7 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
335348
assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords")
336349
assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")
337350

338-
val ratio = cometBytes.toDouble / sparkBytes.toDouble
339-
assert(
340-
ratio >= 0.7 && ratio <= 1.3,
341-
s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
351+
assertCometBytesReadInRange(cometBytes, sparkBytes)
342352
}
343353
}
344354
}

0 commit comments

Comments
 (0)