Skip to content

Commit 790b8cc

Browse files
committed
Implemented getting counters.
1 parent 24d14ba commit 790b8cc

File tree

7 files changed

+100
-3
lines changed

7 files changed

+100
-3
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ log = "0.4.29"
2121
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
2222
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
2323
pyo3-log = "0.13.3"
24+
serde = { version = "1.0.228", features = ["derive"] }
25+
serde_json = "1.0.149"
2426
thiserror = "2.0.18"
2527
time = "0.3.47"
2628
tokio = { version = "1.50.0", features = ["full"] }

python/natsrpy/_natsrpy_rs/js/counters.pyi

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ from .stream import (
1616
SubjectTransform,
1717
)
1818

19-
__all__ = ["Counters", "CountersConfig"]
19+
__all__ = ["CounterEntry", "Counters", "CountersConfig"]
2020

2121
@final
2222
class CountersConfig:
@@ -153,6 +153,13 @@ class CountersConfig:
153153
allow_message_schedules: bool | None = None,
154154
) -> Self: ...
155155

156+
@final
157+
class CounterEntry:
158+
subject: str
159+
value: int
160+
sources: dict[str, dict[str, int]]
161+
increment: int | None
162+
156163
@final
157164
class Counters:
158165
async def add(
@@ -171,3 +178,8 @@ class Counters:
171178
key: str,
172179
timeout: float | timedelta | None = None,
173180
) -> int: ...
181+
async def get(
182+
self,
183+
key: str,
184+
timeout: float | timedelta | None = None,
185+
) -> CounterEntry: ...

python/natsrpy/js.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
PushConsumerConfig,
1111
ReplayPolicy,
1212
)
13-
from ._natsrpy_rs.js.counters import Counters, CountersConfig
13+
from ._natsrpy_rs.js.counters import CounterEntry, Counters, CountersConfig
1414
from ._natsrpy_rs.js.kv import (
1515
KeysIterator,
1616
KeyValue,
@@ -54,6 +54,7 @@
5454
"ClusterInfo",
5555
"Compression",
5656
"ConsumerLimits",
57+
"CounterEntry",
5758
"Counters",
5859
"CountersConfig",
5960
"DeliverPolicy",

src/exceptions/rust_err.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ pub enum NatsrpyError {
1111
#[error(transparent)]
1212
StdParseIntError(#[from] std::num::ParseIntError),
1313
#[error(transparent)]
14+
JSONParseError(#[from] serde_json::Error),
15+
#[error(transparent)]
1416
UnknownError(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
1517
#[error("NATS session error: {0}")]
1618
SessionError(String),

src/js/counters.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::{
1414
};
1515

1616
const COUNTER_INCREMENT_HEADER: &str = "Nats-Incr";
17+
const COUNTER_SOURCES_HEADER: &str = "Nats-Counter-Sources";
18+
type CounterSources = HashMap<String, HashMap<String, i128>>;
1719

1820
#[pyo3::pyclass(from_py_object, get_all, set_all)]
1921
#[derive(Debug, Clone, Default)]
@@ -250,6 +252,38 @@ impl TryFrom<CountersConfig> for async_nats::jetstream::stream::Config {
250252
}
251253
}
252254

255+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
256+
pub struct CounterPayload<'a> {
257+
val: &'a str,
258+
}
259+
260+
#[pyo3::pyclass(from_py_object, get_all)]
261+
#[derive(Clone)]
262+
pub struct CounterEntry {
263+
pub subject: String,
264+
pub value: i128,
265+
pub sources: CounterSources,
266+
pub increment: Option<i128>,
267+
}
268+
269+
impl TryFrom<async_nats::jetstream::message::StreamMessage> for CounterEntry {
270+
type Error = NatsrpyError;
271+
272+
fn try_from(value: async_nats::jetstream::message::StreamMessage) -> Result<Self, Self::Error> {
273+
let counter_value = serde_json::from_slice::<CounterPayload>(&value.payload)?
274+
.val
275+
.parse::<i128>()?;
276+
let sources = parse_sources(&value.headers)?;
277+
let increment = parse_increment(&value.headers)?;
278+
Ok(Self {
279+
subject: value.subject.to_string(),
280+
value: counter_value,
281+
sources,
282+
increment,
283+
})
284+
}
285+
}
286+
253287
#[pyo3::pyclass]
254288
#[allow(dead_code)]
255289
pub struct Counters {
@@ -269,6 +303,31 @@ impl Counters {
269303
}
270304
}
271305

306+
fn parse_sources(headers: &HeaderMap) -> NatsrpyResult<CounterSources> {
307+
let Some(sources) = headers.get(COUNTER_SOURCES_HEADER) else {
308+
return Ok(CounterSources::new());
309+
};
310+
let raw_sources =
311+
serde_json::from_str::<HashMap<String, HashMap<String, String>>>(sources.as_str())?;
312+
let mut sources = CounterSources::new();
313+
for (source_id, subjects) in raw_sources {
314+
let mut subject_values = HashMap::new();
315+
for (subject, value_str) in subjects {
316+
subject_values.insert(subject, value_str.parse()?);
317+
}
318+
sources.insert(source_id, subject_values);
319+
}
320+
321+
Ok(sources)
322+
}
323+
324+
pub fn parse_increment(headers: &HeaderMap) -> NatsrpyResult<Option<i128>> {
325+
let Some(header_value) = headers.get(COUNTER_INCREMENT_HEADER) else {
326+
return Ok(None);
327+
};
328+
Ok(Some(header_value.as_str().parse()?))
329+
}
330+
272331
#[pyo3::pymethods]
273332
impl Counters {
274333
#[pyo3(signature=(key, value, timeout=None))]
@@ -321,10 +380,28 @@ impl Counters {
321380
) -> NatsrpyResult<Bound<'py, PyAny>> {
322381
self.add(py, key, -1, timeout)
323382
}
383+
384+
#[pyo3(signature=(key, timeout=None))]
385+
pub fn get<'py>(
386+
&self,
387+
py: Python<'py>,
388+
key: String,
389+
timeout: Option<TimeValue>,
390+
) -> NatsrpyResult<Bound<'py, PyAny>> {
391+
let stream_guard = self.stream.clone();
392+
natsrpy_future_with_timeout(py, timeout, async move {
393+
let message = stream_guard
394+
.read()
395+
.await
396+
.direct_get_last_for_subject(key)
397+
.await?;
398+
CounterEntry::try_from(message)
399+
})
400+
}
324401
}
325402

326403
#[pyo3::pymodule(submodule, name = "counters")]
327404
pub mod pymod {
328405
#[pymodule_export]
329-
use super::{Counters, CountersConfig};
406+
use super::{CounterEntry, Counters, CountersConfig};
330407
}

src/utils/futures.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ where
1414
Ok(res)
1515
}
1616

17+
#[inline]
1718
pub fn natsrpy_future_with_timeout<F, T, D>(
1819
py: Python,
1920
timeout: Option<D>,

0 commit comments

Comments
 (0)