Skip to content

Commit cb8f791

Browse files
committed
Add catalog properties
1 parent cf8b46e commit cb8f791

File tree

9 files changed: 139 additions and 20 deletions.

pyiceberg/catalog/__init__.py

Lines changed: 46 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -733,9 +733,33 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty
733733

734734
return ".".join(segment.strip() for segment in tuple_identifier)
735735

736+
@abstractmethod
736737
def supports_server_side_planning(self) -> bool:
737738
"""Check if the catalog supports server-side scan planning."""
738-
return False
739+
740+
@abstractmethod
741+
def supports_purge_table(self) -> bool:
742+
"""Check if the catalog supports purging tables."""
743+
744+
@abstractmethod
745+
def supports_atomic_concurrent_updates(self) -> bool:
746+
"""Check if the catalog supports atomic concurrent updates."""
747+
748+
@abstractmethod
749+
def supports_nested_namespaces(self) -> bool:
750+
"""Check if the catalog supports nested namespaces."""
751+
752+
@abstractmethod
753+
def supports_schema_evolution(self) -> bool:
754+
"""Check if the catalog supports schema evolution."""
755+
756+
@abstractmethod
757+
def supports_slash_in_identifier(self) -> bool:
758+
"""Check if the catalog supports slash in identifier."""
759+
760+
@abstractmethod
761+
def supports_dot_in_identifier(self) -> bool:
762+
"""Check if the catalog supports dot in identifier."""
739763

740764
@staticmethod
741765
def identifier_to_database(
@@ -836,6 +860,27 @@ class MetastoreCatalog(Catalog, ABC):
836860
def __init__(self, name: str, **properties: str):
837861
super().__init__(name, **properties)
838862

863+
def supports_server_side_planning(self) -> bool:
864+
return False
865+
866+
def supports_purge_table(self) -> bool:
867+
return True
868+
869+
def supports_atomic_concurrent_updates(self) -> bool:
870+
return True
871+
872+
def supports_nested_namespaces(self) -> bool:
873+
return True
874+
875+
def supports_schema_evolution(self) -> bool:
876+
return True
877+
878+
def supports_slash_in_identifier(self) -> bool:
879+
return True
880+
881+
def supports_dot_in_identifier(self) -> bool:
882+
return True
883+
839884
def create_table_transaction(
840885
self,
841886
identifier: str | Identifier,

pyiceberg/catalog/bigquery_metastore.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,9 @@ def __init__(self, name: str, **properties: str):
9898
self.location = location
9999
self.project_id = project_id
100100

101+
def supports_nested_namespaces(self) -> bool:
102+
return False
103+
101104
def create_table(
102105
self,
103106
identifier: str | Identifier,

pyiceberg/catalog/dynamodb.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -117,6 +117,9 @@ def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **prope
117117
self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
118118
self._ensure_catalog_table_exists_or_create()
119119

120+
def supports_nested_namespaces(self) -> bool:
121+
return False
122+
120123
def _ensure_catalog_table_exists_or_create(self) -> None:
121124
if self._dynamodb_table_exists():
122125
return None

pyiceberg/catalog/glue.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -350,6 +350,9 @@ def __init__(self, name: str, client: Optional["GlueClient"] = None, **propertie
350350
if glue_catalog_id := properties.get(GLUE_ID):
351351
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)
352352

353+
def supports_nested_namespaces(self) -> bool:
354+
return False
355+
353356
def _convert_glue_to_iceberg(self, glue_table: "TableTypeDef") -> Table:
354357
if (database_name := glue_table.get("DatabaseName")) is None:
355358
raise ValueError("Glue table is missing DatabaseName property")

pyiceberg/catalog/hive.py

Lines changed: 19 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -650,6 +650,24 @@ def drop_table(self, identifier: str | Identifier) -> None:
650650
# When the namespace doesn't exist, it throws the same error
651651
raise NoSuchTableError(f"Table does not exists: {table_name}") from e
652652

653+
def supports_purge_table(self) -> bool:
654+
return False
655+
656+
def supports_atomic_concurrent_updates(self) -> bool:
657+
return False
658+
659+
def supports_nested_namespaces(self) -> bool:
660+
return False
661+
662+
def supports_schema_evolution(self) -> bool:
663+
return False
664+
665+
def supports_slash_in_identifier(self) -> bool:
666+
return False
667+
668+
def supports_dot_in_identifier(self) -> bool:
669+
return False
670+
653671
def purge_table(self, identifier: str | Identifier) -> None:
654672
# This requires to traverse the reachability set, and drop all the data files.
655673
raise NotImplementedError("Not yet implemented")
@@ -729,7 +747,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
729747
open_client.drop_database(database_name, deleteData=False, cascade=False)
730748
except InvalidOperationException as e:
731749
raise NamespaceNotEmptyError(f"Database {database_name} is not empty") from e
732-
except MetaException as e:
750+
except (MetaException, NoSuchObjectException) as e:
733751
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
734752

735753
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:

pyiceberg/catalog/noop.py

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,27 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
8585
def drop_table(self, identifier: str | Identifier) -> None:
8686
raise NotImplementedError
8787

88+
def supports_server_side_planning(self) -> bool:
89+
raise NotImplementedError
90+
91+
def supports_purge_table(self) -> bool:
92+
raise NotImplementedError
93+
94+
def supports_atomic_concurrent_updates(self) -> bool:
95+
raise NotImplementedError
96+
97+
def supports_nested_namespaces(self) -> bool:
98+
raise NotImplementedError
99+
100+
def supports_schema_evolution(self) -> bool:
101+
raise NotImplementedError
102+
103+
def supports_slash_in_identifier(self) -> bool:
104+
raise NotImplementedError
105+
106+
def supports_dot_in_identifier(self) -> bool:
107+
raise NotImplementedError
108+
88109
def purge_table(self, identifier: str | Identifier) -> None:
89110
raise NotImplementedError
90111

pyiceberg/catalog/rest/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -403,6 +403,30 @@ def supports_server_side_planning(self) -> bool:
403403
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
404404
)
405405

406+
def supports_purge_table(self) -> bool:
407+
"""Check if the catalog supports purging tables."""
408+
return property_as_bool(self.properties, "supports_purge_table", True)
409+
410+
def supports_atomic_concurrent_updates(self) -> bool:
411+
"""Check if the catalog supports atomic concurrent updates."""
412+
return property_as_bool(self.properties, "supports_atomic_concurrent_updates", True)
413+
414+
def supports_nested_namespaces(self) -> bool:
415+
"""Check if the catalog supports nested namespaces."""
416+
return property_as_bool(self.properties, "supports_nested_namespaces", True)
417+
418+
def supports_schema_evolution(self) -> bool:
419+
"""Check if the catalog supports schema evolution."""
420+
return property_as_bool(self.properties, "supports_schema_evolution", True)
421+
422+
def supports_slash_in_identifier(self) -> bool:
423+
"""Check if the catalog supports slash in identifier."""
424+
return property_as_bool(self.properties, "supports_slash_in_identifier", True)
425+
426+
def supports_dot_in_identifier(self) -> bool:
427+
"""Check if the catalog supports dot in identifier."""
428+
return property_as_bool(self.properties, "supports_dot_in_identifier", True)
429+
406430
@retry(**_RETRY_ARGS)
407431
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
408432
"""Submit a scan plan request to the REST server.

pyiceberg/catalog/sql.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,12 @@ def create_tables(self) -> None:
149149
def destroy_tables(self) -> None:
150150
SqlCatalogBaseTable.metadata.drop_all(self.engine)
151151

152+
def supports_slash_in_identifier(self) -> bool:
153+
return False
154+
155+
def supports_dot_in_identifier(self) -> bool:
156+
return False
157+
152158
def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
153159
# Check for expected properties.
154160
if not (metadata_location := orm_table.metadata_location):

tests/integration/test_catalog.py

Lines changed: 14 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -246,8 +246,8 @@ def test_drop_table(test_catalog: Catalog, table_schema_nested: Schema, table_na
246246
@pytest.mark.integration
247247
@pytest.mark.parametrize("test_catalog", CATALOGS)
248248
def test_purge_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
249-
if isinstance(test_catalog, HiveCatalog):
250-
pytest.skip("HiveCatalog does not support purge_table operation yet")
249+
if not test_catalog.supports_purge_table():
250+
pytest.skip("Catalog does not support purge_table operation")
251251

252252
identifier = (database_name, table_name)
253253
test_catalog.create_namespace(database_name)
@@ -299,8 +299,8 @@ def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, ta
299299
@pytest.mark.integration
300300
@pytest.mark.parametrize("test_catalog", CATALOGS)
301301
def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
302-
if isinstance(test_catalog, HiveCatalog):
303-
pytest.skip("HiveCatalog fails in this test, need to investigate")
302+
if not test_catalog.supports_atomic_concurrent_updates():
303+
pytest.skip("Catalog does not support atomic concurrent updates")
304304

305305
identifier = (database_name, table_name)
306306

@@ -646,8 +646,8 @@ def test_rest_custom_namespace_separator(rest_catalog: RestCatalog, table_schema
646646
def test_incompatible_partitioned_schema_evolution(
647647
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str
648648
) -> None:
649-
if isinstance(test_catalog, HiveCatalog):
650-
pytest.skip("HiveCatalog does not support schema evolution")
649+
if not test_catalog.supports_schema_evolution():
650+
pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution")
651651

652652
identifier = (database_name, table_name)
653653
test_catalog.create_namespace(database_name)
@@ -675,7 +675,7 @@ def test_incompatible_partitioned_schema_evolution(
675675
@pytest.mark.integration
676676
@pytest.mark.parametrize("test_catalog", CATALOGS)
677677
def test_namespace_with_slash(test_catalog: Catalog) -> None:
678-
if isinstance(test_catalog, HiveCatalog):
678+
if not test_catalog.supports_slash_in_identifier():
679679
pytest.skip(f"{type(test_catalog).__name__} does not support slash in namespace")
680680

681681
namespace = ("new/db",)
@@ -700,8 +700,8 @@ def test_namespace_with_slash(test_catalog: Catalog) -> None:
700700
def test_incompatible_sorted_schema_evolution(
701701
test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str
702702
) -> None:
703-
if isinstance(test_catalog, HiveCatalog):
704-
pytest.skip("HiveCatalog does not support schema evolution")
703+
if not test_catalog.supports_schema_evolution():
704+
pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution")
705705

706706
identifier = (database_name, table_name)
707707
test_catalog.create_namespace(database_name)
@@ -720,7 +720,7 @@ def test_incompatible_sorted_schema_evolution(
720720
@pytest.mark.integration
721721
@pytest.mark.parametrize("test_catalog", CATALOGS)
722722
def test_namespace_with_dot(test_catalog: Catalog) -> None:
723-
if isinstance(test_catalog, (HiveCatalog, SqlCatalog)):
723+
if not test_catalog.supports_dot_in_identifier():
724724
pytest.skip(f"{type(test_catalog).__name__} does not support dot in namespace")
725725

726726
namespace = ("new.db",)
@@ -733,9 +733,8 @@ def test_namespace_with_dot(test_catalog: Catalog) -> None:
733733
test_catalog.create_namespace(namespace)
734734
assert test_catalog.namespace_exists(namespace)
735735

736-
# REST Catalog fixture treats this as a hierarchical namespace.
737-
# Calling list namespaces will get `new`, not `new.db`.
738-
if isinstance(test_catalog, RestCatalog):
736+
# Hierarchical catalogs might treat this as multiple levels.
737+
if test_catalog.supports_nested_namespaces():
739738
namespaces = test_catalog.list_namespaces()
740739
assert ("new",) in namespaces or ("new.db",) in namespaces
741740
else:
@@ -751,7 +750,7 @@ def test_namespace_with_dot(test_catalog: Catalog) -> None:
751750
@pytest.mark.integration
752751
@pytest.mark.parametrize("test_catalog", CATALOGS)
753752
def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schema) -> None:
754-
if isinstance(test_catalog, (HiveCatalog, SqlCatalog)):
753+
if not test_catalog.supports_slash_in_identifier():
755754
pytest.skip(f"{type(test_catalog).__name__} does not support slash in table name")
756755

757756
namespace = ("ns_slash",)
@@ -778,7 +777,7 @@ def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schem
778777
@pytest.mark.integration
779778
@pytest.mark.parametrize("test_catalog", CATALOGS)
780779
def test_table_name_with_dot(test_catalog: Catalog, table_schema_simple: Schema) -> None:
781-
if isinstance(test_catalog, (HiveCatalog, SqlCatalog)):
780+
if not test_catalog.supports_dot_in_identifier():
782781
pytest.skip(f"{type(test_catalog).__name__} does not support dot in table name")
783782

784783
namespace = ("ns_dot",)
@@ -817,9 +816,6 @@ def test_drop_missing_table(test_catalog: Catalog, database_name: str) -> None:
817816
@pytest.mark.integration
818817
@pytest.mark.parametrize("test_catalog", CATALOGS)
819818
def test_drop_nonexistent_namespace(test_catalog: Catalog) -> None:
820-
if isinstance(test_catalog, HiveCatalog):
821-
pytest.skip("HiveCatalog raises NoSuchObjectException instead of NoSuchNamespaceError")
822-
823819
namespace = ("non_existent_namespace",)
824820
with pytest.raises(NoSuchNamespaceError):
825821
test_catalog.drop_namespace(namespace)

0 commit comments

Comments (0)