Skip to content

Commit 8408451

Browse files
Copilots3rius
andauthored
Perf: remove unnecessary locks, replace polling with channels, increase buffers (#44)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: s3rius <18153319+s3rius@users.noreply.github.com>
1 parent 60a11f1 commit 8408451

File tree

11 files changed

+159
-152
lines changed

11 files changed

+159
-152
lines changed

src/js/consumers/pull/consumer.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::sync::{Arc, RwLock};
1+
use std::sync::Arc;
22

33
use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, Python};
55

66
use crate::{
7-
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
7+
exceptions::rust_err::NatsrpyResult,
88
utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue},
99
};
1010

@@ -18,7 +18,7 @@ pub struct PullConsumer {
1818
name: String,
1919
#[pyo3(get)]
2020
stream_name: String,
21-
consumer: Arc<RwLock<NatsPullConsumer>>,
21+
consumer: Arc<NatsPullConsumer>,
2222
}
2323

2424
impl PullConsumer {
@@ -28,17 +28,9 @@ impl PullConsumer {
2828
Self {
2929
name: info.name.clone(),
3030
stream_name: info.stream_name.clone(),
31-
consumer: Arc::new(RwLock::new(consumer)),
31+
consumer: Arc::new(consumer),
3232
}
3333
}
34-
35-
pub fn get_consumer(&self) -> NatsrpyResult<NatsPullConsumer> {
36-
Ok(self
37-
.consumer
38-
.read()
39-
.map_err(|_| NatsrpyError::SessionError("Lock poisoned".to_string()))?
40-
.clone())
41-
}
4234
}
4335

