Skip to content

Commit 7a5e096

Browse files
feat(rumqttc): add support for unbounded client
Signed-off-by: Jyotiraditya Panda <jyotiraditya@aospa.co>
1 parent e886a78 commit 7a5e096

6 files changed

Lines changed: 215 additions & 47 deletions

File tree

rumqttc/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88
## [Unreleased]
99

1010
### Added
11+
- `ClientBuilder` and `AsyncClientBuilder` for both v4 and v5 clients,
12+
enabling bounded channels by default using the capacity set in `MqttOptions`,
13+
with opt-in unbounded channels via `.unbounded()`.
14+
1115
### Changed
1216
### Deprecated
17+
- `Client::new(options, cap)` and `AsyncClient::new(options, cap)` -
18+
use `Client::builder(options).build()` or
19+
`Client::builder(options).capacity(cap).build()` instead.
20+
1321
### Removed
1422
### Fixed
1523
### Security

rumqttc/examples/unbounded.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use rumqttc::{Client, MqttOptions, QoS};
2+
use std::thread;
3+
use std::time::Duration;
4+
5+
fn main() {
6+
let mqtt_opts = MqttOptions::new("test-1", "localhost", 1883);
7+
8+
// unbounded channel, no backpressure
9+
let (mut client, mut connection) = Client::builder(mqtt_opts).unbounded().build();
10+
11+
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
12+
13+
thread::spawn(move || {
14+
for i in 0..10 {
15+
client
16+
.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![1; i])
17+
.unwrap();
18+
thread::sleep(Duration::from_millis(100));
19+
}
20+
});
21+
22+
for notification in connection.iter() {
23+
println!("Notification = {:?}", notification);
24+
}
25+
}

rumqttc/src/client.rs

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! async eventloop.
33
use std::time::Duration;
44

5+
use crate::eventloop::ChannelCapacity;
56
use crate::mqttbytes::{v4::*, QoS};
67
use crate::{valid_filter, valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request};
78

@@ -32,6 +33,62 @@ impl From<TrySendError<Request>> for ClientError {
3233
}
3334
}
3435

36+
/// Builder for synchronous and asynchronous MQTT v4 clients.
37+
///
38+
/// The request channel is bounded by default, using the capacity configured
39+
/// in [`MqttOptions`]. Call [`capacity`](Self::capacity) to override it or
40+
/// [`unbounded`](Self::unbounded) to opt out of backpressure entirely.
41+
pub struct ClientBuilder {
42+
options: MqttOptions,
43+
capacity: ChannelCapacity,
44+
}
45+
46+
impl ClientBuilder {
47+
/// Create a new builder. The channel capacity defaults to the value set
48+
/// in [`MqttOptions::request_channel_capacity`].
49+
pub fn new(options: MqttOptions) -> Self {
50+
let capacity = ChannelCapacity::Bounded(options.request_channel_capacity());
51+
Self { options, capacity }
52+
}
53+
54+
/// Set a bounded request channel with the given capacity.
55+
/// A capacity of `0` creates a rendezvous channel.
56+
pub fn capacity(mut self, cap: usize) -> Self {
57+
self.capacity = ChannelCapacity::Bounded(cap);
58+
self
59+
}
60+
61+
/// Use an unbounded request channel.
62+
/// This removes backpressure from the client. Prefer bounded channels
63+
/// unless you have a specific reason to allow unbounded queuing.
64+
pub fn unbounded(mut self) -> Self {
65+
self.capacity = ChannelCapacity::Unbounded;
66+
self
67+
}
68+
69+
/// Build an [`AsyncClient`] and its [`EventLoop`].
70+
pub fn build_async(self) -> (AsyncClient, EventLoop) {
71+
let eventloop = EventLoop::new(self.options, self.capacity);
72+
let request_tx = eventloop.requests_tx.clone();
73+
let client = AsyncClient::from_senders(request_tx);
74+
(client, eventloop)
75+
}
76+
77+
/// Build a [`Client`] and its [`Connection`].
78+
pub fn build(self) -> (Client, Connection) {
79+
let (async_client, eventloop) = self.build_async();
80+
let client = Client {
81+
client: async_client,
82+
};
83+
let runtime = runtime::Builder::new_current_thread()
84+
.enable_all()
85+
.build()
86+
.unwrap();
87+
let connection = Connection::new(eventloop, runtime);
88+
(client, connection)
89+
}
90+
}
91+
3592
/// An asynchronous client, communicates with MQTT `EventLoop`.
3693
///
3794
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
@@ -45,16 +102,15 @@ pub struct AsyncClient {
45102
}
46103

