Skip to content

Commit e8c99a7

Browse files
committed
Upgrade Iceberg to 1.10.1 with local ApplyNameMappingForParquet implementation to avoid parquet 1.16 dependency conflict with Hudi/Paimon.
1 parent 365217d commit e8c99a7

File tree

4 files changed

+317
-11
lines changed

4 files changed

+317
-11
lines changed

pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,6 @@
302302
<artifactId>iceberg-api</artifactId>
303303
<version>${iceberg.version}</version>
304304
</dependency>
305-
<dependency>
306-
<groupId>org.apache.iceberg</groupId>
307-
<artifactId>iceberg-parquet</artifactId>
308-
<version>${iceberg.version}</version>
309-
</dependency>
310305
<dependency>
311306
<groupId>org.apache.iceberg</groupId>
312307
<artifactId>iceberg-spark-runtime-${spark.version.prefix}_${scala.binary.version}</artifactId>

xtable-hudi-support/xtable-hudi-support-extensions/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,6 @@
107107
</exclusion>
108108
</exclusions>
109109
</dependency>
110-
<dependency>
111-
<groupId>org.apache.iceberg</groupId>
112-
<artifactId>iceberg-parquet</artifactId>
113-
</dependency>
114110

115111
<!-- hudi -->
116112
<dependency>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.hudi.extensions;
20+
21+
import java.util.ArrayDeque;
22+
import java.util.ArrayList;
23+
import java.util.Deque;
24+
import java.util.List;
25+
import java.util.Objects;
26+
import java.util.stream.Collectors;
27+
28+
import org.apache.parquet.schema.GroupType;
29+
import org.apache.parquet.schema.LogicalTypeAnnotation;
30+
import org.apache.parquet.schema.MessageType;
31+
import org.apache.parquet.schema.PrimitiveType;
32+
import org.apache.parquet.schema.Type;
33+
import org.apache.parquet.schema.Types;
34+
35+
import org.apache.iceberg.mapping.MappedField;
36+
import org.apache.iceberg.mapping.NameMapping;
37+
38+
/**
39+
* A Parquet schema visitor that applies a NameMapping to add field IDs to a Parquet schema.
40+
*
41+
* <h2>Why this class exists</h2>
42+
*
43+
* <p>This is a local implementation adapted from Iceberg's {@code
44+
* org.apache.iceberg.parquet.ApplyNameMapping} to work around a dependency conflict introduced in
45+
* Iceberg 1.10+.
46+
*
47+
* <p>The issue: Iceberg 1.10 added Parquet variant type support, which requires parquet 1.16.0's
48+
* {@code LogicalTypeAnnotation.variantType()} method. However, other dependencies in the XTable
49+
* ecosystem (Hudi 0.14.0, Paimon 1.3.1) are not yet compatible with parquet 1.16:
50+
*
51+
* <ul>
52+
* <li>Hudi 0.14.0 - compiled against parquet 1.10.1, fails with {@code
53+
* GeospatialStatistics.writePage()} errors on parquet 1.16
54+
* <li>Paimon 1.3.1 - bundles shaded parquet with older fastutil, causing {@code LongList.of()}
55+
* conflicts
56+
* </ul>
57+
*
58+
* <p>The original {@code iceberg-parquet} module's {@code ParquetSchemaUtil.applyNameMapping()}
59+
* internally uses {@code ParquetTypeVisitor.visit()}, which unconditionally checks for variant
60+
* types even when not processing variants - triggering the parquet 1.16 requirement.
61+
*
62+
* <p>This implementation avoids extending Iceberg's {@code ParquetTypeVisitor} and instead
63+
* implements the traversal logic directly, skipping variant type handling entirely since it's not
64+
* needed for the field ID mapping use case.
65+
*
66+
* <h2>Deprecation Plan</h2>
67+
*
68+
* <p>This class should be removed in favor of using {@code
69+
* org.apache.iceberg.parquet.ParquetSchemaUtil.applyNameMapping()} directly once the following
70+
* conditions are met:
71+
*
72+
* <ol>
73+
* <li>Hudi upgrades to a version compatible with parquet 1.16+ (likely Hudi 0.16+)
74+
* <li>Paimon upgrades to a version compatible with parquet 1.16+
75+
* <li>XTable can safely set {@code parquet.version} to 1.16.0 or later globally
76+
* </ol>
77+
*
78+
* <p>To migrate back to iceberg-parquet:
79+
*
80+
* <ol>
81+
* <li>Add {@code iceberg-parquet} dependency to xtable-hudi-support-extensions/pom.xml
82+
* <li>Replace usage of {@code ApplyNameMappingForParquet.applyNameMapping()} with {@code
83+
* ParquetSchemaUtil.applyNameMapping()}
84+
* <li>Delete this class
85+
* <li>Update parquet.version to 1.16.0+ in the root pom.xml
86+
* </ol>
87+
*
88+
* @see <a href="https://github.com/apache/iceberg/pull/14588">Iceberg PR #14588 - Add variant type
89+
* support to ParquetTypeVisitor</a>
90+
*/
91+
class ApplyNameMappingForParquet {
92+
93+
// Path segment pushed for a list's element in place of the element's own Parquet name.
private static final String LIST_ELEMENT_NAME = "element";
94+
// Path segment pushed for a map's key in place of the key's own Parquet name.
private static final String MAP_KEY_NAME = "key";
95+
// Path segment pushed for a map's value in place of the value's own Parquet name.
private static final String MAP_VALUE_NAME = "value";
96+
97+
// Static-only utility: the private constructor prevents instantiation.
private ApplyNameMappingForParquet() {}
98+
99+
/**
100+
* Applies a NameMapping to a Parquet MessageType, adding field IDs based on the mapping.
101+
*
102+
* @param fileSchema the Parquet schema to apply the mapping to
103+
* @param nameMapping the NameMapping containing field ID assignments
104+
* @return a new MessageType with field IDs applied
105+
*/
106+
public static MessageType applyNameMapping(MessageType fileSchema, NameMapping nameMapping) {
107+
Visitor visitor = new Visitor(nameMapping);
108+
return (MessageType) visit(fileSchema, visitor);
109+
}
110+
111+
private static Type visit(Type type, Visitor visitor) {
112+
if (type instanceof MessageType) {
113+
MessageType message = (MessageType) type;
114+
List<Type> fields = new ArrayList<>();
115+
for (Type field : message.getFields()) {
116+
visitor.beforeField(field);
117+
try {
118+
fields.add(visit(field, visitor));
119+
} finally {
120+
visitor.afterField(field);
121+
}
122+
}
123+
return visitor.message(message, fields);
124+
} else if (type.isPrimitive()) {
125+
return visitor.primitive(type.asPrimitiveType());
126+
} else {
127+
GroupType group = type.asGroupType();
128+
LogicalTypeAnnotation annotation = group.getLogicalTypeAnnotation();
129+
130+
if (LogicalTypeAnnotation.listType().equals(annotation)) {
131+
return visitList(group, visitor);
132+
} else if (LogicalTypeAnnotation.mapType().equals(annotation)) {
133+
return visitMap(group, visitor);
134+
}
135+
136+
// Regular struct
137+
List<Type> fields = new ArrayList<>();
138+
for (Type field : group.getFields()) {
139+
visitor.beforeField(field);
140+
try {
141+
fields.add(visit(field, visitor));
142+
} finally {
143+
visitor.afterField(field);
144+
}
145+
}
146+
return visitor.struct(group, fields);
147+
}
148+
}
149+
150+
private static Type visitList(GroupType list, Visitor visitor) {
151+
if (list.getFieldCount() != 1) {
152+
throw new IllegalArgumentException("Invalid list: " + list);
153+
}
154+
155+
Type repeatedElement = list.getType(0);
156+
Type elementResult;
157+
158+
if (isElementType(list, repeatedElement)) {
159+
visitor.beforeElementField(repeatedElement);
160+
try {
161+
elementResult = visit(repeatedElement, visitor);
162+
} finally {
163+
visitor.afterField(repeatedElement);
164+
}
165+
} else {
166+
GroupType repeated = repeatedElement.asGroupType();
167+
Type element = repeated.getType(0);
168+
visitor.beforeElementField(element);
169+
try {
170+
elementResult = visit(element, visitor);
171+
} finally {
172+
visitor.afterField(element);
173+
}
174+
}
175+
176+
return visitor.list(list, elementResult);
177+
}
178+
179+
private static boolean isElementType(GroupType list, Type repeatedElement) {
180+
// Check for 2-level list encoding
181+
return repeatedElement.isPrimitive()
182+
|| repeatedElement.asGroupType().getFieldCount() > 1
183+
|| repeatedElement.getName().equals("array")
184+
|| repeatedElement.getName().equals(list.getName() + "_tuple");
185+
}
186+
187+
private static Type visitMap(GroupType map, Visitor visitor) {
188+
if (map.getFieldCount() != 1) {
189+
throw new IllegalArgumentException("Invalid map: " + map);
190+
}
191+
192+
GroupType keyValue = map.getType(0).asGroupType();
193+
if (keyValue.getFieldCount() != 2) {
194+
throw new IllegalArgumentException("Invalid map key-value: " + keyValue);
195+
}
196+
197+
Type key = keyValue.getType(0);
198+
Type value = keyValue.getType(1);
199+
200+
visitor.beforeKeyField(key);
201+
Type keyResult;
202+
try {
203+
keyResult = visit(key, visitor);
204+
} finally {
205+
visitor.afterField(key);
206+
}
207+
208+
visitor.beforeValueField(value);
209+
Type valueResult;
210+
try {
211+
valueResult = visit(value, visitor);
212+
} finally {
213+
visitor.afterField(value);
214+
}
215+
216+
return visitor.map(map, keyResult, valueResult);
217+
}
218+
219+
private static class Visitor {
220+
private final NameMapping nameMapping;
221+
private final Deque<String> fieldNames = new ArrayDeque<>();
222+
223+
Visitor(NameMapping nameMapping) {
224+
this.nameMapping = nameMapping;
225+
}
226+
227+
Type message(MessageType message, List<Type> fields) {
228+
Types.MessageTypeBuilder builder = Types.buildMessage();
229+
fields.stream().filter(Objects::nonNull).forEach(builder::addField);
230+
return builder.named(message.getName());
231+
}
232+
233+
Type struct(GroupType struct, List<Type> types) {
234+
MappedField field = nameMapping.find(currentPath());
235+
List<Type> actualTypes = types.stream().filter(Objects::nonNull).collect(Collectors.toList());
236+
Type structType = struct.withNewFields(actualTypes);
237+
return field == null ? structType : structType.withId(field.id());
238+
}
239+
240+
Type list(GroupType list, Type elementType) {
241+
if (elementType == null) {
242+
throw new IllegalArgumentException("List type must have element field");
243+
}
244+
245+
Type listElement = determineListElementType(list);
246+
MappedField field = nameMapping.find(currentPath());
247+
248+
Types.GroupBuilder<GroupType> listBuilder =
249+
Types.buildGroup(list.getRepetition()).as(LogicalTypeAnnotation.listType());
250+
if (listElement.isRepetition(Type.Repetition.REPEATED)) {
251+
listBuilder.addFields(elementType);
252+
} else {
253+
listBuilder.repeatedGroup().addFields(elementType).named(list.getFieldName(0));
254+
}
255+
Type listType = listBuilder.named(list.getName());
256+
257+
return field == null ? listType : listType.withId(field.id());
258+
}
259+
260+
Type map(GroupType map, Type keyType, Type valueType) {
261+
if (keyType == null || valueType == null) {
262+
throw new IllegalArgumentException("Map type must have both key field and value field");
263+
}
264+
265+
MappedField field = nameMapping.find(currentPath());
266+
Type mapType =
267+
Types.buildGroup(map.getRepetition())
268+
.as(LogicalTypeAnnotation.mapType())
269+
.repeatedGroup()
270+
.addFields(keyType, valueType)
271+
.named(map.getFieldName(0))
272+
.named(map.getName());
273+
274+
return field == null ? mapType : mapType.withId(field.id());
275+
}
276+
277+
Type primitive(PrimitiveType primitive) {
278+
MappedField field = nameMapping.find(currentPath());
279+
return field == null ? primitive : primitive.withId(field.id());
280+
}
281+
282+
void beforeField(Type type) {
283+
fieldNames.push(type.getName());
284+
}
285+
286+
void afterField(Type type) {
287+
fieldNames.pop();
288+
}
289+
290+
void beforeElementField(Type element) {
291+
fieldNames.push(LIST_ELEMENT_NAME);
292+
}
293+
294+
void beforeKeyField(Type key) {
295+
fieldNames.push(MAP_KEY_NAME);
296+
}
297+
298+
void beforeValueField(Type value) {
299+
fieldNames.push(MAP_VALUE_NAME);
300+
}
301+
302+
private String[] currentPath() {
303+
List<String> path = new ArrayList<>(fieldNames);
304+
java.util.Collections.reverse(path);
305+
return path.toArray(new String[0]);
306+
}
307+
308+
private static Type determineListElementType(GroupType list) {
309+
Type repeated = list.getType(0);
310+
if (isElementType(list, repeated)) {
311+
return repeated;
312+
}
313+
return repeated.asGroupType().getType(0);
314+
}
315+
}
316+
}

xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/extensions/HoodieAvroWriteSupportWithFieldIds.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.iceberg.mapping.MappedField;
3434
import org.apache.iceberg.mapping.MappedFields;
3535
import org.apache.iceberg.mapping.NameMapping;
36-
import org.apache.iceberg.parquet.ParquetSchemaUtil;
3736

3837
import org.apache.xtable.hudi.idtracking.IdTracker;
3938
import org.apache.xtable.hudi.idtracking.models.IdMapping;
@@ -88,7 +87,7 @@ private static MessageType addFieldIdsToParquetSchema(
8887
idMappings.stream()
8988
.map(HoodieAvroWriteSupportWithFieldIds::toMappedField)
9089
.collect(Collectors.toList()));
91-
return ParquetSchemaUtil.applyNameMapping(messageType, nameMapping);
90+
return ApplyNameMappingForParquet.applyNameMapping(messageType, nameMapping);
9291
})
9392
.orElse(messageType);
9493
}

0 commit comments

Comments
 (0)