forked from googleapis/python-spanner
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_metrics.py
More file actions
127 lines (102 loc) · 4.42 KB
/
test_metrics.py
File metadata and controls
127 lines (102 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import MagicMock, patch

import pytest

# Skip this module when OpenTelemetry is missing. The guard has to run before
# the OpenTelemetry import below: placed after it, a missing package makes the
# import raise ImportError during collection and the skip is never reached.
pytest.importorskip("opentelemetry")
# Skip if semconv attributes are not present, as tracing won't be enabled either
# pytest.importorskip("opentelemetry.semconv.attributes.otel_attributes")

from google.api_core.exceptions import ServiceUnavailable  # noqa: E402
from google.auth import exceptions  # noqa: E402
from google.auth.credentials import Credentials  # noqa: E402
from google.cloud.spanner_v1.client import Client  # noqa: E402
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (  # noqa: E402
    SpannerMetricsTracerFactory,
)
from grpc._interceptor import _UnaryOutcome  # noqa: E402
from opentelemetry import metrics  # noqa: E402
class MockCredentials(Credentials):
    """Credential stub that always reports itself as usable.

    The error messages mirror those of anonymous credentials: the stub
    cannot be refreshed, rejects explicit tokens, and never modifies a
    request.
    """

    @property
    def expired(self):
        """Report the credentials as not expired."""
        return False

    @property
    def valid(self):
        """Report the credentials as valid."""
        return True

    def refresh(self, request):
        """Always raise ``InvalidOperation``; ``request`` is ignored."""
        raise exceptions.InvalidOperation("Anonymous credentials cannot be refreshed.")

    def apply(self, headers, token=None):
        """Leave ``headers`` untouched; raise ``InvalidValue`` if ``token`` is given."""
        if token is None:
            return
        raise exceptions.InvalidValue("Anonymous credentials don't support tokens.")

    def before_request(self, request, method, url, headers):
        """Do nothing: the request is passed through unmodified."""
@pytest.fixture(autouse=True)
def patched_client(monkeypatch):
    """Yield a Spanner ``Client`` with built-in metrics enabled and exporters mocked.

    Setup sets ``SPANNER_ENABLE_BUILTIN_METRICS`` to ``"true"``, installs a
    no-op meter provider, clears the cached tracer factory, and resets the
    client module's ``_metrics_monitor_initialized`` flag. The client is then
    built while ``MetricServiceClient``, ``CloudMonitoringMetricsExporter``
    and ``PeriodicExportingMetricReader`` are patched out.

    Teardown reinstalls a no-op meter provider and clears the factory's cached
    factory instance and current tracer.
    """
    tracer_factory_cls = SpannerMetricsTracerFactory
    service_client_target = (
        "google.cloud.spanner_v1.metrics.metrics_exporter.MetricServiceClient"
    )
    exporter_target = (
        "google.cloud.spanner_v1.metrics.metrics_exporter.CloudMonitoringMetricsExporter"
    )
    reader_target = "opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader"

    monkeypatch.setenv("SPANNER_ENABLE_BUILTIN_METRICS", "true")
    metrics.set_meter_provider(metrics.NoOpMeterProvider())

    # A factory cached by an earlier test (possibly a disabled one) must not
    # leak into this test.
    if tracer_factory_cls._metrics_tracer_factory is not None:
        tracer_factory_cls._metrics_tracer_factory = None

    # Clear the module-level flag so that metrics initialization runs again
    # for the client created below.
    from google.cloud.spanner_v1 import client as spanner_client_module

    spanner_client_module._metrics_monitor_initialized = False

    with patch(service_client_target), patch(exporter_target), patch(reader_target):
        yield Client(project="test", credentials=MockCredentials())

    # Teardown: put the global metrics state back to a neutral baseline.
    metrics.set_meter_provider(metrics.NoOpMeterProvider())
    tracer_factory_cls._metrics_tracer_factory = None
    tracer_factory_cls.current_metrics_tracer = None
def test_metrics_emission_with_failure_attempt(patched_client):
    """Check that a failed first attempt is counted alongside the later success.

    The metrics interceptor's ``intercept`` is wrapped so the first intercepted
    call raises ``ServiceUnavailable`` and every later call returns a mocked
    ``_UnaryOutcome``. After opening a snapshot, the current operation on the
    metrics tracer must report an attempt count of 2.
    """
    database = patched_client.instance("test-instance").database("example-db")

    assert SpannerMetricsTracerFactory().enabled

    interceptor = database.spanner_api._transport._metrics_interceptor
    real_intercept = interceptor.intercept

    def failing_call(*args, **kwargs):
        raise ServiceUnavailable("Service Unavailable")

    def succeeding_call(*args, **kwargs):
        return _UnaryOutcome(MagicMock(), MagicMock())

    # One-shot queue: the first intercepted call takes the failing stub; once
    # it is exhausted every call falls back to the succeeding stub.
    scripted_failures = iter([failing_call])

    def intercept_wrapper(invoked_method, request_or_iterator, call_details):
        # The real ``invoked_method`` is deliberately discarded so that no RPC
        # is attempted; a scripted stub is passed to the interceptor instead.
        return real_intercept(
            invoked_method=next(scripted_failures, succeeding_call),
            request_or_iterator=request_or_iterator,
            call_details=call_details,
        )

    interceptor.intercept = intercept_wrapper

    export_target = "google.cloud.spanner_v1.metrics.metrics_exporter.CloudMonitoringMetricsExporter.export"
    with patch(export_target), database.snapshot():
        pass

    # Verify that the attempt count increased from the failed initial attempt
    tracer = SpannerMetricsTracerFactory.current_metrics_tracer
    assert tracer.current_op.attempt_count == 2