Skip to content

Commit 3910e5e

Browse files
committed
refactor hive's _commit_table
1 parent 7971593 commit 3910e5e

File tree

2 files changed: +60 additions, −47 deletions

pyiceberg/catalog/hive.py

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@
6767
NamespaceNotEmptyError,
6868
NoSuchIcebergTableError,
6969
NoSuchNamespaceError,
70+
NoSuchPropertyException,
7071
NoSuchTableError,
7172
TableAlreadyExistsError,
7273
)
73-
from pyiceberg.io import FileIO, load_file_io
7474
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
7575
from pyiceberg.schema import Schema, SchemaVisitor, visit
7676
from pyiceberg.serializers import FromInputFile
@@ -247,10 +247,12 @@ def __init__(self, name: str, **properties: str):
247247
super().__init__(name, **properties)
248248
self._client = _HiveClient(properties["uri"], properties.get("ugi"))
249249

250-
def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
250+
def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
251251
properties: Dict[str, str] = table.parameters
252252
if TABLE_TYPE not in properties:
253-
raise NoSuchTableError(f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}")
253+
raise NoSuchPropertyException(
254+
f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}"
255+
)
254256

255257
table_type = properties[TABLE_TYPE]
256258
if table_type.lower() != ICEBERG:
@@ -261,8 +263,9 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
261263
if prop_metadata_location := properties.get(METADATA_LOCATION):
262264
metadata_location = prop_metadata_location
263265
else:
264-
raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing")
266+
raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing")
265267

268+
io = self._load_file_io(location=metadata_location)
266269
file = io.new_input(metadata_location)
267270
metadata = FromInputFile.table_metadata(file)
268271
return Table(
@@ -299,6 +302,12 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None
299302
except AlreadyExistsException as e:
300303
raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e
301304

305+
def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable:
306+
try:
307+
return open_client.get_table(dbname=database_name, tbl_name=table_name)
308+
except NoSuchObjectException as e:
309+
raise NoSuchTableError(f"Table does not exists: {table_name}") from e
310+
302311
def create_table(
303312
self,
304313
identifier: Union[str, Identifier],
@@ -343,7 +352,7 @@ def create_table(
343352
self._create_hive_table(open_client, tbl)
344353
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
345354

346-
return self._convert_hive_into_iceberg(hive_table, staged_table.io)
355+
return self._convert_hive_into_iceberg(hive_table)
347356

348357
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
349358
"""Register a new table using existing metadata.
@@ -395,47 +404,53 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
395404
if lock.state != LockState.ACQUIRED:
396405
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
397406

398-
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
399-
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
400-
current_table = self._convert_hive_into_iceberg(hive_table, io)
407+
hive_table: Optional[HiveTable]
408+
current_table: Optional[Table]
409+
try:
410+
hive_table = self._get_hive_table(open_client, database_name, table_name)
411+
current_table = self._convert_hive_into_iceberg(hive_table)
412+
except NoSuchTableError:
413+
hive_table = None
414+
current_table = None
401415

402-
base_metadata = current_table.metadata
403416
for requirement in table_request.requirements:
404-
requirement.validate(base_metadata)
417+
requirement.validate(current_table.metadata if current_table else None)
405418

406-
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
407-
if updated_metadata == base_metadata:
408-
# no changes, do nothing
409-
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
410-
411-
# write new metadata
412-
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
413-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
414-
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
415-
416-
hive_table.parameters = _construct_parameters(
417-
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
418-
)
419-
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
420-
except NoSuchObjectException:
421419
updated_metadata = update_table_metadata(
422-
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
420+
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
421+
updates=table_request.updates,
422+
enforce_validation=current_table is None,
423423
)
424-
new_metadata_version = 0
424+
425+
if current_table and updated_metadata == current_table.metadata:
426+
# no changes, do nothing
427+
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
428+
429+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
425430
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
426431
io = self._load_file_io(updated_metadata.properties, new_metadata_location)
427-
self._write_metadata(updated_metadata, io, new_metadata_location)
428-
429-
tbl = self._convert_iceberg_into_hive(
430-
StagedTable(
431-
identifier=(self.name, database_name, table_name),
432-
metadata=updated_metadata,
433-
metadata_location=new_metadata_location,
434-
io=io,
435-
catalog=self,
436-
)
432+
self._write_metadata(
433+
metadata=updated_metadata,
434+
io=io,
435+
metadata_path=new_metadata_location,
437436
)
438-
self._create_hive_table(open_client, tbl)
437+
438+
if hive_table and current_table:
439+
hive_table.parameters = _construct_parameters(
440+
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
441+
)
442+
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
443+
else:
444+
hive_table = self._convert_iceberg_into_hive(
445+
StagedTable(
446+
identifier=(self.name, database_name, table_name),
447+
metadata=updated_metadata,
448+
metadata_location=new_metadata_location,
449+
io=io,
450+
catalog=self,
451+
)
452+
)
453+
self._create_hive_table(open_client, hive_table)
439454
finally:
440455
open_client.unlock(UnlockRequest(lockid=lock.lockid))
441456

@@ -458,14 +473,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
458473
"""
459474
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
460475
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
461-
try:
462-
with self._client as open_client:
463-
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
464-
except NoSuchObjectException as e:
465-
raise NoSuchTableError(f"Table does not exists: {table_name}") from e
466476

467-
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
468-
return self._convert_hive_into_iceberg(hive_table, io)
477+
with self._client as open_client:
478+
hive_table = self._get_hive_table(open_client, database_name, table_name)
479+
480+
return self._convert_hive_into_iceberg(hive_table)
469481

470482
def drop_table(self, identifier: Union[str, Identifier]) -> None:
471483
"""Drop a table.

tests/integration/test_writes/test_writes.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
from pyiceberg.catalog import Catalog
3636
from pyiceberg.catalog.hive import HiveCatalog
37+
from pyiceberg.catalog.rest import RestCatalog
3738
from pyiceberg.catalog.sql import SqlCatalog
3839
from pyiceberg.exceptions import NoSuchTableError
3940
from pyiceberg.table import TableProperties, _dataframe_to_data_files
@@ -609,10 +610,10 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None
609610

610611

611612
@pytest.mark.integration
612-
@pytest.mark.parametrize("format_version", [2])
613+
@pytest.mark.parametrize("format_version", [1, 2])
613614
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
614615
def test_create_table_transaction(catalog: Catalog, format_version: int) -> None:
615-
if format_version == 1:
616+
if format_version == 1 and isinstance(catalog, RestCatalog):
616617
pytest.skip(
617618
"There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table"
618619
)

Comments (0)