Skip to content

Commit 4d76d59

Browse files
authored
Implemented push and pull consumers (#11)
1 parent d1993f8 commit 4d76d59

File tree

10 files changed

+252
-59
lines changed

10 files changed

+252
-59
lines changed

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import timedelta
12
from typing import Any
23

34
from .managers import KVManager, StreamsManager
@@ -26,4 +27,57 @@ class JetStreamMessage:
2627
def payload(self) -> bytes: ...
2728
@property
2829
def headers(self) -> dict[str, Any]: ...
29-
async def ack(self) -> None: ...
30+
async def ack(self, double: bool = False) -> None:
31+
"""
32+
Acknowledge that a message was handled.
33+
34+
:param double: whether to wait for server response, defaults to False
35+
"""
36+
37+
async def nack(
38+
self,
39+
delay: float | timedelta | None = None,
40+
double: bool = False,
41+
) -> None:
42+
"""
43+
Negative acknowledgement.
44+
45+
Signals that the message will not be processed now
46+
and processing can move onto the next message, NAK'd
47+
message will be retried.
48+
49+
:param duration: time, defaults to None
50+
:param double: whether to wait for server response, defaults to False
51+
"""
52+
53+
async def progress(self, double: bool = False) -> None:
54+
"""
55+
Progress acknowledgement.
56+
57+
Singnals that the mesasge is being handled right now.
58+
Sending this request before the AckWait will extend wait period
59+
before redelivering a message.
60+
61+
:param double: whether to wait for server response, defaults to False
62+
"""
63+
64+
async def next(self, double: bool = False) -> None:
65+
"""
66+
Next acknowledgement.
67+
68+
Only applies to pull consumers!
69+
Acknowledges message processing and instructs server to send
70+
delivery of the next message to the reply subject.
71+
72+
:param double: whether to wait for server response, defaults to False
73+
"""
74+
75+
async def term(self, double: bool = False) -> None:
76+
"""
77+
Term acknowledgement.
78+
79+
Instructs server to stop redelivering message.
80+
Useful to stop redelivering a message after multiple NACKs.
81+
82+
:param double: whether to wait for server response, defaults to False
83+
"""

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,16 @@ class PushConsumerConfig:
147147
pause_until: int | None = None,
148148
) -> None: ...
149149

150-
class PushConsumer: ...
150+
class MessagesIterator:
151+
def __aiter__(self) -> MessagesIterator: ...
152+
async def __anext__(self) -> JetStreamMessage: ...
153+
async def next(
154+
self,
155+
timeout: float | timedelta | None = None,
156+
) -> JetStreamMessage: ...
157+
158+
class PushConsumer:
159+
async def messages(self) -> MessagesIterator: ...
151160

152161
class PullConsumer:
153162
async def fetch(

src/exceptions/rust_err.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use pyo3::exceptions::{PyTimeoutError, PyTypeError};
1+
use pyo3::exceptions::{PyStopAsyncIteration, PyTimeoutError, PyTypeError};
22

33
use crate::exceptions::py_err::{NatsrpyPublishError, NatsrpySessionError};
44

@@ -12,6 +12,8 @@ pub enum NatsrpyError {
1212
InvalidArgument(String),
1313
#[error("Session is not initialized. Call startup() first.")]
1414
NotInitialized,
15+
#[error("The end of stream")]
16+
AsyncStopIteration,
1517
#[error("Connection is closed or lost.")]
1618
Disconnected,
1719
#[error(transparent)]
@@ -66,12 +68,17 @@ pub enum NatsrpyError {
6668
PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
6769
#[error(transparent)]
6870
PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError),
71+
#[error(transparent)]
72+
PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError),
73+
#[error(transparent)]
74+
ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError),
6975
}
7076

