Skip to content

Commit 1cb455b

Browse files
difinDmitriy FingermanDmitriy Fingerman
authored
HIVE-29578: Iceberg: add support for logical views (#6449)
* HIVE-29578: Iceberg: support for Iceberg logical views. --------- Co-authored-by: Dmitriy Fingerman <dfingerman@cloudera.com> Co-authored-by: Dmitriy Fingerman <difin@apache.org>
1 parent 8c6f824 commit 1cb455b

27 files changed

Lines changed: 1906 additions & 56 deletions

File tree

common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ public enum ErrorMsg {
445445
@Deprecated // kept for backwards reference
446446
REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with materialized view", true),
447447
REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
448+
VIEW_STORAGE_HANDLER_UNSUPPORTED(10448, "Storage handler {0} doesn't support external logical views", true),
448449
UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
449450
MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild could not be retrieved"),
450451
MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE clauses " +

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public static void updateHmsTableForIcebergView(
161161
HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
162162
metadata.schema(),
163163
maxHiveTablePropertySize);
164+
parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, HIVE_ICEBERG_STORAGE_HANDLER);
164165
tbl.setParameters(parameters);
165166
}
166167

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import org.slf4j.LoggerFactory;
4747

4848
/** All the HMS operations like table,view,materialized_view should implement this. */
49-
interface HiveOperationsBase {
49+
public interface HiveOperationsBase {
5050

5151
Logger LOG = LoggerFactory.getLogger(HiveOperationsBase.class);
5252
// The max size is based on HMS backend database. For Hive versions below 2.3, the max table

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ private Table newHMSView(ViewMetadata metadata) {
302302
tableType().name());
303303
}
304304

305-
private String sqlFor(ViewMetadata metadata) {
305+
public static String sqlFor(ViewMetadata metadata) {
306306
SQLViewRepresentation closest = null;
307307
for (ViewRepresentation representation : metadata.currentVersion().representations()) {
308308
if (representation instanceof SQLViewRepresentation) {
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.hive;
21+
22+
import java.io.Closeable;
23+
import java.io.IOException;
24+
import java.io.UncheckedIOException;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Map;
28+
import org.apache.commons.lang3.StringUtils;
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.hive.metastore.api.FieldSchema;
31+
import org.apache.iceberg.CatalogUtil;
32+
import org.apache.iceberg.catalog.Catalog;
33+
import org.apache.iceberg.catalog.Namespace;
34+
import org.apache.iceberg.catalog.TableIdentifier;
35+
import org.apache.iceberg.catalog.ViewCatalog;
36+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
37+
import org.apache.iceberg.view.ViewBuilder;
38+
39+
/**
40+
* Commits a native Iceberg view through the configured default Iceberg catalog (HiveCatalog or REST
41+
* catalog, etc.) when {@code Catalog} also implements {@link ViewCatalog}.
42+
*/
43+
public final class IcebergViewSupport {
44+
45+
private IcebergViewSupport() {
46+
}
47+
48+
/**
49+
* Loads the native Iceberg logical view definition and applies SQL, schema, and Iceberg params to {@code hmsTable}
50+
*/
51+
public static void enrichHmsTableFromIcebergView(
52+
org.apache.hadoop.hive.metastore.api.Table hmsTable, Configuration conf) {
53+
TableIdentifier identifier = TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName());
54+
String catalogName = IcebergCatalogProperties.getCatalogName(conf);
55+
Map<String, String> catalogProps = IcebergCatalogProperties.getCatalogProperties(conf, catalogName);
56+
Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, conf);
57+
runWithCatalog(catalog, () -> loadAndApplyView(hmsTable, conf, catalog, catalogName, identifier));
58+
}
59+
60+
private static void loadAndApplyView(
61+
org.apache.hadoop.hive.metastore.api.Table hmsTable,
62+
Configuration conf,
63+
Catalog catalog,
64+
String catalogName,
65+
TableIdentifier identifier) {
66+
ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName);
67+
MetastoreUtil.applyIcebergViewToHmsTable(hmsTable, viewCatalog.loadView(identifier), conf);
68+
}
69+
70+
public static void createOrReplaceView(
71+
Configuration conf,
72+
String databaseName,
73+
String viewName,
74+
List<FieldSchema> fieldSchemas,
75+
String viewSql,
76+
Map<String, String> tblProperties,
77+
String comment) {
78+
79+
TableIdentifier identifier = TableIdentifier.of(databaseName, viewName);
80+
String catalogName = IcebergCatalogProperties.getCatalogName(conf);
81+
Map<String, String> catalogProps = IcebergCatalogProperties.getCatalogProperties(conf, catalogName);
82+
Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, conf);
83+
runWithCatalog(
84+
catalog,
85+
() -> commitView(catalog, catalogName, identifier, fieldSchemas, viewSql, tblProperties, comment));
86+
}
87+
88+
/**
89+
* Runs {@code action} with {@code catalog}, then closes it when the catalog implements
90+
* {@link Closeable} (e.g. REST catalog clients).
91+
*/
92+
private static void runWithCatalog(Catalog catalog, Runnable action) {
93+
if (catalog instanceof Closeable closeable) {
94+
try (Closeable ignored = closeable) {
95+
action.run();
96+
} catch (IOException e) {
97+
throw new UncheckedIOException("Failed to close Iceberg catalog", e);
98+
}
99+
} else {
100+
action.run();
101+
}
102+
}
103+
104+
private static void commitView(
105+
Catalog catalog,
106+
String catalogName,
107+
TableIdentifier identifier,
108+
List<FieldSchema> fieldSchemas,
109+
String viewSql,
110+
Map<String, String> tblProperties,
111+
String comment) {
112+
ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName);
113+
114+
ViewBuilder builder =
115+
viewCatalog
116+
.buildView(identifier)
117+
.withSchema(HiveSchemaUtil.convert(fieldSchemas, Collections.emptyMap(), true))
118+
.withDefaultNamespace(Namespace.of(identifier.namespace().level(0)))
119+
.withQuery("hive", viewSql);
120+
121+
if (StringUtils.isNotBlank(comment)) {
122+
builder = builder.withProperty("comment", comment);
123+
}
124+
125+
Map<String, String> tblProps =
126+
tblProperties == null ? Maps.newHashMap() : Maps.newHashMap(tblProperties);
127+
128+
builder.withProperties(tblProps);
129+
130+
builder.createOrReplace();
131+
}
132+
133+
private static ViewCatalog asViewCatalog(Catalog catalog, String catalogName) {
134+
if (catalog instanceof ViewCatalog viewCatalog) {
135+
return viewCatalog;
136+
}
137+
throw new UnsupportedOperationException(
138+
String.format(
139+
"Iceberg catalog '%s' does not implement ViewCatalog.",
140+
catalogName) +
141+
" Iceberg views require a catalog that implements ViewCatalog (e.g. HiveCatalog or REST).");
142+
}
143+
}

iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
3737
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
3838
import org.apache.hadoop.hive.metastore.api.Table;
39+
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
3940
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
4041
import org.apache.hadoop.hive.serde.serdeConstants;
42+
import org.apache.iceberg.BaseMetastoreTableOperations;
4143
import org.apache.iceberg.BaseTable;
4244
import org.apache.iceberg.CatalogUtil;
4345
import org.apache.iceberg.Schema;
@@ -46,6 +48,11 @@
4648
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4749
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4850
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
51+
import org.apache.iceberg.util.PropertyUtil;
52+
import org.apache.iceberg.view.BaseView;
53+
import org.apache.iceberg.view.SQLViewRepresentation;
54+
import org.apache.iceberg.view.View;
55+
import org.apache.iceberg.view.ViewMetadata;
4956
import org.apache.thrift.TException;
5057

5158
public class MetastoreUtil {
@@ -134,20 +141,101 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co
134141
result.setDbName(tableName.getDb());
135142
result.setTableName(tableName.getTable());
136143
result.setTableType(TableType.EXTERNAL_TABLE.toString());
137-
result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
144+
145+
// TODO: Revert after HIVE-29633 is fixed
146+
// result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
147+
result.setPartitionKeys(Lists.newArrayList());
148+
138149
TableMetadata metadata = ((BaseTable) table).operations().current();
139150
long maxHiveTablePropertySize = conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
140151
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
141152
HMSTablePropertyHelper.updateHmsTableForIcebergTable(metadata.metadataFileLocation(), result, metadata,
142153
null, true, maxHiveTablePropertySize, null);
143154
String catalogType = IcebergCatalogProperties.getCatalogType(conf);
144155
if (!StringUtils.isEmpty(catalogType) && !IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) {
145-
result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, IcebergCatalogProperties.getCatalogType(conf));
156+
result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
146157
}
147158
result.setSd(getHiveStorageDescriptor(table));
148159
return result;
149160
}
150161

