Commit 7bfd45c

[VL] Stop using Input.available() to probe trailing markers in CachedColumnarBatch (#12147)
ColumnarCachedBatchSerializer.read guarded the trailing hasStats / hasSchema booleans with `input.available() > 0` to tolerate the V1 wire format that predates those markers. The intent was correct -- the existing ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks absent-trailing as silent null, and that contract must be preserved -- but `Input.available()` is the wrong probe.

Kryo `Input.available()` returns `(limit - position) + underlyingStream.available()`, and the JDK `InputStream.available()` contract permits any implementation to return 0 even when more data follows -- BufferedInputStream over shuffle-spill / network chunk boundaries routinely does so. When the Kryo buffer is drained AND the underlying stream reports 0 at the trailing-boolean byte position, the probe falsely concludes EOF, skips hasStats, and the next `readClassAndObject` interprets the stats payload (which contains the schema JSON) as a class name -- surfacing as `ClassNotFoundException: {"type":"struct",...}` with the stack topped by `DefaultClassResolver.readName`.

Replace the probe with a try/readBoolean/catch on the narrow Kryo "Buffer underflow" surface. This catches the real EOF when the V1 wire has no trailing booleans (preserves the silent-null contract) without ever consulting `available()`, so a V2 wire under chunked-fill always reads the trailing markers correctly. The catch is intentionally narrow (message-prefix match on "Buffer underflow") so that genuine corruption -- including ClassNotFoundException wrapped during readClassAndObject -- is never swallowed.

The length-bound `require(... <= maxLen ...)` guard from commit 491070b (defending against NegativeArraySizeException / oversized allocation) is preserved -- that part is orthogonal to the V1 probe and remains useful.

A new test ColumnarCachedBatchKryoBoundaryProbeBugSuite locks the chunked-fill probe contract: a 1-byte-per-read InputStream that returns `available() == 0` must still round-trip multi-batch V2 wire correctly. The existing V1-wire silent-null test in ColumnarCachedBatchKryoSuite continues to pass unchanged.
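
A minimal sketch of the false-EOF probe described above, using only JDK streams rather than Kryo. `DataInputStream` and `EOFException` stand in here for Kryo's `Input` and its "Buffer underflow" exception; `ZeroAvailableStream`, `ProbeSketch`, and both probe methods are hypothetical names introduced for illustration, not code from this commit.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException, InputStream}

// Hypothetical stand-in for a chunked stream: data is still readable,
// but available() reports 0, which the JDK contract permits.
final class ZeroAvailableStream(src: InputStream) extends InputStream {
  override def read(): Int = src.read()
  override def available(): Int = 0
}

object ProbeSketch {
  // Fragile probe: treats available() == 0 as "no trailing marker".
  def probeWithAvailable(in: DataInputStream): Boolean =
    in.available() > 0 && in.readBoolean()

  // Robust probe: attempt the read and map only a real EOF to "absent".
  def probeWithRead(in: DataInputStream): Boolean =
    try in.readBoolean()
    catch { case _: EOFException => false }

  def open(bytes: Array[Byte]): DataInputStream =
    new DataInputStream(new ZeroAvailableStream(new ByteArrayInputStream(bytes)))

  def main(args: Array[String]): Unit = {
    val withMarker = Array[Byte](1) // one trailing boolean set to true
    println(probeWithAvailable(open(withMarker)))      // false: marker wrongly skipped
    println(probeWithRead(open(withMarker)))           // true: marker read correctly
    println(probeWithRead(open(Array.emptyByteArray))) // false: genuine EOF tolerated
  }
}
```

The first call shows the bug in miniature: the marker byte is present, yet the `available()`-based probe skips it and leaves the stream misaligned for whatever is read next.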
1 parent 9bc830e commit 7bfd45c

2 files changed

Lines changed: 150 additions & 11 deletions

backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala

Lines changed: 39 additions & 11 deletions
@@ -44,7 +44,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
-import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoSerializer}
 import com.esotericsoftware.kryo.DefaultSerializer
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.arrow.c.ArrowSchema
@@ -152,14 +152,30 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
     )
     val bytes = new Array[Byte](payloadLen)
     input.readBytes(bytes)
-    // Backward-compat with the V1 wire format (no trailing hasStats / hasSchema booleans):
-    // legacy CachedColumnarBatch instances persisted on disk (DISK_ONLY / MEMORY_AND_DISK)
-    // surviving a rolling upgrade lack these fields. available() is best-effort -- treats
-    // unavailable suffix as "absent" instead of throwing KryoException.
-    val hasStats = input.available() > 0 && input.readBoolean()
-    // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned.
-    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the typed
-    // pattern match throws MatchError at runtime.
+    // Read the trailing hasStats marker. Catching a Buffer-underflow KryoException
+    // here preserves backward compatibility with the V1 wire format (no trailing
+    // hasStats / hasSchema booleans), which the existing
+    // ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks as a contract:
+    // an absent trailing byte must read as null, not throw.
+    //
+    // Why a try/catch instead of `input.available() > 0 && readBoolean`:
+    // Kryo `Input.available()` returns `(limit - position) + underlyingStream.available()`,
+    // and the JDK `InputStream.available()` contract permits any implementation to
+    // return 0 even when more data follows -- BufferedInputStream over shuffle-spill
+    // / network chunk boundaries routinely does so. When the Kryo buffer is drained
+    // AND the underlying stream reports 0 at the trailing-boolean byte position, the
+    // probe falsely concludes EOF, skips hasStats, and the next readClassAndObject
+    // interprets the stats payload (which contains the schema JSON) as a class name --
+    // surfacing as `ClassNotFoundException: {"type":"struct",...}` with the stack
+    // topped by `DefaultClassResolver.readName`. A try/catch on the real EOF surface
+    // (Kryo "Buffer underflow") avoids the false-EOF probe while still tolerating
+    // V1 wire.
+    //
+    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the
+    // typed pattern match throws MatchError at runtime.
+    val hasStats =
+      try input.readBoolean()
+      catch { case e: KryoException if isBufferUnderflow(e) => false }
     val statsAndSchema: (InternalRow, StructType) = if (hasStats) {
       val statsLen = input.readInt()
       require(
@@ -177,9 +193,21 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
     CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1, statsAndSchema._2)
   }
 
+  // Kryo signals end-of-input by throwing KryoException with a message starting
+  // with "Buffer underflow". There is no dedicated subclass, so a message-prefix
+  // check is the narrowest filter we can apply without swallowing real corruption
+  // (e.g. ClassNotFoundException wrapped during readClassAndObject).
+  private def isBufferUnderflow(e: KryoException): Boolean = {
+    val msg = e.getMessage
+    msg != null && msg.startsWith("Buffer underflow")
+  }
+
   private def readOptionalSchema(input: Input, maxLen: Long): StructType = {
-    // Treat absent trailing bytes as "no schema": V1 wire format predates this field.
-    if (input.available() <= 0 || !input.readBoolean()) {
+    // Trailing schema marker. See readSchema above for the same V1-vs-chunked-fill rationale.
+    val hasSchema =
+      try input.readBoolean()
+      catch { case e: KryoException if isBufferUnderflow(e) => false }
+    if (!hasSchema) {
       null
     } else {
       val schemaLen = input.readInt()
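
The narrow-catch pattern in this hunk can be sketched without a Kryo dependency. In the sketch below, `CodecException` is a hypothetical stand-in for `KryoException`, and `NarrowCatchSketch`, `isUnderflow`, and `readMarker` are illustrative names; the second message string is made up for the demonstration. Only the guard-in-the-catch shape mirrors the diff.

```scala
// Hypothetical stand-in for a library exception type that has no dedicated
// end-of-input subclass, so callers can only filter on the message.
final class CodecException(msg: String) extends RuntimeException(msg)

object NarrowCatchSketch {
  // Same shape as isBufferUnderflow in the diff: null-safe prefix match.
  def isUnderflow(e: CodecException): Boolean = {
    val msg = e.getMessage
    msg != null && msg.startsWith("Buffer underflow")
  }

  // Maps only the underflow case to false; any other failure propagates.
  def readMarker(read: () => Boolean): Boolean =
    try read()
    catch { case e: CodecException if isUnderflow(e) => false }

  def main(args: Array[String]): Unit = {
    // Marker present: the value is returned unchanged.
    println(readMarker(() => true))
    // End of input: treated as "marker absent".
    println(readMarker(() => throw new CodecException("Buffer underflow.")))
    // Anything else is not swallowed by the guard.
    val propagated =
      try { readMarker(() => throw new CodecException("Unable to find class: x")); false }
      catch { case _: CodecException => true }
    println(propagated)
  }
}
```

The pattern guard keeps the handler from matching when the predicate is false, so unrelated failures surface with their original stack trace instead of being turned into a silent `false`.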
Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+
+/**
+ * Deterministic repro for the L154/L180 Input.available() boolean-probe bug.
+ *
+ * Trigger conditions (all required):
+ * (1) Multi-batch deserialize via kryo.readClassAndObject from one stream.
+ * (2) Kryo Input wraps an InputStream (not byte[]).
+ * (3) At a batch's trailing hasStats/hasSchema position, the underlying
+ *     InputStream returns available()=0 AND the Kryo Input buffer is drained
+ *     (limit==position). Both conditions must hit the SAME byte position.
+ *
+ * Real prod path observed in production:
+ *   BufferedInputStream over shuffle-spill / network ManagedBuffer chunk
+ *   boundary -> stream.available()=0 between chunks, Kryo Input.available()
+ *   = (limit-pos) + 0 -> reads 0 when buffer drained.
+ *
+ * Fixture: 1-byte-per-read stream + lying available()=0 -> every byte boundary
+ * satisfies (3); any trailing-boolean byte aligned with a Kryo refill triggers
+ * the false-EOF.
+ */
+class ColumnarCachedBatchKryoBoundaryProbeBugSuite extends AnyFunSuite {
+
+  final private class LyingOneByteStream(src: InputStream) extends InputStream {
+    override def read(): Int = src.read()
+    override def read(b: Array[Byte], off: Int, len: Int): Int = {
+      if (len == 0) 0
+      else {
+        val c = src.read()
+        if (c == -1) -1
+        else {
+          b(off) = c.toByte
+          1
+        }
+      }
+    }
+    override def available(): Int = 0
+  }
+
+  private def mkBatch(i: Int): CachedColumnarBatch = {
+    // PartitionStatistics per-column slots:
+    // [lower(typed) upper(typed) count(Int) nullCount(Int) sizeBytes(Long)]
+    val stats: InternalRow =
+      new GenericInternalRow(Array[Any](i.toLong, (i * 10).toLong, i, 0, 8L))
+    val schema = StructType(Seq(StructField(s"col$i", LongType, nullable = true)))
+    val bytes = Array.fill[Byte](128)(i.toByte)
+    CachedColumnarBatch(
+      numRows = i,
+      sizeInBytes = bytes.length.toLong,
+      bytes = bytes,
+      stats = stats,
+      schema = schema)
+  }
+
+  test("multi-batch deserialize survives boundary-aligned trailing-boolean probe") {
+    val kryo = new Kryo()
+    val ser = new CachedColumnarBatchKryoSerializer()
+    kryo.register(classOf[CachedColumnarBatch], ser)
+
+    val baos = new ByteArrayOutputStream()
+    val out = new Output(baos)
+    val originals = (1 to 10).map(mkBatch)
+    originals.foreach(b => kryo.writeClassAndObject(out, b))
+    out.close()
+
+    val raw = baos.toByteArray
+    val in = new Input(new LyingOneByteStream(new ByteArrayInputStream(raw)), 32)
+
+    val read = (1 to 10).map(_ => kryo.readClassAndObject(in).asInstanceOf[CachedColumnarBatch])
+    in.close()
+
+    originals.zip(read).zipWithIndex.foreach {
+      case ((o, r), i) =>
+        info(s"batch $i: orig.stats=${o.stats != null} schema=${o.schema}")
+        info(s"batch $i: read.stats=${r.stats != null} schema=${r.schema}")
+        assert(r.numRows == o.numRows, s"batch $i numRows mismatch")
+        assert(r.bytes.toSeq == o.bytes.toSeq, s"batch $i bytes mismatch")
+        assert(r.stats != null, s"batch $i stats lost (BUG)")
+        assert(r.schema == o.schema, s"batch $i schema mismatch (BUG)")
+    }
+  }
+
+  // V1 wire backward-compat is locked by ColumnarCachedBatchKryoSuite#"V1 wire ..."
+  // -- not duplicated here. This suite only covers the chunked-fill probe path.
+}
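
Trigger condition (3) in the suite's Scaladoc can be illustrated with a toy buffered reader. This is a rough model, not Kryo's `Input`: `MiniBufferedInput`, `SilentStream`, and `BoundarySketch` are hypothetical names, and only the `(limit - position) + stream.available()` arithmetic is taken from the commit message.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical miniature of a buffered reader. It models only the
// available() arithmetic quoted in the commit message, nothing else.
final class MiniBufferedInput(stream: InputStream, capacity: Int) {
  private val buffer = new Array[Byte](capacity)
  private var position = 0
  private var limit = 0

  def available(): Int = (limit - position) + stream.available()

  // Refills from the stream once the buffer is drained; leaves state untouched at EOF.
  private def refill(): Unit = {
    val n = stream.read(buffer, 0, capacity)
    if (n > 0) { position = 0; limit = n }
  }

  // Returns the next byte as an unsigned value, or -1 at end of input.
  def read(): Int = {
    if (position == limit) refill()
    if (position == limit) -1
    else {
      val b = buffer(position) & 0xff
      position += 1
      b
    }
  }
}

// Stream that delivers data normally but always reports available() == 0.
final class SilentStream(src: InputStream) extends InputStream {
  override def read(): Int = src.read()
  override def read(b: Array[Byte], off: Int, len: Int): Int = src.read(b, off, len)
  override def available(): Int = 0
}

object BoundarySketch {
  def main(args: Array[String]): Unit = {
    val data = Array[Byte](10, 20, 30, 40, 50)
    val in = new MiniBufferedInput(new SilentStream(new ByteArrayInputStream(data)), 4)
    (1 to 4).foreach(_ => in.read()) // drains the first 4-byte fill
    println(in.available())          // 0: buffer drained and stream reports 0
    println(in.read())               // 50: the fifth byte is still readable
  }
}
```

With a 4-byte buffer and 5 bytes of data, the fifth byte sits exactly on a refill boundary, which is the alignment the 1-byte-per-read fixture forces at every position.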
