forked from instana/python-sensor
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_google-cloud-pubsub.py
More file actions
206 lines (166 loc) · 6.91 KB
/
Copy pathtest_google-cloud-pubsub.py
File metadata and controls
206 lines (166 loc) · 6.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# (c) Copyright IBM Corp. 2021
# (c) Copyright Instana Inc. 2021
import os
import sys
import threading
import time
from typing import Generator
import pytest
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
from google.cloud.pubsub_v1.publisher import exceptions
from opentelemetry.trace import SpanKind
from instana.singletons import agent, get_tracer
from instana.span.span import get_current_span
from tests.test_utils import _TraceContextMixin
# Use the Pub/Sub emulator exposed at localhost:8681.
# Set at import time, ahead of the client objects created further down.
os.environ["PUBSUB_EMULATOR_HOST"] = "localhost:8681"
class TestPubSubPublish(_TraceContextMixin):
    """Check the spans recorded when a message is published to a topic."""

    # Created once, when the class body is executed, and shared by all tests.
    publisher = PublisherClient()

    @pytest.fixture(autouse=True)
    def _resource(self) -> Generator[None, None, None]:
        """Create a fresh topic before each test and remove it afterwards."""
        self.tracer = get_tracer()
        self.recorder = self.tracer.span_processor
        self.recorder.clear_spans()

        self.project_id = "test-project"
        self.topic_name = "test-topic"

        # setup topic_path & topic
        self.topic_path = self.publisher.topic_path(self.project_id, self.topic_name)
        try:
            self.publisher.create_topic(request={"name": self.topic_path})
        except AlreadyExists:
            # The topic is already there (e.g. left over from an earlier run):
            # drop it and start from a clean one.
            self.publisher.delete_topic(request={"topic": self.topic_path})
            self.publisher.create_topic(request={"name": self.topic_path})

        yield

        try:
            self.publisher.delete_topic(request={"topic": self.topic_path})
        finally:
            # Restore the flag flipped by test_publish_as_root_exit_span even
            # when the topic cleanup fails, so it cannot leak into other tests.
            agent.options.allow_exit_as_root = False

    def test_publish(self) -> None:
        """Publishing inside an active span records a "gcps" span plus the test span."""
        # publish a single message
        with self.tracer.start_as_current_span("test"):
            future = self.publisher.publish(
                self.topic_path, b"Test Message", origin="instana"
            )
            time.sleep(2.0)  # for sanity
            result = future.result()
        assert isinstance(result, str)

        spans = self.recorder.queued_spans()
        # Check the count first so a missing span fails with a clear assertion
        # instead of an IndexError while picking the spans apart.
        assert len(spans) == 2
        gcps_span, test_span = spans

        # The "test" span was closed by the with-block above.
        current_span = get_current_span()
        assert not current_span.is_recording()

        assert gcps_span.n == "gcps"
        assert gcps_span.k is SpanKind.CLIENT
        assert gcps_span.data["gcps"]["op"] == "publish"
        assert self.topic_name == gcps_span.data["gcps"]["top"]

        # Trace Context Propagation
        self.assertTraceContextPropagated(test_span, gcps_span)

        # Error logging
        self.assertErrorLogging(spans)

    def test_publish_as_root_exit_span(self) -> None:
        """With allow_exit_as_root set, publishing without a parent span records one span."""
        agent.options.allow_exit_as_root = True

        # publish a single message
        future = self.publisher.publish(
            self.topic_path, b"Test Message", origin="instana"
        )
        time.sleep(2.0)  # for sanity
        result = future.result()
        assert isinstance(result, str)

        spans = self.recorder.queued_spans()
        assert len(spans) == 1
        gcps_span = spans[0]

        current_span = get_current_span()
        assert not current_span.is_recording()

        assert gcps_span.n == "gcps"
        assert gcps_span.k is SpanKind.CLIENT
        assert gcps_span.data["gcps"]["op"] == "publish"
        assert self.topic_name == gcps_span.data["gcps"]["top"]

        # Error logging
        self.assertErrorLogging(spans)
