Skip to content

feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO#355

Open
beinan wants to merge 9 commits into
lance-format:mainfrom
beinan:feat/blob-join-preservation
Open

feat: preserve blob data through Spark shuffle during JOIN + INSERT INTO#355
beinan wants to merge 9 commits into
lance-format:mainfrom
beinan:feat/blob-join-preservation

Conversation

@beinan
Copy link
Copy Markdown
Contributor

@beinan beinan commented Mar 27, 2026

Summary

  • When blob columns flow through Spark's shuffle (e.g., INSERT INTO target SELECT ... FROM source_a JOIN source_b), the actual blob data was previously lost because getBinary() returned empty byte arrays (new byte[0]). The target table ended up with zero-length blobs.
  • This PR introduces a blob reference mechanism that preserves blob data through Spark's shuffle without materializing the full blob bytes (which could be MBs/GBs per row and would kill shuffle performance).

How it works

Read side — compact blob references flow through shuffle:

  • Instead of returning byte[0], blob columns now serialize compact ~100-byte BlobReference descriptors containing the source dataset URI, column name, and row address
  • The scanner automatically requests _rowaddr when blob columns are present, and strips it from the output after extracting the row addresses
  • These references are small enough to flow through Spark shuffle without overhead

Write side — blob references are resolved to actual bytes:

  • A new BlobResolvingLargeBinaryWriter detects BlobReference magic headers in incoming binary values
  • It opens the source dataset (with caching), calls Dataset.takeBlobs() to fetch the actual blob bytes
  • The real blob data is then written to the target table through the normal blob write path

Key files

File Status Description
BlobReference.java NEW Compact serializable blob reference format (LBRF magic + dataset URI + column name + row address)
BlobReferenceResolver.java NEW Write-side resolver: opens source dataset → takeBlobs() → returns actual bytes
BaseBlobJoinTest.java NEW Tests for blob preservation through simple INSERT INTO SELECT, JOIN + INSERT, and mixed blob/non-blob JOINs
BlobJoinTest.java (3.4 + 3.5) NEW Concrete test implementations
BlobStructAccessor.java Modified Added blob reference context (dataset URI, column name, row addresses) and getBlobReference()
LanceArrowColumnVector.java Modified getBinary() returns blob reference instead of byte[0]
LanceFragmentScanner.java Modified Detects blob columns, requests _rowaddr, exposes dataset URI and blob column names
LanceFragmentColumnarBatchScanner.java Modified Extracts row addresses from _rowaddr, sets blob context on BlobStructAccessor, strips implicit _rowaddr
LanceArrowWriter.scala Modified Added BlobResolvingLargeBinaryWriter for blob-encoded columns
BaseBlobCreateTableTest.java Modified Updated assertions for new blob reference behavior

Test plan

  • BlobJoinTest.testBlobPreservedDuringInsertIntoSelect — Simple INSERT INTO target SELECT FROM source preserves blob data
  • BlobJoinTest.testBlobPreservedDuringJoinAndInsertJOIN of two blob tables preserves both blob columns in target
  • BlobJoinTest.testNonBlobColumnsPreservedDuringJoinWithBlobs — Non-blob columns survive JOIN+INSERT alongside blob columns
  • All existing BlobCreateTableTest tests pass (9/9)

🤖 Generated with Claude Code

@github-actions github-actions Bot added the enhancement New feature or request label Mar 27, 2026
jackye1995 pushed a commit to lance-format/lance that referenced this pull request Apr 7, 2026
## Summary

Add support for writing blob v2 columns with external URI references
that are outside registered base paths. This enables use cases like
INSERT INTO SELECT across Lance tables where the target table stores
external blob references pointing to the source table's blob files
instead of copying the actual blob bytes.

## Changes

- **WriteParams.java**: Add `allowExternalBlobOutsideBases`
Optional<Boolean> field, getter, and builder method
- **Fragment.java**: Pass the new field through `createWithFfiArray` and
`createWithFfiStream` native methods
- **fragment.rs (JNI)**: Thread the new `Optional<Boolean>` parameter
through all fragment creation functions to `extract_write_params`
- **utils.rs (JNI)**: Parse the new parameter and set
`allow_external_blob_outside_bases` on Rust `WriteParams`
- **blocking_dataset.rs (JNI)**: Pass `JObject::null()` for the new
param in `Dataset.write()` path (not needed there)

## Context

This is a prerequisite for lance-spark blob JOIN support
(lance-format/lance-spark#355). When blob data flows through Spark's
shuffle during JOIN + INSERT INTO, the target table needs to write
external blob references pointing to the source table's physical blob
files. The Rust `BlobPreprocessor` already supports this via
`allow_external_blob_outside_bases`, but the Java SDK had no way to set
it.

Ref: #6321, #6322

## Test plan

- [x] Rust JNI code compiles cleanly (no errors in changed files)
- [ ] Java unit tests (CI)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
beinan and others added 3 commits May 13, 2026 20:11
When blob columns flow through Spark's shuffle (e.g., INSERT INTO target
SELECT ... FROM source_a JOIN source_b), the actual blob data was
previously lost because getBinary() returned empty byte arrays.

