Skip to content

Commit 3b01bc0

Browse files
author
hongli.wwj
committed
[flink] Expose scan.bucket for single-bucket manifest pruning
1 parent 94b468a commit 3b01bc0

10 files changed

Lines changed: 552 additions & 9 deletions

File tree

docs/generated/flink_connector_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@
152152
<td>Boolean</td>
153153
<td>Bounded mode for Paimon consumer. By default, Paimon automatically selects bounded mode based on the mode of the Flink job.</td>
154154
</tr>
155+
<tr>
156+
<td><h5>scan.bucket</h5></td>
157+
<td style="word-wrap: break-word;">(none)</td>
158+
<td>Integer</td>
159+
<td>Specify a single bucket to scan. This option filters manifest entries and only plans splits for the given bucket. It is only supported for fixed-bucket primary key tables (bucket &gt; 0). It cannot be used with postpone bucket tables.</td>
160+
</tr>
155161
<tr>
156162
<td><h5>scan.dedicated-split-generation</h5></td>
157163
<td style="word-wrap: break-word;">false</td>

paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.paimon.predicate.Predicate;
2424
import org.apache.paimon.predicate.PredicateBuilder;
2525
import org.apache.paimon.predicate.TopN;
26+
import org.apache.paimon.table.BucketMode;
27+
import org.apache.paimon.table.FileStoreTable;
2628
import org.apache.paimon.table.InnerTable;
2729
import org.apache.paimon.types.RowType;
2830
import org.apache.paimon.utils.Filter;
@@ -37,6 +39,7 @@
3739

3840
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
3941
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
42+
import static org.apache.paimon.utils.Preconditions.checkArgument;
4043
import static org.apache.paimon.utils.Preconditions.checkState;
4144

4245
/** Implementation for {@link ReadBuilder}. */
@@ -161,10 +164,40 @@ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
161164

162165
/**
 * Records the single bucket id to scan. The id is first checked against the table through
 * {@code validateSpecifiedBucket}, so an invalid bucket fails here rather than later.
 */
@Override
163166
public ReadBuilder withBucket(int bucket) {
167+
validateSpecifiedBucket(table, bucket);
164168
this.specifiedBucket = bucket;
165169
return this;
166170
}
167171

