Skip to content

Commit 6545f18

Browse files
s3riusCopilot
andauthored
Optimize performance + allocator replacement. (#43)
Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
1 parent 3779c49 commit 6545f18

File tree

21 files changed

+453
-451
lines changed

21 files changed

+453
-451
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ async-nats = "0.46"
1818
bytes = "1.11.1"
1919
futures-util = "0.3.32"
2020
log = "0.4.29"
21-
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
21+
mimalloc = "0.1.48"
22+
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
2223
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
2324
pyo3-log = "0.13.3"
2425
serde = { version = "1.0.228", features = ["derive"] }

src/exceptions/rust_err.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub type NatsrpyResult<T> = Result<T, NatsrpyError>;
88
pub enum NatsrpyError {
99
#[error(transparent)]
1010
StdIOError(#[from] std::io::Error),
11+
#[error("The lock is poisoned")]
12+
PoisonedLock,
1113
#[error(transparent)]
1214
StdParseIntError(#[from] std::num::ParseIntError),
1315
#[error(transparent)]

src/js/consumers/pull/consumer.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, Python};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::NatsrpyResult,
@@ -19,7 +18,7 @@ pub struct PullConsumer {
1918
name: String,
2019
#[pyo3(get)]
2120
stream_name: String,
22-
consumer: Arc<RwLock<NatsPullConsumer>>,
21+
consumer: Arc<NatsPullConsumer>,
2322
}
2423

2524
impl PullConsumer {
@@ -29,7 +28,7 @@ impl PullConsumer {
2928
Self {
3029
name: info.name.clone(),
3130
stream_name: info.stream_name.clone(),
32-
consumer: Arc::new(RwLock::new(consumer)),
31+
consumer: Arc::new(consumer),
3332
}
3433
}
3534
}
@@ -60,13 +59,9 @@ impl PullConsumer {
6059
min_ack_pending: Option<usize>,
6160
timeout: Option<TimeValue>,
6261
) -> NatsrpyResult<Bound<'py, PyAny>> {
63-
let ctx = self.consumer.clone();
64-
65-
// Because we borrow cosnumer lock
66-
// later for modifications of fetchbuilder.
62+
let consumer = self.consumer.clone();
6763
#[allow(clippy::significant_drop_tightening)]
6864
natsrpy_future_with_timeout(py, timeout, async move {
69-
let consumer = ctx.read().await;
7065
let mut fetch_builder = consumer.fetch();
7166
if let Some(max_messages) = max_messages {
7267
fetch_builder = fetch_builder.max_messages(max_messages);

src/js/consumers/push/consumer.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, PyRef, Python};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -20,7 +19,7 @@ pub struct PushConsumer {
2019
name: String,
2120
#[pyo3(get)]
2221
stream_name: String,
23-
consumer: Arc<RwLock<NatsPushConsumer>>,
22+
consumer: Arc<NatsPushConsumer>,
2423
}
2524

2625
impl PushConsumer {
@@ -30,32 +29,30 @@ impl PushConsumer {
3029
Self {
3130
name: info.name.clone(),
3231
stream_name: info.stream_name.clone(),
33-
consumer: Arc::new(RwLock::new(consumer)),
32+
consumer: Arc::new(consumer),
3433
}
3534
}
3635
}
3736

3837
#[pyo3::pyclass]
3938
pub struct MessagesIterator {
40-
messages: Option<Arc<RwLock<async_nats::jetstream::consumer::push::Messages>>>,
39+
messages: Option<Arc<tokio::sync::Mutex<async_nats::jetstream::consumer::push::Messages>>>,
4140
}
4241

4342
impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator {
4443
fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self {
4544
Self {
46-
messages: Some(Arc::new(RwLock::new(value))),
45+
messages: Some(Arc::new(tokio::sync::Mutex::new(value))),
4746
}
4847
}
4948
}
5049

5150
#[pyo3::pymethods]
5251
impl PushConsumer {
5352
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
54-
let consumer_guard = self.consumer.clone();
53+
let consumer = self.consumer.clone();
5554
natsrpy_future(py, async move {
56-
Ok(MessagesIterator::from(
57-
consumer_guard.read().await.messages().await?,
58-
))
55+
Ok(MessagesIterator::from(consumer.messages().await?))
5956
})
6057
}
6158

@@ -87,7 +84,7 @@ impl MessagesIterator {
8784
};
8885
#[allow(clippy::significant_drop_tightening)]
8986
natsrpy_future_with_timeout(py, timeout, async move {
90-
let mut messages = messages_guard.write().await;
87+
let mut messages = messages_guard.lock().await;
9188
let Some(message) = messages.next().await else {
9289
return Err(NatsrpyError::AsyncStopIteration);
9390
};

src/js/counters.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use async_nats::{HeaderMap, jetstream::context::traits::Publisher};
44
use pyo3::{Bound, PyAny, Python};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -301,17 +300,18 @@ impl CounterEntry {
301300
#[pyo3::pyclass]
302301
#[allow(dead_code)]
303302
pub struct Counters {
304-
stream: Arc<RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>>,
305-
js: Arc<RwLock<async_nats::jetstream::Context>>,
303+
stream: Arc<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>,
304+
js: Arc<async_nats::jetstream::Context>,
306305
}
307306

308307
impl Counters {
308+
#[must_use]
309309
pub fn new(
310310
stream: async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>,
311-
js: Arc<RwLock<async_nats::jetstream::Context>>,
311+
js: Arc<async_nats::jetstream::Context>,
312312
) -> Self {
313313
Self {
314-
stream: Arc::new(RwLock::new(stream)),
314+
stream: Arc::new(stream),
315315
js,
316316
}
317317
}
@@ -357,8 +357,6 @@ impl Counters {
357357
headers.insert(COUNTER_INCREMENT_HEADER, value.to_string());
358358
natsrpy_future_with_timeout(py, timeout, async move {
359359
let resp = js
360-
.read()
361-
.await
362360
.publish_message(async_nats::jetstream::message::OutboundMessage {
363361
subject: key.into(),
364362
payload: bytes::Bytes::new(),
@@ -404,11 +402,7 @@ impl Counters {
404402
) -> NatsrpyResult<Bound<'py, PyAny>> {
405403
let stream_guard = self.stream.clone();
406404
natsrpy_future_with_timeout(py, timeout, async move {
407-
let message = stream_guard
408-
.read()
409-
.await
410-
.direct_get_last_for_subject(key)
411-
.await?;
405+
let message = stream_guard.direct_get_last_for_subject(key).await?;
412406
CounterEntry::try_from(message)
413407
})
414408
}

src/js/jetstream.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use async_nats::{Subject, connection::State, jetstream::context::traits::Publisher};
44
use pyo3::{Bound, PyAny, Python, types::PyDict};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -15,15 +14,13 @@ use crate::{
1514

1615
#[pyo3::pyclass]
1716
pub struct JetStream {
18-
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
17+
ctx: Arc<async_nats::jetstream::Context>,
1918
}
2019

2120
impl JetStream {
2221
#[must_use]
2322
pub fn new(ctx: async_nats::jetstream::Context) -> Self {
24-
Self {
25-
ctx: Arc::new(RwLock::new(ctx)),
26-
}
23+
Self { ctx: Arc::new(ctx) }
2724
}
2825
}
2926

@@ -92,20 +89,16 @@ impl JetStream {
9289
err_on_disconnect: bool,
9390
wait: bool,
9491
) -> NatsrpyResult<Bound<'py, PyAny>> {
95-
let ctx = self.ctx.clone();
9692
let data = payload.into();
9793
let headermap = headers
9894
.map(async_nats::HeaderMap::from_pydict)
9995
.transpose()?;
96+
let client = self.ctx.clone();
10097
natsrpy_future(py, async move {
101-
if err_on_disconnect
102-
&& ctx.read().await.client().connection_state() == State::Disconnected
103-
{
98+
if err_on_disconnect && client.client().connection_state() == State::Disconnected {
10499
return Err(NatsrpyError::Disconnected);
105100
}
106-
let publication = ctx
107-
.read()
108-
.await
101+
let publication = client
109102
.publish_message(async_nats::jetstream::message::OutboundMessage {
110103
subject: Subject::from(subject),
111104
payload: data,

0 commit comments

Comments
 (0)