Skip to content

Commit d1993f8

Browse files
authored
Added better API for timeouts. (#10)
1 parent 6e4a1fb commit d1993f8

File tree

8 files changed

+113
-44
lines changed

8 files changed

+113
-44
lines changed

python/natsrpy/_natsrpy_rs/__init__.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class Nats:
2020
read_buffer_capacity: int = 65535,
2121
sender_capacity: int = 128,
2222
max_reconnects: int | None = None,
23-
connection_timeout: timedelta = ...,
24-
request_timeout: timedelta = ...,
23+
connection_timeout: float | timedelta = ...,
24+
request_timeout: float | timedelta = ...,
2525
) -> None: ...
2626
async def startup(self) -> None: ...
2727
async def shutdown(self) -> None: ...

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,5 @@ class PullConsumer:
160160
expires: timedelta | None = None,
161161
min_pending: int | None = None,
162162
min_ack_pending: int | None = None,
163+
timeout: float | timedelta | None = None,
163164
) -> list[JetStreamMessage]: ...

src/exceptions/rust_err.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use pyo3::exceptions::PyTypeError;
1+
use pyo3::exceptions::{PyTimeoutError, PyTypeError};
22

33
use crate::exceptions::py_err::{NatsrpyPublishError, NatsrpySessionError};
44