172+
/**
173+
* Validates bucket id before manifest pruning ({@link InnerTableScan#withBucket(int)}). Callers
174+
* such as Flink {@code scan.bucket} should route through {@link #withBucket(int)}.
175+
*/
176+
static void validateSpecifiedBucket(InnerTable table, int bucket) {
177+
checkArgument(bucket >= 0, "Bucket id must be non-negative, but is %s.", bucket);
178+
if (!(table instanceof FileStoreTable)) {
179+
throw new IllegalArgumentException(
180+
"Bucket scan is only supported for FileStoreTable, but got "
181+
+ table.getClass().getName());
182+
}
183+
FileStoreTable fileStoreTable = (FileStoreTable) table;
184+
checkArgument(
185+
fileStoreTable.bucketMode() == BucketMode.HASH_FIXED,
186+
"Bucket scan is only supported for fixed-bucket tables, but got bucket mode %s.",
187+
fileStoreTable.bucketMode());
188+
189+
int numBuckets = CoreOptions.fromMap(fileStoreTable.options()).bucket();
190+
checkArgument(
191+
numBuckets > 0,
192+
"Bucket scan is only supported for tables with bucket > 0, but got bucket %s.",
193+
numBuckets);
194+
checkArgument(
195+
bucket < numBuckets,
196+
"Bucket id %s must be less than table bucket number %s.",
197+
bucket,
198+
numBuckets);
199+
}
200+
168201
@Override
169202
public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
170203
this.bucketFilter = bucketFilter;
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.source;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.catalog.Catalog;
23+
import org.apache.paimon.catalog.FileSystemCatalog;
24+
import org.apache.paimon.catalog.Identifier;
25+
import org.apache.paimon.fs.Path;
26+
import org.apache.paimon.fs.local.LocalFileIO;
27+
import org.apache.paimon.schema.Schema;
28+
import org.apache.paimon.table.FileStoreTable;
29+
import org.apache.paimon.types.DataTypes;
30+
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.io.TempDir;
33+
34+
import java.util.HashMap;
35+
import java.util.Map;
36+
37+
import static org.assertj.core.api.Assertions.assertThatCode;
38+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
39+
40+
/** Tests for {@link ReadBuilderImpl}. */
41+
public class ReadBuilderImplTest {
42+
43+
@TempDir java.nio.file.Path tempDir;
44+
45+
@Test
46+
public void testValidateSpecifiedBucketOnFixedBucketPrimaryKeyTable() throws Exception {
47+
FileStoreTable table = createTable("4", true);
48+
assertThatCode(() -> table.newReadBuilder().withBucket(0)).doesNotThrowAnyException();
49+
}
50+
51+
@Test
52+
public void testValidateSpecifiedBucketRejectsDynamicBucketTable() throws Exception {
53+
FileStoreTable table = createTable("-1", true);
54+
assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
55+
.isInstanceOf(IllegalArgumentException.class)
56+
.hasMessageContaining("fixed-bucket tables")
57+
.hasMessageContaining("HASH_DYNAMIC");
58+
}
59+
60+
@Test
61+
public void testValidateSpecifiedBucketRejectsPostponeBucketTable() throws Exception {
62+
FileStoreTable table = createTable("-2", true);
63+
assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
64+
.isInstanceOf(IllegalArgumentException.class)
65+
.hasMessageContaining("fixed-bucket tables")
66+
.hasMessageContaining("POSTPONE_MODE");
67+
}
68+
69+
@Test
70+
public void testValidateSpecifiedBucketRejectsBucketUnawareTable() throws Exception {
71+
FileStoreTable table = createBucketUnawareAppendOnlyTable();
72+
assertThatThrownBy(() -> table.newReadBuilder().withBucket(0))
73+
.isInstanceOf(IllegalArgumentException.class)
74+
.hasMessageContaining("fixed-bucket tables")
75+
.hasMessageContaining("BUCKET_UNAWARE");
76+
}
77+
78+
@Test
79+
public void testValidateSpecifiedBucketAcceptsAppendOnlyTable() throws Exception {
80+
FileStoreTable table = createAppendOnlyTable("4");
81+
assertThatCode(() -> table.newReadBuilder().withBucket(0)).doesNotThrowAnyException();
82+
}
83+
84+
@Test
85+
public void testValidateSpecifiedBucketRejectsOutOfRangeBucketId() throws Exception {
86+
FileStoreTable table = createTable("4", true);
87+
assertThatThrownBy(() -> table.newReadBuilder().withBucket(4))
88+
.isInstanceOf(IllegalArgumentException.class)
89+
.hasMessageContaining("Bucket id 4 must be less than table bucket number 4");
90+
}
91+
92+
private FileStoreTable createTable(String bucket, boolean withPrimaryKey) throws Exception {
93+
Map<String, String> options = new HashMap<>();
94+
options.put(CoreOptions.BUCKET.key(), bucket);
95+
Schema.Builder schemaBuilder =
96+
Schema.newBuilder().column("id", DataTypes.INT()).column("v", DataTypes.INT());
97+
if (withPrimaryKey) {
98+
schemaBuilder.primaryKey("id");
99+
}
100+
Schema schema = schemaBuilder.options(options).build();
101+
return createTable(schema);
102+
}
103+
104+
private FileStoreTable createBucketUnawareAppendOnlyTable() throws Exception {
105+
Map<String, String> options = new HashMap<>();
106+
options.put(CoreOptions.BUCKET.key(), "-1");
107+
Schema schema =
108+
Schema.newBuilder()
109+
.column("id", DataTypes.INT())
110+
.column("v", DataTypes.INT())
111+
.options(options)
112+
.build();
113+
return createTable(schema);
114+
}
115+
116+
private FileStoreTable createAppendOnlyTable(String bucket) throws Exception {
117+
Map<String, String> options = new HashMap<>();
118+
options.put(CoreOptions.BUCKET.key(), bucket);
119+
options.put(CoreOptions.BUCKET_KEY.key(), "id");
120+
Schema schema =
121+
Schema.newBuilder()
122+
.column("id", DataTypes.INT())
123+
.column("v", DataTypes.INT())
124+
.options(options)
125+
.build();
126+
return createTable(schema);
127+
}
128+
129+
private FileStoreTable createTable(Schema schema) throws Exception {
130+
Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
131+
catalog.createDatabase("default", true);
132+
Identifier identifier = Identifier.create("default", "test_bucket");
133+
catalog.createTable(identifier, schema, false);
134+
return (FileStoreTable) catalog.getTable(identifier);
135+
}
136+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,16 @@ public class FlinkConnectorOptions {
274274
+ "normal source, the max partition(s) will be determined before job running "
275275
+ "without refreshing even for streaming jobs.");
276276

277+
public static final ConfigOption<Integer> SCAN_BUCKET =
278+
ConfigOptions.key("scan.bucket")
279+
.intType()
280+
.noDefaultValue()
281+
.withDescription(
282+
"Specify a single bucket to scan. This option filters manifest entries "
283+
+ "and only plans splits for the given bucket. It is only supported "
284+
+ "for fixed-bucket primary key tables (bucket > 0). It cannot be used "
285+
+ "with postpone bucket tables.");
286+
277287
public static final ConfigOption<Duration> LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
278288
ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
279289
.durationType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.paimon.flink.sink.FlinkSink;
2727
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
2828
import org.apache.paimon.flink.source.operator.MonitorSource;
29+
import org.apache.paimon.flink.utils.ScanBucketUtils;
2930
import org.apache.paimon.flink.utils.TableScanUtils;
3031
import org.apache.paimon.options.Options;
3132
import org.apache.paimon.partition.PartitionPredicate;
@@ -200,6 +201,7 @@ private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType
200201
if (limit != null) {
201202
readBuilder.withLimit(limit.intValue());
202203
}
204+
ScanBucketUtils.applyScanBucket(table, readBuilder, conf);
203205
return readBuilder.dropStats();
204206
}
205207

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
2727
import org.apache.paimon.flink.lookup.PartitionLoader;
2828
import org.apache.paimon.flink.lookup.StaticPartitionLoader;
29+
import org.apache.paimon.flink.utils.ScanBucketUtils;
2930
import org.apache.paimon.manifest.PartitionEntry;
3031
import org.apache.paimon.options.Options;
3132
import org.apache.paimon.partition.PartitionPredicate;
@@ -35,6 +36,7 @@
3536
import org.apache.paimon.predicate.PredicateVisitor;
3637
import org.apache.paimon.table.DataTable;
3738
import org.apache.paimon.table.Table;
39+
import org.apache.paimon.table.source.ReadBuilder;
3840
import org.apache.paimon.table.source.Split;
3941
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
4042