7177
impl From<NatsrpyError> for pyo3::PyErr {
7278
fn from(value: NatsrpyError) -> Self {
7379
match value {
7480
NatsrpyError::PublishError(_) => NatsrpyPublishError::new_err(value.to_string()),
81+
NatsrpyError::AsyncStopIteration => PyStopAsyncIteration::new_err("End of the stream."),
7582
NatsrpyError::Timeout(_) => PyTimeoutError::new_err(value.to_string()),
7683
NatsrpyError::PyError(py_err) => py_err,
7784
NatsrpyError::InvalidArgument(descr) => PyTypeError::new_err(descr),

src/js/consumers/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ pub mod pymod {
99
#[pymodule_export]
1010
pub use super::pull::{config::PullConsumerConfig, consumer::PullConsumer};
1111
#[pymodule_export]
12-
pub use super::push::{config::PushConsumerConfig, consumer::PushConsumer};
12+
pub use super::push::{
13+
config::PushConsumerConfig, consumer::MessagesIterator, consumer::PushConsumer,
14+
};
1315
}

src/js/consumers/pull/consumer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::sync::RwLock;
66

77
use crate::{
88
exceptions::rust_err::NatsrpyResult,
9-
utils::{futures::natsrpy_future_with_timeout, py_types::TimeoutValue},
9+
utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue},
1010
};
1111

1212
type NatsPullConsumer =
@@ -51,7 +51,7 @@ impl PullConsumer {
5151
expires: Option<Duration>,
5252
min_pending: Option<usize>,
5353
min_ack_pending: Option<usize>,
54-
timeout: Option<TimeoutValue>,
54+
timeout: Option<TimeValue>,
5555
) -> NatsrpyResult<Bound<'py, PyAny>> {
5656
let ctx = self.consumer.clone();
5757

@@ -89,7 +89,7 @@ impl PullConsumer {
8989
let mut ret_messages = Vec::new();
9090
while let Some(msg) = messages.next().await {
9191
let raw_msg = msg?;
92-
ret_messages.push(crate::js::message::JetStreamMessage::from(raw_msg));
92+
ret_messages.push(crate::js::message::JetStreamMessage::try_from(raw_msg)?);
9393
}
9494
Ok(ret_messages)
9595
})

src/js/consumers/push/consumer.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
use std::sync::Arc;
22

3+
use futures_util::StreamExt;
4+
use pyo3::{Bound, PyAny, PyRef, Python};
35
use tokio::sync::RwLock;
46

7+
use crate::{
8+
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
9+
js::pymod::JetStreamMessage,
10+
utils::{futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue},
11+
};
12+
513
type NatsPushConsumer =
614
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>;
715

816
#[pyo3::pyclass(from_py_object)]
917
#[derive(Debug, Clone)]
10-
#[allow(dead_code)] // TODO! remove later.
1118
pub struct PushConsumer {
1219
consumer: Arc<RwLock<NatsPushConsumer>>,
1320
}
@@ -21,5 +28,67 @@ impl PushConsumer {
2128
}
2229
}
2330

31+
#[pyo3::pyclass]
32+
pub struct MessagesIterator {
33+
messages: Option<Arc<RwLock<async_nats::jetstream::consumer::push::Messages>>>,
34+
}
35+
36+
impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator {
37+
fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self {
38+
Self {
39+
messages: Some(Arc::new(RwLock::new(value))),
40+
}
41+
}
42+
}
43+
44+
#[pyo3::pymethods]
45+
impl PushConsumer {
46+
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
47+
let consumer_guard = self.consumer.clone();
48+
natsrpy_future(py, async move {
49+
Ok(MessagesIterator::from(
50+
consumer_guard.read().await.messages().await?,
51+
))
52+
})
53+
}
54+
}
55+
2456
#[pyo3::pymethods]
25-
impl PushConsumer {}
57+
impl MessagesIterator {
58+
#[must_use]
59+
pub const fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
60+
slf
61+
}
62+
63+
pub fn next<'py>(
64+
&self,
65+
py: Python<'py>,
66+
timeout: Option<TimeValue>,
67+
) -> NatsrpyResult<Bound<'py, PyAny>> {
68+
let Some(messages_guard) = self.messages.clone() else {
69+
unreachable!("Message is always Some in runtime.")
70+
};
71+
#[allow(clippy::significant_drop_tightening)]
72+
natsrpy_future_with_timeout(py, timeout, async move {
73+
let mut messages = messages_guard.write().await;
74+
let Some(message) = messages.next().await else {
75+
return Err(NatsrpyError::AsyncStopIteration);
76+
};
77+
let message = message?;
78+
79+
JetStreamMessage::try_from(message)
80+
})
81+
}
82+
83+
pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
84+
self.next(py, None)
85+
}
86+
}
87+
88+
impl Drop for MessagesIterator {
89+
fn drop(&mut self) {
90+
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
91+
self.messages = None;
92+
});
93+
}
94+
}

src/js/message.rs

Lines changed: 85 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,114 @@
1-
use pyo3::{Bound, Py, PyAny, Python, types::PyDict};
1+
use pyo3::{
2+
Bound, Py, PyAny, Python,
3+
types::{PyBytes, PyDict},
4+
};
25
use std::sync::Arc;
36
use tokio::sync::RwLock;
47

58
use crate::{
6-
exceptions::rust_err::NatsrpyResult,
7-
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
9+
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
10+
utils::{natsrpy_future, py_types::TimeValue},
811
};
912

