Skip to content

Commit fdefbf2

Browse files
committed
Adapt to Iceberg 1.7.x API changes
- Migrate WriteResult → FlinkWriteResult in Flink sink pipeline
- Add FieldGetter[] parameter to RowDataUtil.clone() in RowDataRecordFactory
- Migrate PruneColumns from ParquetTypeVisitor to TypeWithSchemaVisitor
- Fix INT96 timestamp min/max statistics byte-order inversion
- Fork MiniClusterResource (removed in Iceberg 1.7.0) into test package
- Update MiniClusterResource imports in all test classes

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
1 parent bc4c637 commit fdefbf2

15 files changed

Lines changed: 170 additions & 68 deletions

File tree

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/RowDataRecordFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
class RowDataRecordFactory implements RecordFactory<RowData> {
3030
private final RowType rowType;
3131
private final TypeSerializer[] fieldSerializers;
32+
private final RowData.FieldGetter[] fieldGetters;
3233

3334
RowDataRecordFactory(RowType rowType) {
3435
this.rowType = rowType;
3536
this.fieldSerializers = createFieldSerializers(rowType);
37+
this.fieldGetters = createFieldGetters(rowType);
3638
}
3739

3840
static TypeSerializer[] createFieldSerializers(RowType rowType) {
@@ -41,6 +43,14 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
4143
.toArray(TypeSerializer[]::new);
4244
}
4345

46+
static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
47+
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
48+
for (int i = 0; i < rowType.getFieldCount(); i++) {
49+
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
50+
}
51+
return fieldGetters;
52+
}
53+
4454
@Override
4555
public RowData[] createBatch(int batchSize) {
4656
RowData[] arr = new RowData[batchSize];
@@ -57,6 +67,7 @@ public void clone(RowData from, RowData[] batch, int position) {
5767
// Clone method will allocate a new GenericRowData object
5868
// if the target object is NOT a GenericRowData.
5969
// So we should always set the clone return value back to the array.
60-
batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
70+
batch[position] =
71+
RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
6172
}
6273
}

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
import org.apache.iceberg.Schema;
3636
import org.apache.iceberg.encryption.EncryptionManager;
3737
import org.apache.iceberg.flink.TableLoader;
38+
import org.apache.iceberg.flink.sink.FlinkWriteResult;
3839
import org.apache.iceberg.flink.sink.TaskWriterFactory;
3940
import org.apache.iceberg.flink.source.FlinkInputFormat;
4041
import org.apache.iceberg.flink.source.ScanContext;
4142
import org.apache.iceberg.flink.source.StreamingReaderOperator;
4243
import org.apache.iceberg.io.FileIO;
43-
import org.apache.iceberg.io.WriteResult;
4444
import org.apache.iceberg.util.ThreadPools;
4545

4646
import java.lang.reflect.Constructor;
@@ -76,7 +76,7 @@ public static KeySelector<RowData, Object> newPartitionKeySelector(
7676
}
7777
}
7878

