Skip to content

Commit 6de961a

Browse files
committed
fix(mqtt): pass client certificates to rumqttc for mTLS
1 parent 89bc15f commit 6de961a

3 files changed

Lines changed: 14 additions & 3 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed the mqtt sink's certificate handling.
2+
3+
authors: mr-

src/sources/mqtt/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl MqttSourceConfig {
148148

149149
if let Some(tls) = tls.tls() {
150150
let ca = tls.authorities_pem().flatten().collect();
151-
let client_auth = None;
151+
let client_auth = tls.identity_pem();
152152
let alpn = Some(vec!["mqtt".into()]);
153153
options.set_transport(Transport::Tls(TlsConfiguration::Simple {
154154
ca,

src/sources/mqtt/source.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ impl MqttSource {
4747
client
4848
.subscribe(topic, QoS::AtLeastOnce)
4949
.await
50-
.map_err(|_| ())?;
50+
.map_err(|e| {
51+
tracing::error!(error = ?e, "Failed to send MQTT subscribe command");
52+
})?;
5153
}
5254
OneOrMany::Many(topics) => {
5355
client
@@ -58,7 +60,9 @@ impl MqttSource {
5860
.map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)),
5961
)
6062
.await
61-
.map_err(|_| ())?;
63+
.map_err(|e| {
64+
tracing::error!(error = ?e, "Failed to send MQTT subscribe command");
65+
})?;
6266
}
6367
}
6468

@@ -80,6 +84,10 @@ impl MqttSource {
8084
)) => {
8185
// TODO Handle acknowledgement - https://github.com/vectordotdev/vector/issues/21967
8286
}
87+
Err(e) => {
88+
tracing::error!("Error = {e:?}");
89+
return Ok(());
90+
}
8391
_ => {}
8492
}
8593
}

0 commit comments

Comments
 (0)