Skip to content

Commit 5e105ad

Browse files
committed
fix(glue): Support create_table_transaction for S3 Tables federated databases
For S3 Tables, the warehouse location is managed by S3 Tables and is only available after a Glue table entry has been created. create_table() already handled this via _create_table_s3tables(), but create_table_transaction() went through the base-class path, which failed with 'No default path is set'.

This adds a create_table_transaction() override on GlueCatalog that, for S3 Tables federated databases:
1. Pre-creates a minimal Glue table entry so that S3 Tables allocates storage.
2. Retrieves the managed location from the Glue table.
3. Builds the staged table targeting that managed location.
4. Returns an _S3TablesCreateTableTransaction that cleans up the staging table on abort/failure.

It also refactors commit_table() to handle the case where a Glue table exists but has no metadata_location yet (the staging-table case), branching on current_glue_table instead of current_table for the update-vs-create decision.
1 parent 536a7d0 commit 5e105ad

File tree

2 files changed

+231
-4
lines changed

2 files changed

+231
-4
lines changed

pyiceberg/catalog/glue.py

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818

1919
import logging
20+
from types import TracebackType
2021
from typing import (
2122
TYPE_CHECKING,
2223
Any,
@@ -55,6 +56,8 @@
5556
from pyiceberg.serializers import FromInputFile, ToOutputFile
5657
from pyiceberg.table import (
5758
CommitTableResponse,
59+
CreateTableTransaction,
60+
StagedTable,
5861
Table,
5962
)
6063
from pyiceberg.table.metadata import TableMetadata
@@ -314,6 +317,47 @@ def add_glue_catalog_id(params: dict[str, str], **kwargs: Any) -> None:
314317
event_system.register("provide-client-params.glue", add_glue_catalog_id)
315318

316319

320+
class _S3TablesCreateTableTransaction(CreateTableTransaction):
    """Create-table transaction that removes its S3 Tables staging entry when it does not commit.

    ``create_table_transaction`` pre-creates a Glue table entry for an S3 Tables
    federated database. This subclass remembers which entry that was and deletes it
    again when the commit raises, or when the ``with`` block is left while the entry
    is still flagged as pending.
    """

    def __init__(self, staged_table: StagedTable, catalog: "GlueCatalog", database_name: str, table_name: str):
        """Wrap ``staged_table`` and record which Glue entry may need deleting later."""
        super().__init__(staged_table)
        self._catalog = catalog
        self._database_name = database_name
        self._table_name = table_name
        # Stays True until either the commit succeeds or one cleanup attempt has been made.
        self._staging_table_needs_cleanup = True

    def commit_transaction(self) -> Table:
        """Commit through the base class; on any ``Exception`` drop the staging table and re-raise."""
        try:
            committed_table = super().commit_transaction()
        except Exception:
            self._cleanup_staging_table()
            raise

        # Commit succeeded: the staging entry is now the real table and must be kept.
        self._staging_table_needs_cleanup = False
        return committed_table

    def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
        """Run the base-class exit handling, then delete the staging table if it is still pending."""
        super().__exit__(exctype, excinst, exctb)
        # No-op when the flag was already cleared by a successful commit or an earlier cleanup.
        self._cleanup_staging_table()

    def _cleanup_staging_table(self) -> None:
        """Make a single best-effort attempt to delete the pre-created Glue table entry."""
        if self._staging_table_needs_cleanup:
            # Clear the flag before the call so the deletion is attempted at most once.
            self._staging_table_needs_cleanup = False
            try:
                self._catalog.glue.delete_table(DatabaseName=self._database_name, Name=self._table_name)
            except Exception:
                # Cleanup failures are only logged; the traceback is attached at DEBUG level.
                logger.warning(
                    f"Failed to clean up S3 Tables staging table {self._database_name}.{self._table_name}",
                    exc_info=logger.isEnabledFor(logging.DEBUG),
                )
359+
360+
317361
class GlueCatalog(MetastoreCatalog):
318362
glue: "GlueClient"
319363

@@ -601,6 +645,82 @@ def create_table(
601645
catalog=self,
602646
)
603647

648+
def create_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
    """Start a create-table transaction, pre-allocating storage for S3 Tables databases.

    Databases that are not S3 Tables federated are handled entirely by the base class.

    For an S3 Tables federated database the table location is assigned by S3 Tables,
    so a minimal Glue table entry is created first, the location assigned to it is
    read back, and the staged table is built against that location. ``commit_table``
    is expected to find the existing Glue entry and update it with the final
    metadata pointer.

    Args:
        identifier: Table identifier, resolved to a database name and a table name.
        schema: Schema of the new table.
        location: Explicit table location; must be ``None`` for S3 Tables databases.
        partition_spec: Partition spec of the new table.
        sort_order: Sort order of the new table.
        properties: Table properties.

    Returns:
        The base-class transaction for regular databases, or an
        ``_S3TablesCreateTableTransaction`` for S3 Tables databases.

    Raises:
        ValueError: If ``location`` is given for an S3 Tables database, or if the
            pre-created Glue entry carries no storage location.
    """
    database_name, table_name = self.identifier_to_database_and_table(identifier)
    targets_s3tables = self._is_s3tables_database(database_name)

    if targets_s3tables and location is not None:
        raise ValueError(
            f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
            "S3 Tables manages the storage location automatically."
        )

    if not targets_s3tables:
        return super().create_table_transaction(
            identifier=identifier,
            schema=schema,
            location=location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )

    # Register a bare-bones entry in Glue first; this is what makes S3 Tables allocate storage.
    self._create_glue_table(
        database_name=database_name,
        table_name=table_name,
        table_input={
            "Name": table_name,
            "Parameters": {"format": "ICEBERG"},
        },
    )

    try:
        # Read the entry back to learn the location that was assigned to it.
        staging_entry = self._get_glue_table(database_name=database_name, table_name=table_name)
        managed_location = staging_entry.get("StorageDescriptor", {}).get("Location")
        if not managed_location:
            raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")

        staged_table = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=managed_location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
    except Exception:
        # Staging failed after the Glue entry was created: remove it on a best-effort
        # basis, then surface the original error to the caller.
        try:
            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
        except Exception:
            logger.warning(
                f"Failed to clean up S3 Tables table {database_name}.{table_name}",
                exc_info=logger.isEnabledFor(logging.DEBUG),
            )
        raise
    else:
        return _S3TablesCreateTableTransaction(staged_table, self, database_name, table_name)
723+
604724
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
605725
"""Register a new table using existing metadata.
606726
@@ -649,7 +769,12 @@ def commit_table(
649769
try:
650770
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
651771
glue_table_version_id = current_glue_table.get("VersionId")
652-
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
772+
# A staging table (pre-created by create_table_transaction for S3 Tables)
773+
# exists in Glue but has no metadata_location yet — skip loading Iceberg metadata.
774+
if current_glue_table.get("Parameters", {}).get(METADATA_LOCATION):
775+
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
776+
else:
777+
current_table = None
653778
except NoSuchTableError:
654779
current_glue_table = None
655780
glue_table_version_id = None
@@ -669,8 +794,9 @@ def commit_table(
669794
metadata_path=updated_staged_table.metadata_location,
670795
)
671796

672-
if current_table:
673-
# table exists, update the table
797+
if current_glue_table is not None:
798+
# Glue table exists — either a fully committed table or a staging table
799+
# pre-created by create_table_transaction. Update it with the metadata pointer.
674800
if not glue_table_version_id:
675801
raise CommitFailedException(
676802
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
@@ -684,7 +810,7 @@ def commit_table(
684810
properties=updated_staged_table.properties,
685811
metadata=updated_staged_table.metadata,
686812
glue_table=current_glue_table,
687-
prev_metadata_location=current_table.metadata_location,
813+
prev_metadata_location=current_table.metadata_location if current_table else None,
688814
)
689815
self._update_glue_table(
690816
database_name=database_name,

tests/catalog/test_glue.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,107 @@ def test_create_duplicated_table(
312312
test_catalog.create_table(identifier, table_schema_nested)
313313

314314

315+
@mock_aws
def test_create_table_transaction_s3tables(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """An empty create-table transaction on an S3 Tables database leaves a loadable table."""
    _patch_moto_for_s3tables(monkeypatch)

    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)
    identifier = (database_name, table_name)

    # Nothing is staged inside the block; leaving it is all this test exercises.
    with catalog.create_table_transaction(
        identifier,
        table_schema_nested,
        properties={"test_key": "test_value"},
    ):
        pass

    loaded = catalog.load_table(identifier)
    expected_location = f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
    assert loaded.name() == identifier
    assert loaded.location().rstrip("/") == expected_location
    assert loaded.properties["test_key"] == "test_value"
341+
342+
343+
@mock_aws
def test_create_table_transaction_s3tables_with_schema_evolution(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """A column added inside the create-table transaction is present on the loaded table."""
    _patch_moto_for_s3tables(monkeypatch)

    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)
    identifier = (database_name, table_name)

    # The schema update context closes first, then the surrounding transaction.
    with (
        catalog.create_table_transaction(identifier, table_schema_nested) as txn,
        txn.update_schema() as update_schema,
    ):
        update_schema.add_column(path="new_col", field_type=IntegerType())

    loaded = catalog.load_table(identifier)
    assert loaded.schema().find_field("new_col").field_type == IntegerType()
367+
368+
369+
@mock_aws
def test_create_table_transaction_s3tables_rejects_location(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """Passing an explicit location for an S3 Tables database raises ``ValueError``."""
    _patch_moto_for_s3tables(monkeypatch)

    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)
    identifier = (database_name, table_name)

    with pytest.raises(ValueError, match="Cannot specify a location for S3 Tables table"):
        catalog.create_table_transaction(
            identifier,
            table_schema_nested,
            location="s3://some-bucket/some-path",
        )
386+
387+
388+
@mock_aws
def test_create_table_transaction_s3tables_cleanup_on_exception(
    monkeypatch: pytest.MonkeyPatch,
    _bucket_initialize: None,
    moto_endpoint_url: str,
    table_schema_nested: Schema,
    database_name: str,
    table_name: str,
) -> None:
    """Staging table should be cleaned up if the transaction is not committed."""
    _patch_moto_for_s3tables(monkeypatch)

    catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
    _create_s3tables_database(catalog, database_name)
    identifier = (database_name, table_name)

    with pytest.raises(RuntimeError, match="intentional"):
        with catalog.create_table_transaction(identifier, table_schema_nested):
            raise RuntimeError("intentional")

    # With the staging entry gone, a plain create_table for the same identifier must succeed.
    recreated = catalog.create_table(identifier, table_schema_nested)  # type: ignore[unreachable]
    assert recreated.name() == identifier
414+
415+
315416
@mock_aws
316417
def test_load_table(
317418
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str

0 commit comments

Comments
 (0)