4436
#[pyo3::pymethods]
@@ -67,9 +59,7 @@ impl PullConsumer {
6759
min_ack_pending: Option<usize>,
6860
timeout: Option<TimeValue>,
6961
) -> NatsrpyResult<Bound<'py, PyAny>> {
70-
// Because we borrow cosnumer lock
71-
// later for modifications of fetchbuilder.
72-
let consumer = self.get_consumer()?;
62+
let consumer = self.consumer.clone();
7363
#[allow(clippy::significant_drop_tightening)]
7464
natsrpy_future_with_timeout(py, timeout, async move {
7565
let mut fetch_builder = consumer.fetch();

src/js/consumers/push/consumer.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::{Arc, RwLock};
1+
use std::sync::Arc;
22

33
use futures_util::StreamExt;
44
use pyo3::{Bound, PyAny, PyRef, Python};
@@ -19,7 +19,7 @@ pub struct PushConsumer {
1919
name: String,
2020
#[pyo3(get)]
2121
stream_name: String,
22-
consumer: Arc<RwLock<NatsPushConsumer>>,
22+
consumer: Arc<NatsPushConsumer>,
2323
}
2424

2525
impl PushConsumer {
@@ -29,17 +29,9 @@ impl PushConsumer {
2929
Self {
3030
name: info.name.clone(),
3131
stream_name: info.stream_name.clone(),
32-
consumer: Arc::new(RwLock::new(consumer)),
32+
consumer: Arc::new(consumer),
3333
}
3434
}
35-
36-
pub fn get_consumer(&self) -> NatsrpyResult<NatsPushConsumer> {
37-
Ok(self
38-
.consumer
39-
.read()
40-
.map_err(|_| NatsrpyError::SessionError(String::from("Lock is poisoned")))?
41-
.clone())
42-
}
4335
}
4436

4537
#[pyo3::pyclass]
@@ -58,7 +50,7 @@ impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator
5850
#[pyo3::pymethods]
5951
impl PushConsumer {
6052
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
61-
let consumer = self.get_consumer()?;
53+
let consumer = self.consumer.clone();
6254
natsrpy_future(py, async move {
6355
Ok(MessagesIterator::from(consumer.messages().await?))
6456
})

src/js/counters.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use async_nats::{HeaderMap, jetstream::context::traits::Publisher};
44
use pyo3::{Bound, PyAny, Python};
5-
use tokio::sync::RwLock;
65

76
use crate::{
87
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -301,7 +300,7 @@ impl CounterEntry {
301300
#[pyo3::pyclass]
302301
#[allow(dead_code)]
303302
pub struct Counters {
304-
stream: Arc<RwLock<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>>,
303+
stream: Arc<async_nats::jetstream::stream::Stream<async_nats::jetstream::stream::Info>>,
305304
js: Arc<async_nats::jetstream::Context>,
306305
}
307306

@@ -312,7 +311,7 @@ impl Counters {
312311
js: Arc<async_nats::jetstream::Context>,
313312
) -> Self {
314313
Self {
315-
stream: Arc::new(RwLock::new(stream)),
314+
stream: Arc::new(stream),
316315
js,
317316
}
318317
}
@@ -404,8 +403,6 @@ impl Counters {
404403
let stream_guard = self.stream.clone();
405404
natsrpy_future_with_timeout(py, timeout, async move {
406405
let message = stream_guard
407-
.read()
408-
.await
409406
.direct_get_last_for_subject(key)
410407
.await?;
411408
CounterEntry::try_from(message)

src/js/kv.rs

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use pyo3::{
1313
Bound, Py, PyAny, PyRef, Python,
1414
types::{PyBytes, PyDateTime},
1515
};
16-
use tokio::sync::{Mutex, RwLock};
16+
use tokio::sync::Mutex;
1717

1818
use crate::{
1919
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
@@ -229,20 +229,19 @@ pub struct KeyValue {
229229
put_prefix: Option<String>,
230230
#[pyo3(get)]
231231
use_jetstream_prefix: bool,
232-
store: Arc<RwLock<async_nats::jetstream::kv::Store>>,
232+
store: Arc<async_nats::jetstream::kv::Store>,
233233
}
234234

235235
impl KeyValue {
236236
#[must_use]
237237
pub fn new(store: async_nats::jetstream::kv::Store) -> Self {
238-
// store.
239238
Self {
240239
name: store.name.clone(),
241240
stream_name: store.stream_name.clone(),
242241
prefix: store.prefix.clone(),
243242
put_prefix: store.put_prefix.clone(),
244243
use_jetstream_prefix: store.use_jetstream_prefix,
245-
store: Arc::new(RwLock::new(store)),
244+
store: Arc::new(store),
246245
}
247246
}
248247
}
@@ -253,8 +252,6 @@ impl KeyValue {
253252
let store = self.store.clone();
254253
natsrpy_future(py, async move {
255254
Ok(store
256-
.read()
257-
.await
258255
.get(key)
259256
.await?
260257
.map(|data| Python::attach(move |gil| PyBytes::new(gil, &data).unbind())))
@@ -274,12 +271,10 @@ impl KeyValue {
274271
natsrpy_future(py, async move {
275272
if let Some(ttl) = ttl {
276273
Ok(store
277-
.read()
278-
.await
279274
.create_with_ttl(key, data, ttl.into())
280275
.await?)
281276
} else {
282-
Ok(store.read().await.create(key, data).await?)
277+
Ok(store.create(key, data).await?)
283278
}
284279
})
285280
}
@@ -300,14 +295,10 @@ impl KeyValue {
300295
natsrpy_future(py, async move {
301296
match (ttl, expect_revision) {
302297
(None, _) => Ok(store
303-
.read()
304-
.await
305298
.purge_expect_revision(key, expect_revision)
306299
.await?),
307-
(Some(ttl), None) => Ok(store.read().await.purge_with_ttl(key, ttl.into()).await?),
300+
(Some(ttl), None) => Ok(store.purge_with_ttl(key, ttl.into()).await?),
308301
(Some(ttl), Some(revision)) => Ok(store
309-
.read()
310-
.await
311302
.purge_expect_revision_with_ttl(key, revision, ttl.into())
312303
.await?),
313304
}
@@ -324,7 +315,7 @@ impl KeyValue {
324315
let data = value.into();
325316
natsrpy_future(
326317
py,
327-
async move { Ok(store.read().await.put(key, data).await?) },
318+
async move { Ok(store.put(key, data).await?) },
328319
)
329320
}
330321

@@ -341,8 +332,6 @@ impl KeyValue {
341332
let store = self.store.clone();
342333
natsrpy_future(py, async move {
343334
Ok(store
344-
.read()
345-
.await
346335
.delete_expect_revision(key, expect_revision)
347336
.await?)
348337
})
@@ -358,8 +347,6 @@ impl KeyValue {
358347
let store = self.store.clone();
359348
natsrpy_future(py, async move {
360349
Ok(store
361-
.read()
362-
.await
363350
.update(key, value.into(), revision)
364351
.await?)
365352
})
@@ -369,7 +356,7 @@ impl KeyValue {
369356
let store = self.store.clone();
370357
natsrpy_future(py, async move {
371358
Ok(KVEntryIterator::new(Streamer::new(
372-
store.read().await.history(key).await?,
359+
store.history(key).await?,
373360
)))
374361
})
375362
}
@@ -383,9 +370,9 @@ impl KeyValue {
383370
let store = self.store.clone();
384371
natsrpy_future(py, async move {
385372
let watch = if let Some(rev) = from_revision {
386-
store.read().await.watch_all_from_revision(rev).await?
373+
store.watch_all_from_revision(rev).await?
387374
} else {
388-
store.read().await.watch_all().await?
375+
store.watch_all().await?
389376
};
390377
Ok(KVEntryIterator::new(Streamer::new(watch)))
391378
})
@@ -401,9 +388,9 @@ impl KeyValue {
401388
let store = self.store.clone();
402389
natsrpy_future(py, async move {
403390
let watch = if let Some(rev) = from_revision {
404-
store.read().await.watch_from_revision(key, rev).await?
391+
store.watch_from_revision(key, rev).await?
405392
} else {
406-
store.read().await.watch(key).await?
393+
store.watch(key).await?
407394
};
408395
Ok(KVEntryIterator::new(Streamer::new(watch)))
409396
})
@@ -417,7 +404,7 @@ impl KeyValue {
417404
let store = self.store.clone();
418405
natsrpy_future(py, async move {
419406
Ok(KVEntryIterator::new(Streamer::new(
420-
store.read().await.watch_with_history(key).await?,
407+
store.watch_with_history(key).await?,
421408
)))
422409
})
423410
}
@@ -430,7 +417,7 @@ impl KeyValue {
430417
let store = self.store.clone();
431418
natsrpy_future(py, async move {
432419
Ok(KVEntryIterator::new(Streamer::new(
433-
store.read().await.watch_many(keys).await?,
420+
store.watch_many(keys).await?,
434421
)))
435422
})
436423
}
@@ -443,7 +430,7 @@ impl KeyValue {
443430
let store = self.store.clone();
444431
natsrpy_future(py, async move {
445432
Ok(KVEntryIterator::new(Streamer::new(
446-
store.read().await.watch_many_with_history(keys).await?,
433+
store.watch_many_with_history(keys).await?,
447434
)))
448435
})
449436
}
@@ -462,16 +449,12 @@ impl KeyValue {
462449
natsrpy_future(py, async move {
463450
let entry = if let Some(rev) = revision {
464451
store
465-
.read()
466-
.await
467452
.entry_for_revision(key, rev)
468453
.await?
469454
.map(KVEntry::try_from)
470455
.transpose()?
471456
} else {
472457
store
473-
.read()
474-
.await
475458
.entry(key)
476459
.await?
477460
.map(KVEntry::try_from)
@@ -484,15 +467,15 @@ impl KeyValue {
484467
pub fn status<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
485468
let store = self.store.clone();
486469
natsrpy_future(py, async move {
487-
KVStatus::try_from(store.read().await.status().await?)
470+
KVStatus::try_from(store.status().await?)
488471
})
489472
}
490473

491474
pub fn keys<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
492475
let store = self.store.clone();
493476
natsrpy_future(py, async move {
494477
Ok(KeysIterator::new(Streamer::new(
495-
store.read().await.keys().await?,
478+
store.keys().await?,
496479
)))
497480
})
498481
}

0 commit comments

Comments
 (0)