Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 7a913fc

Browse files
authored
Using pytest fixtures in Pub/Sub system tests. (#4440)
Also - Using `topic_path()` and `subscription_path()` (along with `unique_resource_id()`) rather than a hard-coded path in `_resource_name()` - Dropping the usage of `mock` in favor of `__call__`-able types - Re-using the **same** publisher and subscriber via module-level fixtures - Adding a `cleanup` fixture instead of the `try/finally` indirection - Making sure that `call_count` (now named `calls`) is thread safe - Cleaning up subscriptions that are created (they were previously leaked)
1 parent 68ce222 commit 7a913fc

1 file changed

Lines changed: 184 additions & 138 deletions

File tree

tests/system.py

Lines changed: 184 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -15,154 +15,200 @@
1515
from __future__ import absolute_import
1616

1717
import datetime
18+
import threading
1819
import time
19-
import uuid
2020

21-
import mock
21+
import pytest
2222
import six
2323

24-
from google import auth
24+
import google.auth
2525
from google.cloud import pubsub_v1
2626

2727

28-
def _resource_name(resource_type):
29-
"""Return a randomly selected name for a resource.
28+
from test_utils.system import unique_resource_id
3029

31-
Args:
32-
resource_type (str): The resource for which a name is being
33-
generated. Should be singular (e.g. "topic", "subscription")
34-
"""
35-
return 'projects/{project}/{resource_type}s/st-n{random}'.format(
36-
project=auth.default()[1],
37-
random=str(uuid.uuid4())[0:8],
38-
resource_type=resource_type,
39-
)
4030

31+
@pytest.fixture(scope=u'module')
32+
def project():
33+
_, default_project = google.auth.default()
34+
yield default_project
35+
36+
37+
@pytest.fixture(scope=u'module')
38+
def publisher():
39+
yield pubsub_v1.PublisherClient()
40+
41+
42+
@pytest.fixture(scope=u'module')
43+
def subscriber():
44+
yield pubsub_v1.SubscriberClient()
45+
46+
47+
@pytest.fixture
48+
def topic_path(project, publisher):
49+
topic_name = 't' + unique_resource_id('-')
50+
yield publisher.topic_path(project, topic_name)
4151

42-
def test_publish_messages():
43-
publisher = pubsub_v1.PublisherClient()
44-
topic_name = _resource_name('topic')
45-
futures = []
4652

47-
try:
48-
publisher.create_topic(topic_name)
49-
for i in range(0, 500):
50-
futures.append(
51-
publisher.publish(
52-
topic_name,
53-
b'The hail in Wales falls mainly on the snails.',
54-
num=str(i),
55-
),
56-
)
57-
for future in futures:
58-
result = future.result()
59-
assert isinstance(result, (six.text_type, six.binary_type))
60-
finally:
61-
publisher.delete_topic(topic_name)
62-
63-
64-
def test_subscribe_to_messages():
65-
publisher = pubsub_v1.PublisherClient()
66-
subscriber = pubsub_v1.SubscriberClient()
67-
topic_name = _resource_name('topic')
68-
sub_name = _resource_name('subscription')
69-
70-
try:
71-
# Create a topic.
72-
publisher.create_topic(topic_name)
73-
74-
# Subscribe to the topic. This must happen before the messages
75-
# are published.
76-
subscriber.create_subscription(sub_name, topic_name)
77-
subscription = subscriber.subscribe(sub_name)
78-
79-
# Publish some messages.
80-
futures = [publisher.publish(
81-
topic_name,
53+
@pytest.fixture
54+
def subscription_path(project, subscriber):
55+
sub_name = 's' + unique_resource_id('-')
56+
yield subscriber.subscription_path(project, sub_name)
57+
58+
59+
@pytest.fixture
60+
def cleanup():
61+
registry = []
62+
yield registry
63+
64+
# Perform all clean up.
65+
for to_call, argument in registry:
66+
to_call(argument)
67+
68+
69+
def test_publish_messages(publisher, topic_path, cleanup):
70+
futures = []
71+
# Make sure the topic gets deleted.
72+
cleanup.append((publisher.delete_topic, topic_path))
73+
74+
publisher.create_topic(topic_path)
75+
for index in six.moves.range(500):
76+
futures.append(
77+
publisher.publish(
78+
topic_path,
79+
b'The hail in Wales falls mainly on the snails.',
80+
num=str(index),
81+
),
82+
)
83+
84+
for future in futures:
85+
result = future.result()
86+
assert isinstance(result, six.string_types)
87+
88+
89+
def test_subscribe_to_messages(
90+
publisher, topic_path, subscriber, subscription_path, cleanup):
91+
# Make sure the topic and subscription get deleted.
92+
cleanup.append((publisher.delete_topic, topic_path))
93+
cleanup.append((subscriber.delete_subscription, subscription_path))
94+
95+
# Create a topic.
96+
publisher.create_topic(topic_path)
97+
98+
# Subscribe to the topic. This must happen before the messages
99+
# are published.
100+
subscriber.create_subscription(subscription_path, topic_path)
101+
subscription = subscriber.subscribe(subscription_path)
102+
103+
# Publish some messages.
104+
futures = [
105+
publisher.publish(
106+
topic_path,
82107
b'Wooooo! The claaaaaw!',
83-
num=str(i),
84-
) for i in range(0, 50)]
85-
86-
# Make sure the publish completes.
87-
[f.result() for f in futures]
88-
89-
# The callback should process the message numbers to prove
90-
# that we got everything at least once.
91-
callback = mock.Mock(wraps=lambda message: message.ack())
92-
93-
# Actually open the subscription and hold it open for a few seconds.
94-
subscription.open(callback)
95-
for second in range(0, 10):
96-
time.sleep(1)
97-
98-
# The callback should have fired at least fifty times, but it
99-
# may take some time.
100-
if callback.call_count >= 50:
101-
return
102-
103-
# Okay, we took too long; fail out.
104-
assert callback.call_count >= 50
105-
finally:
106-
publisher.delete_topic(topic_name)
107-
108-
109-
def test_subscribe_to_messages_async_callbacks():
110-
publisher = pubsub_v1.PublisherClient()
111-
subscriber = pubsub_v1.SubscriberClient()
112-
topic_name = _resource_name('topic')
113-
sub_name = _resource_name('subscription')
114-
115-
try:
116-
# Create a topic.
117-
publisher.create_topic(topic_name)
118-
119-
# Subscribe to the topic. This must happen before the messages
120-
# are published.
121-
subscriber.create_subscription(sub_name, topic_name)
122-
subscription = subscriber.subscribe(sub_name)
123-
124-
# Publish some messages.
125-
futures = [publisher.publish(
126-
topic_name,
108+
num=str(index),
109+
)
110+
for index in six.moves.range(50)
111+
]
112+
113+
# Make sure the publish completes.
114+
for future in futures:
115+
future.result()
116+
117+
# Actually open the subscription and hold it open for a few seconds.
118+
# The callback should process the message numbers to prove
119+
# that we got everything at least once.
120+
callback = AckCallback()
121+
subscription.open(callback)
122+
for second in six.moves.range(10):
123+
time.sleep(1)
124+
125+
# The callback should have fired at least fifty times, but it
126+
# may take some time.
127+
if callback.calls >= 50:
128+
return
129+
130+
# Okay, we took too long; fail out.
131+
assert callback.calls >= 50
132+
133+
134+
def test_subscribe_to_messages_async_callbacks(
135+
publisher, topic_path, subscriber, subscription_path, cleanup):
136+
# Make sure the topic and subscription get deleted.
137+
cleanup.append((publisher.delete_topic, topic_path))
138+
cleanup.append((subscriber.delete_subscription, subscription_path))
139+
140+
# Create a topic.
141+
publisher.create_topic(topic_path)
142+
143+
# Subscribe to the topic. This must happen before the messages
144+
# are published.
145+
subscriber.create_subscription(subscription_path, topic_path)
146+
subscription = subscriber.subscribe(subscription_path)
147+
148+
# Publish some messages.
149+
futures = [
150+
publisher.publish(
151+
topic_path,
127152
b'Wooooo! The claaaaaw!',
128-
num=str(i),
129-
) for i in range(0, 2)]
130-
131-
# Make sure the publish completes.
132-
[f.result() for f in futures]
133-
134-
# We want to make sure that the callback was called asynchronously. So
135-
# track when each call happened and make sure below.
136-
call_times = []
137-
138-
def process_message(message):
139-
# list.append() is thread-safe.
140-
call_times.append(datetime.datetime.now())
141-
time.sleep(2)
142-
message.ack()
143-
144-
callback = mock.Mock(wraps=process_message)
145-
side_effect = mock.Mock()
146-
callback.side_effect = side_effect
147-
148-
# Actually open the subscription and hold it open for a few seconds.
149-
subscription.open(callback)
150-
for second in range(0, 5):
151-
time.sleep(4)
152-
153-
# The callback should have fired at least two times, but it may
154-
# take some time.
155-
if callback.call_count >= 2 and side_effect.call_count >= 2:
156-
first = min(call_times[:2])
157-
last = max(call_times[:2])
158-
diff = last - first
159-
# "Ensure" the first two callbacks were executed asynchronously
160-
# (sequentially would have resulted in a difference of 2+
161-
# seconds).
162-
assert diff.days == 0
163-
assert diff.seconds < 2
164-
165-
# Okay, we took too long; fail out.
166-
assert callback.call_count >= 2
167-
finally:
168-
publisher.delete_topic(topic_name)
153+
num=str(index),
154+
)
155+
for index in six.moves.range(2)
156+
]
157+
158+
# Make sure the publish completes.
159+
for future in futures:
160+
future.result()
161+
162+
# We want to make sure that the callback was called asynchronously. So
163+
# track when each call happened and make sure below.
164+
callback = TimesCallback(2)
165+
166+
# Actually open the subscription and hold it open for a few seconds.
167+
subscription.open(callback)
168+
for second in six.moves.range(5):
169+
time.sleep(4)
170+
171+
# The callback should have fired at least two times, but it may
172+
# take some time.
173+
if callback.calls >= 2:
174+
first, last = sorted(callback.call_times[:2])
175+
diff = last - first
176+
# "Ensure" the first two callbacks were executed asynchronously
177+
# (sequentially would have resulted in a difference of 2+
178+
# seconds).
179+
assert diff.days == 0
180+
assert diff.seconds < callback.sleep_time
181+
182+
# Okay, we took too long; fail out.
183+
assert callback.calls >= 2
184+
185+
186+
class AckCallback(object):
187+
188+
def __init__(self):
189+
self.calls = 0
190+
191+
def __call__(self, message):
192+
message.ack()
193+
# Only increment the number of calls **after** finishing.
194+
with threading.Lock():
195+
self.calls += 1
196+
197+
198+
class TimesCallback(object):
199+
200+
def __init__(self, sleep_time):
201+
self.sleep_time = sleep_time
202+
self.calls = 0
203+
self.call_times = []
204+
205+
def __call__(self, message):
206+
now = datetime.datetime.now()
207+
time.sleep(self.sleep_time)
208+
message.ack()
209+
# Only increment the number of calls **after** finishing.
210+
with threading.Lock():
211+
# list.append() is thread-safe, but we still wait until
212+
# ``calls`` is incremented to do it.
213+
self.call_times.append(now)
214+
self.calls += 1

0 commit comments

Comments
 (0)