Skip to content

Commit 60a11f1

Browse files
committed
Made some optimizations.
1 parent 6f0efae commit 60a11f1

File tree

15 files changed

+152
-143
lines changed

15 files changed

+152
-143
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ async-nats = "0.46"
1818
bytes = "1.11.1"
1919
futures-util = "0.3.32"
2020
log = "0.4.29"
21-
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
21+
mimalloc = "0.1.48"
22+
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
2223
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
2324
pyo3-log = "0.13.3"
2425
serde = { version = "1.0.228", features = ["derive"] }

src/exceptions/rust_err.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub type NatsrpyResult<T> = Result<T, NatsrpyError>;
88
pub enum NatsrpyError {
99
#[error(transparent)]
1010
StdIOError(#[from] std::io::Error),
11+
#[error("The lock is poisoned")]
12+
PoisonedLock,
1113
#[error(transparent)]
1214
StdParseIntError(#[from] std::num::ParseIntError),
1315
#[error(transparent)]

src/js/consumers/pull/consumer.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, RwLock};
22

33
use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, Python};
5-
use tokio::sync::RwLock;
65

76
use crate::{
8-
exceptions::rust_err::NatsrpyResult,
7+
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
98
utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue},
109
};
1110

@@ -32,6 +31,14 @@ impl PullConsumer {
3231
consumer: Arc::new(RwLock::new(consumer)),
3332
}
3433
}
34+
35+
pub fn get_consumer(&self) -> NatsrpyResult<NatsPullConsumer> {
36+
Ok(self
37+
.consumer
38+
.read()
39+
.map_err(|_| NatsrpyError::SessionError("Lock poisoned".to_string()))?
40+
.clone())
41+
}
3542
}
3643

3744
#[pyo3::pymethods]
@@ -60,13 +67,11 @@ impl PullConsumer {
6067
min_ack_pending: Option<usize>,
6168
timeout: Option<TimeValue>,
6269
) -> NatsrpyResult<Bound<'py, PyAny>> {
63-
let ctx = self.consumer.clone();
64-
6570
// Because we borrow cosnumer lock
6671
// later for modifications of fetchbuilder.
72+
let consumer = self.get_consumer()?;
6773
#[allow(clippy::significant_drop_tightening)]
6874
natsrpy_future_with_timeout(py, timeout, async move {
69-
let consumer = ctx.read().await;
7075
let mut fetch_builder = consumer.fetch();
7176
if let Some(max_messages) = max_messages {
7277
fetch_builder = fetch_builder.max_messages(max_messages);

src/js/consumers/push/consumer.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, RwLock};
22

33
use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, PyRef, Python};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -33,29 +32,35 @@ impl PushConsumer {
3332
consumer: Arc::new(RwLock::new(consumer)),
3433
}
3534
}
35+
36+
pub fn get_consumer(&self) -> NatsrpyResult<NatsPushConsumer> {
37+
Ok(self
38+
.consumer
39+
.read()
40+
.map_err(|_| NatsrpyError::SessionError(String::from("Lock is poisoned")))?
41+
.clone())
42+
}
3643
}
3744

3845
#[pyo3::pyclass]
3946
pub struct MessagesIterator {
40-
messages: Option<Arc<RwLock<async_nats::jetstream::consumer::push::Messages>>>,
47+
messages: Option<Arc<tokio::sync::Mutex<async_nats::jetstream::consumer::push::Messages>>>,
4148
}
4249

4350
impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator {
4451
fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self {
4552
Self {
46-
messages: Some(Arc::new(RwLock::new(value))),
53+
messages: Some(Arc::new(tokio::sync::Mutex::new(value))),
4754
}
4855
}
4956
}
5057

5158
#[pyo3::pymethods]
5259
impl PushConsumer {
5360
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
54-
let consumer_guard = self.consumer.clone();
61+
let consumer = self.get_consumer()?;
5562
natsrpy_future(py, async move {
56-
Ok(MessagesIterator::from(
57-
consumer_guard.read().await.messages().await?,
58-
))
63+
Ok(MessagesIterator::from(consumer.messages().await?))
5964
})
6065
}
6166

