Skip to content

Commit 6fd7489

Browse files
committed
Feat: Added Metric Interceptor integration with Attempt metrics
1 parent 32a351a commit 6fd7489

File tree

16 files changed

+479
-6
lines changed

16 files changed

+479
-6
lines changed

google/cloud/spanner_v1/client.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,27 @@
4848
from google.cloud.spanner_v1._helpers import _merge_query_options
4949
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
5050
from google.cloud.spanner_v1.instance import Instance
51+
from google.cloud.spanner_v1.metrics.constants import ENABLE_SPANNER_METRICS_ENV_VAR
52+
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (
53+
SpannerMetricsTracerFactory,
54+
)
55+
from google.cloud.spanner_v1.metrics.metrics_exporter import (
56+
CloudMonitoringMetricsExporter,
57+
)
58+
59+
try:
60+
from opentelemetry import metrics
61+
from opentelemetry.sdk.metrics import MeterProvider
62+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
63+
64+
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
65+
except ImportError: # pragma: NO COVER
66+
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
67+
5168

5269
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
5370
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
71+
ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
5472
_EMULATOR_HOST_HTTP_SCHEME = (
5573
"%s contains a http scheme. When used with a scheme it may cause gRPC's "
5674
"DNS resolver to endlessly attempt to resolve. %s is intended to be used "
@@ -73,6 +91,10 @@ def _get_spanner_optimizer_statistics_package():
7391
return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "")
7492

7593