47104
impl AsyncClient {
48-
/// Create a new `AsyncClient`.
49-
///
50-
/// `cap` specifies the capacity of the bounded async channel.
105+
/// Deprecated. Use [`AsyncClient::builder`] instead.
106+
#[deprecated(since = "0.26.0", note = "Use AsyncClient::builder instead")]
51107
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
52-
let eventloop = EventLoop::new(options, cap);
53-
let request_tx = eventloop.requests_tx.clone();
54-
55-
let client = AsyncClient { request_tx };
108+
ClientBuilder::new(options).capacity(cap).build_async()
109+
}
56110

57-
(client, eventloop)
111+
/// Returns a [`ClientBuilder`] for constructing an MQTT v4 async client.
112+
pub fn builder(options: MqttOptions) -> ClientBuilder {
113+
ClientBuilder::new(options)
58114
}
59115

60116
/// Create a new `AsyncClient` from a channel `Sender`.
@@ -253,19 +309,15 @@ pub struct Client {
253309
}
254310

255311
impl Client {
256-
/// Create a new `Client`
257-
///
258-
/// `cap` specifies the capacity of the bounded async channel.
312+
/// Deprecated. Use [`Client::builder`] instead.
313+
#[deprecated(since = "0.26.0", note = "Use Client::builder instead")]
259314
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
260-
let (client, eventloop) = AsyncClient::new(options, cap);
261-
let client = Client { client };
262-
let runtime = runtime::Builder::new_current_thread()
263-
.enable_all()
264-
.build()
265-
.unwrap();
315+
ClientBuilder::new(options).capacity(cap).build()
316+
}
266317

267-
let connection = Connection::new(eventloop, runtime);
268-
(client, connection)
318+
/// Returns a [`ClientBuilder`] for constructing an MQTT v4 sync client.
319+
pub fn builder(options: MqttOptions) -> ClientBuilder {
320+
ClientBuilder::new(options)
269321
}
270322

271323
/// Create a new `Client` from a channel `Sender`.
@@ -532,7 +584,7 @@ mod test {
532584
.set_keep_alive(Duration::from_secs(5))
533585
.set_last_will(will);
534586

535-
let (_, mut connection) = Client::new(mqttoptions, 10);
587+
let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
536588
let _ = connection.iter();
537589
let _ = connection.iter();
538590
}

rumqttc/src/eventloop.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{MqttOptions, Outgoing};
44

