Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- `local::LinkTx::publish_with_retain` and
`local::LinkTx::try_publish_with_retain` for in-process callers
that need to seed the retained store with a custom retain flag.
Existing `publish` / `try_publish` keep the previous
`retain: false` behaviour and now delegate.
### Changed
### Deprecated
### Removed
Expand Down
45 changes: 41 additions & 4 deletions rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,38 @@ impl LinkTx {
Ok(())
}

/// Sends a MQTT Publish to the router
/// Sends a MQTT Publish to the router with `retain: false`.
pub fn publish<S, V>(&mut self, topic: S, payload: V) -> Result<usize, LinkError>
where
S: Into<Bytes>,
V: Into<Bytes>,
{
self.publish_with_retain(topic, payload, false)
}

/// Sends a MQTT Publish to the router with an explicit retain flag.
///
/// Useful for embedded-broker setups that need to seed the
/// retained store with snapshot-class topics from a local
/// data source (a captured fixture, an SQLite snapshot, ...)
/// before any external client connects. Without this method
/// callers either had to spin up a loopback rumqttc client to
/// publish-with-retain over the wire, or wait for the next live
/// publish to populate the retained store.
pub fn publish_with_retain<S, V>(
&mut self,
topic: S,
payload: V,
retain: bool,
) -> Result<usize, LinkError>
where
S: Into<Bytes>,
V: Into<Bytes>,
{
let publish = Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
retain,
topic: topic.into(),
pkid: 0,
payload: payload.into(),
Expand All @@ -230,16 +252,31 @@ impl LinkTx {
Ok(len)
}

/// Sends a MQTT Publish to the router
/// Sends a MQTT Publish to the router with `retain: false`.
pub fn try_publish<S, V>(&mut self, topic: S, payload: V) -> Result<usize, LinkError>
where
S: Into<Bytes>,
V: Into<Bytes>,
{
self.try_publish_with_retain(topic, payload, false)
}

/// Sends a MQTT Publish to the router (non-blocking) with an
/// explicit retain flag. See [`LinkTx::publish_with_retain`].
pub fn try_publish_with_retain<S, V>(
&mut self,
topic: S,
payload: V,
retain: bool,
) -> Result<usize, LinkError>
where
S: Into<Bytes>,
V: Into<Bytes>,
{
let publish = Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
retain,
topic: topic.into(),
pkid: 0,
payload: payload.into(),
Expand Down