@@ -72,6 +72,7 @@ impl From<NatsrpyError> for pyo3::PyErr {
7272
fn from(value: NatsrpyError) -> Self {
7373
match value {
7474
NatsrpyError::PublishError(_) => NatsrpyPublishError::new_err(value.to_string()),
75+
NatsrpyError::Timeout(_) => PyTimeoutError::new_err(value.to_string()),
7576
NatsrpyError::PyError(py_err) => py_err,
7677
NatsrpyError::InvalidArgument(descr) => PyTypeError::new_err(descr),
7778
_ => NatsrpySessionError::new_err(value.to_string()),

src/js/consumers/pull/consumer.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, Python};
55
use tokio::sync::RwLock;
66

7-
use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};
7+
use crate::{
8+
exceptions::rust_err::NatsrpyResult,
9+
utils::{futures::natsrpy_future_with_timeout, py_types::TimeoutValue},
10+
};
811

912
type NatsPullConsumer =
1013
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>;
@@ -35,6 +38,7 @@ impl PullConsumer {
3538
expires=None,
3639
min_pending=None,
3740
min_ack_pending=None,
41+
timeout=None,
3842
))]
3943
pub fn fetch<'py>(
4044
&self,
@@ -47,12 +51,14 @@ impl PullConsumer {
4751
expires: Option<Duration>,
4852
min_pending: Option<usize>,
4953
min_ack_pending: Option<usize>,
54+
timeout: Option<TimeoutValue>,
5055
) -> NatsrpyResult<Bound<'py, PyAny>> {
5156
let ctx = self.consumer.clone();
57+
58+
// Because we borrow cosnumer lock
59+
// later for modifications of fetchbuilder.
5260
#[allow(clippy::significant_drop_tightening)]
53-
natsrpy_future(py, async move {
54-
// Because we borrow created value
55-
// later for modifications.
61+
natsrpy_future_with_timeout(py, timeout, async move {
5662
let consumer = ctx.read().await;
5763
let mut fetch_builder = consumer.fetch();
5864
if let Some(max_messages) = max_messages {

src/nats_cls.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ use tokio::sync::RwLock;
99
use crate::{
1010
exceptions::rust_err::NatsrpyError,
1111
subscription::Subscription,
12-
utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::SendableValue},
12+
utils::{
13+
futures::natsrpy_future_with_timeout,
14+
headers::NatsrpyHeadermapExt,
15+
natsrpy_future,
16+
py_types::{SendableValue, TimeoutValue},
17+
},
1318
};
1419

1520
#[pyo3::pyclass(name = "Nats")]
@@ -23,8 +28,8 @@ pub struct NatsCls {
2328
read_buffer_capacity: u16,
2429
sender_capacity: usize,
2530
max_reconnects: Option<usize>,
26-
connection_timeout: Duration,
27-
request_timeout: Option<Duration>,
31+
connection_timeout: TimeoutValue,
32+
request_timeout: Option<TimeoutValue>,
2833
}
2934

3035
#[pyo3::pymethods]
@@ -40,8 +45,8 @@ impl NatsCls {
4045
read_buffer_capacity=65535,
4146
sender_capacity=128,
4247
max_reconnects=None,
43-
connection_timeout=Duration::from_secs(5),
44-
request_timeout=Duration::from_secs(10),
48+
connection_timeout=TimeoutValue::FloatSecs(5.0),
49+
request_timeout=TimeoutValue::FloatSecs(10.0),
4550
))]
4651
fn __new__(
4752
addrs: Vec<String>,
@@ -52,8 +57,8 @@ impl NatsCls {
5257
read_buffer_capacity: u16,
5358
sender_capacity: usize,
5459
max_reconnects: Option<usize>,
55-
connection_timeout: Duration,
56-
request_timeout: Option<Duration>,
60+
connection_timeout: TimeoutValue,
61+
request_timeout: Option<TimeoutValue>,
5762
) -> Self {
5863
Self {
5964
nats_session: Arc::new(RwLock::new(None)),
@@ -80,8 +85,8 @@ impl NatsCls {
8085
}
8186
conn_opts = conn_opts
8287
.max_reconnects(self.max_reconnects)
83-
.connection_timeout(self.connection_timeout)
84-
.request_timeout(self.request_timeout)
88+
.connection_timeout(self.connection_timeout.into())
89+
.request_timeout(self.request_timeout.map(Into::into))
8590
.read_buffer_capacity(self.read_buffer_capacity)
8691
.client_capacity(self.sender_capacity);
8792

@@ -94,23 +99,24 @@ impl NatsCls {
9499

95100
let session = self.nats_session.clone();
96101
let address = self.addr.clone();
97-
let startup_future = async move {
98-
if session.read().await.is_some() {
99-
return Err(NatsrpyError::SessionError(
100-
"NATS session already exists".to_string(),
101-
));
102-
}
103-
// Scoping for early-dropping of a guard.
104-
{
105-
let mut sesion_guard = session.write().await;
106-
*sesion_guard = Some(conn_opts.connect(address).await?);
107-
}
108-
Ok(())
109-
};
110102
let timeout = self.connection_timeout;
111-
return Ok(natsrpy_future(py, async move {
112-
tokio::time::timeout(timeout, startup_future).await?
113-
})?);
103+
return Ok(natsrpy_future_with_timeout(
104+
py,
105+
Some(timeout),
106+
async move {
107+
if session.read().await.is_some() {
108+
return Err(NatsrpyError::SessionError(
109+
"NATS session already exists".to_string(),
110+
));
111+
}
112+
// Scoping for early-dropping of a guard.
113+
{
114+
let mut sesion_guard = session.write().await;
115+
*sesion_guard = Some(conn_opts.connect(address).await?);
116+
}
117+
Ok(())
118+
},
119+
)?);
114120
}
115121

116122
#[pyo3(signature = (subject, payload, *, headers=None, reply=None, err_on_disconnect = false))]

src/subscription.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::sync::Mutex;
77

88
use crate::{
99
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
10-
utils::natsrpy_future,
10+
utils::futures::natsrpy_future_with_timeout,
1111
};
1212

1313
#[pyo3::pyclass]
@@ -39,21 +39,12 @@ impl Subscription {
3939
let Some(inner) = self.inner.clone() else {
4040
return Err(NatsrpyError::NotInitialized);
4141
};
42-
43-
let future = async move {
42+
natsrpy_future_with_timeout(py, timeout, async move {
4443
let Some(message) = inner.lock().await.next().await else {
4544
return Err(PyStopAsyncIteration::new_err("End of the stream.").into());
4645
};
4746

4847
crate::message::Message::try_from(message)
49-
};
50-
51-
natsrpy_future(py, async move {
52-
if let Some(timeout) = timeout {
53-
tokio::time::timeout(timeout, future).await?
54-
} else {
55-
future.await
56-
}
5748
})
5849
}
5950

src/utils/futures.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::time::Duration;
2+
13
use pyo3::{Bound, IntoPyObject, PyAny, Python};
24

3-
use crate::exceptions::rust_err::NatsrpyResult;
5+
use crate::exceptions::rust_err::{NatsrpyError, NatsrpyResult};
46

57
pub fn natsrpy_future<F, T>(py: Python, fut: F) -> NatsrpyResult<Bound<PyAny>>
68
where
@@ -11,3 +13,31 @@ where
1113
pyo3_async_runtimes::tokio::future_into_py(py, async { fut.await.map_err(Into::into) })?;
1214
Ok(res)
1315
}
16+
17+
pub fn natsrpy_future_with_timeout<F, T, D>(
18+
py: Python,
19+
timeout: Option<D>,
20+
fut: F,
21+
) -> NatsrpyResult<Bound<PyAny>>
22+
where
23+
F: Future<Output = NatsrpyResult<T>> + Send + 'static,
24+
T: for<'py> IntoPyObject<'py> + Send + 'static,
25+
D: Into<Duration>,
26+
{
27+
let timeout = timeout.map(Into::into);
28+
let res = pyo3_async_runtimes::tokio::future_into_py(py, async move {
29+
if let Some(timeout) = timeout {
30+
tokio::time::timeout(timeout, fut)
31+
.await
32+
// First map_err is for timeout
33+
.map_err(NatsrpyError::from)?
34+
// This one is for result returned from
35+
// a future.
36+
.map_err(Into::into)
37+
} else {
38+
// Simple return with error mapping.
39+
fut.await.map_err(Into::into)
40+
}
41+
})?;
42+
Ok(res)
43+
}

src/utils/py_types.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use pyo3::{
24
FromPyObject,
35
types::{PyBytes, PyBytesMethods},
@@ -40,3 +42,35 @@ impl From<SendableValue> for bytes::Bytes {
4042
}
4143
}
4244
}
45+
46+
#[derive(Clone, Debug, Copy, PartialEq, PartialOrd)]
47+
pub enum TimeoutValue {
48+
Duration(Duration),
49+
FloatSecs(f32),
50+
}
51+
52+
impl From<TimeoutValue> for Duration {
53+
fn from(value: TimeoutValue) -> Self {
54+
match value {
55+
TimeoutValue::Duration(duration) => duration,
56+
TimeoutValue::FloatSecs(fsecs) => Self::from_secs_f32(fsecs),
57+
}
58+
}
59+
}
60+
61+
impl<'py> FromPyObject<'_, 'py> for TimeoutValue {
62+
type Error = NatsrpyError;
63+
64+
fn extract(obj: pyo3::Borrowed<'_, 'py, pyo3::PyAny>) -> Result<Self, Self::Error> {
65+
#[allow(clippy::option_if_let_else)]
66+
if let Ok(fsec) = obj.extract::<f32>() {
67+
Ok(Self::FloatSecs(fsec))
68+
} else if let Ok(duration) = obj.extract::<Duration>() {
69+
Ok(Self::Duration(duration))
70+
} else {
71+
Err(NatsrpyError::InvalidArgument(String::from(
72+
"As timeouts only float or timedelta are accepted.",
73+
)))
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)