Skip to content

Commit d86b9a0

Browse files
committed
DAT-4672: improve logging for commits
1 parent f6d0368 commit d86b9a0

3 files changed

Lines changed: 35 additions & 20 deletions

File tree

pyiceberg/catalog/hive.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import getpass
18-
import logging
1918
import socket
2019
import time
2120
from types import TracebackType
@@ -77,6 +76,7 @@
7776
TableAlreadyExistsError,
7877
WaitingForLockException,
7978
)
79+
from pyiceberg.logger import get_logger
8080
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
8181
from pyiceberg.schema import Schema, SchemaVisitor, visit
8282
from pyiceberg.serializers import FromInputFile
@@ -136,7 +136,8 @@
136136
DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60 # 1 min
137137
DEFAULT_LOCK_CHECK_RETRIES = 4
138138

139-
logger = logging.getLogger(__name__)
139+
140+
logger = get_logger(__name__)
140141

141142

142143
class _HiveClient:
@@ -464,27 +465,28 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque
464465

465466
def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
466467
def _on_wait_for_lock_fail(state: RetryCallState) -> None:
467-
raise WaitingForLockException(
468-
f"Failed after {state.attempt_number} attempts to wait on lock for {database_name}.{table_name}"
469-
)
468+
msg = f"Failed after {state.attempt_number} attempts to wait on lock for `{database_name}.{table_name}`"
469+
logger.debug(msg)
470+
raise WaitingForLockException(msg)
470471

471472
@retry(
472473
retry=retry_if_exception_type(WaitingForLockException),
473474
wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
474475
stop=stop_after_attempt(self._lock_check_retries),
475-
before=lambda state: logger.warning(f"({state.attempt_number}) Waiting on lock for {database_name}.{table_name}..."),
476+
before=lambda state: logger.debug(f"({state.attempt_number}) Waiting on lock for `{database_name}.{table_name}`..."),
476477
retry_error_callback=_on_wait_for_lock_fail,
477478
)
478479
def _do_wait_for_lock() -> LockResponse:
479480
response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
480481
if response.state == LockState.ACQUIRED:
482+
logger.debug("Acquired lock after waiting.")
481483
return response
482484
elif response.state == LockState.WAITING:
483-
msg = f"Waiting on lock for {database_name}.{table_name}..."
484-
logger.warning(msg)
485-
raise WaitingForLockException(msg)
485+
raise WaitingForLockException()
486486
else:
487-
raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")
487+
raise CommitFailedException(
488+
f"Failed to check lock for {database_name}.{table_name}, lock state: {response.state}"
489+
)
488490

489491
return _do_wait_for_lock()
490492

@@ -511,13 +513,14 @@ def commit_table(
511513
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
512514
with self._client as open_client:
513515
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
514-
515516
try:
516517
if lock.state != LockState.ACQUIRED:
517518
if lock.state == LockState.WAITING:
518519
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
519520
else:
520-
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")
521+
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, lock state: {lock.state}")
522+
else:
523+
logger.debug("Acquired lock on initial attempt.")
521524

522525
hive_table: Optional[HiveTable]
523526
current_table: Optional[Table]
@@ -558,7 +561,7 @@ def commit_table(
558561
)
559562
self._create_hive_table(open_client, hive_table)
560563
except WaitingForLockException as e:
561-
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
564+
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, lock state: {lock.state}") from e
562565
finally:
563566
open_client.unlock(UnlockRequest(lockid=lock.lockid))
564567

pyiceberg/logger.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import logging
2+
import sys
3+
4+
5+
def get_logger(name: str) -> logging.Logger:
6+
logger = logging.getLogger(name)
7+
if not logger.hasHandlers():
8+
handler = logging.StreamHandler(sys.stdout)
9+
handler.setLevel(logging.DEBUG)
10+
formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
11+
handler.setFormatter(formatter)
12+
logger.addHandler(handler)
13+
return logger

pyiceberg/table/__init__.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
rewrite_not,
6767
)
6868
from pyiceberg.io import FileIO, load_file_io
69+
from pyiceberg.logger import get_logger
6970
from pyiceberg.manifest import (
7071
POSITIONAL_DELETE_SCHEMA,
7172
DataFile,
@@ -253,7 +254,7 @@ class TableProperties:
253254
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
254255

255256

256-
logger = getLogger(__name__)
257+
logger = get_logger(__name__)
257258

258259

259260
class Transaction:
@@ -802,24 +803,22 @@ def commit_transaction(self) -> Table:
802803
def _before_attempt(state: RetryCallState):
803804
if state.attempt_number > 1:
804805
namespace, table = self._table.name()
805-
logger.debug(f"Refreshing metadata and operations for {namespace}.{table} and retrying transaction commit...")
806+
logger.debug(
807+
f"({state.attempt_number - 1}) Refreshing metadata and operations for `{namespace}.{table}` and retrying transaction commit..."
808+
)
806809
self._table.refresh()
807810
self._updates, self._requirements = (), ()
808811
for op in self._snapshot_operations:
809812
op._cleanup_commit_failure()
810813
self._apply(*op._commit())
811814
logger.debug(f"Committing transaction...")
812815

813-
def _after_error(state: RetryCallState):
814-
logger.debug(f'Encountered CommitFailedException: "{state.outcome.exception()}"...')
815-
816816
@wraps(self.commit_transaction)
817817
@retry(
818818
wait=wait_random_exponential(min=min_wait_ms / 1000, max=max_wait_ms / 1000),
819819
stop=stop_after_attempt(num_retries),
820820
retry=retry_if_exception_type(CommitFailedException),
821821
before=_before_attempt,
822-
after=_after_error,
823822
reraise=True,
824823
)
825824
def _commit_transaction():
@@ -1502,7 +1501,7 @@ def __init__(
15021501
snapshot_id: Optional[int] = None,
15031502
options: Properties = EMPTY_DICT,
15041503
limit: Optional[int] = None,
1505-
bound_filter: Optional[BooleanExpression] = None
1504+
bound_filter: Optional[BooleanExpression] = None,
15061505
):
15071506
self.table_metadata = table_metadata
15081507
self.io = io

0 commit comments

Comments
 (0)