
Commit 52f3da8

Add tests for Iceberg 1.7.2 API migration
- TestAdaptHiveParquetSchemaUtil: verify pruneColumns with TypeWithSchemaVisitor correctly prunes columns using expected schema
- TestAdaptHiveParquetUtil: verify INT96 timestamp min/max statistics are correctly re-ordered when byte-wise ordering disagrees with chronological ordering

Signed-off-by: Jiwon Park <jpark92@outlook.kr>
1 parent fdefbf2 commit 52f3da8

2 files changed

Lines changed: 374 additions & 0 deletions

TestAdaptHiveParquetSchemaUtil.java
Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.iceberg.parquet;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link AdaptHiveParquetSchemaUtil}, specifically verifying that {@code pruneColumns}
 * uses {@link TypeWithSchemaVisitor} (Iceberg 1.7+) to correctly prune columns using the expected
 * schema. Prior to this migration the method used {@link ParquetTypeVisitor} which did not pass the
 * expected schema, causing incorrect results for schemas whose column order differed from the file
 * schema.
 */
public class TestAdaptHiveParquetSchemaUtil {
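  // A sketch of the call-site difference being verified, using the Iceberg 1.7+ visitor API
  // (illustrative only; variable names are assumed, not copied from AdaptHiveParquetSchemaUtil):
  //   before: ParquetTypeVisitor.visit(fileSchema, pruneVisitor)
  //   after:  TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, pruneVisitor)
  // The second form also hands the visitor the expected Iceberg schema, so the pruned result no
  // longer depends on the expected columns appearing in the same order as in the file schema.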

  @Test
  public void testPruneColumnsSelectsSubset() {
    // Full file schema with 3 columns
    Schema fullSchema =
        new Schema(
            required(1, "id", Types.IntegerType.get()),
            optional(2, "name", Types.StringType.get()),
            optional(3, "ts", Types.TimestampType.withoutZone()));

    MessageType fileSchema = AdaptHiveParquetSchemaUtil.convert(fullSchema, "test");

    // Expected schema requests only 2 of the 3 columns
    Schema expectedSchema =
        new Schema(
            required(1, "id", Types.IntegerType.get()),
            optional(3, "ts", Types.TimestampType.withoutZone()));

    MessageType pruned = AdaptHiveParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);

    Assert.assertEquals(
        "Pruned schema should contain only the requested columns", 2, pruned.getFieldCount());
    Assert.assertTrue("Pruned schema should contain 'id'", pruned.containsField("id"));
    Assert.assertTrue("Pruned schema should contain 'ts'", pruned.containsField("ts"));
    Assert.assertFalse("Pruned schema should not contain 'name'", pruned.containsField("name"));
  }

  @Test
  public void testPruneColumnsWithNestedStruct() {
    // File schema with a nested struct
    Schema fullSchema =
        new Schema(
            required(1, "id", Types.IntegerType.get()),
            optional(
                2,
                "data",
                Types.StructType.of(
                    required(3, "x", Types.IntegerType.get()),
                    optional(4, "y", Types.StringType.get()))),
            optional(5, "extra", Types.StringType.get()));

    MessageType fileSchema = AdaptHiveParquetSchemaUtil.convert(fullSchema, "test");

    // Request only id and the nested struct — the TypeWithSchemaVisitor approach
    // correctly preserves nested fields using the expectedSchema tree.
    Schema expectedSchema =
        new Schema(
            required(1, "id", Types.IntegerType.get()),
            optional(
                2,
                "data",
                Types.StructType.of(
                    required(3, "x", Types.IntegerType.get()),
                    optional(4, "y", Types.StringType.get()))));

    MessageType pruned = AdaptHiveParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);

    Assert.assertEquals(2, pruned.getFieldCount());
    Assert.assertTrue(pruned.containsField("id"));
    Assert.assertTrue(pruned.containsField("data"));
    Assert.assertFalse(pruned.containsField("extra"));
  }

  @Test
  public void testPruneColumnsSingleColumn() {
    Schema fullSchema =
        new Schema(
            required(1, "id", Types.IntegerType.get()),
            optional(2, "name", Types.StringType.get()),
            optional(3, "value", Types.DoubleType.get()));

    MessageType fileSchema = AdaptHiveParquetSchemaUtil.convert(fullSchema, "test");

    Schema expectedSchema = new Schema(optional(2, "name", Types.StringType.get()));

    MessageType pruned = AdaptHiveParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);

    Assert.assertEquals(1, pruned.getFieldCount());
    Assert.assertTrue(pruned.containsField("name"));
  }
}
TestAdaptHiveParquetUtil.java
Lines changed: 256 additions & 0 deletions
@@ -0,0 +1,256 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.iceberg.parquet;

import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * Tests for {@link AdaptHiveParquetUtil#footerMetrics}, verifying that INT96 timestamp statistics
 * are correctly re-ordered.
 *
 * <p>Parquet stores INT96 timestamps as 12 bytes (8-byte LE nanoseconds-of-day + 4-byte LE Julian
 * day). Byte-wise comparison compares the nanos first, which does not match chronological order
 * when timestamps cross day boundaries. Without the fix, stats.genericGetMin() could be
 * chronologically *later* than stats.genericGetMax(), causing inverted lower/upper bounds in
 * Iceberg metrics.
 */
public class TestAdaptHiveParquetUtil {

  private static final int JULIAN_EPOCH_OFFSET = 2_440_588;

  /**
   * Encodes a unix-epoch-millisecond timestamp into a 12-byte INT96 Parquet binary (LE
   * nanoseconds-of-day + LE Julian day).
   */
  private static byte[] toInt96Bytes(long epochMillis) {
    long epochDay = Math.floorDiv(epochMillis, TimeUnit.DAYS.toMillis(1));
    long milliOfDay = epochMillis - epochDay * TimeUnit.DAYS.toMillis(1);
    long nanoOfDay = milliOfDay * 1_000_000L;
    int julianDay = (int) (epochDay + JULIAN_EPOCH_OFFSET);

    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(nanoOfDay);
    buf.putInt(julianDay);
    return buf.array();
  }
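  // Worked example with the two timestamps used in the test below (values derived from
  // toInt96Bytes above):
  //   2020-01-01T23:59:59.999Z -> nanoOfDay = 86_399_999_000_000, julianDay = 2_458_850
  //   2020-01-02T00:00:00.001Z -> nanoOfDay =          1_000_000, julianDay = 2_458_851
  // The nanos-of-day bytes come first in the little-endian layout, and the very first byte already
  // differs (0xC0 vs 0x40), so an unsigned byte-wise comparison ranks the chronologically earlier
  // value as the larger of the two.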

  /**
   * Verifies that footerMetrics swaps INT96 min/max when byte-wise ordering disagrees with
   * chronological ordering.
   *
   * <p>We construct two timestamps crossing a day boundary:
   *
   * <ul>
   *   <li>earlier: 2020-01-01 23:59:59.999 UTC (large nanos-of-day, end of day)
   *   <li>later: 2020-01-02 00:00:00.001 UTC (small nanos-of-day, start of next day)
   * </ul>
   *
   * In INT96 little-endian format, the nanos-of-day occupy the first 8 bytes. The earlier timestamp
   * has a much larger nanos value (86399999 ms → ~8.6e13 nanos), so byte-wise it appears "greater"
   * than the later timestamp (1 ms → 1e6 nanos). Parquet therefore sets stats.genericGetMin() =
   * later, stats.genericGetMax() = earlier. The fix detects this inversion for INT96 columns and
   * swaps them.
   */
  @Test
  public void testInt96MinMaxSwappedWhenByteOrderReversed() {
    // 2020-01-01 23:59:59.999 UTC — large nanos-of-day (end of day)
    long earlierMillis = 1577923199999L;
    // 2020-01-02 00:00:00.001 UTC — small nanos-of-day (start of next day)
    long laterMillis = 1577923200001L;

    byte[] earlierBytes = toInt96Bytes(earlierMillis);
    byte[] laterBytes = toInt96Bytes(laterMillis);

    // Verify our premise: byte-wise, laterBytes < earlierBytes
    // (nanos-of-day for 00:00:00.001 is far smaller than for 23:59:59.999, and the nanos bytes
    // come first in the little-endian layout)
    Assert.assertTrue(
        "Precondition: byte-wise earlier > later for these INT96 values",
        compareBytewise(earlierBytes, laterBytes) > 0);

    // Build Parquet schema with INT96 column carrying Iceberg field id
    MessageType parquetSchema =
        new MessageType(
            "test",
            Collections.singletonList(
                org.apache.parquet.schema.Types.required(PrimitiveType.PrimitiveTypeName.INT96)
                    .id(1)
                    .named("ts")));

    // Build statistics: Parquet's byte-wise min = laterBytes, max = earlierBytes
    Statistics<?> stats =
        Statistics.getBuilderForReading(parquetSchema.getType("ts").asPrimitiveType())
            .withMin(laterBytes)
            .withMax(earlierBytes)
            .withNumNulls(0)
            .build();

    // Column chunk metadata
    Set<Encoding> encodings = new HashSet<>();
    encodings.add(Encoding.PLAIN);
    ColumnChunkMetaData columnMeta =
        ColumnChunkMetaData.get(
            ColumnPath.get("ts"),
            parquetSchema.getType("ts").asPrimitiveType(),
            org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED,
            null,
            encodings,
            stats,
            0L,
            0L,
            1L,
            100L,
            100L);

    BlockMetaData block = new BlockMetaData();
    block.addColumn(columnMeta);
    block.setRowCount(1);

    Schema icebergSchema = new Schema(required(1, "ts", Types.TimestampType.withoutZone()));
    Map<String, String> keyValueMetadata = new HashMap<>();
    keyValueMetadata.put("iceberg.schema", SchemaParser.toJson(icebergSchema));
    FileMetaData fileMetaData = new FileMetaData(parquetSchema, keyValueMetadata, "test");
    ParquetMetadata metadata = new ParquetMetadata(fileMetaData, Collections.singletonList(block));

    // Call footerMetrics
    Metrics metrics =
        AdaptHiveParquetUtil.footerMetrics(
            metadata, Stream.<FieldMetrics<?>>empty(), MetricsConfig.getDefault(), icebergSchema);

    // Extract lower and upper bounds for field 1 (ts)
    ByteBuffer lowerBuf = metrics.lowerBounds().get(1);
    ByteBuffer upperBuf = metrics.upperBounds().get(1);

    Assert.assertNotNull("Lower bound should exist for ts", lowerBuf);
    Assert.assertNotNull("Upper bound should exist for ts", upperBuf);

    long lowerMicros = Conversions.fromByteBuffer(Types.TimestampType.withoutZone(), lowerBuf);
    long upperMicros = Conversions.fromByteBuffer(Types.TimestampType.withoutZone(), upperBuf);

    Assert.assertTrue(
        "Lower bound (earlier timestamp) must be <= upper bound (later timestamp), "
            + "but got lower="
            + lowerMicros
            + ", upper="
            + upperMicros,
        lowerMicros <= upperMicros);
  }

  /**
   * Verifies that non-INT96 columns are not affected by the swap logic — their min/max follow
   * Parquet's original order.
   */
  @Test
  public void testNonInt96ColumnMinMaxPreserved() {
    PrimitiveType int32Type =
        org.apache.parquet.schema.Types.required(PrimitiveType.PrimitiveTypeName.INT32)
            .id(1)
            .named("id");
    MessageType parquetSchema = new MessageType("test", Collections.singletonList(int32Type));

    // min = 10, max = 42
    byte[] minBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(10).array();
    byte[] maxBytes = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(42).array();

    Statistics<?> stats =
        Statistics.getBuilderForReading(int32Type)
            .withMin(minBytes)
            .withMax(maxBytes)
            .withNumNulls(0)
            .build();

    Set<Encoding> encodings = new HashSet<>();
    encodings.add(Encoding.PLAIN);
    ColumnChunkMetaData columnMeta =
        ColumnChunkMetaData.get(
            ColumnPath.get("id"),
            int32Type,
            org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED,
            null,
            encodings,
            stats,
            0L,
            0L,
            2L,
            100L,
            100L);

    BlockMetaData block = new BlockMetaData();
    block.addColumn(columnMeta);
    block.setRowCount(2);

    Schema icebergSchema = new Schema(required(1, "id", Types.IntegerType.get()));
    Map<String, String> keyValueMetadata = new HashMap<>();
    keyValueMetadata.put("iceberg.schema", SchemaParser.toJson(icebergSchema));
    FileMetaData fileMetaData = new FileMetaData(parquetSchema, keyValueMetadata, "test");
    ParquetMetadata metadata = new ParquetMetadata(fileMetaData, Collections.singletonList(block));

    Metrics metrics =
        AdaptHiveParquetUtil.footerMetrics(
            metadata, Stream.<FieldMetrics<?>>empty(), MetricsConfig.getDefault(), icebergSchema);

    ByteBuffer lowerBuf = metrics.lowerBounds().get(1);
    ByteBuffer upperBuf = metrics.upperBounds().get(1);
    Assert.assertNotNull(lowerBuf);
    Assert.assertNotNull(upperBuf);

    int lower = Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBuf);
    int upper = Conversions.fromByteBuffer(Types.IntegerType.get(), upperBuf);
    Assert.assertEquals(10, lower);
    Assert.assertEquals(42, upper);
  }

  private static int compareBytewise(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Byte.compareUnsigned(a[i], b[i]);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
