diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java index 0e9a9676dfd..a2c2c33c5db 100644 --- a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java +++ b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableDelegator.java @@ -20,7 +20,7 @@ import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_CREATION_MODE; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_STORAGE_OPTIONS_PREFIX; -import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_CREATE_EMPTY; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_DECLARED; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_FORMAT; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_REGISTER; @@ -68,8 +68,8 @@ public List> tablePropertyEntries() { false /* hidden */, false /* reserved */), PropertyEntry.booleanPropertyEntry( - LANCE_TABLE_CREATE_EMPTY, - "Whether this is a lance create empty table (declare table) operation.", + LANCE_TABLE_DECLARED, + "Whether this is a Lance metadata-only declare table operation.", false, true /* immutable */, false /* defaultValue */, diff --git a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java index dccbfe8c04e..9edec20a23c 100644 --- a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java +++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java @@ -282,14 +282,14 @@ Table createTableInternal( ident, columns, comment, properties, partitions, distribution, sortOrders, indexes); } - // Check whether it's a create empty table operation. - boolean createEmpty = - Optional.ofNullable(properties.get(LanceConstants.LANCE_TABLE_CREATE_EMPTY)) + // Check whether it's a metadata-only declare table operation. + boolean declaredOnly = + Optional.ofNullable(properties.get(LanceConstants.LANCE_TABLE_DECLARED)) .map(Boolean::parseBoolean) .orElse(false); - if (createEmpty) { - // For create empty table, we just create the table metadata in Gravitino without creating - // the underlying Lance dataset. + if (declaredOnly) { + // For declare table, we just create the table metadata in Gravitino without creating the + // underlying Lance dataset. return super.createTable( ident, columns, comment, properties, partitions, distribution, sortOrders, indexes); } diff --git a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java index 4a39f18ccff..d7b2b5f2181 100644 --- a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java +++ b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java @@ -19,7 +19,7 @@ package org.apache.gravitino.catalog.lakehouse.lance.integration.test; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_CREATION_MODE; -import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_CREATE_EMPTY; +import static 
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_DECLARED; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_FORMAT; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_REGISTER; @@ -82,7 +82,7 @@ import org.lance.Dataset; import org.lance.Fragment; import org.lance.FragmentMetadata; -import org.lance.Transaction; +import org.lance.SourcedTransaction; import org.lance.WriteParams; import org.lance.ipc.LanceScanner; import org.lance.ipc.ScanOptions; @@ -162,7 +162,7 @@ public void testCrateEmptyTable() { String tableLocation = tempDirectory + "/" + tableName; properties.put("format", "lance"); properties.put("location", tableLocation); - properties.put(LANCE_TABLE_CREATE_EMPTY, "true"); + properties.put(LANCE_TABLE_DECLARED, "true"); properties.put(Table.PROPERTY_EXTERNAL, "true"); Table createdTable = @@ -178,7 +178,7 @@ public void testCrateEmptyTable() { null); Assertions.assertEquals(createdTable.name(), emptyTableName); - // Now try to alter the property LANCE_TABLE_CREATE_EMPTY + // Now try to alter the property LANCE_TABLE_DECLARED IllegalArgumentException e = Assertions.assertThrows( IllegalArgumentException.class, @@ -186,11 +186,10 @@ public void testCrateEmptyTable() { catalog .asTableCatalog() .alterTable( - nameIdentifier, - TableChange.setProperty(LANCE_TABLE_CREATE_EMPTY, "false"))); + nameIdentifier, TableChange.setProperty(LANCE_TABLE_DECLARED, "false"))); Assertions.assertTrue( - e.getMessage().contains("Property lance.create-empty is immutable or reserved")); + e.getMessage().contains("Property lance.declared is immutable or reserved")); } @Test @@ -374,7 +373,7 @@ void testLanceTableFormat() { } // Now try to write some data to the dataset - Transaction trans = + SourcedTransaction trans = dataset .newTransactionBuilder() .operation( @@ -388,10 +387,10 @@ void testLanceTableFormat() { new LanceDataValue(3, 300L, "third")), lanceSchema)) .build()) - 
.writeParams(ImmutableMap.of()) + .transactionProperties(ImmutableMap.of()) .build(); - Dataset newDataset = dataset.commitTransaction(trans); + Dataset newDataset = trans.commit(); try (LanceScanner scanner = newDataset.newScan( new ScanOptions.Builder() diff --git a/clients/client-python/MANIFEST.in b/clients/client-python/MANIFEST.in index 5656cf186f1..f75d13e1844 100644 --- a/clients/client-python/MANIFEST.in +++ b/clients/client-python/MANIFEST.in @@ -18,6 +18,7 @@ include requirements.txt include requirements-dev.txt +include requirements-lance.txt include README.md include LICENSE include NOTICE diff --git a/clients/client-python/requirements-lance.txt b/clients/client-python/requirements-lance.txt new file mode 100644 index 00000000000..482aeef75f6 --- /dev/null +++ b/clients/client-python/requirements-lance.txt @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Lance integration deps. `lance-ray` owns the compatible `lance-namespace` +# dependency, so do not pin `lance-namespace` here separately. Installed via +# the `lance` extra (e.g. `pip install -e .[lance]`) so the heavy ray/pylance +# native wheels don't slow down the default `dev` install used by lint and +# unit-test tasks. 
+ray==2.55.1 +lance-ray==0.4.2 diff --git a/clients/client-python/setup.py b/clients/client-python/setup.py index 18ea18e0e8d..4cdbd9302a1 100644 --- a/clients/client-python/setup.py +++ b/clients/client-python/setup.py @@ -56,6 +56,7 @@ install_requires=open("requirements.txt").read(), extras_require={ "dev": open("requirements-dev.txt").read(), + "lance": open("requirements-lance.txt").read(), }, include_package_data=True, ) diff --git a/clients/client-python/tests/integration/test_lance_ray.py b/clients/client-python/tests/integration/test_lance_ray.py new file mode 100644 index 00000000000..d94ee69d8f8 --- /dev/null +++ b/clients/client-python/tests/integration/test_lance_ray.py @@ -0,0 +1,332 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import logging +import os +import shutil +import tempfile +import time +import unittest +from random import randint +from typing import Optional + +import requests + +from gravitino import ( + Catalog, + GravitinoAdminClient, + GravitinoClient, +) +from tests.integration.integration_test_env import IntegrationTestEnv + +logger = logging.getLogger(__name__) + +LANCE_REST_PORT = 9101 +LANCE_REST_BASE_URL = f"http://localhost:{LANCE_REST_PORT}/lance" + +# The Lance REST server runs as an auxiliary service inside the main +# Gravitino process (gravitino.auxService.names = ...,lance-rest), so its +# bind metalake is configured in the *main* gravitino.conf rather than the +# standalone lance-rest conf file. +MAIN_CONF_FILE = "conf/gravitino.conf" +LANCE_REST_METALAKE_KEY = "gravitino.lance-rest.gravitino-metalake" + + +def _missing_lance_ray_deps() -> Optional[str]: + missing = [] + for mod in ("ray", "lance_ray", "lance_namespace"): + try: + __import__(mod) + except ImportError: + missing.append(mod) + return ", ".join(missing) if missing else None + + +# Compute once at module import time so the @skipIf condition and message +# don't trigger two rounds of import attempts. +_MISSING_LANCE_RAY_DEPS = _missing_lance_ray_deps() + + +@unittest.skipIf( + _MISSING_LANCE_RAY_DEPS is not None, + f"lance-ray test deps not installed: {_MISSING_LANCE_RAY_DEPS}. " + "Install with: pip install -e .[lance] (or pip install ray lance-ray " + "lance-namespace). Requires the Gravitino server to expose a lance-rest " + "auxiliary service backed by lance-namespace-core >= 0.7.5.", +) +class TestLanceRayIntegration(IntegrationTestEnv): + """End-to-end test for the lance-ray Python client against a Gravitino-backed + Lance REST namespace. Mirrors the ``ray.data`` -> ``write_lance`` -> + ``read_lance`` flow from the upstream lance-ray docs. + """ + + # Metalake name is fixed because the lance-rest aux service binds to a + # single metalake from gravitino.conf. 
The per-test table name still gets + # a random suffix to keep individual test methods isolated. + METALAKE_NAME: str = "lance_ray_test_metalake" + CATALOG_NAME: str = "lance_catalog" + SCHEMA_NAME: str = "schema" + TABLE_NAME: str = "lance_ray_tbl_" + str(randint(1, 100000)) + + gravitino_admin_client: Optional[GravitinoAdminClient] = None + gravitino_client: Optional[GravitinoClient] = None + temp_dir: Optional[str] = None + main_conf_path: Optional[str] = None + appended_lance_rest_conf: bool = False + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls._get_gravitino_home() + gravitino_home = cls.gravitino_home + cls.main_conf_path = os.path.join(gravitino_home, MAIN_CONF_FILE) + + # Bind the lance-rest aux service to our test metalake. If the same + # binding is already present (e.g. an earlier run in the same Gradle + # session left it there), skip the conf write and the restart. This + # avoids appending the same conf entry twice if a prior failed run + # already left the binding behind. + if not cls._lance_metalake_already_bound(): + cls._append_conf(cls._lance_rest_config(), cls.main_conf_path) + cls.appended_lance_rest_conf = True + cls.restart_server() + if not cls._wait_for_lance_rest_ready(): + raise RuntimeError( + "Lance REST aux service did not become ready in time at " + + LANCE_REST_BASE_URL + ) + + # Probe whether the server-side `lance-namespace-core` is new enough + # to deserialize requests from the installed PyPI `lance-namespace`. + # We skip cleanly (rather than fail) when the server is older — this + # happens on branches that haven't merged the `lance-namespace-core` + # upgrade yet. The probe runs *before* metalake/catalog/schema setup + # so a skipped run leaves no fixtures behind. 
+ skip_reason = cls._check_lance_namespace_compat() + if skip_reason is not None: + cls._reset_lance_rest_conf() + raise unittest.SkipTest(skip_reason) + + cls.gravitino_admin_client = GravitinoAdminClient("http://localhost:8090") + # Idempotent: tolerate a metalake left over from a prior failed run. + try: + cls.gravitino_admin_client.create_metalake( + cls.METALAKE_NAME, + comment="lance-ray IT metalake", + properties={}, + ) + except Exception as e: # pylint: disable=broad-exception-caught + if "already exists" not in str(e).lower(): + raise + logger.info("Metalake %s already exists, reusing", cls.METALAKE_NAME) + cls.gravitino_client = GravitinoClient( + uri="http://localhost:8090", metalake_name=cls.METALAKE_NAME + ) + cls.temp_dir = tempfile.mkdtemp(prefix="lance_ray_it_") + # Idempotent catalog + schema creation too. + try: + cls.gravitino_client.create_catalog( + name=cls.CATALOG_NAME, + catalog_type=Catalog.Type.RELATIONAL, + provider="lakehouse-generic", + comment="lance-ray IT catalog", + properties={"location": cls.temp_dir}, + ) + except Exception as e: # pylint: disable=broad-exception-caught + if "already exists" not in str(e).lower(): + raise + catalog = cls.gravitino_client.load_catalog(cls.CATALOG_NAME) + try: + catalog.as_schemas().create_schema( + schema_name=cls.SCHEMA_NAME, + comment="lance-ray IT schema", + properties={}, + ) + except Exception as e: # pylint: disable=broad-exception-caught + if "already exists" not in str(e).lower(): + raise + + @classmethod + def tearDownClass(cls): + failures = [] + + try: + if cls.gravitino_client is not None: + cls.gravitino_client.drop_catalog(name=cls.CATALOG_NAME, force=True) + except Exception as e: # pylint: disable=broad-exception-caught + failures.append(("drop catalog", e)) + + try: + if cls.gravitino_admin_client is not None: + cls.gravitino_admin_client.drop_metalake( + name=cls.METALAKE_NAME, force=True + ) + except Exception as e: # pylint: disable=broad-exception-caught + 
failures.append(("drop metalake", e)) + + try: + cls._reset_lance_rest_conf() + except Exception as e: # pylint: disable=broad-exception-caught + failures.append(("reset lance-rest conf", e)) + + try: + if cls.temp_dir and os.path.exists(cls.temp_dir): + shutil.rmtree(cls.temp_dir, ignore_errors=True) + except Exception as e: # pylint: disable=broad-exception-caught + failures.append(("remove temp dir", e)) + + for step, err in failures: + logger.warning("Cleanup step %s failed: %s", step, err) + + super().tearDownClass() + + @classmethod + def _check_lance_namespace_compat(cls) -> Optional[str]: + """Detect server/client schema drift on lance-namespace. + + The lance-namespace REST model evolves: newer client versions add + request fields (e.g. ``check_declared``) that older + ``lance-namespace-core`` builds on the server side reject with + Jackson's "Unrecognized field ... not marked as ignorable". This + helper sends a harmless ``describe_table`` for a bogus table id so + we can observe the schema-validation error without doing any real + work. Returns a skip reason on incompatibility, or ``None`` if the + server understands the request shape. + """ + try: + # pylint: disable=import-outside-toplevel + import lance_namespace + from lance_namespace import DescribeTableRequest + + # pylint: enable=import-outside-toplevel + except ImportError: + # `@unittest.skipIf` on the class already handles this case; if + # we reach here something odd is going on but it's not our job + # to recover from it. + return None + + try: + ns = lance_namespace.connect("rest", {"uri": LANCE_REST_BASE_URL}) + except Exception as e: # pylint: disable=broad-exception-caught + return f"unable to connect to lance-rest aux service: {e}" + + probe = DescribeTableRequest(id=["__probe__", "__probe__", "__probe__"]) + try: + ns.describe_table(probe) + # Unlikely but not impossible: the probe table actually exists + # in a leftover metalake. That's still a compatible server. 
+ return None + except Exception as e: # pylint: disable=broad-exception-caught + msg = str(e) + if "Unrecognized field" in msg or "not marked as ignorable" in msg: + short = msg.splitlines()[0][:200] + return ( + "lance-rest server's lance-namespace-core is older than " + "the client's lance-namespace (request schema mismatch). " + f"Server reported: {short}. To run this test, use " + "lance-namespace-core 0.7.5 or newer on the server, " + "or roll the client back to a matching version." + ) + # Any other error (table-not-found, metalake-not-found, etc.) + # means the server *did* deserialize the request — i.e. schemas + # are compatible. + return None + + @classmethod + def _lance_metalake_already_bound(cls) -> bool: + if cls.main_conf_path is None or not os.path.exists(cls.main_conf_path): + return False + needle = f"{LANCE_REST_METALAKE_KEY} = {cls.METALAKE_NAME}" + with open(cls.main_conf_path, encoding="utf-8") as f: + for line in f: + if line.strip() == needle: + return True + return False + + @classmethod + def _lance_rest_config(cls): + return {LANCE_REST_METALAKE_KEY: cls.METALAKE_NAME} + + @classmethod + def _reset_lance_rest_conf(cls) -> None: + if not cls.appended_lance_rest_conf or cls.main_conf_path is None: + return + cls._reset_conf(cls._lance_rest_config(), cls.main_conf_path) + cls.appended_lance_rest_conf = False + cls.restart_server() + + @staticmethod + def _wait_for_lance_rest_ready(timeout_s: float = 60.0) -> bool: + # Probe any registered Jersey path on the lance servlet. A 4xx + # response is fine; what we want is *any* response from the lance + # mount instead of connection refused or the bare Jetty 404. 
+ deadline = time.monotonic() + timeout_s + url = LANCE_REST_BASE_URL + "/v1/namespace/list" + while time.monotonic() < deadline: + try: + resp = requests.get(url, timeout=2) + if resp.status_code < 500: + return True + except requests.RequestException: + pass + time.sleep(0.5) + return False + + def test_write_read_filter_via_lance_ray(self): + # Imports are deferred so the skipIf decorator handles missing deps + # cleanly without import errors at module load time. The lance/ray + # extras live in `requirements-lance.txt` (and `setup.py`'s `lance` + # extra), so they aren't present in the default `dev` install used by + # pylint — silence the resulting import-error. + # pylint: disable=import-outside-toplevel,import-error + import ray + from lance_ray import read_lance, write_lance + + # pylint: enable=import-outside-toplevel,import-error + + ns_properties = {"uri": LANCE_REST_BASE_URL} + table_id = [self.CATALOG_NAME, self.SCHEMA_NAME, self.TABLE_NAME] + + ray.init( + ignore_reinit_error=True, + num_cpus=2, + include_dashboard=False, + log_to_driver=False, + ) + try: + data = ray.data.range(1000).map( + lambda row: {"id": row["id"], "value": row["id"] * 2} + ) + + write_lance( + data, + namespace_impl="rest", + namespace_properties=ns_properties, + table_id=table_id, + ) + + ray_dataset = read_lance( + namespace_impl="rest", + namespace_properties=ns_properties, + table_id=table_id, + ) + # value = id * 2, value < 100 => id in [0, 49] => 50 rows + filtered_count = ray_dataset.filter(lambda row: row["value"] < 100).count() + self.assertEqual(50, filtered_count) + finally: + ray.shutdown() diff --git a/core/build.gradle.kts b/core/build.gradle.kts index bdf52f367a9..e92a7f52ab2 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -54,6 +54,13 @@ dependencies { exclude(group = "com.fasterxml.jackson.jaxrs", module = "jackson-jaxrs-json-provider") // using gravitino's version exclude(group = "org.apache.httpcomponents.client5", module = "*") // provided 
by gravitino exclude(group = "org.lance", module = "lance-namespace-core") // This is unnecessary in the core module + // Same rationale as lance-namespace-core: lance-core 6.0.0 declares + // lance-namespace-apache-client as a transitive, but core never calls into it. + // Leaving it on the main classpath shadows the lance-rest aux service's own + // lance-namespace-apache-client (loaded via lance-rest-server/libs/), and + // because the aux classloader is parent-first, the older transitive wins + // on request deserialization (e.g. dropping fields like `check_declared`). + exclude(group = "org.lance", module = "lance-namespace-apache-client") } implementation(libs.mybatis) diff --git a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java index 724e602372e..ec2579f7a7e 100644 --- a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java +++ b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -71,7 +72,7 @@ import org.lance.Fragment; import org.lance.FragmentMetadata; import org.lance.ReadOptions; -import org.lance.Transaction; +import org.lance.SourcedTransaction; import org.lance.WriteParams; import org.lance.ipc.LanceScanner; import org.lance.ipc.ScanOptions; @@ -108,7 +109,7 @@ public class LancePartitionStatisticStorage implements PartitionStatisticStorage private static final String STATISTIC_VALUE_COLUMN = "statistic_value"; private static final String AUDIT_INFO_COLUMN = "audit_info"; - private final Optional> datasetCache; + private final Optional> datasetCache; private static 
final Schema SCHEMA = new Schema( @@ -203,11 +204,11 @@ public LancePartitionStatisticStorage(Map properties) { Caffeine.newBuilder() .maximumSize(datasetCacheSize) .scheduler(Scheduler.forScheduledExecutorService(this.scheduler)) - .evictionListener( - (RemovalListener) + .removalListener( + (RemovalListener) (key, value, cause) -> { if (value != null) { - value.close(); + closeDatasetHolder(value); } }) .build()); @@ -297,7 +298,7 @@ private void appendStatisticsImpl(Long tableId, List datasetRead = getDataset(tableId); List fragmentMetas = createFragmentMetadata(tableId, updates); - Transaction appendTxn = + SourcedTransaction appendTxn = datasetRead .newTransactionBuilder() .operation(Append.builder().fragments(fragmentMetas).build()) @@ -306,7 +307,7 @@ private void appendStatisticsImpl(Long tableId, List newDataset = appendTxn.commit(); Dataset finalNewDataset = newDataset; - datasetCache.ifPresent(cache -> cache.put(tableId, finalNewDataset)); + datasetCache.ifPresent(cache -> cache.put(tableId, new DatasetHolder(finalNewDataset))); } finally { if (!datasetCache.isPresent()) { if (datasetRead != null) { @@ -355,15 +356,10 @@ private void dropStatisticsImpl(Long tableId, List drop @Override public void close() throws IOException { if (datasetCache.isPresent()) { - Cache cache = datasetCache.get(); - for (Dataset dataset : cache.asMap().values()) { - try { - dataset.close(); - } catch (Exception e) { - LOG.warn("Failed to close cached Lance dataset", e); - } - } + Cache cache = datasetCache.get(); + cache.asMap().values().forEach(LancePartitionStatisticStorage::closeDatasetHolder); cache.invalidateAll(); + cache.cleanUp(); } if (allocator != null) { @@ -376,7 +372,7 @@ public void close() throws IOException { } @VisibleForTesting - Cache getDatasetCache() { + Cache getDatasetCache() { return datasetCache.orElse(null); } @@ -563,20 +559,20 @@ private Dataset getDataset(Long tableId) { return datasetCache .map( cache -> { - Dataset cachedDataset = + 
DatasetHolder holder = cache.get( tableId, id -> { newlyCreated.set(true); - return open(getFilePath(id)); + return new DatasetHolder(open(getFilePath(id))); }); // Ensure dataset uses the latest version if (!newlyCreated.get()) { - cachedDataset.checkoutLatest(); + holder.checkoutLatest(); } - return cachedDataset; + return holder.getDataset(); }) .orElse(open(getFilePath(tableId))); } @@ -609,4 +605,43 @@ private Dataset open(String fileName) { private ThreadFactory newDaemonThreadFactory(String name) { return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + "-%d").build(); } + + private static void closeDatasetHolder(DatasetHolder holder) { + try { + holder.close(); + } catch (IOException | RuntimeException e) { + LOG.warn("Failed to close cached Lance dataset", e); + } + } + + /** + * Package-private wrapper around a {@link Dataset} stored in the dataset cache. Exists solely to + * allow test code to mock this holder (and thus verify close-ordering) without requiring Mockito + * to instrument the JNI-heavy {@link Dataset} class itself. 
+ */ + static class DatasetHolder implements Closeable { + + private final Dataset dataset; + + private final AtomicBoolean closed = new AtomicBoolean(false); + + DatasetHolder(Dataset dataset) { + this.dataset = dataset; + } + + Dataset getDataset() { + return dataset; + } + + void checkoutLatest() { + dataset.checkoutLatest(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + dataset.close(); + } + } + } } diff --git a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java index cc298e9b03f..5a6ae16e980 100644 --- a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java +++ b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java @@ -23,6 +23,8 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.github.benmanes.caffeine.cache.Cache; @@ -47,9 +49,9 @@ import org.apache.gravitino.stats.PartitionStatisticsUpdate; import org.apache.gravitino.stats.StatisticValue; import org.apache.gravitino.stats.StatisticValues; +import org.apache.gravitino.stats.storage.LancePartitionStatisticStorage.DatasetHolder; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.lance.Dataset; import org.mockito.InOrder; public class TestLancePartitionStatisticStorage { @@ -623,10 +625,10 @@ public void testCloseReleasesCachedDatasetBeforeAllocator() throws Exception { BufferAllocator allocator = spy(new RootAllocator(Long.MAX_VALUE)); FieldUtils.writeField(storage, "allocator", allocator, true); - Cache datasetCache = storage.getDatasetCache(); + Cache datasetCache = 
storage.getDatasetCache(); Assertions.assertNotNull(datasetCache); - Dataset dataset = mock(Dataset.class); + DatasetHolder holder = mock(DatasetHolder.class); VarCharVector buffer = new VarCharVector("test", allocator); buffer.allocateNew(1024); @@ -635,21 +637,50 @@ public void testCloseReleasesCachedDatasetBeforeAllocator() throws Exception { buffer.close(); return null; }) - .when(dataset) + .when(holder) .close(); - datasetCache.put(1L, dataset); + datasetCache.put(1L, holder); storage.close(); Assertions.assertEquals(0, allocator.getAllocatedMemory()); - InOrder inOrder = inOrder(dataset, allocator); - inOrder.verify(dataset).close(); + InOrder inOrder = inOrder(holder, allocator); + inOrder.verify(holder).close(); inOrder.verify(allocator).close(); } finally { FileUtils.deleteDirectory(new File(location)); } } + + @Test + public void testDatasetCacheClosesPreviousHolderOnReplacement() throws Exception { + String location = Files.createTempDirectory("lance_stats_replace_cache").toString(); + Map properties = Maps.newHashMap(); + properties.put("location", location); + properties.put("datasetCacheSize", "10"); + + LancePartitionStatisticStorage storage = new LancePartitionStatisticStorage(properties); + + try { + Cache datasetCache = storage.getDatasetCache(); + Assertions.assertNotNull(datasetCache); + + DatasetHolder previousHolder = mock(DatasetHolder.class); + DatasetHolder newHolder = mock(DatasetHolder.class); + + datasetCache.put(1L, previousHolder); + datasetCache.put(1L, newHolder); + + verify(previousHolder, timeout(5000)).close(); + + storage.close(); + + verify(newHolder).close(); + } finally { + FileUtils.deleteDirectory(new File(location)); + } + } } diff --git a/docs/lance-rest-integration.md b/docs/lance-rest-integration.md index 66fbaff4f37..8b87183de7f 100644 --- a/docs/lance-rest-integration.md +++ b/docs/lance-rest-integration.md @@ -20,14 +20,19 @@ This documentation assumes familiarity with the Lance REST service setup as desc The 
following table outlines the tested compatibility between Gravitino versions and Lance connector versions: -| Gravitino Version (Lance REST) | Supported lance-spark Versions | Supported lance-ray Versions | -|--------------------------------|--------------------------------|------------------------------| -| 1.1.1 - 1.2.1 | 0.0.10 - 0.0.15 | 0.0.6 - 0.0.8 | -| 1.3.0 | 0.1.0 - 0.2.0 | 0.0.6 - 0.2.0 | +| Gravitino Version (Lance REST) | Supported lance-spark Versions | Supported lance-ray Versions | +|--------------------------------|--------------------------------|-----------------------------------------------| +| 1.1.1 - 1.2.1 | 0.0.10 - 0.0.15 | 0.0.6 - 0.0.8 | +| 1.3.0 | 0.1.0 - 0.4.0 | 0.3.0 - 0.4.2, 0.2.0 supported with conditions | :::note - These version ranges show which versions are expected to work together. -- Not all versions in these ranges have been tested. Only some versions were tested. +- For Gravitino 1.3.0, the explicitly verified release versions are + `lance-spark` {0.1.0, 0.1.1, 0.2.0, 0.4.0} and `lance-ray` + {0.3.0, 0.4.2}. By default, lance-ray 0.2.0 and earlier are *not* supported on 1.3.0 + because pip resolves them with an older `lance-namespace` whose request + schema is incompatible with the upgraded server-side `lance-namespace-core` + (0.7.5+). However, you can still use lance-ray 0.2.0 with Gravitino 1.3.0 by pinning pylance to 3.x or 4.x. - Before using in production, please test the exact connector versions in your own environment. - The Lance ecosystem is changing quickly, so some versions may introduce breaking changes. ::: @@ -79,7 +84,7 @@ logging.basicConfig(level=logging.INFO) # Replace /path/to/lance-spark-bundle-3.5_2.12-X.X.XX.jar with your actual JAR path and version; # refer to the compatibility matrix for supported lance-spark versions. 
os.environ["PYSPARK_SUBMIT_ARGS"] = ( - "--jars /path/to/lance-spark-bundle-3.5_2.12-0.0.15.jar " + "--jars /path/to/lance-spark-bundle-3.5_2.12-0.4.0.jar " "--conf \"spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED\" " "--conf \"spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED\" " "--master local[1] pyspark-shell" @@ -175,7 +180,8 @@ pip install lance-ray :::info - Ray will be automatically installed if not already present -- The lance-namespace version must be less than or equal to 0.4.5. +- For Gravitino 1.3.0, use a `lance-namespace` client compatible with + server-side `lance-namespace-core` 0.7.5 or newer. - Ensure Ray version compatibility in your environment before deployment ::: diff --git a/docs/lance-rest-service.md b/docs/lance-rest-service.md index 5a299ac9249..c2e8b8698f1 100644 --- a/docs/lance-rest-service.md +++ b/docs/lance-rest-service.md @@ -81,8 +81,7 @@ The Lance REST service provides comprehensive support for namespace management, | TableExists | Check whether a table exists | POST | `/lance/v1/table/{id}/exists` | 1.1.0 | | RegisterTable | Register an existing Lance table to a namespace | POST | `/lance/v1/table/{id}/register` | 1.1.0 | | DeregisterTable | Unregister a table from a namespace (metadata only, data remains) | POST | `/lance/v1/table/{id}/deregister` | 1.1.0 | -| CreateEmptyTable | **Deprecated**: Use `DeclareTable` instead. Declare a table and store the metadata without touching lance table data, for more, please refer to [doc](https://docs.lancedb.com/api-reference/rest/table/create-an-empty-table) | POST | `/lance/v1/table/{id}/create-empty` | 1.1.0 | -| DeclareTable | Declare a table and store the metadata without touching lance table data. This is the preferred replacement for `CreateEmptyTable`. | POST | `/lance/v1/table/{id}/declare` | 1.3.0 | +| DeclareTable | Declare a table and store the metadata without touching lance table data. 
| POST | `/lance/v1/table/{id}/declare` | 1.3.0 | More details, please refer to the [Lance REST API specification](https://lance.org/format/namespace/rest/catalog-spec/) @@ -280,17 +279,7 @@ curl -X POST http://localhost:9101/lance/v1/table/lance_catalog%24schema%24table "mode": "create" }' -# Create a new empty table -# x-lance-table-properties is optional; if omitted, it defaults to an empty map. -curl -X POST http://localhost:9101/lance/v1/table/lance_catalog%24schema%24table02/create-empty \ - -H 'Content-Type: application/json' \ - -H "x-lance-table-properties: {\"description\":\"This is table02\"}" \ - -d '{ - "id": ["lance_catalog", "schema", "table02"], - "location": "/tmp/lance_catalog/schema/table02" - }' - -# Declare a table (preferred replacement for create-empty) +# Declare a table curl -X POST http://localhost:9101/lance/v1/table/lance_catalog%24schema%24table04/declare \ -H 'Content-Type: application/json' \ -d '{ @@ -350,11 +339,11 @@ registerTableRequest.setId(Arrays.asList("lance_catalog", "schema", "table01")); registerTableRequest.setMode("create"); ns.registerTable(registerTableRequest); -// Create an empty table -CreateEmptyTableRequest createEmptyTableRequest = new CreateEmptyTableRequest(); -createEmptyTableRequest.setLocation("/tmp/lance_catalog/schema/table02"); -createEmptyTableRequest.setId(Arrays.asList("lance_catalog", "schema", "table02")); -ns.createEmptyTable(createEmptyTableRequest); +// Declare a table +DeclareTableRequest declareTableRequest = new DeclareTableRequest(); +declareTableRequest.setLocation("/tmp/lance_catalog/schema/table02"); +declareTableRequest.setId(Arrays.asList("lance_catalog", "schema", "table02")); +ns.declareTable(declareTableRequest); // Create a table with schema inferred from Arrow IPC file. // For REST create API, location/properties are passed via headers. 
@@ -400,12 +389,12 @@ register_table_request = ln.RegisterTableRequest( ) ns.register_table(register_table_request) -# Create an empty table -create_empty_table_request = ln.CreateEmptyTableRequest( +# Declare a table +declare_table_request = ln.DeclareTableRequest( id=['lance_catalog', 'schema', 'table02'], location='/tmp/lance_catalog/schema/table02' ) -ns.create_empty_table(create_empty_table_request) +ns.declare_table(declare_table_request) # Create a table with schema inferred from Arrow IPC file. # For REST create API, location/properties are passed via headers. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index cfa98b7416d..8a7228a3cff 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -28,8 +28,8 @@ guava = "32.1.3-jre" lombok = "1.18.20" slf4j = "2.0.16" log4j = "2.25.4" -lance = "2.0.1" -lance-namespace = "0.4.5" +lance = "6.0.0" +lance-namespace = "0.7.5" delta-kernel = "3.3.0" jetty = "9.4.58.v20250814" jersey = "2.41" diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java index dae90499580..6bdfcb50548 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Optional; -import org.lance.namespace.model.CreateEmptyTableResponse; import org.lance.namespace.model.CreateTableResponse; import org.lance.namespace.model.DeclareTableResponse; import org.lance.namespace.model.DeregisterTableResponse; @@ -36,9 +35,11 @@ public interface LanceTableOperations { * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" * @param delimiter the delimiter used in the namespace * @param version the version of the table to describe, if null, 
describe the latest version + * @param checkDeclared whether to populate the is_only_declared response field * @return the table description */ - DescribeTableResponse describeTable(String tableId, String delimiter, Optional version); + DescribeTableResponse describeTable( + String tableId, String delimiter, Optional version, boolean checkDeclared); /** * Create a new table. @@ -60,8 +61,7 @@ CreateTableResponse createTable( byte[] arrowStreamBody); /** - * Declare a table without touching storage. This is the preferred API for creating metadata-only - * table entries, replacing the deprecated {@link #createEmptyTable} method. + * Declare a table without touching storage. * * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" * @param delimiter the delimiter used in the namespace @@ -72,21 +72,6 @@ CreateTableResponse createTable( DeclareTableResponse declareTable( String tableId, String delimiter, String tableLocation, Map tableProperties); - /** - * Create a new table without schema. - * - * @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}" - * @param delimiter the delimiter used in the namespace - * @param tableLocation the location where the table data will be stored - * @param tableProperties the properties of the table - * @return the response of the create table operation - * @deprecated Use {@link #declareTable} instead. - */ - @Deprecated - @SuppressWarnings("deprecation") - CreateEmptyTableResponse createEmptyTable( - String tableId, String delimiter, String tableLocation, Map tableProperties); - /** * Register an existing table. 
* diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java index a1ec2ee9bb7..21a53a86007 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java @@ -22,7 +22,7 @@ import static org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_CREATION_MODE; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION; -import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_CREATE_EMPTY; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_DECLARED; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_FORMAT; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_VERSION; import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; @@ -53,7 +53,6 @@ import org.lance.namespace.errors.TableNotFoundException; import org.lance.namespace.model.AlterTableAlterColumnsRequest; import org.lance.namespace.model.AlterTableDropColumnsRequest; -import org.lance.namespace.model.CreateEmptyTableResponse; import org.lance.namespace.model.CreateTableResponse; import org.lance.namespace.model.DeclareTableResponse; import org.lance.namespace.model.DeregisterTableResponse; @@ -100,7 +99,7 @@ public GravitinoLanceTableOperations(GravitinoLanceNamespaceWrapper namespaceWra @Override public DescribeTableResponse describeTable( - String tableId, String delimiter, Optional version) { + String tableId, String delimiter, Optional version, boolean checkDeclared) { if (!version.isEmpty()) 
{ throw new UnsupportedOperationException( "Describing specific table version is not supported. It should be null to indicate the" @@ -125,6 +124,7 @@ public DescribeTableResponse describeTable( } DescribeTableResponse response = new DescribeTableResponse(); response.setMetadata(table.properties()); + response.setProperties(table.properties()); response.setLocation(table.properties().get(LANCE_LOCATION)); response.setSchema(toJsonArrowSchema(table.columns())); response.setVersion( @@ -133,6 +133,11 @@ public DescribeTableResponse describeTable( .orElse(null)); response.setStorageOptions( LancePropertiesUtils.resolveLanceStorageOptions(catalog.properties(), table.properties())); + response.setManagedVersioning(false); + if (checkDeclared) { + response.setIsOnlyDeclared( + Boolean.parseBoolean(table.properties().getOrDefault(LANCE_TABLE_DECLARED, "false"))); + } return response; } @@ -188,6 +193,7 @@ public CreateTableResponse createTable( response.setVersion( Optional.ofNullable(properties.get(LANCE_TABLE_VERSION)).map(Long::valueOf).orElse(null)); response.setLocation(properties.get(LANCE_LOCATION)); + response.setProperties(properties); return response; } @@ -197,7 +203,7 @@ public DeclareTableResponse declareTable( ImmutableMap props = ImmutableMap.builder() .putAll(tableProperties) - .put(LANCE_TABLE_CREATE_EMPTY, "true") + .put(LANCE_TABLE_DECLARED, "true") .put(Table.PROPERTY_EXTERNAL, "true") .build(); @@ -206,29 +212,11 @@ public DeclareTableResponse declareTable( DeclareTableResponse declareTableResponse = new DeclareTableResponse(); declareTableResponse.setLocation(response.getLocation()); declareTableResponse.setStorageOptions(response.getStorageOptions()); + declareTableResponse.setProperties(response.getProperties()); + declareTableResponse.setManagedVersioning(false); return declareTableResponse; } - @Override - @SuppressWarnings("deprecation") - public CreateEmptyTableResponse createEmptyTable( - String tableId, String delimiter, String tableLocation, 
Map tableProperties) { - // Empty table creation only supports CREATE mode (not EXIST_OK or OVERWRITE). - ImmutableMap props = - ImmutableMap.builder() - .putAll(tableProperties) - .put(LANCE_TABLE_CREATE_EMPTY, "true") - .put(Table.PROPERTY_EXTERNAL, "true") - .build(); - - CreateTableResponse response = - createTable(tableId, "create", delimiter, tableLocation, props, null); - CreateEmptyTableResponse emptyTableResponse = new CreateEmptyTableResponse(); - emptyTableResponse.setLocation(response.getLocation()); - emptyTableResponse.setStorageOptions(response.getStorageOptions()); - return emptyTableResponse; - } - @Override public RegisterTableResponse registerTable( String tableId, String mode, String delimiter, Map tableProperties) { diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java index 991c6b8f261..f5faaad1c2e 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java @@ -38,8 +38,8 @@ public class LanceConstants { public static final String LANCE_TABLE_REGISTER = "lance.register"; public static final String LANCE_TABLE_VERSION = "lance.version"; - // Mark whether it is to create an empty Lance table(no data files) - public static final String LANCE_TABLE_CREATE_EMPTY = "lance.create-empty"; + // Mark whether the table is declared only in metadata without creating a Lance dataset. 
+ public static final String LANCE_TABLE_DECLARED = "lance.declared"; public static final String LANCE_TABLE_FORMAT = "lance"; } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java index 3555fa3f99a..39fbe93bf4d 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java @@ -68,4 +68,22 @@ public static Map resolveLanceStorageOptions( effectiveStorageOptions.putAll(getLanceStorageOptions(tableProperties)); return effectiveStorageOptions; } + + /** + * Converts Lance storage options to table properties. + * + *

The input map should use unprefixed Lance storage option keys. The returned map prefixes + * each key with {@code lance.storage.}. A {@code null} input returns an empty map. + * + * @param storageOptions the unprefixed Lance storage options + * @return the table properties with Lance storage option prefixes + */ + public static Map toTableProperties(Map storageOptions) { + return storageOptions == null + ? Map.of() + : storageOptions.entrySet().stream() + .collect( + Collectors.toMap( + entry -> LANCE_STORAGE_OPTIONS_PREFIX + entry.getKey(), Map.Entry::getValue)); + } } diff --git a/lance/lance-rest-server/build.gradle.kts b/lance/lance-rest-server/build.gradle.kts index 0198099df51..8a91cc1322d 100644 --- a/lance/lance-rest-server/build.gradle.kts +++ b/lance/lance-rest-server/build.gradle.kts @@ -28,7 +28,7 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString() val sparkVersion: String = libs.versions.spark35.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() -val lanceSparkBundleVersion = "0.2.0" +val lanceSparkBundleVersion = "0.4.0" val lanceSparkBundleJarPathProperty = "gravitino.lance.spark.bundle.jar" val lanceSparkBundleDir = layout.buildDirectory.dir("lance-spark-bundle") val lanceSparkBundle by configurations.creating { diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/JsonNullableMapperProvider.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/JsonNullableMapperProvider.java new file mode 100644 index 00000000000..31470e1a7bf --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/JsonNullableMapperProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.service.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import javax.ws.rs.ext.ContextResolver; +import javax.ws.rs.ext.Provider; +import org.apache.gravitino.server.web.ObjectMapperProvider; +import org.openapitools.jackson.nullable.JsonNullableModule; + +/** + * JAX-RS {@link ContextResolver} that provides an {@link ObjectMapper} with the {@link + * JsonNullableModule} registered. + * + *

lance-namespace 0.7.5 models use {@code JsonNullable} for optional fields, which requires + * this module for correct Jackson serialization/deserialization. + */ +@Provider +public class JsonNullableMapperProvider implements ContextResolver { + + private static final ObjectMapper MAPPER = + ObjectMapperProvider.objectMapper().copy().registerModule(new JsonNullableModule()); + + @Override + public ObjectMapper getContext(Class type) { + return MAPPER; + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java index 667eb1b4986..5114b475c31 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java @@ -45,6 +45,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.lance.common.ops.NamespaceWrapper; import org.apache.gravitino.lance.common.utils.LanceConstants; +import org.apache.gravitino.lance.common.utils.LancePropertiesUtils; import org.apache.gravitino.lance.common.utils.SerializationUtils; import org.apache.gravitino.lance.service.LanceExceptionMapper; import org.apache.gravitino.metrics.MetricNames; @@ -54,7 +55,6 @@ import org.lance.namespace.model.AlterTableAlterColumnsResponse; import org.lance.namespace.model.AlterTableDropColumnsRequest; import org.lance.namespace.model.AlterTableDropColumnsResponse; -import org.lance.namespace.model.CreateEmptyTableResponse; import org.lance.namespace.model.CreateTableResponse; import org.lance.namespace.model.DeclareTableRequest; import org.lance.namespace.model.DeclareTableResponse; @@ -87,13 +87,19 @@ public LanceTableOperations(NamespaceWrapper lanceNamespace) { public Response describeTable( @PathParam("id") String tableId, 
@DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") String delimiter, + @QueryParam("check_declared") Boolean checkDeclared, DescribeTableRequest request) { try { validateDescribeTableRequest(request); + Optional version = + request == null ? Optional.empty() : Optional.ofNullable(request.getVersion()); + boolean shouldCheckDeclared = + Optional.ofNullable(checkDeclared) + .orElse(request != null && Boolean.TRUE.equals(request.getCheckDeclared())); DescribeTableResponse response = lanceNamespace .asTableOps() - .describeTable(tableId, delimiter, Optional.ofNullable(request.getVersion())); + .describeTable(tableId, delimiter, version, shouldCheckDeclared); return Response.ok(response).build(); } catch (Exception e) { return LanceExceptionMapper.toRESTResponse(tableId, e); @@ -110,6 +116,8 @@ public Response createTable( @PathParam("id") String tableId, @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, + @QueryParam("properties") String queryProperties, + @QueryParam("storage_options") String queryStorageOptions, @Context HttpHeaders headers, byte[] arrowStreamBody) { try { @@ -118,6 +126,10 @@ public Response createTable( String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER); String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); Map props = SerializationUtils.deserializeProperties(tableProperties); + props.putAll(SerializationUtils.deserializeProperties(queryProperties)); + props.putAll( + LancePropertiesUtils.toTableProperties( + SerializationUtils.deserializeProperties(queryStorageOptions))); CreateTableResponse response = lanceNamespace .asTableOps() @@ -134,44 +146,6 @@ public Response createTable( } } - /** - * According to the spec of lance-namespace with version 0.0.20 to 0.31, createEmptyTable only - * stores the table metadata including its location, and will never touch 
lance storage. - */ - @POST - @Path("/create-empty") - @Produces("application/json") - @Timed(name = "create-empty-table." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) - @ResponseMetered(name = "create-empty-table", absolute = true) - @SuppressWarnings("deprecation") - public Response createEmptyTable( - @PathParam("id") String tableId, - @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, - Map requestBody, - @Context HttpHeaders headers) { - try { - validateCreateEmptyTableRequest(requestBody); - String tableLocation = - Optional.ofNullable(requestBody) - .map(body -> body.get(LANCE_LOCATION)) - .map(String::valueOf) - .orElse(null); - Map props = extractPropertiesFromBody(requestBody); - MultivaluedMap headersMap = headers.getRequestHeaders(); - String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); - Map headerProps = SerializationUtils.deserializeProperties(tableProperties); - // Keep backward compatibility: accept body properties and let header override on key - // conflict. 
- props.putAll(headerProps); - - CreateEmptyTableResponse response = - lanceNamespace.asTableOps().createEmptyTable(tableId, delimiter, tableLocation, props); - return Response.ok(response).build(); - } catch (Exception e) { - return LanceExceptionMapper.toRESTResponse(tableId, e); - } - } - @POST @Path("/declare") @Produces("application/json") @@ -189,6 +163,9 @@ public Response declareTable( MultivaluedMap headersMap = headers.getRequestHeaders(); String tableProperties = headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER); Map props = SerializationUtils.deserializeProperties(tableProperties); + if (declareTableRequest.getProperties() != null) { + props.putAll(declareTableRequest.getProperties()); + } DeclareTableResponse response = lanceNamespace.asTableOps().declareTable(tableId, delimiter, tableLocation, props); @@ -331,32 +308,11 @@ public Response alterColumns( } } - @SuppressWarnings({"unused", "deprecation"}) - private void validateCreateEmptyTableRequest(Map requestBody) { - // No specific fields to validate for now - } - private void validateDeclareTableRequest( @SuppressWarnings("unused") DeclareTableRequest request) { // No specific fields to validate for now } - private static Map extractPropertiesFromBody(Map requestBody) { - if (requestBody == null) { - return Maps.newHashMap(); - } - - Object propertiesObject = requestBody.get("properties"); - if (!(propertiesObject instanceof Map)) { - return Maps.newHashMap(); - } - - Map properties = Maps.newHashMap(); - ((Map) propertiesObject) - .forEach((key, value) -> properties.put(String.valueOf(key), String.valueOf(value))); - return properties; - } - private void validateRegisterTableRequest( @SuppressWarnings("unused") RegisterTableRequest request) { // No specific fields to validate for now diff --git a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceModeParsing.java 
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceModeParsing.java index 4ac063a4060..cadfccebdfa 100644 --- a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceModeParsing.java +++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceModeParsing.java @@ -20,12 +20,16 @@ import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_CREATION_MODE; import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_STORAGE_OPTIONS_PREFIX; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_DECLARED; +import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_VERSION; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.when; import java.util.Map; +import java.util.Optional; import java.util.regex.Pattern; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; @@ -35,6 +39,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.lance.namespace.errors.InvalidInputException; +import org.lance.namespace.model.CreateTableResponse; +import org.lance.namespace.model.DescribeTableResponse; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -162,6 +168,61 @@ void testRegisterModeRejectsMalformedValues() { exception.getMessage().contains("Unknown register table mode: #register$")); } + @Test + void testCreateTableReturnsPropertiesAndStorageOptions() { + TableCatalog tableCatalog = Mockito.mock(TableCatalog.class); + Table table = Mockito.mock(Table.class); + when(table.properties()) + .thenReturn( + Map.of( + "custom-key", + "custom-value", + LANCE_LOCATION, 
+ "/tmp/table", + LANCE_TABLE_VERSION, + "5", + LANCE_STORAGE_OPTIONS_PREFIX + "region", + "us-west-2")); + when(tableCatalog.createTable( + any(NameIdentifier.class), any(Column[].class), isNull(), anyMap())) + .thenReturn(table); + GravitinoLanceTableOperations operations = newTableOperations(tableCatalog); + + CreateTableResponse response = + operations.createTable("catalog.schema.table", "create", ".", null, Map.of(), null); + + Assertions.assertEquals("/tmp/table", response.getLocation()); + Assertions.assertEquals(5L, response.getVersion()); + Assertions.assertEquals("custom-value", response.getProperties().get("custom-key")); + Assertions.assertEquals("us-west-2", response.getStorageOptions().get("region")); + } + + @Test + void testDescribeTableReturnsDeclaredStateWhenRequested() { + TableCatalog tableCatalog = Mockito.mock(TableCatalog.class); + Table table = Mockito.mock(Table.class); + when(table.properties()) + .thenReturn( + Map.of( + LANCE_LOCATION, + "/tmp/table", + LANCE_TABLE_DECLARED, + "true", + LANCE_STORAGE_OPTIONS_PREFIX + "region", + "us-west-2")); + when(table.columns()).thenReturn(new Column[0]); + when(tableCatalog.loadTable(any(NameIdentifier.class))).thenReturn(table); + GravitinoLanceTableOperations operations = newTableOperations(tableCatalog); + + DescribeTableResponse response = + operations.describeTable("catalog.schema.table", ".", Optional.empty(), true); + + Assertions.assertEquals(Boolean.TRUE, response.getIsOnlyDeclared()); + Assertions.assertEquals(Boolean.FALSE, response.getManagedVersioning()); + Assertions.assertEquals("true", response.getProperties().get(LANCE_TABLE_DECLARED)); + Assertions.assertEquals("us-west-2", response.getStorageOptions().get("region")); + } + private static GravitinoLanceTableOperations newTableOperations(TableCatalog tableCatalog) { GravitinoLanceNamespaceWrapper namespaceWrapper = Mockito.mock(GravitinoLanceNamespaceWrapper.class); diff --git 
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java index 3c0661971db..40798407e41 100644 --- a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java +++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java @@ -64,8 +64,6 @@ import org.lance.namespace.model.AlterTableAlterColumnsResponse; import org.lance.namespace.model.AlterTableDropColumnsRequest; import org.lance.namespace.model.AlterTableDropColumnsResponse; -import org.lance.namespace.model.CreateEmptyTableRequest; -import org.lance.namespace.model.CreateEmptyTableResponse; import org.lance.namespace.model.CreateNamespaceRequest; import org.lance.namespace.model.CreateNamespaceResponse; import org.lance.namespace.model.CreateTableResponse; @@ -221,7 +219,7 @@ public void testDescribeNamespace() { RuntimeException exception = Assertions.assertThrows( RuntimeException.class, () -> ns.describeNamespace(nonExistentCatalogReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":1")); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_NOT_FOUND); // test describe a non-existent schema namespace DescribeNamespaceRequest nonExistentSchemaReq = new DescribeNamespaceRequest(); @@ -230,7 +228,7 @@ public void testDescribeNamespace() { exception = Assertions.assertThrows( RuntimeException.class, () -> ns.describeNamespace(nonExistentSchemaReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":1")); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_NOT_FOUND); } @Test @@ -256,7 +254,7 @@ public void testCreateNamespace() { RuntimeException exception = Assertions.assertThrows( RuntimeException.class, () -> ns.createNamespace(createNamespaceReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":2")); 
+ assertLanceErrorCode(exception, ErrorCode.NAMESPACE_ALREADY_EXISTS); // create catalog again with exist_ok mode should succeed createNamespaceReq.setMode("exist_ok"); @@ -298,7 +296,7 @@ public void testCreateNamespace() { // create schema again with default mode (create) should fail exception = Assertions.assertThrows(RuntimeException.class, () -> ns.createNamespace(createSchemaReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":2")); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_ALREADY_EXISTS); // create schema again with exist_ok mode should succeed createSchemaReq.setMode("exist_ok"); @@ -330,7 +328,7 @@ public void testDropNamespace() { dropNamespaceReq.addIdItem("non_existent_catalog"); RuntimeException exception = Assertions.assertThrows(RuntimeException.class, () -> ns.dropNamespace(dropNamespaceReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":1")); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_NOT_FOUND); // test drop a non-existent namespace (catalog) with SKIP mode should succeed dropNamespaceReq.setMode("skip"); @@ -343,7 +341,7 @@ public void testDropNamespace() { dropSchemaReq.addIdItem("non_existent_schema"); exception = Assertions.assertThrows(RuntimeException.class, () -> ns.dropNamespace(dropSchemaReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":1")); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_NOT_FOUND); // test drop a non-existent namespace (schema) with SKIP mode should succeed dropSchemaReq.setMode("skip"); @@ -356,7 +354,7 @@ public void testDropNamespace() { exception = Assertions.assertThrows( RuntimeException.class, () -> ns.dropNamespace(dropNonEmptyCatalogReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":13")); + assertLanceErrorCode(exception, ErrorCode.INVALID_INPUT); // test drop a non-empty namespace (catalog) with CASCADE behavior should succeed dropNonEmptyCatalogReq.setBehavior("cascade"); @@ -384,7 +382,7 @@ 
public void testDropNamespace() { exception = Assertions.assertThrows( RuntimeException.class, () -> ns.dropNamespace(dropNonEmptySchemaReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":13")); + assertLanceErrorCode(exception, ErrorCode.INVALID_INPUT); Assertions.assertTrue(catalog.asSchemas().schemaExists(schema.name())); // test drop a non-empty namespace (schema) with CASCADE behavior should succeed @@ -410,7 +408,7 @@ public void testNamespaceExists() { RuntimeException exception = Assertions.assertThrows( RuntimeException.class, () -> ns.namespaceExists(nonExistentCatalogReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":1")); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_NOT_FOUND); // test existing schema NamespaceExistsRequest schemaExistsReq = new NamespaceExistsRequest(); @@ -425,63 +423,7 @@ public void testNamespaceExists() { exception = Assertions.assertThrows( RuntimeException.class, () -> ns.namespaceExists(nonExistentSchemaReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":1")); - } - - @Test - void testCreateEmptyTable() throws ApiException { - catalog = createCatalog(CATALOG_NAME); - createSchema(); - - CreateEmptyTableRequest request = new CreateEmptyTableRequest(); - String location = tempDir + "/" + "empty_table/"; - request.setLocation(location); - request.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "empty_table")); - - CreateEmptyTableResponse response = ns.createEmptyTable(request); - Assertions.assertNotNull(response); - Assertions.assertEquals(location, response.getLocation()); - - DescribeTableRequest describeTableRequest = new DescribeTableRequest(); - describeTableRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "empty_table")); - - DescribeTableResponse loadTable = ns.describeTable(describeTableRequest); - Assertions.assertNotNull(loadTable); - Assertions.assertEquals(location, loadTable.getLocation()); - Assertions.assertEquals( - "true", 
loadTable.getMetadata().get(LanceConstants.LANCE_TABLE_CREATE_EMPTY)); - Assertions.assertEquals("true", loadTable.getMetadata().get(Table.PROPERTY_EXTERNAL)); - - // Try to create the same table again should fail - RuntimeException exception = - Assertions.assertThrows( - RuntimeException.class, - () -> { - ns.createEmptyTable(request); - }); - Assertions.assertTrue(exception.getMessage().contains("\"code\":5")); - - // Create an empty table with non-existent location should succeed - // since storage is not touched - CreateEmptyTableRequest wrongLocationRequest = new CreateEmptyTableRequest(); - wrongLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "another_table")); - String another_location = tempDir + "/" + "another_location/"; - Assertions.assertFalse(new File(another_location).exists()); - wrongLocationRequest.setLocation(another_location); - response = ns.createEmptyTable(wrongLocationRequest); - Assertions.assertNotNull(response); - Assertions.assertEquals(another_location, response.getLocation()); - // Will not touch storage, so the path should not be created. 
- Assertions.assertFalse(new File(another_location).exists()); - - // Create another empty table at a new location and verify it succeeds - String correctedLocation = tempDir + "/" + "wrong_location_table/"; - wrongLocationRequest.setLocation(correctedLocation); - wrongLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "wrong_location_table")); - CreateEmptyTableResponse wrongLocationResponse = - Assertions.assertDoesNotThrow(() -> ns.createEmptyTable(wrongLocationRequest)); - Assertions.assertNotNull(wrongLocationResponse); - Assertions.assertEquals(correctedLocation, wrongLocationResponse.getLocation()); + assertLanceErrorCode(exception, ErrorCode.NAMESPACE_NOT_FOUND); } @Test @@ -852,16 +794,16 @@ void testDeregisterNonExistingTable() { RuntimeException exception = Assertions.assertThrows( RuntimeException.class, () -> ns.deregisterTable(deregisterTableRequest)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":4")); + assertLanceErrorCode(exception, ErrorCode.TABLE_NOT_FOUND); Assertions.assertTrue(exception.getMessage().contains("Table not found")); // Try to create a table and then deregister table - CreateEmptyTableRequest createEmptyTableRequest = new CreateEmptyTableRequest(); + DeclareTableRequest declareTableRequest = new DeclareTableRequest(); String location = tempDir + "/" + "to_be_deregistered_table/"; ids = List.of(CATALOG_NAME, SCHEMA_NAME, "to_be_deregistered_table"); - createEmptyTableRequest.setLocation(location); - createEmptyTableRequest.setId(ids); - CreateEmptyTableResponse response = - Assertions.assertDoesNotThrow(() -> ns.createEmptyTable(createEmptyTableRequest)); + declareTableRequest.setLocation(location); + declareTableRequest.setId(ids); + DeclareTableResponse response = + Assertions.assertDoesNotThrow(() -> ns.declareTable(declareTableRequest)); Assertions.assertNotNull(response); Assertions.assertEquals(location, response.getLocation()); @@ -881,7 +823,7 @@ void testDeregisterNonExistingTable() { 
RuntimeException describeException = Assertions.assertThrows( RuntimeException.class, () -> ns.describeTable(describeTableRequest)); - Assertions.assertTrue(describeException.getMessage().contains("\"code\":4")); + assertLanceErrorCode(describeException, ErrorCode.TABLE_NOT_FOUND); describeTableRequest.setVersion(1L); RuntimeException versionException = @@ -899,12 +841,12 @@ void testTableExists() { createSchema(); List ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table_exists"); - CreateEmptyTableRequest createEmptyTableRequest = new CreateEmptyTableRequest(); + DeclareTableRequest declareTableRequest = new DeclareTableRequest(); String location = tempDir + "/" + "table_exists/"; - createEmptyTableRequest.setLocation(location); - createEmptyTableRequest.setId(ids); - CreateEmptyTableResponse response = - Assertions.assertDoesNotThrow(() -> ns.createEmptyTable(createEmptyTableRequest)); + declareTableRequest.setLocation(location); + declareTableRequest.setId(ids); + DeclareTableResponse response = + Assertions.assertDoesNotThrow(() -> ns.declareTable(declareTableRequest)); Assertions.assertNotNull(response); Assertions.assertEquals(location, response.getLocation()); @@ -918,8 +860,8 @@ void testTableExists() { tableExistsReq.setId(nonExistingIds); RuntimeException exception = Assertions.assertThrows(RuntimeException.class, () -> ns.tableExists(tableExistsReq)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":4")); - Assertions.assertTrue(exception.getMessage().contains("Not Found")); + assertLanceErrorCode(exception, ErrorCode.TABLE_NOT_FOUND); + Assertions.assertTrue(exception.getMessage().contains("Table not found")); } @Test @@ -928,12 +870,12 @@ void testDropTable() { createSchema(); List ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table_to_drop"); - CreateEmptyTableRequest createEmptyTableRequest = new CreateEmptyTableRequest(); + DeclareTableRequest declareTableRequest = new DeclareTableRequest(); String location = tempDir + "/" + 
"table_to_drop/"; - createEmptyTableRequest.setLocation(location); - createEmptyTableRequest.setId(ids); - CreateEmptyTableResponse response = - Assertions.assertDoesNotThrow(() -> ns.createEmptyTable(createEmptyTableRequest)); + declareTableRequest.setLocation(location); + declareTableRequest.setId(ids); + DeclareTableResponse response = + Assertions.assertDoesNotThrow(() -> ns.declareTable(declareTableRequest)); Assertions.assertNotNull(response); Assertions.assertEquals(location, response.getLocation()); @@ -948,13 +890,13 @@ void testDropTable() { RuntimeException exception = Assertions.assertThrows( RuntimeException.class, () -> ns.describeTable(describeTableRequest)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":4")); + assertLanceErrorCode(exception, ErrorCode.TABLE_NOT_FOUND); // Drop a non-existing table should fail dropTableRequest.setId(ids); exception = Assertions.assertThrows(RuntimeException.class, () -> ns.dropTable(dropTableRequest)); - Assertions.assertTrue(exception.getMessage().contains("\"code\":4")); + assertLanceErrorCode(exception, ErrorCode.TABLE_NOT_FOUND); } @Test @@ -978,7 +920,7 @@ void testDeclareTable() { Assertions.assertNotNull(loadTable); Assertions.assertEquals(location, loadTable.getLocation()); Assertions.assertEquals( - "true", loadTable.getMetadata().get(LanceConstants.LANCE_TABLE_CREATE_EMPTY)); + "true", loadTable.getMetadata().get(LanceConstants.LANCE_TABLE_DECLARED)); Assertions.assertEquals("true", loadTable.getMetadata().get(Table.PROPERTY_EXTERNAL)); // Try to declare the same table again should fail @@ -988,7 +930,7 @@ void testDeclareTable() { () -> { ns.declareTable(request); }); - Assertions.assertTrue(declareException.getMessage().contains("\"code\":5")); + assertLanceErrorCode(declareException, ErrorCode.TABLE_ALREADY_EXISTS); // Declare a table with non-existent location should succeed // since storage is not touched @@ -1022,7 +964,8 @@ private CreateTableResponse createTable( try { return 
createTableApi() - .createTable(String.join(DELIMITER, ids), body, DELIMITER, mode, additionalHeaders); + .createTable( + String.join(DELIMITER, ids), body, DELIMITER, mode, null, null, additionalHeaders); } catch (ApiException e) { throw toLanceNamespaceException(e); } @@ -1059,6 +1002,13 @@ private static LanceNamespaceException toLanceNamespaceException(ApiException e) return new LanceNamespaceException(ErrorCode.INTERNAL, e.getMessage(), e); } + private static void assertLanceErrorCode( + RuntimeException exception, ErrorCode expectedErrorCode) { + Assertions.assertInstanceOf(LanceNamespaceException.class, exception); + Assertions.assertEquals( + expectedErrorCode.getCode(), ((LanceNamespaceException) exception).getCode()); + } + private TableApi createTableApi() { ApiClient apiClient = new ApiClient().setBasePath(getLanceRestServiceUrl()); return new TableApi(apiClient); diff --git a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java index 2d8000fc1b8..a7ab97120af 100644 --- a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java +++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java @@ -19,7 +19,7 @@ package org.apache.gravitino.lance.service.rest; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -33,8 +33,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.io.IOException; +import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import 
java.util.regex.Pattern; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.client.Entity; @@ -60,8 +62,6 @@ import org.lance.namespace.model.AlterTableAlterColumnsResponse; import org.lance.namespace.model.AlterTableDropColumnsRequest; import org.lance.namespace.model.AlterTableDropColumnsResponse; -import org.lance.namespace.model.CreateEmptyTableRequest; -import org.lance.namespace.model.CreateEmptyTableResponse; import org.lance.namespace.model.CreateNamespaceRequest; import org.lance.namespace.model.CreateNamespaceResponse; import org.lance.namespace.model.CreateTableResponse; @@ -80,6 +80,7 @@ import org.lance.namespace.model.RegisterTableRequest; import org.lance.namespace.model.RegisterTableResponse; import org.mockito.Mockito; +import org.openapitools.jackson.nullable.JsonNullableModule; @SuppressWarnings("deprecation") public class TestLanceNamespaceOperations extends JerseyTest { @@ -111,10 +112,11 @@ protected Application configure() { // auto-register a Jackson provider that calls ObjectMapper.findAndRegisterModules(). // When jackson-module-scala is on the classpath (pulled in by spark-sql), the // auto-discovered provider deserializes JSON objects as Scala Maps instead of - // java.util.Map, breaking extractPropertiesFromBody(). + // java.util.Map, breaking Lance REST request handling that expects Java Maps. 
resourceConfig.property(CommonProperties.FEATURE_AUTO_DISCOVERY_DISABLE, true); resourceConfig.property(CommonProperties.MOXY_JSON_FEATURE_DISABLE, true); ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JsonNullableModule()); JacksonJaxbJsonProvider provider = new JacksonJaxbJsonProvider(); provider.setMapper(mapper); resourceConfig.register(provider); @@ -396,6 +398,7 @@ public void testDropNamespace() { @Test void testCreateTable() { + Mockito.reset(tableOps); String tableIds = "catalog.scheme.create_table"; String delimiter = "."; @@ -406,13 +409,27 @@ void testCreateTable() { byte[] bytes = new byte[] {0x01, 0x02, 0x03}; Response resp = - target(String.format("/v1/table/%s/create", tableIds)) - .queryParam("delimiter", delimiter) + client() + .target( + URI.create( + getBaseUri() + + String.format("v1/table/%s/create", tableIds) + + "?delimiter=." + + "&properties=%7B%22custom%22%3A%22value%22%7D" + + "&storage_options=%7B%22region%22%3A%22us-west-2%22%7D")) .request(MediaType.APPLICATION_JSON_TYPE) .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream")); Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType()); + Mockito.verify(tableOps) + .createTable( + eq(tableIds), + eq("create"), + eq(delimiter), + eq(null), + eq(Map.of("custom", "value", "lance.storage.region", "us-west-2")), + eq(bytes)); // Test illegal argument when(tableOps.createTable(any(), any(), any(), any(), any(), any())) @@ -443,121 +460,6 @@ void testCreateTable() { Assertions.assertEquals("Runtime exception", errorResp.getError()); } - @Test - void testCreateEmptyTable() { - String tableIds = "catalog.scheme.create_empty_table"; - String delimiter = "."; - - // Test normal - CreateEmptyTableResponse createTableResponse = new CreateEmptyTableResponse(); - createTableResponse.setLocation("/path/to/table"); - 
createTableResponse.setStorageOptions(ImmutableMap.of("key", "value")); - when(tableOps.createEmptyTable(any(), any(), any(), any())).thenReturn(createTableResponse); - - CreateEmptyTableRequest tableRequest = new CreateEmptyTableRequest(); - tableRequest.setLocation("/path/to/table"); - - Response resp = - target(String.format("/v1/table/%s/create-empty", tableIds)) - .queryParam("delimiter", delimiter) - .request(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE)); - - Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); - Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType()); - CreateEmptyTableResponse response = resp.readEntity(CreateEmptyTableResponse.class); - Assertions.assertEquals(createTableResponse.getLocation(), response.getLocation()); - Assertions.assertEquals(createTableResponse.getStorageOptions(), response.getStorageOptions()); - - Mockito.verify(tableOps) - .createEmptyTable(eq(tableIds), eq(delimiter), eq("/path/to/table"), eq(Map.of())); - - // Backward compatibility: request-body properties should still be accepted. 
- Mockito.reset(tableOps); - when(tableOps.createEmptyTable(any(), any(), any(), any())).thenReturn(createTableResponse); - String bodyWithProperties = - "{" - + "\"id\":[\"catalog\",\"scheme\",\"create_empty_table\"]," - + "\"location\":\"/path/to/table\"," - + "\"properties\":{\"k1\":\"v1\",\"k2\":2}" - + "}"; - resp = - target(String.format("/v1/table/%s/create-empty", tableIds)) - .queryParam("delimiter", delimiter) - .request(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(bodyWithProperties, MediaType.APPLICATION_JSON_TYPE)); - Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); - Mockito.verify(tableOps) - .createEmptyTable( - eq(tableIds), - eq(delimiter), - eq("/path/to/table"), - argThat( - (Map props) -> - "v1".equals(props.get("k1")) - && "2".equals(props.get("k2")) - && props.size() == 2)); - - // Header properties should override body properties on key conflicts. - Mockito.reset(tableOps); - when(tableOps.createEmptyTable(any(), any(), any(), any())).thenReturn(createTableResponse); - String bodyWithOverlappedProperties = - "{" - + "\"id\":[\"catalog\",\"scheme\",\"create_empty_table\"]," - + "\"location\":\"/path/to/table\"," - + "\"properties\":{\"k1\":\"body\",\"k2\":\"body2\"}" - + "}"; - resp = - target(String.format("/v1/table/%s/create-empty", tableIds)) - .queryParam("delimiter", delimiter) - .request(MediaType.APPLICATION_JSON_TYPE) - .header( - LanceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER, - "{\"k1\":\"header\",\"k3\":\"v3\"}") - .post(Entity.entity(bodyWithOverlappedProperties, MediaType.APPLICATION_JSON_TYPE)); - Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); - Mockito.verify(tableOps) - .createEmptyTable( - eq(tableIds), - eq(delimiter), - eq("/path/to/table"), - argThat( - (Map props) -> - "header".equals(props.get("k1")) - && "body2".equals(props.get("k2")) - && "v3".equals(props.get("k3")) - && props.size() == 3)); - - Mockito.reset(tableOps); - // Test illegal 
argument - when(tableOps.createEmptyTable(any(), any(), any(), any())) - .thenThrow(new IllegalArgumentException("Illegal argument")); - - resp = - target(String.format("/v1/table/%s/create-empty", tableIds)) - .queryParam("delimiter", delimiter) - .request(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE)); - Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus()); - Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType()); - - // Test runtime exception - Mockito.reset(tableOps); - when(tableOps.createEmptyTable(any(), any(), any(), any())) - .thenThrow(new RuntimeException("Runtime exception")); - resp = - target(String.format("/v1/table/%s/create-empty", tableIds)) - .queryParam("delimiter", delimiter) - .request(MediaType.APPLICATION_JSON_TYPE) - .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE)); - - Assertions.assertEquals( - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus()); - Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType()); - ErrorResponse errorResp = resp.readEntity(ErrorResponse.class); - Assertions.assertEquals("Runtime exception", errorResp.getError()); - } - @Test void testRegisterTable() { String tableIds = "catalog.scheme.register_table"; @@ -723,6 +625,7 @@ void testDeregisterTable() { @Test void testDescribeTable() { + Mockito.reset(tableOps); String tableIds = "catalog.scheme.describe_table"; String delimiter = "."; @@ -730,12 +633,13 @@ void testDescribeTable() { DescribeTableResponse createTableResponse = new DescribeTableResponse(); createTableResponse.setLocation("/path/to/describe_table"); createTableResponse.setMetadata(ImmutableMap.of("key", "value")); - when(tableOps.describeTable(any(), any(), any())).thenReturn(createTableResponse); + when(tableOps.describeTable(any(), any(), any(), anyBoolean())).thenReturn(createTableResponse); DescribeTableRequest tableRequest = 
new DescribeTableRequest(); Response resp = target(String.format("/v1/table/%s/describe", tableIds)) .queryParam("delimiter", delimiter) + .queryParam("check_declared", "true") .request(MediaType.APPLICATION_JSON_TYPE) .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE)); @@ -744,10 +648,12 @@ void testDescribeTable() { DescribeTableResponse response = resp.readEntity(DescribeTableResponse.class); Assertions.assertEquals(createTableResponse.getLocation(), response.getLocation()); Assertions.assertEquals(createTableResponse.getMetadata(), response.getMetadata()); + Mockito.verify(tableOps) + .describeTable(eq(tableIds), eq(delimiter), eq(Optional.empty()), eq(true)); // Test not found exception Mockito.reset(tableOps); - when(tableOps.describeTable(any(), any(), any())) + when(tableOps.describeTable(any(), any(), any(), anyBoolean())) .thenThrow(new TableNotFoundException("Table not found", "", tableIds)); resp = target(String.format("/v1/table/%s/describe", tableIds)) @@ -758,7 +664,7 @@ void testDescribeTable() { // Test runtime exception Mockito.reset(tableOps); - when(tableOps.describeTable(any(), any(), any())) + when(tableOps.describeTable(any(), any(), any(), anyBoolean())) .thenThrow(new RuntimeException("Runtime exception")); resp = target(String.format("/v1/table/%s/describe", tableIds)) @@ -1032,6 +938,7 @@ void testAlterColumns() { @Test void testDeclareTable() { + Mockito.reset(tableOps); String tableIds = "catalog.scheme.declare_table"; String delimiter = "."; @@ -1043,6 +950,7 @@ void testDeclareTable() { DeclareTableRequest tableRequest = new DeclareTableRequest(); tableRequest.setLocation("/path/to/table"); + tableRequest.setProperties(Map.of("declared-key", "declared-value")); Response resp = target(String.format("/v1/table/%s/declare", tableIds)) @@ -1057,7 +965,11 @@ void testDeclareTable() { Assertions.assertEquals(declareTableResponse.getStorageOptions(), response.getStorageOptions()); Mockito.verify(tableOps) - 
.declareTable(eq(tableIds), eq(delimiter), eq("/path/to/table"), eq(Map.of())); + .declareTable( + eq(tableIds), + eq(delimiter), + eq("/path/to/table"), + eq(Map.of("declared-key", "declared-value"))); // Test illegal argument Mockito.reset(tableOps);