Skip to content

Commit 5aaeb9f

Browse files
committed
Feat: Added Metric Interceptor integration with Attempt metrics
1 parent c21e2d1 commit 5aaeb9f

File tree

18 files changed

+565
-59
lines changed

18 files changed

+565
-59
lines changed

google/cloud/spanner_v1/client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,27 @@
4848
from google.cloud.spanner_v1._helpers import _merge_query_options
4949
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
5050
from google.cloud.spanner_v1.instance import Instance
51+
from google.cloud.spanner_v1.metrics.constants import ENABLE_SPANNER_METRICS_ENV_VAR
52+
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (
53+
SpannerMetricsTracerFactory,
54+
)
55+
from google.cloud.spanner_v1.metrics.metrics_exporter import (
56+
CloudMonitoringMetricsExporter,
57+
)
58+
59+
try:
60+
from opentelemetry import metrics
61+
from opentelemetry.sdk.metrics import MeterProvider
62+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
63+
64+
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
65+
except ImportError: # pragma: NO COVER
66+
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
67+
5168

5269
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
5370
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
71+
ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
5472
_EMULATOR_HOST_HTTP_SCHEME = (
5573
"%s contains a http scheme. When used with a scheme it may cause gRPC's "
5674
"DNS resolver to endlessly attempt to resolve. %s is intended to be used "
@@ -73,6 +91,10 @@ def _get_spanner_optimizer_statistics_package():
7391
return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "")
7492

7593

94+
def _get_spanner_enable_builtin_metrics():
95+
return os.getenv(ENABLE_SPANNER_METRICS_ENV_VAR) == "true"
96+
97+
7698
class Client(ClientWithProject):
7799
"""Client for interacting with Cloud Spanner API.
78100
@@ -196,6 +218,21 @@ def __init__(
196218
):
197219
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)
198220

221+
# Check flag to enable Spanner builtin metrics
222+
if (
223+
_get_spanner_enable_builtin_metrics()
224+
and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
225+
):
226+
meter_provider = metrics.NoOpMeterProvider()
227+
if not _get_spanner_emulator_host():
228+
meter_provider = MeterProvider(
229+
metric_readers=[
230+
PeriodicExportingMetricReader(CloudMonitoringMetricsExporter())
231+
]
232+
)
233+
metrics.set_meter_provider(meter_provider)
234+
SpannerMetricsTracerFactory()
235+
199236
self._route_to_leader_enabled = route_to_leader_enabled
200237
self._directed_read_options = directed_read_options
201238
self._observability_options = observability_options

google/cloud/spanner_v1/metrics/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
BUILT_IN_METRICS_METER_NAME = "gax-python"
1616
NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client"
1717
SPANNER_RESOURCE_TYPE = "spanner_instance_client"
18+
SPANNER_SERVICE_NAME = "spanner-python"
19+
GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix"
20+
GOOGLE_CLOUD_REGION_KEY = "cloud.region"
21+
GOOGLE_CLOUD_REGION_GLOBAL = "global"
22+
SPANNER_METHOD_PREFIX = "/google.spanner.v1."
23+
ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
1824

1925
# Monitored resource labels
2026
MONITORED_RES_LABEL_KEY_PROJECT = "project_id"

google/cloud/spanner_v1/metrics/metrics_exporter.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
)
2424

2525
import logging
26-
from typing import Optional, List, Union, NoReturn, Tuple
26+
from typing import Optional, List, Union, NoReturn, Tuple, Dict
2727

2828
import google.auth
2929
from google.api.distribution_pb2 import ( # pylint: disable=no-name-in-module
@@ -39,10 +39,6 @@
3939
MonitoredResource,
4040
)
4141

42-
from google.cloud.monitoring_v3.services.metric_service.transports.grpc import (
43-
MetricServiceGrpcTransport,
44-
)
45-
4642
# pylint: disable=no-name-in-module
4743
from google.protobuf.timestamp_pb2 import Timestamp
4844
from google.cloud.spanner_v1.gapic_version import __version__
@@ -60,12 +56,9 @@
6056
Sum,
6157
)
6258
from opentelemetry.sdk.resources import Resource
63-
64-
HAS_OPENTELEMETRY_INSTALLED = True
65-
except ImportError: # pragma: NO COVER
66-
HAS_OPENTELEMETRY_INSTALLED = False
67-
68-
try:
59+
from google.cloud.monitoring_v3.services.metric_service.transports.grpc import (
60+
MetricServiceGrpcTransport,
61+
)
6962
from google.cloud.monitoring_v3 import (
7063
CreateTimeSeriesRequest,
7164
MetricServiceClient,
@@ -75,13 +68,10 @@
7568
TypedValue,
7669
)
7770

78-
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
79-
except ImportError:
80-
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
81-
82-
HAS_DEPENDENCIES_INSTALLED = (
83-
HAS_OPENTELEMETRY_INSTALLED and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
84-
)
71+
HAS_OPENTELEMETRY_INSTALLED = True
72+
except ImportError: # pragma: NO COVER
73+
HAS_OPENTELEMETRY_INSTALLED = False
74+
MetricExporter = object
8575

8676
logger = logging.getLogger(__name__)
8777
MAX_BATCH_WRITE = 200
@@ -120,7 +110,7 @@ class CloudMonitoringMetricsExporter(MetricExporter):
120110
def __init__(
121111
self,
122112
project_id: Optional[str] = None,
123-
client: Optional[MetricServiceClient] = None,
113+
client: Optional["MetricServiceClient"] = None,
124114
):
125115
"""Initialize a custom exporter to send metrics for the Spanner Service Metrics."""
126116
# Default preferred_temporality is all CUMULATIVE so need to customize
@@ -144,7 +134,7 @@ def __init__(
144134
self.project_id = project_id
145135
self.project_name = self.client.common_project_path(self.project_id)
146136

147-
def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None:
137+
def _batch_write(self, series: List["TimeSeries"], timeout_millis: float) -> None:
148138
"""Cloud Monitoring allows writing up to 200 time series at once.
149139
150140
:param series: ProtoBuf TimeSeries
@@ -166,8 +156,8 @@ def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None:
166156