94+
def _get_spanner_enable_builtin_metrics():
95+
return os.getenv(ENABLE_SPANNER_METRICS_ENV_VAR) == "true"
96+
97+
7698
class Client(ClientWithProject):
7799
"""Client for interacting with Cloud Spanner API.
78100
@@ -196,6 +218,18 @@ def __init__(
196218
):
197219
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)
198220

221+
# Check flag to enable Spanner builtin metrics
222+
if _get_spanner_enable_builtin_metrics():
223+
meter_provider = metrics.NoOpMeterProvider()
224+
if not _get_spanner_emulator_host():
225+
meter_provider = MeterProvider(
226+
metric_readers=[
227+
PeriodicExportingMetricReader(CloudMonitoringMetricsExporter())
228+
]
229+
)
230+
metrics.set_meter_provider(meter_provider)
231+
SpannerMetricsTracerFactory()
232+
199233
self._route_to_leader_enabled = route_to_leader_enabled
200234
self._directed_read_options = directed_read_options
201235
self._observability_options = observability_options

google/cloud/spanner_v1/metrics/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
BUILT_IN_METRICS_METER_NAME = "gax-python"
1616
NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client"
1717
SPANNER_RESOURCE_TYPE = "spanner_instance_client"
18+
SPANNER_SERVICE_NAME = "spanner-python"
19+
GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix"
20+
GOOGLE_CLOUD_REGION_KEY = "cloud.region"
21+
GOOGLE_CLOUD_REGION_GLOBAL = "global"
22+
SPANNER_METHOD_PREFIX = "/google.spanner.v1."
23+
ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
1824

1925
# Monitored resource labels
2026
MONITORED_RES_LABEL_KEY_PROJECT = "project_id"
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
"""Interceptor for collecting Cloud Spanner metrics."""
2+
3+
from grpc_interceptor import ClientInterceptor
4+
from .constants import (
5+
GOOGLE_CLOUD_RESOURCE_KEY,
6+
SPANNER_METHOD_PREFIX,
7+
)
8+
from .metrics_tracer import MetricsTracer
9+
from typing import Dict
10+
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
11+
import re
12+
13+
14+
class MetricsInterceptor(ClientInterceptor):
15+
"""Interceptor that collects metrics for Cloud Spanner operations."""
16+
17+
@staticmethod
18+
def _parse_resource_path(path: str) -> dict:
19+
"""Parse the resource path to extract project, instance and database.
20+
21+
Args:
22+
path (str): The resource path from the request
23+
24+
Returns:
25+
dict: Extracted resource components
26+
"""
27+
# Match paths like:
28+
# projects/{project}/instances/{instance}/databases/{database}/sessions/{session}
29+
# projects/{project}/instances/{instance}/databases/{database}
30+
# projects/{project}/instances/{instance}
31+
pattern = r"^projects/(?P<project>[^/]+)(/instances/(?P<instance>[^/]+))?(/databases/(?P<database>[^/]+))?(/sessions/(?P<session>[^/]+))?.*$"
32+
match = re.match(pattern, path)
33+
if match:
34+
return {k: v for k, v in match.groupdict().items() if v is not None}
35+
return {}
36+
37+
@staticmethod
38+
def _extract_resource_from_path(metadata: Dict[str, str]) -> Dict[str, str]:
39+
"""
40+
Extracts resource information from the metadata based on the path.
41+
42+
This method iterates through the metadata dictionary to find the first tuple containing the key 'google-cloud-resource-prefix'. It then extracts the path from this tuple and parses it to extract project, instance, and database information using the _parse_resource_path method.
43+
44+
Args:
45+
metadata (Dict[str, str]): A dictionary containing metadata information.
46+
47+
Returns:
48+
Dict[str, str]: A dictionary containing extracted project, instance, and database information.
49+
"""
50+
# Extract resource info from the first metadata tuple containing :path
51+
path = next(
52+
(value for key, value in metadata if key == GOOGLE_CLOUD_RESOURCE_KEY), ""
53+
)
54+
55+
resources = MetricsInterceptor._parse_resource_path(path)
56+
return resources
57+
58+
@staticmethod
59+
def _remove_prefix(s: str, prefix: str) -> str:
60+
"""
61+
This function removes the prefix from the given string.
62+
63+
Args:
64+
s (str): The string from which the prefix is to be removed.
65+
prefix (str): The prefix to be removed from the string.
66+
67+
Returns:
68+
str: The string with the prefix removed.
69+
70+
Note:
71+
This function is used because the `removeprefix` method does not exist in Python 3.8.
72+
"""
73+
if s.startswith(prefix):
74+
return s[len(prefix) :]
75+
return s
76+
77+
def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None:
78+
"""
79+
Sets the metric tracer attributes based on the provided resources.
80+
81+
This method updates the current metric tracer's attributes with the project, instance, and database information extracted from the resources dictionary. If the current metric tracer is not set, the method does nothing.
82+
83+
Args:
84+
resources (Dict[str, str]): A dictionary containing project, instance, and database information.
85+
"""
86+
if SpannerMetricsTracerFactory.current_metrics_tracer is None:
87+
return
88+
89+
if resources:
90+
if "project" in resources:
91+
SpannerMetricsTracerFactory.current_metrics_tracer.set_project(resources["project"])
92+
if "instance" in resources:
93+
SpannerMetricsTracerFactory.current_metrics_tracer.set_instance(resources["instance"])
94+
if "database" in resources:
95+
SpannerMetricsTracerFactory.current_metrics_tracer.set_database(resources["database"])
96+
97+
def intercept(self, invoked_method, request_or_iterator, call_details):
98+
"""Intercept gRPC calls to collect metrics.
99+
100+
Args:
101+
invoked_method: The RPC method
102+
request_or_iterator: The RPC request
103+
call_details: Details about the RPC call
104+
105+
Returns:
106+
The RPC response
107+
"""
108+
if SpannerMetricsTracerFactory.current_metrics_tracer is None:
109+
return invoked_method(request_or_iterator, call_details)
110+
111+
# Setup Metric Tracer attributes from call details
112+
## Extract Project / Instance / Databse from header information
113+
resources = self._extract_resource_from_path(call_details.metadata)
114+
self._set_metrics_tracer_attributes(resources)
115+
## Format method to be be spanner.<method name>
116+
method_name = self._remove_prefix(
117+
call_details.method, SPANNER_METHOD_PREFIX
118+
).replace("/", ".")
119+
SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name)
120+
121+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start()
122+
response = invoked_method(request_or_iterator, call_details)
123+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion()
124+
125+
return response

google/cloud/spanner_v1/metrics/metrics_tracer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
METRIC_LABEL_KEY_CLIENT_UID,
2929
METRIC_LABEL_KEY_DATABASE,
3030
METRIC_LABEL_KEY_DIRECT_PATH_ENABLED,
31-
METRIC_LABEL_KEY_DIRECT_PATH_USED,
3231
METRIC_LABEL_KEY_METHOD,
3332
METRIC_LABEL_KEY_STATUS,
3433
MONITORED_RES_LABEL_KEY_CLIENT_HASH,
@@ -399,7 +398,6 @@ def _create_operation_otel_attributes(self) -> dict:
399398
self._client_attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.status
400399
return self._client_attributes
401400

402-
403401
def _create_attempt_otel_attributes(self) -> dict:
404402
"""
405403
Create additional attributes for attempt metrics tracing.
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
"""This module provides a singleton factory for creating SpannerMetricsTracer instances."""
18+
19+
from .metrics_tracer_factory import MetricsTracerFactory
20+
import os
21+
from .constants import (
22+
SPANNER_SERVICE_NAME,
23+
GOOGLE_CLOUD_REGION_KEY,
24+
GOOGLE_CLOUD_REGION_GLOBAL,
25+
)
26+
from opentelemetry.resourcedetector.gcp_resource_detector import (
27+
GoogleCloudResourceDetector,
28+
)
29+
from .metrics_tracer import MetricsTracer
30+
from google.cloud.spanner_v1 import __version__
31+
import mmh3
32+
from uuid import uuid4
33+
34+
35+
class SpannerMetricsTracerFactory(MetricsTracerFactory):
36+
"""A factory for creating SpannerMetricsTracer instances."""
37+
38+
_metrics_tracer_factory: "SpannerMetricsTracerFactory" = None
39+
current_metrics_tracer: MetricsTracer = None
40+
41+
def __new__(cls, enabled: bool = True) -> "SpannerMetricsTracerFactory":
42+
"""Create a new instance of SpannerMetricsTracerFactory if it doesn't already exist."""
43+
if cls._metrics_tracer_factory is None:
44+
cls._metrics_tracer_factory = MetricsTracerFactory(
45+
enabled, SPANNER_SERVICE_NAME
46+
)
47+
48+
client_uid = cls._generate_client_uid()
49+
cls._metrics_tracer_factory.set_client_uid(client_uid)
50+
cls._metrics_tracer_factory.set_instance_config(cls._get_instance_config())
51+
cls._metrics_tracer_factory.set_client_name(cls._get_client_name)
52+
cls._metrics_tracer_factory.set_client_hash(
53+
cls._generate_client_hash(client_uid)
54+
)
55+
cls._metrics_tracer_factory.set_location(cls._get_location())
56+
return cls._metrics_tracer_factory
57+
58+
@staticmethod
59+
def _generate_client_uid() -> str:
60+
"""Generate a client UID in the form of uuidv4@pid@hostname."""
61+
try:
62+
hostname = os.uname()[1]
63+
pid = str(os.getpid())[0:10] # Limit PID to 10 characters
64+
uuid = uuid4()
65+
return f"{uuid}@{pid}@{hostname}"
66+
except Exception:
67+
return ""
68+
69+
@staticmethod
70+
def _get_instance_config() -> str:
71+
"""Get the instance configuration."""
72+
# TODO: unknown until there's a good way to get it.
73+
return "unknown"
74+
75+
@staticmethod
76+
def _get_client_name() -> str:
77+
"""Get the client name."""
78+
return f"{SPANNER_SERVICE_NAME}/{__version__}"
79+
80+
@staticmethod
81+
def _generate_client_hash(client_uid: str) -> str:
82+
"""
83+
Generate a 6-digit zero-padded lowercase hexadecimal hash using the 10 most significant bits of a 64-bit hash value.
84+
85+
The primary purpose of this function is to generate a hash value for the `client_hash`
86+
resource label using `client_uid` metric field. The range of values is chosen to be small
87+
enough to keep the cardinality of the Resource targets under control. Note: If at later time
88+
the range needs to be increased, it can be done by increasing the value of `kPrefixLength` to
89+
up to 24 bits without changing the format of the returned value.
90+
"""
91+
if not client_uid:
92+
return "000000"
93+
hashed_client = mmh3.hash64(client_uid)
94+
95+
# Join the hashes back together since mmh3 splits into high and low 32bits
96+
full_hash = (hashed_client[0] << 32) | (hashed_client[1] & 0xFFFFFFFF)
97+
unsigned_hash = full_hash & 0xFFFFFFFFFFFFFFFF
98+
99+
k_prefix_length = 10
100+
sig_figs = unsigned_hash >> (64 - k_prefix_length)
101+
102+
# Return as 6 digit zero padded hex string
103+
return f"{sig_figs:06x}"
104+
105+
@staticmethod
106+
def _get_location() -> str:
107+
"""Get the location of the resource."""
108+
detector = GoogleCloudResourceDetector()
109+
resources = detector.detect()
110+
if GOOGLE_CLOUD_REGION_KEY not in resources.attributes:
111+
return GOOGLE_CLOUD_REGION_GLOBAL
112+
else:
113+
return resources[GOOGLE_CLOUD_REGION_KEY]

