Skip to content

Commit 4005e66

Browse files
feat: added client side metric instrumentation to read_rows and mutate_rows (#16758)
Migration of googleapis/python-bigtable#1256 to the monorepo Builds off of #16712 to add instrumentation to read_rows and mutate_rows, along with the mutation batcher
1 parent e096127 commit 4005e66

20 files changed

Lines changed: 2931 additions & 442 deletions

packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/_mutate_rows.py

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
import google.cloud.bigtable.data.exceptions as bt_exceptions
2323
import google.cloud.bigtable_v2.types.bigtable as types_pb
2424
from google.cloud.bigtable.data._cross_sync import CrossSync
25-
from google.cloud.bigtable.data._helpers import (
26-
_attempt_timeout_generator,
27-
_retry_exception_factory,
28-
)
25+
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
26+
from google.cloud.bigtable.data._metrics import tracked_retry
2927

3028
# mutate_rows requests are limited to this number of mutations
3129
from google.cloud.bigtable.data.mutations import (
@@ -34,6 +32,7 @@
3432
)
3533

3634
if TYPE_CHECKING:
35+
from google.cloud.bigtable.data._metrics import ActiveOperationMetric
3736
from google.cloud.bigtable.data.mutations import RowMutationEntry
3837

3938
if CrossSync.is_async:
@@ -72,6 +71,8 @@ class _MutateRowsOperationAsync:
7271
operation_timeout: the timeout to use for the entire operation, in seconds.
7372
attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
7473
If not specified, the request will run until operation_timeout is reached.
74+
metric: the metric object representing the active operation
75+
retryable_exceptions: a list of exceptions that should be retried
7576
"""
7677

7778
@CrossSync.convert
@@ -82,6 +83,7 @@ def __init__(
8283
mutation_entries: list["RowMutationEntry"],
8384
operation_timeout: float,
8485
attempt_timeout: float | None,
86+
metric: ActiveOperationMetric,
8587
retryable_exceptions: Sequence[type[Exception]] = (),
8688
):
8789
# check that mutations are within limits
@@ -101,13 +103,12 @@ def __init__(
101103
# Entry level errors
102104
bt_exceptions._MutateRowsIncomplete,
103105
)
104-
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
105-
self._operation = lambda: CrossSync.retry_target(
106-
self._run_attempt,
107-
self.is_retryable,
108-
sleep_generator,
109-
operation_timeout,
110-
exception_factory=_retry_exception_factory,
106+
self._operation = lambda: tracked_retry(
107+
retry_fn=CrossSync.retry_target,
108+
operation=metric,
109+
target=self._run_attempt,
110+
predicate=self.is_retryable,
111+
timeout=operation_timeout,
111112
)
112113
# initialize state
113114
self.timeout_generator = _attempt_timeout_generator(
@@ -116,6 +117,8 @@ def __init__(
116117
self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries]
117118
self.remaining_indices = list(range(len(self.mutations)))
118119
self.errors: dict[int, list[Exception]] = {}
120+
# set up metrics
121+
self._operation_metric = metric
119122

120123
@CrossSync.convert
121124
async def start(self):
@@ -125,34 +128,35 @@ async def start(self):
125128
Raises:
126129
MutationsExceptionGroup: if any mutations failed
127130
"""
128-
try:
129-
# trigger mutate_rows
130-
await self._operation()
131-
except Exception as exc:
132-
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
133-
incomplete_indices = self.remaining_indices.copy()
134-
for idx in incomplete_indices:
135-
self._handle_entry_error(idx, exc)
136-
finally:
137-
# raise exception detailing incomplete mutations
138-
all_errors: list[Exception] = []
139-
for idx, exc_list in self.errors.items():
140-
if len(exc_list) == 0:
141-
raise core_exceptions.ClientError(
142-
f"Mutation {idx} failed with no associated errors"
131+
with self._operation_metric:
132+
try:
133+
# trigger mutate_rows
134+
await self._operation()
135+
except Exception as exc:
136+
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
137+
incomplete_indices = self.remaining_indices.copy()
138+
for idx in incomplete_indices:
139+
self._handle_entry_error(idx, exc)
140+
finally:
141+
# raise exception detailing incomplete mutations
142+
all_errors: list[Exception] = []
143+
for idx, exc_list in self.errors.items():
144+
if len(exc_list) == 0:
145+
raise core_exceptions.ClientError(
146+
f"Mutation {idx} failed with no associated errors"
147+
)
148+
elif len(exc_list) == 1:
149+
cause_exc = exc_list[0]
150+
else:
151+
cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
152+
entry = self.mutations[idx].entry
153+
all_errors.append(
154+
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
155+
)
156+
if all_errors:
157+
raise bt_exceptions.MutationsExceptionGroup(
158+
all_errors, len(self.mutations)
143159
)
144-
elif len(exc_list) == 1:
145-
cause_exc = exc_list[0]
146-
else:
147-
cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
148-
entry = self.mutations[idx].entry
149-
all_errors.append(
150-
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
151-
)
152-
if all_errors:
153-
raise bt_exceptions.MutationsExceptionGroup(
154-
all_errors, len(self.mutations)
155-
)
156160

157161
@CrossSync.convert
158162
async def _run_attempt(self):
@@ -164,6 +168,8 @@ async def _run_attempt(self):
164168
retry after the attempt is complete
165169
GoogleAPICallError: if the gapic rpc fails
166170
"""
171+
# register attempt start
172+
self._operation_metric.start_attempt()
167173
request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
168174
# track mutations in this request that have not been finalized yet
169175
active_request_indices = {

0 commit comments

Comments
 (0)