-
Notifications
You must be signed in to change notification settings - Fork 471
Hive locking #405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hive locking #405
Changes from 4 commits
367a9b1
ff0e8a0
8eac35b
a49f985
b9405a6
ce070e2
7ff3950
6e992ba
d2c023f
7f6dc46
2b67562
f43ebcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| import getpass | ||
| import socket | ||
| import time | ||
| from types import TracebackType | ||
| from typing import ( | ||
|
|
@@ -34,10 +35,17 @@ | |
| AlreadyExistsException, | ||
| FieldSchema, | ||
| InvalidOperationException, | ||
| LockComponent, | ||
| LockLevel, | ||
| LockRequest, | ||
| LockResponse, | ||
| LockState, | ||
| LockType, | ||
| MetaException, | ||
| NoSuchObjectException, | ||
| SerDeInfo, | ||
| StorageDescriptor, | ||
| UnlockRequest, | ||
| ) | ||
| from hive_metastore.ttypes import Database as HiveDatabase | ||
| from hive_metastore.ttypes import Table as HiveTable | ||
|
|
@@ -56,6 +64,7 @@ | |
| PropertiesUpdateSummary, | ||
| ) | ||
| from pyiceberg.exceptions import ( | ||
| CommitFailedException, | ||
| NamespaceAlreadyExistsError, | ||
| NamespaceNotEmptyError, | ||
| NoSuchIcebergTableError, | ||
|
|
@@ -67,7 +76,7 @@ | |
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec | ||
| from pyiceberg.schema import Schema, SchemaVisitor, visit | ||
| from pyiceberg.serializers import FromInputFile | ||
| from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata | ||
| from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata | ||
| from pyiceberg.table.metadata import new_table_metadata | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||
| from pyiceberg.typedef import EMPTY_DICT | ||
|
|
@@ -155,7 +164,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) | |
| PROP_TABLE_TYPE = "table_type" | ||
| PROP_METADATA_LOCATION = "metadata_location" | ||
| PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location" | ||
| DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT} | ||
| DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'} | ||
|
Fokko marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
| def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]: | ||
|
|
@@ -331,6 +340,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: | |
| """ | ||
| raise NotImplementedError | ||
|
|
||
| def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest: | ||
| lock_component: LockComponent = LockComponent( | ||
| level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True | ||
| ) | ||
|
|
||
| lock_request: LockRequest = LockRequest(component=[lock_component], user=getpass.getuser(), hostname=socket.gethostname()) | ||
|
|
||
| return lock_request | ||
|
|
||
| def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: | ||
| """Update the table. | ||
|
|
||
|
|
@@ -363,15 +381,23 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons | |
| self._write_metadata(updated_metadata, current_table.io, new_metadata_location) | ||
|
|
||
| # commit to hive | ||
| try: | ||
| with self._client as open_client: | ||
| # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 | ||
| with self._client as open_client: | ||
| lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) | ||
|
|
||
| try: | ||
| if lock.state != LockState.ACQUIRED: | ||
| raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") | ||
|
|
||
| tbl = open_client.get_table(dbname=database_name, tbl_name=table_name) | ||
| tbl.parameters = _construct_parameters( | ||
| metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location | ||
| ) | ||
| open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl) | ||
| except NoSuchObjectException as e: | ||
| raise NoSuchTableError(f"Table does not exist: {table_name}") from e | ||
| except NoSuchObjectException as e: | ||
| raise NoSuchTableError(f"Table does not exist: {table_name}") from e | ||
| finally: | ||
| open_client.unlock(UnlockRequest(lockid=lock.lockid)) | ||
|
Comment on lines
+397
to
+400
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't consider this a blocker, but it'd be nice to have a test to verify that in case of any exception thrown we do actually perform the unlock.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a really hard one because it requires that we've already acquired the lock, but then something happened (timeout?) and the lock state changed. I'd have to think through how we could replicate that scenario. |
||
|
|
||
| return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.