Skip to content

Commit 7bbd0c1

Browse files
authored
Merge pull request #1 from Automattic/disable-locks
Added lock disable
2 parents 7d4a8ef + 0677f8e commit 7bbd0c1

File tree

2 files changed

+171
-87
lines changed

2 files changed

+171
-87
lines changed

pyiceberg/catalog/hive.py

Lines changed: 104 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@
127127
HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name"
128128
HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive"
129129

130+
# Catalog property that toggles Hive Metastore table locking around commits.
# Defaults to enabled so existing deployments keep the original (locking) behavior;
# set to "false" for metastores where HMS locks are unavailable or undesirable.
LOCK_ENABLED = "lock-enabled"
DEFAULT_LOCK_ENABLED = True
132+
130133
LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
131134
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
132135
LOCK_CHECK_RETRIES = "lock-check-retries"
@@ -301,6 +304,7 @@ def __init__(self, name: str, **properties: str):
301304
super().__init__(name, **properties)
302305
self._client = self._create_hive_client(properties)
303306

307+
self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED)
304308
self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
305309
self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
306310
self._lock_check_retries = property_as_float(
@@ -499,6 +503,91 @@ def _do_wait_for_lock() -> LockResponse:
499503

500504
return _do_wait_for_lock()
501505

506+
def _do_commit(
    self, open_client: Client, table_identifier: Identifier, database_name: str, table_name: str,
    requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...],
) -> CommitTableResponse:
    """Perform the actual commit logic (get table, update, write metadata, alter/create in HMS).

    This method contains the core commit logic, separated from locking concerns.
    Callers are responsible for any required HMS locking around this call.

    Args:
        open_client: An already-open Thrift client to the Hive Metastore.
        table_identifier: Catalog identifier of the table, passed through to staging.
        database_name: HMS database containing the table.
        table_name: HMS table name.
        requirements: Requirements applied via ``_update_and_stage_table``.
        updates: Updates applied via ``_update_and_stage_table``.

    Returns:
        CommitTableResponse carrying the committed metadata and its location.
    """
    hive_table: HiveTable | None
    current_table: Table | None
    try:
        hive_table = self._get_hive_table(open_client, database_name, table_name)
        current_table = self._convert_hive_into_iceberg(hive_table)
    except NoSuchTableError:
        # First commit for this identifier: fall through to the create path below.
        hive_table = None
        current_table = None

    updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
    if current_table and updated_staged_table.metadata == current_table.metadata:
        # no changes, do nothing
        return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
    # Write the new metadata file before swapping the HMS pointer, so HMS never
    # references a metadata location that does not exist yet.
    self._write_metadata(
        metadata=updated_staged_table.metadata,
        io=updated_staged_table.io,
        metadata_path=updated_staged_table.metadata_location,
    )

    if hive_table and current_table:
        # Table exists, update it.

        # Note on table properties:
        # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
        # - Updates are reflected in both locations
        # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
        #
        # While it is possible to modify HMS table properties through this API, it is not recommended:
        # - Mixing HMS-specific properties in Iceberg metadata can cause confusion
        # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific)
        # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg
        #   (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
        #   and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS)
        new_iceberg_properties = _construct_parameters(
            metadata_location=updated_staged_table.metadata_location,
            previous_metadata_location=current_table.metadata_location,
            metadata_properties=updated_staged_table.properties,
        )
        # Detect properties that were removed from Iceberg metadata
        deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()

        # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
        existing_hms_parameters = dict(hive_table.parameters or {})
        for key in deleted_iceberg_properties:
            existing_hms_parameters.pop(key, None)
        existing_hms_parameters.update(new_iceberg_properties)
        hive_table.parameters = existing_hms_parameters

        # Update hive's schema and properties
        hive_table.sd = _construct_hive_storage_descriptor(
            updated_staged_table.schema(),
            updated_staged_table.location(),
            property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
        )
        # DO_NOT_UPDATE_STATS prevents HMS from recomputing table statistics on alter.
        open_client.alter_table_with_environment_context(
            dbname=database_name,
            tbl_name=table_name,
            new_tbl=hive_table,
            environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
        )
    else:
        # Table does not exist, create it.
        hive_table = self._convert_iceberg_into_hive(
            StagedTable(
                identifier=(database_name, table_name),
                metadata=updated_staged_table.metadata,
                metadata_location=updated_staged_table.metadata_location,
                io=updated_staged_table.io,
                catalog=self,
            )
        )
        self._create_hive_table(open_client, hive_table)

    return CommitTableResponse(
        metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
    )
