Skip to content

Commit 54610a2

Browse files
committed
Merge branch 'hive-lock-refactor'
2 parents 984592a + 3b09ecd commit 54610a2

File tree

2 files changed

+168
-55
lines changed

2 files changed

+168
-55
lines changed

pyiceberg/catalog/hive.py

Lines changed: 72 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@
136136
DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS"
137137
DO_NOT_UPDATE_STATS_DEFAULT = "true"
138138

139+
# EnvironmentContext property name under which _do_commit sends the table parameter key
# (PROP_METADATA_LOCATION) when a commit runs with lock_enabled=False.
NO_LOCK_EXPECTED_KEY = "expected_parameter_key"
140+
# EnvironmentContext property name under which _do_commit sends the current table's
# metadata location when a commit runs with lock_enabled=False.
NO_LOCK_EXPECTED_VALUE = "expected_parameter_value"
141+
139142
logger = logging.getLogger(__name__)
140143

141144

@@ -499,6 +502,66 @@ def _do_wait_for_lock() -> LockResponse:
499502

500503
return _do_wait_for_lock()
501504

505+
@staticmethod
def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool:
    """Resolve whether a commit should take an HMS lock.

    Follows the lookup order of the Java HiveTableOperations implementation: a value
    set on the table wins; otherwise the catalog properties are consulted; when neither
    defines the property, TableProperties.HIVE_LOCK_ENABLED_DEFAULT applies.

    Args:
        table_properties (Properties): Properties of the table being committed.
        catalog_properties (Properties): Properties of the catalog.

    Returns:
        bool: Whether HMS locking is enabled for the commit.
    """
    # An explicit table-level setting overrides the catalog; otherwise defer to the catalog.
    if TableProperties.HIVE_LOCK_ENABLED in table_properties:
        effective_properties = table_properties
    else:
        effective_properties = catalog_properties
    return property_as_bool(
        effective_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT
    )
517+
518+
def commit_table(
    self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
    """Commit updates to a table, taking an HMS lock unless locking is disabled.

    Args:
        table (Table): The table to be updated.
        requirements: (Tuple[TableRequirement, ...]): Table requirements.
        updates: (Tuple[TableUpdate, ...]): Table updates.

    Returns:
        CommitTableResponse: The updated metadata.

    Raises:
        NoSuchTableError: If a table with the given identifier does not exist.
        CommitFailedException: Requirement not met, the lock could not be acquired,
            or a conflict with a concurrent commit.
    """
    table_identifier = table.name()
    database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
    lock_enabled = self._hive_lock_enabled(table.properties, self.properties)
    # commit to hive
    # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
    with self._client as open_client:
        if not lock_enabled:
            # Lock-free path: _do_commit is told locking is off so it can add its own
            # concurrency check to the alter-table request.
            return self._do_commit(
                open_client,
                table_identifier,
                database_name,
                table_name,
                requirements,
                updates,
                lock_enabled=False,
            )

        lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))

        try:
            if lock.state == LockState.WAITING:
                # The lock request is queued; wait until it is granted or the wait gives up.
                self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
            elif lock.state != LockState.ACQUIRED:
                raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

            return self._do_commit(
                open_client,
                table_identifier,
                database_name,
                table_name,
                requirements,
                updates,
                lock_enabled=True,
            )
        except WaitingForLockException as e:
            raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
        finally:
            # Release the lock request whether the commit succeeded or failed.
            open_client.unlock(UnlockRequest(lockid=lock.lockid))