1013
#[pyo3::pyclass]
1114
pub struct JetStreamMessage {
12-
message: async_nats::Message,
13-
headers: Option<Py<PyDict>>,
15+
message: crate::message::Message,
1416
acker: Arc<RwLock<async_nats::jetstream::message::Acker>>,
1517
}
1618

17-
impl From<async_nats::jetstream::Message> for JetStreamMessage {
18-
fn from(value: async_nats::jetstream::Message) -> Self {
19+
impl TryFrom<async_nats::jetstream::Message> for JetStreamMessage {
20+
type Error = NatsrpyError;
21+
22+
fn try_from(value: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
1923
let (message, acker) = value.split();
20-
Self {
21-
message,
22-
headers: None,
24+
Ok(Self {
25+
message: message.try_into()?,
2326
acker: Arc::new(RwLock::new(acker)),
24-
}
27+
})
28+
}
29+
}
30+
31+
impl JetStreamMessage {
32+
pub fn inner_ack<'py>(
33+
&self,
34+
py: Python<'py>,
35+
kind: async_nats::jetstream::message::AckKind,
36+
double: bool,
37+
) -> NatsrpyResult<Bound<'py, PyAny>> {
38+
let acker_guard = self.acker.clone();
39+
natsrpy_future(py, async move {
40+
if double {
41+
acker_guard.read().await.double_ack_with(kind).await?;
42+
} else {
43+
acker_guard.read().await.ack_with(kind).await?;
44+
}
45+
Ok(())
46+
})
2547
}
2648
}
2749

2850
#[pyo3::pymethods]
2951
impl JetStreamMessage {
3052
#[getter]
31-
pub fn subject(&self) -> &str {
53+
#[must_use]
54+
pub const fn subject(&self) -> &str {
3255
self.message.subject.as_str()
3356
}
3457
#[getter]
35-
pub fn reply(&self) -> Option<&str> {
36-
self.message.reply.as_ref().map(async_nats::Subject::as_str)
58+
#[must_use]
59+
pub const fn reply(&self) -> &Option<String> {
60+
&self.message.reply
3761
}
3862
#[getter]
39-
pub fn payload(&self) -> &[u8] {
63+
#[must_use]
64+
pub const fn payload(&self) -> &Py<PyBytes> {
4065
&self.message.payload
4166
}
4267
#[getter]
43-
pub fn headers(&mut self, py: Python<'_>) -> NatsrpyResult<Py<PyDict>> {
44-
if let Some(headers) = &self.headers {
45-
Ok(headers.clone_ref(py))
46-
} else {
47-
let headermap = self.message.headers.clone().unwrap_or_default();
48-
let headers = headermap.to_pydict(py)?.unbind();
49-
self.headers = Some(headers.clone_ref(py));
50-
Ok(headers)
51-
}
52-
}
53-
54-
pub fn ack<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
55-
let acker_guard = self.acker.clone();
56-
natsrpy_future(py, async move {
57-
acker_guard.read().await.ack().await?;
58-
Ok(())
59-
})
68+
pub const fn headers(&mut self) -> &Py<PyDict> {
69+
&self.message.headers
70+
}
71+
72+
#[pyo3(signature=(double=false))]
73+
pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
74+
self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double)
75+
}
76+
77+
#[pyo3(signature=(delay=None, double=false))]
78+
pub fn nack<'py>(
79+
&self,
80+
py: Python<'py>,
81+
delay: Option<TimeValue>,
82+
double: bool,
83+
) -> NatsrpyResult<Bound<'py, PyAny>> {
84+
self.inner_ack(
85+
py,
86+
async_nats::jetstream::message::AckKind::Nak(delay.map(Into::into)),
87+
double,
88+
)
89+
}
90+
91+
#[pyo3(signature=(double=false))]
92+
pub fn progress<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
93+
self.inner_ack(
94+
py,
95+
async_nats::jetstream::message::AckKind::Progress,
96+
double,
97+
)
98+
}
99+
100+
#[pyo3(signature=(double=false))]
101+
pub fn next<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
102+
self.inner_ack(py, async_nats::jetstream::message::AckKind::Next, double)
103+
}
104+
105+
#[pyo3(signature=(double=false))]
106+
pub fn term<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
107+
self.inner_ack(py, async_nats::jetstream::message::AckKind::Term, double)
108+
}
109+
110+
#[must_use]
111+
pub fn __repr__(&self) -> String {
112+
self.message.__repr__()
60113
}
61114
}

0 commit comments

Comments
 (0)