1717# pylint:disable=redefined-outer-name
1818
1919
20- import uuid
2120from pathlib import PosixPath
22- from typing import (
23- Dict ,
24- List ,
25- Optional ,
26- Set ,
27- Tuple ,
28- Union ,
29- )
21+ from typing import Optional , Union
3022
3123import pyarrow as pa
3224import pytest
3325from pydantic_core import ValidationError
3426from pytest_lazyfixture import lazy_fixture
3527
36- from pyiceberg .catalog import Catalog , MetastoreCatalog , PropertiesUpdateSummary , load_catalog
28+ from pyiceberg .catalog import Catalog , load_catalog
29+ from pyiceberg .catalog .sql import SqlCatalog
3730from pyiceberg .exceptions import (
3831 NamespaceAlreadyExistsError ,
3932 NamespaceNotEmptyError ,
4033 NoSuchNamespaceError ,
4134 NoSuchTableError ,
4235 TableAlreadyExistsError ,
4336)
44- from pyiceberg .io import WAREHOUSE , load_file_io
37+ from pyiceberg .io import WAREHOUSE
4538from pyiceberg .partitioning import UNPARTITIONED_PARTITION_SPEC , PartitionField , PartitionSpec
4639from pyiceberg .schema import Schema
4740from pyiceberg .table import (
4841 AddSchemaUpdate ,
49- CommitTableResponse ,
5042 SetCurrentSchemaUpdate ,
5143 Table ,
52- TableRequirement ,
53- TableUpdate ,
54- update_table_metadata ,
5544)
56- from pyiceberg .table .metadata import new_table_metadata
5745from pyiceberg .table .sorting import UNSORTED_SORT_ORDER , SortOrder
5846from pyiceberg .transforms import IdentityTransform
5947from pyiceberg .typedef import EMPTY_DICT , Identifier , Properties
6250DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"
6351
6452
65- class InMemoryCatalog (MetastoreCatalog ):
53+ class InMemoryCatalog (SqlCatalog ):
6654 """
67- An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables .
55+ An in-memory catalog implementation that uses SqlCatalog with SQLite in-memory database .
6856
6957 This is useful for test, demo, and playground but not in production as data is not persisted.
7058 """
7159
72- __tables : Dict [Identifier , Table ]
73- __namespaces : Dict [Identifier , Properties ]
74-
75- def __init__ (self , name : str , ** properties : str ) -> None :
76- super ().__init__ (name , ** properties )
77- self .__tables = {}
78- self .__namespaces = {}
79- self ._warehouse_location = properties .get (WAREHOUSE , DEFAULT_WAREHOUSE_LOCATION )
60+ def __init__ (self , name : str , ** kwargs : str ) -> None :
61+ if WAREHOUSE not in kwargs :
62+ kwargs [WAREHOUSE ] = DEFAULT_WAREHOUSE_LOCATION
63+ self ._warehouse_location = kwargs [WAREHOUSE ]
64+ if "uri" not in kwargs :
65+ kwargs ["uri" ] = "sqlite:///:memory:"
66+ super ().__init__ (name = name , ** kwargs )
8067
8168 def create_table (
8269 self ,
@@ -86,181 +73,24 @@ def create_table(
8673 partition_spec : PartitionSpec = UNPARTITIONED_PARTITION_SPEC ,
8774 sort_order : SortOrder = UNSORTED_SORT_ORDER ,
8875 properties : Properties = EMPTY_DICT ,
89- table_uuid : Optional [uuid .UUID ] = None ,
9076 ) -> Table :
91- schema : Schema = self ._convert_schema_if_needed (schema ) # type: ignore
92-
93- identifier = Catalog .identifier_to_tuple (identifier )
94- namespace = Catalog .namespace_from (identifier )
95-
96- if identifier in self .__tables :
97- raise TableAlreadyExistsError (f"Table already exists: { identifier } " )
98- else :
99- if namespace not in self .__namespaces :
100- self .__namespaces [namespace ] = {}
101-
102- if not location :
103- location = f'{ self ._warehouse_location } /{ "/" .join (identifier )} '
104- location = location .rstrip ("/" )
105-
106- metadata_location = self ._get_metadata_location (location = location )
107- metadata = new_table_metadata (
108- schema = schema ,
109- partition_spec = partition_spec ,
110- sort_order = sort_order ,
111- location = location ,
112- properties = properties ,
113- table_uuid = table_uuid ,
114- )
115- io = load_file_io ({** self .properties , ** properties }, location = location )
116- self ._write_metadata (metadata , io , metadata_location )
117-
118- table = Table (
119- identifier = identifier ,
120- metadata = metadata ,
121- metadata_location = metadata_location ,
122- io = io ,
123- catalog = self ,
124- )
125- self .__tables [identifier ] = table
126- return table
127-
128- def register_table (self , identifier : Union [str , Identifier ], metadata_location : str ) -> Table :
129- raise NotImplementedError
130-
131- def commit_table (
132- self , table : Table , requirements : Tuple [TableRequirement , ...], updates : Tuple [TableUpdate , ...]
133- ) -> CommitTableResponse :
134- identifier_tuple = self ._identifier_to_tuple_without_catalog (table .identifier )
135- current_table = self .load_table (identifier_tuple )
136- base_metadata = current_table .metadata
137-
138- for requirement in requirements :
139- requirement .validate (base_metadata )
140-
141- updated_metadata = update_table_metadata (base_metadata , updates )
142- if updated_metadata == base_metadata :
143- # no changes, do nothing
144- return CommitTableResponse (metadata = base_metadata , metadata_location = current_table .metadata_location )
145-
146- # write new metadata
147- new_metadata_version = self ._parse_metadata_version (current_table .metadata_location ) + 1
148- new_metadata_location = self ._get_metadata_location (current_table .metadata .location , new_metadata_version )
149- self ._write_metadata (updated_metadata , current_table .io , new_metadata_location )
150-
151- # update table state
152- current_table .metadata = updated_metadata
153-
154- return CommitTableResponse (metadata = updated_metadata , metadata_location = new_metadata_location )
155-
156- def load_table (self , identifier : Union [str , Identifier ]) -> Table :
157- identifier_tuple = self ._identifier_to_tuple_without_catalog (identifier )
158- try :
159- return self .__tables [identifier_tuple ]
160- except KeyError as error :
161- raise NoSuchTableError (f"Table does not exist: { identifier_tuple } " ) from error
162-
163- def drop_table (self , identifier : Union [str , Identifier ]) -> None :
164- identifier_tuple = self ._identifier_to_tuple_without_catalog (identifier )
165- try :
166- self .__tables .pop (identifier_tuple )
167- except KeyError as error :
168- raise NoSuchTableError (f"Table does not exist: { identifier_tuple } " ) from error
169-
170- def purge_table (self , identifier : Union [str , Identifier ]) -> None :
171- self .drop_table (identifier )
172-
173- def rename_table (self , from_identifier : Union [str , Identifier ], to_identifier : Union [str , Identifier ]) -> Table :
174- identifier_tuple = self ._identifier_to_tuple_without_catalog (from_identifier )
175- try :
176- table = self .__tables .pop (identifier_tuple )
177- except KeyError as error :
178- raise NoSuchTableError (f"Table does not exist: { identifier_tuple } " ) from error
179-
180- to_identifier = Catalog .identifier_to_tuple (to_identifier )
181- to_namespace = Catalog .namespace_from (to_identifier )
182- if to_namespace not in self .__namespaces :
183- self .__namespaces [to_namespace ] = {}
184-
185- self .__tables [to_identifier ] = Table (
186- identifier = to_identifier ,
187- metadata = table .metadata ,
188- metadata_location = table .metadata_location ,
189- io = self ._load_file_io (properties = table .metadata .properties , location = table .metadata_location ),
190- catalog = self ,
77+ namespace_identifier = Catalog .namespace_from (identifier )
78+ if not self ._namespace_exists (namespace_identifier ):
79+ self .create_namespace (namespace_identifier )
80+ return super ().create_table (
81+ identifier = identifier ,
82+ schema = schema ,
83+ location = location ,
84+ partition_spec = partition_spec ,
85+ sort_order = sort_order ,
86+ properties = properties ,
19187 )
192- return self .__tables [to_identifier ]
193-
194- def create_namespace (self , namespace : Union [str , Identifier ], properties : Properties = EMPTY_DICT ) -> None :
195- namespace = Catalog .identifier_to_tuple (namespace )
196- if namespace in self .__namespaces :
197- raise NamespaceAlreadyExistsError (f"Namespace already exists: { namespace } " )
198- else :
199- self .__namespaces [namespace ] = properties if properties else {}
200-
201- def drop_namespace (self , namespace : Union [str , Identifier ]) -> None :
202- namespace = Catalog .identifier_to_tuple (namespace )
203- if [table_identifier for table_identifier in self .__tables .keys () if namespace == table_identifier [:- 1 ]]:
204- raise NamespaceNotEmptyError (f"Namespace is not empty: { namespace } " )
205- try :
206- self .__namespaces .pop (namespace )
207- except KeyError as error :
208- raise NoSuchNamespaceError (f"Namespace does not exist: { namespace } " ) from error
209-
210- def list_tables (self , namespace : Optional [Union [str , Identifier ]] = None ) -> List [Identifier ]:
211- if namespace :
212- namespace = Catalog .identifier_to_tuple (namespace )
213- list_tables = [table_identifier for table_identifier in self .__tables .keys () if namespace == table_identifier [:- 1 ]]
214- else :
215- list_tables = list (self .__tables .keys ())
216-
217- return list_tables
218-
219- def list_namespaces (self , namespace : Union [str , Identifier ] = ()) -> List [Identifier ]:
220- # Hierarchical namespace is not supported. Return an empty list
221- if namespace :
222- return []
223-
224- return list (self .__namespaces .keys ())
225-
226- def load_namespace_properties (self , namespace : Union [str , Identifier ]) -> Properties :
227- namespace = Catalog .identifier_to_tuple (namespace )
228- try :
229- return self .__namespaces [namespace ]
230- except KeyError as error :
231- raise NoSuchNamespaceError (f"Namespace does not exist: { namespace } " ) from error
232-
233- def update_namespace_properties (
234- self , namespace : Union [str , Identifier ], removals : Optional [Set [str ]] = None , updates : Properties = EMPTY_DICT
235- ) -> PropertiesUpdateSummary :
236- removed : Set [str ] = set ()
237- updated : Set [str ] = set ()
238-
239- namespace = Catalog .identifier_to_tuple (namespace )
240- if namespace in self .__namespaces :
241- if removals :
242- for key in removals :
243- if key in self .__namespaces [namespace ]:
244- del self .__namespaces [namespace ][key ]
245- removed .add (key )
246- if updates :
247- for key , value in updates .items ():
248- self .__namespaces [namespace ][key ] = value
249- updated .add (key )
250- else :
251- raise NoSuchNamespaceError (f"Namespace does not exist: { namespace } " )
252-
253- expected_to_change = removed .difference (removals or set ())
254-
255- return PropertiesUpdateSummary (
256- removed = list (removed or []), updated = list (updates .keys () if updates else []), missing = list (expected_to_change )
257- )
258-
259- def list_views (self , namespace : Optional [Union [str , Identifier ]] = None ) -> List [Identifier ]:
260- raise NotImplementedError
26188
262- def drop_view (self , identifier : Union [str , Identifier ]) -> None :
263- raise NotImplementedError
89+ def rename_table (self , from_identifier : Union [str , Identifier ], to_identifier : Union [str , Identifier ]) -> Table :
90+ namespace_identifier = Catalog .namespace_from (to_identifier )
91+ if not self ._namespace_exists (namespace_identifier ):
92+ self .create_namespace (namespace_identifier )
93+ return super ().rename_table (from_identifier , to_identifier )
26494
26595
26696@pytest .fixture
@@ -278,11 +108,13 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
278108)
279109TEST_TABLE_PARTITION_SPEC = PartitionSpec (PartitionField (name = "x" , transform = IdentityTransform (), source_id = 1 , field_id = 1000 ))
280110TEST_TABLE_PROPERTIES = {"key1" : "value1" , "key2" : "value2" }
281- NO_SUCH_TABLE_ERROR = "Table does not exist: \\ ('com', 'organization', 'department', 'my_table'\\ )"
282- TABLE_ALREADY_EXISTS_ERROR = "Table already exists: \\ ('com', 'organization', 'department', 'my_table'\\ )"
283- NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace already exists: \\ ('com', 'organization', 'department'\\ )"
284- NO_SUCH_NAMESPACE_ERROR = "Namespace does not exist: \\ ('com', 'organization', 'department'\\ )"
285- NAMESPACE_NOT_EMPTY_ERROR = "Namespace is not empty: \\ ('com', 'organization', 'department'\\ )"
111+ NO_SUCH_TABLE_ERROR = "Table does not exist: com.organization.department.my_table"
112+ TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table already exists"
113+ NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\ ('com', 'organization', 'department'\\ ) already exists"
114+ # TODO: consolidate namespace error messages then remove this
115+ DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\ ('com', 'organization', 'department'\\ )"
116+ NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not exists"
117+ NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not empty"
286118
287119
288120def given_catalog_has_a_table (
@@ -591,7 +423,7 @@ def test_drop_namespace(catalog: InMemoryCatalog) -> None:
591423
592424
593425def test_drop_namespace_raises_error_when_namespace_does_not_exist (catalog : InMemoryCatalog ) -> None :
594- with pytest .raises (NoSuchNamespaceError , match = NO_SUCH_NAMESPACE_ERROR ):
426+ with pytest .raises (NoSuchNamespaceError , match = DROP_NOT_EXISTING_NAMESPACE_ERROR ):
595427 catalog .drop_namespace (TEST_TABLE_NAMESPACE )
596428
597429
@@ -607,7 +439,7 @@ def test_list_tables(catalog: InMemoryCatalog) -> None:
607439 # Given
608440 given_catalog_has_a_table (catalog )
609441 # When
610- tables = catalog .list_tables ()
442+ tables = catalog .list_tables (namespace = TEST_TABLE_NAMESPACE )
611443 # Then
612444 assert tables
613445 assert TEST_TABLE_IDENTIFIER in tables
@@ -619,7 +451,7 @@ def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None:
619451 new_namespace = ("new" , "namespace" )
620452 catalog .create_namespace (new_namespace )
621453 # When
622- all_tables = catalog .list_tables ()
454+ all_tables = catalog .list_tables (namespace = TEST_TABLE_NAMESPACE )
623455 new_namespace_tables = catalog .list_tables (new_namespace )
624456 # Then
625457 assert all_tables
@@ -638,7 +470,9 @@ def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None:
638470 # Then
639471 assert TEST_TABLE_NAMESPACE in catalog .list_namespaces ()
640472 assert new_metadata .items () <= catalog .load_namespace_properties (TEST_TABLE_NAMESPACE ).items ()
641- assert summary == PropertiesUpdateSummary (removed = [], updated = ["key3" , "key4" ], missing = [])
473+ assert summary .removed == []
474+ assert sorted (summary .updated ) == ["key3" , "key4" ]
475+ assert summary .missing == []
642476
643477
644478def test_update_namespace_metadata_removals (catalog : InMemoryCatalog ) -> None :
@@ -654,7 +488,9 @@ def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
654488 assert TEST_TABLE_NAMESPACE in catalog .list_namespaces ()
655489 assert new_metadata .items () <= catalog .load_namespace_properties (TEST_TABLE_NAMESPACE ).items ()
656490 assert remove_metadata .isdisjoint (catalog .load_namespace_properties (TEST_TABLE_NAMESPACE ).keys ())
657- assert summary == PropertiesUpdateSummary (removed = ["key1" ], updated = ["key3" , "key4" ], missing = [])
491+ assert summary .removed == ["key1" ]
492+ assert sorted (summary .updated ) == ["key3" , "key4" ]
493+ assert summary .missing == []
658494
659495
660496def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist (catalog : InMemoryCatalog ) -> None :
0 commit comments