Skip to content

Commit 6f30ab7

Browse files
committed
Added missing API methods.
1 parent d4bc5a4 commit 6f30ab7

File tree

7 files changed

+358
-8
lines changed

7 files changed

+358
-8
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ name: "Testing package"
22

33
on:
44
push:
5+
pull_request:
56

67
jobs:
78
py-lint:

python/natsrpy/_natsrpy_rs/js/managers.pyi

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from datetime import timedelta
22
from typing import final, overload
33

4+
from typing_extensions import Self
5+
46
from .consumers import (
57
PullConsumer,
68
PullConsumerConfig,
@@ -13,13 +15,63 @@ from .object_store import ObjectStore, ObjectStoreConfig
1315
from .stream import Stream, StreamConfig
1416

1517
__all__ = [
18+
"ConsumersIterator",
1619
"ConsumersManager",
20+
"ConsumersNamesIterator",
1721
"CountersManager",
1822
"KVManager",
1923
"ObjectStoreManager",
2024
"StreamsManager",
2125
]
2226

27+
@final
28+
class ConsumersIterator:
29+
"""Async iterator over consumers subscribed to a stream.
30+
31+
Returned by :meth:`ConsumersManager.list`.
32+
Consumers can be received using ``async for`` or by calling :meth:`next`
33+
directly.
34+
35+
Consumer type is identified by its config. If it has deliver_subject set,
36+
then PushConsumer is returned.
37+
"""
38+
39+
def __aiter__(self) -> Self: ...
40+
async def __anext__(self) -> PullConsumer | PushConsumer: ...
41+
async def next(
42+
self,
43+
timeout: float | timedelta | None = None,
44+
) -> PullConsumer | PushConsumer:
45+
"""Receive the next consumer from the stream.
46+
47+
:param timeout: maximum time to wait for a message in seconds
48+
or as a timedelta, defaults to None (wait indefinitely).
49+
:return: the next consumer.
50+
:raises StopAsyncIteration: when the subscription is drained or
51+
unsubscribed.
52+
"""
53+
54+
@final
55+
class ConsumersNamesIterator:
56+
"""Async iterator over names of consumers subscribed to a stream.
57+
58+
Returned by :meth:`ConsumersManager.list_names`.
59+
Consumer names can be received using ``async for`` or by calling :meth:`next`
60+
directly.
61+
"""
62+
63+
def __aiter__(self) -> Self: ...
64+
async def __anext__(self) -> str: ...
65+
async def next(self, timeout: float | timedelta | None = None) -> str:
66+
"""Receive the next consumer name from the stream.
67+
68+
:param timeout: maximum time to wait for a message in seconds
69+
or as a timedelta, defaults to None (wait indefinitely).
70+
:return: the next consumer name.
71+
:raises StopAsyncIteration: when the subscription is drained or
72+
unsubscribed.
73+
"""
74+
2375
@final
2476
class StreamsManager:
2577
"""Manager for JetStream stream CRUD operations."""
@@ -185,6 +237,27 @@ class ConsumersManager:
185237
:return: True if the consumer was resumed.
186238
"""
187239

240+
async def list(self) -> ConsumersIterator:
241+
"""List consumers subscribed to the stream.
242+
243+
This method iterates over all consumers on a
244+
stream and retunrns correct types, by looking
245+
at their config.
246+
247+
If you only need names, use :meth:`ConsumersManager.list_names` instead.
248+
249+
:return: an async iterator over consumers.
250+
"""
251+
252+
async def list_names(self) -> ConsumersNamesIterator:
253+
"""List names of consumers subscribed to the stream.
254+
255+
This method iterates over all consumer names on a
256+
stream.
257+
258+
:return: an async iterator over consumer names.
259+
"""
260+
188261
@final
189262
class ObjectStoreManager:
190263
"""Manager for object store bucket operations."""

python/natsrpy/_natsrpy_rs/js/stream.pyi

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,10 @@ class Stream:
467467
accessing messages in the stream, as well as managing consumers.
468468
"""
469469

470+
@property
471+
def consumers(self) -> ConsumersManager:
472+
"""Manager for consumers bound to this stream."""
473+
470474
async def direct_get(
471475
self,
472476
sequence: int,
@@ -479,6 +483,45 @@ class Stream:
479483
:return: the stream message.
480484
"""
481485

486+
async def direct_get_next_for_subject(
487+
self,
488+
subject: str,
489+
sequence: int | None = None,
490+
timeout: float | timedelta | None = None,
491+
) -> StreamMessage:
492+
"""Get the next message for a subject directly from the stream.
493+
494+
:param subject: subject to get the next message for.
495+
:param sequence: optional sequence number to start searching from.
496+
If not provided, starts from the beginning of the stream.
497+
:param timeout: operation timeout.
498+
:return: the next stream message matching the subject filter.
499+
"""
500+
501+
async def direct_get_first_for_subject(
502+
self,
503+
subject: str,
504+
timeout: float | timedelta | None = None,
505+
) -> StreamMessage:
506+
"""Get the first message for a subject directly from the stream.
507+
508+
:param subject: subject to get the first message for.
509+
:param timeout: operation timeout.
510+
:return: the first stream message matching the subject filter.
511+
"""
512+
513+
async def direct_get_last_for_subject(
514+
self,
515+
subject: str,
516+
timeout: float | timedelta | None = None,
517+
) -> StreamMessage:
518+
"""Get the last message for a subject directly from the stream.
519+
520+
:param subject: subject to get the last message for.
521+
:param timeout: operation timeout.
522+
:return: the last stream message matching the subject filter.
523+
"""
524+
482525
async def get_info(self, timeout: float | datetime | None = None) -> StreamInfo:
483526
"""Get information about the stream.
484527
@@ -505,6 +548,13 @@ class Stream:
505548
:return: number of messages purged.
506549
"""
507550

508-
@property
509-
def consumers(self) -> ConsumersManager:
510-
"""Manager for consumers bound to this stream."""
551+
async def delete_message(
552+
self,
553+
sequence: int,
554+
timeout: float | datetime | None = None,
555+
) -> None:
556+
"""Delete a message from the stream by sequence number.
557+
558+
:param sequence: sequence number of the message to delete.
559+
:param timeout: operation timeout.
560+
"""

src/exceptions/rust_err.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,12 @@ pub enum NatsrpyError {
8585
#[error(transparent)]
8686
StreamLastRawMessageError(#[from] async_nats::jetstream::stream::LastRawMessageError),
8787
#[error(transparent)]
88-
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
88+
StreamDeleteMessageError(#[from] async_nats::jetstream::stream::DeleteMessageError),
8989
#[error(transparent)]
9090
ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
9191
#[error(transparent)]
92+
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
93+
#[error(transparent)]
9294
PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError),
9395
#[error(transparent)]
9496
PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError),

src/js/managers/consumers.rs

Lines changed: 152 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,144 @@
11
use std::{sync::Arc, time::Duration};
22

3-
use pyo3::{Bound, FromPyObject, IntoPyObjectExt, PyAny, Python};
4-
use tokio::sync::RwLock;
3+
use futures_util::StreamExt;
4+
use pyo3::{Bound, FromPyObject, IntoPyObjectExt, PyAny, PyRef, Python};
5+
use tokio::sync::{Mutex, RwLock};
56

67
use crate::{
78
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
89
js::consumers::{self, pull::PullConsumer, push::PushConsumer},
9-
utils::{natsrpy_future, py_types::TimeValue},
10+
utils::{
11+
futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue,
12+
streamer::Streamer,
13+
},
1014
};
1115

16+
#[pyo3::pyclass]
17+
pub struct ConsumersIterator {
18+
streamer: Arc<
19+
Mutex<
20+
Streamer<
21+
Result<
22+
async_nats::jetstream::consumer::Info,
23+
async_nats::jetstream::stream::ConsumersError,
24+
>,
25+
>,
26+
>,
27+
>,
28+
stream: Arc<RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>>,
29+
}
30+
31+
#[pyo3::pyclass]
32+
pub struct ConsumersNamesIterator {
33+
streamer: Arc<Mutex<Streamer<Result<String, async_nats::jetstream::stream::ConsumersError>>>>,
34+
}
35+
36+
impl ConsumersNamesIterator {
37+
#[must_use]
38+
pub fn new(
39+
streamer: Streamer<Result<String, async_nats::jetstream::stream::ConsumersError>>,
40+
) -> Self {
41+
Self {
42+
streamer: Arc::new(Mutex::new(streamer)),
43+
}
44+
}
45+
}
46+
47+
#[pyo3::pymethods]
48+
impl ConsumersNamesIterator {
49+
#[must_use]
50+
pub const fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
51+
slf
52+
}
53+
54+
#[pyo3(signature=(timeout=None))]
55+
pub fn next<'py>(
56+
&self,
57+
py: Python<'py>,
58+
timeout: Option<TimeValue>,
59+
) -> NatsrpyResult<Bound<'py, PyAny>> {
60+
let ctx = self.streamer.clone();
61+
natsrpy_future_with_timeout(py, timeout, async move {
62+
let value = ctx.lock().await.next().await;
63+
match value {
64+
Some(name) => Ok(name?),
65+
None => Err(NatsrpyError::AsyncStopIteration),
66+
}
67+
})
68+
}
69+
70+
pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
71+
self.next(py, None)
72+
}
73+
}
74+
75+
impl ConsumersIterator {
76+
pub fn new(
77+
stream: Arc<
78+
RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>,
79+
>,
80+
streamer: Streamer<
81+
Result<
82+
async_nats::jetstream::consumer::Info,
83+
async_nats::jetstream::stream::ConsumersError,
84+
>,
85+
>,
86+
) -> Self {
87+
Self {
88+
stream,
89+
streamer: Arc::new(Mutex::new(streamer)),
90+
}
91+
}
92+
}
93+
94+
#[pyo3::pymethods]
95+
impl ConsumersIterator {
96+
#[must_use]
97+
pub const fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
98+
slf
99+
}
100+
101+
#[pyo3(signature=(timeout=None))]
102+
pub fn next<'py>(
103+
&self,
104+
py: Python<'py>,
105+
timeout: Option<TimeValue>,
106+
) -> NatsrpyResult<Bound<'py, PyAny>> {
107+
let ctx = self.streamer.clone();
108+
let stream = self.stream.clone();
109+
natsrpy_future_with_timeout(py, timeout, async move {
110+
let value = ctx.lock().await.next().await;
111+
match value {
112+
Some(info) => {
113+
let info = info?;
114+
let Some(consumer_name) = info.config.name else {
115+
return Err(NatsrpyError::SessionError(String::from(
116+
"Received consumer without a name.",
117+
)));
118+
};
119+
// That means that the consumer is PushBased.
120+
if info.config.deliver_subject.is_some() {
121+
let consumer = consumers::push::consumer::PushConsumer::new(
122+
stream.read().await.get_consumer(&consumer_name).await?,
123+
);
124+
Ok(Python::attach(|py| consumer.into_py_any(py))?)
125+
} else {
126+
let consumer = consumers::pull::consumer::PullConsumer::new(
127+
stream.read().await.get_consumer(&consumer_name).await?,
128+
);
129+
Ok(Python::attach(|py| consumer.into_py_any(py))?)
130+
}
131+
}
132+
None => Err(NatsrpyError::AsyncStopIteration),
133+
}
134+
})
135+
}
136+
137+
pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
138+
self.next(py, None)
139+
}
140+
}
141+
12142
#[pyo3::pyclass]
13143
pub struct ConsumersManager {
14144
stream: Arc<RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>>,
@@ -146,4 +276,23 @@ impl ConsumersManager {
146276
Ok(ctx.read().await.delete_consumer(&name).await?.success)
147277
})
148278
}
279+
280+
pub fn list<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
281+
let ctx = self.stream.clone();
282+
natsrpy_future(py, async move {
283+
let consumers = ctx.read().await.consumers();
284+
Ok(ConsumersIterator::new(
285+
ctx.clone(),
286+
Streamer::new(consumers),
287+
))
288+
})
289+
}
290+
291+
pub fn list_names<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
292+
let ctx = self.stream.clone();
293+
natsrpy_future(py, async move {
294+
let consumers = ctx.read().await.consumer_names();
295+
Ok(ConsumersNamesIterator::new(Streamer::new(consumers)))
296+
})
297+
}
149298
}

src/js/managers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub mod streams;
77
#[pyo3::pymodule(submodule, name = "managers")]
88
pub mod pymod {
99
#[pymodule_export]
10-
use super::consumers::ConsumersManager;
10+
use super::consumers::{ConsumersIterator, ConsumersManager, ConsumersNamesIterator};
1111
#[pymodule_export]
1212
use super::counters::CountersManager;
1313
#[pymodule_export]

0 commit comments

Comments
 (0)