google/cloud/spanner_v1/services/spanner/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
from .transports.grpc import SpannerGrpcTransport
6363
from .transports.grpc_asyncio import SpannerGrpcAsyncIOTransport
6464
from .transports.rest import SpannerRestTransport
65+
from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor
6566

6667

6768
class SpannerClientMeta(type):
@@ -705,6 +706,7 @@ def __init__(
705706
client_info=client_info,
706707
always_use_jwt_access=True,
707708
api_audience=self._client_options.api_audience,
709+
metrics_interceptor=MetricsInterceptor(),
708710
)
709711

710712
def create_session(

google/cloud/spanner_v1/services/spanner/transports/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from google.cloud.spanner_v1.types import spanner
3232
from google.cloud.spanner_v1.types import transaction
3333
from google.protobuf import empty_pb2 # type: ignore
34+
from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor
3435

3536
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
3637
gapic_version=package_version.__version__
@@ -58,6 +59,7 @@ def __init__(
5859
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
5960
always_use_jwt_access: Optional[bool] = False,
6061
api_audience: Optional[str] = None,
62+
metrics_interceptor: Optional[MetricsInterceptor] = None,
6163
**kwargs,
6264
) -> None:
6365
"""Instantiate the transport.

google/cloud/spanner_v1/services/spanner/transports/grpc.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
from google.cloud.spanner_v1.types import result_set
2929
from google.cloud.spanner_v1.types import spanner
3030
from google.cloud.spanner_v1.types import transaction
31+
32+
from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor
33+
from google.cloud.spanner_v1.metrics.constants import ENABLE_SPANNER_METRICS_ENV_VAR
3134
from google.protobuf import empty_pb2 # type: ignore
3235
from .base import SpannerTransport, DEFAULT_CLIENT_INFO
3336

@@ -66,6 +69,7 @@ def __init__(
6669
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
6770
always_use_jwt_access: Optional[bool] = False,
6871
api_audience: Optional[str] = None,
72+
metrics_interceptor: Optional[MetricsInterceptor] = None,
6973
) -> None:
7074
"""Instantiate the transport.
7175
@@ -187,6 +191,12 @@ def __init__(
187191
],
188192
)
189193

194+
# Wrap the gRPC channel with the metric interceptor
195+
if metrics_interceptor is not None:
196+
self._grpc_channel = grpc.intercept_channel(
197+
self._grpc_channel, metrics_interceptor
198+
)
199+
190200
# Wrap messages. This must be done after self._grpc_channel exists
191201
self._prep_wrapped_messages(client_info)
192202

0 commit comments

Comments
 (0)