Skip to content

Commit 5bbe19f

Browse files
authored
Flink: Nanosecond gaps in SortKeySerializer and ColumnStatsWatermarkExtractor (#16268)
1 parent 11e582b commit 5bbe19f

5 files changed

Lines changed: 86 additions & 6 deletions

File tree

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
159159
case LONG:
160160
case TIME:
161161
case TIMESTAMP:
162+
case TIMESTAMP_NANO:
162163
target.writeLong(record.get(i, Long.class));
163164
break;
164165
case FLOAT:
@@ -237,6 +238,7 @@ public SortKey deserialize(SortKey reuse, DataInputView source) throws IOExcepti
237238
case LONG:
238239
case TIME:
239240
case TIMESTAMP:
241+
case TIMESTAMP_NANO:
240242
reuse.set(i, source.readLong());
241243
break;
242244
case FLOAT:

flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,24 @@ public ColumnStatsWatermarkExtractor(
5353
Types.NestedField field = schema.findField(eventTimeFieldName);
5454
TypeID typeID = field.type().typeId();
5555
Preconditions.checkArgument(
56-
typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
57-
"Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
56+
typeID.equals(TypeID.LONG)
57+
|| typeID.equals(TypeID.TIMESTAMP)
58+
|| typeID.equals(TypeID.TIMESTAMP_NANO),
59+
"Found %s, expected a LONG, TIMESTAMP, or TIMESTAMP_NANO column for watermark generation.",
5860
typeID);
5961
this.eventTimeFieldId = field.fieldId();
6062
this.eventTimeFieldName = eventTimeFieldName;
61-
// Use the timeUnit only for Long columns.
62-
this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
63+
// Use the timeUnit only for Long columns; timestamp columns store fixed-precision longs.
64+
switch (typeID) {
65+
case LONG:
66+
this.timeUnit = timeUnit;
67+
break;
68+
case TIMESTAMP_NANO:
69+
this.timeUnit = TimeUnit.NANOSECONDS;
70+
break;
71+
default:
72+
this.timeUnit = TimeUnit.MICROSECONDS;
73+
}
6374
}
6475

6576
@VisibleForTesting

flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ public class TestRowDataPartitionKey {
5757
Types.NestedField.required(12, "decimalType2", Types.DecimalType.of(10, 5)),
5858
Types.NestedField.required(13, "decimalType3", Types.DecimalType.of(38, 19)),
5959
Types.NestedField.required(14, "floatType", Types.FloatType.get()),
60-
Types.NestedField.required(15, "doubleType", Types.DoubleType.get()));
60+
Types.NestedField.required(15, "doubleType", Types.DoubleType.get()),
61+
Types.NestedField.required(
62+
16, "timestampNanoWithoutZone", Types.TimestampNanoType.withoutZone()),
63+
Types.NestedField.required(
64+
17, "timestampNanoWithZone", Types.TimestampNanoType.withZone()));
6165

