
Commit 268ae1a

[IcebergIO] Support hash distribution mode when writing rows (#38061)
* group rows by partition before writing
* add documentation; fix tests; extend to YAML
* add BeamRowWrapper test class
* lint
* spotless
* spotless
* fix lint
* add BeamRowWrapper javadoc; add an autosharding toggle option; address other comments
* cleanup
* cleanup
* volatile map
* add to changes.md
* format
* format
1 parent 010c52f commit 268ae1a

14 files changed

Lines changed: 1483 additions & 29 deletions


Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 5
+  "modification": 1
 }

CHANGES.md

Lines changed: 2 additions & 1 deletion
@@ -66,6 +66,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)).
+* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/issues/38061)).

## New Features / Improvements

@@ -2434,4 +2435,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss

## Highlights

-- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
+- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
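The IcebergIO entry above refers to hash distribution mode: rows headed for the same table partition are routed to the same writer before files are created, which yields fewer, larger data files. Below is a minimal standalone sketch of the routing idea only; it is not Beam or Iceberg code, and the partition paths and writer count are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of hash distribution: route each row by the hash of its partition value,
// so all rows for one partition end up with a single writer.
public class HashDistributionSketch {
  public static void main(String[] args) {
    List<String> partitionPaths =
        List.of("day=2024-01-01", "day=2024-01-02", "day=2024-01-01", "day=2024-01-03");
    int numWriters = 2; // assumed writer parallelism, for illustration only

    Map<Integer, List<String>> rowsPerWriter = new TreeMap<>();
    for (String path : partitionPaths) {
      int writer = Math.floorMod(path.hashCode(), numWriters); // hash routing
      rowsPerWriter.computeIfAbsent(writer, w -> new ArrayList<>()).add(path);
    }

    // Both "day=2024-01-01" rows land on the same writer, so that partition produces one
    // file instead of one file per worker that happened to see it.
    System.out.println(rowsPerWriter);
  }
}
```

In the commit itself, the key used for routing is the {destination, partition} Row produced by AssignDestinationsAndPartitions (shown below), and the Beam runner's shuffle does the routing.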
AssignDestinationsAndPartitions.java

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
 * Assigns destination metadata for each input record.
 *
 * <p>The output will have the format { {destination, partition}, data }
 */
class AssignDestinationsAndPartitions
    extends PTransform<PCollection<Row>, PCollection<KV<Row, Row>>> {

  private final DynamicDestinations dynamicDestinations;
  private final IcebergCatalogConfig catalogConfig;
  static final String DESTINATION = "destination";
  static final String PARTITION = "partition";
  static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
      org.apache.beam.sdk.schemas.Schema.builder()
          .addStringField(DESTINATION)
          .addStringField(PARTITION)
          .build();

  public AssignDestinationsAndPartitions(
      DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
    this.dynamicDestinations = dynamicDestinations;
    this.catalogConfig = catalogConfig;
  }

  @Override
  public PCollection<KV<Row, Row>> expand(PCollection<Row> input) {
    return input
        .apply(ParDo.of(new AssignDoFn(dynamicDestinations, catalogConfig)))
        .setCoder(
            KvCoder.of(
                RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema())));
  }

  static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
    private transient @MonotonicNonNull Map<String, PartitionKey> partitionKeys;
    private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
    private final DynamicDestinations dynamicDestinations;
    private final IcebergCatalogConfig catalogConfig;

    AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
      this.dynamicDestinations = dynamicDestinations;
      this.catalogConfig = catalogConfig;
    }

    @Setup
    public void setup() {
      this.wrappers = new HashMap<>();
      this.partitionKeys = new HashMap<>();
    }

    @ProcessElement
    public void processElement(
        @Element Row element,
        BoundedWindow window,
        PaneInfo paneInfo,
        @Timestamp Instant timestamp,
        OutputReceiver<KV<Row, Row>> out) {
      String tableIdentifier =
          dynamicDestinations.getTableStringIdentifier(
              ValueInSingleWindow.of(element, timestamp, window, paneInfo));
      Row data = dynamicDestinations.getData(element);

      @Nullable PartitionKey partitionKey = checkStateNotNull(partitionKeys).get(tableIdentifier);
      @Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier);
      if (partitionKey == null || wrapper == null) {
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());
        @Nullable
        IcebergTableCreateConfig createConfig =
            dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();
        if (createConfig != null && createConfig.getPartitionFields() != null) {
          spec =
              PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema());
        } else {
          try {
            // see if table already exists with a spec
            // TODO(https://github.com/apache/beam/issues/38337): improve this by periodically
            // refreshing the table to fetch updated specs
            spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
          } catch (NoSuchTableException ignored) {
            // no partition to apply
          }
        }
        partitionKey = new PartitionKey(spec, schema);
        wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());
        checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey);
        checkStateNotNull(wrappers).put(tableIdentifier, wrapper);
      }
      partitionKey.partition(wrapper.wrap(data));
      String partitionPath = partitionKey.toPath();

      Row destAndPartition =
          Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
      out.output(KV.of(destAndPartition, data));
    }
  }
}
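The transform above only attaches the {destination, partition} key; the grouping that produces the hash-distribution behaviour happens downstream. As a rough, hedged sketch (the class below is invented for illustration and is not part of this commit), grouping the keyed output by that Row key shuffles all rows for a given table partition to the same worker before files are written:

```java
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Illustrative consumer of AssignDestinationsAndPartitions' output. Each key is a Row with
// the OUTPUT_SCHEMA fields "destination" and "partition", so the runner's shuffle hashes on
// the combination of table identifier and partition path.
class GroupRowsByDestinationAndPartition {
  static PCollection<KV<Row, Iterable<Row>>> group(PCollection<KV<Row, Row>> keyedRows) {
    return keyedRows.apply("GroupByDestinationAndPartition", GroupByKey.create());
  }
}
```

The actual write path in this commit is not shown in this excerpt and may batch elements differently (for example via the autosharding toggle mentioned in the commit message), but the keying scheme is the one produced above.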
BeamRowWrapper.java

Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.Date;
import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.Time;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A wrapper that adapts a Beam {@link Row} to Iceberg's {@link StructLike} interface.
 *
 * <p>This class allows Beam rows to be processed by Iceberg internal components (like partition
 * keys or writers) without requiring a full conversion into Iceberg's internal Record format. It
 * handles the mapping between Beam's {@link Schema} and Iceberg's {@link Types.StructType},
 * including complex type conversions for timestamps, logical types, and UUIDs.
 *
 * <p><b>Note:</b> This implementation is <b>read-only</b>. Calls to {@link #set(int, Object)} will
 * throw an {@link UnsupportedOperationException}.
 */
public class BeamRowWrapper implements StructLike {

  private final FieldType[] types;
  private final @Nullable PositionalGetter<?>[] getters;
  private @Nullable Row row = null;

  /** Constructs a new wrapper and pre-computes the mapping between Beam and Iceberg fields. */
  public BeamRowWrapper(Schema schema, Types.StructType struct) {
    int size = schema.getFieldCount();

    types = (FieldType[]) Array.newInstance(FieldType.class, size);
    getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);

    for (int i = 0; i < size; i++) {
      types[i] = schema.getField(i).getType();
      getters[i] = buildGetter(types[i], struct.fields().get(i).type());
    }
  }

  /**
   * Sets the current Beam {@link Row} to be wrapped. This method allows the wrapper to be reused
   * across different rows to minimize object allocation.
   */
  public BeamRowWrapper wrap(@Nullable Row row) {
    this.row = row;
    return this;
  }

  @Override
  public int size() {
    return types.length;
  }

  /**
   * Retrieves a field value from the wrapped row, performing any necessary type conversion to
   * match Iceberg's internal expectations (e.g., converting Timestamps to microseconds).
   */
  @Override
  public <T> @Nullable T get(int pos, Class<T> javaClass) {
    if (row == null || row.getValue(pos) == null) {
      return null;
    } else if (getters[pos] != null) {
      return javaClass.cast(getters[pos].get(checkStateNotNull(row), pos));
    }

    return javaClass.cast(checkStateNotNull(row).getValue(pos));
  }

  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException(
        "Could not set a field in the BeamRowWrapper because rowData is read-only");
  }

  private interface PositionalGetter<T> {
    T get(Row data, int pos);
  }

  /**
   * Factory method to create a getter that handles type-specific conversion logic.
   *
   * <p>Handles special cases:
   *
   * <ul>
   *   <li>UUID: Converts {@code byte[]} to Iceberg's UUID representation.
   *   <li>DateTime: Converts Beam {@code DateTime} or logical types to microsecond timestamps.
   *   <li>Nested Rows: Recursively wraps nested structures in a new {@code BeamRowWrapper}.
   * </ul>
   */
  private static @Nullable PositionalGetter<?> buildGetter(FieldType beamType, Type icebergType) {
    switch (beamType.getTypeName()) {
      case BYTE:
        return Row::getByte;
      case INT16:
        return Row::getInt16;
      case STRING:
        return Row::getString;
      case BYTES:
        return (row, pos) -> {
          byte[] bytes = checkStateNotNull(row.getBytes(pos));
          if (Type.TypeID.UUID == icebergType.typeId()) {
            return UUIDUtil.convert(bytes);
          } else {
            return ByteBuffer.wrap(bytes);
          }
        };
      case DECIMAL:
        return Row::getDecimal;
      case DATETIME:
        return (row, pos) ->
            TimeUnit.MILLISECONDS.toMicros(checkStateNotNull(row.getDateTime(pos)).getMillis());
      case ROW:
        Schema beamSchema = checkStateNotNull(beamType.getRowSchema());
        Types.StructType structType = (Types.StructType) icebergType;

        BeamRowWrapper nestedWrapper = new BeamRowWrapper(beamSchema, structType);
        return (row, pos) -> nestedWrapper.wrap(row.getRow(pos));
      case LOGICAL_TYPE:
        if (beamType.isLogicalType(MicrosInstant.IDENTIFIER)) {
          return (row, pos) -> {
            Instant instant = checkStateNotNull(row.getLogicalTypeValue(pos, Instant.class));
            return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1000;
          };
        } else if (beamType.isLogicalType(DateTime.IDENTIFIER)) {
          return (row, pos) ->
              DateTimeUtil.microsFromTimestamp(
                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalDateTime.class)));
        } else if (beamType.isLogicalType(Date.IDENTIFIER)) {
          return (row, pos) ->
              DateTimeUtil.daysFromDate(
                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalDate.class)));
        } else if (beamType.isLogicalType(Time.IDENTIFIER)) {
          return (row, pos) ->
              DateTimeUtil.microsFromTime(
                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalTime.class)));
        } else if (beamType.isLogicalType(FixedPrecisionNumeric.IDENTIFIER)) {
          return (row, pos) -> row.getLogicalTypeValue(pos, BigDecimal.class);
        } else {
          return null;
        }
      default:
        return null;
    }
  }
}
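For reference, a small usage sketch of BeamRowWrapper together with Iceberg's PartitionKey, mirroring what AssignDoFn does above. The schema conversion goes through IcebergUtils from the same package; the partition column name "shard" and the example helper class are invented for illustration.

```java
import org.apache.beam.sdk.io.iceberg.BeamRowWrapper;
import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;

class BeamRowWrapperSketch {
  // Computes the Iceberg partition path (e.g. "shard=3") for a Beam Row without first
  // converting it to an Iceberg Record.
  static String partitionPathFor(Row row) {
    org.apache.beam.sdk.schemas.Schema beamSchema = row.getSchema();
    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
    // Identity-partition on an assumed "shard" column, purely for illustration.
    PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).identity("shard").build();

    // The wrapper is reusable: wrap() just swaps in the current row, and PartitionKey reads
    // field values through the StructLike interface, applying the conversions shown above.
    BeamRowWrapper wrapper = new BeamRowWrapper(beamSchema, icebergSchema.asStruct());
    PartitionKey partitionKey = new PartitionKey(spec, icebergSchema);
    partitionKey.partition(wrapper.wrap(row));
    return partitionKey.toPath();
  }
}
```

Because the wrapper and the PartitionKey are cached per destination in AssignDoFn, the per-element cost is limited to reading field values through the precomputed getters.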