@@ -245,13 +247,14 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
245247
protected void scanSplitsForInference() {
246248
if (splitStatistics == null) {
247249
if (table instanceof DataTable) {
248-
List<PartitionEntry> partitionEntries =
250+
ReadBuilder readBuilder =
249251
table.newReadBuilder()
250252
.dropStats()
251253
.withFilter(predicate)
252-
.withPartitionFilter(partitionPredicate)
253-
.newScan()
254-
.listPartitionEntries();
254+
.withPartitionFilter(partitionPredicate);
255+
ScanBucketUtils.applyScanBucket(table, readBuilder, options);
256+
List<PartitionEntry> partitionEntries =
257+
readBuilder.newScan().listPartitionEntries();
255258
long totalSize = 0;
256259
long rowCount = 0;
257260
for (PartitionEntry entry : partitionEntries) {
@@ -262,15 +265,14 @@ protected void scanSplitsForInference() {
262265
splitStatistics =
263266
new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
264267
} else {
265-
List<Split> splits =
268+
ReadBuilder readBuilder =
266269
table.newReadBuilder()
267270
.dropStats()
268271
.withFilter(predicate)
269272
.withPartitionFilter(partitionPredicate)
270-
.withProjection(new int[0])
271-
.newScan()
272-
.plan()
273-
.splits();
273+
.withProjection(new int[0]);
274+
ScanBucketUtils.applyScanBucket(table, readBuilder, options);
275+
List<Split> splits = readBuilder.newScan().plan().splits();
274276
splitStatistics =
275277
new SplitStatistics(
276278
splits.size(), splits.stream().mapToLong(Split::rowCount).sum());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/AggregatePushDownUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.flink.LogicalTypeConversion;
23+
import org.apache.paimon.flink.utils.ScanBucketUtils;
24+
import org.apache.paimon.options.Options;
2325
import org.apache.paimon.partition.PartitionPredicate;
2426
import org.apache.paimon.predicate.Predicate;
2527
import org.apache.paimon.table.FileStoreTable;
@@ -153,6 +155,7 @@ private static List<DataSplit> planSplits(
153155
table.newReadBuilder()
154156
.withFilter(predicate)
155157
.withPartitionFilter(partitionPredicate);
158+
ScanBucketUtils.applyScanBucket(table, readBuilder, Options.fromMap(table.options()));
156159
if (minMaxColumns.isEmpty()) {
157160
readBuilder.dropStats();
158161
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.utils;
20+
21+
import org.apache.paimon.flink.FlinkConnectorOptions;
22+
import org.apache.paimon.options.Options;
23+
import org.apache.paimon.table.FileStoreTable;
24+
import org.apache.paimon.table.Table;
25+
import org.apache.paimon.table.source.ReadBuilder;
26+
27+
import static org.apache.paimon.utils.Preconditions.checkArgument;
28+
29+
/** Utilities for {@link FlinkConnectorOptions#SCAN_BUCKET}. */
30+
public class ScanBucketUtils {
31+
32+
private ScanBucketUtils() {}
33+
34+
/** Apply {@link FlinkConnectorOptions#SCAN_BUCKET} to the given {@link ReadBuilder}. */
35+
public static ReadBuilder applyScanBucket(
36+
Table table, ReadBuilder readBuilder, Options options) {
37+
Integer scanBucket = options.get(FlinkConnectorOptions.SCAN_BUCKET);
38+
if (scanBucket == null) {
39+
return readBuilder;
40+
}
41+
if (table instanceof FileStoreTable) {
42+
checkArgument(
43+
!((FileStoreTable) table).schema().primaryKeys().isEmpty(),
44+
"Bucket scan is only supported for primary key tables.");
45+
}
46+
return readBuilder.withBucket(scanBucket);
47+
}
48+
}

0 commit comments

Comments
 (0)