Skip to content

Commit d225efe

Browse files
feat(rumqttc): add support for unbounded client
Signed-off-by: Jyotiraditya Panda <jyotiraditya@aospa.co>
1 parent 2167da0 commit d225efe

6 files changed

Lines changed: 152 additions & 41 deletions

File tree

rumqttc/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88
## [Unreleased]
99

1010
### Added
11+
- `ClientBuilder` and `AsyncClientBuilder` for both v4 and v5 clients,
12+
enabling unbounded channels by default with opt-in bounded capacity
13+
via `.capacity(n)`.
14+
1115
### Changed
1216
### Deprecated
17+
- `Client::new(options, cap)` and `AsyncClient::new(options, cap)`
18+
use `Client::builder(options).build()` or
19+
`Client::builder(options).capacity(cap).build()` instead.
20+
1321
### Removed
1422
### Fixed
1523
### Security

rumqttc/examples/unbounded.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use rumqttc::{Client, MqttOptions, QoS};
2+
use std::thread;
3+
use std::time::Duration;
4+
5+
fn main() {
6+
let mqtt_opts = MqttOptions::new("test-1", "localhost", 1883);
7+
8+
// unbounded channel — no capacity limit
9+
let (mut client, mut connection) = Client::builder(mqtt_opts).build();
10+
11+
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
12+
13+
thread::spawn(move || {
14+
for i in 0..10 {
15+
client
16+
.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![1; i])
17+
.unwrap();
18+
thread::sleep(Duration::from_millis(100));
19+
}
20+
});
21+
22+
for notification in connection.iter() {
23+
println!("Notification = {:?}", notification);
24+
}
25+
}

rumqttc/src/client.rs

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,42 @@ impl From<TrySendError<Request>> for ClientError {
3232
}
3333
}
3434

