Skip to content

Commit 7971593

Browse files
committed
Merge branch 'refactor_glue_commit_table' into create_table_transaction_for_hive_and_sql
2 parents 99e4371 + 4efc975 commit 7971593

File tree

3 files changed: 63 additions (+63) and 42 deletions (-42)

3 files changed: 63 additions (+63) and 42 deletions (-42)

pyiceberg/catalog/glue.py

Lines changed: 38 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -439,71 +439,73 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
439439
)
440440
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
441441

442+
current_glue_table: Optional[TableTypeDef]
443+
glue_table_version_id: Optional[str]
444+
current_table: Optional[Table]
442445
try:
443446
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
444-
# Update the table
445447
glue_table_version_id = current_glue_table.get("VersionId")
446-
if not glue_table_version_id:
447-
raise CommitFailedException(
448-
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
449-
)
450448
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
451-
base_metadata = current_table.metadata
449+
except NoSuchTableError:
450+
current_glue_table = None
451+
glue_table_version_id = None
452+
current_table = None
453+
454+
for requirement in table_request.requirements:
455+
requirement.validate(current_table.metadata if current_table else None)
452456

453-
# Validate the update requirements
454-
for requirement in table_request.requirements:
455-
requirement.validate(base_metadata)
457+
updated_metadata = update_table_metadata(
458+
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
459+
updates=table_request.updates,
460+
enforce_validation=current_table is None,
461+
)
456462

457-
updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
458-
if updated_metadata == base_metadata:
459-
# no changes, do nothing
460-
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
463+
if current_table and updated_metadata == current_table.metadata:
464+
# no changes, do nothing
465+
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
461466

462-
# write new metadata
463-
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
464-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
465-
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
467+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
468+
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
469+
self._write_metadata(
470+
metadata=updated_metadata,
471+
io=self._load_file_io(updated_metadata.properties, new_metadata_location),
472+
metadata_path=new_metadata_location,
473+
)
474+
475+
if current_table:
476+
# table exists, update the table
477+
if not glue_table_version_id:
478+
raise CommitFailedException(
479+
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
480+
)
466481

482+
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
483+
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
467484
update_table_input = _construct_table_input(
468485
table_name=table_name,
469486
metadata_location=new_metadata_location,
470-
properties=current_table.properties,
487+
properties=updated_metadata.properties,
471488
metadata=updated_metadata,
472489
glue_table=current_glue_table,
473490
prev_metadata_location=current_table.metadata_location,
474491
)
475-
476-
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
477-
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
478492
self._update_glue_table(
479493
database_name=database_name,
480494
table_name=table_name,
481495
table_input=update_table_input,
482496
version_id=glue_table_version_id,
483497
)
484-
485-
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
486-
except NoSuchTableError:
487-
# Create the table
488-
updated_metadata = update_table_metadata(
489-
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
490-
)
491-
new_metadata_version = 0
492-
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
493-
self._write_metadata(
494-
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
495-
)
496-
498+
else:
499+
# table does not exist, create the table
497500
create_table_input = _construct_table_input(
498501
table_name=table_name,
499502
metadata_location=new_metadata_location,
500503
properties=updated_metadata.properties,
501504
metadata=updated_metadata,
502505
)
503-
504506
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)
505507

506-
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
508+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
507509

508510
def load_table(self, identifier: Union[str, Identifier]) -> Table:
509511
"""Load the table's metadata and returns the table instance.

tests/catalog/integration_test_glue.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -462,21 +462,29 @@ def test_commit_table_update_schema(
462462
]
463463

464464

465-
def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
465+
def test_commit_table_properties(
466+
test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
467+
) -> None:
466468
identifier = (database_name, table_name)
467469
test_catalog.create_namespace(namespace=database_name)
468470
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})
469471

470472
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
471473

472474
transaction = table.transaction()
473-
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
475+
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
474476
transaction.remove_properties("test_b")
475477
transaction.commit_transaction()
476478

477479
updated_table_metadata = table.metadata
478480
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
479-
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
481+
assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}
482+
483+
table_info = glue.get_table(
484+
DatabaseName=database_name,
485+
Name=table_name,
486+
)
487+
assert table_info["Table"]["Description"] == "test_description"
480488

481489

482490
@pytest.mark.parametrize("format_version", [1, 2])

tests/catalog/test_glue.py

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -677,7 +677,12 @@ def test_commit_table_update_schema(
677677

678678
@mock_aws
679679
def test_commit_table_properties(
680-
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
680+
_glue: boto3.client,
681+
_bucket_initialize: None,
682+
moto_endpoint_url: str,
683+
table_schema_nested: Schema,
684+
database_name: str,
685+
table_name: str,
681686
) -> None:
682687
catalog_name = "glue"
683688
identifier = (database_name, table_name)
@@ -688,13 +693,19 @@ def test_commit_table_properties(
688693
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
689694

690695
transaction = table.transaction()
691-
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
696+
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
692697
transaction.remove_properties("test_b")
693698
transaction.commit_transaction()
694699

695700
updated_table_metadata = table.metadata
696701
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
697-
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
702+
assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}
703+
704+
table_info = _glue.get_table(
705+
DatabaseName=database_name,
706+
Name=table_name,
707+
)
708+
assert table_info["Table"]["Description"] == "test_description"
698709

699710

700711
@mock_aws

0 commit comments

Comments
 (0)