Skip to content

Commit 37918eb

Browse files
committed
extract InMemoryCatalog out of test
1 parent 1befad7 commit 37918eb

File tree

3 files changed

+236
-208
lines changed

3 files changed

+236
-208
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ class CatalogType(Enum):
9393
GLUE = "glue"
9494
DYNAMODB = "dynamodb"
9595
SQL = "sql"
96+
MEMORY = "memory"
9697

9798

9899
def load_rest(name: str, conf: Properties) -> Catalog:
@@ -137,12 +138,19 @@ def load_sql(name: str, conf: Properties) -> Catalog:
137138
raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-postgres]'") from exc
138139

139140

141+
def load_memory(name: str, conf: Properties) -> Catalog:
142+
from pyiceberg.catalog.memory import InMemoryCatalog
143+
144+
return InMemoryCatalog(name, **conf)
145+
146+
140147
AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
141148
CatalogType.REST: load_rest,
142149
CatalogType.HIVE: load_hive,
143150
CatalogType.GLUE: load_glue,
144151
CatalogType.DYNAMODB: load_dynamodb,
145152
CatalogType.SQL: load_sql,
153+
CatalogType.MEMORY: load_memory,
146154
}
147155

148156

pyiceberg/catalog/memory.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
from typing import (
2+
Dict,
3+
List,
4+
Optional,
5+
Set,
6+
Union,
7+
)
8+
9+
from pyiceberg.catalog import (
10+
Catalog,
11+
Identifier,
12+
Properties,
13+
PropertiesUpdateSummary,
14+
)
15+
from pyiceberg.exceptions import (
16+
NamespaceAlreadyExistsError,
17+
NamespaceNotEmptyError,
18+
NoSuchNamespaceError,
19+
NoSuchTableError,
20+
TableAlreadyExistsError,
21+
)
22+
from pyiceberg.io import load_file_io
23+
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
24+
from pyiceberg.schema import Schema
25+
from pyiceberg.table import (
26+
AddSchemaUpdate,
27+
CommitTableRequest,
28+
CommitTableResponse,
29+
Table,
30+
)
31+
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
32+
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
33+
from pyiceberg.typedef import EMPTY_DICT
34+
35+
36+
class InMemoryCatalog(Catalog):
37+
"""An in-memory catalog implementation for testing purposes."""
38+
39+
__tables: Dict[Identifier, Table]
40+
__namespaces: Dict[Identifier, Properties]
41+
42+
def __init__(self, name: str, **properties: str) -> None:
43+
super().__init__(name, **properties)
44+
self.__tables = {}
45+
self.__namespaces = {}
46+
47+
def create_table(
48+
self,
49+
identifier: Union[str, Identifier],
50+
schema: Schema,
51+
location: Optional[str] = None,
52+
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
53+
sort_order: SortOrder = UNSORTED_SORT_ORDER,
54+
properties: Properties = EMPTY_DICT,
55+
) -> Table:
56+
identifier = Catalog.identifier_to_tuple(identifier)
57+
namespace = Catalog.namespace_from(identifier)
58+
59+
if identifier in self.__tables:
60+
raise TableAlreadyExistsError(f"Table already exists: {identifier}")
61+
else:
62+
if namespace not in self.__namespaces:
63+
self.__namespaces[namespace] = {}
64+
65+
new_location = location or f's3://warehouse/{"/".join(identifier)}/data'
66+
metadata = TableMetadataV1(**{
67+
"format-version": 1,
68+
"table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
69+
"location": new_location,
70+
"last-updated-ms": 1602638573874,
71+
"last-column-id": schema.highest_field_id,
72+
"schema": schema.model_dump(),
73+
"partition-spec": partition_spec.model_dump()["fields"],
74+
"properties": properties,
75+
"current-snapshot-id": -1,
76+
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}],
77+
})
78+
table = Table(
79+
identifier=identifier,
80+
metadata=metadata,
81+
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
82+
io=load_file_io(),
83+
catalog=self,
84+
)
85+
self.__tables[identifier] = table
86+
return table
87+
88+
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
89+
raise NotImplementedError
90+
91+
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
92+
new_metadata: Optional[TableMetadata] = None
93+
metadata_location = ""
94+
for update in table_request.updates:
95+
if isinstance(update, AddSchemaUpdate):
96+
add_schema_update: AddSchemaUpdate = update
97+
identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,)
98+
table = self.__tables[identifier]
99+
new_metadata = new_table_metadata(
100+
add_schema_update.schema_,
101+
table.metadata.partition_specs[0],
102+
table.sort_order(),
103+
table.location(),
104+
table.properties,
105+
table.metadata.table_uuid,
106+
)
107+
108+
table = Table(
109+
identifier=identifier,
110+
metadata=new_metadata,
111+
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
112+
io=load_file_io(),
113+
catalog=self,
114+
)
115+
116+
self.__tables[identifier] = table
117+
metadata_location = f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json'
118+
119+
return CommitTableResponse(
120+
metadata=new_metadata.model_dump() if new_metadata else {},
121+
metadata_location=metadata_location if metadata_location else "",
122+
)
123+
124+
def load_table(self, identifier: Union[str, Identifier]) -> Table:
125+
identifier = self.identifier_to_tuple_without_catalog(identifier)
126+
try:
127+
return self.__tables[identifier]
128+
except KeyError as error:
129+
raise NoSuchTableError(f"Table does not exist: {identifier}") from error
130+
131+
def drop_table(self, identifier: Union[str, Identifier]) -> None:
132+
identifier = self.identifier_to_tuple_without_catalog(identifier)
133+
try:
134+
self.__tables.pop(identifier)
135+
except KeyError as error:
136+
raise NoSuchTableError(f"Table does not exist: {identifier}") from error
137+
138+
def purge_table(self, identifier: Union[str, Identifier]) -> None:
139+
self.drop_table(identifier)
140+
141+
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
142+
from_identifier = self.identifier_to_tuple_without_catalog(from_identifier)
143+
try:
144+
table = self.__tables.pop(from_identifier)
145+
except KeyError as error:
146+
raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error
147+
148+
to_identifier = Catalog.identifier_to_tuple(to_identifier)
149+
to_namespace = Catalog.namespace_from(to_identifier)
150+
if to_namespace not in self.__namespaces:
151+
self.__namespaces[to_namespace] = {}
152+
153+
self.__tables[to_identifier] = Table(
154+
identifier=to_identifier,
155+
metadata=table.metadata,
156+
metadata_location=table.metadata_location,
157+
io=load_file_io(),
158+
catalog=self,
159+
)
160+
return self.__tables[to_identifier]
161+
162+
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
163+
namespace = Catalog.identifier_to_tuple(namespace)
164+
if namespace in self.__namespaces:
165+
raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
166+
else:
167+
self.__namespaces[namespace] = properties if properties else {}
168+
169+
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
170+
namespace = Catalog.identifier_to_tuple(namespace)
171+
if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]:
172+
raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
173+
try:
174+
self.__namespaces.pop(namespace)
175+
except KeyError as error:
176+
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error
177+
178+
def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
179+
if namespace:
180+
namespace = Catalog.identifier_to_tuple(namespace)
181+
list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]
182+
else:
183+
list_tables = list(self.__tables.keys())
184+
185+
return list_tables
186+
187+
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
188+
# Hierarchical namespace is not supported. Return an empty list
189+
if namespace:
190+
return []
191+
192+
return list(self.__namespaces.keys())
193+
194+
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
195+
namespace = Catalog.identifier_to_tuple(namespace)
196+
try:
197+
return self.__namespaces[namespace]
198+
except KeyError as error:
199+
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error
200+
201+
def update_namespace_properties(
202+
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
203+
) -> PropertiesUpdateSummary:
204+
removed: Set[str] = set()
205+
updated: Set[str] = set()
206+
207+
namespace = Catalog.identifier_to_tuple(namespace)
208+
if namespace in self.__namespaces:
209+
if removals:
210+
for key in removals:
211+
if key in self.__namespaces[namespace]:
212+
del self.__namespaces[namespace][key]
213+
removed.add(key)
214+
if updates:
215+
for key, value in updates.items():
216+
self.__namespaces[namespace][key] = value
217+
updated.add(key)
218+
else:
219+
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")
220+
221+
expected_to_change = removed.difference(removals or set())
222+
223+
return PropertiesUpdateSummary(
224+
removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change)
225+
)

0 commit comments

Comments
 (0)