Skip to content

Commit 5128362

Browse files
Copilots3rius
andauthored
Add tests to improve Rust code path coverage (#41)
Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
1 parent 6e82b6f commit 5128362

9 files changed

+619
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,4 @@ docs/_build/
7272
.python-version
7373
.venv/
7474
target/
75+
*.profraw
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import asyncio
2+
import uuid
3+
4+
from natsrpy import CallbackSubscription, Nats
5+
6+
7+
async def test_callback_unsubscribe(nats: Nats) -> None:
8+
subj = uuid.uuid4().hex
9+
10+
async def callback(msg: object) -> None:
11+
pass
12+
13+
sub = await nats.subscribe(subject=subj, callback=callback)
14+
assert isinstance(sub, CallbackSubscription)
15+
await sub.unsubscribe()
16+
17+
18+
async def test_callback_unsubscribe_with_limit(nats: Nats) -> None:
19+
subj = uuid.uuid4().hex
20+
received: list[bytes] = []
21+
event = asyncio.Event()
22+
23+
async def callback(msg: object) -> None:
24+
received.append(msg.payload) # type: ignore[attr-defined]
25+
if len(received) >= 2:
26+
event.set()
27+
28+
sub = await nats.subscribe(subject=subj, callback=callback)
29+
assert isinstance(sub, CallbackSubscription)
30+
await sub.unsubscribe(limit=2)
31+
await nats.publish(subj, b"msg-1")
32+
await nats.publish(subj, b"msg-2")
33+
await asyncio.wait_for(event.wait(), timeout=5.0)
34+
assert received == [b"msg-1", b"msg-2"]
35+
36+
37+
async def test_callback_drain(nats_url: str) -> None:
38+
client = Nats(addrs=[nats_url])
39+
await client.startup()
40+
subj = uuid.uuid4().hex
41+
42+
async def callback(msg: object) -> None:
43+
pass
44+
45+
sub = await client.subscribe(subject=subj, callback=callback)
46+
assert isinstance(sub, CallbackSubscription)
47+
await sub.drain()
48+
await client.shutdown()
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import uuid
2+
3+
from natsrpy.js import (
4+
JetStream,
5+
PullConsumer,
6+
PullConsumerConfig,
7+
PushConsumer,
8+
PushConsumerConfig,
9+
StreamConfig,
10+
)
11+
12+
13+
async def test_pull_consumer_name_and_stream(js: JetStream) -> None:
14+
stream_name = f"test-pcns-{uuid.uuid4().hex[:8]}"
15+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
16+
stream = await js.streams.create(config)
17+
try:
18+
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
19+
consumer = await stream.consumers.create(
20+
PullConsumerConfig(name=consumer_name),
21+
)
22+
assert isinstance(consumer, PullConsumer)
23+
assert consumer.name == consumer_name
24+
assert consumer.stream_name == stream_name
25+
finally:
26+
await js.streams.delete(stream_name)
27+
28+
29+
async def test_push_consumer_name_and_stream(js: JetStream) -> None:
30+
stream_name = f"test-pushns-{uuid.uuid4().hex[:8]}"
31+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
32+
stream = await js.streams.create(config)
33+
try:
34+
deliver_subj = uuid.uuid4().hex
35+
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
36+
consumer = await stream.consumers.create(
37+
PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name),
38+
)
39+
assert isinstance(consumer, PushConsumer)
40+
assert consumer.name == consumer_name
41+
assert consumer.stream_name == stream_name
42+
finally:
43+
await js.streams.delete(stream_name)
44+
45+
46+
async def test_pull_consumer_messages_iterator(js: JetStream) -> None:
47+
stream_name = f"test-pullmsgiter-{uuid.uuid4().hex[:8]}"
48+
subj = f"{stream_name}.data"
49+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
50+
stream = await js.streams.create(config)
51+
try:
52+
await js.publish(subj, b"iter-msg-1", wait=True)
53+
await js.publish(subj, b"iter-msg-2", wait=True)
54+
consumer = await stream.consumers.create(
55+
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
56+
)
57+
msgs = await consumer.fetch(max_messages=2, timeout=5.0)
58+
assert len(msgs) == 2
59+
assert msgs[0].payload == b"iter-msg-1"
60+
assert msgs[1].payload == b"iter-msg-2"
61+
finally:
62+
await js.streams.delete(stream_name)
63+
64+
65+
async def test_pull_consumer_fetch_empty(js: JetStream) -> None:
66+
stream_name = f"test-fetchempty-{uuid.uuid4().hex[:8]}"
67+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
68+
stream = await js.streams.create(config)
69+
try:
70+
consumer = await stream.consumers.create(
71+
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
72+
)
73+
msgs = await consumer.fetch(max_messages=1, timeout=0.5)
74+
assert len(msgs) == 0
75+
finally:
76+
await js.streams.delete(stream_name)
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import uuid
2+
from datetime import timedelta
3+
4+
from natsrpy.js import (
5+
JetStream,
6+
PullConsumer,
7+
PullConsumerConfig,
8+
PushConsumer,
9+
PushConsumerConfig,
10+
StreamConfig,
11+
)
12+
13+
14+
async def test_consumers_manager_delete(js: JetStream) -> None:
15+
stream_name = f"test-cmdel-{uuid.uuid4().hex[:8]}"
16+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
17+
stream = await js.streams.create(config)
18+
try:
19+
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
20+
await stream.consumers.create(PullConsumerConfig(name=consumer_name))
21+
result = await stream.consumers.delete(consumer_name)
22+
assert result is True
23+
finally:
24+
await js.streams.delete(stream_name)
25+
26+
27+
async def test_consumers_manager_get_push(js: JetStream) -> None:
28+
stream_name = f"test-cmgp-{uuid.uuid4().hex[:8]}"
29+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
30+
stream = await js.streams.create(config)
31+
try:
32+
deliver_subj = uuid.uuid4().hex
33+
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
34+
await stream.consumers.create(
35+
PushConsumerConfig(deliver_subject=deliver_subj, name=consumer_name),
36+
)
37+
consumer = await stream.consumers.get_push(consumer_name)
38+
assert isinstance(consumer, PushConsumer)
39+
assert consumer.name == consumer_name
40+
finally:
41+
await js.streams.delete(stream_name)
42+
43+
44+
async def test_consumers_manager_update_pull(js: JetStream) -> None:
45+
stream_name = f"test-cmup-{uuid.uuid4().hex[:8]}"
46+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
47+
stream = await js.streams.create(config)
48+
try:
49+
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
50+
cfg = PullConsumerConfig(name=consumer_name, description="original")
51+
await stream.consumers.create(cfg)
52+
cfg.description = "updated"
53+
updated = await stream.consumers.update(cfg)
54+
assert isinstance(updated, PullConsumer)
55+
finally:
56+
await js.streams.delete(stream_name)
57+
58+
59+
async def test_consumers_manager_update_push(js: JetStream) -> None:
60+
stream_name = f"test-cmupp-{uuid.uuid4().hex[:8]}"
61+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
62+
stream = await js.streams.create(config)
63+
try:
64+
deliver_subj = uuid.uuid4().hex
65+
consumer_name = f"push-{uuid.uuid4().hex[:8]}"
66+
cfg = PushConsumerConfig(
67+
deliver_subject=deliver_subj,
68+
name=consumer_name,
69+
description="original",
70+
)
71+
await stream.consumers.create(cfg)
72+
cfg.description = "updated"
73+
updated = await stream.consumers.update(cfg)
74+
assert isinstance(updated, PushConsumer)
75+
finally:
76+
await js.streams.delete(stream_name)
77+
78+
79+
async def test_consumers_manager_pause_and_resume(js: JetStream) -> None:
80+
stream_name = f"test-cmpr-{uuid.uuid4().hex[:8]}"
81+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
82+
stream = await js.streams.create(config)
83+
try:
84+
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
85+
await stream.consumers.create(PullConsumerConfig(name=consumer_name))
86+
paused = await stream.consumers.pause(consumer_name, delay=60.0)
87+
assert isinstance(paused, bool)
88+
resumed = await stream.consumers.resume(consumer_name)
89+
assert isinstance(resumed, bool)
90+
finally:
91+
await js.streams.delete(stream_name)
92+
93+
94+
async def test_consumers_manager_pause_timedelta(js: JetStream) -> None:
95+
stream_name = f"test-cmpd-{uuid.uuid4().hex[:8]}"
96+
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
97+
stream = await js.streams.create(config)
98+
try:
99+
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
100+
await stream.consumers.create(PullConsumerConfig(name=consumer_name))
101+
paused = await stream.consumers.pause(
102+
consumer_name,
103+
delay=timedelta(seconds=60),
104+
)
105+
assert isinstance(paused, bool)
106+
finally:
107+
await js.streams.delete(stream_name)

0 commit comments

Comments
 (0)