Skip to content

Commit 00aed5a

Browse files
author
lijinglun
committed
feat: add hive create namespace
1 parent ba184cd commit 00aed5a

12 files changed

Lines changed: 690 additions & 51 deletions

File tree

java/lance-namespace-adapter/src/test/java/com/lancedb/lance/namespace/adapter/MockExceptionController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
public class MockExceptionController {
2424
/**
 * Mock endpoint mapped to GET /testNotFound that always throws the exception built by
 * LanceNamespaceException.notFound; it never returns a value.
 *
 * <p>This hunk shows both sides of the change: the removed lines stored the message in
 * {@code type} and the error type in {@code error}; the added lines swap the two values and
 * pass them to notFound in (error, type, instance, detail) order.
 *
 * @param param optional request parameter interpolated into the detail text
 */
@GetMapping("/testNotFound")
2525
public String testNotFound(@RequestParam(required = false) String param) {
26-
String type = "Mock resource not found";
27-
String error = "Not found Error";
26+
String error = "Mock resource not found";
27+
String type = "Not found Error";
2828
String instance = "/v1/namespaces";
2929
// param is declared optional; a null value is rendered by String.format as "null not found".
String detail = String.format("%s not found", param);
30-
throw LanceNamespaceException.notFound(type, error, instance, detail);
30+
throw LanceNamespaceException.notFound(error, type, instance, detail);
3131
}
3232

3333
@GetMapping("/testInternalError")
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.namespace.util;
15+
16+
import com.google.common.base.Joiner;
17+
18+
import java.util.Arrays;
19+
20+
public class CommonUtil {
21+
public static String formatCurrentStackTrace() {
22+
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
23+
return formatStackTrace(stackTrace);
24+
}
25+
26+
public static String formatStackTrace(StackTraceElement[] stackTrace) {
27+
return Joiner.on("\n\t").join(Arrays.copyOfRange(stackTrace, 1, stackTrace.length));
28+
}
29+
}

java/lance-namespace-core/src/main/java/com/lancedb/lance/namespace/util/ValidationUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ public static void checkNotNull(Object object, String message, Object... args) {
2828
checkArgument(object != null, message, args);
2929
}
3030

31-
/**
 * Checks, through checkArgument, that {@code str} is neither null nor empty, passing along
 * {@code message} and {@code args}.
 *
 * <p>This hunk shows both signatures: the removed one returned nothing, the added one returns
 * {@code str} unchanged so the check can be used inline inside an expression.
 *
 * @param str value to validate
 * @param message message forwarded to checkArgument
 * @param args arguments forwarded to checkArgument together with the message
 * @return {@code str} (added signature only)
 */
public static void checkNotNullOrEmptyString(String str, String message, Object... args) {
31+
public static String checkNotNullOrEmptyString(String str, String message, Object... args) {
3232
checkArgument(str != null && !str.isEmpty(), message, args);
33+
return str;
3334
}
3435

3536
public static void checkState(boolean expression, String message, Object... args) {
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.namespace.hive;
15+
16+
/** Error types for hive lance namespace */
17+
public enum ErrorType {
18+
HiveMetaStoreError("HiveMetaStoreError"),
19+
UnknownCatalog("UnknownCatalog"),
20+
CatalogAlreadyExist("CatalogAlreadyExist"),
21+
DatabaseAlreadyExist("DatabaseAlreadyExist");
22+
23+
private String type;
24+
25+
ErrorType(String type) {
26+
this.type = type;
27+
}
28+
29+
public String getType() {
30+
return type;
31+
}
32+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.namespace.hive;
15+
16+
import com.lancedb.lance.namespace.LanceNamespaceException;
17+
import com.lancedb.lance.namespace.ObjectIdentifier;
18+
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
19+
import com.lancedb.lance.namespace.util.CommonUtil;
20+
import com.lancedb.lance.namespace.util.HiveUtil;
21+
import com.lancedb.lance.namespace.util.ValidationUtil;
22+
23+
import com.google.common.collect.Lists;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hive.conf.HiveConf;
26+
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
27+
import org.apache.hadoop.hive.metastore.api.Database;
28+
import org.apache.thrift.TException;
29+
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.function.Supplier;
33+
34+
import static com.lancedb.lance.namespace.hive.ErrorType.DatabaseAlreadyExist;
35+
import static com.lancedb.lance.namespace.hive.ErrorType.HiveMetaStoreError;
36+
37+
public class Hive2Adapter implements HiveAdapter {
38+
private HiveClientPool clientPool;
39+
private Configuration hadoopConf;
40+
41+
public Hive2Adapter(HiveClientPool clientPool, Configuration hadoopConf) {
42+
this.clientPool = clientPool;
43+
this.hadoopConf = hadoopConf;
44+
}
45+
46+
@Override
47+
public List<String> listNamespaces(ObjectIdentifier parent) {
48+
ValidationUtil.checkArgument(
49+
parent.levels() <= 2, "Expect a 2-level namespace but get %s", parent);
50+
51+
try {
52+
if (parent.isRoot()) {
53+
return clientPool.run(IMetaStoreClient::getAllDatabases);
54+
} else {
55+
return Lists.newArrayList();
56+
}
57+
} catch (TException | InterruptedException e) {
58+
if (e instanceof InterruptedException) {
59+
Thread.currentThread().interrupt();
60+
}
61+
throw LanceNamespaceException.serviceUnavailable(
62+
e.getMessage(), HiveMetaStoreError.getType(), "", CommonUtil.formatCurrentStackTrace());
63+
}
64+
}
65+
66+
@Override
67+
public void createNamespace(
68+
ObjectIdentifier id, CreateNamespaceRequest.ModeEnum mode, Map<String, String> properties) {
69+
ValidationUtil.checkArgument(id.levels() == 2, "Expect a 2-level namespace but get %s", id);
70+
71+
try {
72+
String db = id.level(0).toLowerCase();
73+
createDatabase(db, mode, properties);
74+
} catch (TException | InterruptedException e) {
75+
if (e instanceof InterruptedException) {
76+
Thread.currentThread().interrupt();
77+
}
78+
throw LanceNamespaceException.serviceUnavailable(
79+
e.getMessage(), HiveMetaStoreError.getType(), "", CommonUtil.formatCurrentStackTrace());
80+
}
81+
}
82+
83+
private void createDatabase(
84+
String dbName, CreateNamespaceRequest.ModeEnum mode, Map<String, String> properties)
85+
throws TException, InterruptedException {
86+
Database oldDb = HiveUtil.getDatabaseOrNull(clientPool, dbName);
87+
if (oldDb != null) {
88+
switch (mode) {
89+
case CREATE:
90+
throw LanceNamespaceException.conflict(
91+
String.format("Database %s already exist", dbName),
92+
DatabaseAlreadyExist.getType(),
93+
"",
94+
CommonUtil.formatCurrentStackTrace());
95+
case EXIST_OK:
96+
return;
97+
case OVERWRITE:
98+
clientPool.run(
99+
client -> {
100+
client.dropDatabase(dbName, false, true, false);
101+
return null;
102+
});
103+
}
104+
}
105+
106+
// Create database
107+
Supplier<String> warehouseLocation =
108+
() ->
109+
ValidationUtil.checkNotNullOrEmptyString(
110+
hadoopConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
111+
String.format(
112+
"Warehouse location is not set: %s=null",
113+
HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
114+
115+
Database database = new Database();
116+
database.setName(dbName);
117+
HiveUtil.setDatabaseProperties(database, warehouseLocation, dbName, properties);
118+
119+
clientPool.run(
120+
client -> {
121+
client.createDatabase(database);
122+
return null;
123+
});
124+
}
125+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.namespace.hive;
15+
16+
import com.lancedb.lance.namespace.LanceNamespaceException;
17+
import com.lancedb.lance.namespace.ObjectIdentifier;
18+
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
19+
import com.lancedb.lance.namespace.util.CommonUtil;
20+
import com.lancedb.lance.namespace.util.HiveUtil;
21+
import com.lancedb.lance.namespace.util.ValidationUtil;
22+
23+
import com.google.common.collect.Lists;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
26+
import org.apache.hadoop.hive.metastore.api.Catalog;
27+
import org.apache.hadoop.hive.metastore.api.Database;
28+
import org.apache.thrift.TException;
29+
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
import static com.lancedb.lance.namespace.hive.ErrorType.CatalogAlreadyExist;
34+
import static com.lancedb.lance.namespace.hive.ErrorType.DatabaseAlreadyExist;
35+
import static com.lancedb.lance.namespace.hive.ErrorType.HiveMetaStoreError;
36+
37+
public class Hive3Adapter implements HiveAdapter {
38+
private HiveClientPool clientPool;
39+
private Configuration hadoopConf;
40+
41+
public Hive3Adapter(HiveClientPool clientPool, Configuration hadoopConf) {
42+
this.clientPool = clientPool;
43+
this.hadoopConf = hadoopConf;
44+
}
45+
46+
@Override
47+
public List<String> listNamespaces(ObjectIdentifier parent) {
48+
ValidationUtil.checkArgument(
49+
parent.levels() <= 3, "Expect a 3-level namespace but get %s", parent);
50+
51+
try {
52+
if (parent.isRoot()) {
53+
return clientPool.run(IMetaStoreClient::getCatalogs);
54+
} else if (parent.levels() == 2) {
55+
return clientPool.run(client -> client.getAllDatabases(parent.level(0)));
56+
} else {
57+
return Lists.newArrayList();
58+
}
59+
} catch (TException | InterruptedException e) {
60+
if (e instanceof InterruptedException) {
61+
Thread.currentThread().interrupt();
62+
}
63+
throw LanceNamespaceException.serviceUnavailable(
64+
e.getMessage(), HiveMetaStoreError.getType(), "", CommonUtil.formatCurrentStackTrace());
65+
}
66+
}
67+
68+
@Override
69+
public void createNamespace(
70+
ObjectIdentifier id, CreateNamespaceRequest.ModeEnum mode, Map<String, String> properties) {
71+
ValidationUtil.checkArgument(
72+
!id.isRoot() && id.levels() <= 3, "Expect a 3-level namespace but get %s", id);
73+
74+
try {
75+
if (id.levels() == 2) {
76+
String name = id.level(0).toLowerCase();
77+
createCatalog(name, mode, properties);
78+
} else {
79+
String catalog = id.level(0).toLowerCase();
80+
String db = id.level(1).toLowerCase();
81+
createDatabase(catalog, db, mode, properties);
82+
}
83+
} catch (TException | InterruptedException e) {
84+
if (e instanceof InterruptedException) {
85+
Thread.currentThread().interrupt();
86+
}
87+
throw LanceNamespaceException.serviceUnavailable(
88+
e.getMessage(), HiveMetaStoreError.getType(), "", CommonUtil.formatCurrentStackTrace());
89+
}
90+
}
91+
92+
private void createCatalog(
93+
String catalogName, CreateNamespaceRequest.ModeEnum mode, Map<String, String> properties)
94+
throws TException, InterruptedException {
95+
ValidationUtil.checkNotNullOrEmptyString(
96+
properties.get(HiveNamespaceConfig.CATALOG_LOCATION_URI),
97+
"Expect %s to be set",
98+
HiveNamespaceConfig.CATALOG_LOCATION_URI);
99+
100+
Catalog oldCatalog = HiveUtil.getCatalogOrNull(clientPool, catalogName);
101+
102+
if (oldCatalog != null) {
103+
switch (mode) {
104+
case CREATE:
105+
throw LanceNamespaceException.conflict(
106+
String.format("Catalog %s already exist", catalogName),
107+
CatalogAlreadyExist.getType(),
108+
"",
109+
CommonUtil.formatCurrentStackTrace());
110+
case EXIST_OK:
111+
return;
112+
case OVERWRITE:
113+
clientPool.run(
114+
client -> {
115+
client.dropCatalog(catalogName);
116+
return null;
117+
});
118+
}
119+
}
120+
121+
// Create catalog
122+
Catalog catalog = new Catalog();
123+
124+
catalog.setName(catalogName);
125+
catalog.setLocationUri(properties.get(HiveNamespaceConfig.CATALOG_LOCATION_URI));
126+
127+
String description = properties.get(HiveNamespaceConfig.CATALOG_DESCRIPTION);
128+
if (description != null) {
129+
catalog.setDescription(description);
130+
}
131+
132+
clientPool.run(
133+
client -> {
134+
client.createCatalog(catalog);
135+
return null;
136+
});
137+
}
138+
139+
private void createDatabase(
140+
String catalogName,
141+
String dbName,
142+
CreateNamespaceRequest.ModeEnum mode,
143+
Map<String, String> properties)
144+
throws TException, InterruptedException {
145+
Catalog catalog = HiveUtil.getCatalogOrThrowNotFoundException(clientPool, catalogName);
146+
147+
Database oldDb = HiveUtil.getDatabaseOrNull(clientPool, catalogName, dbName);
148+
if (oldDb != null) {
149+
switch (mode) {
150+
case CREATE:
151+
throw LanceNamespaceException.conflict(
152+
String.format("Database %s.%s already exist", catalogName, dbName),
153+
DatabaseAlreadyExist.getType(),
154+
"",
155+
CommonUtil.formatCurrentStackTrace());
156+
case EXIST_OK:
157+
return;
158+
case OVERWRITE:
159+
clientPool.run(
160+
client -> {
161+
client.dropDatabase(catalogName, dbName, false, true, false);
162+
return null;
163+
});
164+
}
165+
}
166+
167+
// Create database
168+
Database database = new Database();
169+
database.setCatalogName(catalogName);
170+
database.setName(dbName);
171+
HiveUtil.setDatabaseProperties(database, () -> catalog.getLocationUri(), dbName, properties);
172+
173+
clientPool.run(
174+
client -> {
175+
client.createDatabase(database);
176+
return null;
177+
});
178+
}
179+
}

0 commit comments

Comments
 (0)