Skip to content

Commit 3e77b14

Browse files
committed
Added pull consumer fetch function.
1 parent d1bafca commit 3e77b14

File tree

9 files changed

+168
-17
lines changed

9 files changed

+168
-17
lines changed

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Any
2+
13
from .managers import KVManager, StreamsManager
24

35
class JetStream:
@@ -14,3 +16,14 @@ class JetStream:
1416
def kv(self) -> KVManager: ...
1517
@property
1618
def streams(self) -> StreamsManager: ...
19+
20+
class JetStreamMessage:
21+
@property
22+
def subject(self) -> str: ...
23+
@property
24+
def reply(self) -> str | None: ...
25+
@property
26+
def payload(self) -> bytes: ...
27+
@property
28+
def headers(self) -> dict[str, Any]: ...
29+
async def ack(self) -> None: ...

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from datetime import timedelta
22

3+
from natsrpy._natsrpy_rs.js import JetStreamMessage
4+
35
class DeliverPolicy:
46
ALL: DeliverPolicy
57
LAST: DeliverPolicy
@@ -146,4 +148,16 @@ class PushConsumerConfig:
146148
) -> None: ...
147149

148150
class PushConsumer: ...
149-
class PullConsumer: ...
151+
152+
class PullConsumer:
153+
async def fetch(
154+
self,
155+
max_messages: int | None = None,
156+
group: str | None = None,
157+
priority: int | None = None,
158+
max_bytes: int | None = None,
159+
heartbeat: timedelta | None = None,
160+
expires: timedelta | None = None,
161+
min_pending: int | None = None,
162+
min_ack_pending: int | None = None,
163+
) -> list[JetStreamMessage]: ...

src/exceptions/rust_err.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ pub enum NatsrpyError {
6464
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
6565
#[error(transparent)]
6666
PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
67+
#[error(transparent)]
68+
PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError),
6769
}
6870

6971
impl From<NatsrpyError> for pyo3::PyErr {

src/js/consumers/pull/consumer.rs

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

3+
use futures_util::StreamExt;
4+
use pyo3::{Bound, PyAny, Python};
35
use tokio::sync::RwLock;
46

7+
use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};
8+
59
type NatsPullConsumer =
610
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>;
711

@@ -20,13 +24,68 @@ impl PullConsumer {
2024
}
2125
}
2226

23-
#[pyo3::pyclass]
24-
pub struct PullMessageIterator {
25-
inner: Arc<RwLock<async_nats::jetstream::consumer::pull::Batch>>,
26-
}
27-
28-
#[pyo3::pymethods]
29-
impl PullConsumer {}
30-
3127
#[pyo3::pymethods]
32-
impl PullMessageIterator {}
28+
impl PullConsumer {
29+
#[pyo3(signature=(
30+
max_messages=None,
31+
group=None,
32+
priority=None,
33+
max_bytes=None,
34+
heartbeat=None,
35+
expires=None,
36+
min_pending=None,
37+
min_ack_pending=None,
38+
))]
39+
pub fn fetch<'py>(
40+
&self,
41+
py: Python<'py>,
42+
max_messages: Option<usize>,
43+
group: Option<String>,
44+
priority: Option<usize>,
45+
max_bytes: Option<usize>,
46+
heartbeat: Option<Duration>,
47+
expires: Option<Duration>,
48+
min_pending: Option<usize>,
49+
min_ack_pending: Option<usize>,
50+
) -> NatsrpyResult<Bound<'py, PyAny>> {
51+
let ctx = self.consumer.clone();
52+
#[allow(clippy::significant_drop_tightening)]
53+
natsrpy_future(py, async move {
54+
// Because we borrow created value
55+
// later for modifications.
56+
let consumer = ctx.read().await;
57+
let mut fetch_builder = consumer.fetch();
58+
if let Some(max_messages) = max_messages {
59+
fetch_builder = fetch_builder.max_messages(max_messages);
60+
}
61+
if let Some(group) = group {
62+
fetch_builder = fetch_builder.group(group);
63+
}
64+
if let Some(priority) = priority {
65+
fetch_builder = fetch_builder.priority(priority);
66+
}
67+
if let Some(max_bytes) = max_bytes {
68+
fetch_builder = fetch_builder.max_bytes(max_bytes);
69+
}
70+
if let Some(heartbeat) = heartbeat {
71+
fetch_builder = fetch_builder.heartbeat(heartbeat);
72+
}
73+
if let Some(expires) = expires {
74+
fetch_builder = fetch_builder.expires(expires);
75+
}
76+
if let Some(min_pending) = min_pending {
77+
fetch_builder = fetch_builder.min_pending(min_pending);
78+
}
79+
if let Some(min_ack_pending) = min_ack_pending {
80+
fetch_builder = fetch_builder.min_ack_pending(min_ack_pending);
81+
}
82+
let mut messages = fetch_builder.messages().await?;
83+
let mut ret_messages = Vec::new();
84+
while let Some(msg) = messages.next().await {
85+
let raw_msg = msg?;
86+
ret_messages.push(crate::js::message::JetStreamMessage::from(raw_msg));
87+
}
88+
Ok(ret_messages)
89+
})
90+
}
91+
}

