Skip to content

Commit 984592a

Browse files
authored
Merge pull request #2 from Automattic/hive-lock-refactor
Refactored locking
2 parents 3f18368 + 4c58d42 commit 984592a

File tree

3 files changed, with 70 additions and 24 deletions

3 files changed

+70
-24
lines changed

pyiceberg/catalog/hive.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@
127127
HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name"
128128
HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive"
129129

130-
LOCK_ENABLED = "lock-enabled"
131-
DEFAULT_LOCK_ENABLED = True
132-
133130
LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
134131
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
135132
LOCK_CHECK_RETRIES = "lock-check-retries"
@@ -304,7 +301,6 @@ def __init__(self, name: str, **properties: str):
304301
super().__init__(name, **properties)
305302
self._client = self._create_hive_client(properties)
306303

307-
self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED)
308304
self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
309305
self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
310306
self._lock_check_retries = property_as_float(
@@ -593,6 +589,19 @@ def _do_commit(
593589
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
594590
)
595591

592+
@staticmethod
593+
def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool:
594+
"""Determine whether HMS locking is enabled for a commit.
595+
596+
Matches the Java implementation in HiveTableOperations: checks the table property first,
597+
then falls back to catalog properties, then defaults to True.
598+
"""
599+
if TableProperties.HIVE_LOCK_ENABLED in table_properties:
600+
return property_as_bool(
601+
table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT
602+
)
603+
return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT)
604+
596605
def commit_table(
597606
self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
598607
) -> CommitTableResponse:
@@ -612,10 +621,11 @@ def commit_table(
612621
"""
613622
table_identifier = table.name()
614623
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
624+
lock_enabled = self._hive_lock_enabled(table.properties, self.properties)
615625
# commit to hive
616626
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
617627
with self._client as open_client:
618-
if self._lock_enabled:
628+
if lock_enabled:
619629
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
620630

621631
try:

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ class TableProperties:
247247
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
248248
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
249249

250+
HIVE_LOCK_ENABLED = "engine.hive.lock-enabled"
251+
HIVE_LOCK_ENABLED_DEFAULT = True
252+
250253

251254
class Transaction:
252255
_table: Table

tests/catalog/test_hive.py

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
LOCK_CHECK_MAX_WAIT_TIME,
5252
LOCK_CHECK_MIN_WAIT_TIME,
5353
LOCK_CHECK_RETRIES,
54-
LOCK_ENABLED,
5554
HiveCatalog,
5655
_construct_hive_storage_descriptor,
5756
_HiveClient,
@@ -66,6 +65,7 @@
6665
)
6766
from pyiceberg.partitioning import PartitionField, PartitionSpec
6867
from pyiceberg.schema import Schema
68+
from pyiceberg.table import TableProperties
6969
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2
7070
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
7171
from pyiceberg.table.snapshots import (
@@ -1410,46 +1410,80 @@ def test_create_hive_client_with_kerberos_using_context_manager(
14101410
assert open_client._iprot.trans.isOpen()
14111411

14121412

1413-
def test_lock_enabled_defaults_to_true() -> None:
1414-
"""Verify that lock-enabled defaults to True for backward compatibility."""
1413+
def test_hive_lock_enabled_defaults_to_true() -> None:
1414+
"""Without any lock property set, locking should be enabled (backward compatible)."""
1415+
assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties={}) is True
1416+
1417+
1418+
def test_hive_lock_enabled_table_property_disables_lock() -> None:
1419+
"""Table property engine.hive.lock-enabled=false disables locking."""
1420+
table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"}
1421+
assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties={}) is False
1422+
1423+
1424+
def test_hive_lock_enabled_catalog_property_disables_lock() -> None:
1425+
"""Catalog property engine.hive.lock-enabled=false disables locking when table doesn't set it."""
1426+
catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"}
1427+
assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties=catalog_props) is False
1428+
1429+
1430+
def test_hive_lock_enabled_table_property_overrides_catalog() -> None:
1431+
"""Table property takes precedence over catalog property."""
1432+
table_props = {TableProperties.HIVE_LOCK_ENABLED: "true"}
1433+
catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"}
1434+
assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is True
1435+
1436+
table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"}
1437+
catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "true"}
1438+
assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is False
1439+
1440+
1441+
def test_commit_table_skips_locking_when_table_property_disables_it() -> None:
1442+
"""When table property engine.hive.lock-enabled=false, commit_table must not lock/unlock."""
14151443
prop = {"uri": HIVE_METASTORE_FAKE_URL}
14161444
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
1417-
assert catalog._lock_enabled is True
1445+
catalog._client = MagicMock()
1446+
1447+
mock_table = MagicMock()
1448+
mock_table.name.return_value = ("default", "my_table")
1449+
mock_table.properties = {TableProperties.HIVE_LOCK_ENABLED: "false"}
14181450

