-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconsumer.rs
More file actions
97 lines (90 loc) · 3.1 KB
/
consumer.rs
File metadata and controls
97 lines (90 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::sync::Arc;
use futures_util::StreamExt;
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;
use crate::{
exceptions::rust_err::NatsrpyResult,
utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue},
};
type NatsPullConsumer =
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>;
#[pyo3::pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PullConsumer {
consumer: Arc<RwLock<NatsPullConsumer>>,
}
impl PullConsumer {
#[must_use]
pub fn new(consumer: NatsPullConsumer) -> Self {
Self {
consumer: Arc::new(RwLock::new(consumer)),
}
}
}
#[pyo3::pymethods]
impl PullConsumer {
#[pyo3(signature=(
max_messages=None,
group=None,
priority=None,
max_bytes=None,
heartbeat=None,
expires=None,
min_pending=None,
min_ack_pending=None,
timeout=None,
))]
pub fn fetch<'py>(
&self,
py: Python<'py>,
max_messages: Option<usize>,
group: Option<String>,
priority: Option<usize>,
max_bytes: Option<usize>,
heartbeat: Option<TimeValue>,
expires: Option<TimeValue>,
min_pending: Option<usize>,
min_ack_pending: Option<usize>,
timeout: Option<TimeValue>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.consumer.clone();
// Because we borrow cosnumer lock
// later for modifications of fetchbuilder.
#[allow(clippy::significant_drop_tightening)]
natsrpy_future_with_timeout(py, timeout, async move {
let consumer = ctx.read().await;
let mut fetch_builder = consumer.fetch();
if let Some(max_messages) = max_messages {
fetch_builder = fetch_builder.max_messages(max_messages);
}
if let Some(group) = group {
fetch_builder = fetch_builder.group(group);
}
if let Some(priority) = priority {
fetch_builder = fetch_builder.priority(priority);
}
if let Some(max_bytes) = max_bytes {
fetch_builder = fetch_builder.max_bytes(max_bytes);
}
if let Some(heartbeat) = heartbeat {
fetch_builder = fetch_builder.heartbeat(heartbeat.into());
}
if let Some(expires) = expires {
fetch_builder = fetch_builder.expires(expires.into());
}
if let Some(min_pending) = min_pending {
fetch_builder = fetch_builder.min_pending(min_pending);
}
if let Some(min_ack_pending) = min_ack_pending {
fetch_builder = fetch_builder.min_ack_pending(min_ack_pending);
}
let mut messages = fetch_builder.messages().await?;
let mut ret_messages = Vec::new();
while let Some(msg) = messages.next().await {
let raw_msg = msg?;
ret_messages.push(crate::js::message::JetStreamMessage::try_from(raw_msg)?);
}
Ok(ret_messages)
})
}
}