src/js/message.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,61 @@
1+
use pyo3::{Bound, Py, PyAny, Python, types::PyDict};
2+
use std::sync::Arc;
3+
use tokio::sync::RwLock;
14

5+
use crate::{
6+
exceptions::rust_err::NatsrpyResult,
7+
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
8+
};
9+
10+
#[pyo3::pyclass]
11+
pub struct JetStreamMessage {
12+
message: async_nats::Message,
13+
headers: Option<Py<PyDict>>,
14+
acker: Arc<RwLock<async_nats::jetstream::message::Acker>>,
15+
}
16+
17+
impl From<async_nats::jetstream::Message> for JetStreamMessage {
18+
fn from(value: async_nats::jetstream::Message) -> Self {
19+
let (message, acker) = value.split();
20+
Self {
21+
message,
22+
headers: None,
23+
acker: Arc::new(RwLock::new(acker)),
24+
}
25+
}
26+
}
27+
28+
#[pyo3::pymethods]
29+
impl JetStreamMessage {
30+
#[getter]
31+
pub fn subject(&self) -> &str {
32+
self.message.subject.as_str()
33+
}
34+
#[getter]
35+
pub fn reply(&self) -> Option<&str> {
36+
self.message.reply.as_ref().map(async_nats::Subject::as_str)
37+
}
38+
#[getter]
39+
pub fn payload(&self) -> &[u8] {
40+
&self.message.payload
41+
}
42+
#[getter]
43+
pub fn headers(&mut self, py: Python<'_>) -> NatsrpyResult<Py<PyDict>> {
44+
if let Some(headers) = &self.headers {
45+
Ok(headers.clone_ref(py))
46+
} else {
47+
let headermap = self.message.headers.clone().unwrap_or_default();
48+
let headers = headermap.to_pydict(py)?.unbind();
49+
self.headers = Some(headers.clone_ref(py));
50+
Ok(headers)
51+
}
52+
}
53+
54+
pub fn ack<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
55+
let acker_guard = self.acker.clone();
56+
natsrpy_future(py, async move {
57+
acker_guard.read().await.ack().await?;
58+
Ok(())
59+
})
60+
}
61+
}

src/js/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ pub mod consumers;
22
pub mod jetstream;
33
pub mod kv;
44
pub mod managers;
5+
pub mod message;
56
pub mod stream;
67

78
#[pyo3::pymodule(submodule, name = "js")]
89
pub mod pymod {
910
// Classes
1011
#[pymodule_export]
1112
pub use super::jetstream::JetStream;
13+
#[pymodule_export]
14+
pub use super::message::JetStreamMessage;
1215

1316
// SubModules
1417
#[pymodule_export]

src/js/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ impl StreamMessage {
817817
Ok(Self {
818818
subject: msg.subject.to_string(),
819819
payload: PyBytes::new(py, &msg.payload).unbind(),
820-
headers: msg.headers.to_pydict(py)?,
820+
headers: msg.headers.to_pydict(py)?.unbind(),
821821
sequence: msg.sequence,
822822
time: time.unbind(),
823823
})

src/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl TryFrom<async_nats::Message> for Message {
2323
fn try_from(value: async_nats::Message) -> Result<Self, Self::Error> {
2424
Python::attach(move |gil| {
2525
let headers = match value.headers {
26-
Some(headermap) => headermap.to_pydict(gil)?,
26+
Some(headermap) => headermap.to_pydict(gil)?.unbind(),
2727
None => PyDict::new(gil).unbind(),
2828
};
2929
Ok(Self {

src/utils/headers.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use pyo3::{
2-
Bound, Py, Python,
2+
Bound, Python,
33
types::{PyAnyMethods, PyDict},
44
};
55

66
use crate::exceptions::rust_err::NatsrpyResult;
77

88
pub trait NatsrpyHeadermapExt: Sized {
99
fn from_pydict(pydict: Bound<PyDict>) -> NatsrpyResult<Self>;
10-
fn to_pydict(&self, py: Python) -> NatsrpyResult<Py<PyDict>>;
10+
fn to_pydict<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyDict>>;
1111
}
1212

1313
impl NatsrpyHeadermapExt for async_nats::HeaderMap {
@@ -30,7 +30,7 @@ impl NatsrpyHeadermapExt for async_nats::HeaderMap {
3030
Ok(headermap)
3131
}
3232

33-
fn to_pydict(&self, py: Python) -> NatsrpyResult<Py<PyDict>> {
33+
fn to_pydict<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyDict>> {
3434
let dict = PyDict::new(py);
3535
for (header_name, header_val) in self.iter() {
3636
let py_val = header_val
@@ -46,6 +46,6 @@ impl NatsrpyHeadermapExt for async_nats::HeaderMap {
4646
}
4747
dict.set_item(header_name.to_string(), py_val)?;
4848
}
49-
Ok(dict.unbind())
49+
Ok(dict)
5050
}
5151
}

0 commit comments

Comments
 (0)