Skip to content

Commit 75d49e6

Browse files
hamersawclaude
andcommitted
fix: address third round of reviewer feedback
- BlobReference.writeString now rejects strings longer than 65535 UTF-8 bytes instead of silently truncating the 2-byte length prefix, which would produce a wrong-but-decodable reference. - Scope BlobReferenceResolver per finish() call with explicit close so cached Datasets release JNI handles each batch instead of leaking for the writer's lifetime. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 439cc44 commit 75d49e6

2 files changed

Lines changed: 14 additions & 4 deletions

File tree

lance-spark-base_2.12/src/main/java/org/lance/spark/utils/BlobReference.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ public static BlobReference deserialize(byte[] bytes) {
133133

134134
private static void writeString(DataOutputStream out, String s) throws IOException {
135135
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
136+
// The wire format uses a 2-byte unsigned length prefix; reject strings that would
137+
// overflow it rather than silently truncating to 16 bits and corrupting the payload.
138+
if (bytes.length > 0xFFFF) {
139+
throw new IOException(
140+
"BlobReference string exceeds maximum length of " + 0xFFFF + " bytes: " + bytes.length);
141+
}
136142
out.writeShort(bytes.length);
137143
out.write(bytes);
138144
}

lance-spark-base_2.12/src/main/scala/org/lance/spark/arrow/LanceArrowWriter.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,6 @@ private[arrow] class LargeBinaryWriter(val valueVector: LargeVarBinaryVector)
317317
private val pendingIndices = new java.util.ArrayList[java.lang.Integer]()
318318
private val pendingRefs = new java.util.ArrayList[org.lance.spark.utils.BlobReference]()
319319

320-
@transient private lazy val resolver = new org.lance.spark.utils.BlobReferenceResolver()
321-
322320
override def setNull(): Unit = {}
323321
override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
324322
val bytes = input.getBinary(ordinal)
@@ -337,14 +335,20 @@ private[arrow] class LargeBinaryWriter(val valueVector: LargeVarBinaryVector)
337335
override def finish(): Unit = {
338336
super.finish()
339337
if (!pendingRefs.isEmpty) {
338+
// Scope the resolver per-batch so the source Datasets it opens are released
339+
// (via AutoCloseable) instead of leaking JNI handles for the writer's lifetime.
340+
val resolver = new org.lance.spark.utils.BlobReferenceResolver()
340341
try {
341342
resolver.resolveBatch(pendingIndices, pendingRefs, valueVector)
342343
} catch {
343344
case e: java.io.IOException =>
344345
throw new RuntimeException("Failed to resolve blob references", e)
345346
} finally {
346-
pendingIndices.clear()
347-
pendingRefs.clear()
347+
try resolver.close()
348+
finally {
349+
pendingIndices.clear()
350+
pendingRefs.clear()
351+
}
348352
}
349353
}
350354
}

0 commit comments

Comments
 (0)