590+
502591
def commit_table(
503592
self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
504593
) -> CommitTableResponse:
@@ -521,95 +610,23 @@ def commit_table(
521610
# commit to hive
522611
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
523612
with self._client as open_client:
524-
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
613+
if self._lock_enabled:
614+
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
525615

526-
try:
527-
if lock.state != LockState.ACQUIRED:
528-
if lock.state == LockState.WAITING:
529-
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
530-
else:
531-
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")
532-
533-
hive_table: HiveTable | None
534-
current_table: Table | None
535616
try:
536-
hive_table = self._get_hive_table(open_client, database_name, table_name)
537-
current_table = self._convert_hive_into_iceberg(hive_table)
538-
except NoSuchTableError:
539-
hive_table = None
540-
current_table = None
541-
542-
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
543-
if current_table and updated_staged_table.metadata == current_table.metadata:
544-
# no changes, do nothing
545-
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
546-
self._write_metadata(
547-
metadata=updated_staged_table.metadata,
548-
io=updated_staged_table.io,
549-
metadata_path=updated_staged_table.metadata_location,
550-
)
551-
552-
if hive_table and current_table:
553-
# Table exists, update it.
554-
555-
# Note on table properties:
556-
# - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
557-
# - Updates are reflected in both locations
558-
# - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
559-
#
560-
# While it is possible to modify HMS table properties through this API, it is not recommended:
561-
# - Mixing HMS-specific properties in Iceberg metadata can cause confusion
562-
# - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific)
563-
# - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg
564-
# (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
565-
# and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS)
566-
new_iceberg_properties = _construct_parameters(
567-
metadata_location=updated_staged_table.metadata_location,
568-
previous_metadata_location=current_table.metadata_location,
569-
metadata_properties=updated_staged_table.properties,
570-
)
571-
# Detect properties that were removed from Iceberg metadata
572-
deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()
573-
574-
# Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
575-
existing_hms_parameters = dict(hive_table.parameters or {})
576-
for key in deleted_iceberg_properties:
577-
existing_hms_parameters.pop(key, None)
578-
existing_hms_parameters.update(new_iceberg_properties)
579-
hive_table.parameters = existing_hms_parameters
580-
581-
# Update hive's schema and properties
582-
hive_table.sd = _construct_hive_storage_descriptor(
583-
updated_staged_table.schema(),
584-
updated_staged_table.location(),
585-
property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
586-
)
587-
open_client.alter_table_with_environment_context(
588-
dbname=database_name,
589-
tbl_name=table_name,
590-
new_tbl=hive_table,
591-
environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
592-
)
593-
else:
594-
# Table does not exist, create it.
595-
hive_table = self._convert_iceberg_into_hive(
596-
StagedTable(
597-
identifier=(database_name, table_name),
598-
metadata=updated_staged_table.metadata,
599-
metadata_location=updated_staged_table.metadata_location,
600-
io=updated_staged_table.io,
601-
catalog=self,
602-
)
603-
)
604-
self._create_hive_table(open_client, hive_table)
605-
except WaitingForLockException as e:
606-
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
607-
finally:
608-
open_client.unlock(UnlockRequest(lockid=lock.lockid))
609-
610-
return CommitTableResponse(
611-
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
612-
)
617+
if lock.state != LockState.ACQUIRED:
618+
if lock.state == LockState.WAITING:
619+
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
620+
else:
621+
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")
622+
623+
return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
624+
except WaitingForLockException as e:
625+
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
626+
finally:
627+
open_client.unlock(UnlockRequest(lockid=lock.lockid))
628+
else:
629+
return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
613630

614631
def load_table(self, identifier: str | Identifier) -> Table:
615632
"""Load the table's metadata and return the table instance.

tests/catalog/test_hive.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
LOCK_CHECK_MAX_WAIT_TIME,
5252
LOCK_CHECK_MIN_WAIT_TIME,
5353
LOCK_CHECK_RETRIES,
54+
LOCK_ENABLED,
5455
HiveCatalog,
5556
_construct_hive_storage_descriptor,
5657
_HiveClient,
@@ -1407,3 +1408,69 @@ def test_create_hive_client_with_kerberos_using_context_manager(
14071408
# closing and re-opening work as expected.
14081409
with client as open_client:
14091410
assert open_client._iprot.trans.isOpen()
1411+
1412+
1413+
def test_lock_enabled_defaults_to_true() -> None:
    """Locking must remain on when the property is omitted (backward compatibility)."""
    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
    assert catalog._lock_enabled is True
1418+
1419+
1420+
def test_lock_enabled_can_be_disabled() -> None:
    """Setting lock-enabled to "false" must switch HMS locking off."""
    properties = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
    catalog = HiveCatalog(HIVE_CATALOG_NAME, **properties)
    assert catalog._lock_enabled is False
1425+
1426+
1427+
def test_commit_table_skips_locking_when_lock_disabled() -> None:
    """With lock-enabled set to "false", commit_table must never touch the HMS lock API."""
    catalog = HiveCatalog(HIVE_CATALOG_NAME, **{"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"})
    catalog._client = MagicMock()

    table_stub = MagicMock()
    table_stub.name.return_value = ("default", "my_table")

    commit_stub = MagicMock(return_value=MagicMock())
    with patch.object(catalog, "_do_commit", commit_stub):
        catalog.commit_table(table_stub, requirements=(), updates=())

    # The core commit logic still runs exactly once ...
    commit_stub.assert_called_once()

    # ... but none of the lock / check_lock / unlock thrift calls were made.
    open_client = catalog._client.__enter__()
    open_client.lock.assert_not_called()
    open_client.check_lock.assert_not_called()
    open_client.unlock.assert_not_called()
1449+
1450+
1451+
def test_commit_table_uses_locking_when_lock_enabled() -> None:
    """With the default configuration, commit_table acquires and releases an HMS lock."""
    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)

    thrift_client = MagicMock()
    thrift_client.__enter__ = MagicMock(return_value=thrift_client)
    thrift_client.__exit__ = MagicMock(return_value=False)
    thrift_client.lock.return_value = LockResponse(lockid=99999, state=LockState.ACQUIRED)
    catalog._client = thrift_client

    table_stub = MagicMock()
    table_stub.name.return_value = ("default", "my_table")

    commit_stub = MagicMock(return_value=MagicMock())
    with patch.object(catalog, "_do_commit", commit_stub):
        catalog.commit_table(table_stub, requirements=(), updates=())

    # Lock was acquired and then released around the commit ...
    thrift_client.lock.assert_called_once()
    thrift_client.unlock.assert_called_once()
    # ... and the core commit logic ran.
    commit_stub.assert_called_once()

0 commit comments

Comments
 (0)