Skip to content

Commit d22a762

Browse files
authored
Added examples + some small fixes. (#36)
1 parent eb13b3d commit d22a762

File tree

17 files changed

+467
-13
lines changed

17 files changed

+467
-13
lines changed

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,11 @@ serde_json = "1.0.149"
2626
thiserror = "2.0.18"
2727
time = "0.3.47"
2828
tokio = { version = "1.50.0", features = ["full"] }
29+
30+
[profile.release]
31+
lto = "fat"
32+
codegen-units = 1
33+
opt-level = 3
34+
strip = true
35+
debug = false
36+
panic = "abort"

examples/consumers.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import asyncio
2+
3+
from natsrpy import Nats
4+
from natsrpy.js import PullConsumerConfig, PushConsumerConfig, StreamConfig
5+
6+
7+
async def main() -> None:
8+
"""Main function to run the example."""
9+
nats = Nats(["nats://localhost:4222"])
10+
await nats.startup()
11+
12+
js = await nats.jetstream()
13+
14+
stream = await js.streams.create_or_update(
15+
StreamConfig(
16+
name="stream-example",
17+
subjects=["stream.example.>"],
18+
description="Stream example",
19+
),
20+
)
21+
22+
# Push and pull consumers have different configurations.
23+
# If you supply PushConsumerConfig, you will get a push consumer,
24+
# and otherwise you will get a PullConsumer.
25+
#
26+
# They have different APIs.
27+
pull_consumer = await stream.consumers.create(
28+
PullConsumerConfig(
29+
name="example-pull",
30+
durable_name="example-pull",
31+
),
32+
)
33+
push_consumer = await stream.consumers.create(
34+
PushConsumerConfig(
35+
name="example-push",
36+
deliver_subject="example-push",
37+
durable_name="example-push",
38+
),
39+
)
40+
41+
# We publish a single message
42+
await js.publish("stream.example.test", "message for stream")
43+
44+
# We use messages() to get async iterator which we
45+
# use to get messages for push_consumer.
46+
async for push_message in await push_consumer.messages():
47+
print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201
48+
await push_message.ack()
49+
break
50+
51+
# Pull consumers have to request batches of messages.
52+
for pull_message in await pull_consumer.fetch(max_messages=10):
53+
print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201
54+
await pull_message.ack()
55+
56+
# Cleanup
57+
await stream.consumers.delete(push_consumer.name)
58+
await stream.consumers.delete(pull_consumer.name)
59+
await js.streams.delete(stream.name)
60+
61+
# Don't forget to call shutdown.
62+
await nats.shutdown()
63+
64+
65+
if __name__ == "__main__":
66+
asyncio.run(main())

examples/counters.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
3+
from natsrpy import Nats
4+
from natsrpy.js import CountersConfig
5+
6+
7+
async def main() -> None:
8+
"""Main function to run the example."""
9+
nats = Nats(["nats://localhost:4222"])
10+
await nats.startup()
11+
12+
js = await nats.jetstream()
13+
# Counters store is basically a stream,
14+
# but each subject is considered as a counter.
15+
# You can read more about how it works here:
16+
# https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-49.md
17+
counters = await js.counters.create_or_update(
18+
CountersConfig(
19+
name="counters",
20+
subjects=["counters.>"],
21+
),
22+
)
23+
24+
# We have a nice interface for counters.
25+
# Please note, that ADD accepts
26+
# positive and NEGATIVE values.
27+
# I'd rename this method, but this API is
28+
# defined in ADR-49, so we won't change this.
29+
#
30+
# Each add\incr\decr returns current value of the counter.
31+
await counters.add("counters.one", +8)
32+
await counters.add("counters.one", -2)
33+
# Increase by 1
34+
await counters.incr("counters.one")
35+
# Decrease by 1
36+
await counters.decr("counters.one")
37+
38+
print(await counters.get("counters.one")) # noqa: T201
39+
40+
# Don't forget to call shutdown.
41+
await nats.shutdown()
42+
43+
44+
if __name__ == "__main__":
45+
asyncio.run(main())

examples/kv.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
3+
from natsrpy import Nats
4+
from natsrpy.js import KVConfig
5+
6+
7+
async def main() -> None:
8+
"""Main function to run the example."""
9+
nats = Nats(["nats://localhost:4222"])
10+
await nats.startup()
11+
12+
js = await nats.jetstream()
13+
14+
kv = await js.kv.create_or_update(KVConfig(bucket="kv-example"))
15+
16+
watcher = await kv.watch("test-key")
17+
18+
await kv.put("test-key", "one")
19+
await kv.put("test-key", b"two")
20+
21+
# To obtain bytes value.
22+
value = await kv.get("test-key")
23+
if value:
24+
print("[VALUE]", value.decode()) # noqa: T201
25+
# To get kv-entry with all
26+
# the metadata.
27+
entry = await kv.entry("test-key")
28+
if entry:
29+
print("[ENTRY]", entry) # noqa: T201
30+
31+
await kv.delete("test-key")
32+
33+
# Alternatively you can
34+
# use await watcher.next()
35+
async for event in watcher:
36+
print("[EVENT]", event) # noqa: T201
37+
break
38+
39+
await js.kv.delete(kv.name)
40+
41+
# Don't forget to call shutdown.
42+
await nats.shutdown()
43+
44+
45+
if __name__ == "__main__":
46+
asyncio.run(main())

examples/object_store.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
import io
3+
from datetime import timedelta
4+
from pathlib import Path
5+
6+
from natsrpy import Nats
7+
from natsrpy.js import ObjectStoreConfig
8+
9+
10+
async def main() -> None:
11+
"""Main function to run the example."""
12+
nats = Nats(["nats://localhost:4222"])
13+
await nats.startup()
14+
15+
js = await nats.jetstream()
16+
17+
store = await js.object_store.create(
18+
ObjectStoreConfig(
19+
bucket="example-bucket",
20+
max_age=timedelta(minutes=1),
21+
),
22+
)
23+
await store.put("test2.py", Path(__file__).read_bytes())
24+
await store.put("test.py", str(Path(__file__)))
25+
26+
async for obj in await store.list():
27+
print(obj) # noqa: T201
28+
# We use BytesIO, since
29+
# get takes writer as it's argument.
30+
#
31+
# That happens because files can be very large,
32+
# and this approach allows us to stream directly
33+
# to files. using `open('file', 'wb') as output:`
34+
with io.BytesIO() as output:
35+
await store.get(obj.name, output)
36+
assert obj.size == len(output.getvalue()) # noqa: S101
37+
38+
await store.delete(obj.name)
39+
40+
# Don't forget to call shutdown.
41+
await nats.shutdown()
42+
43+
44+
if __name__ == "__main__":
45+
asyncio.run(main())

examples/request_reply.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import asyncio
2+
3+
from natsrpy import Message, Nats
4+
5+
6+
async def main() -> None:
7+
"""Main function to run the example."""
8+
nats = Nats(["nats://localhost:4222"])
9+
await nats.startup()
10+
subj = "request-reply"
11+
12+
# Here we create responder, that will be
13+
# answering to our requests.
14+
async def responder(message: Message) -> None:
15+
print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201
16+
if message.reply:
17+
await nats.publish(
18+
message.reply,
19+
f"reply to {message.payload}",
20+
headers=message.headers,
21+
)
22+
23+
# Start responder using callback-based subsciption.
24+
sub = await nats.subscribe(subj, callback=responder)
25+
# Send 3 concurrent requests.
26+
responses = await asyncio.gather(
27+
nats.request(subj, "request1"),
28+
nats.request(subj, "request2", headers={"header": "value"}),
29+
nats.request(subj, "request3", inbox="test-inbox"),
30+
)
31+
# Disconnect resonder.
32+
await sub.drain()
33+
34+
# Iterate over replies.
35+
for resp in responses:
36+
print(f"[RESPONSE]: {resp}") # noqa: T201
37+
38+
await nats.shutdown()
39+
40+
41+
if __name__ == "__main__":
42+
asyncio.run(main())

examples/simple_publish.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
3+
from natsrpy import Nats
4+
5+
6+
async def main() -> None:
7+
"""Main function to run the example."""
8+
nats = Nats(["nats://localhost:4222"])
9+
await nats.startup()
10+
11+
# Here we initiate subscription.
12+
# We do it before sending messages,
13+
# in order to catch them once we will start reading.
14+
subscription = await nats.subscribe("hello")
15+
16+
# Publish accepts str | bytes | bytearray | memoryview
17+
await nats.publish("hello", "str world")
18+
await nats.publish("hello", b"bytes world")
19+
await nats.publish("hello", bytearray(b"bytearray world"))
20+
await nats.publish("hello", "headers", headers={"one": "two"})
21+
await nats.publish("hello", "headers", headers={"one": ["two", "three"]})
22+
23+
# Calling this method will unsubscribe us,
24+
# after `n` delivered messages.
25+
# or immediately if `n` is not provided.
26+
subscription.unsubscribe(limit=5)
27+
async for message in subscription:
28+
print(message) # noqa: T201
29+
30+
# Don't forget to call shutdown.
31+
await nats.shutdown()
32+
33+
34+
if __name__ == "__main__":
35+
asyncio.run(main())

examples/subscriptions.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import asyncio
2+
3+
from natsrpy import Message, Nats
4+
5+
6+
async def main() -> None:
7+
"""Main function to run the example."""
8+
nats = Nats(["nats://localhost:4222"])
9+
await nats.startup()
10+
11+
cb_lock = asyncio.Event()
12+
13+
async def callback(message: Message) -> None:
14+
print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201
15+
cb_lock.set()
16+
17+
# When subscribing you can set callback.
18+
# In that case CallbackSubscription is returned.
19+
# This type of subscription cannot be iterated.
20+
cb_sub = await nats.subscribe("cb-subj", callback=callback)
21+
22+
# When callback is not set, you get a subscription
23+
# that should be used along with `async for`
24+
# loop, or alternatively you can call
25+
# `await iter_sub.next()` to get a single message.
26+
iter_sub = await nats.subscribe("iter-subj")
27+
28+
# Subscriptions with queue argument create
29+
# subscription with a queue group to distribute
30+
# messages along all subscribers.
31+
queue_sub = await nats.subscribe("queue-subj", queue="example-queue")
32+
33+
await nats.publish("cb-subj", "message for callback")
34+
await nats.publish("iter-subj", "message for iterator")
35+
await nats.publish("queue-subj", "message for queue sub")
36+
37+
# We can unsubscribe after a particular amount of messages.
38+
await iter_sub.unsubscribe(limit=1)
39+
await cb_sub.unsubscribe(limit=1)
40+
await queue_sub.unsubscribe(limit=1)
41+
42+
async for message in iter_sub:
43+
print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201
44+
45+
async for message in queue_sub:
46+
print(f"[FROM_QUEUED] {message.payload}") # noqa: T201
47+
48+
# Making sure that the message in callback is received.
49+
await cb_lock.wait()
50+
51+
# Don't forget to call shutdown.
52+
await nats.shutdown()
53+
54+
55+
if __name__ == "__main__":
56+
asyncio.run(main())

python/natsrpy/_natsrpy_rs/__init__.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,14 @@ class Nats:
206206
self,
207207
subject: str,
208208
callback: Callable[[Message], Awaitable[None]],
209+
queue: str | None = None,
209210
) -> Future[CallbackSubscription]: ...
210211
@overload
211212
def subscribe(
212213
self,
213214
subject: str,
214215
callback: None = None,
216+
queue: str | None = None,
215217
) -> Future[IteratorSubscription]: ...
216218
def jetstream(
217219
self,

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,14 @@ class PushConsumer:
300300
Messages are delivered by the server to a specified subject.
301301
"""
302302

303+
@property
304+
def name(self) -> str:
305+
"""Get consumer name."""
306+
307+
@property
308+
def stream_name(self) -> str:
309+
"""Get stream name that this consumer attached to."""
310+
303311
def messages(self) -> Future[MessagesIterator]:
304312
"""Get an async iterator for consuming messages.
305313
@@ -313,6 +321,14 @@ class PullConsumer:
313321
Messages are fetched on demand in batches by the client.
314322
"""
315323

324+
@property
325+
def name(self) -> str:
326+
"""Get consumer name."""
327+
328+
@property
329+
def stream_name(self) -> str:
330+
"""Get stream name that this consumer attached to."""
331+
316332
def fetch(
317333
self,
318334
max_messages: int | None = None,

0 commit comments

Comments
 (0)