Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 119 additions & 87 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,109 @@ def _do_wait_for_lock() -> LockResponse:

return _do_wait_for_lock()

def _do_commit(
    self,
    open_client: Client,
    table_identifier: Identifier,
    database_name: str,
    table_name: str,
    requirements: tuple[TableRequirement, ...],
    updates: tuple[TableUpdate, ...],
) -> CommitTableResponse:
    """Perform the actual commit logic (get table, update, write metadata, alter/create in HMS).

    This method contains the core commit logic, separated from locking concerns.

    Args:
        open_client: Already-open Thrift client to the Hive Metastore; the caller owns its lifecycle.
        table_identifier: Identifier of the table being committed.
        database_name: HMS database that holds the table.
        table_name: HMS table name.
        requirements: Requirements validated against the current table metadata.
        updates: Metadata updates to apply.

    Returns:
        CommitTableResponse carrying the updated metadata and its new location.
    """
    # Fetch the current HMS/Iceberg state; both stay None when the table does not exist yet,
    # which routes the commit into the create branch below.
    hive_table: HiveTable | None
    current_table: Table | None
    try:
        hive_table = self._get_hive_table(open_client, database_name, table_name)
        current_table = self._convert_hive_into_iceberg(hive_table)
    except NoSuchTableError:
        hive_table = None
        current_table = None

    updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
    if current_table and updated_staged_table.metadata == current_table.metadata:
        # no changes, do nothing
        return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
    # Persist the new metadata JSON before repointing HMS at it.
    self._write_metadata(
        metadata=updated_staged_table.metadata,
        io=updated_staged_table.io,
        metadata_path=updated_staged_table.metadata_location,
    )

    if hive_table and current_table:
        # Table exists, update it.

        # Note on table properties:
        # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
        # - Updates are reflected in both locations
        # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
        #
        # While it is possible to modify HMS table properties through this API, it is not recommended:
        # - Mixing HMS-specific properties in Iceberg metadata can cause confusion
        # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific)
        # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg
        #   (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
        #   and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS)
        new_iceberg_properties = _construct_parameters(
            metadata_location=updated_staged_table.metadata_location,
            previous_metadata_location=current_table.metadata_location,
            metadata_properties=updated_staged_table.properties,
        )
        # Detect properties that were removed from Iceberg metadata
        deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()

        # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
        existing_hms_parameters = dict(hive_table.parameters or {})
        for key in deleted_iceberg_properties:
            existing_hms_parameters.pop(key, None)
        existing_hms_parameters.update(new_iceberg_properties)
        hive_table.parameters = existing_hms_parameters

        # Update hive's schema and properties
        hive_table.sd = _construct_hive_storage_descriptor(
            updated_staged_table.schema(),
            updated_staged_table.location(),
            property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
        )
        # NOTE(review): Java's HiveOperationsBase additionally passes expected_parameter_key /
        # expected_parameter_value so HMS performs a compare-and-swap on metadata_location;
        # TODO confirm whether that guard should be ported here as well (raised in PR review).
        open_client.alter_table_with_environment_context(
            dbname=database_name,
            tbl_name=table_name,
            new_tbl=hive_table,
            environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
        )
    else:
        # Table does not exist, create it.
        hive_table = self._convert_iceberg_into_hive(
            StagedTable(
                identifier=(database_name, table_name),
                metadata=updated_staged_table.metadata,
                metadata_location=updated_staged_table.metadata_location,
                io=updated_staged_table.io,
                catalog=self,
            )
        )
        self._create_hive_table(open_client, hive_table)

    return CommitTableResponse(
        metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
    )

@staticmethod
def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool:
    """Determine whether HMS locking is enabled for a commit.

    Mirrors the Java HiveTableOperations behaviour: the table-level property wins,
    otherwise the catalog-level property applies, otherwise the default (True).
    """
    # Pick the property source once, then resolve the boolean with the shared default.
    source = table_properties if TableProperties.HIVE_LOCK_ENABLED in table_properties else catalog_properties
    return property_as_bool(source, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT)

def commit_table(
    self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
    """Commit updates to a table, optionally guarding the commit with an HMS lock.

    Args:
        table: The table to commit changes to.
        requirements: Requirements validated against the current table metadata.
        updates: Metadata updates to apply.

    Returns:
        CommitTableResponse with the committed metadata and its location.

    Raises:
        CommitFailedException: When the HMS lock cannot be acquired.
    """
    table_identifier = table.name()
    database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
    # Locking may be disabled per-table or per-catalog via engine.hive.lock-enabled;
    # table-level setting takes precedence (see _hive_lock_enabled).
    lock_enabled = self._hive_lock_enabled(table.properties, self.properties)
    # commit to hive
    # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
    with self._client as open_client:
        if lock_enabled:
            lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))

            try:
                if lock.state != LockState.ACQUIRED:
                    if lock.state == LockState.WAITING:
                        # Block until the lock is granted; raises WaitingForLockException on timeout.
                        self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
                    else:
                        raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

                return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
            except WaitingForLockException as e:
                raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
            finally:
                # Always release the HMS lock, even when the commit fails.
                open_client.unlock(UnlockRequest(lockid=lock.lockid))
        else:
            # Locking disabled: run the same commit path without HMS lock coordination.
            return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)

