Skip to content

Commit acfa5bc

Browse files
authored
Implement Hive catalog _commit_table (#294)
* Implement Hive catalog `_commit_table`
* Also works for table version upgrade
1 parent 1befad7 commit acfa5bc

File tree

3 files changed

+40
-9
lines changed

3 files changed

+40
-9
lines changed

pyiceberg/catalog/hive.py

Lines changed: 39 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@
6666
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6767
from pyiceberg.schema import Schema, SchemaVisitor, visit
6868
from pyiceberg.serializers import FromInputFile
69-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
69+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
7070
from pyiceberg.table.metadata import new_table_metadata
7171
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
7272
from pyiceberg.typedef import EMPTY_DICT
@@ -150,6 +150,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str])
150150
PROP_TABLE_TYPE = "table_type"
151151
PROP_METADATA_LOCATION = "metadata_location"
152152
PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
153+
DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
153154

154155

155156
def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]:
@@ -272,14 +273,19 @@ def create_table(
272273
AlreadyExistsError: If a table with the name already exists.
273274
ValueError: If the identifier is invalid.
274275
"""
276+
properties = {**DEFAULT_PROPERTIES, **properties}
275277
database_name, table_name = self.identifier_to_database_and_table(identifier)
276278
current_time_millis = int(time.time() * 1000)
277279

278280
location = self._resolve_table_location(location, database_name, table_name)
279281

280282
metadata_location = self._get_metadata_location(location=location)
281283
metadata = new_table_metadata(
282-
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
284+
location=location,
285+
schema=schema,
286+
partition_spec=partition_spec,
287+
sort_order=sort_order,
288+
properties=properties,
283289
)
284290
io = load_file_io({**self.properties, **properties}, location=location)
285291
self._write_metadata(metadata, io, metadata_location)
@@ -330,7 +336,37 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
330336
Raises:
331337
NoSuchTableError: If a table with the given identifier does not exist.
332338
"""
333-
raise NotImplementedError
339+
identifier_tuple = self.identifier_to_tuple_without_catalog(
340+
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
341+
)
342+
current_table = self.load_table(identifier_tuple)
343+
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
344+
base_metadata = current_table.metadata
345+
for requirement in table_request.requirements:
346+
requirement.validate(base_metadata)
347+
348+
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
349+
if updated_metadata == base_metadata:
350+
# no changes, do nothing
351+
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
352+
353+
# write new metadata
354+
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
355+
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
356+
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
357+
358+
# commit to hive
359+
try:
360+
with self._client as open_client:
361+
tbl = open_client.get_table(dbname=database_name, tbl_name=table_name)
362+
tbl.parameters = _construct_parameters(
363+
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
364+
)
365+
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl)
366+
except NoSuchObjectException as e:
367+
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
368+
369+
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
334370

335371
def load_table(self, identifier: Union[str, Identifier]) -> Table:
336372
"""Load the table's metadata and return the table instance.

tests/catalog/test_hive.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,7 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
278278
],
279279
current_schema_id=0,
280280
last_partition_id=1000,
281-
properties={"owner": "javaberg"},
281+
properties={"owner": "javaberg", 'write.parquet.compression-codec': 'zstd'},
282282
partition_specs=[PartitionSpec()],
283283
default_spec_id=0,
284284
current_snapshot_id=None,

tests/integration/test_reads.py

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@
2525
from pyarrow.fs import S3FileSystem
2626

2727
from pyiceberg.catalog import Catalog, load_catalog
28-
from pyiceberg.catalog.hive import HiveCatalog
2928
from pyiceberg.exceptions import NoSuchTableError
3029
from pyiceberg.expressions import (
3130
And,
@@ -101,8 +100,6 @@ def create_table(catalog: Catalog) -> Table:
101100
@pytest.mark.integration
102101
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
103102
def test_table_properties(catalog: Catalog) -> None:
104-
if isinstance(catalog, HiveCatalog):
105-
pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/275")
106103
table = create_table(catalog)
107104

108105
assert table.properties == DEFAULT_PROPERTIES
@@ -398,8 +395,6 @@ def test_filter_on_new_column(catalog: Catalog) -> None:
398395
@pytest.mark.integration
399396
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
400397
def test_upgrade_table_version(catalog: Catalog) -> None:
401-
if isinstance(catalog, HiveCatalog):
402-
pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/274")
403398
table_test_table_version = catalog.load_table("default.test_table_version")
404399

405400
assert table_test_table_version.format_version == 1

0 commit comments

Comments
 (0)