
Commit 1919ae6

Flink: Support UUID type in Avro and Parquet readers and writers (#16097)
1 parent bc8f264 commit 1919ae6

6 files changed

Lines changed: 222 additions & 7 deletions


flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java

Lines changed: 1 addition & 1 deletion
@@ -132,7 +132,7 @@ public ValueWriter<?> primitive(LogicalType type, Schema primitive) {
         return FlinkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());

       case "uuid":
-        return ValueWriters.uuids();
+        return FlinkValueWriters.uuids();

       default:
         throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
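Worth noting why the one-liner matters: Flink carries Iceberg UUID values through RowData as 16-byte arrays, while the generic ValueWriters.uuids() used before operates on java.util.UUID objects. Below is a hedged sketch, not part of the commit, of what the new writer does on the wire; it has to sit in org.apache.iceberg.flink.data because uuids() is package-private, and the sample value is borrowed from the new test at the bottom of this commit.

package org.apache.iceberg.flink.data;

import java.io.ByteArrayOutputStream;
import java.util.UUID;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.iceberg.util.UUIDUtil;

public class UuidAvroWriteSketch {
  public static void main(String[] args) throws Exception {
    byte[] uuidBytes =
        UUIDUtil.convert(UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e"));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    FlinkValueWriters.uuids().write(uuidBytes, encoder); // emits the bytes as Avro fixed
    encoder.flush();
    assert out.toByteArray().length == 16; // fixed(16): raw bytes, no length prefix
  }
}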

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java

Lines changed: 6 additions & 0 deletions
@@ -311,6 +311,12 @@ public Optional<ParquetValueReader<?>> visit(
         LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
       return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
     }
+
+    @Override
+    public Optional<ParquetValueReader<?>> visit(
+        LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+      return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+    }
   }

   @Override
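On the read side no new reader class is needed: Iceberg stores UUIDs in Parquet as a 16-byte fixed field annotated with the UUID logical type, so the existing ByteArrayReader already yields the raw bytes Flink expects. A small illustrative sketch, not from the commit, of the column shape this new visit() overload matches:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class UuidParquetTypeSketch {
  public static void main(String[] args) {
    // Iceberg writes UUIDs to Parquet as fixed_len_byte_array(16) with the
    // UUID logical type annotation; that annotation drives the visitor dispatch.
    PrimitiveType uuidColumn =
        Types.required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
            .length(16)
            .as(LogicalTypeAnnotation.uuidType())
            .named("uuid");
    System.out.println(uuidColumn); // e.g. "required fixed_len_byte_array(16) uuid (UUID)"
  }
}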

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java

Lines changed: 6 additions & 0 deletions
@@ -75,6 +75,7 @@
 import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -309,6 +310,11 @@ public Optional<ParquetValueWriter<?>> visit(JsonLogicalTypeAnnotation ignored)
     public Optional<ParquetValueWriter<?>> visit(BsonLogicalTypeAnnotation ignored) {
       return Optional.of(byteArrays(desc));
     }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(UUIDLogicalTypeAnnotation uuid) {
+      return Optional.of(byteArrays(desc));
+    }
   }

   private static ParquetValueWriter<?> ints(LogicalType type, ColumnDescriptor desc) {
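The writer mirrors the reader, reusing byteArrays(desc). For context, a hedged sketch of the visitor dispatch these overloads plug into: all other visit() methods on Parquet's LogicalTypeAnnotationVisitor are defaults returning Optional.empty(), which is why UUID columns did not pick up the byte-array writer here before this override existed.

import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;

public class UuidVisitorSketch {
  public static void main(String[] args) {
    // The UUID annotation routes itself to the matching visit() overload;
    // without it, accept() yields Optional.empty() and no writer is chosen.
    Optional<String> picked =
        LogicalTypeAnnotation.uuidType()
            .accept(
                new LogicalTypeAnnotationVisitor<String>() {
                  @Override
                  public Optional<String> visit(UUIDLogicalTypeAnnotation uuid) {
                    return Optional.of("use byteArrays(desc)");
                  }
                });
    assert picked.isPresent();
  }
}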

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java

Lines changed: 13 additions & 0 deletions
@@ -55,6 +55,10 @@ static ValueWriter<TimestampData> timestampNanos() {
     return TimestampNanosWriter.INSTANCE;
   }

+  static ValueWriter<byte[]> uuids() {
+    return UUIDWriter.INSTANCE;
+  }
+
   static ValueWriter<DecimalData> decimal(int precision, int scale) {
     return new DecimalWriter(precision, scale);
   }
@@ -145,6 +149,15 @@ public void write(TimestampData timestampData, Encoder encoder) throws IOException {
     }
   }

+  private static class UUIDWriter implements ValueWriter<byte[]> {
+    private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+    @Override
+    public void write(byte[] bytes, Encoder encoder) throws IOException {
+      encoder.writeFixed(bytes);
+    }
+  }
+
   private static class ArrayWriter<T> implements ValueWriter<ArrayData> {
     private final ValueWriter<T> elementWriter;
     private final ArrayData.ElementGetter elementGetter;
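The new UUIDWriter simply hands the bytes to Encoder.writeFixed(), which emits them verbatim for an Avro fixed(16) field; it relies on callers supplying exactly 16 big-endian bytes, most-significant long first. A minimal round-trip sketch of that byte-order contract, matching UUIDUtil.convert() on the write side and the ByteBuffer read path used by the new test:

import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidByteOrderSketch {
  public static void main(String[] args) {
    UUID expected = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");

    ByteBuffer out = ByteBuffer.allocate(16); // ByteBuffer is big-endian by default
    out.putLong(expected.getMostSignificantBits());
    out.putLong(expected.getLeastSignificantBits());
    byte[] bytes = out.array(); // the layout UUIDUtil.convert(expected) produces

    ByteBuffer in = ByteBuffer.wrap(bytes);
    UUID actual = new UUID(in.getLong(), in.getLong()); // the new test's read path
    assert actual.equals(expected);
  }
}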

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java

Lines changed: 4 additions & 6 deletions
@@ -732,9 +732,8 @@ private DataStream<RowData> distributeDataStream(
   @Deprecated
   static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
     if (requestedSchema != null) {
-      // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing
-      // iceberg schema.
-      Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+      // Convert the flink schema to iceberg schema using the table schema as the reference.
+      Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
       TypeUtil.validateWriteSchema(schema, writeSchema, true, true);

       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
@@ -749,9 +748,8 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {

   static RowType toFlinkRowType(Schema schema, ResolvedSchema requestedSchema) {
     if (requestedSchema != null) {
-      // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing
-      // iceberg schema.
-      Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+      // Convert the flink schema to iceberg schema using the table schema as the reference.
+      Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
       TypeUtil.validateWriteSchema(schema, writeSchema, true, true);

       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
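This sink change is what lets UUID columns survive schema conversion: Flink's type system has no UUID type, so converting the requested Flink schema to Iceberg in isolation and then reassigning IDs presumably maps a BINARY(16) column back to a fixed type rather than UUID. The two-argument FlinkSchemaUtil.convert() consults the table schema instead, as the updated comment says. A condensed sketch of the resulting flow (parameter names are illustrative; the calls themselves are taken from the diff):

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.TypeUtil;

class WriteSchemaSketch {
  // Derive the write schema from the table schema plus the requested Flink
  // schema, validate compatibility, then convert back to a Flink RowType.
  static RowType toFlinkRowType(Schema tableSchema, ResolvedSchema requestedSchema) {
    Schema writeSchema = FlinkSchemaUtil.convert(tableSchema, requestedSchema);
    TypeUtil.validateWriteSchema(tableSchema, writeSchema, true, true);
    return FlinkSchemaUtil.convert(writeSchema);
  }
}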
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java

Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.assertj.core.api.Assertions.assertThat;

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

class TestFlinkUuidType extends CatalogTestBase {
  private static final String TABLE_NAME = "test_table";
  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
  private static final UUID EXPECTED_UUID = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
  private static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.required(2, "uuid", Types.UUIDType.get()));

  private Table icebergTable;
  @TempDir private Path warehouseDir;

  @Parameter(index = 2)
  private FileFormat fileFormat;

  @Parameters(name = "catalogName={0}, baseNamespace={1}, fileFormat={2}")
  protected static List<Object[]> parameters() {
    return Arrays.asList(
        new Object[] {"testhadoop", Namespace.empty(), FileFormat.PARQUET},
        new Object[] {"testhadoop", Namespace.empty(), FileFormat.AVRO},
        new Object[] {"testhadoop", Namespace.empty(), FileFormat.ORC});
  }

  @Override
  @BeforeEach
  public void before() {
    super.before();
    sql("CREATE DATABASE %s", flinkDatabase);
    sql("USE CATALOG %s", catalogName);
    sql("USE %s", DATABASE);
  }

  /** Writes UUID via Generic writer, reads via Flink. */
  @TestTemplate
  void uuidWrittenByGenericWriter() throws Exception {
    icebergTable =
        validationCatalog.createTable(
            TableIdentifier.of(icebergNamespace, TABLE_NAME),
            SCHEMA,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));

    Record record =
        GenericRecord.create(icebergTable.schema()).copy("id", 1, "uuid", EXPECTED_UUID);
    new GenericAppenderHelper(icebergTable, fileFormat, warehouseDir)
        .appendToTable(ImmutableList.of(record));
    icebergTable.refresh();

    List<GenericRowData> genericRowData = Lists.newArrayList();
    try (CloseableIterable<CombinedScanTask> combinedScanTasks =
        icebergTable.newScan().planTasks()) {
      for (CombinedScanTask combinedScanTask : combinedScanTasks) {
        try (DataIterator<RowData> dataIterator =
            ReaderUtil.createDataIterator(
                combinedScanTask, icebergTable.schema(), icebergTable.schema())) {
          while (dataIterator.hasNext()) {
            GenericRowData rowData = (GenericRowData) dataIterator.next();
            genericRowData.add(rowData);
          }
        }
      }
    }

    assertThat(genericRowData).hasSize(1);
    assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
    byte[] bytes = (byte[]) genericRowData.get(0).getField(1);
    assertThat(bytes).hasSize(16);

    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    UUID actualUuid = new UUID(byteBuffer.getLong(), byteBuffer.getLong());
    assertThat(actualUuid).isEqualTo(EXPECTED_UUID);
  }

  /** Writes UUID via Flink TaskWriter, reads via Generic reader. */
  @TestTemplate
  void writeUuidViaFlinkWriter() throws Exception {
    icebergTable =
        validationCatalog.createTable(
            TableIdentifier.of(icebergNamespace, TABLE_NAME),
            SCHEMA,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));

    RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
    RowDataTaskWriterFactory rowDataTaskWriterFactory =
        new RowDataTaskWriterFactory(
            icebergTable,
            rowType,
            TARGET_FILE_SIZE,
            fileFormat,
            icebergTable.properties(),
            null,
            false);
    rowDataTaskWriterFactory.initialize(1, 1);

    byte[] uuidBytes = UUIDUtil.convert(EXPECTED_UUID);
    GenericRowData genericRowData = GenericRowData.of(1, uuidBytes);

    try (TaskWriter<RowData> writer = rowDataTaskWriterFactory.create()) {
      writer.write(genericRowData);
      writer.close();

      AppendFiles append = icebergTable.newAppend();
      for (DataFile dataFile : writer.dataFiles()) {
        append.appendFile(dataFile);
      }

      append.commit();
    }

    List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
    assertThat(records).hasSize(1);
    assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
  }

  /** Writes UUID via SQL INSERT, reads via Generic reader. */
  @TestTemplate
  void sqlInsertUuid() throws Exception {
    icebergTable =
        validationCatalog.createTable(
            TableIdentifier.of(icebergNamespace, TABLE_NAME),
            SCHEMA,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));

    String uuidHex = EXPECTED_UUID.toString().replace("-", "");
    sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex);

    List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
    assertThat(records).hasSize(1);
    assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
  }
}
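The SQL test leans on Flink's hex binary literal: stripping the dashes from the canonical UUID string gives the 32 hex digits for X'…', and CAST(... AS BINARY(16)) yields the same 16 bytes the writers expect. A quick sanity check of that identity, illustrative and not part of the commit:

import java.util.UUID;

public class UuidHexSketch {
  public static void main(String[] args) {
    UUID u = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
    String hex = u.toString().replace("-", ""); // 32 hex chars, one per nibble
    assert hex.length() == 32;
    assert hex.equals("0f8fad5bd9cb469fa16570867728950e");
  }
}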