class AckCallback:
    """Callable that acknowledges every message it receives and counts the calls.

    ``calls`` holds the number of completed invocations; ``lock`` guards the
    counter update.
    """

    def __init__(self) -> None:
        self.calls = 0
        self.lock = threading.Lock()

    def __call__(self, message) -> None:
        """Acknowledge ``message`` (via its ``ack()`` method), then bump ``calls``."""
        message.ack()
        # Only increment the number of calls **after** finishing.
        with self.lock:
            self.calls += 1
@pytest.mark.skipif(
    sys.version_info >= (3, 15),
    reason="Avoiding CircleCI 'Too long with no output' error",
)
class TestPubSubSubscribe(_TraceContextMixin):
    """Check the spans recorded when a published message is consumed by a subscriber."""

    @classmethod
    def setup_class(cls) -> None:
        """Create the clients and the tracer shared by all tests of the class."""
        cls.publisher = PublisherClient()
        cls.subscriber = SubscriberClient()
        cls.tracer = get_tracer()

    @pytest.fixture(autouse=True)
    def _resource(self) -> Generator[None, None, None]:
        """Create a fresh topic and subscription per test and remove them afterwards."""
        self.recorder = self.tracer.span_processor
        self.recorder.clear_spans()

        self.project_id = "test-project"
        self.topic_name = "test-topic"
        self.subscription_name = "test-subscription"

        # setup topic_path & topic
        self.topic_path = self.publisher.topic_path(self.project_id, self.topic_name)
        try:
            self.publisher.create_topic(request={"name": self.topic_path})
        except AlreadyExists:
            # Already there (e.g. left over from an earlier run): recreate it.
            self.publisher.delete_topic(request={"topic": self.topic_path})
            self.publisher.create_topic(request={"name": self.topic_path})

        # setup subscription path & attach subscription
        self.subscription_path = self.subscriber.subscription_path(
            self.project_id,
            self.subscription_name,
        )
        try:
            self.subscriber.create_subscription(
                request={"name": self.subscription_path, "topic": self.topic_path}
            )
        except AlreadyExists:
            self.subscriber.delete_subscription(
                request={"subscription": self.subscription_path}
            )
            self.subscriber.create_subscription(
                request={"name": self.subscription_path, "topic": self.topic_path}
            )

        yield

        try:
            self.publisher.delete_topic(request={"topic": self.topic_path})
        finally:
            # Remove the subscription even when the topic cleanup fails, so the
            # next test does not start with a stale subscription.
            self.subscriber.delete_subscription(
                request={"subscription": self.subscription_path}
            )

    def test_subscribe(self) -> None:
        """Publish one message, consume it, and verify the three recorded spans."""
        with self.tracer.start_as_current_span("test"):
            # Publish a message
            publish_future = self.publisher.publish(
                self.topic_path, b"Test Message to PubSub", origin="instana"
            )
            assert isinstance(publish_future.result(), str)

            time.sleep(2.0)  # for sanity

            # Subscribe to the subscription
            callback_handler = AckCallback()
            streaming_future = self.subscriber.subscribe(
                self.subscription_path, callback_handler
            )

            timeout = 2.0
            try:
                streaming_future.result(timeout)
            except exceptions.TimeoutError:
                # Stop waiting once the timeout has elapsed.
                streaming_future.cancel()

        spans = self.recorder.queued_spans()
        # Check the count first so a missing span fails with a clear assertion
        # instead of an IndexError while picking the spans apart.
        assert len(spans) == 3
        producer_span, consumer_span, test_span = spans

        # The "test" span was closed by the with-block above.
        current_span = get_current_span()
        assert not current_span.is_recording()

        assert producer_span.data["gcps"]["op"] == "publish"
        assert consumer_span.data["gcps"]["op"] == "consume"
        assert self.topic_name == producer_span.data["gcps"]["top"]
        assert self.subscription_name == consumer_span.data["gcps"]["sub"]

        assert producer_span.k is SpanKind.CLIENT
        assert consumer_span.k is SpanKind.SERVER

        # Trace Context Propagation
        self.assertTraceContextPropagated(producer_span, consumer_span)
        self.assertTraceContextPropagated(test_span, producer_span)

        # Error logging
        self.assertErrorLogging(spans)