Skip to content

Commit 442de0f

Browse files
committed
Replaced seconds with duration.
1 parent 21b18fd commit 442de0f

File tree

3 files changed

+22
-25
lines changed

3 files changed

+22
-25
lines changed

src/js/kv.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct KVConfig {
1919
description: Option<String>,
2020
max_value_size: Option<i32>,
2121
history: Option<i64>,
22-
max_age: Option<f32>,
22+
max_age: Option<Duration>,
2323
max_bytes: Option<i64>,
2424
storage: Option<js::stream::StorageType>,
2525
num_replicas: Option<usize>,
@@ -29,7 +29,7 @@ pub struct KVConfig {
2929
mirror_direct: Option<bool>,
3030
compression: Option<bool>,
3131
placement: Option<js::stream::Placement>,
32-
limit_markers: Option<f32>,
32+
limit_markers: Option<Duration>,
3333
}
3434

3535
#[pymethods]
@@ -58,7 +58,7 @@ impl KVConfig {
5858
description: Option<String>,
5959
max_value_size: Option<i32>,
6060
history: Option<i64>,
61-
max_age: Option<f32>,
61+
max_age: Option<Duration>,
6262
max_bytes: Option<i64>,
6363
storage: Option<js::stream::StorageType>,
6464
num_replicas: Option<usize>,
@@ -68,7 +68,7 @@ impl KVConfig {
6868
mirror_direct: Option<bool>,
6969
compression: Option<bool>,
7070
placement: Option<js::stream::Placement>,
71-
limit_markers: Option<f32>,
71+
limit_markers: Option<Duration>,
7272
) -> Self {
7373
Self {
7474
bucket,
@@ -101,7 +101,6 @@ impl TryFrom<KVConfig> for async_nats::jetstream::kv::Config {
101101
history: value.history.unwrap_or_default(),
102102
max_age: value
103103
.max_age
104-
.map(std::time::Duration::from_secs_f32)
105104
.unwrap_or_default(),
106105
max_bytes: value.max_bytes.unwrap_or_default(),
107106
storage: value.storage.unwrap_or_default().into(),
@@ -126,7 +125,7 @@ impl TryFrom<KVConfig> for async_nats::jetstream::kv::Config {
126125
mirror_direct: value.mirror_direct.unwrap_or_default(),
127126
compression: value.compression.unwrap_or_default(),
128127
placement: value.placement.map(std::convert::Into::into),
129-
limit_markers: value.limit_markers.map(Duration::from_secs_f32),
128+
limit_markers: value.limit_markers,
130129
})
131130
}
132131
}

src/nats_cls.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ pub struct NatsCls {
2323
read_buffer_capacity: u16,
2424
sender_capacity: usize,
2525
max_reconnects: Option<usize>,
26-
connection_timeout: f32,
27-
request_timeout: f32,
26+
connection_timeout: Duration,
27+
request_timeout: Option<Duration>,
2828
}
2929

3030
#[pymethods]
@@ -40,8 +40,8 @@ impl NatsCls {
4040
read_buffer_capacity=65535,
4141
sender_capacity=128,
4242
max_reconnects=None,
43-
connection_timeout=5.0,
44-
request_timeout=10.0,
43+
connection_timeout=Duration::from_secs(5),
44+
request_timeout=Duration::from_secs(10),
4545
))]
4646
fn __new__(
4747
addrs: Vec<String>,
@@ -52,8 +52,8 @@ impl NatsCls {
5252
read_buffer_capacity: u16,
5353
sender_capacity: usize,
5454
max_reconnects: Option<usize>,
55-
connection_timeout: f32,
56-
request_timeout: f32,
55+
connection_timeout: Duration,
56+
request_timeout: Option<Duration>,
5757
) -> Self {
5858
Self {
5959
nats_session: Arc::new(RwLock::new(None)),
@@ -80,10 +80,8 @@ impl NatsCls {
8080
}
8181
conn_opts = conn_opts
8282
.max_reconnects(self.max_reconnects)
83-
.connection_timeout(std::time::Duration::from_secs_f32(self.connection_timeout))
84-
.request_timeout(Some(std::time::Duration::from_secs_f32(
85-
self.request_timeout,
86-
)))
83+
.connection_timeout(self.connection_timeout)
84+
.request_timeout(self.request_timeout)
8785
.read_buffer_capacity(self.read_buffer_capacity)
8886
.client_capacity(self.sender_capacity);
8987

@@ -109,7 +107,7 @@ impl NatsCls {
109107
}
110108
Ok(())
111109
};
112-
let timeout = Duration::from_secs_f32(self.connection_timeout);
110+
let timeout = self.connection_timeout;
113111
return Ok(natsrpy_future(py, async move {
114112
tokio::time::timeout(timeout, startup_future).await?
115113
})?);
@@ -160,7 +158,7 @@ impl NatsCls {
160158
payload: Option<Bound<PyBytes>>,
161159
headers: Option<Bound<PyDict>>,
162160
inbox: Option<String>,
163-
timeout: Option<f32>,
161+
timeout: Option<Duration>,
164162
) -> PyResult<Bound<'py, PyAny>> {
165163
let session = self.nats_session.clone();
166164
let data = payload.map(|inner| bytes::Bytes::from(inner.as_bytes().to_vec()));
@@ -173,7 +171,7 @@ impl NatsCls {
173171
payload: data,
174172
headers: headermap,
175173
inbox,
176-
timeout: timeout.map(|t| Some(std::time::Duration::from_secs_f32(t))),
174+
timeout: timeout.map(|val| Some(val)),
177175
};
178176
session.send_request(subject, request).await?;
179177
Ok(())
@@ -223,8 +221,8 @@ impl NatsCls {
223221
py: Python<'py>,
224222
domain: Option<String>,
225223
api_prefix: Option<String>,
226-
timeout: Option<f32>,
227-
ack_timeout: Option<f32>,
224+
timeout: Option<Duration>,
225+
ack_timeout: Option<Duration>,
228226
concurrency_limit: Option<usize>,
229227
max_ack_inflight: Option<usize>,
230228
backpressure_on_inflight: Option<bool>,
@@ -235,10 +233,10 @@ impl NatsCls {
235233
let mut builder =
236234
async_nats::jetstream::ContextBuilder::new().concurrency_limit(concurrency_limit);
237235
if let Some(timeout) = ack_timeout {
238-
builder = builder.ack_timeout(Duration::from_secs_f32(timeout));
236+
builder = builder.ack_timeout(timeout);
239237
}
240238
if let Some(timeout) = timeout {
241-
builder = builder.timeout(Duration::from_secs_f32(timeout));
239+
builder = builder.timeout(timeout);
242240
}
243241
if let Some(max_ack_inflight) = max_ack_inflight {
244242
builder = builder.max_ack_inflight(max_ack_inflight);

src/subscription.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Subscription {
3434
pub fn next<'py>(
3535
&self,
3636
py: Python<'py>,
37-
timeout: Option<f32>,
37+
timeout: Option<Duration>,
3838
) -> NatsrpyResult<Bound<'py, PyAny>> {
3939
let Some(inner) = self.inner.clone() else {
4040
return Err(NatsrpyError::NotInitialized);
@@ -54,7 +54,7 @@ impl Subscription {
5454

5555
natsrpy_future(py, async move {
5656
if let Some(timeout) = timeout {
57-
tokio::time::timeout(Duration::from_secs_f32(timeout), future).await?
57+
tokio::time::timeout(timeout, future).await?
5858
} else {
5959
future.await
6060
}

0 commit comments

Comments
 (0)