564+
502565
def _do_commit(
503566
self,
504567
open_client: Client,
@@ -507,10 +570,13 @@ def _do_commit(
507570
table_name: str,
508571
requirements: tuple[TableRequirement, ...],
509572
updates: tuple[TableUpdate, ...],
573+
lock_enabled: bool = True,
510574
) -> CommitTableResponse:
511575
"""Perform the actual commit logic (get table, update, write metadata, alter/create in HMS).
512576
513577
This method contains the core commit logic, separated from locking concerns.
578+
When lock_enabled is False, an optimistic concurrency check via the HMS EnvironmentContext
579+
is used instead (requires HIVE-26882 on the server).
514580
"""
515581
hive_table: HiveTable | None
516582
current_table: Table | None
@@ -566,11 +632,16 @@ def _do_commit(
566632
updated_staged_table.location(),
567633
property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
568634
)
635+
env_context_properties: dict[str, str] = {DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}
636+
if not lock_enabled:
637+
env_context_properties[NO_LOCK_EXPECTED_KEY] = PROP_METADATA_LOCATION
638+
env_context_properties[NO_LOCK_EXPECTED_VALUE] = current_table.metadata_location
639+
569640
open_client.alter_table_with_environment_context(
570641
dbname=database_name,
571642
tbl_name=table_name,
572643
new_tbl=hive_table,
573-
environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
644+
environment_context=EnvironmentContext(properties=env_context_properties),
574645
)
575646
else:
576647
# Table does not exist, create it.
@@ -589,60 +660,6 @@ def _do_commit(
589660
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
590661
)
591662

592-
@staticmethod
def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool:
    """Resolve whether a commit should take an HMS lock.

    Follows the lookup order of the Java HiveTableOperations implementation: a value
    set on the table wins; otherwise the catalog properties are consulted; when neither
    defines the property, TableProperties.HIVE_LOCK_ENABLED_DEFAULT applies.

    Args:
        table_properties (Properties): Properties of the table being committed.
        catalog_properties (Properties): Properties of the catalog.

    Returns:
        bool: Whether HMS locking is enabled for the commit.
    """
    # Pick the mapping that decides the setting: the table if it defines the key, else the catalog.
    table_defines_it = TableProperties.HIVE_LOCK_ENABLED in table_properties
    deciding_properties = table_properties if table_defines_it else catalog_properties
    return property_as_bool(
        deciding_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT
    )
604-
605-
def commit_table(
    self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
    """Commit updates to a table, taking an HMS lock unless locking is disabled.

    Args:
        table (Table): The table to be updated.
        requirements: (Tuple[TableRequirement, ...]): Table requirements.
        updates: (Tuple[TableUpdate, ...]): Table updates.

    Returns:
        CommitTableResponse: The updated metadata.

    Raises:
        NoSuchTableError: If a table with the given identifier does not exist.
        CommitFailedException: Requirement not met, the lock could not be acquired,
            or a conflict with a concurrent commit.
    """
    table_identifier = table.name()
    database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
    lock_enabled = self._hive_lock_enabled(table.properties, self.properties)
    # commit to hive
    # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
    with self._client as open_client:
        if not lock_enabled:
            # Locking disabled: commit directly without touching the HMS lock API.
            return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)

        lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))

        try:
            if lock.state == LockState.WAITING:
                # The lock request is queued; wait until it is granted or the wait gives up.
                self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
            elif lock.state != LockState.ACQUIRED:
                raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

            return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
        except WaitingForLockException as e:
            raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
        finally:
            # Release the lock request whether the commit succeeded or failed.
            open_client.unlock(UnlockRequest(lockid=lock.lockid))
