Skip to content

Commit 2272e20

Browse files
authored
Glue catalog commit table (#140)
* Implement table metadata updater first draft
* fix updater error and add tests
* implement _commit_table for glue
* implement apply_metadata_update which is simpler
* remove old implementation
* re-organize method place
* fix nit
* fix test
* add another test
* clear TODO
* add a combined test
* Fix merge conflict
* update table metadata merged
* implement requirements validation
* change the exception to CommitFailedException
* add docstring
* use regex to parse the metadata version
* fix lint issue
* fix CI issue
* make base_metadata optional and add null check
* make base_metadata optional and add null check
* add integration test
* default skip-archive to true and comments
* refactor tests
* add doc and fix test after merge
* make regex more robust, thanks Fokko!
* Fix review comments, thanks Patrick!
1 parent 9038676 commit 2272e20

File tree

5 files changed

+218
-19
lines changed

5 files changed

+218
-19
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from __future__ import annotations
1919

2020
import logging
21+
import re
2122
import uuid
2223
from abc import ABC, abstractmethod
2324
from dataclasses import dataclass
@@ -74,6 +75,17 @@
7475
LOCATION = "location"
7576
EXTERNAL_TABLE = "EXTERNAL_TABLE"
7677

78+
# Matches metadata file names of the form `<version>-<uuid>[.<codec>].metadata.json`,
# e.g. `00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`.
TABLE_METADATA_FILE_NAME_REGEX = re.compile(
    r"""
    (\d+)            # version number
    -                # separator
    ([\w-]{36})      # UUID (36 characters, including hyphens)
    (?:\.\w+)?       # optional codec name
    \.metadata\.json # file extension
    """,
    re.VERBOSE,
)
88+
7789

7890
class CatalogType(Enum):
7991
REST = "rest"
@@ -587,8 +599,38 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
587599
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
588600

589601
@staticmethod
590-
def _get_metadata_location(location: str) -> str:
591-
return f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json"
602+
def _get_metadata_location(location: str, new_version: int = 0) -> str:
603+
if new_version < 0:
604+
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
605+
version_str = f"{new_version:05d}"
606+
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"
607+
608+
@staticmethod
def _parse_metadata_version(metadata_location: str) -> int:
    """Extract the integer version encoded in a metadata file location.

    Metadata files are named `<version>-<uuid>[.<codec>].metadata.json`; for example
    `s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
    has version 1. Any location that does not follow this pattern (including one with
    an invalid UUID) yields -1, so the next metadata file is treated as version 0.

    Args:
        metadata_location (str): The location of the metadata file.

    Returns:
        int: The parsed version, or -1 when the file name has no valid version string.
    """
    file_name = metadata_location.rsplit("/", 1)[-1]
    match = TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name)
    if match is None:
        return -1
    version_part, uuid_part = match.group(1, 2)
    try:
        # The regex only checks length and charset; confirm it is a real UUID.
        uuid.UUID(uuid_part)
    except ValueError:
        return -1
    return int(version_part)
592634

