Skip to content

Commit 9368e66

Browse files
committed
Feat: Added Operation and GFE Metrics
1 parent ee8c878 commit 9368e66

File tree

8 files changed

+302
-21
lines changed

8 files changed

+302
-21
lines changed

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
except ImportError:
3434
HAS_OPENTELEMETRY_INSTALLED = False
3535

36+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
37+
3638
TRACER_NAME = "cloud.google.com/python/spanner"
3739
TRACER_VERSION = gapic_version.__version__
3840
extended_tracing_globally_disabled = (
@@ -107,26 +109,27 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
107109
with tracer.start_as_current_span(
108110
name, kind=trace.SpanKind.CLIENT, attributes=attributes
109111
) as span:
110-
try:
111-
yield span
112-
except Exception as error:
113-
span.set_status(Status(StatusCode.ERROR, str(error)))
114-
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__
115-
# on any exception. We should file a bug later on with them to only
116-
# invoke .record_exception if not already invoked, hence we should not
117-
# invoke .record_exception on our own else we shall have 2 exceptions.
118-
raise
119-
else:
120-
# All spans still have set_status available even if for example
121-
# NonRecordingSpan doesn't have "_status".
122-
absent_span_status = getattr(span, "_status", None) is None
123-
if absent_span_status or span._status.status_code == StatusCode.UNSET:
124-
# OpenTelemetry-Python only allows a status change
125-
# if the current code is UNSET or ERROR. At the end
126-
# of the generator's consumption, only set it to OK
127-
# it wasn't previously set otherwise.
128-
# https://github.com/googleapis/python-spanner/issues/1246
129-
span.set_status(Status(StatusCode.OK))
112+
with MetricsCapture():
113+
try:
114+
yield span
115+
except Exception as error:
116+
span.set_status(Status(StatusCode.ERROR, str(error)))
117+
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__
118+
# on any exception. We should file a bug later on with them to only
119+
# invoke .record_exception if not already invoked, hence we should not
120+
# invoke .record_exception on our own else we shall have 2 exceptions.
121+
raise
122+
else:
123+
# All spans still have set_status available even if for example
124+
# NonRecordingSpan doesn't have "_status".
125+
absent_span_status = getattr(span, "_status", None) is None
126+
if absent_span_status or span._status.status_code == StatusCode.UNSET:
127+
# OpenTelemetry-Python only allows a status change
128+
# if the current code is UNSET or ERROR. At the end
129+
# of the generator's consumption, only set it to OK
130+
# it wasn't previously set otherwise.
131+
# https://github.com/googleapis/python-spanner/issues/1246
132+
span.set_status(Status(StatusCode.OK))
130133

131134

132135
def get_current_span():
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
2+
from .metrics_interceptor import MetricsInterceptor
3+
4+
class MetricsCapture:
5+
def __enter__(self):
6+
factory = SpannerMetricsTracerFactory()
7+
8+
# Define a new metrics tracer for the new operation
9+
SpannerMetricsTracerFactory.current_metrics_tracer = factory.create_metrics_tracer()
10+
SpannerMetricsTracerFactory.current_metrics_tracer.record_operation_start()
11+
return self
12+
13+
def __exit__(self, exc_type, exc_value, traceback):
14+
SpannerMetricsTracerFactory.current_metrics_tracer.record_operation_completion()
15+
return False # Propagate the exception if any
16+
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
try:
18+
from opentelemetry.metrics import Counter, Histogram, get_meter_provider
19+
20+
HAS_OPENTELEMETRY_INSTALLED = True
21+
except ImportError: # pragma: NO COVER
22+
HAS_OPENTELEMETRY_INSTALLED = False
23+
from typing import List
24+
import re
25+
from google.cloud.spanner_v1.metrics.constants import (
26+
BUILT_IN_METRICS_METER_NAME,
27+
SPANNER_SERVICE_NAME,
28+
METRIC_NAME_GFE_LATENCY,
29+
METRIC_NAME_GFE_MISSING_HEADER_COUNT
30+
)
31+
from google.cloud.spanner_v1 import __version__
32+
33+
class MetricsGfeTracer():
34+
_instrument_gfe_latency: Histogram
35+
_instrument_gfe_missing_header_count: Counter
36+
enabled = False
37+
38+
def __init__(self) -> None:
39+
self._create_gfe_metric_instruments()
40+
41+
def record_gfe_metrics(self, metadata: List):
42+
if not self.enabled:
43+
return
44+
45+
gfe_headers = [header.value for header in metadata if header.key == 'server-timing' and header.value.startswith("gfe")]
46+
47+
if len(gfe_headers) == 0:
48+
self.record_gfe_missing_header_count()
49+
return
50+
51+
for gfe_header in gfe_headers:
52+
match = re.search(r'dur=(\d+)', gfe_header)
53+
if match:
54+
duration = int(match.group(1))
55+
self.record_gfe_latency(duration)
56+
57+
58+
def record_gfe_latency(self, latency: int) -> None:
59+
print("real called")
60+
if not self.enabled:
61+
return
62+
self._instrument_gfe_latency.record(amount=latency)
63+
64+
def record_gfe_missing_header_count(self) -> None:
65+
if not self.enabled:
66+
return
67+
self._instrument_gfe_missing_header_count.add(amount=1)
68+
69+
def _create_gfe_metric_instruments(self):
70+
meter_provider = get_meter_provider()
71+
meter = meter_provider.get_meter(
72+
name=BUILT_IN_METRICS_METER_NAME, version=__version__
73+
)
74+
self._instrument_gfe_latency = meter.create_histogram(
75+
name=f"{SPANNER_SERVICE_NAME}/{METRIC_NAME_GFE_LATENCY}",
76+
unit="ms",
77+
description="GFE Latency."
78+
)
79+
self._instrument_gfe_missing_header_count = meter.create_counter(
80+
name=f"{SPANNER_SERVICE_NAME}/{METRIC_NAME_GFE_MISSING_HEADER_COUNT}",
81+
unit="1",
82+
description="GFE missing header count."
83+
84+
)

google/cloud/spanner_v1/metrics/metrics_interceptor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
GOOGLE_CLOUD_RESOURCE_KEY,
2020
SPANNER_METHOD_PREFIX,
2121
)
22+
2223
from typing import Dict
2324
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
2425
import re
@@ -142,4 +143,10 @@ def intercept(self, invoked_method, request_or_iterator, call_details):
142143
response = invoked_method(request_or_iterator, call_details)
143144
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion()
144145