@@ -87,7 +92,7 @@ impl MessagesIterator {
8792
};
8893
#[allow(clippy::significant_drop_tightening)]
8994
natsrpy_future_with_timeout(py, timeout, async move {
90-
let mut messages = messages_guard.write().await;
95+
let mut messages = messages_guard.lock().await;
9196
let Some(message) = messages.next().await else {
9297
return Err(NatsrpyError::AsyncStopIteration);
9398
};

src/js/counters.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,13 +302,14 @@ impl CounterEntry {
302302
#[allow(dead_code)]
303303
pub struct Counters {
304304
stream: Arc<RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>>,
305-
js: Arc<RwLock<async_nats::jetstream::Context>>,
305+
js: Arc<async_nats::jetstream::Context>,
306306
}
307307

308308
impl Counters {
309+
#[must_use]
309310
pub fn new(
310311
stream: async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>,
311-
js: Arc<RwLock<async_nats::jetstream::Context>>,
312+
js: Arc<async_nats::jetstream::Context>,
312313
) -> Self {
313314
Self {
314315
stream: Arc::new(RwLock::new(stream)),
@@ -357,8 +358,6 @@ impl Counters {
357358
headers.insert(COUNTER_INCREMENT_HEADER, value.to_string());
358359
natsrpy_future_with_timeout(py, timeout, async move {
359360
let resp = js
360-
.read()
361-
.await
362361
.publish_message(async_nats::jetstream::message::OutboundMessage {
363362
subject: key.into(),
364363
payload: bytes::Bytes::new(),

src/js/jetstream.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use async_nats::{Subject, connection::State, jetstream::context::traits::Publisher};
44
use pyo3::{Bound, PyAny, Python, types::PyDict};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -15,15 +14,13 @@ use crate::{
1514

1615
#[pyo3::pyclass]
1716
pub struct JetStream {
18-
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
17+
ctx: Arc<async_nats::jetstream::Context>,
1918
}
2019

2120
impl JetStream {
2221
#[must_use]
2322
pub fn new(ctx: async_nats::jetstream::Context) -> Self {
24-
Self {
25-
ctx: Arc::new(RwLock::new(ctx)),
26-
}
23+
Self { ctx: Arc::new(ctx) }
2724
}
2825
}
2926

@@ -92,20 +89,16 @@ impl JetStream {
9289
err_on_disconnect: bool,
9390
wait: bool,
9491
) -> NatsrpyResult<Bound<'py, PyAny>> {
95-
let ctx = self.ctx.clone();
9692
let data = payload.into();
9793
let headermap = headers
9894
.map(async_nats::HeaderMap::from_pydict)
9995
.transpose()?;
96+
let client = self.ctx.clone();
10097
natsrpy_future(py, async move {
101-
if err_on_disconnect
102-
&& ctx.read().await.client().connection_state() == State::Disconnected
103-
{
98+
if err_on_disconnect && client.client().connection_state() == State::Disconnected {
10499
return Err(NatsrpyError::Disconnected);
105100
}
106-
let publication = ctx
107-
.read()
108-
.await
101+
let publication = client
109102
.publish_message(async_nats::jetstream::message::OutboundMessage {
110103
subject: Subject::from(subject),
111104
payload: data,

src/js/managers/counters.rs

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ use crate::{
55
js::counters::{Counters, CountersConfig},
66
};
77
use pyo3::{Bound, PyAny, Python};
8-
use tokio::sync::RwLock;
98

109
use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};
1110

1211
#[pyo3::pyclass]
1312
pub struct CountersManager {
14-
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
13+
ctx: Arc<async_nats::jetstream::Context>,
1514
}
1615

1716
impl CountersManager {
18-
pub const fn new(ctx: Arc<RwLock<async_nats::jetstream::Context>>) -> Self {
17+
#[must_use]
18+
pub const fn new(ctx: Arc<async_nats::jetstream::Context>) -> Self {
1919
Self { ctx }
2020
}
2121
}
@@ -27,13 +27,13 @@ impl CountersManager {
2727
py: Python<'py>,
2828
config: CountersConfig,
2929
) -> NatsrpyResult<Bound<'py, PyAny>> {
30-
let ctx = self.ctx.clone();
30+
let client = self.ctx.clone();
3131
natsrpy_future(py, async move {
32-
let js = ctx.read().await;
3332
Ok(Counters::new(
34-
js.create_stream(async_nats::jetstream::stream::Config::try_from(config)?)
33+
client
34+
.create_stream(async_nats::jetstream::stream::Config::try_from(config)?)
3535
.await?,
36-
ctx.clone(),
36+
client,
3737
))
3838
})
3939
}
@@ -43,24 +43,22 @@ impl CountersManager {
4343
py: Python<'py>,
4444
config: CountersConfig,
4545
) -> NatsrpyResult<Bound<'py, PyAny>> {
46-
let ctx = self.ctx.clone();
46+
let client = self.ctx.clone();
4747
natsrpy_future(py, async move {
48-
let info = ctx
49-
.read()
50-
.await
48+
let info = client
5149
.create_or_update_stream(async_nats::jetstream::stream::Config::try_from(config)?)
5250
.await?;
5351
Ok(Counters::new(
54-
ctx.read().await.get_stream(info.config.name).await?,
55-
ctx.clone(),
52+
client.get_stream(info.config.name).await?,
53+
client,
5654
))
5755
})
5856
}
5957

6058
pub fn get<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
61-
let ctx = self.ctx.clone();
59+
let client = self.ctx.clone();
6260
natsrpy_future(py, async move {
63-
let stream = ctx.read().await.get_stream(&name).await?;
61+
let stream = client.get_stream(&name).await?;
6462
let config = stream.get_info().await?.config;
6563
if !config.allow_direct {
6664
return Err(NatsrpyError::SessionError(format!(
@@ -72,33 +70,31 @@ impl CountersManager {
7270
"Stream {name} doesn't allow message counters.",
7371
)));
7472
}
75-
Ok(Counters::new(stream, ctx.clone()))
73+
Ok(Counters::new(stream, client))
7674
})
7775
}
7876

7977
pub fn delete<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
80-
let ctx = self.ctx.clone();
81-
natsrpy_future(py, async move {
82-
let js = ctx.read().await;
83-
Ok(js.delete_stream(name).await?.success)
84-
})
78+
let client = self.ctx.clone();
79+
natsrpy_future(
80+
py,
81+
async move { Ok(client.delete_stream(name).await?.success) },
82+
)
8583
}
8684

8785
pub fn update<'py>(
8886
&self,
8987
py: Python<'py>,
9088
config: CountersConfig,
9189
) -> NatsrpyResult<Bound<'py, PyAny>> {
92-
let ctx = self.ctx.clone();
90+
let client = self.ctx.clone();
9391
natsrpy_future(py, async move {
94-
let info = ctx
95-
.read()
96-
.await
92+
let info = client
9793
.update_stream(async_nats::jetstream::stream::Config::try_from(config)?)
9894
.await?;
9995
Ok(Counters::new(
100-
ctx.read().await.get_stream(info.config.name).await?,
101-
ctx.clone(),
96+
client.get_stream(info.config.name).await?,
97+
client,
10298
))
10399
})
104100
}

0 commit comments

Comments
 (0)