This change introduces a blob reference mechanism:

**Read side**: Instead of returning byte[0], blob columns now serialize
compact ~100-byte BlobReference descriptors containing the source dataset
URI, column name, and row address. These are small enough to flow through
Spark shuffle without overhead.

**Write side**: A new BlobResolvingLargeBinaryWriter detects blob
references in incoming binary values, opens the source dataset, fetches
the actual blob bytes via Dataset.takeBlobs(), and writes the real data
to the target table.

Key changes:
- BlobReference: compact serializable blob reference format
- BlobReferenceResolver: write-side resolver that fetches actual blob bytes
- LanceFragmentScanner: automatically requests _rowaddr for blob columns
- LanceFragmentColumnarBatchScanner: extracts row addresses, sets blob context
- BlobStructAccessor: produces blob references with dataset/row context
- LanceArrowWriter: BlobResolvingLargeBinaryWriter resolves refs on write

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Currently takeBlobs() is called once per row, which will be a
bottleneck for large joins (e.g. 20B rows). Document the planned
optimization to batch references by (datasetUri, columnName) and
call takeBlobs() once per group.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove lines over 100 chars and fix malformed HTML in javadoc.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@beinan beinan force-pushed the feat/blob-join-preservation branch from d3a606c to 9fabe62 Compare May 13, 2026 20:16
beinan and others added 2 commits May 13, 2026 20:27
- Fix typo: withFragemtId -> withFragmentId
- Remove silent exception swallowing in BlobResolvingLargeBinaryWriter;
  let blob resolution failures propagate instead of writing corrupt
  reference bytes to the target table

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Collaborator

@hamersaw hamersaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Should be a pretty significant help in reducing memory utilization. A few comments to iron things out, but I think I need to dive through the regular read paths to make sure this doesn't negatively affect performance there as well.

Comment thread lance-spark-base_2.12/src/main/scala/org/lance/spark/arrow/LanceArrowWriter.scala Outdated
@hamersaw
Copy link
Copy Markdown
Collaborator

Diving through this a bit more. I'm not sure there's a reason to have a LargeBinaryWriter (existing) and a BlobResolvingLargeBinaryWriter (new). Right now, if we're reading from a blob column and writing to a regular binary we're going to dump a ton of BlobReferences rather than the actual bytes because it uses the LargeBinaryWriter which doesn't resolve the BlobReference. I think we should collapse these into a single writer to mitigate future issues.

beinan and others added 2 commits May 15, 2026 01:05
- Store blob column names as Set<String> in LanceFragmentScanner
  instead of creating a new HashSet per batch
- Search for _rowaddr by name instead of assuming last position
- Batch takeBlobs() calls in BlobReferenceResolver (batch size 64)
- Merge LargeBinaryWriter and BlobResolvingLargeBinaryWriter into
  one writer that always resolves blob references, preventing
  silent corruption when copying blob->regular binary columns
- Close resolver properly via finish()/reset() lifecycle
- Add BlobReference round-trip serialization unit test
- Verify actual blob content in integration tests, not just sizes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@beinan
Copy link
Copy Markdown
Contributor Author

beinan commented May 15, 2026

Good point — collapsed BlobResolvingLargeBinaryWriter into LargeBinaryWriter so there is now a single writer that always resolves blob references via resolveIfNeeded(). This means blob references are correctly resolved regardless of whether the target column is blob-encoded or regular binary, preventing silent data corruption.

@beinan
Copy link
Copy Markdown
Contributor Author

beinan commented May 17, 2026

@hamersaw All feedback has been addressed and CI is fully green. Ready for another look when you get a chance!

Copy link
Copy Markdown
Collaborator

@hamersaw hamersaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the iteration here, I think we're getting close!