167157
@staticmethod
168158
def _resource_to_monitored_resource_pb(
169-
resource: Resource, labels: any
170-
) -> MonitoredResource:
159+
resource: "Resource", labels: Dict[str, str]
160+
) -> "MonitoredResource":
171161
"""
172162
Convert the resource to a Google Cloud Monitoring monitored resource.
173163
@@ -182,7 +172,7 @@ def _resource_to_monitored_resource_pb(
182172
return monitored_resource
183173

184174
@staticmethod
185-
def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind:
175+
def _to_metric_kind(metric: "Metric") -> MetricDescriptor.MetricKind:
186176
"""
187177
Convert the metric to a Google Cloud Monitoring metric kind.
188178
@@ -210,7 +200,7 @@ def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind:
210200

211201
@staticmethod
212202
def _extract_metric_labels(
213-
data_point: Union[NumberDataPoint, HistogramDataPoint]
203+
data_point: Union["NumberDataPoint", "HistogramDataPoint"]
214204
) -> Tuple[dict, dict]:
215205
"""
216206
Extract the metric labels from the data point.
@@ -233,8 +223,8 @@ def _extract_metric_labels(
233223
@staticmethod
234224
def _to_point(
235225
kind: "MetricDescriptor.MetricKind.V",
236-
data_point: Union[NumberDataPoint, HistogramDataPoint],
237-
) -> Point:
226+
data_point: Union["NumberDataPoint", "HistogramDataPoint"],
227+
) -> "Point":
238228
# Create a Google Cloud Monitoring data point value based on the OpenTelemetry metric data point type
239229
## For histograms, we need to calculate the mean and bucket counts
240230
if isinstance(data_point, HistogramDataPoint):
@@ -281,7 +271,7 @@ def _data_point_to_timeseries_pb(
281271
metric,
282272
monitored_resource,
283273
labels,
284-
) -> TimeSeries:
274+
) -> "TimeSeries":
285275
"""
286276
Convert the data point to a Google Cloud Monitoring time series.
287277
@@ -308,8 +298,8 @@ def _data_point_to_timeseries_pb(
308298

309299
@staticmethod
310300
def _resource_metrics_to_timeseries_pb(
311-
metrics_data: MetricsData,
312-
) -> List[TimeSeries]:
301+
metrics_data: "MetricsData",
302+
) -> List["TimeSeries"]:
313303
"""
314304
Convert the metrics data to a list of Google Cloud Monitoring time series.
315305
@@ -346,18 +336,18 @@ def _resource_metrics_to_timeseries_pb(
346336

347337
def export(
348338
self,
349-
metrics_data: MetricsData,
339+
metrics_data: "MetricsData",
350340
timeout_millis: float = 10_000,
351341
**kwargs,
352-
) -> MetricExportResult:
342+
) -> "MetricExportResult":
353343
"""
354344
Export the metrics data to Google Cloud Monitoring.
355345
356346
:param metrics_data: OpenTelemetry metrics data
357347
:param timeout_millis: timeout in milliseconds
358348
:return: MetricExportResult
359349
"""
360-
if not HAS_DEPENDENCIES_INSTALLED:
350+
if not HAS_OPENTELEMETRY_INSTALLED:
361351
logger.warning("Metric exporter called without dependencies installed.")
362352
return False
363353

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Interceptor for collecting Cloud Spanner metrics."""
16+
17+
from grpc_interceptor import ClientInterceptor
18+
from .constants import (
19+
GOOGLE_CLOUD_RESOURCE_KEY,
20+
SPANNER_METHOD_PREFIX,
21+
)
22+
from typing import Dict
23+
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
24+
import re
25+
26+
27+
class MetricsInterceptor(ClientInterceptor):
28+
"""Interceptor that collects metrics for Cloud Spanner operations."""
29+
30+
@staticmethod
31+
def _parse_resource_path(path: str) -> dict:
32+
"""Parse the resource path to extract project, instance and database.
33+
34+
Args:
35+
path (str): The resource path from the request
36+
37+
Returns:
38+
dict: Extracted resource components
39+
"""
40+
# Match paths like:
41+
# projects/{project}/instances/{instance}/databases/{database}/sessions/{session}
42+
# projects/{project}/instances/{instance}/databases/{database}
43+
# projects/{project}/instances/{instance}
44+
pattern = r"^projects/(?P<project>[^/]+)(/instances/(?P<instance>[^/]+))?(/databases/(?P<database>[^/]+))?(/sessions/(?P<session>[^/]+))?.*$"
45+
match = re.match(pattern, path)
46+
if match:
47+
return {k: v for k, v in match.groupdict().items() if v is not None}
48+
return {}
49+
50+
@staticmethod
51+
def _extract_resource_from_path(metadata: Dict[str, str]) -> Dict[str, str]:
52+
"""
53+
Extracts resource information from the metadata based on the path.
54+
55+
This method iterates through the metadata dictionary to find the first tuple containing the key 'google-cloud-resource-prefix'. It then extracts the path from this tuple and parses it to extract project, instance, and database information using the _parse_resource_path method.
56+
57+
Args:
58+
metadata (Dict[str, str]): A dictionary containing metadata information.
59+
60+
Returns:
61+
Dict[str, str]: A dictionary containing extracted project, instance, and database information.
62+
"""
63+
# Extract resource info from the first metadata tuple containing :path
64+
path = next(
65+
(value for key, value in metadata if key == GOOGLE_CLOUD_RESOURCE_KEY), ""
66+
)
67+
68+
resources = MetricsInterceptor._parse_resource_path(path)
69+
return resources
70+
71+
@staticmethod
72+
def _remove_prefix(s: str, prefix: str) -> str:
73+
"""
74+
This function removes the prefix from the given string.
75+
76+
Args:
77+
s (str): The string from which the prefix is to be removed.
78+
prefix (str): The prefix to be removed from the string.
79+
80+
Returns:
81+
str: The string with the prefix removed.
82+
83+
Note:
84+
This function is used because the `removeprefix` method does not exist in Python 3.8.
85+
"""
86+
if s.startswith(prefix):
87+
return s[len(prefix) :]
88+
return s
89+
90+
def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None:
91+
"""
92+
Sets the metric tracer attributes based on the provided resources.
93+
94+
This method updates the current metric tracer's attributes with the project, instance, and database information extracted from the resources dictionary. If the current metric tracer is not set, the method does nothing.
95+
96+
Args:
97+
resources (Dict[str, str]): A dictionary containing project, instance, and database information.
98+
"""
99+
if SpannerMetricsTracerFactory.current_metrics_tracer is None:
100+
return
101+
102+
if resources:
103+
if "project" in resources:
104+
SpannerMetricsTracerFactory.current_metrics_tracer.set_project(
105+
resources["project"]
106+
)
107+
if "instance" in resources:
108+
SpannerMetricsTracerFactory.current_metrics_tracer.set_instance(
109+
resources["instance"]
110+
)
111+
if "database" in resources:
112+
SpannerMetricsTracerFactory.current_metrics_tracer.set_database(
113+
resources["database"]
114+
)
115+
116+
def intercept(self, invoked_method, request_or_iterator, call_details):
117+
"""Intercept gRPC calls to collect metrics.
118+
119+
Args:
120+
invoked_method: The RPC method
121+
request_or_iterator: The RPC request
122+
call_details: Details about the RPC call
123+
124+
Returns:
125+
The RPC response
126+
"""
127+
if SpannerMetricsTracerFactory.current_metrics_tracer is None:
128+
return invoked_method(request_or_iterator, call_details)
129+
130+
# Setup Metric Tracer attributes from call details
131+
## Extract Project / Instance / Databse from header information
132+
resources = self._extract_resource_from_path(call_details.metadata)
133+
self._set_metrics_tracer_attributes(resources)
134+
135+
## Format method to be be spanner.<method name>
136+
method_name = self._remove_prefix(
137+
call_details.method, SPANNER_METHOD_PREFIX
138+
).replace("/", ".")
139+
SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name)
140+
141+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start()
142+
response = invoked_method(request_or_iterator, call_details)
143+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion()
144+
145+
return response

0 commit comments

Comments
 (0)