146+
# Process and send GFE metrics if enabled
147+
if SpannerMetricsTracerFactory.metrics_gfe_tracer.enabled:
148+
metadata = response.initial_metadata()
149+
SpannerMetricsTracerFactory.metrics_gfe_tracer.record_gfe_metrics(metadata)
150+
else:
151+
print("disabled")
145152
return response

google/cloud/spanner_v1/metrics/metrics_tracer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class MetricAttemptTracer:
5656
direct_path_used: bool
5757
status: str
5858

59-
def __init__(self):
59+
def __init__(self) -> None:
6060
"""
6161
Initialize a MetricAttemptTracer instance with default values.
6262

google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
GoogleCloudResourceDetector,
2828
)
2929
from .metrics_tracer import MetricsTracer
30+
from .metrics_gfe_tracer import MetricsGfeTracer
3031
from google.cloud.spanner_v1 import __version__
3132
import mmh3
3233
from uuid import uuid4
@@ -37,6 +38,7 @@ class SpannerMetricsTracerFactory(MetricsTracerFactory):
3738

3839
_metrics_tracer_factory: "SpannerMetricsTracerFactory" = None
3940
current_metrics_tracer: MetricsTracer = None
41+
metrics_gfe_tracer: MetricsGfeTracer = MetricsGfeTracer()
4042

4143
def __new__(cls, enabled: bool = True) -> "SpannerMetricsTracerFactory":
4244
"""Create a new instance of SpannerMetricsTracerFactory if it doesn't already exist."""
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import pytest
2+
from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor
3+
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
4+
from google.cloud.spanner_v1.metrics.metrics_gfe_tracer import MetricsGfeTracer
5+
from unittest.mock import MagicMock
6+
7+
8+
@pytest.fixture
9+
def interceptor():
10+
return MetricsInterceptor()
11+
12+
def test_parse_resource_path_valid(interceptor):
13+
path = "projects/my_project/instances/my_instance/databases/my_database"
14+
expected = {
15+
"project": "my_project",
16+
"instance": "my_instance",
17+
"database": "my_database",
18+
}
19+
assert interceptor._parse_resource_path(path) == expected
20+
21+
22+
def test_parse_resource_path_invalid(interceptor):
23+
path = "invalid/path"
24+
expected = {}
25+
assert interceptor._parse_resource_path(path) == expected
26+
27+
28+
def test_extract_resource_from_path(interceptor):
29+
metadata = [
30+
(
31+
"google-cloud-resource-prefix",
32+
"projects/my_project/instances/my_instance/databases/my_database",
33+
)
34+
]
35+
expected = {
36+
"project": "my_project",
37+
"instance": "my_instance",
38+
"database": "my_database",
39+
}
40+
assert interceptor._extract_resource_from_path(metadata) == expected
41+
42+
43+
def test_set_metrics_tracer_attributes(interceptor):
44+
SpannerMetricsTracerFactory.current_metrics_tracer = MockMetricTracer()
45+
resources = {
46+
"project": "my_project",
47+
"instance": "my_instance",
48+
"database": "my_database",
49+
}
50+
51+
interceptor._set_metrics_tracer_attributes(resources)
52+
assert SpannerMetricsTracerFactory.current_metrics_tracer.project == "my_project"
53+
assert SpannerMetricsTracerFactory.current_metrics_tracer.instance == "my_instance"
54+
assert SpannerMetricsTracerFactory.current_metrics_tracer.database == "my_database"
55+
56+
57+
def test_intercept_without_tracer(interceptor):
58+
mock_invoked_method = MagicMock(return_value="response")
59+
mock_details = MagicMock(metadata={})
60+
response = interceptor.intercept(mock_invoked_method, "request", mock_details)
61+
62+
assert response == "response"
63+
mock_invoked_method.assert_called_once_with("request", mock_details)
64+
65+
66+
def test_intercept_with_tracer(interceptor):
67+
SpannerMetricsTracerFactory.current_metrics_tracer = MockMetricTracer()
68+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start = MagicMock()
69+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion = MagicMock()
70+
71+
mock_invoked_method = MagicMock(return_value="response")
72+
call_details = MagicMock(
73+
method="spanner.someMethod",
74+
metadata=[
75+
(
76+
"google-cloud-resource-prefix",
77+
"projects/my_project/instances/my_instance/databases/my_database",
78+
)
79+
],
80+
)
81+
82+
response = interceptor.intercept(mock_invoked_method, "request", call_details)
83+
84+
assert response == "response"
85+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start.assert_called_once()
86+
SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion.assert_called_once()
87+
mock_invoked_method.assert_called_once_with("request", call_details)
88+
89+
def test_intercept_with_gfe_metrics(interceptor):
90+
91+
# Set the mock GFE tracer
92+
SpannerMetricsTracerFactory.metrics_gfe_tracer = MockGfeTracer()
93+
SpannerMetricsTracerFactory.metrics_gfe_tracer.enabled = True
94+
mock_invoked_method = MagicMock(return_value=MagicMock(initial_metadata=lambda: [("google-cloud-resource-prefix", "projects/my_project/instances/my_instance/databases/my_database")]))
95+
call_details = MagicMock(
96+
method="spanner.someMethod",
97+
metadata=[
98+
(
99+
"google-cloud-resource-prefix",
100+
"projects/my_project/instances/my_instance/databases/my_database",
101+
)
102+
],
103+
)
104+
105+
response = interceptor.intercept(mock_invoked_method, "request", call_details)
106+
107+
assert response == mock_invoked_method.return_value
108+
assert SpannerMetricsTracerFactory.metrics_gfe_tracer.metadata == [("google-cloud-resource-prefix", "projects/my_project/instances/my_instance/databases/my_database")]
109+
mock_invoked_method.assert_called_once_with("request", call_details)
110+
111+
class MockMetricTracer:
112+
def __init__(self):
113+
self.project = None
114+
self.instance = None
115+
self.database = None
116+
self.method = None
117+
118+
def set_project(self, project):
119+
self.project = project
120+
121+
def set_instance(self, instance):
122+
self.instance = instance
123+
124+
def set_database(self, database):
125+
self.database = database
126+
127+
def set_method(self, method):
128+
self.method = method
129+
130+
def record_attempt_start(self):
131+
pass
132+
133+
def record_attempt_completion(self):
134+
pass
135+
class MockGfeTracer:
136+
enabled = True
137+
metadata = []
138+
139+
def record_gfe_metrics(self, metadata):
140+
print("???")
141+
self.metadata = metadata

tests/unit/test_metrics_capture.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import pytest
2+
from unittest import mock
3+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
4+
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
5+
from google.cloud.spanner_v1.metrics.metrics_tracer_factory import MetricsTracerFactory
6+
7+
@pytest.fixture
8+
def mock_tracer_factory():
9+
with mock.patch.object(MetricsTracerFactory, 'create_metrics_tracer') as mock_create:
10+
yield mock_create
11+
12+
def test_metrics_capture_enter(mock_tracer_factory):
13+
mock_tracer = mock.Mock()
14+
mock_tracer_factory.return_value = mock_tracer
15+
16+
with MetricsCapture() as capture:
17+
assert capture is not None
18+
mock_tracer_factory.assert_called_once()
19+
mock_tracer.record_operation_start.assert_called_once()
20+
21+
def test_metrics_capture_exit(mock_tracer_factory):
22+
mock_tracer = mock.Mock()
23+
mock_tracer_factory.return_value = mock_tracer
24+
25+
with MetricsCapture():
26+
pass
27+
28+
mock_tracer.record_operation_completion.assert_called_once()

0 commit comments

Comments
 (0)