Skip to content

Commit 0b26ace

Browse files
Copilots3rius
andauthored
Add tests for new Stream and ConsumersManager methods, export new types in js.py
Agent-Logs-Url: https://github.com/taskiq-python/natsrpy/sessions/86c923dd-8c47-4c2d-9068-513eedfff7db Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com>
1 parent e356f0e commit 0b26ace

File tree

2 files changed

+188
-0
lines changed

2 files changed

+188
-0
lines changed

python/natsrpy/js.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
KVOperation,
2121
KVStatus,
2222
)
23+
from ._natsrpy_rs.js.managers import ConsumersIterator, ConsumersNamesIterator
2324
from ._natsrpy_rs.js.object_store import (
2425
ObjectInfo,
2526
ObjectInfoIterator,
@@ -54,6 +55,8 @@
5455
"ClusterInfo",
5556
"Compression",
5657
"ConsumerLimits",
58+
"ConsumersIterator",
59+
"ConsumersNamesIterator",
5760
"CounterEntry",
5861
"Counters",
5962
"CountersConfig",
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import uuid
2+
3+
from natsrpy.js import (
4+
JetStream,
5+
PullConsumer,
6+
PullConsumerConfig,
7+
PushConsumer,
8+
PushConsumerConfig,
9+
StreamConfig,
10+
)
11+
12+
13+
async def test_stream_direct_get_next_for_subject(js: JetStream) -> None:
14+
name = f"test-dgnfs-{uuid.uuid4().hex[:8]}"
15+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
16+
stream = await js.streams.create(config)
17+
try:
18+
await js.publish(f"{name}.a", b"msg-a-1", wait=True)
19+
await js.publish(f"{name}.a", b"msg-a-2", wait=True)
20+
await js.publish(f"{name}.b", b"msg-b-1", wait=True)
21+
msg = await stream.direct_get_next_for_subject(f"{name}.a")
22+
assert msg.payload == b"msg-a-1"
23+
assert msg.subject == f"{name}.a"
24+
finally:
25+
await js.streams.delete(name)
26+
27+
28+
async def test_stream_direct_get_next_for_subject_with_sequence(
29+
js: JetStream,
30+
) -> None:
31+
name = f"test-dgnfss-{uuid.uuid4().hex[:8]}"
32+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
33+
stream = await js.streams.create(config)
34+
try:
35+
await js.publish(f"{name}.a", b"msg-a-1", wait=True)
36+
await js.publish(f"{name}.a", b"msg-a-2", wait=True)
37+
await js.publish(f"{name}.b", b"msg-b-1", wait=True)
38+
msg = await stream.direct_get_next_for_subject(f"{name}.a", sequence=2)
39+
assert msg.payload == b"msg-a-2"
40+
finally:
41+
await js.streams.delete(name)
42+
43+
44+
async def test_stream_direct_get_first_for_subject(js: JetStream) -> None:
45+
name = f"test-dgffs-{uuid.uuid4().hex[:8]}"
46+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
47+
stream = await js.streams.create(config)
48+
try:
49+
await js.publish(f"{name}.a", b"first-msg", wait=True)
50+
await js.publish(f"{name}.a", b"second-msg", wait=True)
51+
msg = await stream.direct_get_first_for_subject(f"{name}.a")
52+
assert msg.payload == b"first-msg"
53+
assert msg.subject == f"{name}.a"
54+
assert msg.sequence == 1
55+
finally:
56+
await js.streams.delete(name)
57+
58+
59+
async def test_stream_direct_get_last_for_subject(js: JetStream) -> None:
60+
name = f"test-dglfs-{uuid.uuid4().hex[:8]}"
61+
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
62+
stream = await js.streams.create(config)
63+
try:
64+
await js.publish(f"{name}.a", b"first-msg", wait=True)
65+
await js.publish(f"{name}.a", b"last-msg", wait=True)
66+
msg = await stream.direct_get_last_for_subject(f"{name}.a")
67+
assert msg.payload == b"last-msg"
68+
assert msg.subject == f"{name}.a"
69+
assert msg.sequence == 2
70+
finally:
71+
await js.streams.delete(name)
72+
73+
74+
async def test_stream_delete_message(js: JetStream) -> None:
75+
name = f"test-delmsg-{uuid.uuid4().hex[:8]}"
76+
subj = f"{name}.data"
77+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
78+
stream = await js.streams.create(config)
79+
try:
80+
await js.publish(subj, b"msg-1", wait=True)
81+
await js.publish(subj, b"msg-2", wait=True)
82+
await js.publish(subj, b"msg-3", wait=True)
83+
info = await stream.get_info()
84+
assert info.state.messages == 3
85+
86+
await stream.delete_message(sequence=2)
87+
88+
info = await stream.get_info()
89+
assert info.state.messages == 2
90+
finally:
91+
await js.streams.delete(name)
92+
93+
94+
async def test_consumers_list(js: JetStream) -> None:
95+
name = f"test-clist-{uuid.uuid4().hex[:8]}"
96+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
97+
stream = await js.streams.create(config)
98+
try:
99+
consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}"
100+
consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}"
101+
await stream.consumers.create(PullConsumerConfig(name=consumer_name1))
102+
await stream.consumers.create(PullConsumerConfig(name=consumer_name2))
103+
104+
consumers_iter = await stream.consumers.list()
105+
found = []
106+
async for consumer in consumers_iter:
107+
assert isinstance(consumer, (PullConsumer, PushConsumer))
108+
found.append(consumer)
109+
assert len(found) == 2
110+
finally:
111+
await js.streams.delete(name)
112+
113+
114+
async def test_consumers_list_returns_correct_types(js: JetStream) -> None:
115+
name = f"test-cltype-{uuid.uuid4().hex[:8]}"
116+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
117+
stream = await js.streams.create(config)
118+
try:
119+
pull_name = f"pull-{uuid.uuid4().hex[:8]}"
120+
push_name = f"push-{uuid.uuid4().hex[:8]}"
121+
await stream.consumers.create(PullConsumerConfig(name=pull_name))
122+
deliver_subj = uuid.uuid4().hex
123+
await stream.consumers.create(
124+
PushConsumerConfig(deliver_subject=deliver_subj, name=push_name),
125+
)
126+
127+
consumers_iter = await stream.consumers.list()
128+
types_found: dict[str, type] = {}
129+
async for consumer in consumers_iter:
130+
if isinstance(consumer, PullConsumer):
131+
types_found["pull"] = type(consumer)
132+
elif isinstance(consumer, PushConsumer):
133+
types_found["push"] = type(consumer)
134+
assert types_found.get("pull") is PullConsumer
135+
assert types_found.get("push") is PushConsumer
136+
finally:
137+
await js.streams.delete(name)
138+
139+
140+
async def test_consumers_list_names(js: JetStream) -> None:
141+
name = f"test-clnames-{uuid.uuid4().hex[:8]}"
142+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
143+
stream = await js.streams.create(config)
144+
try:
145+
consumer_name1 = f"consumer-{uuid.uuid4().hex[:8]}"
146+
consumer_name2 = f"consumer-{uuid.uuid4().hex[:8]}"
147+
await stream.consumers.create(PullConsumerConfig(name=consumer_name1))
148+
await stream.consumers.create(PullConsumerConfig(name=consumer_name2))
149+
150+
names_iter = await stream.consumers.list_names()
151+
found_names: list[str] = []
152+
async for cname in names_iter:
153+
assert isinstance(cname, str)
154+
found_names.append(cname)
155+
assert sorted(found_names) == sorted([consumer_name1, consumer_name2])
156+
finally:
157+
await js.streams.delete(name)
158+
159+
160+
async def test_consumers_list_empty(js: JetStream) -> None:
161+
name = f"test-clempty-{uuid.uuid4().hex[:8]}"
162+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
163+
stream = await js.streams.create(config)
164+
try:
165+
consumers_iter = await stream.consumers.list()
166+
found = []
167+
async for consumer in consumers_iter:
168+
found.append(consumer)
169+
assert len(found) == 0
170+
finally:
171+
await js.streams.delete(name)
172+
173+
174+
async def test_consumers_list_names_empty(js: JetStream) -> None:
175+
name = f"test-clnempty-{uuid.uuid4().hex[:8]}"
176+
config = StreamConfig(name=name, subjects=[f"{name}.>"])
177+
stream = await js.streams.create(config)
178+
try:
179+
names_iter = await stream.consumers.list_names()
180+
found_names: list[str] = []
181+
async for cname in names_iter:
182+
found_names.append(cname)
183+
assert len(found_names) == 0
184+
finally:
185+
await js.streams.delete(name)

0 commit comments

Comments
 (0)