Skip to content

Commit 62d385a

Browse files
committed
Added some stream methods.
1 parent 7e0c612 commit 62d385a

File tree

13 files changed

+480
-64
lines changed

13 files changed

+480
-64
lines changed

python/natsrpy/_inner/js/__init__.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ class JetStream:
2020
async def create_stream(self, config: StreamConfig) -> Stream: ...
2121
async def update_stream(self, config: StreamConfig) -> Stream: ...
2222
async def get_stream(self, name: str) -> Stream: ...
23-
async def delete_stream(self, name: str) -> Stream: ...
23+
async def delete_stream(self, name: str) -> bool: ...

python/natsrpy/_inner/js/stream.pyi

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,50 @@ class StreamMessage:
181181
payload: bytes
182182
time: datetime
183183

184+
class StreamState:
185+
messages: int
186+
bytes: int
187+
first_sequence: int
188+
first_timestamp: int
189+
last_sequence: int
190+
last_timestamp: int
191+
consumer_count: int
192+
subjects_count: int
193+
deleted_count: int | None
194+
deleted: list[int] | None
195+
196+
class SourceInfo:
197+
name: str
198+
lag: int
199+
active: timedelta | None
200+
filter_subject: str | None
201+
subject_transform_dest: str | None
202+
subject_transforms: list[SubjectTransform]
203+
204+
class PeerInfo:
205+
name: str
206+
current: bool
207+
active: timedelta
208+
offline: bool
209+
lag: int | None
210+
211+
class ClusterInfo:
212+
name: str | None
213+
raft_group: str | None
214+
leader: str | None
215+
leader_since: int | None
216+
system_account: bool
217+
traffic_account: str | None
218+
replicas: list[PeerInfo]
219+
220+
class StreamInfo:
221+
config: StreamConfig
222+
created: float
223+
state: StreamState
224+
cluster: ClusterInfo | None
225+
mirror: SourceInfo | None
226+
sources: list[SourceInfo]
227+
184228
class Stream:
185229
async def direct_get(self, sequence: int) -> StreamMessage:
186230
"""

python/natsrpy/js/stream.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
StreamConfig,
1414
StreamMessage,
1515
SubjectTransform,
16+
ClusterInfo,
17+
PeerInfo,
18+
SourceInfo,
19+
StreamInfo,
20+
StreamState,
1621
)
1722

1823
__all__ = [
@@ -30,4 +35,16 @@
3035
"StreamConfig",
3136
"StreamMessage",
3237
"SubjectTransform",
38+
"ClusterInfo",
39+
"Compression",
40+
"ConsumerLimits",
41+
"DiscardPolicy",
42+
"Stream",
43+
"PeerInfo",
44+
"PersistenceMode",
45+
"RetentionPolicy",
46+
"SourceInfo",
47+
"StreamConfig",
48+
"StreamInfo",
49+
"StreamState",
3350
]

src/exceptions/rust_err.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub enum NatsrpyError {
5454
GetStreamError(#[from] async_nats::jetstream::context::GetStreamError),
5555
#[error(transparent)]
5656
StreamDirectGetError(#[from] async_nats::jetstream::stream::DirectGetError),
57+
#[error(transparent)]
58+
StreamInfoError(#[from] async_nats::jetstream::stream::InfoError),
5759
}
5860

5961
impl From<NatsrpyError> for pyo3::PyErr {

src/js/consumers/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod pull;
2+
pub mod push;

src/js/consumers/pull.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#[pyo3::pyclass(from_py_object)]
2+
#[derive(Debug, Clone)]
3+
pub struct PullConsumer;

src/js/consumers/push.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#[pyo3::pyclass(from_py_object)]
2+
#[derive(Debug, Clone)]
3+
pub struct PushConsumer;

src/js/jetstream.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{ops::Deref, sync::Arc};
22

33
use async_nats::{Subject, client::traits::Publisher, connection::State};
44
use pyo3::{
5-
Bound, PyAny, Python, pyclass, pymethods,
5+
Bound, PyAny, Python,
66
types::{PyBytes, PyBytesMethods, PyDict},
77
};
88
use tokio::sync::RwLock;
@@ -16,7 +16,7 @@ use crate::{
1616
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
1717
};
1818

19-
#[pyclass]
19+
#[pyo3::pyclass]
2020
pub struct JetStream {
2121
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
2222
}
@@ -30,7 +30,7 @@ impl JetStream {
3030
}
3131
}
3232

33-
#[pymethods]
33+
#[pyo3::pymethods]
3434
impl JetStream {
3535
#[pyo3(signature = (
3636
subject,
@@ -120,6 +120,18 @@ impl JetStream {
120120
})
121121
}
122122

123+
pub fn get_stream<'py>(
124+
&self,
125+
py: Python<'py>,
126+
name: String,
127+
) -> NatsrpyResult<Bound<'py, PyAny>> {
128+
let ctx = self.ctx.clone();
129+
natsrpy_future(py, async move {
130+
let js = ctx.read().await;
131+
Ok(super::stream::Stream::new(js.get_stream(name).await?))
132+
})
133+
}
134+
123135
pub fn create_stream<'py>(
124136
&self,
125137
py: Python<'py>,
@@ -135,15 +147,15 @@ impl JetStream {
135147
})
136148
}
137149

138-
pub fn get_stream<'py>(
150+
pub fn delete_stream<'py>(
139151
&self,
140152
py: Python<'py>,
141153
name: String,
142154
) -> NatsrpyResult<Bound<'py, PyAny>> {
143155
let ctx = self.ctx.clone();
144156
natsrpy_future(py, async move {
145157
let js = ctx.read().await;
146-
Ok(super::stream::Stream::new(js.get_stream(name).await?))
158+
Ok(js.delete_stream(name).await?.success)
147159
})
148160
}
149161
}

src/js/kv.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
22

33
use crate::js;
44
use pyo3::{
5-
Bound, PyAny, Python, pyclass, pymethods,
5+
Bound, PyAny, Python,
66
types::{PyBytes, PyBytesMethods},
77
};
88
use tokio::sync::RwLock;
@@ -12,7 +12,7 @@ use crate::{
1212
utils::natsrpy_future,
1313
};
1414

15-
#[pyclass(from_py_object, get_all, set_all)]
15+
#[pyo3::pyclass(from_py_object, get_all, set_all)]
1616
#[derive(Clone)]
1717
pub struct KVConfig {
1818
bucket: String,
@@ -32,7 +32,7 @@ pub struct KVConfig {
3232
limit_markers: Option<Duration>,
3333
}
3434

35-
#[pymethods]
35+
#[pyo3::pymethods]
3636
impl KVConfig {
3737
#[new]
3838
#[pyo3(signature=(
@@ -128,7 +128,7 @@ impl TryFrom<KVConfig> for async_nats::jetstream::kv::Config {
128128
}
129129
}
130130

131-
#[pyclass(from_py_object)]
131+
#[pyo3::pyclass(from_py_object)]
132132
#[derive(Clone)]
133133
pub struct KeyValue {
134134
#[pyo3(get)]
@@ -159,7 +159,7 @@ impl KeyValue {
159159
}
160160
}
161161

162-
#[pymethods]
162+
#[pyo3::pymethods]
163163
impl KeyValue {
164164
pub fn get<'py>(&self, py: Python<'py>, key: String) -> NatsrpyResult<Bound<'py, PyAny>> {
165165
let store = self.store.clone();

src/js/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1+
pub mod consumers;
12
pub mod jetstream;
23
pub mod kv;
34
pub mod stream;
45

56
#[pyo3::pymodule(submodule, name = "js")]
67
pub mod pymod {
8+
// Classes
79
#[pymodule_export]
8-
pub use super::jetstream::JetStream;
9-
#[pymodule_export]
10-
pub use super::kv::KVConfig;
11-
12-
#[pymodule_export]
13-
pub use super::kv::KeyValue;
10+
pub use super::{
11+
consumers::{pull::PullConsumer, push::PushConsumer},
12+
jetstream::JetStream,
13+
kv::{KVConfig, KeyValue},
14+
};
1415

16+
// SubModules
1517
#[pymodule_export]
1618
pub use super::kv::pymod as kv;
1719
#[pymodule_export]

0 commit comments

Comments
 (0)