This repository was archived by the owner on Mar 31, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 104
Expand file tree
/
Copy pathtest__opentelemetry_tracing.py
More file actions
229 lines (190 loc) · 8.71 KB
/
test__opentelemetry_tracing.py
File metadata and controls
229 lines (190 loc) · 8.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import mock
try:
from opentelemetry import trace as trace_api
from opentelemetry.trace.status import StatusCode
except ImportError:
pass
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.spanner_v1._helpers import GOOGLE_CLOUD_REGION_GLOBAL
from google.cloud.spanner_v1 import _opentelemetry_tracing
from tests._helpers import (
OpenTelemetryBase,
LIB_VERSION,
enrich_with_otel_scope,
)
def _make_rpc_error(error_cls, trailing_metadata=None):
    """Build an ``error_cls`` instance that wraps a mocked ``grpc.Call``.

    Args:
        error_cls: Exception class to instantiate. It is called as
            ``error_cls("error", errors=(<mock call>,))``.
        trailing_metadata: Value the mocked call returns from
            ``trailing_metadata()``. Defaults to ``None``.

    Returns:
        The constructed ``error_cls`` instance.
    """
    import grpc

    # Autospec'd against grpc.Call so the mock exposes that class's methods.
    fake_call = mock.create_autospec(grpc.Call, instance=True)
    fake_call.trailing_metadata.return_value = trailing_metadata
    wrapped_errors = (fake_call,)
    return error_cls("error", errors=wrapped_errors)
def _make_session():
    """Return a mock session whose ``_database.name`` is a real string.

    The tests pass this object to ``trace_call`` in place of a real
    ``Session``.
    """
    from google.cloud.spanner_v1.session import Session

    # NOTE(review): ``Mock`` treats unknown keyword arguments as attributes to
    # configure, so ``autospec``/``instance`` here most likely end up as plain
    # attributes on the mock rather than enforcing ``Session``'s spec --
    # confirm before relying on spec checking.
    fake_session = mock.Mock(autospec=Session, instance=True)
    # Set a string name to allow concatenation
    fake_session._database.name = "projects/p/instances/i/databases/d"
    return fake_session
