1616# under the License.
1717# pylint:disable=redefined-outer-name
1818
19+
20+ import uuid
21+ from pathlib import PosixPath
1922from typing import (
2023 Dict ,
2124 List ,
4245 NoSuchTableError ,
4346 TableAlreadyExistsError ,
4447)
45- from pyiceberg .io import load_file_io
48+ from pyiceberg .io import WAREHOUSE , load_file_io
4649from pyiceberg .partitioning import UNPARTITIONED_PARTITION_SPEC , PartitionField , PartitionSpec
4750from pyiceberg .schema import Schema
4851from pyiceberg .table import (
5558 TableIdentifier ,
5659 update_table_metadata ,
5760)
58- from pyiceberg .table .metadata import TableMetadataV1
61+ from pyiceberg .table .metadata import new_table_metadata
5962from pyiceberg .table .sorting import UNSORTED_SORT_ORDER , SortOrder
6063from pyiceberg .transforms import IdentityTransform
6164from pyiceberg .typedef import EMPTY_DICT
6265from pyiceberg .types import IntegerType , LongType , NestedField
6366
67+ DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"
68+
6469
6570class InMemoryCatalog (Catalog ):
66- """An in-memory catalog implementation for testing purposes."""
71+ """
72+ An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.
73+
74+ This is useful for test, demo, and playground but not in production as data is not persisted.
75+ """
6776
6877 __tables : Dict [Identifier , Table ]
6978 __namespaces : Dict [Identifier , Properties ]
@@ -72,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None:
7281 super ().__init__ (name , ** properties )
7382 self .__tables = {}
7483 self .__namespaces = {}
84+ self ._warehouse_location = properties .get (WAREHOUSE , None ) or DEFAULT_WAREHOUSE_LOCATION
7585
7686 def create_table (
7787 self ,
@@ -81,6 +91,7 @@ def create_table(
8191 partition_spec : PartitionSpec = UNPARTITIONED_PARTITION_SPEC ,
8292 sort_order : SortOrder = UNSORTED_SORT_ORDER ,
8393 properties : Properties = EMPTY_DICT ,
94+ table_uuid : Optional [uuid .UUID ] = None ,
8495 ) -> Table :
8596 schema : Schema = self ._convert_schema_if_needed (schema ) # type: ignore
8697
@@ -93,24 +104,26 @@ def create_table(
93104 if namespace not in self .__namespaces :
94105 self .__namespaces [namespace ] = {}
95106
96- new_location = location or f's3://warehouse/{ "/" .join (identifier )} /data'
97- metadata = TableMetadataV1 (** {
98- "format-version" : 1 ,
99- "table-uuid" : "d20125c8-7284-442c-9aea-15fee620737c" ,
100- "location" : new_location ,
101- "last-updated-ms" : 1602638573874 ,
102- "last-column-id" : schema .highest_field_id ,
103- "schema" : schema .model_dump (),
104- "partition-spec" : partition_spec .model_dump ()["fields" ],
105- "properties" : properties ,
106- "current-snapshot-id" : - 1 ,
107- "snapshots" : [{"snapshot-id" : 1925 , "timestamp-ms" : 1602638573822 }],
108- })
107+ if not location :
108+ location = f'{ self ._warehouse_location } /{ "/" .join (identifier )} '
109+
110+ metadata_location = self ._get_metadata_location (location = location )
111+ metadata = new_table_metadata (
112+ schema = schema ,
113+ partition_spec = partition_spec ,
114+ sort_order = sort_order ,
115+ location = location ,
116+ properties = properties ,
117+ table_uuid = table_uuid ,
118+ )
119+ io = load_file_io ({** self .properties , ** properties }, location = location )
120+ self ._write_metadata (metadata , io , metadata_location )
121+
109122 table = Table (
110123 identifier = identifier ,
111124 metadata = metadata ,
112- metadata_location = f's3://warehouse/ { "/" . join ( identifier ) } /metadata/metadata.json' ,
113- io = load_file_io () ,
125+ metadata_location = metadata_location ,
126+ io = io ,
114127 catalog = self ,
115128 )
116129 self .__tables [identifier ] = table
@@ -120,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
120133 raise NotImplementedError
121134
122135 def _commit_table (self , table_request : CommitTableRequest ) -> CommitTableResponse :
123- identifier = tuple (table_request .identifier .namespace .root ) + (table_request .identifier .name ,)
124- table = self .__tables [identifier ]
125- table .metadata = update_table_metadata (base_metadata = table .metadata , updates = table_request .updates )
126-
127- return CommitTableResponse (
128- metadata = table .metadata .model_dump (),
129- metadata_location = table .location (),
136+ identifier_tuple = self .identifier_to_tuple_without_catalog (
137+ tuple (table_request .identifier .namespace .root + [table_request .identifier .name ])
130138 )
139+ current_table = self .load_table (identifier_tuple )
140+ base_metadata = current_table .metadata
141+
142+ for requirement in table_request .requirements :
143+ requirement .validate (base_metadata )
144+
145+ updated_metadata = update_table_metadata (base_metadata , table_request .updates )
146+ if updated_metadata == base_metadata :
147+ # no changes, do nothing
148+ return CommitTableResponse (metadata = base_metadata , metadata_location = current_table .metadata_location )
149+
150+ # write new metadata
151+ new_metadata_version = self ._parse_metadata_version (current_table .metadata_location ) + 1
152+ new_metadata_location = self ._get_metadata_location (current_table .metadata .location , new_metadata_version )
153+ self ._write_metadata (updated_metadata , current_table .io , new_metadata_location )
154+
155+ # update table state
156+ current_table .metadata = updated_metadata
157+
158+ return CommitTableResponse (metadata = updated_metadata , metadata_location = new_metadata_location )
131159
132160 def load_table (self , identifier : Union [str , Identifier ]) -> Table :
133161 identifier = self .identifier_to_tuple_without_catalog (identifier )
@@ -162,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
162190 identifier = to_identifier ,
163191 metadata = table .metadata ,
164192 metadata_location = table .metadata_location ,
165- io = load_file_io ( ),
193+ io = self . _load_file_io ( properties = table . metadata . properties , location = table . metadata_location ),
166194 catalog = self ,
167195 )
168196 return self .__tables [to_identifier ]
@@ -234,8 +262,8 @@ def update_namespace_properties(
234262
235263
236264@pytest .fixture
237- def catalog () -> InMemoryCatalog :
238- return InMemoryCatalog ("test.in.memory. catalog" , ** {"test.key" : "test.value" })
265+ def catalog (tmp_path : PosixPath ) -> InMemoryCatalog :
266+ return InMemoryCatalog ("test.in_memory. catalog" , ** {WAREHOUSE : tmp_path . absolute (). as_posix (), "test.key" : "test.value" })
239267
240268
241269TEST_TABLE_IDENTIFIER = ("com" , "organization" , "department" , "my_table" )
@@ -246,7 +274,6 @@ def catalog() -> InMemoryCatalog:
246274 NestedField (2 , "y" , LongType (), doc = "comment" ),
247275 NestedField (3 , "z" , LongType ()),
248276)
249- TEST_TABLE_LOCATION = "protocol://some/location"
250277TEST_TABLE_PARTITION_SPEC = PartitionSpec (PartitionField (name = "x" , transform = IdentityTransform (), source_id = 1 , field_id = 1000 ))
251278TEST_TABLE_PROPERTIES = {"key1" : "value1" , "key2" : "value2" }
252279NO_SUCH_TABLE_ERROR = "Table does not exist: \\ ('com', 'organization', 'department', 'my_table'\\ )"
@@ -263,7 +290,6 @@ def given_catalog_has_a_table(
263290 return catalog .create_table (
264291 identifier = TEST_TABLE_IDENTIFIER ,
265292 schema = TEST_TABLE_SCHEMA ,
266- location = TEST_TABLE_LOCATION ,
267293 partition_spec = TEST_TABLE_PARTITION_SPEC ,
268294 properties = properties or TEST_TABLE_PROPERTIES ,
269295 )
@@ -309,13 +335,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
309335 table = catalog .create_table (
310336 identifier = TEST_TABLE_IDENTIFIER ,
311337 schema = TEST_TABLE_SCHEMA ,
312- location = TEST_TABLE_LOCATION ,
313338 partition_spec = TEST_TABLE_PARTITION_SPEC ,
314339 properties = TEST_TABLE_PROPERTIES ,
315340 )
316341 assert catalog .load_table (TEST_TABLE_IDENTIFIER ) == table
317342
318343
344+ def test_create_table_location_override (catalog : InMemoryCatalog ) -> None :
345+ new_location = f"{ catalog ._warehouse_location } /new_location"
346+ table = catalog .create_table (
347+ identifier = TEST_TABLE_IDENTIFIER ,
348+ schema = TEST_TABLE_SCHEMA ,
349+ location = new_location ,
350+ partition_spec = TEST_TABLE_PARTITION_SPEC ,
351+ properties = TEST_TABLE_PROPERTIES ,
352+ )
353+ assert catalog .load_table (TEST_TABLE_IDENTIFIER ) == table
354+ assert table .location () == new_location
355+
356+
319357@pytest .mark .parametrize (
320358 "schema,expected" ,
321359 [
@@ -337,8 +375,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si
337375 table = catalog .create_table (
338376 identifier = TEST_TABLE_IDENTIFIER ,
339377 schema = pyarrow_schema_simple_without_ids ,
340- location = TEST_TABLE_LOCATION ,
341- partition_spec = TEST_TABLE_PARTITION_SPEC ,
342378 properties = TEST_TABLE_PROPERTIES ,
343379 )
344380 assert catalog .load_table (TEST_TABLE_IDENTIFIER ) == table
@@ -584,6 +620,7 @@ def test_commit_table(catalog: InMemoryCatalog) -> None:
584620 NestedField (2 , "y" , LongType (), doc = "comment" ),
585621 NestedField (3 , "z" , LongType ()),
586622 NestedField (4 , "add" , LongType ()),
623+ schema_id = 1 ,
587624 )
588625
589626 # When
@@ -664,7 +701,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
664701
665702def test_catalog_repr (catalog : InMemoryCatalog ) -> None :
666703 s = repr (catalog )
667- assert s == "test.in.memory .catalog (<class 'test_base.InMemoryCatalog'>)"
704+ assert s == "test.in_memory .catalog (<class 'test_base.InMemoryCatalog'>)"
668705
669706
670707def test_table_properties_int_value (catalog : InMemoryCatalog ) -> None :
0 commit comments