Skip to content

Commit 636af26

Browse files
feat(bigtable): add client side metric instrumentation to basic rpcs (#16712)
Migration of googleapis/python-bigtable#1188 to the monorepo. This PR builds off of googleapis/python-bigtable#1187 to add instrumentation to basic data client RPCs (check_and_mutate, read_modify_write, sample_row_keys, mutate_row). Metrics are not currently being exported anywhere; they are just collected and dropped. A future PR will add a GCP exporter to the system. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent e9c52b1 commit 636af26

14 files changed

Lines changed: 2361 additions & 598 deletions

File tree

packages/google-cloud-bigtable/google/cloud/bigtable/data/_async/client.py

Lines changed: 84 additions & 71 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,11 @@
6363
_validate_timeouts,
6464
_WarmedInstanceKey,
6565
)
66-
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
66+
from google.cloud.bigtable.data._metrics import (
67+
BigtableClientSideMetricsController,
68+
OperationType,
69+
tracked_retry,
70+
)
6771
from google.cloud.bigtable.data.exceptions import (
6872
FailedQueryShardError,
6973
ShardedReadRowsExceptionGroup,
@@ -1431,26 +1435,28 @@ async def sample_row_keys(
14311435
retryable_excs = _get_retryable_errors(retryable_errors, self)
14321436
predicate = retries.if_exception_type(*retryable_excs)
14331437

1434-
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
1435-
1436-
@CrossSync.convert
1437-
async def execute_rpc():
1438-
results = await self.client._gapic_client.sample_row_keys(
1439-
request=SampleRowKeysRequest(
1440-
app_profile_id=self.app_profile_id, **self._request_path
1441-
),
1442-
timeout=next(attempt_timeout_gen),
1443-
retry=None,
1438+
with self._metrics.create_operation(
1439+
OperationType.SAMPLE_ROW_KEYS
1440+
) as operation_metric:
1441+
1442+
@CrossSync.convert
1443+
async def execute_rpc():
1444+
results = await self.client._gapic_client.sample_row_keys(
1445+
request=SampleRowKeysRequest(
1446+
app_profile_id=self.app_profile_id, **self._request_path
1447+
),
1448+
timeout=next(attempt_timeout_gen),
1449+
retry=None,
1450+
)
1451+
return [(s.row_key, s.offset_bytes) async for s in results]
1452+
1453+
return await tracked_retry(
1454+
retry_fn=CrossSync.retry_target,
1455+
operation=operation_metric,
1456+
target=execute_rpc,
1457+
predicate=predicate,
1458+
timeout=operation_timeout,
14441459
)
1445-
return [(s.row_key, s.offset_bytes) async for s in results]
1446-
1447-
return await CrossSync.retry_target(
1448-
execute_rpc,
1449-
predicate,
1450-
sleep_generator,
1451-
operation_timeout,
1452-
exception_factory=_retry_exception_factory,
1453-
)
14541460

14551461
@CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"})
14561462
def mutations_batcher(
@@ -1561,28 +1567,29 @@ async def mutate_row(
15611567
# mutations should not be retried
15621568
predicate = retries.if_exception_type()
15631569

1564-
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
1565-
1566-
target = partial(
1567-
self.client._gapic_client.mutate_row,
1568-
request=MutateRowRequest(
1569-
row_key=row_key.encode("utf-8")
1570-
if isinstance(row_key, str)
1571-
else row_key,
1572-
mutations=[mutation._to_pb() for mutation in mutations_list],
1573-
app_profile_id=self.app_profile_id,
1574-
**self._request_path,
1575-
),
1576-
timeout=attempt_timeout,
1577-
retry=None,
1578-
)
1579-
return await CrossSync.retry_target(
1580-
target,
1581-
predicate,
1582-
sleep_generator,
1583-
operation_timeout,
1584-
exception_factory=_retry_exception_factory,
1585-
)
1570+
with self._metrics.create_operation(
1571+
OperationType.MUTATE_ROW
1572+
) as operation_metric:
1573+
target = partial(
1574+
self.client._gapic_client.mutate_row,
1575+
request=MutateRowRequest(
1576+
row_key=row_key.encode("utf-8")
1577+
if isinstance(row_key, str)
1578+
else row_key,
1579+
mutations=[mutation._to_pb() for mutation in mutations_list],
1580+
app_profile_id=self.app_profile_id,
1581+
**self._request_path,
1582+
),
1583+
timeout=attempt_timeout,
1584+
retry=None,
1585+
)
1586+
return await tracked_retry(
1587+
retry_fn=CrossSync.retry_target,
1588+
operation=operation_metric,
1589+
target=target,
1590+
predicate=predicate,
1591+
timeout=operation_timeout,
1592+
)
15861593

15871594
@CrossSync.convert
15881595
async def bulk_mutate_rows(
@@ -1693,21 +1700,25 @@ async def check_and_mutate_row(
16931700
):
16941701
false_case_mutations = [false_case_mutations]
16951702
false_case_list = [m._to_pb() for m in false_case_mutations or []]
1696-
result = await self.client._gapic_client.check_and_mutate_row(
1697-
request=CheckAndMutateRowRequest(
1698-
true_mutations=true_case_list,
1699-
false_mutations=false_case_list,
1700-
predicate_filter=predicate._to_pb() if predicate is not None else None,
1701-
row_key=row_key.encode("utf-8")
1702-
if isinstance(row_key, str)
1703-
else row_key,
1704-
app_profile_id=self.app_profile_id,
1705-
**self._request_path,
1706-
),
1707-
timeout=operation_timeout,
1708-
retry=None,
1709-
)
1710-
return result.predicate_matched
1703+
1704+
with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE):
1705+
result = await self.client._gapic_client.check_and_mutate_row(
1706+
request=CheckAndMutateRowRequest(
1707+
true_mutations=true_case_list,
1708+
false_mutations=false_case_list,
1709+
predicate_filter=predicate._to_pb()
1710+
if predicate is not None
1711+
else None,
1712+
row_key=row_key.encode("utf-8")
1713+
if isinstance(row_key, str)
1714+
else row_key,
1715+
app_profile_id=self.app_profile_id,
1716+
**self._request_path,
1717+
),
1718+
timeout=operation_timeout,
1719+
retry=None,
1720+
)
1721+
return result.predicate_matched
17111722

17121723
@CrossSync.convert
17131724
async def read_modify_write_row(
@@ -1747,20 +1758,22 @@ async def read_modify_write_row(
17471758
rules = [rules]
17481759
if not rules:
17491760
raise ValueError("rules must contain at least one item")
1750-
result = await self.client._gapic_client.read_modify_write_row(
1751-
request=ReadModifyWriteRowRequest(
1752-
rules=[rule._to_pb() for rule in rules],
1753-
row_key=row_key.encode("utf-8")
1754-
if isinstance(row_key, str)
1755-
else row_key,
1756-
app_profile_id=self.app_profile_id,
1757-
**self._request_path,
1758-
),
1759-
timeout=operation_timeout,
1760-
retry=None,
1761-
)
1762-
# construct Row from result
1763-
return Row._from_pb(result.row)
1761+
1762+
with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE):
1763+
result = await self.client._gapic_client.read_modify_write_row(
1764+
request=ReadModifyWriteRowRequest(
1765+
rules=[rule._to_pb() for rule in rules],
1766+
row_key=row_key.encode("utf-8")
1767+
if isinstance(row_key, str)
1768+
else row_key,
1769+
app_profile_id=self.app_profile_id,
1770+
**self._request_path,
1771+
),
1772+
timeout=operation_timeout,
1773+
retry=None,
1774+
)
1775+
# construct Row from result
1776+
return Row._from_pb(result.row)
17641777

17651778
@CrossSync.convert
17661779
async def close(self):

packages/google-cloud-bigtable/google/cloud/bigtable/data/_helpers.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,7 @@ def _retry_exception_factory(
105105
tuple[Exception, Exception|None]:
106106
tuple of the exception to raise, and a cause exception if applicable
107107
"""
108+
exc_list = exc_list.copy()
108109
if reason == RetryFailureReason.TIMEOUT:
109110
timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
110111
# if failed due to timeout, raise deadline exceeded as primary exception

packages/google-cloud-bigtable/google/cloud/bigtable/data/_sync_autogen/client.py

Lines changed: 80 additions & 68 deletions
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,11 @@
5757
_validate_timeouts,
5858
_WarmedInstanceKey,
5959
)
60-
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
60+
from google.cloud.bigtable.data._metrics import (
61+
BigtableClientSideMetricsController,
62+
OperationType,
63+
tracked_retry,
64+
)
6165
from google.cloud.bigtable.data._sync_autogen._swappable_channel import (
6266
SwappableChannel as SwappableChannelType,
6367
)
@@ -1176,25 +1180,27 @@ def sample_row_keys(
11761180
)
11771181
retryable_excs = _get_retryable_errors(retryable_errors, self)
11781182
predicate = retries.if_exception_type(*retryable_excs)
1179-
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
1180-
1181-
def execute_rpc():
1182-
results = self.client._gapic_client.sample_row_keys(
1183-
request=SampleRowKeysRequest(
1184-
app_profile_id=self.app_profile_id, **self._request_path
1185-
),
1186-
timeout=next(attempt_timeout_gen),
1187-
retry=None,
1183+
with self._metrics.create_operation(
1184+
OperationType.SAMPLE_ROW_KEYS
1185+
) as operation_metric:
1186+
1187+
def execute_rpc():
1188+
results = self.client._gapic_client.sample_row_keys(
1189+
request=SampleRowKeysRequest(
1190+
app_profile_id=self.app_profile_id, **self._request_path
1191+
),
1192+
timeout=next(attempt_timeout_gen),
1193+
retry=None,
1194+
)
1195+
return [(s.row_key, s.offset_bytes) for s in results]
1196+
1197+
return tracked_retry(
1198+
retry_fn=CrossSync._Sync_Impl.retry_target,
1199+
operation=operation_metric,
1200+
target=execute_rpc,
1201+
predicate=predicate,
1202+
timeout=operation_timeout,
11881203
)
1189-
return [(s.row_key, s.offset_bytes) for s in results]
1190-
1191-
return CrossSync._Sync_Impl.retry_target(
1192-
execute_rpc,
1193-
predicate,
1194-
sleep_generator,
1195-
operation_timeout,
1196-
exception_factory=_retry_exception_factory,
1197-
)
11981204

11991205
def mutations_batcher(
12001206
self,
@@ -1294,27 +1300,29 @@ def mutate_row(
12941300
)
12951301
else:
12961302
predicate = retries.if_exception_type()
1297-
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
1298-
target = partial(
1299-
self.client._gapic_client.mutate_row,
1300-
request=MutateRowRequest(
1301-
row_key=row_key.encode("utf-8")
1302-
if isinstance(row_key, str)
1303-
else row_key,
1304-
mutations=[mutation._to_pb() for mutation in mutations_list],
1305-
app_profile_id=self.app_profile_id,
1306-
**self._request_path,
1307-
),
1308-
timeout=attempt_timeout,
1309-
retry=None,
1310-
)
1311-
return CrossSync._Sync_Impl.retry_target(
1312-
target,
1313-
predicate,
1314-
sleep_generator,
1315-
operation_timeout,
1316-
exception_factory=_retry_exception_factory,
1317-
)
1303+
with self._metrics.create_operation(
1304+
OperationType.MUTATE_ROW
1305+
) as operation_metric:
1306+
target = partial(
1307+
self.client._gapic_client.mutate_row,
1308+
request=MutateRowRequest(
1309+
row_key=row_key.encode("utf-8")
1310+
if isinstance(row_key, str)
1311+
else row_key,
1312+
mutations=[mutation._to_pb() for mutation in mutations_list],
1313+
app_profile_id=self.app_profile_id,
1314+
**self._request_path,
1315+
),
1316+
timeout=attempt_timeout,
1317+
retry=None,
1318+
)
1319+
return tracked_retry(
1320+
retry_fn=CrossSync._Sync_Impl.retry_target,
1321+
operation=operation_metric,
1322+
target=target,
1323+
predicate=predicate,
1324+
timeout=operation_timeout,
1325+
)
13181326

13191327
def bulk_mutate_rows(
13201328
self,
@@ -1418,21 +1426,24 @@ def check_and_mutate_row(
14181426
):
14191427
false_case_mutations = [false_case_mutations]
14201428
false_case_list = [m._to_pb() for m in false_case_mutations or []]
1421-
result = self.client._gapic_client.check_and_mutate_row(
1422-
request=CheckAndMutateRowRequest(
1423-
true_mutations=true_case_list,
1424-
false_mutations=false_case_list,
1425-
predicate_filter=predicate._to_pb() if predicate is not None else None,
1426-
row_key=row_key.encode("utf-8")
1427-
if isinstance(row_key, str)
1428-
else row_key,
1429-
app_profile_id=self.app_profile_id,
1430-
**self._request_path,
1431-
),
1432-
timeout=operation_timeout,
1433-
retry=None,
1434-
)
1435-
return result.predicate_matched
1429+
with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE):
1430+
result = self.client._gapic_client.check_and_mutate_row(
1431+
request=CheckAndMutateRowRequest(
1432+
true_mutations=true_case_list,
1433+
false_mutations=false_case_list,
1434+
predicate_filter=predicate._to_pb()
1435+
if predicate is not None
1436+
else None,
1437+
row_key=row_key.encode("utf-8")
1438+
if isinstance(row_key, str)
1439+
else row_key,
1440+
app_profile_id=self.app_profile_id,
1441+
**self._request_path,
1442+
),
1443+
timeout=operation_timeout,
1444+
retry=None,
1445+
)
1446+
return result.predicate_matched
14361447

14371448
def read_modify_write_row(
14381449
self,
@@ -1469,19 +1480,20 @@ def read_modify_write_row(
14691480
rules = [rules]
14701481
if not rules:
14711482
raise ValueError("rules must contain at least one item")
1472-
result = self.client._gapic_client.read_modify_write_row(
1473-
request=ReadModifyWriteRowRequest(
1474-
rules=[rule._to_pb() for rule in rules],
1475-
row_key=row_key.encode("utf-8")
1476-
if isinstance(row_key, str)
1477-
else row_key,
1478-
app_profile_id=self.app_profile_id,
1479-
**self._request_path,
1480-
),
1481-
timeout=operation_timeout,
1482-
retry=None,
1483-
)
1484-
return Row._from_pb(result.row)
1483+
with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE):
1484+
result = self.client._gapic_client.read_modify_write_row(
1485+
request=ReadModifyWriteRowRequest(
1486+
rules=[rule._to_pb() for rule in rules],
1487+
row_key=row_key.encode("utf-8")
1488+
if isinstance(row_key, str)
1489+
else row_key,
1490+
app_profile_id=self.app_profile_id,
1491+
**self._request_path,
1492+
),
1493+
timeout=operation_timeout,
1494+
retry=None,
1495+
)
1496+
return Row._from_pb(result.row)
14851497

14861498
def close(self):
14871499
"""Called to close the Table instance and release any resources held by it."""

0 commit comments

Comments (0)