Comment on lines +107 to +109
for (int i = 0; i < rowCount; i++) {
rowAddresses[i] = rowAddrVector.getObjectNoOverflow(i).longValue();
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per-row BigInteger allocation extracting row addresses: extractRowAddresses() runs once per Arrow batch and loops over every row calling rowAddrVector.getObjectNoOverflow(i), which returns a freshly allocated BigInteger that is immediately reduced to a long. For a multi-million-row scan with a blob column this is millions of throwaway BigInteger allocations purely to read a 64-bit value, adding avoidable GC pressure on the scan hot path.

Fix: Use UInt8Vector.get(i), which returns a primitive long directly, instead of getObjectNoOverflow(i).longValue().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — now uses UInt8Vector.get(i) which returns a primitive long directly, avoiding per-row BigInteger allocation.

Comment on lines +88 to +97
if (group.size() >= batchSize) {
flushGroup(groupKey, group);
}

if (pending.resolved == null) {
flushGroup(groupKey, pendingBlobs.remove(groupKey));
}

return pending.resolved;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blob resolve batching is defeated — one takeBlobs() round-trip per row: resolve() adds the pending blob to its group, and only batch-flushes when group.size() >= batchSize (64). But the very next statement if (pending.resolved == null) flushes immediately whenever the batch threshold was NOT hit — which is every call in steady state, because flushGroup() also removes the group from pendingBlobs so it never accumulates. resolveIfNeeded()/resolve() must return bytes synchronously to LargeBinaryWriter.setValue(), so deferral is impossible by construction. Net effect: dataset.takeBlobs() is invoked with exactly one row address per blob row instead of once per 64. On a JOIN+INSERT INTO carrying millions of blob cells through shuffle, this is ~64x the JNI crossings and object-store read round-trips the batching was designed to eliminate, making the feature impractical at scale.

Fix: Make resolution genuinely deferred: have getBinary()/the writer accumulate references and resolve them in true batches (e.g. a two-pass write, or buffer references and back-fill the vector after a single batched takeBlobs per group), rather than resolving synchronously per row.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely reworked — the writer now does genuine two-pass batched resolution. setValue() buffers blob references with empty placeholders in the vector. finish() groups all pending references by (datasetUri, columnName) and calls takeBlobs() once per group, then back-fills the vector with resolved bytes. This reduces JNI+I/O from O(N) to O(groups).

Comment on lines +322 to +327
if (bytes == null || bytes.length == 0) {
valueVector.setSafe(count, bytes)
} else {
val resolved = resolver.resolveIfNeeded(bytes)
valueVector.setSafe(count, resolved)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every non-empty large-binary value written to a blob column is now passed through resolver.resolveIfNeeded(), which calls BlobReference.isBlobReference() — a 4-byte magic-header ('L','B','R','F') sniff plus a >=17 byte length check. Legitimate user-supplied blob bytes whose first four bytes happen to be 0x4C424652 and whose length is >=17 (e.g. an arbitrary file/binary payload) will be misclassified as a serialized BlobReference. deserialize() then either throws IllegalArgumentException (unknown version byte) — failing the INSERT with a RuntimeException — or, on a coincidental version match, parses bogus length-prefixed strings and a garbage dataset URI, causing a takeBlobs() failure or silently wrong data being written. This affects first-time user binary data on a plain INSERT into a blob column, not reference round-trips.

Fix: Disambiguate references from real payloads instead of content sniffing — e.g. carry an out-of-band flag/column indicating the value is a reference, or make the magic long/structured enough (and validate full structure) that collision with real data is negligible, and treat a failed parse as 'not a reference' rather than throwing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in two ways: (1) Extended magic from 4 bytes (LBRF) to 8 bytes (LANCEREF), making random collision probability ~1 in 2^64. (2) isBlobReference() now does full structural validation — checks version byte, verifies both length-prefixed strings fit within the total byte count, and confirms the remaining bytes exactly equal colLen + 8 (the row address). Random binary data that happens to start with LANCEREF will fail the structural check.

Comment on lines +340 to +342
override def reset(): Unit = {
super.reset()
resolver.close()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset() closes resolver, discarding the source-dataset cache: LargeBinaryWriter.reset() calls resolver.close(), which closes and clears BlobReferenceResolver.datasetCache — the cache whose stated purpose is 'avoid re-opening the same dataset for every row'. Arrow field writers are reset between batches, so the source Lance dataset (manifest + metadata I/O, JNI open) is re-opened on the first blob of every subsequent batch instead of once per task. Combined with the broken batching above, the write path repeatedly re-opens the source dataset and issues per-row blob fetches.

Fix: Release the resolver (and its dataset cache) once at writer finish/close, not on every reset(); reset() should only flush pending state, keeping cached datasets open for the task lifetime.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — reset() now only clears the pending blob reference buffer. The resolver (and its dataset cache) lives for the lifetime of the writer and is never closed/recreated between batches. Datasets opened on the first batch stay cached for all subsequent batches in the same task.

beinan and others added 2 commits May 18, 2026 17:41
- Use UInt8Vector.get(i) instead of getObjectNoOverflow(i).longValue()
  to avoid per-row BigInteger allocation on the scan hot path
- Extend magic header from 4 bytes (LBRF) to 8 bytes (LANCEREF) and
  add full structural validation in isBlobReference() to prevent
  false positives from user binary data
- Implement genuine two-pass batched blob resolution: setValue()
  buffers blob references with empty placeholders, finish() resolves
  all references in a single takeBlobs() call per (dataset, column)
  group and back-fills the vector
- Keep dataset cache alive across batches (reset() only clears
  pending state, close() releases datasets at task end)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Use parallel List<Integer> and List<BlobReference> instead of
Scala tuples to avoid Java generics invariance issues.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants