-
Notifications
You must be signed in to change notification settings - Fork 469
Improve the InMemory Catalog Implementation #289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
066e8c4
b1a99f7
32c449e
3c6e06a
21b1e50
e2541ac
3013bdb
be3eb1c
f80849a
faea973
4437a30
aabcde0
b91e1cf
2834bae
341f5ba
67c028a
96ba8de
c7f9053
8a7b876
10adb1c
a466e5f
03ec82b
58b34ca
810c3c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,246 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| import uuid | ||
| from typing import ( | ||
| Dict, | ||
| List, | ||
| Optional, | ||
| Set, | ||
| Union, | ||
| ) | ||
|
|
||
| import pyarrow as pa | ||
|
|
||
| from pyiceberg.catalog import ( | ||
| Catalog, | ||
| Identifier, | ||
| Properties, | ||
| PropertiesUpdateSummary, | ||
| ) | ||
| from pyiceberg.exceptions import ( | ||
| NamespaceAlreadyExistsError, | ||
| NamespaceNotEmptyError, | ||
| NoSuchNamespaceError, | ||
| NoSuchTableError, | ||
| TableAlreadyExistsError, | ||
| ) | ||
| from pyiceberg.io import WAREHOUSE | ||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec | ||
| from pyiceberg.schema import Schema | ||
| from pyiceberg.table import ( | ||
| CommitTableRequest, | ||
| CommitTableResponse, | ||
| Table, | ||
| update_table_metadata, | ||
| ) | ||
| from pyiceberg.table.metadata import new_table_metadata | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||
| from pyiceberg.typedef import EMPTY_DICT | ||
|
|
||
| DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. by default, write on disk to |
||
|
|
||
|
|
||
| class InMemoryCatalog(Catalog): | ||
| """ | ||
| An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. | ||
|
|
||
| This is useful for test, demo, and playground but not in production as data is not persisted. | ||
| """ | ||
|
|
||
| __tables: Dict[Identifier, Table] | ||
| __namespaces: Dict[Identifier, Properties] | ||
|
|
||
| def __init__(self, name: str, **properties: str) -> None: | ||
| super().__init__(name, **properties) | ||
| self.__tables = {} | ||
| self.__namespaces = {} | ||
| self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can pass a warehouse location using properties. warehouse location can be another fs such as s3 |
||
|
|
||
| def create_table( | ||
| self, | ||
| identifier: Union[str, Identifier], | ||
| schema: Union[Schema, "pa.Schema"], | ||
| location: Optional[str] = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| table_uuid: Optional[uuid.UUID] = None, | ||
| ) -> Table: | ||
| schema: Schema = self._convert_schema_if_needed(schema) # type: ignore | ||
|
|
||
| identifier = Catalog.identifier_to_tuple(identifier) | ||
| namespace = Catalog.namespace_from(identifier) | ||
|
|
||
| if identifier in self.__tables: | ||
| raise TableAlreadyExistsError(f"Table already exists: {identifier}") | ||
| else: | ||
| if namespace not in self.__namespaces: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Other implementations don't auto-create namespaces, however I think it is fine for the InMemory one. |
||
| self.__namespaces[namespace] = {} | ||
|
|
||
| if not location: | ||
| location = f'{self._warehouse_location}/{"/".join(identifier)}' | ||
|
|
||
| metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we don't write the metadata here, but we write it below at the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, the actual writing is done by
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, but I'm a bit confused here. If I just want to create the table without inserting any data: catalog.create_table(schema, ....)I still expect a new In the previous implementation no file is written. But since we have updated
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gotcha, that makes sense! |
||
|
|
||
| metadata = new_table_metadata( | ||
| schema=schema, | ||
| partition_spec=partition_spec, | ||
| sort_order=sort_order, | ||
| location=location, | ||
| properties=properties, | ||
| table_uuid=table_uuid, | ||
| ) | ||
| table = Table( | ||
| identifier=identifier, | ||
| metadata=metadata, | ||
| metadata_location=metadata_location, | ||
| io=self._load_file_io(properties=metadata.properties, location=metadata_location), | ||
| catalog=self, | ||
| ) | ||
| self.__tables[identifier] = table | ||
| return table | ||
|
|
||
| def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: | ||
| raise NotImplementedError | ||
|
|
||
| def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: | ||
| identifier_tuple = self.identifier_to_tuple_without_catalog( | ||
| tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) | ||
| ) | ||
| current_table = self.load_table(identifier_tuple) | ||
| base_metadata = current_table.metadata | ||
|
|
||
| for requirement in table_request.requirements: | ||
| requirement.validate(base_metadata) | ||
|
|
||
| updated_metadata = update_table_metadata(base_metadata, table_request.updates) | ||
| if updated_metadata == base_metadata: | ||
| # no changes, do nothing | ||
| return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) | ||
|
|
||
| # write new metadata | ||
| new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 | ||
| new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) | ||
| self._write_metadata(updated_metadata, current_table.io, new_metadata_location) | ||
|
|
||
| # update table state | ||
| current_table.metadata = updated_metadata | ||
|
|
||
| return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) | ||
|
|
||
| def load_table(self, identifier: Union[str, Identifier]) -> Table: | ||
| identifier = self.identifier_to_tuple_without_catalog(identifier) | ||
| try: | ||
| return self.__tables[identifier] | ||
| except KeyError as error: | ||
| raise NoSuchTableError(f"Table does not exist: {identifier}") from error | ||
|
|
||
| def drop_table(self, identifier: Union[str, Identifier]) -> None: | ||
| identifier = self.identifier_to_tuple_without_catalog(identifier) | ||
| try: | ||
| self.__tables.pop(identifier) | ||
| except KeyError as error: | ||
| raise NoSuchTableError(f"Table does not exist: {identifier}") from error | ||
|
|
||
| def purge_table(self, identifier: Union[str, Identifier]) -> None: | ||
| self.drop_table(identifier) | ||
|
|
||
| def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: | ||
| from_identifier = self.identifier_to_tuple_without_catalog(from_identifier) | ||
| try: | ||
| table = self.__tables.pop(from_identifier) | ||
| except KeyError as error: | ||
| raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error | ||
|
|
||
| to_identifier = Catalog.identifier_to_tuple(to_identifier) | ||
| to_namespace = Catalog.namespace_from(to_identifier) | ||
| if to_namespace not in self.__namespaces: | ||
| self.__namespaces[to_namespace] = {} | ||
|
|
||
| self.__tables[to_identifier] = Table( | ||
| identifier=to_identifier, | ||
| metadata=table.metadata, | ||
| metadata_location=table.metadata_location, | ||
| io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), | ||
| catalog=self, | ||
| ) | ||
| return self.__tables[to_identifier] | ||
|
|
||
| def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: | ||
| namespace = Catalog.identifier_to_tuple(namespace) | ||
| if namespace in self.__namespaces: | ||
| raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}") | ||
| else: | ||
| self.__namespaces[namespace] = properties if properties else {} | ||
|
|
||
| def drop_namespace(self, namespace: Union[str, Identifier]) -> None: | ||
| namespace = Catalog.identifier_to_tuple(namespace) | ||
| if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: | ||
| raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}") | ||
| try: | ||
| self.__namespaces.pop(namespace) | ||
| except KeyError as error: | ||
| raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error | ||
|
|
||
| def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: | ||
| if namespace: | ||
| namespace = Catalog.identifier_to_tuple(namespace) | ||
| list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]] | ||
| else: | ||
| list_tables = list(self.__tables.keys()) | ||
|
|
||
| return list_tables | ||
|
|
||
| def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: | ||
| # Hierarchical namespace is not supported. Return an empty list | ||
| if namespace: | ||
| return [] | ||
|
|
||
| return list(self.__namespaces.keys()) | ||
|
|
||
| def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: | ||
| namespace = Catalog.identifier_to_tuple(namespace) | ||
| try: | ||
| return self.__namespaces[namespace] | ||
| except KeyError as error: | ||
| raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error | ||
|
|
||
| def update_namespace_properties( | ||
| self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT | ||
| ) -> PropertiesUpdateSummary: | ||
| removed: Set[str] = set() | ||
| updated: Set[str] = set() | ||
|
|
||
| namespace = Catalog.identifier_to_tuple(namespace) | ||
| if namespace in self.__namespaces: | ||
| if removals: | ||
| for key in removals: | ||
| if key in self.__namespaces[namespace]: | ||
| del self.__namespaces[namespace][key] | ||
| removed.add(key) | ||
| if updates: | ||
| for key, value in updates.items(): | ||
| self.__namespaces[namespace][key] = value | ||
| updated.add(key) | ||
| else: | ||
| raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") | ||
|
|
||
| expected_to_change = removed.difference(removals or set()) | ||
|
|
||
| return PropertiesUpdateSummary( | ||
| removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change) | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.