[spark] supports updating blobs through DataEvolution MergeInto#8175

Open
steFaiz wants to merge 7 commits into apache:master from steFaiz:spark_de_update_blobs

@steFaiz steFaiz commented Jun 9, 2026

Contributor

Purpose

Part of #7881

Adds Spark support for:

MERGE INTO t
USING s
ON ...
WHEN MATCHED THEN UPDATE SET t.raw_blob = s.raw_blob

where raw_blob is a blob column whose values are stored in BlobFormat files.

Implementation

Introduce several marker columns during data evolution:

update columns..., _ROW_ID, _FIRST_ROW_ID, marker columns...

This is needed because Spark only allows literal columns of basic types, i.e. a BlobPlaceholder literal is not allowed.
Each blob column has one marker column, which indicates whether to write the blob value or BlobPlaceholder.INSTANCE.
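As a rough illustration of the marker idea (not the PR's actual writer code), the sketch below resolves what to write for each blob column of one merged row. The class name, the `PLACEHOLDER` stand-in, and the polarity "marker is true means write the placeholder" are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; PLACEHOLDER only stands in for BlobPlaceholder.INSTANCE. */
final class BlobMarkerSketch {
    static final Object PLACEHOLDER = new Object();

    /**
     * blobValues[i] is the candidate value for blob column i, and
     * usePlaceholder[i] is the boolean marker column for the same blob column.
     * Assumption: a true marker means "keep the old value", so the placeholder is written.
     */
    static List<Object> resolve(byte[][] blobValues, boolean[] usePlaceholder) {
        if (blobValues.length != usePlaceholder.length) {
            throw new IllegalArgumentException("one marker is required per blob column");
        }
        List<Object> out = new ArrayList<>();
        for (int i = 0; i < blobValues.length; i++) {
            out.add(usePlaceholder[i] ? PLACEHOLDER : blobValues[i]);
        }
        return out;
    }
}
```

Carrying the decision in a plain boolean column keeps the Spark plan limited to basic types, and the placeholder object only appears at write time.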

Side Effects

  1. Conflict detection: checkRowIdExistence now checks that:
    a. If the new files are normal files, their row range exactly matches an existing row range
    b. If the new files are special storage files, their row range is a sub-range of an existing one
  2. The semantics of all-placeholder change:
    If all blob records at the same row id are placeholders, the value is now treated as NULL (previously this was illegal)
  3. Also fixes a bug in the current DataEvolutionMergeInto implementation in the following situation: updating different columns under different match conditions
WHEN MATCHED AND condition1 THEN UPDATE SET col1 = ...
WHEN MATCHED AND condition2 THEN UPDATE SET col2 = ...
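The conflict-detection rule from item 1 of the list above can be sketched as follows. This is a minimal standalone version and not the PR's checkRowIdExistence: the class and method names are hypothetical, existing ranges are assumed to be non-overlapping and keyed by first row id, and "sub-range" is assumed to include the case where the ranges are equal.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of the two row-range checks. */
final class RowRangeCheckSketch {

    /** Normal files: the new range must equal an existing range exactly. */
    static boolean exactMatch(NavigableMap<Long, Long> existing, long firstRowId, long rowCount) {
        Long count = existing.get(firstRowId);
        return count != null && count == rowCount;
    }

    /** Special storage files: the new range must lie inside a single existing range. */
    static boolean coveredByOne(NavigableMap<Long, Long> existing, long firstRowId, long rowCount) {
        // The only range that can contain firstRowId is the one starting at or before it.
        Map.Entry<Long, Long> floor = existing.floorEntry(firstRowId);
        if (floor == null) {
            return false;
        }
        long existingEnd = floor.getKey() + floor.getValue(); // exclusive end
        return firstRowId + rowCount <= existingEnd;
    }

    static boolean isValid(
            NavigableMap<Long, Long> existing, long firstRowId, long rowCount, boolean specialStorageFile) {
        return specialStorageFile
                ? coveredByOne(existing, firstRowId, rowCount)
                : exactMatch(existing, firstRowId, rowCount);
    }

    /** Small helper to build the example index: firstRowId -> rowCount. */
    static NavigableMap<Long, Long> example() {
        NavigableMap<Long, Long> ranges = new TreeMap<>();
        ranges.put(0L, 10L);  // rows [0, 10)
        ranges.put(10L, 5L);  // rows [10, 15)
        return ranges;
    }
}
```

With the example index, a normal file covering rows [0, 10) passes, while a special storage file covering rows [8, 12) is rejected because it spans two existing ranges.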

Tests

@steFaiz steFaiz marked this pull request as draft June 9, 2026 03:01
@steFaiz steFaiz force-pushed the spark_de_update_blobs branch from a72f8c8 to b43f513 Compare June 9, 2026 10:09
@steFaiz steFaiz marked this pull request as ready for review June 10, 2026 04:12
@steFaiz steFaiz changed the title [wip][spark] supports updating blobs through DataEvolution MergeInto [spark] supports updating blobs through DataEvolution MergeInto Jun 10, 2026
// The final output is composed by updated columns, metadata columns and blob marker columns.
// Marker columns are used to mark whether a blob field should be written with placeholder
val rawBlobUpdateColumns = updateColumnsSorted.filter(isRawBlobUpdateColumn)
val rawBlobMarkerNamesByColumn = rawBlobUpdateColumns.zipWithIndex.map {

Contributor


The internal marker column names can collide with real target columns. For example, a table can legally have a column named __paimon_raw_blob_placeholder_0; if a MERGE updates that column and a raw BLOB in the same statement, mergeOutput will contain two attributes with the same name. Then reorderPartialWriteColumns selects by quoted name and writePartialFields resolves the marker with data.schema.fieldIndex, so Spark can either report an ambiguous reference or bind the user column as the boolean marker. Could we generate marker names that are guaranteed not to collide with the write columns/source output, or carry the marker attributes through by exprId instead of resolving them by name?

Contributor Author


Thanks! Fixed. Picking a new marker name now loops and increments the index until it finds a name that does not match any existing column.
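The collision-avoiding loop described in this reply could look roughly like the sketch below. It is not the code from the PR: the class and method names are hypothetical, the prefix is inferred from the example name in the review comment, and names are compared case-sensitively, which may differ from how Spark resolves columns.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of picking marker column names that do not collide. */
final class MarkerNameSketch {
    // Prefix inferred from the example name in the review comment (assumption).
    static final String PREFIX = "__paimon_raw_blob_placeholder_";

    /**
     * Returns one marker name per blob column, skipping any index whose name
     * is already used by a write column, a source column, or an earlier marker.
     */
    static List<String> pickMarkerNames(Set<String> existingNames, int blobColumnCount) {
        Set<String> taken = new HashSet<>(existingNames);
        List<String> markers = new ArrayList<>();
        int index = 0;
        for (int i = 0; i < blobColumnCount; i++) {
            String candidate = PREFIX + index;
            while (taken.contains(candidate)) {
                index++;
                candidate = PREFIX + index;
            }
            taken.add(candidate);
            markers.add(candidate);
            index++;
        }
        return markers;
    }
}
```

If the table already has a column named `__paimon_raw_blob_placeholder_0`, two blob columns would get the markers ending in `_1` and `_2`.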

base.firstRowId(),
base.rowCount()));
if (base.firstRowId() != null && !dedicatedStorageFile(base.fileName())) {
existingRanges.put(base.firstRowId(), base.rowCount());

Contributor


Partition level has been removed, cc @leaves12138 to take a look.

}

Set<FileRowIdKey> existingIndex = new HashSet<>();
NavigableMap<Long, Long> existingRanges = new TreeMap<>();

Contributor


Can you use RowRangeIndex here, and add a containsExactly method to it?

&& rowCount == that.rowCount
&& Objects.equals(partition, that.partition);
}
private static boolean rowIdRangeCovered(

Contributor


Can you add containsExactly to RowRangeIndex? This seems like a common operation.

return Collections.unmodifiableList(ranges);
}

public boolean contains(long start, long end) {

Contributor


There is already a contains method in this class.

Contributor Author


Thanks for the reminder! I see that this method was added recently on the master branch. I've rebased onto master and cleaned up my code.

@steFaiz steFaiz force-pushed the spark_de_update_blobs branch from a4e21ac to b806157 Compare June 11, 2026 09:30

3 participants