[flink] Avoid initializing lake snapshot splits after log-only source restore #3354

@luoyuxia

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main (development)

Please describe the bug 🐞

When a Flink source starts from a specified log position (timestamp, latest, or earliest startup) rather than with full startup, FlinkTableSource correctly passes null as the lakeSource to FlinkSourceEnumerator. The source therefore initializes only Fluss log splits and no lake snapshot splits.
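For illustration only, the startup-mode check can be sketched as follows. The StartupMode enum and the lakeSourceFor helper are hypothetical stand-ins, not the actual FlinkTableSource code. The sketch assumes only that full startup keeps the lake source and every log-position startup passes null.

```java
/** Hypothetical startup modes mirroring the ones named in this issue. */
enum StartupMode { FULL, EARLIEST, LATEST, TIMESTAMP }

/** Hypothetical helper; not the real FlinkTableSource API. */
final class LakeSourceSelection {
    private LakeSourceSelection() {}

    /**
     * Keeps the lake source only for full startup. Any log-position startup
     * returns null, so the enumerator only creates Fluss log splits.
     */
    static <L> L lakeSourceFor(StartupMode mode, L configuredLakeSource) {
        return mode == StartupMode.FULL ? configuredLakeSource : null;
    }
}
```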

After the job is restored from a checkpoint or savepoint, the recreated source can pass a non-null lakeSource to FlinkSourceEnumerator. In the original log-only run, the enumerator checkpoint state stores remainingHybridLakeFlussSplits as null, so the restored enumerator treats that state as if lake splits have not been generated yet and initializes lake snapshot splits during restore.
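A minimal model of the faulty restore path is sketched below. EnumeratorModel, start(), and snapshotState() are hypothetical simplifications, not the real FlinkSourceEnumerator API; only the idea that a null pending list means "lake splits not generated yet" is taken from this issue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified model of the enumerator's lake-split bookkeeping. */
final class EnumeratorModel {
    final Object lakeSource;                   // null in log-only startup
    List<String> pendingHybridLakeFlussSplits; // null until lake splits are generated

    EnumeratorModel(Object lakeSource, List<String> restoredSplits) {
        this.lakeSource = lakeSource;
        this.pendingHybridLakeFlussSplits = restoredSplits;
    }

    /** Generates lake splits only when a lake source exists and none were restored. */
    void start() {
        if (lakeSource != null && pendingHybridLakeFlussSplits == null) {
            // Placeholder split name; a real enumerator would list the lake snapshot.
            pendingHybridLakeFlussSplits = new ArrayList<>(List.of("lake-snapshot-split-0"));
        }
    }

    /** Value written to remainingHybridLakeFlussSplits; stays null for a log-only run. */
    List<String> snapshotState() {
        return pendingHybridLakeFlussSplits;
    }
}
```

Starting the model with a null lake source checkpoints null; restoring that checkpoint with a non-null lake source then produces a pending lake split, which is the duplicated read this issue describes.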

This changes the startup semantics across restore and can duplicate historical data: the original run starts from the specified Fluss log position, but the restored run additionally reads lake snapshot data.

Expected behavior: restoring a source that originally started in log-only mode should preserve that mode and must not initialize lake snapshot splits.

Actual behavior: restore can initialize lake snapshot splits if lakeSource becomes non-null, causing historical lake data to be initialized and read again.

Solution

Use the existing remainingHybridLakeFlussSplits checkpoint field as a sentinel. For a source enumerator that starts with lake snapshot reading disabled, persist an empty list instead of null. On restore, pendingHybridLakeFlussSplits != null already means the enumerator should not list/generate lake splits again, so an empty list preserves the original log-only startup semantics without changing the enumerator state schema.
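The proposed sentinel can be sketched with the same kind of simplified model. FixedEnumeratorModel and its create/restore factories are hypothetical names used only for illustration; the sketch assumes a fresh start and a restore are distinguishable, as Flink's separate enumerator creation and restoration paths allow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the proposed fix; class and method names are illustrative. */
final class FixedEnumeratorModel {
    final Object lakeSource;                   // null when lake snapshot reading is disabled
    List<String> pendingHybridLakeFlussSplits; // null means "lake splits not generated yet"

    private FixedEnumeratorModel(Object lakeSource, List<String> pending) {
        this.lakeSource = lakeSource;
        this.pendingHybridLakeFlussSplits = pending;
    }

    /** Fresh start: with lake reading disabled, use an empty list as the sentinel instead of null. */
    static FixedEnumeratorModel create(Object lakeSource) {
        return new FixedEnumeratorModel(lakeSource, lakeSource == null ? new ArrayList<String>() : null);
    }

    /** Restore: keep whatever remainingHybridLakeFlussSplits the checkpoint held. */
    static FixedEnumeratorModel restore(Object lakeSource, List<String> remainingHybridLakeFlussSplits) {
        return new FixedEnumeratorModel(lakeSource, remainingHybridLakeFlussSplits);
    }

    /** Unchanged check: any non-null list, even an empty one, blocks lake split generation. */
    void start() {
        if (lakeSource != null && pendingHybridLakeFlussSplits == null) {
            pendingHybridLakeFlussSplits = new ArrayList<>(List.of("lake-snapshot-split-0"));
        }
    }

    /** Value written to remainingHybridLakeFlussSplits; empty rather than null for log-only starts. */
    List<String> snapshotState() {
        return pendingHybridLakeFlussSplits == null ? null : new ArrayList<>(pendingHybridLakeFlussSplits);
    }
}
```

In this sketch only the value persisted for a log-only start changes; the restore-time null check is untouched, which mirrors the goal of keeping the enumerator state schema as it is.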

Are you willing to submit a PR?

  • I'm willing to submit a PR!