646663
def load_table(self, identifier: str | Identifier) -> Table:
647664
"""Load the table's metadata and return the table instance.
648665

tests/catalog/test_hive.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
LOCK_CHECK_MAX_WAIT_TIME,
5252
LOCK_CHECK_MIN_WAIT_TIME,
5353
LOCK_CHECK_RETRIES,
54+
NO_LOCK_EXPECTED_KEY,
55+
NO_LOCK_EXPECTED_VALUE,
56+
PROP_METADATA_LOCATION,
5457
HiveCatalog,
5558
_construct_hive_storage_descriptor,
5659
_HiveClient,
@@ -1455,6 +1458,8 @@ def test_commit_table_skips_locking_when_table_property_disables_it() -> None:
14551458
catalog.commit_table(mock_table, requirements=(), updates=())
14561459

14571460
mock_do_commit.assert_called_once()
1461+
_, kwargs = mock_do_commit.call_args
1462+
assert kwargs["lock_enabled"] is False
14581463
catalog._client.__enter__().lock.assert_not_called()
14591464
catalog._client.__enter__().check_lock.assert_not_called()
14601465
catalog._client.__enter__().unlock.assert_not_called()
@@ -1507,3 +1512,94 @@ def test_commit_table_uses_locking_by_default() -> None:
15071512
mock_client.lock.assert_called_once()
15081513
mock_client.unlock.assert_called_once()
15091514
mock_do_commit.assert_called_once()
1515+
_, kwargs = mock_do_commit.call_args
1516+
assert kwargs["lock_enabled"] is True
1517+
1518+
1519+
def test_do_commit_env_context_includes_expected_params_when_lock_disabled() -> None:
    """Check that a lock-free commit sends the optimistic-concurrency parameters.

    With lock_enabled=False, the EnvironmentContext passed to
    alter_table_with_environment_context must carry expected_parameter_key and
    expected_parameter_value in addition to DO_NOT_UPDATE_STATS.
    """
    current_metadata_location = "s3://bucket/db/table/metadata/v1.metadata.json"
    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)

    # Fake HMS client that returns an existing Iceberg table, so _do_commit alters rather than creates.
    hms_table = MagicMock()
    hms_table.parameters = {
        "table_type": "ICEBERG",
        "metadata_location": current_metadata_location,
    }
    hms_client = MagicMock()
    hms_client.get_table.return_value = hms_table

    # Table as it currently exists; its metadata_location is the expected value under test.
    existing_table = MagicMock()
    existing_table.metadata_location = current_metadata_location
    existing_table.metadata = MagicMock()
    existing_table.properties = {}

    # Table as staged for the commit.
    staged_table = MagicMock()
    staged_table.metadata = MagicMock()
    staged_table.properties = {}

    with (
        patch.object(catalog, "_convert_hive_into_iceberg") as mock_convert,
        patch.object(catalog, "_update_and_stage_table") as mock_stage,
        patch.object(catalog, "_write_metadata"),
        patch.object(catalog, "_convert_iceberg_into_hive"),
        patch("pyiceberg.catalog.hive.CommitTableResponse"),
    ):
        mock_convert.return_value = existing_table
        mock_stage.return_value = staged_table

        catalog._do_commit(
            hms_client,
            ("default", "my_table"),
            "default",
            "my_table",
            requirements=(),
            updates=(),
            lock_enabled=False,
        )

    alter_call = hms_client.alter_table_with_environment_context
    alter_call.assert_called_once()
    sent_properties = alter_call.call_args.kwargs["environment_context"].properties
    assert sent_properties[NO_LOCK_EXPECTED_KEY] == PROP_METADATA_LOCATION
    assert sent_properties[NO_LOCK_EXPECTED_VALUE] == current_metadata_location
    assert sent_properties[DO_NOT_UPDATE_STATS] == DO_NOT_UPDATE_STATS_DEFAULT
1563+
1564+
1565+
def test_do_commit_env_context_excludes_expected_params_when_lock_enabled() -> None:
    """Check that a locked commit does not send the optimistic-concurrency parameters.

    With lock_enabled=True, the EnvironmentContext passed to
    alter_table_with_environment_context must contain DO_NOT_UPDATE_STATS but neither
    expected_parameter_key nor expected_parameter_value.
    """
    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)

    # Fake HMS client that returns an existing Iceberg table, so _do_commit alters rather than creates.
    hms_table = MagicMock()
    hms_table.parameters = {
        "table_type": "ICEBERG",
        "metadata_location": "s3://bucket/db/table/metadata/v1.metadata.json",
    }
    hms_client = MagicMock()
    hms_client.get_table.return_value = hms_table

    # Table as it currently exists.
    existing_table = MagicMock()
    existing_table.metadata = MagicMock()
    existing_table.properties = {}

    # Table as staged for the commit.
    staged_table = MagicMock()
    staged_table.metadata = MagicMock()
    staged_table.properties = {}

    with (
        patch.object(catalog, "_convert_hive_into_iceberg") as mock_convert,
        patch.object(catalog, "_update_and_stage_table") as mock_stage,
        patch.object(catalog, "_write_metadata"),
        patch.object(catalog, "_convert_iceberg_into_hive"),
        patch("pyiceberg.catalog.hive.CommitTableResponse"),
    ):
        mock_convert.return_value = existing_table
        mock_stage.return_value = staged_table

        catalog._do_commit(
            hms_client,
            ("default", "my_table"),
            "default",
            "my_table",
            requirements=(),
            updates=(),
            lock_enabled=True,
        )

    alter_call = hms_client.alter_table_with_environment_context
    alter_call.assert_called_once()
    sent_properties = alter_call.call_args.kwargs["environment_context"].properties
    assert NO_LOCK_EXPECTED_KEY not in sent_properties
    assert NO_LOCK_EXPECTED_VALUE not in sent_properties
    assert sent_properties[DO_NOT_UPDATE_STATS] == DO_NOT_UPDATE_STATS_DEFAULT

0 commit comments

Comments
 (0)