Skip to content

Commit 0f5909a

Browse files
authored
[flink] Add union read support to datastream (#3432)
* [flink] add union read support to datastream * [flink] support bounded mode in ds * [flink] address comments
1 parent e3ed5b4 commit 0f5909a

3 files changed

Lines changed: 639 additions & 4 deletions

File tree

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.fluss.flink.FlinkConnectorOptions;
2424
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
2525
import org.apache.fluss.flink.source.reader.LeaseContext;
26+
import org.apache.fluss.lake.source.LakeSource;
27+
import org.apache.fluss.lake.source.LakeSplit;
2628
import org.apache.fluss.metadata.TablePath;
2729
import org.apache.fluss.predicate.Predicate;
2830
import org.apache.fluss.types.RowType;
@@ -71,7 +73,8 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
7173
OffsetsInitializer offsetsInitializer,
7274
long scanPartitionDiscoveryIntervalMs,
7375
FlussDeserializationSchema<OUT> deserializationSchema,
74-
boolean streaming) {
76+
boolean streaming,
77+
@Nullable LakeSource<LakeSplit> lakeSource) {
7578
this(
7679
flussConf,
7780
tablePath,
@@ -84,7 +87,8 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
8487
scanPartitionDiscoveryIntervalMs,
8588
FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
8689
deserializationSchema,
87-
streaming);
90+
streaming,
91+
lakeSource);
8892
}
8993

9094
FlussSource(
@@ -99,7 +103,8 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
99103
long scanPartitionDiscoveryIntervalMs,
100104
int splitPerAssignmentBatchSize,
101105
FlussDeserializationSchema<OUT> deserializationSchema,
102-
boolean streaming) {
106+
boolean streaming,
107+
@Nullable LakeSource<LakeSplit> lakeSource) {
103108
// TODO: Support partition pushDown in datastream
104109
super(
105110
flussConf,
@@ -115,6 +120,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
115120
deserializationSchema,
116121
streaming,
117122
null,
123+
lakeSource,
118124
LeaseContext.DEFAULT);
119125
}
120126

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.client.admin.Admin;
2323
import org.apache.fluss.client.initializer.OffsetsInitializer;
24+
import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
2425
import org.apache.fluss.config.ConfigOptions;
2526
import org.apache.fluss.config.Configuration;
2627
import org.apache.fluss.flink.FlinkConnectorOptions;
2728
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
29+
import org.apache.fluss.flink.utils.LakeSourceUtils;
30+
import org.apache.fluss.lake.source.LakeSource;
31+
import org.apache.fluss.lake.source.LakeSplit;
2832
import org.apache.fluss.metadata.TableInfo;
2933
import org.apache.fluss.metadata.TablePath;
3034
import org.apache.fluss.predicate.Predicate;
@@ -33,6 +37,7 @@
3337
import org.slf4j.Logger;
3438
import org.slf4j.LoggerFactory;
3539

40+
import java.util.Collections;
3641
import java.util.HashMap;
3742
import java.util.List;
3843
import java.util.Map;
@@ -60,6 +65,11 @@
6065
* .build();
6166
* }</pre>
6267
*
68+
* <p>When the target table has datalake enabled and the source starts in full mode (the default,
69+
* {@link OffsetsInitializer#full()}), the built source performs a union read: it reads the
70+
* historical data tiered to the lake (e.g. Iceberg, Paimon) together with the real-time data still
71+
* in Fluss. Other startup modes (earliest/latest/timestamp) read data from Fluss only.
72+
*
6373
* @param <OUT> The type of records produced by the source being built
6474
*/
6575
public class FlussSourceBuilder<OUT> {
@@ -73,6 +83,7 @@ public class FlussSourceBuilder<OUT> {
7383
private Long scanPartitionDiscoveryIntervalMs;
7484
private Integer splitPerAssignmentBatchSize;
7585
private OffsetsInitializer offsetsInitializer;
86+
private boolean bounded;
7687
private FlussDeserializationSchema<OUT> deserializationSchema;
7788

7889
private String bootstrapServers;
@@ -161,6 +172,19 @@ public FlussSourceBuilder<OUT> setStartingOffsets(OffsetsInitializer offsetsInit
161172
return this;
162173
}
163174

175+
/**
176+
* Marks the source as bounded for batch execution: it reads up to the latest offsets at job
177+
* startup and then finishes, doing a union read of the lake snapshot and the Fluss log. This
178+
* requires a datalake-enabled table and the default {@link OffsetsInitializer#full()} startup
179+
* mode; otherwise {@link #build()} throws. If not called, the source is unbounded (streaming).
180+
*
181+
* @return this builder
182+
*/
183+
public FlussSourceBuilder<OUT> setBounded() {
184+
this.bounded = true;
185+
return this;
186+
}
187+
164188
/**
165189
* Sets the deserialization schema for converting Fluss records to output records.
166190
*
@@ -324,6 +348,40 @@ public FlussSource<OUT> build() {
324348
? tableInfo.getRowType().project(projectedFields)
325349
: tableInfo.getRowType();
326350

351+
// union read (lake historical + Fluss) only applies to full startup mode, like the SQL
352+
// connector; other startup modes read Fluss only.
353+
boolean lakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
354+
boolean fullStartup = offsetsInitializer instanceof SnapshotOffsetsInitializer;
355+
356+
if (bounded && !(lakeEnabled && fullStartup)) {
357+
throw new IllegalArgumentException(
358+
String.format(
359+
"Bounded (batch) read requires a datalake-enabled table started in "
360+
+ "full mode (OffsetsInitializer.full()), but table '%s' has "
361+
+ "datalake enabled=%s and full startup mode=%s.",
362+
tablePath, lakeEnabled, fullStartup));
363+
}
364+
365+
LakeSource<LakeSplit> lakeSource = null;
366+
if (lakeEnabled && fullStartup) {
367+
lakeSource =
368+
LakeSourceUtils.createLakeSource(tablePath, tableInfo.getProperties().toMap());
369+
if (lakeSource != null) {
370+
if (projectedFields != null) {
371+
int[][] nestedProjectedFields = new int[projectedFields.length][];
372+
for (int i = 0; i < projectedFields.length; i++) {
373+
nestedProjectedFields[i] = new int[] {projectedFields[i]};
374+
}
375+
lakeSource.withProject(nestedProjectedFields);
376+
}
377+
// push the record-batch filter to the lake side as well,
378+
// so the historical lake scan is filtered consistently with Fluss.
379+
if (logRecordBatchFilter != null) {
380+
lakeSource.withFilters(Collections.singletonList(logRecordBatchFilter));
381+
}
382+
}
383+
}
384+
327385
LOG.info("Creating Fluss Source with Configuration: {}", flussConf);
328386

329387
return new FlussSource<>(
@@ -338,6 +396,7 @@ public FlussSource<OUT> build() {
338396
scanPartitionDiscoveryIntervalMs,
339397
splitPerAssignmentBatchSize,
340398
deserializationSchema,
341-
true);
399+
!bounded,
400+
lakeSource);
342401
}
343402
}

0 commit comments

Comments
 (0)