1451+
mock_do_commit = MagicMock()
1452+
mock_do_commit.return_value = MagicMock()
14191453

1420-
def test_lock_enabled_can_be_disabled() -> None:
1421-
"""Verify that lock-enabled can be set to false."""
1422-
prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
1423-
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
1424-
assert catalog._lock_enabled is False
1454+
with patch.object(catalog, "_do_commit", mock_do_commit):
1455+
catalog.commit_table(mock_table, requirements=(), updates=())
1456+
1457+
mock_do_commit.assert_called_once()
1458+
catalog._client.__enter__().lock.assert_not_called()
1459+
catalog._client.__enter__().check_lock.assert_not_called()
1460+
catalog._client.__enter__().unlock.assert_not_called()
14251461

14261462

1427-
def test_commit_table_skips_locking_when_lock_disabled() -> None:
1428-
"""When lock-enabled is false, commit_table must not call lock, check_lock, or unlock."""
1429-
prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
1463+
def test_commit_table_skips_locking_when_catalog_property_disables_it() -> None:
1464+
"""When catalog property engine.hive.lock-enabled=false, commit_table must not lock/unlock."""
1465+
prop = {"uri": HIVE_METASTORE_FAKE_URL, TableProperties.HIVE_LOCK_ENABLED: "false"}
14301466
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
14311467
catalog._client = MagicMock()
14321468

14331469
mock_table = MagicMock()
14341470
mock_table.name.return_value = ("default", "my_table")
1471+
mock_table.properties = {}
14351472

14361473
mock_do_commit = MagicMock()
14371474
mock_do_commit.return_value = MagicMock()
14381475

14391476
with patch.object(catalog, "_do_commit", mock_do_commit):
14401477
catalog.commit_table(mock_table, requirements=(), updates=())
14411478

1442-
# The core commit logic should still be called
14431479
mock_do_commit.assert_called_once()
1444-
1445-
# But no locking operations should have been performed
14461480
catalog._client.__enter__().lock.assert_not_called()
14471481
catalog._client.__enter__().check_lock.assert_not_called()
14481482
catalog._client.__enter__().unlock.assert_not_called()
14491483

14501484

1451-
def test_commit_table_uses_locking_when_lock_enabled() -> None:
1452-
"""When lock-enabled is true (default), commit_table must call lock and unlock."""
1485+
def test_commit_table_uses_locking_by_default() -> None:
1486+
"""When no lock property is set, commit_table must acquire and release a lock."""
14531487
lockid = 99999
14541488
prop = {"uri": HIVE_METASTORE_FAKE_URL}
14551489
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
@@ -1462,15 +1496,14 @@ def test_commit_table_uses_locking_when_lock_enabled() -> None:
14621496

14631497
mock_table = MagicMock()
14641498
mock_table.name.return_value = ("default", "my_table")
1499+
mock_table.properties = {}
14651500

14661501
mock_do_commit = MagicMock()
14671502
mock_do_commit.return_value = MagicMock()
14681503

14691504
with patch.object(catalog, "_do_commit", mock_do_commit):
14701505
catalog.commit_table(mock_table, requirements=(), updates=())
14711506

1472-
# Locking operations should have been performed
14731507
mock_client.lock.assert_called_once()
14741508
mock_client.unlock.assert_called_once()
1475-
# The core commit logic should still be called
14761509
mock_do_commit.assert_called_once()

0 commit comments

Comments (0)