class TestTracing(OpenTelemetryBase):
    """Exercise ``_opentelemetry_tracing.trace_call`` and the spans it emits.

    Tests that do not build their own tracer provider read finished spans from
    ``self.ot_exporter`` (not defined in this class; presumably supplied by
    ``OpenTelemetryBase``).
    """

    @staticmethod
    def _expected_span_attributes(extra_attributes):
        """Build the attributes expected on a span traced with ``_make_session()``.

        Args:
            extra_attributes: Mapping of caller-supplied attributes. It is
                applied last, so its values win on key collisions.

        Returns:
            A dict: the fixed attribute set passed through
            ``enrich_with_otel_scope``, updated with ``extra_attributes``.
        """
        expected = enrich_with_otel_scope(
            {
                "db.type": "spanner",
                "db.url": "spanner.googleapis.com",
                "net.host.name": "spanner.googleapis.com",
                "cloud.region": GOOGLE_CLOUD_REGION_GLOBAL,
                "gcp.client.service": "spanner",
                "gcp.client.version": LIB_VERSION,
                "gcp.client.repo": "googleapis/python-spanner",
                # Same database path that _make_session() sets on the mock.
                "gcp.resource.name": _opentelemetry_tracing.GCP_RESOURCE_NAME_PREFIX
                + "projects/p/instances/i/databases/d",
            }
        )
        expected.update(extra_attributes)
        return expected

    @mock.patch(
        "google.cloud.spanner_v1._opentelemetry_tracing._get_cloud_region",
        return_value="global",
    )
    def test_trace_call(self, mock_region):
        """A successful block yields one CLIENT span with OK status."""
        extra_attributes = {
            "attribute1": "value1",
            # Since our database is mocked, we have to override the
            # db.instance parameter so it is a string.
            "db.instance": "database_name",
        }
        expected_attributes = self._expected_span_attributes(extra_attributes)

        with _opentelemetry_tracing.trace_call(
            "CloudSpanner.Test", _make_session(), extra_attributes
        ) as span:
            # Attributes set inside the block must also land on the span.
            span.set_attribute("after_setup_attribute", 1)
            expected_attributes["after_setup_attribute"] = 1

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        span = span_list[0]
        self.assertEqual(span.kind, trace_api.SpanKind.CLIENT)
        self.assertEqual(dict(span.attributes), expected_attributes)
        self.assertEqual(span.name, "CloudSpanner.Test")
        self.assertEqual(span.status.status_code, StatusCode.OK)

    @mock.patch(
        "google.cloud.spanner_v1._opentelemetry_tracing._get_cloud_region",
        return_value="global",
    )
    def test_trace_error(self, mock_region):
        """An RPC-backed error propagates and the span ends with ERROR status."""
        from google.api_core.exceptions import InvalidArgument

        extra_attributes = {"db.instance": "database_name"}
        expected_attributes = self._expected_span_attributes(extra_attributes)

        with self.assertRaises(GoogleAPICallError):
            with _opentelemetry_tracing.trace_call(
                "CloudSpanner.Test", _make_session(), extra_attributes
            ):
                raise _make_rpc_error(InvalidArgument)

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        span = span_list[0]
        self.assertEqual(span.kind, trace_api.SpanKind.CLIENT)
        self.assertEqual(dict(span.attributes), expected_attributes)
        self.assertEqual(span.name, "CloudSpanner.Test")
        self.assertEqual(span.status.status_code, StatusCode.ERROR)

    def test_trace_grpc_error(self):
        """Raising ``DataLoss`` inside the block ends the span with ERROR status."""
        from google.api_core.exceptions import DataLoss

        extra_attributes = {"db.instance": "database_name"}

        with self.assertRaises(GoogleAPICallError):
            with _opentelemetry_tracing.trace_call(
                "CloudSpanner.Test", _make_session(), extra_attributes
            ):
                raise DataLoss("error")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        span = span_list[0]
        self.assertEqual(span.status.status_code, StatusCode.ERROR)

    def test_trace_codeless_error(self):
        """A bare ``GoogleAPICallError`` also ends the span with ERROR status."""
        extra_attributes = {"db.instance": "database_name"}

        with self.assertRaises(GoogleAPICallError):
            with _opentelemetry_tracing.trace_call(
                "CloudSpanner.Test", _make_session(), extra_attributes
            ):
                raise GoogleAPICallError("error")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        span = span_list[0]
        self.assertEqual(span.status.status_code, StatusCode.ERROR)

    def test_trace_call_terminal_span_status_ALWAYS_ON_sampler(self):
        """A status set inside the block is kept as the span's final status."""
        # Verify that we don't unconditionally set the terminal span status to
        # SpanStatus.OK per https://github.com/googleapis/python-spanner/issues/1246
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
            InMemorySpanExporter,
        )
        from opentelemetry.trace.status import Status, StatusCode
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.sampling import ALWAYS_ON

        # Use a dedicated provider/exporter so only this test's span is seen.
        tracer_provider = TracerProvider(sampler=ALWAYS_ON)
        trace_exporter = InMemorySpanExporter()
        tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
        observability_options = dict(tracer_provider=tracer_provider)

        session = _make_session()
        with _opentelemetry_tracing.trace_call(
            "VerifyTerminalSpanStatus",
            session,
            observability_options=observability_options,
        ) as span:
            span.set_status(Status(StatusCode.ERROR, "Our error exhibit"))

        got_statuses = [
            (finished.name, finished.status.status_code, finished.status.description)
            for finished in trace_exporter.get_finished_spans()
        ]
        want_statuses = [
            ("VerifyTerminalSpanStatus", StatusCode.ERROR, "Our error exhibit"),
        ]
        self.assertEqual(got_statuses, want_statuses)

    def test_trace_call_terminal_span_status_ALWAYS_OFF_sampler(self):
        """With ALWAYS_OFF, the block gets a NonRecordingSpan and exports nothing."""
        # Verify that we get the correct status even when using the ALWAYS_OFF
        # sampler which produces the NonRecordingSpan per
        # https://github.com/googleapis/python-spanner/issues/1286
        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
        from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
            InMemorySpanExporter,
        )
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.sampling import ALWAYS_OFF

        tracer_provider = TracerProvider(sampler=ALWAYS_OFF)
        trace_exporter = InMemorySpanExporter()
        tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
        observability_options = dict(tracer_provider=tracer_provider)

        session = _make_session()
        with _opentelemetry_tracing.trace_call(
            "VerifyWithNonRecordingSpan",
            session,
            observability_options=observability_options,
        ) as span:
            used_span = span

        # Compared by class name, as in the original check.
        self.assertEqual(type(used_span).__name__, "NonRecordingSpan")
        self.assertEqual(list(trace_exporter.get_finished_spans()), [])