55
use crate::framed::AsyncReadWrite;
66
use crate::mqttbytes::v4::*;
7-
use flume::{bounded, Receiver, Sender};
7+
use flume::{bounded, unbounded, Receiver, Sender};
88
use tokio::net::{lookup_host, TcpSocket, TcpStream};
99
use tokio::select;
1010
use tokio::time::{self, Instant, Sleep};
@@ -31,6 +31,19 @@ use {
3131
#[cfg(feature = "proxy")]
3232
use crate::proxy::ProxyError;
3333

34+
/// Controls the capacity of the client request channel.
35+
///
36+
/// Defaults to [`Bounded`] which applies backpressure to the client.
37+
/// Use [`Unbounded`] only when you have a specific reason to remove backpressure.
38+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39+
pub enum ChannelCapacity {
40+
/// Bounded channel with the given capacity.
41+
/// A capacity of `0` creates a rendezvous channel.
42+
Bounded(usize),
43+
/// Unbounded channel with no backpressure.
44+
Unbounded,
45+
}
46+
3447
/// Critical errors during eventloop polling
3548
#[derive(Debug, thiserror::Error)]
3649
pub enum ConnectionError {
@@ -99,8 +112,11 @@ impl EventLoop {
99112
///
100113
/// When connection encounters critical errors (like auth failure), user has a choice to
101114
/// access and update `options`, `state` and `requests`.
102-
pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop {
103-
let (requests_tx, requests_rx) = bounded(cap);
115+
pub fn new(mqtt_options: MqttOptions, cap: ChannelCapacity) -> EventLoop {
116+
let (requests_tx, requests_rx) = match cap {
117+
ChannelCapacity::Bounded(n) => bounded(n),
118+
ChannelCapacity::Unbounded => unbounded(),
119+
};
104120
let pending = VecDeque::new();
105121
let max_inflight = mqtt_options.inflight;
106122
let manual_acks = mqtt_options.manual_acks;

rumqttc/src/v5/client.rs

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! async eventloop.
33
use std::time::Duration;
44

5+
use super::eventloop::ChannelCapacity;
56
use super::mqttbytes::v5::{
67
Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties,
78
Unsubscribe, UnsubscribeProperties,
@@ -37,6 +38,62 @@ impl From<TrySendError<Request>> for ClientError {
3738
}
3839
}
3940

41+
/// Builder for synchronous and asynchronous MQTT v5 clients.
42+
///
43+
/// The request channel is bounded by default, using the capacity configured
44+
/// in [`MqttOptions`]. Call [`capacity`](Self::capacity) to override it or
45+
/// [`unbounded`](Self::unbounded) to opt out of backpressure entirely.
46+
pub struct AsyncClientBuilder {
47+
options: MqttOptions,
48+
capacity: ChannelCapacity,
49+
}
50+
51+
impl AsyncClientBuilder {
52+
/// Create a new builder. The channel capacity defaults to the value set
53+
/// in [`MqttOptions::request_channel_capacity`].
54+
pub fn new(options: MqttOptions) -> Self {
55+
let capacity = ChannelCapacity::Bounded(options.request_channel_capacity());
56+
Self { options, capacity }
57+
}
58+
59+
/// Set a bounded request channel with the given capacity.
60+
/// A capacity of `0` creates a rendezvous channel.
61+
pub fn capacity(mut self, cap: usize) -> Self {
62+
self.capacity = ChannelCapacity::Bounded(cap);
63+
self
64+
}
65+
66+
/// Use an unbounded request channel.
67+
/// This removes backpressure from the client. Prefer bounded channels
68+
/// unless you have a specific reason to allow unbounded queuing.
69+
pub fn unbounded(mut self) -> Self {
70+
self.capacity = ChannelCapacity::Unbounded;
71+
self
72+
}
73+
74+
/// Build an [`AsyncClient`] and its [`EventLoop`].
75+
pub fn build_async(self) -> (AsyncClient, EventLoop) {
76+
let eventloop = EventLoop::new(self.options, self.capacity);
77+
let request_tx = eventloop.requests_tx.clone();
78+
let client = AsyncClient::from_senders(request_tx);
79+
(client, eventloop)
80+
}
81+
82+
/// Build a [`Client`] and its [`Connection`].
83+
pub fn build(self) -> (Client, Connection) {
84+
let (async_client, eventloop) = self.build_async();
85+
let client = Client {
86+
client: async_client,
87+
};
88+
let runtime = runtime::Builder::new_current_thread()
89+
.enable_all()
90+
.build()
91+
.unwrap();
92+
let connection = Connection::new(eventloop, runtime);
93+
(client, connection)
94+
}
95+
}
96+
4097
/// An asynchronous client, communicates with MQTT `EventLoop`.
4198
///
4299
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
@@ -50,16 +107,15 @@ pub struct AsyncClient {
50107
}
51108

52109
impl AsyncClient {
53-
/// Create a new `AsyncClient`.
54-
///
55-
/// `cap` specifies the capacity of the bounded async channel.
110+
/// Deprecated. Use [`AsyncClient::builder`] instead.
111+
#[deprecated(since = "0.26.0", note = "Use AsyncClient::builder instead")]
56112
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
57-
let eventloop = EventLoop::new(options, cap);
58-
let request_tx = eventloop.requests_tx.clone();
59-
60-
let client = AsyncClient { request_tx };
113+
AsyncClientBuilder::new(options).capacity(cap).build_async()
114+
}
61115

62-
(client, eventloop)
116+
/// Returns an [`AsyncClientBuilder`] for constructing an MQTT v5 async client.
117+
pub fn builder(options: MqttOptions) -> AsyncClientBuilder {
118+
AsyncClientBuilder::new(options)
63119
}
64120

65121
/// Create a new `AsyncClient` from a channel `Sender`.
@@ -469,20 +525,15 @@ pub struct Client {
469525
}
470526

471527
impl Client {
472-
/// Create a new `Client`
473-
///
474-
/// `cap` specifies the capacity of the bounded async channel.
528+
/// Deprecated. Use [`Client::builder`] instead.
529+
#[deprecated(since = "0.26.0", note = "Use Client::builder instead")]
475530
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
476-
let (client, eventloop) = AsyncClient::new(options, cap);
477-
let client = Client { client };
478-
479-
let runtime = runtime::Builder::new_current_thread()
480-
.enable_all()
481-
.build()
482-
.unwrap();
531+
AsyncClientBuilder::new(options).capacity(cap).build()
532+
}
483533

484-
let connection = Connection::new(eventloop, runtime);
485-
(client, connection)
534+
/// Returns an [`AsyncClientBuilder`] for constructing an MQTT v5 sync client.
535+
pub fn builder(options: MqttOptions) -> AsyncClientBuilder {
536+
AsyncClientBuilder::new(options)
486537
}
487538

488539
/// Create a new `Client` from a channel `Sender`.
@@ -882,7 +933,7 @@ mod test {
882933
.set_keep_alive(Duration::from_secs(5))
883934
.set_last_will(will);
884935

885-
let (_, mut connection) = Client::new(mqttoptions, 10);
936+
let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
886937
let _ = connection.iter();
887938
let _ = connection.iter();
888939
}

rumqttc/src/v5/eventloop.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Tra
44
use crate::eventloop::socket_connect;
55
use crate::framed::AsyncReadWrite;
66

7-
use flume::{bounded, Receiver, Sender};
7+
use flume::{bounded, unbounded, Receiver, Sender};
88
use tokio::select;
99
use tokio::time::{self, error::Elapsed, Instant, Sleep};
1010

@@ -31,6 +31,19 @@ use {
3131
#[cfg(feature = "proxy")]
3232
use crate::proxy::ProxyError;
3333

34+
/// Controls the capacity of the client request channel.
35+
///
36+
/// Defaults to [`Bounded`] which applies backpressure to the client.
37+
/// Use [`Unbounded`] only when you have a specific reason to remove backpressure.
38+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39+
pub enum ChannelCapacity {
40+
/// Bounded channel with the given capacity.
41+
/// A capacity of `0` creates a rendezvous channel.
42+
Bounded(usize),
43+
/// Unbounded channel with no backpressure.
44+
Unbounded,
45+
}
46+
3447
/// Critical errors during eventloop polling
3548
#[derive(Debug, thiserror::Error)]
3649
pub enum ConnectionError {
@@ -96,8 +109,11 @@ impl EventLoop {
96109
///
97110
/// When connection encounters critical errors (like auth failure), user has a choice to
98111
/// access and update `options`, `state` and `requests`.
99-
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
100-
let (requests_tx, requests_rx) = bounded(cap);
112+
pub fn new(options: MqttOptions, cap: ChannelCapacity) -> EventLoop {
113+
let (requests_tx, requests_rx) = match cap {
114+
ChannelCapacity::Bounded(n) => bounded(n),
115+
ChannelCapacity::Unbounded => unbounded(),
116+
};
101117
let pending = VecDeque::new();
102118
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
103119
let manual_acks = options.manual_acks;

0 commit comments

Comments
 (0)