-
Notifications
You must be signed in to change notification settings - Fork 466
Added a flag which allows disabling locks with Hive catalog #3121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
0677f8e
7bbd0c1
3f18368
14ed056
4c58d42
984592a
3b09ecd
54610a2
b955e94
77121d9
fc0586b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -499,6 +499,109 @@ def _do_wait_for_lock() -> LockResponse: | |
|
|
||
| return _do_wait_for_lock() | ||
|
|
||
| def _do_commit( | ||
| self, | ||
| open_client: Client, | ||
| table_identifier: Identifier, | ||
| database_name: str, | ||
| table_name: str, | ||
| requirements: tuple[TableRequirement, ...], | ||
| updates: tuple[TableUpdate, ...], | ||
| ) -> CommitTableResponse: | ||
| """Perform the actual commit logic (get table, update, write metadata, alter/create in HMS). | ||
|
|
||
| This method contains the core commit logic, separated from locking concerns. | ||
| """ | ||
| hive_table: HiveTable | None | ||
| current_table: Table | None | ||
| try: | ||
| hive_table = self._get_hive_table(open_client, database_name, table_name) | ||
| current_table = self._convert_hive_into_iceberg(hive_table) | ||
| except NoSuchTableError: | ||
| hive_table = None | ||
| current_table = None | ||
|
|
||
| updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) | ||
| if current_table and updated_staged_table.metadata == current_table.metadata: | ||
| # no changes, do nothing | ||
| return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) | ||
| self._write_metadata( | ||
| metadata=updated_staged_table.metadata, | ||
| io=updated_staged_table.io, | ||
| metadata_path=updated_staged_table.metadata_location, | ||
| ) | ||
|
|
||
| if hive_table and current_table: | ||
| # Table exists, update it. | ||
|
|
||
| # Note on table properties: | ||
| # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON. | ||
| # - Updates are reflected in both locations | ||
| # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved. | ||
| # | ||
| # While it is possible to modify HMS table properties through this API, it is not recommended: | ||
| # - Mixing HMS-specific properties in Iceberg metadata can cause confusion | ||
| # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific) | ||
| # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg | ||
| # (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata, | ||
| # and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS) | ||
| new_iceberg_properties = _construct_parameters( | ||
| metadata_location=updated_staged_table.metadata_location, | ||
| previous_metadata_location=current_table.metadata_location, | ||
| metadata_properties=updated_staged_table.properties, | ||
| ) | ||
| # Detect properties that were removed from Iceberg metadata | ||
| deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys() | ||
|
|
||
| # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties | ||
| existing_hms_parameters = dict(hive_table.parameters or {}) | ||
| for key in deleted_iceberg_properties: | ||
| existing_hms_parameters.pop(key, None) | ||
| existing_hms_parameters.update(new_iceberg_properties) | ||
| hive_table.parameters = existing_hms_parameters | ||
|
|
||
| # Update hive's schema and properties | ||
| hive_table.sd = _construct_hive_storage_descriptor( | ||
| updated_staged_table.schema(), | ||
| updated_staged_table.location(), | ||
| property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT), | ||
| ) | ||
| open_client.alter_table_with_environment_context( | ||
| dbname=database_name, | ||
| tbl_name=table_name, | ||
| new_tbl=hive_table, | ||
| environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch. I extended my PR. Hopefully I got that right, as the flow is pretty complex. I pushed the code to our fork to see if it hasn't affected our production jobs. |
||
| ) | ||
| else: | ||
| # Table does not exist, create it. | ||
| hive_table = self._convert_iceberg_into_hive( | ||
| StagedTable( | ||
| identifier=(database_name, table_name), | ||
| metadata=updated_staged_table.metadata, | ||
| metadata_location=updated_staged_table.metadata_location, | ||
| io=updated_staged_table.io, | ||
| catalog=self, | ||
| ) | ||
| ) | ||
| self._create_hive_table(open_client, hive_table) | ||
|
|
||
| return CommitTableResponse( | ||
| metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool: | ||
| """Determine whether HMS locking is enabled for a commit. | ||
|
|
||
| Matches the Java implementation in HiveTableOperations: checks the table property first, | ||
| then falls back to catalog properties, then defaults to True. | ||
| """ | ||
| if TableProperties.HIVE_LOCK_ENABLED in table_properties: | ||
| return property_as_bool( | ||
| table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT | ||
| ) | ||
| return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) | ||
|
|
||
| def commit_table( | ||
| self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] | ||
| ) -> CommitTableResponse: | ||
|
|
@@ -518,98 +621,27 @@ def commit_table( | |
| """ | ||
| table_identifier = table.name() | ||
| database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) | ||
| lock_enabled = self._hive_lock_enabled(table.properties, self.properties) | ||
| # commit to hive | ||
| # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 | ||
| with self._client as open_client: | ||
| lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) | ||
| if lock_enabled: | ||
| lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) | ||
|
|
||
| try: | ||
| if lock.state != LockState.ACQUIRED: | ||
| if lock.state == LockState.WAITING: | ||
| self._wait_for_lock(database_name, table_name, lock.lockid, open_client) | ||
| else: | ||
| raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") | ||
|
|
||
| hive_table: HiveTable | None | ||
| current_table: Table | None | ||
| try: | ||
| hive_table = self._get_hive_table(open_client, database_name, table_name) | ||
| current_table = self._convert_hive_into_iceberg(hive_table) | ||
| except NoSuchTableError: | ||
| hive_table = None | ||
| current_table = None | ||
|
|
||
| updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) | ||
| if current_table and updated_staged_table.metadata == current_table.metadata: | ||
| # no changes, do nothing | ||
| return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) | ||
| self._write_metadata( | ||
| metadata=updated_staged_table.metadata, | ||
| io=updated_staged_table.io, | ||
| metadata_path=updated_staged_table.metadata_location, | ||
| ) | ||
|
|
||
| if hive_table and current_table: | ||
| # Table exists, update it. | ||
|
|
||
| # Note on table properties: | ||
| # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON. | ||
| # - Updates are reflected in both locations | ||
| # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved. | ||
| # | ||
| # While it is possible to modify HMS table properties through this API, it is not recommended: | ||
| # - Mixing HMS-specific properties in Iceberg metadata can cause confusion | ||
| # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific) | ||
| # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg | ||
| # (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata, | ||
| # and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS) | ||
| new_iceberg_properties = _construct_parameters( | ||
| metadata_location=updated_staged_table.metadata_location, | ||
| previous_metadata_location=current_table.metadata_location, | ||
| metadata_properties=updated_staged_table.properties, | ||
| ) | ||
| # Detect properties that were removed from Iceberg metadata | ||
| deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys() | ||
|
|
||
| # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties | ||
| existing_hms_parameters = dict(hive_table.parameters or {}) | ||
| for key in deleted_iceberg_properties: | ||
| existing_hms_parameters.pop(key, None) | ||
| existing_hms_parameters.update(new_iceberg_properties) | ||
| hive_table.parameters = existing_hms_parameters | ||
|
|
||
| # Update hive's schema and properties | ||
| hive_table.sd = _construct_hive_storage_descriptor( | ||
| updated_staged_table.schema(), | ||
| updated_staged_table.location(), | ||
| property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT), | ||
| ) | ||
| open_client.alter_table_with_environment_context( | ||
| dbname=database_name, | ||
| tbl_name=table_name, | ||
| new_tbl=hive_table, | ||
| environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}), | ||
| ) | ||
| else: | ||
| # Table does not exist, create it. | ||
| hive_table = self._convert_iceberg_into_hive( | ||
| StagedTable( | ||
| identifier=(database_name, table_name), | ||
| metadata=updated_staged_table.metadata, | ||
| metadata_location=updated_staged_table.metadata_location, | ||
| io=updated_staged_table.io, | ||
| catalog=self, | ||
| ) | ||
| ) | ||
| self._create_hive_table(open_client, hive_table) | ||
| except WaitingForLockException as e: | ||
| raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e | ||
| finally: | ||
| open_client.unlock(UnlockRequest(lockid=lock.lockid)) | ||
|
|
||
| return CommitTableResponse( | ||
| metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location | ||
| ) | ||
| if lock.state != LockState.ACQUIRED: | ||
| if lock.state == LockState.WAITING: | ||
| self._wait_for_lock(database_name, table_name, lock.lockid, open_client) | ||
| else: | ||
| raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") | ||
|
|
||
| return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates) | ||
| except WaitingForLockException as e: | ||
| raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e | ||
| finally: | ||
| open_client.unlock(UnlockRequest(lockid=lock.lockid)) | ||
| else: | ||
| return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates) | ||
|
|
||
| def load_table(self, identifier: str | Identifier) -> Table: | ||
| """Load the table's metadata and return the table instance. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can you move this below the
`commit_table` function so that the GitHub diff can (hopefully) render it better. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still looks like we have a lot of noise here :(