Skip to content

[SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution#56548

Closed
ericm-db wants to merge 2 commits into
apache:branch-4.2from
ericm-db:SPARK-56971-branch-4.2
Closed

[SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution#56548
ericm-db wants to merge 2 commits into
apache:branch-4.2from
ericm-db:SPARK-56971-branch-4.2

Conversation

@ericm-db

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Backport of [SPARK-56971] (apache/spark#56019) to branch-4.2.

Add the commit log data structures for streaming sink evolution:

  • CommitMetadataV3 (VERSION_3 of the commit log wire format) carries a sinkMetadataMap: Map[String, SinkMetadataInfo] keyed by sink name, in addition to the V2 fields (nextBatchWatermarkMs, stateUniqueIds).
  • SinkMetadataInfo records per-sink metadata: sinkName, commitOffset (serialized via OffsetV2.json()), providerName, apiVersion, and an isActive flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use.
  • CommitMetadataV3.activeSinkMetadataInfo returns the entry with isActive = true; CommitMetadataV3 requires exactly one active sink.
  • CommitLog.createMetadata learns to produce a CommitMetadataV3 when commitLogFormatVersion = VERSION_3, requiring a non-empty sinkMetadataMap.
  • CommitLog.readCommitMetadata dispatches v3 files to the new class.

The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through MicroBatchExecution is the SPARK-56972 follow-up.

Prerequisite commit. SPARK-56971 was built on top of [SPARK-56970] (apache/spark#56018), which splits CommitMetadata into a CommitMetadataBase trait with concrete CommitMetadata (V1) and CommitMetadataV2 case classes. branch-4.2 does not yet have SPARK-56970, so this PR includes it as the first commit and adds SPARK-56971 on top. Both commits are cherry-picked from the branch-4.x backports (5322ec30c02 and 706ce2f3743). The only conflicts were import-line collisions in CommitLogSuite.scala (the suite extends SparkFunSuite with SharedSparkSession on branch-4.2); the resolved CommitLog.scala is identical to branch-4.x.

Why are the changes needed?

SPARK-56719 added DataStreamWriter.name() as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in the 4.2 release line.

Does this PR introduce any user-facing change?

No. CommitMetadataV3 is in the internal org.apache.spark.sql.execution.streaming.checkpointing package and is not produced by any code path yet. As part of the SPARK-56970 refactor, V1 commit log files no longer serialize stateUniqueIds: null; old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field.

How was this patch tested?

  • Cherry-picked the two branch-4.x commits; resolved import conflicts in CommitLogSuite.scala.
  • Existing and new CommitLogSuite cases (V1/V2/V3 SerDe, historical-sink retention, createMetadata V3 empty-map failure, exactly-one-active-sink invariant).
  • sql/core main and test sources compile cleanly on branch-4.2 (build/sbt sql/Test/compile).

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-8)

@uros-db uros-db left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a clean backport, thank you @ericm-db! anishshri-db PTAL.

@cloud-fan

Copy link
Copy Markdown
Contributor

what's the justification for the 4.2 backport? @anishshri-db @ericm-db

@anishshri-db

Copy link
Copy Markdown
Contributor

@cloud-fan - mainly so that we can upgrade the commit (persistent) log format sooner. Delaying another release would also delay changing the default format in the future further. (Note that this change only introduces the logic for the format changes - no defaults are changed in this release)

@cloud-fan

Copy link
Copy Markdown
Contributor

can we fix merge conflicts? 4.2.0 RC just failed so now we have a good chance to get this in

@anishshri-db

Copy link
Copy Markdown
Contributor

@ericm-db - could you please update the PR ? thx

ericm-db added 2 commits June 30, 2026 17:21
…2 case classes

Backport of [SPARK-56970] (apache#56018) to `branch-4.x`.

Refactor `CommitLog` so that the commit log metadata is dispatched through a `CommitMetadataBase` trait with concrete `CommitMetadata` (V1, watermark only) and `CommitMetadataV2` (watermark + `stateUniqueIds`) case classes. The deserializer now reads the wire-format version from the file header and constructs the matching subclass.

This is preparation for `CommitMetadataV3` (which adds sink metadata for streaming sink evolution) in a follow-up.

Notable changes:
- Add `CommitMetadataBase` trait and `CommitMetadataV2` case class.
- `CommitMetadata` becomes V1 (no `stateUniqueIds` field).
- Add `CommitLog.createMetadata` factory that dispatches by version and defaults to the configured `STATE_STORE_CHECKPOINT_FORMAT_VERSION`.
- `CommitLog.readCommitMetadata` reads the version line and constructs the matching subclass.
- `MicroBatchExecution`, `OfflineStateRepartitionRunner`, and the existing tests are updated to use the new types / factory.

The pre-refactor `CommitMetadata` carried both the V1 and V2 wire shape in a single case class, with `stateUniqueIds` optional. That made it awkward to add a V3 wire format with additional fields, and forced `serialize` to take the wire version from `SQLConf` rather than from the metadata itself.

No new public API. The wire format for V1 changes slightly: V1 commit log files no longer serialize `stateUniqueIds: null`. Old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field.

This PR also relaxes the version-exact-match check on read so that a commit log opened with the V2 conf can deserialize a V1 file. This incidentally resolves SPARK-50653.

- Existing `CommitLogSuite` (V1, V2, and cross-version); the cross-version test now asserts successful V1 deserialization.
- `sql/core` main and test sources compile cleanly on `branch-4.x` (`build/sbt sql/Test/compile`).

Generated-by: Claude Code (claude-opus-4-7)

Closes apache#56307 from ericm-db/SPARK-56970-branch-4.x.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…evolution

Add the commit log data structures for streaming sink evolution:

- `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`).
- `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use.
- `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with `isActive = true`, if any.
- `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`.
- `CommitLog.readCommitMetadata` dispatches `v3` files to the new class.

The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` (so each batch persists its sink name + offset, and so restarts read the map back) is the SPARK-56972 follow-up.

This PR is built on top of apache#56018 (SPARK-56970). It currently shows the SPARK-56970 commits in its diff; that will resolve once SPARK-56970 merges.

SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in a separate, narrowly scoped change.

No. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet.

Added unit tests in `CommitLogSuite`:
- V3 SerDe with a single active sink (round-trips through commit log).
- V3 retains historical sinks alongside the active one and `activeSinkMetadataInfoOpt` resolves correctly.
- `createMetadata(version = V3, sinkMetadataMap = Map.empty)` fails fast with `IllegalArgumentException`.

Generated-by: Claude Code (claude-opus-4-7)

Closes apache#56019 from ericm-db/sink-evolution-sink-metadata-info.

Authored-by: ericm-db <eric.marnadi@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
(cherry picked from commit 4d26262)
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
@ericm-db ericm-db force-pushed the SPARK-56971-branch-4.2 branch from ea5478d to 9648a33 Compare June 30, 2026 17:30
anishshri-db pushed a commit that referenced this pull request Jun 30, 2026
…evolution

### What changes were proposed in this pull request?

Backport of [SPARK-56971] ([#56019](#56019)) to `branch-4.2`.

Add the commit log data structures for streaming sink evolution:

- `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`).
- `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, `apiVersion`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use.
- `CommitMetadataV3.activeSinkMetadataInfo` returns the entry with `isActive = true`; `CommitMetadataV3` requires exactly one active sink.
- `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`.
- `CommitLog.readCommitMetadata` dispatches `v3` files to the new class.

The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` is the SPARK-56972 follow-up.

**Prerequisite commit.** SPARK-56971 was built on top of [SPARK-56970] ([#56018](#56018)), which splits `CommitMetadata` into a `CommitMetadataBase` trait with concrete `CommitMetadata` (V1) and `CommitMetadataV2` case classes. `branch-4.2` does not yet have SPARK-56970, so this PR includes it as the first commit and adds SPARK-56971 on top. Both commits are cherry-picked from the `branch-4.x` backports (`5322ec30c02` and `706ce2f3743`). The only conflicts were import-line collisions in `CommitLogSuite.scala` (the suite extends `SparkFunSuite with SharedSparkSession` on `branch-4.2`); the resolved `CommitLog.scala` is identical to `branch-4.x`.

### Why are the changes needed?

SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in the 4.2 release line.

### Does this PR introduce _any_ user-facing change?

No. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet. As part of the SPARK-56970 refactor, V1 commit log files no longer serialize `stateUniqueIds: null`; old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field.

### How was this patch tested?

- Cherry-picked the two `branch-4.x` commits; resolved import conflicts in `CommitLogSuite.scala`.
- Existing and new `CommitLogSuite` cases (V1/V2/V3 SerDe, historical-sink retention, `createMetadata` V3 empty-map failure, exactly-one-active-sink invariant).
- `sql/core` main and test sources compile cleanly on `branch-4.2` (`build/sbt sql/Test/compile`).

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-8)

Closes #56548 from ericm-db/SPARK-56971-branch-4.2.

Lead-authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Co-authored-by: ericm-db <eric.marnadi@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
@anishshri-db

Copy link
Copy Markdown
Contributor

Merged the PR here: bfcc62b

cloud-fan pushed a commit that referenced this pull request Jul 1, 2026
…tchExecution

### What changes were proposed in this pull request?

Backport of [SPARK-56972] ([#56020](#56020)) to `branch-4.2`.

Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches:

- Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`.
- When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`.
- When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged.

This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred.

**Stacked PR.** `branch-4.2` does not yet have the predecessors SPARK-56970 ([#56018](#56018)) and SPARK-56971 ([#56019](#56019)), which are still under review for 4.2 in [#56548](#56548). This PR is built on top of #56548 and currently shows those two predecessor commits in its diff; that will resolve once #56548 merges. Only the final commit (`[SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution`) is the subject of this PR. The cherry-pick of `cfa759af5b6` produced the same diff as on master (+156/-3); the only conflict was an import-line collision in `MicroBatchExecution.scala`, resolved by keeping the branch's existing import and adding `CommitLog`, `CommitMetadataV3`, and `SinkMetadataInfo`.

### Why are the changes needed?

SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable.

### Does this PR introduce _any_ user-facing change?

Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off.

### How was this patch tested?

- Cherry-picked `cfa759af5b6` on top of the SPARK-56971 4.2 backport branch (#56548); resolved the single import-line conflict in `MicroBatchExecution.scala`.
- `StreamingSinkEvolutionSuite` passes on this branch (12 tests, including the four new V3 commit-log cases: named-sink active entry, historical-sink retention across rename, V1/V2 preserved when disabled, and mid-checkpoint upgrade to V3).
- `sql/core` main and test sources compile cleanly (`build/sbt sql/Test/compile`).

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-8)

This pull request and its description were written by Isaac.

Closes #56707 from ericm-db/SPARK-56972-branch-4.2.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants