Skip to content

Commit 313c2d5

Browse files
committed
Flink implementation
1 parent ce9b82a commit 313c2d5

7 files changed

Lines changed: 336 additions & 262 deletions

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java

Lines changed: 145 additions & 127 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.iceberg.flink.sink;
2020

21+
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
22+
2123
import java.io.IOException;
2224
import java.io.Serializable;
2325
import java.io.UncheckedIOException;
@@ -34,6 +36,7 @@
3436
import org.apache.iceberg.avro.Avro;
3537
import org.apache.iceberg.deletes.EqualityDeleteWriter;
3638
import org.apache.iceberg.deletes.PositionDeleteWriter;
39+
import org.apache.iceberg.encryption.EncryptedFiles;
3740
import org.apache.iceberg.encryption.EncryptedOutputFile;
3841
import org.apache.iceberg.flink.FlinkSchemaUtil;
3942
import org.apache.iceberg.flink.data.FlinkAvroWriter;
@@ -44,6 +47,11 @@
4447
import org.apache.iceberg.io.FileAppender;
4548
import org.apache.iceberg.io.FileAppenderFactory;
4649
import org.apache.iceberg.io.OutputFile;
50+
import org.apache.iceberg.io.datafile.AppenderBuilder;
51+
import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
52+
import org.apache.iceberg.io.datafile.DataWriterBuilder;
53+
import org.apache.iceberg.io.datafile.EqualityDeleteWriterBuilder;
54+
import org.apache.iceberg.io.datafile.PositionDeleteWriterBuilder;
4755
import org.apache.iceberg.orc.ORC;
4856
import org.apache.iceberg.parquet.Parquet;
4957
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -91,8 +99,8 @@ private RowType lazyEqDeleteFlinkSchema() {
9199

92100
private RowType lazyPosDeleteFlinkSchema() {
93101
if (posDeleteFlinkSchema == null) {
94-
Preconditions.checkNotNull(posDeleteRowSchema, "Pos-delete row schema shouldn't be null");
95-
this.posDeleteFlinkSchema = FlinkSchemaUtil.convert(posDeleteRowSchema);
102+
this.posDeleteFlinkSchema =
103+
FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
96104
}
97105
return this.posDeleteFlinkSchema;
98106
}
@@ -101,38 +109,16 @@ private RowType lazyPosDeleteFlinkSchema() {
101109
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
102110
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
103111
try {
104-
switch (format) {
105-
case AVRO:
106-
return Avro.write(outputFile)
107-
.createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema))
108-
.setAll(props)
109-
.schema(schema)
110-
.metricsConfig(metricsConfig)
111-
.overwrite()
112-
.build();
113-
114-
case ORC:
115-
return ORC.write(outputFile)
116-
.createWriterFunc(
117-
(iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
118-
.setAll(props)
119-
.metricsConfig(metricsConfig)
120-
.schema(schema)
121-
.overwrite()
122-
.build();
123-
124-
case PARQUET:
125-
return Parquet.write(outputFile)
126-
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
127-
.setAll(props)
128-
.metricsConfig(metricsConfig)
129-
.schema(schema)
130-
.overwrite()
131-
.build();
132-
133-
default:
134-
throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
135-
}
112+
return DataFileServiceRegistry.appenderBuilder(
113+
format,
114+
RowData.class.getName(),
115+
EncryptedFiles.plainAsEncryptedOutput(outputFile),
116+
flinkSchema)
117+
.setAll(props)
118+
.schema(schema)
119+
.metricsConfig(metricsConfig)
120+
.overwrite()
121+
.build();
136122
} catch (IOException e) {
137123
throw new UncheckedIOException(e);
138124
}
@@ -162,52 +148,17 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
162148

163149
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
164150
try {
165-
switch (format) {
166-
case AVRO:
167-
return Avro.writeDeletes(outputFile.encryptingOutputFile())
168-
.createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema()))
169-
.withPartition(partition)
170-
.overwrite()
171-
.setAll(props)
172-
.metricsConfig(metricsConfig)
173-
.rowSchema(eqDeleteRowSchema)
174-
.withSpec(spec)
175-
.withKeyMetadata(outputFile.keyMetadata())
176-
.equalityFieldIds(equalityFieldIds)
177-
.buildEqualityWriter();
178-
179-
case ORC:
180-
return ORC.writeDeletes(outputFile.encryptingOutputFile())
181-
.createWriterFunc(
182-
(iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
183-
.withPartition(partition)
184-
.overwrite()
185-
.setAll(props)
186-
.metricsConfig(metricsConfig)
187-
.rowSchema(eqDeleteRowSchema)
188-
.withSpec(spec)
189-
.withKeyMetadata(outputFile.keyMetadata())
190-
.equalityFieldIds(equalityFieldIds)
191-
.buildEqualityWriter();
192-
193-
case PARQUET:
194-
return Parquet.writeDeletes(outputFile.encryptingOutputFile())
195-
.createWriterFunc(
196-
msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
197-
.withPartition(partition)
198-
.overwrite()
199-
.setAll(props)
200-
.metricsConfig(metricsConfig)
201-
.rowSchema(eqDeleteRowSchema)
202-
.withSpec(spec)
203-
.withKeyMetadata(outputFile.keyMetadata())
204-
.equalityFieldIds(equalityFieldIds)
205-
.buildEqualityWriter();
206-
207-
default:
208-
throw new UnsupportedOperationException(
209-
"Cannot write equality-deletes for unsupported file format: " + format);
210-
}
151+
return DataFileServiceRegistry.equalityDeleteWriterBuilder(
152+
format, RowData.class.getName(), outputFile, lazyEqDeleteFlinkSchema())
153+
.withPartition(partition)
154+
.overwrite()
155+
.setAll(props)
156+
.metricsConfig(metricsConfig)
157+
.rowSchema(eqDeleteRowSchema)
158+
.withSpec(spec)
159+
.withKeyMetadata(outputFile.keyMetadata())
160+
.equalityFieldIds(equalityFieldIds)
161+
.buildEqualityWriter();
211162
} catch (IOException e) {
212163
throw new UncheckedIOException(e);
213164
}
@@ -218,57 +169,124 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
218169
EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
219170
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
220171
try {
221-
switch (format) {
222-
case AVRO:
223-
return Avro.writeDeletes(outputFile.encryptingOutputFile())
224-
.createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema()))
225-
.withPartition(partition)
226-
.overwrite()
227-
.setAll(props)
228-
.metricsConfig(metricsConfig)
229-
.rowSchema(posDeleteRowSchema)
230-
.withSpec(spec)
231-
.withKeyMetadata(outputFile.keyMetadata())
232-
.buildPositionWriter();
172+
return DataFileServiceRegistry.positionDeleteWriterBuilder(
173+
format, RowData.class.getName(), outputFile, lazyPosDeleteFlinkSchema())
174+
.withPartition(partition)
175+
.overwrite()
176+
.setAll(props)
177+
.metricsConfig(metricsConfig)
178+
.rowSchema(posDeleteRowSchema)
179+
.withSpec(spec)
180+
.withKeyMetadata(outputFile.keyMetadata())
181+
.buildPositionWriter();
182+
} catch (IOException e) {
183+
throw new UncheckedIOException(e);
184+
}
185+
}
186+
187+
public static class AvroWriterService implements DataFileServiceRegistry.WriterService<RowType> {
188+
@Override
189+
public DataFileServiceRegistry.Key key() {
190+
return new DataFileServiceRegistry.Key(FileFormat.AVRO, RowData.class.getName());
191+
}
233192

234-
case ORC:
235-
RowType orcPosDeleteSchema =
236-
FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
237-
return ORC.writeDeletes(outputFile.encryptingOutputFile())
238-
.createWriterFunc(
239-
(iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema))
240-
.withPartition(partition)
241-
.overwrite()
242-
.setAll(props)
243-
.metricsConfig(metricsConfig)
244-
.rowSchema(posDeleteRowSchema)
245-
.withSpec(spec)
246-
.withKeyMetadata(outputFile.keyMetadata())
247-
.transformPaths(path -> StringData.fromString(path.toString()))
248-
.buildPositionWriter();
193+
@Override
194+
public AppenderBuilder appenderBuilder(EncryptedOutputFile outputFile, RowType rowType) {
195+
return Avro.write(outputFile).createWriterFunc(ignore -> new FlinkAvroWriter(rowType));
196+
}
249197

250-
case PARQUET:
251-
RowType flinkPosDeleteSchema =
252-
FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
253-
return Parquet.writeDeletes(outputFile.encryptingOutputFile())
254-
.createWriterFunc(
255-
msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType))
256-
.withPartition(partition)
257-
.overwrite()
258-
.setAll(props)
259-
.metricsConfig(metricsConfig)
260-
.rowSchema(posDeleteRowSchema)
261-
.withSpec(spec)
262-
.withKeyMetadata(outputFile.keyMetadata())
263-
.transformPaths(path -> StringData.fromString(path.toString()))
264-
.buildPositionWriter();
198+
@Override
199+
public DataWriterBuilder dataWriterBuilder(EncryptedOutputFile outputFile, RowType rowType) {
200+
return Avro.writeData(outputFile).createWriterFunc(ignore -> new FlinkAvroWriter(rowType));
201+
}
265202

266-
default:
267-
throw new UnsupportedOperationException(
268-
"Cannot write pos-deletes for unsupported file format: " + format);
203+
@Override
204+
public EqualityDeleteWriterBuilder<?> equalityDeleteWriterBuilder(
205+
EncryptedOutputFile outputFile, RowType rowType) {
206+
return Avro.writeDeletes(outputFile).createWriterFunc(ignore -> new FlinkAvroWriter(rowType));
207+
}
208+
209+
@Override
210+
public PositionDeleteWriterBuilder<?> positionDeleteWriterBuilder(
211+
EncryptedOutputFile outputFile, RowType rowType) {
212+
Avro.DeleteWriteBuilder builder = Avro.writeDeletes(outputFile);
213+
int rowFieldIndex = rowType != null ? rowType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME) : -1;
214+
if (rowFieldIndex >= 0) {
215+
// FlinkAvroWriter accepts just the Flink type of the row ignoring the path and pos
216+
return builder.createWriterFunc(
217+
ignored -> new FlinkAvroWriter((RowType) rowType.getTypeAt(rowFieldIndex)));
218+
} else {
219+
return builder;
269220
}
270-
} catch (IOException e) {
271-
throw new UncheckedIOException(e);
221+
}
222+
}
223+
224+
/**
 * {@link DataFileServiceRegistry.WriterService} implementation that supplies Parquet writer
 * builders for Flink {@code RowData} records, configured from a Flink {@code RowType}.
 *
 * <p>Every builder returned here only has its writer function (and, for position deletes, the
 * path transformation) set; all other builder settings are left to the caller.
 *
 * <p>NOTE(review): presumably looked up through {@code DataFileServiceRegistry} by the key
 * returned from {@link #key()}; the registration mechanism is not visible here, confirm.
 */
public static class ParquetWriterService
225+
implements DataFileServiceRegistry.WriterService<RowType> {
226+
// Identifies this service by the (PARQUET file format, RowData class name) pair.
@Override
227+
public DataFileServiceRegistry.Key key() {
228+
return new DataFileServiceRegistry.Key(FileFormat.PARQUET, RowData.class.getName());
229+
}
230+
231+
// Starts a Parquet.write builder whose writer function is built by FlinkParquetWriters
// from the given Flink row type and the Parquet message type handed to the function.
@Override
232+
public AppenderBuilder appenderBuilder(EncryptedOutputFile outputFile, RowType rowType) {
233+
return Parquet.write(outputFile)
234+
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(rowType, msgType));
235+
}
236+
237+
// Same writer function as appenderBuilder, but on top of Parquet.writeData.
@Override
238+
public DataWriterBuilder dataWriterBuilder(EncryptedOutputFile outputFile, RowType rowType) {
239+
return Parquet.writeData(outputFile)
240+
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(rowType, msgType));
241+
}
242+
243+
// Same writer function, on top of Parquet.writeDeletes, returned as an equality-delete builder.
@Override
244+
public EqualityDeleteWriterBuilder<?> equalityDeleteWriterBuilder(
245+
EncryptedOutputFile outputFile, RowType rowType) {
246+
return Parquet.writeDeletes(outputFile)
247+
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(rowType, msgType));
248+
}
249+
250+
// Position-delete variant: in addition to the writer function, each path is converted with
// StringData.fromString(path.toString()) before it is written.
@Override
251+
public PositionDeleteWriterBuilder<?> positionDeleteWriterBuilder(
252+
EncryptedOutputFile outputFile, RowType rowType) {
253+
return Parquet.writeDeletes(outputFile)
254+
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(rowType, msgType))
255+
.transformPaths(path -> StringData.fromString(path.toString()));
256+
}
257+
}
258+
259+
/**
 * {@link DataFileServiceRegistry.WriterService} implementation that supplies ORC writer builders
 * for Flink {@code RowData} records, configured from a Flink {@code RowType}.
 *
 * <p>Every builder returned here only has its writer function (and, for position deletes, the
 * path transformation) set; all other builder settings are left to the caller.
 *
 * <p>NOTE(review): presumably looked up through {@code DataFileServiceRegistry} by the key
 * returned from {@link #key()}; the registration mechanism is not visible here, confirm.
 */
public static class ORCWriterService implements DataFileServiceRegistry.WriterService<RowType> {
260+
// Identifies this service by the (ORC file format, RowData class name) pair.
@Override
261+
public DataFileServiceRegistry.Key key() {
262+
return new DataFileServiceRegistry.Key(FileFormat.ORC, RowData.class.getName());
263+
}
264+
265+
// Starts an ORC.write builder whose writer function is built by FlinkOrcWriter from the given
// Flink row type and the first lambda argument (iSchema); the second argument is unused.
@Override
266+
public AppenderBuilder appenderBuilder(EncryptedOutputFile outputFile, RowType rowType) {
267+
return ORC.write(outputFile)
268+
.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema));
269+
}
270+
271+
// Same writer function as appenderBuilder, but on top of ORC.writeData.
@Override
272+
public DataWriterBuilder dataWriterBuilder(EncryptedOutputFile outputFile, RowType rowType) {
273+
return ORC.writeData(outputFile)
274+
.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema));
275+
}
276+
277+
// Same writer function, on top of ORC.writeDeletes, returned as an equality-delete builder.
@Override
278+
public EqualityDeleteWriterBuilder<?> equalityDeleteWriterBuilder(
279+
EncryptedOutputFile outputFile, RowType rowType) {
280+
return ORC.writeDeletes(outputFile)
281+
.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema));
282+
}
283+
284+
// Position-delete variant: in addition to the writer function, each path is converted with
// StringData.fromString(path.toString()) before it is written.
@Override
285+
public PositionDeleteWriterBuilder<?> positionDeleteWriterBuilder(
286+
EncryptedOutputFile outputFile, RowType rowType) {
287+
return ORC.writeDeletes(outputFile)
288+
.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(rowType, iSchema))
289+
.transformPaths(path -> StringData.fromString(path.toString()));
272290
}
273291
}
274292
}

0 commit comments

Comments
 (0)