6266
private static final List<String> SUPPORTED_PRIMITIVES =
6367
SCHEMA.asStruct().fields().stream().map(Types.NestedField::name).collect(Collectors.toList());
@@ -248,4 +252,47 @@ public void testNestedPartitionValues() {
248252
}
249253
}
250254
}
255+
256+
@Test
257+
public void testTimestampNanoPartitionTransforms() {
258+
RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
259+
RowDataWrapper rowWrapper = new RowDataWrapper(rowType, SCHEMA.asStruct());
260+
InternalRecordWrapper recordWrapper = new InternalRecordWrapper(SCHEMA.asStruct());
261+
262+
List<Record> records = RandomGenericData.generate(SCHEMA, 10, 1995);
263+
List<RowData> rows = Lists.newArrayList(RandomRowData.convert(SCHEMA, records));
264+
265+
String[] columns = {"timestampNanoWithoutZone", "timestampNanoWithZone"};
266+
for (String column : columns) {
267+
List<PartitionSpec> specs =
268+
Lists.newArrayList(
269+
PartitionSpec.builderFor(SCHEMA).identity(column).build(),
270+
PartitionSpec.builderFor(SCHEMA).year(column).build(),
271+
PartitionSpec.builderFor(SCHEMA).month(column).build(),
272+
PartitionSpec.builderFor(SCHEMA).day(column).build(),
273+
PartitionSpec.builderFor(SCHEMA).hour(column).build(),
274+
PartitionSpec.builderFor(SCHEMA).bucket(column, 16).build());
275+
276+
for (PartitionSpec spec : specs) {
277+
Class<?>[] javaClasses = spec.javaClasses();
278+
PartitionKey pk = new PartitionKey(spec, SCHEMA);
279+
PartitionKey expectedPK = new PartitionKey(spec, SCHEMA);
280+
281+
for (int j = 0; j < rows.size(); j++) {
282+
pk.partition(rowWrapper.wrap(rows.get(j)));
283+
expectedPK.partition(recordWrapper.wrap(records.get(j)));
284+
285+
assertThat(pk.size()).isEqualTo(1);
286+
assertThat(pk.get(0, javaClasses[0]))
287+
.as(
288+
"Partition with column "
289+
+ column
290+
+ " and spec "
291+
+ spec
292+
+ " should match Iceberg-side computation")
293+
.isEqualTo(expectedPK.get(0, javaClasses[0]));
294+
}
295+
}
296+
}
297+
}
251298
}

flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ protected SortOrder sortOrder() {
5454
.sortBy(Expressions.bucket("uuid_field", 16), SortDirection.ASC, NullOrder.NULLS_FIRST)
5555
.sortBy(Expressions.hour("ts_with_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
5656
.sortBy(Expressions.day("ts_without_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
57+
.asc("ts_ns_with_zone_field")
58+
.sortBy(
59+
Expressions.hour("ts_ns_without_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
5760
// can not test HeapByteBuffer due to equality test inside SerializerTestBase
5861
// .sortBy(Expressions.truncate("binary_field", 2), SortDirection.ASC,
5962
// NullOrder.NULLS_FIRST)

flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ public class TestColumnStatsWatermarkExtractor {
6161
required(3, "long_column", Types.LongType.get()),
6262
required(4, "string_column", Types.StringType.get()));
6363

64+
// Separate schema for nanosecond columns: TIMESTAMP_NANO requires table format v3, which the
65+
// HadoopTableExtension above does not provision. Tested via constructor preconditions only.
66+
private static final Schema NANO_SCHEMA =
67+
new Schema(
68+
required(1, "timestamp_ns_column", Types.TimestampNanoType.withoutZone()),
69+
required(2, "timestamptz_ns_column", Types.TimestampNanoType.withZone()));
70+
6471
private static final List<List<Record>> TEST_RECORDS =
6572
ImmutableList.of(
6673
RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L));
@@ -147,7 +154,17 @@ public void testWrongColumn() {
147154
assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
148155
.isInstanceOf(IllegalArgumentException.class)
149156
.hasMessageContaining(
150-
"Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
157+
"Found STRING, expected a LONG, TIMESTAMP, or TIMESTAMP_NANO column for watermark generation.");
158+
}
159+
160+
@TestTemplate
161+
public void testTimestampNanoAccepted() {
162+
// Run the precondition check exactly once across the parameterized matrix.
163+
assumeThat(columnName).isEqualTo("timestamp_column");
164+
165+
// Both flavours of TIMESTAMP_NANO must be accepted by the extractor's precondition check.
166+
new ColumnStatsWatermarkExtractor(NANO_SCHEMA, "timestamp_ns_column", null);
167+
new ColumnStatsWatermarkExtractor(NANO_SCHEMA, "timestamptz_ns_column", null);
151168
}
152169

153170
@TestTemplate

0 commit comments

Comments
 (0)