From f95ef95eaccf84a03c0be46d8e2525b9311c625b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 5 Apr 2026 21:54:27 +0530 Subject: [PATCH 1/2] perf(v5): replace `copy_from_slice` in `Publish::new` and `LastWill::new` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both used `Bytes::copy_from_slice`, which allocates a String and copies its bytes into a second Bytes allocation before dropping the String. Replace with `Bytes::from` to take ownership of the String's buffer directly — same approach as the v4 Publish constructor — cutting the topic allocation count from 2, down to 1 per publish. --- rumqttc/src/v5/mqttbytes/v5/connect.rs | 2 +- rumqttc/src/v5/mqttbytes/v5/publish.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rumqttc/src/v5/mqttbytes/v5/connect.rs b/rumqttc/src/v5/mqttbytes/v5/connect.rs index dfef10621..944c3e44d 100644 --- a/rumqttc/src/v5/mqttbytes/v5/connect.rs +++ b/rumqttc/src/v5/mqttbytes/v5/connect.rs @@ -371,7 +371,7 @@ impl LastWill { retain: bool, properties: Option, ) -> LastWill { - let topic = Bytes::copy_from_slice(topic.into().as_bytes()); + let topic = Bytes::from(topic.into()); LastWill { topic, message: Bytes::from(payload.into()), diff --git a/rumqttc/src/v5/mqttbytes/v5/publish.rs b/rumqttc/src/v5/mqttbytes/v5/publish.rs index e2eee772d..0cc9c47bb 100644 --- a/rumqttc/src/v5/mqttbytes/v5/publish.rs +++ b/rumqttc/src/v5/mqttbytes/v5/publish.rs @@ -20,7 +20,7 @@ impl Publish { payload: P, properties: Option, ) -> Self { - let topic = Bytes::copy_from_slice(topic.into().as_bytes()); + let topic = Bytes::from(topic.into()); Self { qos, topic, From 7b09960a2b057f1ce31785131951044fce8ad468 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 6 Apr 2026 15:38:09 +0530 Subject: [PATCH 2/2] perf(v5): eliminate unnecessary alloc in v5 publish path 1. `Publish::new` topic bound changed from `T: Into` to `T: Into`. `String::from(&str)` (clone) ~> `Bytes::from(String::into_bytes())` = 0 copies. 2. `handle_publish`/`handle_try_publish`/`handle_publish_bytes` now validate the topic string before consuming it, passing the resulting `Bytes` directly to `Publish::new` instead of passing `&String` (saving on an unnecessary clone). --- rumqttc/src/v5/client.rs | 32 ++++++++++++++++---------- rumqttc/src/v5/mqttbytes/v5/publish.rs | 2 +- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 7a86333f2..5babdb247 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -83,11 +83,13 @@ impl AsyncClient { S: Into, P: Into, { - let topic = topic.into(); - let mut publish = Publish::new(&topic, qos, payload, properties); + let topic: String = topic.into(); + let topic_valid = valid_topic(&topic); + let topic_bytes = Bytes::from(topic.into_bytes()); + let mut publish = Publish::new(topic_bytes, qos, payload, properties); publish.retain = retain; let publish = Request::Publish(publish); - if !valid_topic(&topic) { + if !topic_valid { return Err(ClientError::Request(publish)); } self.request_tx.send_async(publish).await?; @@ -137,11 +139,13 @@ impl AsyncClient { S: Into, P: Into, { - let topic = topic.into(); - let mut publish = Publish::new(&topic, qos, payload, properties); + let topic: String = topic.into(); + let topic_valid = valid_topic(&topic); + let topic_bytes = Bytes::from(topic.into_bytes()); + let mut publish = Publish::new(topic_bytes, qos, payload, properties); publish.retain = retain; let publish = Request::Publish(publish); - if !valid_topic(&topic) { + if !topic_valid { return Err(ClientError::TryRequest(publish)); } self.request_tx.try_send(publish)?; @@ -208,11 +212,13 @@ impl AsyncClient { where S: Into, { - let topic = topic.into(); - let mut publish = Publish::new(&topic, qos, payload, properties); + let topic: String = topic.into(); + let topic_valid = valid_topic(&topic); + let topic_bytes = Bytes::from(topic.into_bytes()); + let mut publish = Publish::new(topic_bytes, qos, payload, properties); publish.retain = retain; let publish = Request::Publish(publish); - if !valid_topic(&topic) { + if !topic_valid { return Err(ClientError::TryRequest(publish)); } self.request_tx.send_async(publish).await?; @@ -508,11 +514,13 @@ impl Client { S: Into, P: Into, { - let topic = topic.into(); - let mut publish = Publish::new(&topic, qos, payload, properties); + let topic: String = topic.into(); + let topic_valid = valid_topic(&topic); + let topic_bytes = Bytes::from(topic.into_bytes()); + let mut publish = Publish::new(topic_bytes, qos, payload, properties); publish.retain = retain; let publish = Request::Publish(publish); - if !valid_topic(&topic) { + if !topic_valid { return Err(ClientError::Request(publish)); } self.client.request_tx.send(publish)?; diff --git a/rumqttc/src/v5/mqttbytes/v5/publish.rs b/rumqttc/src/v5/mqttbytes/v5/publish.rs index 0cc9c47bb..ba4ec1bd0 100644 --- a/rumqttc/src/v5/mqttbytes/v5/publish.rs +++ b/rumqttc/src/v5/mqttbytes/v5/publish.rs @@ -14,7 +14,7 @@ pub struct Publish { } impl Publish { - pub fn new, P: Into>( + pub fn new, P: Into>( topic: T, qos: QoS, payload: P,