35+
pub struct ClientBuilder {
36+
options: MqttOptions,
37+
capacity: Option<usize>,
38+
}
39+
40+
impl ClientBuilder {
41+
pub fn new(options: MqttOptions) -> Self {
42+
Self {
43+
options,
44+
capacity: None,
45+
}
46+
}
47+
48+
pub fn capacity(mut self, cap: usize) -> Self {
49+
self.capacity = Some(cap);
50+
self
51+
}
52+
53+
pub fn build_async(self) -> (AsyncClient, EventLoop) {
54+
let eventloop = EventLoop::new(self.options, self.capacity);
55+
let request_tx = eventloop.requests_tx.clone();
56+
(AsyncClient { request_tx }, eventloop)
57+
}
58+
59+
pub fn build(self) -> (Client, Connection) {
60+
let (client, eventloop) = self.build_async();
61+
let client = Client { client };
62+
let runtime = runtime::Builder::new_current_thread()
63+
.enable_all()
64+
.build()
65+
.unwrap();
66+
let connection = Connection::new(eventloop, runtime);
67+
(client, connection)
68+
}
69+
}
70+
3571
/// An asynchronous client, communicates with MQTT `EventLoop`.
3672
///
3773
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
@@ -47,14 +83,17 @@ pub struct AsyncClient {
4783
impl AsyncClient {
4884
/// Create a new `AsyncClient`.
4985
///
50-
/// `cap` specifies the capacity of the bounded async channel.
86+
/// # Deprecation
87+
/// Use [`ClientBuilder`] instead for bounded/unbounded control.
88+
/// `Client::builder(options).capacity(cap).build_async()` for bounded,
89+
/// `Client::builder(options).build_async()` for unbounded.
90+
#[deprecated(since = "0.26.0", note = "Use ClientBuilder instead")]
5191
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
52-
let eventloop = EventLoop::new(options, cap);
53-
let request_tx = eventloop.requests_tx.clone();
54-
55-
let client = AsyncClient { request_tx };
92+
ClientBuilder::new(options).capacity(cap).build_async()
93+
}
5694

57-
(client, eventloop)
95+
pub fn builder(options: MqttOptions) -> ClientBuilder {
96+
ClientBuilder::new(options)
5897
}
5998

6099
/// Create a new `AsyncClient` from a channel `Sender`.
@@ -253,19 +292,17 @@ pub struct Client {
253292
}
254293

255294
impl Client {
256-
/// Create a new `Client`
295+
/// Create a new `Client`.
257296
///
258-
/// `cap` specifies the capacity of the bounded async channel.
297+
/// # Deprecation
298+
/// Use [`ClientBuilder`] instead.
299+
#[deprecated(since = "0.26.0", note = "Use ClientBuilder instead")]
259300
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
260-
let (client, eventloop) = AsyncClient::new(options, cap);
261-
let client = Client { client };
262-
let runtime = runtime::Builder::new_current_thread()
263-
.enable_all()
264-
.build()
265-
.unwrap();
301+
ClientBuilder::new(options).capacity(cap).build()
302+
}
266303

267-
let connection = Connection::new(eventloop, runtime);
268-
(client, connection)
304+
pub fn builder(options: MqttOptions) -> ClientBuilder {
305+
ClientBuilder::new(options)
269306
}
270307

271308
/// Create a new `Client` from a channel `Sender`.
@@ -532,7 +569,7 @@ mod test {
532569
.set_keep_alive(Duration::from_secs(5))
533570
.set_last_will(will);
534571

535-
let (_, mut connection) = Client::new(mqttoptions, 10);
572+
let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
536573
let _ = connection.iter();
537574
let _ = connection.iter();
538575
}

rumqttc/src/eventloop.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{MqttOptions, Outgoing};
44

55
use crate::framed::AsyncReadWrite;
66
use crate::mqttbytes::v4::*;
7-
use flume::{bounded, Receiver, Sender};
7+
use flume::{bounded, unbounded, Receiver, Sender};
88
use tokio::net::{lookup_host, TcpSocket, TcpStream};
99
use tokio::select;
1010
use tokio::time::{self, Instant, Sleep};
@@ -99,8 +99,11 @@ impl EventLoop {
9999
///
100100
/// When connection encounters critical errors (like auth failure), user has a choice to
101101
/// access and update `options`, `state` and `requests`.
102-
pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop {
103-
let (requests_tx, requests_rx) = bounded(cap);
102+
pub fn new(mqtt_options: MqttOptions, cap: Option<usize>) -> EventLoop {
103+
let (requests_tx, requests_rx) = match cap {
104+
Some(cap) => bounded(cap),
105+
None => unbounded(),
106+
};
104107
let pending = VecDeque::new();
105108
let max_inflight = mqtt_options.inflight;
106109
let manual_acks = mqtt_options.manual_acks;

rumqttc/src/v5/client.rs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,41 @@ impl From<TrySendError<Request>> for ClientError {
3737
}
3838
}
3939

40+
pub struct AsyncClientBuilder {
41+
options: MqttOptions,
42+
capacity: Option<usize>,
43+
}
44+
45+
impl AsyncClientBuilder {
46+
pub fn new(options: MqttOptions) -> Self {
47+
Self {
48+
options,
49+
capacity: None,
50+
}
51+
}
52+
53+
pub fn capacity(mut self, cap: usize) -> Self {
54+
self.capacity = Some(cap);
55+
self
56+
}
57+
58+
pub fn build_async(self) -> (AsyncClient, EventLoop) {
59+
let eventloop = EventLoop::new(self.options, self.capacity);
60+
let request_tx = eventloop.requests_tx.clone();
61+
(AsyncClient { request_tx }, eventloop)
62+
}
63+
64+
pub fn build(self) -> (Client, Connection) {
65+
let (client, eventloop) = self.build_async();
66+
let client = Client { client };
67+
let runtime = runtime::Builder::new_current_thread()
68+
.enable_all()
69+
.build()
70+
.unwrap();
71+
(client, Connection::new(eventloop, runtime))
72+
}
73+
}
74+
4075
/// An asynchronous client, communicates with MQTT `EventLoop`.
4176
///
4277
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
@@ -52,14 +87,17 @@ pub struct AsyncClient {
5287
impl AsyncClient {
5388
/// Create a new `AsyncClient`.
5489
///
55-
/// `cap` specifies the capacity of the bounded async channel.
90+
/// # Deprecation
91+
/// Use [`AsyncClientBuilder`] instead for bounded/unbounded control.
92+
/// `Client::builder(options).capacity(cap).build_async()` for bounded,
93+
/// `Client::builder(options).build_async()` for unbounded.
94+
#[deprecated(since = "0.26.0", note = "Use AsyncClientBuilder instead")]
5695
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
57-
let eventloop = EventLoop::new(options, cap);
58-
let request_tx = eventloop.requests_tx.clone();
59-
60-
let client = AsyncClient { request_tx };
96+
AsyncClientBuilder::new(options).capacity(cap).build_async()
97+
}
6198

62-
(client, eventloop)
99+
pub fn builder(options: MqttOptions) -> AsyncClientBuilder {
100+
AsyncClientBuilder::new(options)
63101
}
64102

65103
/// Create a new `AsyncClient` from a channel `Sender`.
@@ -469,20 +507,17 @@ pub struct Client {
469507
}
470508

471509
impl Client {
472-
/// Create a new `Client`
510+
/// Create a new `Client`.
473511
///
474-
/// `cap` specifies the capacity of the bounded async channel.
512+
/// # Deprecation
513+
/// Use [`AsyncClientBuilder`] instead.
514+
#[deprecated(since = "0.26.0", note = "Use AsyncClientBuilder instead")]
475515
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
476-
let (client, eventloop) = AsyncClient::new(options, cap);
477-
let client = Client { client };
478-
479-
let runtime = runtime::Builder::new_current_thread()
480-
.enable_all()
481-
.build()
482-
.unwrap();
516+
AsyncClientBuilder::new(options).capacity(cap).build()
517+
}
483518

484-
let connection = Connection::new(eventloop, runtime);
485-
(client, connection)
519+
pub fn builder(options: MqttOptions) -> AsyncClientBuilder {
520+
AsyncClientBuilder::new(options)
486521
}
487522

488523
/// Create a new `Client` from a channel `Sender`.
@@ -882,7 +917,7 @@ mod test {
882917
.set_keep_alive(Duration::from_secs(5))
883918
.set_last_will(will);
884919

885-
let (_, mut connection) = Client::new(mqttoptions, 10);
920+
let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
886921
let _ = connection.iter();
887922
let _ = connection.iter();
888923
}

rumqttc/src/v5/eventloop.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Tra
44
use crate::eventloop::socket_connect;
55
use crate::framed::AsyncReadWrite;
66

7-
use flume::{bounded, Receiver, Sender};
7+
use flume::{bounded, unbounded, Receiver, Sender};
88
use tokio::select;
99
use tokio::time::{self, error::Elapsed, Instant, Sleep};
1010

@@ -96,8 +96,11 @@ impl EventLoop {
9696
///
9797
/// When connection encounters critical errors (like auth failure), user has a choice to
9898
/// access and update `options`, `state` and `requests`.
99-
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
100-
let (requests_tx, requests_rx) = bounded(cap);
99+
pub fn new(options: MqttOptions, cap: Option<usize>) -> EventLoop {
100+
let (requests_tx, requests_rx) = match cap {
101+
Some(cap) => bounded(cap),
102+
None => unbounded(),
103+
};
101104
let pending = VecDeque::new();
102105
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
103106
let manual_acks = options.manual_acks;

0 commit comments

Comments
 (0)