def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and return the table instance.
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ class TableProperties:
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1

# Controls whether HMS table locks are acquired during commits; defaults to locking on
# (backward compatible). Table-level value overrides the catalog-level value.
HIVE_LOCK_ENABLED = "engine.hive.lock-enabled"
HIVE_LOCK_ENABLED_DEFAULT = True


class Transaction:
_table: Table
Expand Down
100 changes: 100 additions & 0 deletions tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
)
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import (
Expand Down Expand Up @@ -1407,3 +1408,102 @@ def test_create_hive_client_with_kerberos_using_context_manager(
# closing and re-opening work as expected.
with client as open_client:
assert open_client._iprot.trans.isOpen()


def test_hive_lock_enabled_defaults_to_true() -> None:
    """Without any lock property set, locking should be enabled (backward compatible)."""
    enabled = HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties={})
    assert enabled is True


def test_hive_lock_enabled_table_property_disables_lock() -> None:
    """Table property engine.hive.lock-enabled=false disables locking."""
    enabled = HiveCatalog._hive_lock_enabled(
        table_properties={TableProperties.HIVE_LOCK_ENABLED: "false"},
        catalog_properties={},
    )
    assert enabled is False


def test_hive_lock_enabled_catalog_property_disables_lock() -> None:
    """Catalog property engine.hive.lock-enabled=false disables locking when table doesn't set it."""
    enabled = HiveCatalog._hive_lock_enabled(
        table_properties={},
        catalog_properties={TableProperties.HIVE_LOCK_ENABLED: "false"},
    )
    assert enabled is False


def test_hive_lock_enabled_table_property_overrides_catalog() -> None:
    """Table property takes precedence over catalog property."""
    # Each case: (table value, catalog value, expected result) — the table value always wins.
    for table_value, catalog_value, expected in (("true", "false", True), ("false", "true", False)):
        result = HiveCatalog._hive_lock_enabled(
            table_properties={TableProperties.HIVE_LOCK_ENABLED: table_value},
            catalog_properties={TableProperties.HIVE_LOCK_ENABLED: catalog_value},
        )
        assert result is expected


def test_commit_table_skips_locking_when_table_property_disables_it() -> None:
    """When table property engine.hive.lock-enabled=false, commit_table must not lock/unlock."""
    catalog = HiveCatalog(HIVE_CATALOG_NAME, **{"uri": HIVE_METASTORE_FAKE_URL})
    catalog._client = MagicMock()

    table = MagicMock()
    table.name.return_value = ("default", "my_table")
    table.properties = {TableProperties.HIVE_LOCK_ENABLED: "false"}

    # Stub out the commit itself; only the locking behaviour is under test.
    with patch.object(catalog, "_do_commit", MagicMock(return_value=MagicMock())) as commit_spy:
        catalog.commit_table(table, requirements=(), updates=())

    commit_spy.assert_called_once()
    thrift_client = catalog._client.__enter__()
    thrift_client.lock.assert_not_called()
    thrift_client.check_lock.assert_not_called()
    thrift_client.unlock.assert_not_called()


def test_commit_table_skips_locking_when_catalog_property_disables_it() -> None:
    """When catalog property engine.hive.lock-enabled=false, commit_table must not lock/unlock."""
    catalog = HiveCatalog(
        HIVE_CATALOG_NAME,
        **{"uri": HIVE_METASTORE_FAKE_URL, TableProperties.HIVE_LOCK_ENABLED: "false"},
    )
    catalog._client = MagicMock()

    table = MagicMock()
    table.name.return_value = ("default", "my_table")
    table.properties = {}

    # Stub out the commit itself; only the locking behaviour is under test.
    with patch.object(catalog, "_do_commit", MagicMock(return_value=MagicMock())) as commit_spy:
        catalog.commit_table(table, requirements=(), updates=())

    commit_spy.assert_called_once()
    thrift_client = catalog._client.__enter__()
    thrift_client.lock.assert_not_called()
    thrift_client.check_lock.assert_not_called()
    thrift_client.unlock.assert_not_called()


def test_commit_table_uses_locking_by_default() -> None:
    """When no lock property is set, commit_table must acquire and release a lock."""
    catalog = HiveCatalog(HIVE_CATALOG_NAME, **{"uri": HIVE_METASTORE_FAKE_URL})

    # Thrift client mock that acts as its own context manager and grants the lock immediately.
    thrift_client = MagicMock()
    thrift_client.__enter__ = MagicMock(return_value=thrift_client)
    thrift_client.__exit__ = MagicMock(return_value=False)
    thrift_client.lock.return_value = LockResponse(lockid=99999, state=LockState.ACQUIRED)
    catalog._client = thrift_client

    table = MagicMock()
    table.name.return_value = ("default", "my_table")
    table.properties = {}

    # Stub out the commit itself; only the locking behaviour is under test.
    with patch.object(catalog, "_do_commit", MagicMock(return_value=MagicMock())) as commit_spy:
        catalog.commit_table(table, requirements=(), updates=())

    thrift_client.lock.assert_called_once()
    thrift_client.unlock.assert_called_once()
    commit_spy.assert_called_once()