Skip to content

Commit 5ad73a6

Browse files
authored
HIVE-29460: Iceberg: Support CLUSTERED BY for iceberg tables (apache#6358)
1 parent f9705a8 commit 5ad73a6

7 files changed

Lines changed: 991 additions & 3 deletions

File tree

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package org.apache.iceberg.hive;
2121

2222
import java.util.Collections;
23+
import java.util.List;
2324
import java.util.Map;
25+
import org.apache.commons.collections4.CollectionUtils;
2426
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
2527
import org.apache.hadoop.hive.metastore.TableType;
2628
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -161,6 +163,12 @@ default void persistTable(Table hmsTable, boolean updateHiveTable, String metada
161163

162164
/**
 * Convenience overload that builds a {@link StorageDescriptor} without supplying any Hive
 * bucketing information.
 *
 * <p>Forwards to the five-argument overload, passing {@code null} for the bucket columns and
 * {@code -1} for the bucket count.
 *
 * @param schema Iceberg schema, forwarded unchanged
 * @param location table location, forwarded unchanged
 * @param hiveEngineEnabled forwarded unchanged
 * @return the descriptor produced by the five-argument overload
 */
static StorageDescriptor storageDescriptor(
163165
Schema schema, String location, boolean hiveEngineEnabled) {
166+
return storageDescriptor(schema, location, hiveEngineEnabled, null, -1);
167+
}
168+
169+
static StorageDescriptor storageDescriptor(
170+
Schema schema, String location, boolean hiveEngineEnabled,
171+
List<String> bucketCols, int numBuckets) {
164172
final StorageDescriptor storageDescriptor = new StorageDescriptor();
165173
storageDescriptor.setCols(HiveSchemaUtil.convert(schema));
166174
storageDescriptor.setLocation(location);
@@ -178,6 +186,13 @@ static StorageDescriptor storageDescriptor(
178186
}
179187

180188
storageDescriptor.setSerdeInfo(serDeInfo);
189+
190+
// Preserve Hive bucketing information if provided (for CLUSTERED BY support)
191+
if (CollectionUtils.isNotEmpty(bucketCols) && numBuckets > 0) {
192+
storageDescriptor.setBucketCols(bucketCols);
193+
storageDescriptor.setNumBuckets(numBuckets);
194+
}
195+
181196
return storageDescriptor;
182197
}
183198

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iceberg.hive;
2121

2222
import java.util.Collections;
23+
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Objects;
2526
import java.util.Set;
@@ -30,6 +31,7 @@
3031
import org.apache.hadoop.hive.metastore.TableType;
3132
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
3233
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
34+
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
3335
import org.apache.hadoop.hive.metastore.api.Table;
3436
import org.apache.iceberg.BaseMetastoreOperations;
3537
import org.apache.iceberg.BaseMetastoreTableOperations;
@@ -178,11 +180,23 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
178180
LOG.debug("Committing new table: {}", fullName);
179181
}
180182

183+
// Preserve Hive bucketing (CLUSTERED BY) if present in the existing table
184+
StorageDescriptor sd = tbl.getSd();
185+
List<String> bucketCols = null;
186+
int numBuckets = -1;
187+
188+
if (sd != null) {
189+
bucketCols = sd.getBucketCols();
190+
numBuckets = sd.getNumBuckets();
191+
}
192+
181193
tbl.setSd(
182194
HiveOperationsBase.storageDescriptor(
183195
metadata.schema(),
184196
metadata.location(),
185-
hiveEngineEnabled)); // set to pickup any schema changes
197+
hiveEngineEnabled, // set to pickup any schema changes
198+
bucketCols,
199+
numBuckets));
186200

