Search before asking
Fluss version
main (development)
Please describe the bug 🐞
When a Flink source starts from a specified log position, such as timestamp/latest/earliest startup instead of full startup, FlinkTableSource correctly passes null as the lakeSource to FlinkSourceEnumerator. The source therefore initializes only Fluss log splits and does not initialize lake snapshot splits.
After the job is restored from a checkpoint/savepoint, the recreated source can pass a non-null lakeSource to FlinkSourceEnumerator. Because the enumerator checkpoint state currently stores remainingHybridLakeFlussSplits as null when the original run started in log-only mode, the restored enumerator cannot distinguish "lake reading was never enabled" from "lake splits have not been generated yet". It assumes the latter and initializes lake snapshot splits during restore.
This changes the startup semantics across restore and can duplicate historical data: the original run starts from the specified Fluss log position, but the restored run additionally reads lake snapshot data.
Expected behavior: restoring a source that originally started in log-only mode should preserve that mode and must not initialize lake snapshot splits.
Actual behavior: restore can initialize lake snapshot splits if lakeSource becomes non-null, causing duplicated initialization/read of historical lake data.
Solution
Use the existing remainingHybridLakeFlussSplits checkpoint field as a sentinel. For a source enumerator that starts with lake snapshot reading disabled, persist an empty list instead of null. On restore, pendingHybridLakeFlussSplits != null already means the enumerator should not list/generate lake splits again. An empty list therefore preserves the original log-only startup semantics without changing the enumerator state schema.
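The sentinel idea can be sketched in isolation as follows. This is a simplified model, not the actual Fluss code: the class EnumeratorState, the methods snapshotLogOnly and shouldGenerateLakeSplitsOnRestore, and the use of String as the split type are all hypothetical stand-ins chosen for illustration. Only the field name remainingHybridLakeFlussSplits and the null check come from the description above.

```java
import java.util.Collections;
import java.util.List;

public class LakeSplitSentinelSketch {

    /** Hypothetical stand-in for the enumerator checkpoint state. */
    static final class EnumeratorState {
        // null  : lake splits have not been generated yet
        // empty : nothing left to read, or lake reading was disabled (sentinel)
        final List<String> remainingHybridLakeFlussSplits;

        EnumeratorState(List<String> remainingHybridLakeFlussSplits) {
            this.remainingHybridLakeFlussSplits = remainingHybridLakeFlussSplits;
        }
    }

    /**
     * Hypothetical snapshot of an enumerator that started in log-only mode.
     * With the sentinel enabled it persists an empty list; without it, null
     * (the behavior described in the bug report).
     */
    static EnumeratorState snapshotLogOnly(boolean useEmptyListSentinel) {
        List<String> remaining =
                useEmptyListSentinel ? Collections.<String>emptyList() : null;
        return new EnumeratorState(remaining);
    }

    /**
     * Hypothetical restore-time decision: lake splits are generated only if a
     * lake source is available AND the restored state says none were generated.
     */
    static boolean shouldGenerateLakeSplitsOnRestore(
            EnumeratorState restored, boolean lakeSourceAvailable) {
        return lakeSourceAvailable && restored.remainingHybridLakeFlussSplits == null;
    }

    public static void main(String[] args) {
        // The recreated source passes a non-null lakeSource on restore.
        boolean lakeSourceAvailable = true;

        EnumeratorState withoutSentinel = snapshotLogOnly(false);
        EnumeratorState withSentinel = snapshotLogOnly(true);

        System.out.println("without sentinel, generate lake splits: "
                + shouldGenerateLakeSplitsOnRestore(withoutSentinel, lakeSourceAvailable));
        System.out.println("with sentinel, generate lake splits: "
                + shouldGenerateLakeSplitsOnRestore(withSentinel, lakeSourceAvailable));
    }
}
```

In this model the state persisted without the sentinel leads the restored enumerator to generate lake splits, while the empty-list state does not, even though the restored source now has a lake source available. The state shape is identical in both cases, which is why no schema change is needed.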
Are you willing to submit a PR?