162+
/**
163+
* Builds a minimal HMS {@link Table} shell for Iceberg view (identity, view type,
164+
* and Iceberg storage-handler markers only). The storage handler {@code postGetTable} hook enriches
165+
* this object via {@link IcebergViewSupport#enrichHmsTableFromIcebergView} (view SQL,
166+
* schema, and Iceberg parameters).
167+
*/
168+
public static Table buildMinimalHMSView(String catName, String dbName, String tableName) {
169+
Table result = new Table();
170+
result.setCatName(catName);
171+
result.setDbName(dbName);
172+
result.setTableName(tableName);
173+
result.setTableType(TableType.VIRTUAL_VIEW.toString());
174+
175+
Map<String, String> parameters = Maps.newHashMap();
176+
parameters.put(
177+
BaseMetastoreTableOperations.TABLE_TYPE_PROP, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE);
178+
parameters.put(
179+
hive_metastoreConstants.META_TABLE_STORAGE, HMSTablePropertyHelper.HIVE_ICEBERG_STORAGE_HANDLER);
180+
result.setParameters(parameters);
181+
return result;
182+
}
183+
184+
/**
185+
* Applies Iceberg view metadata (SQL, schema, params) onto an existing HMS {@link Table}.
186+
*/
187+
public static void applyIcebergViewToHmsTable(Table hmsTable, View view, Configuration conf) {
188+
ViewMetadata metadata = ((BaseView) view).operations().current();
189+
String sqlText = viewSqlText(view, metadata);
190+
191+
boolean hiveEngineEnabled = false;
192+
hmsTable.setSd(HiveOperationsBase.storageDescriptor(metadata.schema(), metadata.location(), hiveEngineEnabled));
193+
StorageDescriptor sd = hmsTable.getSd();
194+
195+
if (sd.getBucketCols() == null) {
196+
sd.setBucketCols(Lists.newArrayList());
197+
}
198+
199+
if (sd.getSortCols() == null) {
200+
sd.setSortCols(Lists.newArrayList());
201+
}
202+
203+
long maxHiveTablePropertySize =
204+
conf.getLong(
205+
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
206+
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
207+
HMSTablePropertyHelper.updateHmsTableForIcebergView(
208+
metadata.metadataFileLocation(),
209+
hmsTable,
210+
metadata,
211+
Collections.emptySet(),
212+
maxHiveTablePropertySize,
213+
null);
214+
215+
hmsTable.setCreateTime((int) (metadata.version(1).timestampMillis() / 1000));
216+
hmsTable.setLastAccessTime((int) (metadata.currentVersion().timestampMillis() / 1000));
217+
hmsTable.setOwner(
218+
PropertyUtil.propertyAsString(
219+
metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
220+
221+
// In-memory overlay for compile/describe: authoritative SQL comes from Iceberg metadata.
222+
hmsTable.setViewOriginalText(sqlText);
223+
hmsTable.setViewExpandedText(sqlText);
224+
225+
String catalogType = IcebergCatalogProperties.getCatalogType(conf);
226+
if (!StringUtils.isEmpty(catalogType) && !IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) {
227+
hmsTable.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, IcebergCatalogProperties.getCatalogType(conf));
228+
}
229+
}
230+
231+
private static String viewSqlText(View view, ViewMetadata metadata) {
232+
SQLViewRepresentation hiveRepr = view.sqlFor("hive");
233+
if (hiveRepr != null) {
234+
return hiveRepr.sql();
235+
}
236+
return HiveViewOperations.sqlFor(metadata);
237+
}
238+
151239
private static StorageDescriptor getHiveStorageDescriptor(org.apache.iceberg.Table table) {
152240
var result = new StorageDescriptor();
153241
result.setCols(HiveSchemaUtil.convert(table.schema()));

0 commit comments

Comments
 (0)