187201
String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
188202
String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.mr.hive;
21+
22+
import java.io.IOException;
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.LinkedHashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import org.apache.hadoop.hive.metastore.api.Table;
29+
import org.apache.iceberg.FileFormat;
30+
import org.apache.iceberg.FileScanTask;
31+
import org.apache.iceberg.catalog.TableIdentifier;
32+
import org.apache.iceberg.io.CloseableIterable;
33+
import org.apache.iceberg.mr.hive.test.TestTables.TestTableType;
34+
import org.apache.iceberg.mr.hive.test.utils.HiveIcebergStorageHandlerTestUtils;
35+
import org.apache.thrift.TException;
36+
import org.junit.Assert;
37+
import org.junit.Test;
38+
import org.junit.runners.Parameterized.Parameters;
39+
40+
/**
41+
* Verifies that Iceberg tables honor the {@code CLUSTERED BY} clause by checking HMS metadata
42+
* persistence, data readability and physical file distribution across buckets.
43+
*/
44+
public class TestHiveIcebergClusteredBy extends HiveIcebergStorageHandlerWithEngineBase {
45+
46+
private static final String TABLE_NAME = "ice_clus_data_files";
47+
private static final TableIdentifier TABLE_ID = TableIdentifier.of("default", TABLE_NAME);
48+
private static final int NUM_BUCKETS = 3;
49+
50+
private static final Map<Integer, Long> EXPECTED_BUCKET_RECORD_COUNTS = buildExpectedBuckets();
51+
52+
/**
 * Builds the expected "bucket id to record count" mapping used by the data-file assertions.
 *
 * <p>The three counts add up to 9 records.
 *
 * @return an unmodifiable view of an insertion-ordered map keyed by bucket id (0, 1, 2)
 */
private static Map<Integer, Long> buildExpectedBuckets() {
53+
Map<Integer, Long> bucketCounts = new LinkedHashMap<>();
54+
bucketCounts.put(0, 4L); // customer_id=2 (per the original note; depends on fixture data not visible here)
55+
bucketCounts.put(1, 3L); // customer_id=3 (per the original note; depends on fixture data not visible here)
56+
bucketCounts.put(2, 2L); // customer_id=1 (per the original note; depends on fixture data not visible here)
57+
return Collections.unmodifiableMap(bucketCounts);
58+
}
59+
60+
/**
 * Supplies the parameter sets for this parameterized test.
 *
 * <p>Passes a filter to {@code HiveIcebergStorageHandlerWithEngineBase.getParameters} that keeps
 * only combinations whose file format is Parquet, whose table type is {@code HIVE_CATALOG} and
 * that are vectorized. How the candidate combinations are enumerated is decided by the base
 * class and is not visible in this file.
 *
 * @return the parameter arrays returned by the base class for the filter above
 */
@Parameters(name = "fileFormat={0}, catalog={1}, isVectorized={2}, formatVersion={3}")
61+
public static Collection<Object[]> parameters() {
62+
return HiveIcebergStorageHandlerWithEngineBase.getParameters(p ->
63+
p.fileFormat() == FileFormat.PARQUET &&
64+
p.testTableType() == TestTableType.HIVE_CATALOG &&
65+
p.isVectorized());
66+
}
67+
68+
/**
 * Creates the Iceberg table {@code TABLE_NAME} and loads the shared fixture records into it.
 *
 * <p>The table is declared {@code CLUSTERED BY (customer_id)} into {@code NUM_BUCKETS} buckets,
 * stored as Parquet, with the {@code format-version} table property taken from
 * {@code formatVersion} (not declared in this class; presumably inherited from the base class).
 */
private void setUpIcebergTable() {
69+
shell.executeStatement(
70+
"CREATE TABLE " + TABLE_NAME +
71+
" (customer_id BIGINT, first_name STRING, last_name STRING) " +
72+
"CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " +
73+
"STORED BY ICEBERG STORED AS PARQUET " +
74+
"TBLPROPERTIES ('format-version'='" + formatVersion + "')");
75+
76+
// Insert every record of OTHER_CUSTOMER_RECORDS_2; the meaning of the trailing `false`
// flag is defined by getInsertQuery, which is not visible here.
shell.executeStatement(testTables.getInsertQuery(
77+
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, TABLE_ID, false));
78+
}
79+
80+
/**
 * Verifies that, after creating and populating the clustered Iceberg table, the table stored in
 * the metastore reports the expected bucket count and bucket column.
 */
@Test
81+
public void testHmsHasBucketMetadata() throws TException, InterruptedException {
82+
setUpIcebergTable();
83+
validateHmsBucketMetadata(TABLE_NAME);
84+
}
85+
86+
/**
 * Verifies that {@code SELECT COUNT(*)} on the clustered table returns as many rows as the
 * fixture list that was inserted.
 */
@Test
87+
public void testTotalRowCountAfterClusteredInsert() {
88+
setUpIcebergTable();
89+
List<Object[]> result = shell.executeStatement("SELECT COUNT(*) FROM " + TABLE_NAME);
90+
// Read the single COUNT(*) cell through Number so the check does not depend on the concrete
// boxed numeric type returned by the shell.
long count = ((Number) result.getFirst()[0]).longValue();
91+
Assert.assertEquals(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.size(), count);
92+
}
93+
94+
/**
 * Verifies the physical distribution of the inserted records: the per-bucket record counts
 * derived from the data file names of the Iceberg table must equal
 * {@code EXPECTED_BUCKET_RECORD_COUNTS}.
 */
@Test
95+
public void testIcebergDataFilesAfterClusteredInsert() throws IOException {
96+
setUpIcebergTable();
97+
org.apache.iceberg.Table icebergTable = testTables.loadTable(TABLE_ID);
98+
Map<Integer, Long> actualBucketRecordCounts = extractBucketRecordCounts(icebergTable);
99+
// Map equality ignores iteration order, so only the (bucket id, count) pairs are compared.
Assert.assertEquals(EXPECTED_BUCKET_RECORD_COUNTS, actualBucketRecordCounts);
100+
}
101+
102+
/**
103+
* Tests that CLUSTERED BY metadata and bucketing behavior are preserved when migrating
104+
* a native Hive table to Iceberg using ALTER TABLE CONVERT TO ICEBERG.
105+
*
* <p>Flow: create a bucketed external ORC table, insert the first 4 fixture records, convert the
* table to Iceberg, insert fixture records 4..8, then check the HMS bucket metadata, the total
* row count and the per-bucket record counts derived from the data file names.
*
* <p>NOTE(review): the hard-coded {@code subList(4, 9)} and the final comparison against
* {@code EXPECTED_BUCKET_RECORD_COUNTS} (whose counts add up to 9) assume that
* {@code OTHER_CUSTOMER_RECORDS_2} holds exactly 9 records - confirm against the fixture.
*/
106+
@Test
107+
public void testNativeToIcebergMigrationWithClusteredBy() throws TException, InterruptedException, IOException {
108+
String migrationTableName = "migration_clustered_table";
109+
TableIdentifier migrationTableId = TableIdentifier.of("default", migrationTableName);
110+
111+
// Create a native Hive table with CLUSTERED BY
112+
shell.executeStatement(
113+
"CREATE EXTERNAL TABLE " + migrationTableName +
114+
" (customer_id BIGINT, first_name STRING, last_name STRING) " +
115+
"CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " +
116+
"STORED AS ORC");
117+
118+
// Insert the first 4 fixture records into the native Hive table
119+
List<org.apache.iceberg.data.Record> initialRecords =
120+
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.subList(0, 4);
121+
shell.executeStatement(testTables.getInsertQuery(initialRecords, migrationTableId, false));
122+
123+
// Bucket metadata must be present in HMS before the conversion
validateHmsBucketMetadata(migrationTableName);
124+
125+
List<Object[]> initialCount = shell.executeStatement("SELECT COUNT(*) FROM " + migrationTableName);
126+
// 4L matches the size of the subList(0, 4) inserted above
Assert.assertEquals(4L, ((Number) initialCount.getFirst()[0]).longValue());
127+
128+
// Convert the native Hive table to an Iceberg table
129+
shell.executeStatement("ALTER TABLE " + migrationTableName + " CONVERT TO ICEBERG");
130+
131+
// Bucket metadata must still be present in HMS after the conversion, and the table must now
// carry the Iceberg storage handler in its parameters
validateHmsBucketMetadata(migrationTableName);
132+
Table hmsTable = shell.metastore().getTable("default", migrationTableName);
133+
Assert.assertEquals("org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
134+
hmsTable.getParameters().get("storage_handler"));
135+
136+
// Insert the remaining fixture records (indices 4..8) into the converted Iceberg table
137+
List<org.apache.iceberg.data.Record> remainingRecords =
138+
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.subList(4, 9);
139+
shell.executeStatement(testTables.getInsertQuery(remainingRecords, migrationTableId, false));
140+
141+
// Validate the total row count and the per-bucket record distribution
142+
List<Object[]> finalCount = shell.executeStatement("SELECT COUNT(*) FROM " + migrationTableName);
143+
Assert.assertEquals(HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.size(),
144+
((Number) finalCount.getFirst()[0]).longValue());
145+
146+
// The scan covers files written before and after the conversion; counts are summed per bucket
org.apache.iceberg.Table icebergTable = testTables.loadTable(migrationTableId);
147+
Map<Integer, Long> bucketCounts = extractBucketRecordCounts(icebergTable);
148+
Assert.assertEquals(EXPECTED_BUCKET_RECORD_COUNTS, bucketCounts);
149+
150+
// Reached only when every assertion above passed
shell.executeStatement("DROP TABLE IF EXISTS " + migrationTableName);
151+
}
152+
153+
/**
 * Asserts that the metastore entry of the given table in the {@code default} database reports
 * {@code NUM_BUCKETS} buckets and {@code customer_id} as its only bucket column.
 *
 * @param tableName name of the table to look up in the {@code default} database
 * @throws TException declared for the metastore lookup
 * @throws InterruptedException declared for the metastore lookup
 */
private void validateHmsBucketMetadata(String tableName) throws TException, InterruptedException {
154+
Table hmsTable = shell.metastore().getTable("default", tableName);
155+
Assert.assertEquals(NUM_BUCKETS, hmsTable.getSd().getNumBuckets());
156+
Assert.assertEquals(Collections.singletonList("customer_id"), hmsTable.getSd().getBucketCols());
157+
}
158+
159+
/**
160+
* Extracts the bucket ID from each planned data file's name and sums the record counts per bucket.
161+
* Handles both Iceberg file names ({bucketId}-{attemptId}-...) and native Hive file names ({bucketId}_0).
162+
*
* <p>The file name is the part of the file location after the last {@code '/'}. A name without
* any {@code '-'} is treated as a native Hive name and split on {@code '_'}; any other name is
* split on {@code '-'}. In both cases the first segment is parsed as the bucket ID.
*
* @param icebergTable table whose current scan is planned to list the data files
* @return insertion-ordered map from bucket ID to the total record count of its files
* @throws IOException declared because the planned-files iterable is closed by try-with-resources
* @throws NumberFormatException if the leading file-name segment is not an integer
*/
163+
private Map<Integer, Long> extractBucketRecordCounts(org.apache.iceberg.Table icebergTable) throws IOException {
164+
Map<Integer, Long> bucketRecordCounts = new LinkedHashMap<>();
165+
try (CloseableIterable<FileScanTask> tasks = icebergTable.newScan().planFiles()) {
166+
for (FileScanTask task : tasks) {
167+
String path = task.file().location();
168+
// Keep only the last path segment (the whole string when there is no '/')
String filename = path.substring(path.lastIndexOf('/') + 1);
169+
170+
int bucketId;
171+
if (!filename.contains("-")) {
172+
// Native Hive: 000000_0 → bucket 0
173+
bucketId = Integer.parseInt(filename.split("_")[0]);
174+
} else {
175+
// Iceberg: 00000-0-... → bucket 0
176+
bucketId = Integer.parseInt(filename.split("-")[0]);
177+
}
178+
179+
// Sum records across multiple files in the same bucket (e.g., multiple inserts)
180+
bucketRecordCounts.merge(bucketId, task.file().recordCount(), Long::sum);
181+
}
182+
}
183+
return bucketRecordCounts;
184+
}
185+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
-- Test Iceberg tables with CLUSTERED BY clause
2+
-- Verify that Iceberg tables follow Hive bucketing behavior for query optimization
3+
4+
-- SORT_QUERY_RESULTS
5+
-- Mask random uuid
6+
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
7+
-- Mask a random snapshot id
8+
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
9+
-- Mask added file size
10+
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
11+
-- Mask total file size
12+
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
13+
-- Mask current-snapshot-timestamp-ms
14+
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
15+
-- Mask removed file size
16+
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
17+
-- Mask number of files
18+
--! qt:replace:/(\s+numFiles\s+)\S+(\s+)/$1#Masked#$2/
19+
-- Mask total data files
20+
--! qt:replace:/(\S\"total-data-files\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
21+
-- Mask iceberg version
22+
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
23+
24+
-- Simple CREATE with CLUSTERED BY
25+
CREATE TABLE ice_bucketed_simple (
26+
id int,
27+
name string,
28+
age int
29+
)
30+
CLUSTERED BY (id) INTO 4 BUCKETS
31+
STORED BY ICEBERG;
32+
33+
DESCRIBE FORMATTED ice_bucketed_simple;
34+
35+
EXPLAIN INSERT INTO ice_bucketed_simple VALUES (1, 'Alice', 25), (2, 'Bob', 30);
36+
37+
INSERT INTO ice_bucketed_simple VALUES
38+
(1, 'Alice', 25),
39+
(2, 'Bob', 30),
40+
(3, 'Carrie', 22),
41+
(4, 'Danny', 28),
42+
(5, 'Eve', 35),
43+
(6, 'Frank', 40),
44+
(7, 'Grace', 27),
45+
(8, 'Henry', 33),
46+
(9, 'Ivy', 29),
47+
(10, 'Jack', 31);
48+
49+
SELECT * FROM ice_bucketed_simple ORDER BY id;
50+
51+
SELECT COUNT(*) from default.ice_bucketed_simple.files;
52+
53+
-- CLUSTERED BY on multiple columns
54+
CREATE TABLE ice_bucketed_multi (
55+
customer_id int,
56+
order_id bigint,
57+
product string
58+
)
59+
CLUSTERED BY (customer_id, order_id) INTO 8 BUCKETS
60+
STORED BY ICEBERG;
61+
62+
DESCRIBE FORMATTED ice_bucketed_multi;
63+
64+
EXPLAIN INSERT INTO ice_bucketed_multi VALUES (100, 1001, 'Widget'), (101, 1002, 'Gadget');
65+
66+
-- Convert EXTERNAL Hive table with CLUSTERED BY to Iceberg
67+
CREATE EXTERNAL TABLE hive_bucketed (
68+
product_id int,
69+
category string,
70+
price decimal(10,2)
71+
)
72+
CLUSTERED BY (product_id) INTO 3 BUCKETS
73+
STORED AS ORC;
74+
75+
INSERT INTO hive_bucketed VALUES (1, 'Electronics', 299.99), (2, 'Books', 15.50);
76+
77+
ALTER TABLE hive_bucketed CONVERT TO ICEBERG;
78+
79+
DESCRIBE FORMATTED hive_bucketed;
80+
81+
EXPLAIN INSERT INTO hive_bucketed VALUES (3, 'Furniture', 450.00);
82+
83+
DROP TABLE IF EXISTS ice_bucketed_simple;
84+
DROP TABLE IF EXISTS ice_bucketed_multi;
85+
DROP TABLE IF EXISTS hive_bucketed;

0 commit comments

Comments
 (0)