79-
public static OneInputStreamOperator<WriteResult, Void> newIcebergFilesCommitter(
79+
public static OneInputStreamOperator<FlinkWriteResult, Void> newIcebergFilesCommitter(
8080
TableLoader tableLoader, boolean replacePartitions, String branch, PartitionSpec spec) {
8181
try {
8282
Class<?> clazz = forName(ICEBERG_FILE_COMMITTER_CLASS);
@@ -89,7 +89,7 @@ public static OneInputStreamOperator<WriteResult, Void> newIcebergFilesCommitter
8989
String.class,
9090
PartitionSpec.class);
9191
c.setAccessible(true);
92-
return (OneInputStreamOperator<WriteResult, Void>)
92+
return (OneInputStreamOperator<FlinkWriteResult, Void>)
9393
c.newInstance(
9494
tableLoader,
9595
replacePartitions,
@@ -105,13 +105,13 @@ public static OneInputStreamOperator<WriteResult, Void> newIcebergFilesCommitter
105105
}
106106
}
107107

108-
public static OneInputStreamOperator<WriteResult, Void> newIcebergFilesCommitter(
108+
public static OneInputStreamOperator<FlinkWriteResult, Void> newIcebergFilesCommitter(
109109
TableLoader tableLoader,
110110
boolean replacePartitions,
111111
String branch,
112112
PartitionSpec spec,
113113
AuthenticatedFileIO authenticatedFileIO) {
114-
OneInputStreamOperator<WriteResult, Void> obj =
114+
OneInputStreamOperator<FlinkWriteResult, Void> obj =
115115
newIcebergFilesCommitter(tableLoader, replacePartitions, branch, spec);
116116
return (OneInputStreamOperator) ProxyUtil.getProxy(obj, authenticatedFileIO);
117117
}

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
import org.apache.iceberg.Schema;
6666
import org.apache.iceberg.SnapshotRef;
6767
import org.apache.iceberg.flink.FlinkSchemaUtil;
68+
import org.apache.iceberg.flink.sink.FlinkWriteResult;
6869
import org.apache.iceberg.flink.sink.TaskWriterFactory;
69-
import org.apache.iceberg.io.WriteResult;
7070
import org.apache.iceberg.types.TypeUtil;
7171
import org.apache.iceberg.util.PropertyUtil;
7272
import org.slf4j.Logger;
@@ -153,15 +153,15 @@ DataStreamSink<?> withEmit(
153153
DataStream<RowData> input,
154154
MixedFormatLogWriter logWriter,
155155
MixedFormatFileWriter fileWriter,
156-
OneInputStreamOperator<WriteResult, Void> committer,
156+
OneInputStreamOperator<FlinkWriteResult, Void> committer,
157157
int writeOperatorParallelism,
158158
MetricsGenerator metricsGenerator,
159159
String emitMode) {
160160
SingleOutputStreamOperator writerStream =
161161
input
162162
.transform(
163163
MixedFormatWriter.class.getName(),
164-
TypeExtractor.createTypeInfo(WriteResult.class),
164+
TypeExtractor.createTypeInfo(FlinkWriteResult.class),
165165
new MixedFormatWriter<>(logWriter, fileWriter, metricsGenerator))
166166
.name(String.format("MixedFormatWriter %s(%s)", table.name(), emitMode))
167167
.setParallelism(writeOperatorParallelism);
@@ -414,7 +414,7 @@ private static TaskWriterFactory<RowData> createTaskWriterFactory(
414414
return new MixedFormatRowDataTaskWriterFactory(mixedTable, flinkSchema, overwrite);
415415
}
416416

417-
public static OneInputStreamOperator<WriteResult, Void> createFileCommitter(
417+
public static OneInputStreamOperator<FlinkWriteResult, Void> createFileCommitter(
418418
MixedTable mixedTable,
419419
MixedFormatTableLoader tableLoader,
420420
boolean overwrite,
@@ -424,7 +424,7 @@ public static OneInputStreamOperator<WriteResult, Void> createFileCommitter(
424424
mixedTable, tableLoader, overwrite, branch, spec, MIXED_FORMAT_EMIT_FILE);
425425
}
426426

427-
public static OneInputStreamOperator<WriteResult, Void> createFileCommitter(
427+
public static OneInputStreamOperator<FlinkWriteResult, Void> createFileCommitter(
428428
MixedTable mixedTable,
429429
MixedFormatTableLoader tableLoader,
430430
boolean overwrite,

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/MixedFormatFileWriter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
3535
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
3636
import org.apache.flink.table.data.RowData;
37+
import org.apache.iceberg.flink.sink.FlinkWriteResult;
3738
import org.apache.iceberg.flink.sink.TaskWriterFactory;
3839
import org.apache.iceberg.io.TaskWriter;
3940
import org.apache.iceberg.io.WriteResult;
@@ -46,8 +47,8 @@
4647
import java.util.stream.IntStream;
4748

4849
/** This is mixed-format table includes writing file data to un keyed table and keyed table. */
49-
public class MixedFormatFileWriter extends AbstractStreamOperator<WriteResult>
50-
implements OneInputStreamOperator<RowData, WriteResult>, BoundedOneInput {
50+
public class MixedFormatFileWriter extends AbstractStreamOperator<FlinkWriteResult>
51+
implements OneInputStreamOperator<RowData, FlinkWriteResult>, BoundedOneInput {
5152

5253
private static final long serialVersionUID = 1L;
5354
private static final Logger LOG = LoggerFactory.getLogger(MixedFormatFileWriter.class);
@@ -61,6 +62,7 @@ public class MixedFormatFileWriter extends AbstractStreamOperator<WriteResult>
6162

6263
private transient TaskWriter<RowData> writer;
6364
private transient int subTaskId;
65+
private transient long currentCheckpointId;
6466
private transient int attemptId;
6567
/**
6668
* Load table in runtime, because that table's refresh method will be invoked in serialization.
@@ -140,6 +142,7 @@ private long getMask(int subTaskId) {
140142

141143
@Override
142144
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
145+
this.currentCheckpointId = checkpointId;
143146
table
144147
.io()
145148
.doAs(
@@ -153,6 +156,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
153156

154157
@Override
155158
public void endInput() throws Exception {
159+
this.currentCheckpointId = Long.MAX_VALUE;
156160
table
157161
.io()
158162
.doAs(
@@ -205,7 +209,7 @@ private void emit(WriteResult writeResult) {
205209
if (shouldEmit(writeResult)) {
206210
// Only emit a non-empty WriteResult to committer operator, thus avoiding submitting too much
207211
// empty snapshots.
208-
output.collect(new StreamRecord<>(writeResult));
212+
output.collect(new StreamRecord<>(new FlinkWriteResult(currentCheckpointId, writeResult)));
209213
}
210214
}
211215

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/FlinkTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.apache.iceberg.data.GenericRecord;
6565
import org.apache.iceberg.data.IdentityPartitionConverters;
6666
import org.apache.iceberg.data.Record;
67-
import org.apache.iceberg.flink.MiniClusterResource;
6867
import org.apache.iceberg.io.CloseableIterable;
6968
import org.apache.iceberg.io.TaskWriter;
7069
import org.apache.iceberg.io.WriteResult;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.flink;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.configuration.CoreOptions;
23+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
24+
import org.apache.flink.test.util.MiniClusterWithClientResource;
25+
26+
/** Forked from org.apache.iceberg.flink.MiniClusterResource which was removed in Iceberg 1.7.0. */
27+
public class MiniClusterResource {
28+
29+
private static final int DEFAULT_TM_NUM = 1;
30+
private static final int DEFAULT_PARALLELISM = 4;
31+
32+
public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
33+
new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
34+
35+
private MiniClusterResource() {}
36+
37+
public static MiniClusterWithClientResource createWithClassloaderCheckDisabled() {
38+
return new MiniClusterWithClientResource(
39+
new MiniClusterResourceConfiguration.Builder()
40+
.setNumberTaskManagers(DEFAULT_TM_NUM)
41+
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
42+
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
43+
.build());
44+
}
45+
}

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/catalog/TestMixedCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.amoro.TableTestHelper;
3434
import org.apache.amoro.catalog.BasicCatalogTestHelper;
3535
import org.apache.amoro.catalog.CatalogTestBase;
36+
import org.apache.amoro.flink.MiniClusterResource;
3637
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
3738
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
3839
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
@@ -51,7 +52,6 @@
5152
import org.apache.flink.types.Row;
5253
import org.apache.flink.util.CloseableIterator;
5354
import org.apache.flink.util.CollectionUtil;
54-
import org.apache.iceberg.flink.MiniClusterResource;
5555
import org.junit.After;
5656
import org.junit.Assert;
5757
import org.junit.Before;

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/AmoroCatalogITCaseBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;
2222

2323
import org.apache.amoro.TestAms;
24+
import org.apache.amoro.flink.MiniClusterResource;
2425
import org.apache.amoro.formats.AmoroCatalogTestBase;
2526
import org.apache.amoro.formats.AmoroCatalogTestHelper;
2627
import org.apache.amoro.hive.TestHMS;
@@ -36,7 +37,6 @@
3637
import org.apache.flink.table.api.TableResult;
3738
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
3839
import org.apache.flink.test.util.MiniClusterWithClientResource;
39-
import org.apache.iceberg.flink.MiniClusterResource;
4040
import org.junit.ClassRule;
4141

4242
import java.io.IOException;

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/CatalogITCaseBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.amoro.TableTestHelper;
2424
import org.apache.amoro.catalog.CatalogTestHelper;
2525
import org.apache.amoro.catalog.TableTestBase;
26+
import org.apache.amoro.flink.MiniClusterResource;
2627
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2728
import org.apache.flink.configuration.Configuration;
2829
import org.apache.flink.runtime.state.StateBackend;
@@ -35,7 +36,6 @@
3536
import org.apache.flink.table.api.TableResult;
3637
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
3738
import org.apache.flink.test.util.MiniClusterWithClientResource;
38-
import org.apache.iceberg.flink.MiniClusterResource;
3939
import org.junit.ClassRule;
4040

4141
public abstract class CatalogITCaseBase extends TableTestBase {

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestUnkeyedOverwrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.amoro.catalog.BasicCatalogTestHelper;
2525
import org.apache.amoro.catalog.CatalogTestHelper;
2626
import org.apache.amoro.flink.FlinkTestBase;
27+
import org.apache.amoro.flink.MiniClusterResource;
2728
import org.apache.amoro.flink.util.DataUtil;
2829
import org.apache.amoro.hive.TestHMS;
2930
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
@@ -32,7 +33,6 @@
3233
import org.apache.flink.table.api.DataTypes;
3334
import org.apache.flink.table.api.Table;
3435
import org.apache.flink.test.util.MiniClusterWithClientResource;
35-
import org.apache.iceberg.flink.MiniClusterResource;
3636
import org.junit.After;
3737
import org.junit.Assert;
3838
import org.junit.ClassRule;

0 commit comments

Comments
 (0)