Skip to content

Commit 1b26890

Browse files
authored
Added JS information parsed from message. (#16)
1 parent 5dd165d commit 1b26890

File tree

4 files changed

+138
-21
lines changed

4 files changed

+138
-21
lines changed

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import timedelta
1+
from datetime import datetime, timedelta
22
from typing import Any
33

44
from .managers import KVManager, ObjectStoreManager, StreamsManager
@@ -29,6 +29,26 @@ class JetStreamMessage:
2929
def payload(self) -> bytes: ...
3030
@property
3131
def headers(self) -> dict[str, Any]: ...
32+
@property
33+
def domain(self) -> str | None: ...
34+
@property
35+
def acc_hash(self) -> str | None: ...
36+
@property
37+
def stream(self) -> str: ...
38+
@property
39+
def consumer(self) -> str: ...
40+
@property
41+
def stream_sequence(self) -> int: ...
42+
@property
43+
def consumer_sequence(self) -> int: ...
44+
@property
45+
def delivered(self) -> int: ...
46+
@property
47+
def pending(self) -> int: ...
48+
@property
49+
def published(self) -> datetime: ...
50+
@property
51+
def token(self) -> str | None: ...
3252
async def ack(self, double: bool = False) -> None:
3353
"""
3454
Acknowledge that a message was handled.

src/js/message.rs

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,65 @@
11
use pyo3::{
22
Bound, Py, PyAny, Python,
3-
types::{PyBytes, PyDict},
3+
types::{PyBytes, PyDateTime, PyDict},
44
};
55
use std::sync::Arc;
66
use tokio::sync::RwLock;
77

88
use crate::{
99
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
10-
utils::{natsrpy_future, py_types::TimeValue},
10+
utils::{
11+
natsrpy_future,
12+
py_types::{TimeValue, ToPyDate},
13+
},
1114
};
1215

16+
#[derive(Debug, Clone)]
17+
pub struct JSInfo {
18+
pub domain: Option<String>,
19+
pub acc_hash: Option<String>,
20+
pub stream: String,
21+
pub consumer: String,
22+
pub stream_sequence: u64,
23+
pub consumer_sequence: u64,
24+
pub delivered: i64,
25+
pub pending: u64,
26+
pub published: time::OffsetDateTime,
27+
pub token: Option<String>,
28+
}
29+
30+
impl From<async_nats::jetstream::message::Info<'_>> for JSInfo {
31+
fn from(value: async_nats::jetstream::message::Info) -> Self {
32+
Self {
33+
domain: value.domain.map(ToString::to_string),
34+
acc_hash: value.acc_hash.map(ToString::to_string),
35+
stream: value.stream.to_string(),
36+
consumer: value.consumer.to_string(),
37+
stream_sequence: value.stream_sequence,
38+
consumer_sequence: value.consumer_sequence,
39+
delivered: value.delivered,
40+
pending: value.pending,
41+
published: value.published,
42+
token: value.token.map(ToString::to_string),
43+
}
44+
}
45+
}
46+
1347
#[pyo3::pyclass]
1448
pub struct JetStreamMessage {
1549
message: crate::message::Message,
50+
info: JSInfo,
1651
acker: Arc<RwLock<async_nats::jetstream::message::Acker>>,
1752
}
1853

1954
impl TryFrom<async_nats::jetstream::Message> for JetStreamMessage {
2055
type Error = NatsrpyError;
2156

2257
fn try_from(value: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
58+
let js_info = JSInfo::from(value.info()?);
2359
let (message, acker) = value.split();
2460
Ok(Self {
2561
message: message.try_into()?,
62+
info: js_info,
2663
acker: Arc::new(RwLock::new(acker)),
2764
})
2865
}
@@ -69,6 +106,57 @@ impl JetStreamMessage {
69106
&self.message.headers
70107
}
71108

109+
#[getter]
110+
pub const fn domain(&mut self) -> &Option<String> {
111+
&self.info.domain
112+
}
113+
114+
#[getter]
115+
#[must_use]
116+
pub const fn acc_hash(&self) -> &Option<String> {
117+
&self.info.acc_hash
118+
}
119+
#[getter]
120+
#[must_use]
121+
pub const fn stream(&self) -> &str {
122+
self.info.stream.as_str()
123+
}
124+
#[getter]
125+
#[must_use]
126+
pub const fn consumer(&self) -> &str {
127+
self.info.consumer.as_str()
128+
}
129+
#[getter]
130+
#[must_use]
131+
pub const fn stream_sequence(&self) -> u64 {
132+
self.info.stream_sequence
133+
}
134+
#[getter]
135+
#[must_use]
136+
pub const fn consumer_sequence(&self) -> u64 {
137+
self.info.consumer_sequence
138+
}
139+
#[getter]
140+
#[must_use]
141+
pub const fn delivered(&self) -> i64 {
142+
self.info.delivered
143+
}
144+
#[getter]
145+
#[must_use]
146+
pub const fn pending(&self) -> u64 {
147+
self.info.pending
148+
}
149+
#[getter]
150+
pub fn published<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyDateTime>> {
151+
Ok(self.info.published.to_py_date(py)?)
152+
}
153+
154+
#[getter]
155+
#[must_use]
156+
pub const fn token(&self) -> &Option<String> {
157+
&self.info.token
158+
}
159+
72160
#[pyo3(signature=(double=false))]
73161
pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
74162
self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double)

src/js/stream.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use pyo3::{
22
Py,
3-
types::{PyBytes, PyDateTime, PyDict, PyTzInfo},
3+
types::{PyBytes, PyDateTime, PyDict},
44
};
55
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
66

77
use crate::{
88
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
99
js::managers::consumers::ConsumersManager,
10-
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
10+
utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::ToPyDate},
1111
};
1212
use pyo3::{Bound, PyAny, Python};
1313
use tokio::sync::RwLock;
@@ -801,25 +801,12 @@ impl StreamMessage {
801801
py: Python,
802802
msg: &async_nats::jetstream::message::StreamMessage,
803803
) -> NatsrpyResult<Self> {
804-
let time = msg.time.to_utc();
805-
let tz_info = PyTzInfo::utc(py)?;
806-
let time = PyDateTime::new(
807-
py,
808-
time.year(),
809-
time.month().into(),
810-
time.day(),
811-
time.hour(),
812-
time.minute(),
813-
time.second(),
814-
time.microsecond(),
815-
Some(&*tz_info),
816-
)?;
817804
Ok(Self {
818805
subject: msg.subject.to_string(),
819806
payload: PyBytes::new(py, &msg.payload).unbind(),
820807
headers: msg.headers.to_pydict(py)?.unbind(),
821808
sequence: msg.sequence,
822-
time: time.unbind(),
809+
time: msg.time.to_py_date(py)?.unbind(),
823810
})
824811
}
825812
}

src/utils/py_types.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::time::Duration;
22

33
use pyo3::{
4-
FromPyObject,
5-
types::{PyBytes, PyBytesMethods},
4+
Bound, FromPyObject, PyResult, Python,
5+
types::{PyBytes, PyBytesMethods, PyDateTime, PyTzInfo},
66
};
77

88
use crate::exceptions::rust_err::NatsrpyError;
@@ -74,3 +74,25 @@ impl<'py> FromPyObject<'_, 'py> for TimeValue {
7474
}
7575
}
7676
}
77+
78+
pub trait ToPyDate {
79+
fn to_py_date<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDateTime>>;
80+
}
81+
82+
impl ToPyDate for time::OffsetDateTime {
83+
fn to_py_date<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDateTime>> {
84+
let time = self.to_utc();
85+
let tz_info = PyTzInfo::utc(py)?;
86+
PyDateTime::new(
87+
py,
88+
time.year(),
89+
time.month().into(),
90+
time.day(),
91+
time.hour(),
92+
time.minute(),
93+
time.second(),
94+
time.microsecond(),
95+
Some(&*tz_info),
96+
)
97+
}
98+
}

0 commit comments

Comments
 (0)