-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_consumer_properties.py
More file actions
76 lines (67 loc) · 2.75 KB
/
test_consumer_properties.py
File metadata and controls
76 lines (67 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import uuid
from natsrpy.js import (
JetStream,
PullConsumer,
PullConsumerConfig,
PushConsumer,
PushConsumerConfig,
StreamConfig,
)
async def test_pull_consumer_name_and_stream(js: JetStream) -> None:
stream_name = f"test-pcns-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
consumer = await stream.consumers.create(
PullConsumerConfig(name=consumer_name),
)
assert isinstance(consumer, PullConsumer)
assert consumer.name == consumer_name
assert consumer.stream_name == stream_name
finally:
await js.streams.delete(stream_name)
async def test_push_consumer_name_and_stream(js: JetStream) -> None:
stream_name = f"test-pushns-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
deliver_subj = uuid.uuid4().hex
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
consumer = await stream.consumers.create(
PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name),
)
assert isinstance(consumer, PushConsumer)
assert consumer.name == consumer_name
assert consumer.stream_name == stream_name
finally:
await js.streams.delete(stream_name)
async def test_pull_consumer_messages_iterator(js: JetStream) -> None:
stream_name = f"test-pullmsgiter-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"iter-msg-1", wait=True)
await js.publish(subj, b"iter-msg-2", wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
msgs = await consumer.fetch(max_messages=2, timeout=5.0)
assert len(msgs) == 2
assert msgs[0].payload == b"iter-msg-1"
assert msgs[1].payload == b"iter-msg-2"
finally:
await js.streams.delete(stream_name)
async def test_pull_consumer_fetch_empty(js: JetStream) -> None:
stream_name = f"test-fetchempty-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
msgs = await consumer.fetch(max_messages=1, timeout=0.5)
assert len(msgs) == 0
finally:
await js.streams.delete(stream_name)