593635
def _get_updated_props_and_update_summary(
594636
self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties

pyiceberg/catalog/glue.py

Lines changed: 98 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@
4040
ICEBERG,
4141
LOCATION,
4242
METADATA_LOCATION,
43+
PREVIOUS_METADATA_LOCATION,
4344
TABLE_TYPE,
4445
Catalog,
4546
Identifier,
4647
Properties,
4748
PropertiesUpdateSummary,
4849
)
4950
from pyiceberg.exceptions import (
51+
CommitFailedException,
5052
NamespaceAlreadyExistsError,
5153
NamespaceNotEmptyError,
5254
NoSuchIcebergTableError,
@@ -59,21 +61,40 @@
5961
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6062
from pyiceberg.schema import Schema
6163
from pyiceberg.serializers import FromInputFile
62-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
64+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
6365
from pyiceberg.table.metadata import new_table_metadata
6466
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6567
from pyiceberg.typedef import EMPTY_DICT
6668

67-
68-
def _construct_parameters(metadata_location: str) -> Properties:
69-
return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}
70-
71-
72-
def _construct_create_table_input(table_name: str, metadata_location: str, properties: Properties) -> TableInputTypeDef:
69+
# Whether Glue should skip archiving an old table version when creating a new table version
# during a commit. By default Glue archives all old table versions after an UpdateTable call,
# but it enforces a maximum number of archived versions per table (which can be increased).
# For streaming use cases with many commits it is therefore recommended to keep this true.
GLUE_SKIP_ARCHIVE = "glue.skip-archive"
GLUE_SKIP_ARCHIVE_DEFAULT = True
75+
76+
77+
def _construct_parameters(
    metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
    """Build the Glue table parameters for an Iceberg table.

    Starts from the existing Glue table's parameters (if any) so that unrelated
    parameters survive an update, then overwrites the Iceberg-specific markers.

    Args:
        metadata_location (str): Location of the new metadata file.
        glue_table (Optional[TableTypeDef]): Existing Glue table whose parameters are carried over.
        prev_metadata_location (Optional[str]): Location of the previous metadata file, recorded
            under PREVIOUS_METADATA_LOCATION when provided.

    Returns:
        Properties: The merged parameter dict.
    """
    # Copy the parameters so the caller's Glue response dict is never mutated in place
    # (the original returned glue_table["Parameters"] itself and then update()d it).
    new_parameters = dict(glue_table.get("Parameters", {})) if glue_table else {}
    new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
    if prev_metadata_location:
        new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
    return new_parameters
85+
86+
87+
def _construct_table_input(
88+
table_name: str,
89+
metadata_location: str,
90+
properties: Properties,
91+
glue_table: Optional[TableTypeDef] = None,
92+
prev_metadata_location: Optional[str] = None,
93+
) -> TableInputTypeDef:
7394
table_input: TableInputTypeDef = {
7495
"Name": table_name,
7596
"TableType": EXTERNAL_TABLE,
76-
"Parameters": _construct_parameters(metadata_location),
97+
"Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
7798
}
7899

79100
if "Description" in properties:
@@ -177,6 +198,28 @@ def _create_glue_table(self, database_name: str, table_name: str, table_input: T
177198
except self.glue.exceptions.EntityNotFoundException as e:
178199
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e
179200

201+
def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
    """Update the Glue table, using `version_id` for optimistic locking.

    Args:
        database_name (str): The Glue database holding the table.
        table_name (str): The Glue table name.
        table_input (TableInputTypeDef): The new table definition.
        version_id (str): The Glue table version the update must be based on.

    Raises:
        NoSuchTableError: If the table (at the given version) does not exist.
        CommitFailedException: If Glue detected a concurrent modification.
    """
    skip_archive = self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT)
    if isinstance(skip_archive, str):
        # Catalog properties are strings when user-supplied; boto3's SkipArchive expects a
        # bool, and a bare truthiness check would treat "false" as true.
        skip_archive = skip_archive.strip().lower() in ("true", "1", "yes")
    try:
        self.glue.update_table(
            DatabaseName=database_name,
            TableInput=table_input,
            SkipArchive=skip_archive,
            VersionId=version_id,
        )
    except self.glue.exceptions.EntityNotFoundException as e:
        raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e
    except self.glue.exceptions.ConcurrentModificationException as e:
        raise CommitFailedException(
            f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
        ) from e
215+
216+
def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
217+
try:
218+
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
219+
return load_table_response["Table"]
220+
except self.glue.exceptions.EntityNotFoundException as e:
221+
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
222+
180223
def create_table(
181224
self,
182225
identifier: Union[str, Identifier],
@@ -215,7 +258,7 @@ def create_table(
215258
io = load_file_io(properties=self.properties, location=metadata_location)
216259
self._write_metadata(metadata, io, metadata_location)
217260

218-
table_input = _construct_create_table_input(table_name, metadata_location, properties)
261+
table_input = _construct_table_input(table_name, metadata_location, properties)
219262
database_name, table_name = self.identifier_to_database_and_table(identifier)
220263
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
221264

@@ -247,8 +290,52 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
247290
248291
Raises:
249292
NoSuchTableError: If a table with the given identifier does not exist.
293+
CommitFailedException: If the commit failed.
250294
"""
251-
raise NotImplementedError
295+
identifier_tuple = self.identifier_to_tuple_without_catalog(
296+
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
297+
)
298+
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
299+
300+
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
301+
glue_table_version_id = current_glue_table.get("VersionId")
302+
if not glue_table_version_id:
303+
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
304+
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
305+
base_metadata = current_table.metadata
306+
307+
# Validate the update requirements
308+
for requirement in table_request.requirements:
309+
requirement.validate(base_metadata)
310+
311+
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
312+
if updated_metadata == base_metadata:
313+
# no changes, do nothing
314+
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
315+
316+
# write new metadata
317+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
318+
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
319+
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
320+
321+
update_table_input = _construct_table_input(
322+
table_name=table_name,
323+
metadata_location=new_metadata_location,
324+
properties=current_table.properties,
325+
glue_table=current_glue_table,
326+
prev_metadata_location=current_table.metadata_location,
327+
)
328+
329+
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
330+
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
331+
self._update_glue_table(
332+
database_name=database_name,
333+
table_name=table_name,
334+
table_input=update_table_input,
335+
version_id=glue_table_version_id,
336+
)
337+
338+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
252339

253340
def load_table(self, identifier: Union[str, Identifier]) -> Table:
254341
"""Load the table's metadata and returns the table instance.
@@ -267,12 +354,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
267354
"""
268355
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
269356
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
270-
try:
271-
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
272-
except self.glue.exceptions.EntityNotFoundException as e:
273-
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
274357

275-
return self._convert_glue_to_iceberg(load_table_response["Table"])
358+
return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))
276359

277360
def drop_table(self, identifier: Union[str, Identifier]) -> None:
278361
"""Drop a table.

tests/catalog/integration_test_glue.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
TableAlreadyExistsError,
3232
)
3333
from pyiceberg.schema import Schema
34+
from pyiceberg.types import IntegerType
3435
from tests.conftest import clean_up, get_bucket_name, get_s3_path
3536

3637
# The number of tables/databases used in list_table/namespace test
@@ -61,6 +62,7 @@ def test_create_table(
6162
assert table.identifier == (CATALOG_NAME,) + identifier
6263
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
6364
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
65+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
6466

6567

6668
def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None:
@@ -82,6 +84,7 @@ def test_create_table_with_default_location(
8284
assert table.identifier == (CATALOG_NAME,) + identifier
8385
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
8486
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
87+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
8588

8689

8790
def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
@@ -105,6 +108,7 @@ def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_na
105108
assert table.identifier == loaded_table.identifier
106109
assert table.metadata_location == loaded_table.metadata_location
107110
assert table.metadata == loaded_table.metadata
111+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
108112

109113

110114
def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None:
@@ -126,6 +130,7 @@ def test_rename_table(
126130
new_table_name = f"rename-{table_name}"
127131
identifier = (database_name, table_name)
128132
table = test_catalog.create_table(identifier, table_schema_nested)
133+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
129134
assert table.identifier == (CATALOG_NAME,) + identifier
130135
new_identifier = (new_database_name, new_table_name)
131136
test_catalog.rename_table(identifier, new_identifier)
@@ -261,3 +266,31 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)
261266
else:
262267
assert k in update_report.removed
263268
assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
269+
270+
271+
def test_commit_table_update_schema(
    test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Integration test: committing a schema update through a transaction bumps the
    metadata version and records a new schema while retaining the old one."""
    identifier = (database_name, table_name)
    test_catalog.create_namespace(namespace=database_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    original_table_metadata = table.metadata

    # A freshly created table starts at metadata version 0 with schema id 0.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
    assert original_table_metadata.current_schema_id == 0

    transaction = table.transaction()
    update = transaction.update_schema()
    update.add_column(path="b", field_type=IntegerType())
    update.commit()
    transaction.commit_transaction()

    updated_table_metadata = table.metadata

    # The commit writes a new metadata file (version 1) and a new current schema;
    # the previous schema remains in the metadata's schema list.
    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
    assert updated_table_metadata.current_schema_id == 1
    assert len(updated_table_metadata.schemas) == 2
    new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
    assert new_schema
    assert new_schema == update._apply()
    assert new_schema.find_field("b").field_type == IntegerType()

0 commit comments

Comments
 (0)