Skip to content

Commit dcab2bf

Browse files
committed
implement commit table
1 parent 09de790 commit dcab2bf

File tree

1 file changed

+115
-4
lines changed

1 file changed

+115
-4
lines changed

pyiceberg/catalog/bigquery_metastore.py

Lines changed: 115 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,14 @@
2626
from google.oauth2 import service_account
2727

2828
from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog, PropertiesUpdateSummary
29-
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError
29+
from pyiceberg.exceptions import (
30+
CommitFailedException,
31+
CommitStateUnknownException,
32+
NamespaceAlreadyExistsError,
33+
NoSuchNamespaceError,
34+
NoSuchTableError,
35+
TableAlreadyExistsError,
36+
)
3037
from pyiceberg.io import load_file_io
3138
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
3239
from pyiceberg.schema import Schema
@@ -229,7 +236,88 @@ def drop_table(self, identifier: str | Identifier) -> None:
229236
def commit_table(
    self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
    """Commit updates to a table, creating the BigQuery table when it does not exist yet.

    The new metadata file is written first, then the BigQuery table's
    ``external_catalog_table_options`` are pointed at it (update path) or a
    new BigQuery table is created (create path). If the BigQuery call fails,
    the table is re-read to find out whether the commit was applied anyway.

    Args:
        table: The table to be updated.
        requirements: The requirements that must hold for the commit to proceed.
        updates: The updates to apply to the table metadata.

    Returns:
        CommitTableResponse: The metadata and metadata location after the commit.
        When the staged metadata equals the current metadata, nothing is
        written and the current values are returned.

    Raises:
        CommitFailedException: If the table disappeared or was modified by
            another process while an existing table was being updated.
        NoSuchNamespaceError: If the dataset is missing when creating a new table.
        TableAlreadyExistsError: If the table was created concurrently while
            this call was creating it.
        CommitStateUnknownException: If the commit failed and its outcome
            could not be verified.
    """
    # Local import: only this method needs it, and it lives in the same
    # google.api_core.exceptions module as NotFound/Conflict.
    from google.api_core.exceptions import PreconditionFailed

    table_identifier = table.name()
    dataset_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
    table_ref = TableReference(
        dataset_ref=DatasetReference(project=self.project_id, dataset_id=dataset_name),
        table_id=table_name,
    )

    current_bq_table: BQTable | None
    current_table: Table | None
    try:
        current_bq_table = self.client.get_table(table_ref)
    except NotFound:
        # No BigQuery table yet: this commit takes the create path.
        current_bq_table = None
        current_table = None
    else:
        current_table = self._convert_bigquery_table_to_iceberg_table(table_identifier, current_bq_table)

    updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
    if current_table is not None and updated_staged_table.metadata == current_table.metadata:
        # Nothing changed; skip writing a new metadata file.
        return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

    self._write_metadata(
        metadata=updated_staged_table.metadata,
        io=updated_staged_table.io,
        metadata_path=updated_staged_table.metadata_location,
    )

    commit_error: Exception | None = None
    try:
        if current_bq_table is not None and current_table is not None:
            current_bq_table.external_catalog_table_options = self._create_external_catalog_table_options(
                updated_staged_table.metadata.location,
                self._create_table_parameters(
                    metadata_file_location=updated_staged_table.metadata_location,
                    table_metadata=updated_staged_table.metadata,
                    previous_metadata_location=current_table.metadata_location,
                ),
            )
            self.client.update_table(current_bq_table, ["external_catalog_table_options"])
        else:
            self.client.create_table(
                self._make_new_table(
                    updated_staged_table.metadata,
                    updated_staged_table.metadata_location,
                    table_ref,
                )
            )
    except NotFound as e:
        if current_table is not None:
            commit_error = CommitFailedException(f"Table does not exist: {dataset_name}.{table_name}")
        else:
            commit_error = NoSuchNamespaceError(f"Namespace does not exist: {dataset_name}")
        commit_error.__cause__ = e
    except Conflict as e:
        if current_table is not None:
            commit_error = CommitFailedException(
                f"Table has been updated by another process: {dataset_name}.{table_name}"
            )
        else:
            commit_error = TableAlreadyExistsError(f"Table {table_name} already exists")
        commit_error.__cause__ = e
    except PreconditionFailed as e:
        # update_table sends If-Match with the etag read above, so a concurrent
        # writer shows up as HTTP 412 rather than 409. Report it as a failed
        # commit instead of leaking the raw Google API error.
        commit_error = CommitFailedException(f"Table has been updated by another process: {dataset_name}.{table_name}")
        commit_error.__cause__ = e
    except Exception as e:
        commit_error = e

    if commit_error is not None:
        # The request may have been applied server-side even though the client
        # saw an error, so re-read the table before deciding the outcome.
        commit_status = self._check_bigquery_commit_status(table_ref, updated_staged_table.metadata_location)
        if commit_status == "UNKNOWN":
            raise CommitStateUnknownException(
                f"Commit state unknown for table {dataset_name}.{table_name}"
            ) from commit_error
        if commit_status != "SUCCESS":
            raise commit_error

    if current_table is not None:
        self._delete_old_metadata(updated_staged_table.io, current_table.metadata, updated_staged_table.metadata)

    return CommitTableResponse(
        metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
    )
233321

234322
def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table:
235323
raise NotImplementedError
@@ -381,11 +469,20 @@ def _convert_bigquery_table_to_iceberg_table(self, identifier: str | Identifier,
381469
catalog=self,
382470
)
383471

384-
def _create_table_parameters(self, metadata_file_location: str, table_metadata: TableMetadata) -> dict[str, Any]:
385-
parameters: dict[str, Any] = table_metadata.properties
472+
def _create_table_parameters(
473+
self,
474+
metadata_file_location: str,
475+
table_metadata: TableMetadata,
476+
previous_metadata_location: str | None = None,
477+
) -> dict[str, Any]:
478+
parameters: dict[str, Any] = dict(table_metadata.properties)
386479
if table_metadata.table_uuid:
387480
parameters["uuid"] = str(table_metadata.table_uuid)
388481
parameters[METADATA_LOCATION_PROP] = metadata_file_location
482+
if previous_metadata_location:
483+
parameters[PREVIOUS_METADATA_LOCATION_PROP] = previous_metadata_location
484+
else:
485+
parameters.pop(PREVIOUS_METADATA_LOCATION_PROP, None)
389486
parameters[TABLE_TYPE_PROP] = ICEBERG_TABLE_TYPE_VALUE
390487
parameters["EXTERNAL"] = True
391488

@@ -405,6 +502,20 @@ def _create_table_parameters(self, metadata_file_location: str, table_metadata:
405502

406503
return parameters
407504

505+
def _check_bigquery_commit_status(self, table_ref: TableReference, new_metadata_location: str) -> str:
    """Re-read the BigQuery table to determine whether a commit was applied.

    Args:
        table_ref: Reference to the BigQuery table that was being committed.
        new_metadata_location: Metadata file location the commit tried to set.

    Returns:
        ``"SUCCESS"`` when the table's stored metadata location equals
        ``new_metadata_location``; ``"FAILURE"`` when it differs or the table
        is not found; ``"UNKNOWN"`` when the lookup raises any other error.
    """
    try:
        fetched_table = self.client.get_table(table_ref)
        catalog_options = fetched_table.external_catalog_table_options
        # Missing options or empty parameters are treated as "no location stored".
        if catalog_options and catalog_options.parameters:
            stored_parameters = catalog_options.parameters
        else:
            stored_parameters = {}
        if stored_parameters.get(METADATA_LOCATION_PROP) == new_metadata_location:
            return "SUCCESS"
        return "FAILURE"
    except NotFound:
        return "FAILURE"
    except Exception:
        # Any other error means the state could not be verified either way.
        return "UNKNOWN"
518+
408519
def _default_storage_location(self, location: str | None, dataset_ref: DatasetReference) -> str | None:
409520
if location:
410521
return location

0 commit comments

Comments
 (0)