-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_consumers_manager.py
More file actions
107 lines (94 loc) · 3.99 KB
/
test_consumers_manager.py
File metadata and controls
107 lines (94 loc) · 3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import uuid
from datetime import timedelta
from natsrpy.js import (
JetStream,
PullConsumer,
PullConsumerConfig,
PushConsumer,
PushConsumerConfig,
StreamConfig,
)
async def test_consumers_manager_delete(js: JetStream) -> None:
    """Check that deleting an existing pull consumer returns ``True``.

    A uniquely named stream is created, a pull consumer is registered on
    it, and the consumer is then deleted by name. The stream is removed
    afterwards whether or not the assertion passed.
    """
    name = f"test-cmdel-{uuid.uuid4().hex[:8]}"
    stream_cfg = StreamConfig(name=name, subjects=[f"{name}.>"])
    stream = await js.streams.create(stream_cfg)
    try:
        pull_name = f"consumer-{uuid.uuid4().hex[:8]}"
        pull_cfg = PullConsumerConfig(name=pull_name)
        await stream.consumers.create(pull_cfg)
        deleted = await stream.consumers.delete(pull_name)
        assert deleted is True
    finally:
        # Drop the stream even when the test body raised.
        await js.streams.delete(name)
async def test_consumers_manager_get_push(js: JetStream) -> None:
    """Check that ``get_push`` returns the push consumer that was just created.

    A uniquely named stream is created, a push consumer with a random
    deliver subject is registered on it, and the consumer is looked up by
    name. The result must be a ``PushConsumer`` carrying the same name.
    The stream is removed afterwards whether or not the assertions passed.
    """
    name = f"test-cmgp-{uuid.uuid4().hex[:8]}"
    stream_cfg = StreamConfig(name=name, subjects=[f"{name}.>"])
    stream = await js.streams.create(stream_cfg)
    try:
        subject = uuid.uuid4().hex
        push_name = f"push-{uuid.uuid4().hex[:8]}"
        push_cfg = PushConsumerConfig(deliver_subject=subject, name=push_name)
        await stream.consumers.create(push_cfg)
        fetched = await stream.consumers.get_push(push_name)
        assert isinstance(fetched, PushConsumer)
        assert fetched.name == push_name
    finally:
        # Drop the stream even when the test body raised.
        await js.streams.delete(name)
async def test_consumers_manager_update_pull(js: JetStream) -> None:
    """Check that updating a pull consumer config yields a ``PullConsumer``.

    The consumer is first created with description ``"original"``; the same
    config object is then mutated to ``"updated"`` and passed to ``update``.
    Only the type of the returned object is asserted. The stream is removed
    afterwards whether or not the assertion passed.
    """
    name = f"test-cmup-{uuid.uuid4().hex[:8]}"
    stream_cfg = StreamConfig(name=name, subjects=[f"{name}.>"])
    stream = await js.streams.create(stream_cfg)
    try:
        pull_name = f"consumer-{uuid.uuid4().hex[:8]}"
        pull_cfg = PullConsumerConfig(name=pull_name, description="original")
        await stream.consumers.create(pull_cfg)
        # Reuse the same config instance so only the description differs.
        pull_cfg.description = "updated"
        result = await stream.consumers.update(pull_cfg)
        assert isinstance(result, PullConsumer)
    finally:
        # Drop the stream even when the test body raised.
        await js.streams.delete(name)
async def test_consumers_manager_update_push(js: JetStream) -> None:
    """Check that updating a push consumer config yields a ``PushConsumer``.

    The consumer is first created with a random deliver subject and
    description ``"original"``; the same config object is then mutated to
    ``"updated"`` and passed to ``update``. Only the type of the returned
    object is asserted. The stream is removed afterwards whether or not the
    assertion passed.
    """
    name = f"test-cmupp-{uuid.uuid4().hex[:8]}"
    stream_cfg = StreamConfig(name=name, subjects=[f"{name}.>"])
    stream = await js.streams.create(stream_cfg)
    try:
        subject = uuid.uuid4().hex
        push_name = f"push-{uuid.uuid4().hex[:8]}"
        push_cfg = PushConsumerConfig(
            deliver_subject=subject,
            name=push_name,
            description="original",
        )
        await stream.consumers.create(push_cfg)
        # Reuse the same config instance so only the description differs.
        push_cfg.description = "updated"
        result = await stream.consumers.update(push_cfg)
        assert isinstance(result, PushConsumer)
    finally:
        # Drop the stream even when the test body raised.
        await js.streams.delete(name)
async def test_consumers_manager_pause_and_resume(js: JetStream) -> None:
    """Check that ``pause`` and ``resume`` on a pull consumer both return bools.

    ``pause`` is called with a float ``delay`` of ``60.0`` (presumably
    seconds -- confirm against the client API), followed by ``resume`` on
    the same consumer. Only the result types are asserted, not their
    values. The stream is removed afterwards whether or not the assertions
    passed.
    """
    name = f"test-cmpr-{uuid.uuid4().hex[:8]}"
    stream_cfg = StreamConfig(name=name, subjects=[f"{name}.>"])
    stream = await js.streams.create(stream_cfg)
    try:
        pull_name = f"consumer-{uuid.uuid4().hex[:8]}"
        pull_cfg = PullConsumerConfig(name=pull_name)
        await stream.consumers.create(pull_cfg)

        pause_result = await stream.consumers.pause(pull_name, delay=60.0)
        assert isinstance(pause_result, bool)

        resume_result = await stream.consumers.resume(pull_name)
        assert isinstance(resume_result, bool)
    finally:
        # Drop the stream even when the test body raised.
        await js.streams.delete(name)
async def test_consumers_manager_pause_timedelta(js: JetStream) -> None:
    """Check that ``pause`` accepts a ``timedelta`` delay and returns a bool.

    A pull consumer is created on a uniquely named stream and paused with
    ``timedelta(seconds=60)``. Only the result type is asserted. The stream
    is removed afterwards whether or not the assertion passed.
    """
    name = f"test-cmpd-{uuid.uuid4().hex[:8]}"
    stream_cfg = StreamConfig(name=name, subjects=[f"{name}.>"])
    stream = await js.streams.create(stream_cfg)
    try:
        pull_name = f"consumer-{uuid.uuid4().hex[:8]}"
        pull_cfg = PullConsumerConfig(name=pull_name)
        await stream.consumers.create(pull_cfg)

        one_minute = timedelta(seconds=60)
        pause_result = await stream.consumers.pause(pull_name, delay=one_minute)
        assert isinstance(pause_result, bool)
    finally:
        # Drop the stream even when the test body raised.
        await js.streams.delete(name)