feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO#355
feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO#355beinan wants to merge 9 commits into
Conversation
## Summary Add support for writing blob v2 columns with external URI references that are outside registered base paths. This enables use cases like INSERT INTO SELECT across Lance tables where the target table stores external blob references pointing to the source table's blob files instead of copying the actual blob bytes. ## Changes - **WriteParams.java**: Add `allowExternalBlobOutsideBases` Optional<Boolean> field, getter, and builder method - **Fragment.java**: Pass the new field through `createWithFfiArray` and `createWithFfiStream` native methods - **fragment.rs (JNI)**: Thread the new `Optional<Boolean>` parameter through all fragment creation functions to `extract_write_params` - **utils.rs (JNI)**: Parse the new parameter and set `allow_external_blob_outside_bases` on Rust `WriteParams` - **blocking_dataset.rs (JNI)**: Pass `JObject::null()` for the new param in `Dataset.write()` path (not needed there) ## Context This is a prerequisite for lance-spark blob JOIN support (lance-format/lance-spark#355). When blob data flows through Spark's shuffle during JOIN + INSERT INTO, the target table needs to write external blob references pointing to the source table's physical blob files. The Rust `BlobPreprocessor` already supports this via `allow_external_blob_outside_bases`, but the Java SDK had no way to set it. Ref: #6321, #6322 ## Test plan - [x] Rust JNI code compiles cleanly (no errors in changed files) - [ ] Java unit tests (CI) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When blob columns flow through Spark's shuffle (e.g., INSERT INTO target SELECT ... FROM source_a JOIN source_b), the actual blob data was previously lost because getBinary() returned empty byte arrays. This change introduces a blob reference mechanism: **Read side**: Instead of returning byte[0], blob columns now serialize compact ~100-byte BlobReference descriptors containing the source dataset URI, column name, and row address. These are small enough to flow through Spark shuffle without overhead. **Write side**: A new BlobResolvingLargeBinaryWriter detects blob references in incoming binary values, opens the source dataset, fetches the actual blob bytes via Dataset.takeBlobs(), and writes the real data to the target table. Key changes: - BlobReference: compact serializable blob reference format - BlobReferenceResolver: write-side resolver that fetches actual blob bytes - LanceFragmentScanner: automatically requests _rowaddr for blob columns - LanceFragmentColumnarBatchScanner: extracts row addresses, sets blob context - BlobStructAccessor: produces blob references with dataset/row context - LanceArrowWriter: BlobResolvingLargeBinaryWriter resolves refs on write Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Currently takeBlobs() is called once per row, which will be a bottleneck for large joins (e.g. 20B rows). Document the planned optimization to batch references by (datasetUri, columnName) and call takeBlobs() once per group. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove lines over 100 chars and fix malformed HTML in javadoc. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
d3a606c to
9fabe62
Compare
- Fix typo: withFragemtId -> withFragmentId - Remove silent exception swallowing in BlobResolvingLargeBinaryWriter; let blob resolution failures propagate instead of writing corrupt reference bytes to the target table Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hamersaw
left a comment
There was a problem hiding this comment.
Thanks for the PR! Should be a pretty significant help in reducing memory utilization. A few comments to iron things out, but I think I need to dive through the regular read paths to make sure this doesn't negatively affect performance there as well.
|
Diving through this a bit more. I'm not sure there's a reason to have a |
- Store blob column names as Set<String> in LanceFragmentScanner instead of creating a new HashSet per batch - Search for _rowaddr by name instead of assuming last position - Batch takeBlobs() calls in BlobReferenceResolver (batch size 64) - Merge LargeBinaryWriter and BlobResolvingLargeBinaryWriter into one writer that always resolves blob references, preventing silent corruption when copying blob->regular binary columns - Close resolver properly via finish()/reset() lifecycle - Add BlobReference round-trip serialization unit test - Verify actual blob content in integration tests, not just sizes Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Good point — collapsed BlobResolvingLargeBinaryWriter into LargeBinaryWriter so there is now a single writer that always resolves blob references via resolveIfNeeded(). This means blob references are correctly resolved regardless of whether the target column is blob-encoded or regular binary, preventing silent data corruption. |
|
@hamersaw All feedback has been addressed and CI is fully green. Ready for another look when you get a chance! |
hamersaw
left a comment
There was a problem hiding this comment.
Thanks for the iteration here, I think we're getting close!
| for (int i = 0; i < rowCount; i++) { | ||
| rowAddresses[i] = rowAddrVector.getObjectNoOverflow(i).longValue(); | ||
| } |
There was a problem hiding this comment.
Per-row BigInteger allocation extracting row addresses: extractRowAddresses() runs once per Arrow batch and loops over every row calling rowAddrVector.getObjectNoOverflow(i), which returns a freshly allocated BigInteger that is immediately reduced to a long. For a multi-million-row scan with a blob column this is millions of throwaway BigInteger allocations purely to read a 64-bit value, adding avoidable GC pressure on the scan hot path.
Fix: Use UInt8Vector.get(i), which returns a primitive long directly, instead of getObjectNoOverflow(i).longValue().
There was a problem hiding this comment.
Fixed — now uses UInt8Vector.get(i) which returns a primitive long directly, avoiding per-row BigInteger allocation.
| if (group.size() >= batchSize) { | ||
| flushGroup(groupKey, group); | ||
| } | ||
|
|
||
| if (pending.resolved == null) { | ||
| flushGroup(groupKey, pendingBlobs.remove(groupKey)); | ||
| } | ||
|
|
||
| return pending.resolved; | ||
| } |
There was a problem hiding this comment.
Blob resolve batching is defeated — one takeBlobs() round-trip per row: resolve() adds the pending blob to its group, and only batch-flushes when group.size() >= batchSize (64). But the very next statement if (pending.resolved == null) flushes immediately whenever the batch threshold was NOT hit — which is every call in steady state, because flushGroup() also removes the group from pendingBlobs so it never accumulates. resolveIfNeeded()/resolve() must return bytes synchronously to LargeBinaryWriter.setValue(), so deferral is impossible by construction. Net effect: dataset.takeBlobs() is invoked with exactly one row address per blob row instead of once per 64. On a JOIN+INSERT INTO carrying millions of blob cells through shuffle, this is ~64x the JNI crossings and object-store read round-trips the batching was designed to eliminate, making the feature impractical at scale.
Fix: Make resolution genuinely deferred: have getBinary()/the writer accumulate references and resolve them in true batches (e.g. a two-pass write, or buffer references and back-fill the vector after a single batched takeBlobs per group), rather than resolving synchronously per row.
There was a problem hiding this comment.
Completely reworked — the writer now does genuine two-pass batched resolution. setValue() buffers blob references with empty placeholders in the vector. finish() groups all pending references by (datasetUri, columnName) and calls takeBlobs() once per group, then back-fills the vector with resolved bytes. This reduces JNI+I/O from O(N) to O(groups).
| if (bytes == null || bytes.length == 0) { | ||
| valueVector.setSafe(count, bytes) | ||
| } else { | ||
| val resolved = resolver.resolveIfNeeded(bytes) | ||
| valueVector.setSafe(count, resolved) | ||
| } |
There was a problem hiding this comment.
Every non-empty large-binary value written to a blob column is now passed through resolver.resolveIfNeeded(), which calls BlobReference.isBlobReference() — a 4-byte magic-header ('L','B','R','F') sniff plus a >=17 byte length check. Legitimate user-supplied blob bytes whose first four bytes happen to be 0x4C424652 and whose length is >=17 (e.g. an arbitrary file/binary payload) will be misclassified as a serialized BlobReference. deserialize() then either throws IllegalArgumentException (unknown version byte) — failing the INSERT with a RuntimeException — or, on a coincidental version match, parses bogus length-prefixed strings and a garbage dataset URI, causing a takeBlobs() failure or silently wrong data being written. This affects first-time user binary data on a plain INSERT into a blob column, not reference round-trips.
Fix: Disambiguate references from real payloads instead of content sniffing — e.g. carry an out-of-band flag/column indicating the value is a reference, or make the magic long/structured enough (and validate full structure) that collision with real data is negligible, and treat a failed parse as 'not a reference' rather than throwing.
There was a problem hiding this comment.
Addressed in two ways: (1) Extended magic from 4 bytes (LBRF) to 8 bytes (LANCEREF), making random collision probability ~1 in 2^64. (2) isBlobReference() now does full structural validation — checks version byte, verifies both length-prefixed strings fit within the total byte count, and confirms the remaining bytes exactly equal colLen + 8 (the row address). Random binary data that happens to start with LANCEREF will fail the structural check.
| override def reset(): Unit = { | ||
| super.reset() | ||
| resolver.close() |
There was a problem hiding this comment.
reset() closes resolver, discarding the source-dataset cache: LargeBinaryWriter.reset() calls resolver.close(), which closes and clears BlobReferenceResolver.datasetCache — the cache whose stated purpose is 'avoid re-opening the same dataset for every row'. Arrow field writers are reset between batches, so the source Lance dataset (manifest + metadata I/O, JNI open) is re-opened on the first blob of every subsequent batch instead of once per task. Combined with the broken batching above, the write path repeatedly re-opens the source dataset and issues per-row blob fetches.
Fix: Release the resolver (and its dataset cache) once at writer finish/close, not on every reset(); reset() should only flush pending state, keeping cached datasets open for the task lifetime.
There was a problem hiding this comment.
Fixed — reset() now only clears the pending blob reference buffer. The resolver (and its dataset cache) lives for the lifetime of the writer and is never closed/recreated between batches. Datasets opened on the first batch stay cached for all subsequent batches in the same task.
- Use UInt8Vector.get(i) instead of getObjectNoOverflow(i).longValue() to avoid per-row BigInteger allocation on the scan hot path - Extend magic header from 4 bytes (LBRF) to 8 bytes (LANCEREF) and add full structural validation in isBlobReference() to prevent false positives from user binary data - Implement genuine two-pass batched blob resolution: setValue() buffers blob references with empty placeholders, finish() resolves all references in a single takeBlobs() call per (dataset, column) group and back-fills the vector - Keep dataset cache alive across batches (reset() only clears pending state, close() releases datasets at task end) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Use parallel List<Integer> and List<BlobReference> instead of Scala tuples to avoid Java generics invariance issues. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
INSERT INTO target SELECT ... FROM source_a JOIN source_b), the actual blob data was previously lost becausegetBinary()returned empty byte arrays (new byte[0]). The target table ended up with zero-length blobs.How it works
Read side — compact blob references flow through shuffle:
byte[0], blob columns now serialize compact ~100-byteBlobReferencedescriptors containing the source dataset URI, column name, and row address_rowaddrwhen blob columns are present, and strips it from the output after extracting the row addressesWrite side — blob references are resolved to actual bytes:
BlobResolvingLargeBinaryWriterdetectsBlobReferencemagic headers in incoming binary valuesDataset.takeBlobs()to fetch the actual blob bytesKey files
BlobReference.javaLBRFmagic + dataset URI + column name + row address)BlobReferenceResolver.javatakeBlobs()→ returns actual bytesBaseBlobJoinTest.javaBlobJoinTest.java(3.4 + 3.5)BlobStructAccessor.javagetBlobReference()LanceArrowColumnVector.javagetBinary()returns blob reference instead ofbyte[0]LanceFragmentScanner.java_rowaddr, exposes dataset URI and blob column namesLanceFragmentColumnarBatchScanner.java_rowaddr, sets blob context onBlobStructAccessor, strips implicit_rowaddrLanceArrowWriter.scalaBlobResolvingLargeBinaryWriterfor blob-encoded columnsBaseBlobCreateTableTest.javaTest plan
BlobJoinTest.testBlobPreservedDuringInsertIntoSelect— SimpleINSERT INTO target SELECT FROM sourcepreserves blob dataBlobJoinTest.testBlobPreservedDuringJoinAndInsert—JOINof two blob tables preserves both blob columns in targetBlobJoinTest.testNonBlobColumnsPreservedDuringJoinWithBlobs— Non-blob columns survive JOIN+INSERT alongside blob columnsBlobCreateTableTesttests pass (9/9)🤖 Generated with Claude Code