Skip to content

Commit 89bc15f

Browse files
chore(deps): Bump lapin from 2.5.3 to 4.3.0 (#23316)
* chore(deps): Bump lapin from 2.5.3 to 3.0.0 Bumps [lapin](https://github.com/amqp-rs/lapin) from 2.5.3 to 3.0.0. - [Changelog](https://github.com/amqp-rs/lapin/blob/main/CHANGELOG.md) - [Commits](amqp-rs/lapin@lapin-2.5.3...lapin-3.0.0) --- updated-dependencies: - dependency-name: lapin dependency-version: 3.0.0 dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> * Fix OwnedIdentity type change * cargo vdev build licenses * Bump lapin to 4.3.0 3.1.0 default-runtime conflicts with heim. 4.x uses tokio * Make tests and code work with new version * Update licenses * Remove clone comment * Fix clippy --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Thomas <thomas.schneider@datadoghq.com>
1 parent 59f53e2 commit 89bc15f

8 files changed

Lines changed: 183 additions & 156 deletions

File tree

Cargo.lock

Lines changed: 110 additions & 102 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,8 @@ goauth = { version = "0.16.0", optional = true }
337337
smpl_jwt = { version = "0.8.0", default-features = false, optional = true }
338338

339339
# AMQP
340-
lapin = { version = "2.5.3", default-features = false, features = ["native-tls"], optional = true }
340+
lapin = { version = "4.3.0", default-features = false, features = ["tokio", "native-tls"], optional = true }
341+
async-rs = { version = "0.8", default-features = false, features = ["tokio"], optional = true }
341342
deadpool = { version = "0.12.2", default-features = false, features = ["managed", "rt_tokio_1"], optional = true }
342343

343344
# API
@@ -666,7 +667,7 @@ sources-metrics = [
666667
"sources-websocket",
667668
]
668669

669-
sources-amqp = ["lapin"]
670+
sources-amqp = ["lapin", "async-rs"]
670671
sources-apache_metrics = ["sources-utils-http-client"]
671672
sources-aws_ecs_metrics = ["sources-utils-http-client"]
672673
sources-aws_kinesis_firehose = ["dep:base64"]
@@ -862,7 +863,7 @@ sinks-metrics = [
862863
"sinks-splunk_hec"
863864
]
864865

865-
sinks-amqp = ["deadpool", "lapin"]
866+
sinks-amqp = ["deadpool", "lapin", "async-rs"]
866867
sinks-appsignal = []
867868
sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-sdk-kms"]
868869
sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"]

LICENSE-3rdparty.csv

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ aho-corasick,https://github.com/BurntSushi/aho-corasick,Unlicense OR MIT,Andrew
1010
alloc-no-stdlib,https://github.com/dropbox/rust-alloc-no-stdlib,BSD-3-Clause,Daniel Reiter Horn <danielrh@dropbox.com>
1111
alloc-stdlib,https://github.com/dropbox/rust-alloc-no-stdlib,BSD-3-Clause,Daniel Reiter Horn <danielrh@dropbox.com>
1212
allocator-api2,https://github.com/zakarumych/allocator-api2,MIT OR Apache-2.0,Zakarum <zaq.dev@icloud.com>
13-
amq-protocol,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <%arc-Antoine@Perennou.com>
14-
amq-protocol-tcp,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <%arc-Antoine@Perennou.com>
15-
amq-protocol-types,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <%arc-Antoine@Perennou.com>
16-
amq-protocol-uri,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <%arc-Antoine@Perennou.com>
13+
amq-protocol,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
14+
amq-protocol-tcp,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
15+
amq-protocol-types,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
16+
amq-protocol-uri,https://github.com/amqp-rs/amq-protocol,BSD-2-Clause,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
1717
android_system_properties,https://github.com/nical/android_system_properties,MIT OR Apache-2.0,Nicolas Silva <nical@fastmail.com>
1818
anstream,https://github.com/rust-cli/anstyle,MIT OR Apache-2.0,The anstream Authors
1919
anstyle,https://github.com/rust-cli/anstyle,MIT OR Apache-2.0,The anstyle Authors
@@ -44,23 +44,24 @@ arrow-string,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow <dev@arr
4444
ascii,https://github.com/tomprogrammer/rust-ascii,Apache-2.0 OR MIT,"Thomas Bahn <thomas@thomas-bahn.net>, Torbjørn Birch Moltu <t.b.moltu@lyse.net>, Simon Sapin <simon.sapin@exyr.org>"
4545
async-broadcast,https://github.com/smol-rs/async-broadcast,MIT OR Apache-2.0,"Stjepan Glavina <stjepang@gmail.com>, Yoshua Wuyts <yoshuawuyts@gmail.com>, Zeeshan Ali Khan <zeeshanak@gnome.org>"
4646
async-channel,https://github.com/smol-rs/async-channel,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
47+
async-compat,https://github.com/smol-rs/async-compat,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
4748
async-compression,https://github.com/Nullus157/async-compression,MIT OR Apache-2.0,"Wim Looman <wim@nemo157.com>, Allen Bui <fairingrey@gmail.com>"
4849
async-executor,https://github.com/smol-rs/async-executor,Apache-2.0 OR MIT,"Stjepan Glavina <stjepang@gmail.com>, John Nunley <dev@notgull.net>"
4950
async-fs,https://github.com/smol-rs/async-fs,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
5051
async-global-executor,https://github.com/Keruspe/async-global-executor,Apache-2.0 OR MIT,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
51-
async-global-executor-trait,https://github.com/amqp-rs/executor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
5252
async-graphql,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli <scott_s829@163.com>, Koxiaet"
5353
async-graphql-derive,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli <scott_s829@163.com>, Koxiaet"
5454
async-graphql-parser,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli <scott_s829@163.com>, Koxiaet"
5555
async-graphql-value,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli <scott_s829@163.com>, Koxiaet"
5656
async-graphql-warp,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli <scott_s829@163.com>, Koxiaet"
5757
async-io,https://github.com/smol-rs/async-io,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
5858
async-lock,https://github.com/smol-rs/async-lock,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
59+
async-native-tls,https://github.com/async-email/async-native-tls,MIT OR Apache-2.0,dignifiedquire <me@dignifiedquire.com>
5960
async-nats,https://github.com/nats-io/nats.rs,Apache-2.0,"Tomasz Pietrek <tomasz@nats.io>, Casper Beyer <caspervonb@pm.me>"
6061
async-net,https://github.com/smol-rs/async-net,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
6162
async-process,https://github.com/smol-rs/async-process,Apache-2.0 OR MIT,Stjepan Glavina <stjepang@gmail.com>
62-
async-reactor-trait,https://github.com/amqp-rs/reactor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
6363
async-recursion,https://github.com/dcchut/async-recursion,MIT OR Apache-2.0,Robert Usher <266585+dcchut@users.noreply.github.com>
64+
async-rs,https://github.com/amqp-rs/async-rs,BSD-2-Clause,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
6465
async-signal,https://github.com/smol-rs/async-signal,Apache-2.0 OR MIT,John Nunley <dev@notgull.net>
6566
async-stream,https://github.com/tokio-rs/async-stream,MIT,Carl Lerche <me@carllerche.com>
6667
async-stream-impl,https://github.com/tokio-rs/async-stream,MIT,Carl Lerche <me@carllerche.com>
@@ -274,7 +275,6 @@ event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,"Stje
274275
event-listener-strategy,https://github.com/smol-rs/event-listener-strategy,Apache-2.0 OR MIT,John Nunley <dev@notgull.net>
275276
evmap,https://github.com/jonhoo/rust-evmap,MIT OR Apache-2.0,Jon Gjengset <jon@thesquareplanet.com>
276277
evmap-derive,https://github.com/jonhoo/rust-evmap,MIT OR Apache-2.0,Jon Gjengset <jon@thesquareplanet.com>
277-
executor-trait,https://github.com/amqp-rs/executor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
278278
exitcode,https://github.com/benwilber/exitcode,Apache-2.0,Ben Wilber <benwilber@gmail.com>
279279
fakedata_generator,https://github.com/kevingimbel/fakedata_generator,MIT,Kevin Gimbel <hallo@kevingimbel.com>
280280
fallible-iterator,https://github.com/sfackler/rust-fallible-iterator,MIT OR Apache-2.0,Steven Fackler <sfackler@gmail.com>
@@ -564,7 +564,6 @@ pin-project,https://github.com/taiki-e/pin-project,Apache-2.0 OR MIT,The pin-pro
564564
pin-project-internal,https://github.com/taiki-e/pin-project,Apache-2.0 OR MIT,The pin-project-internal Authors
565565
pin-project-lite,https://github.com/taiki-e/pin-project-lite,Apache-2.0 OR MIT,The pin-project-lite Authors
566566
pin-utils,https://github.com/rust-lang-nursery/pin-utils,MIT OR Apache-2.0,Josef Brandl <mail@josefbrandl.de>
567-
pinky-swear,https://github.com/amqp-rs/pinky-swear,BSD-2-Clause,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
568567
piper,https://github.com/notgull/piper,MIT OR Apache-2.0,"Stjepan Glavina <stjepang@gmail.com>, John Nunley <dev@notgull.net>"
569568
pkcs1,https://github.com/RustCrypto/formats/tree/master/pkcs1,Apache-2.0 OR MIT,RustCrypto Developers
570569
pkcs8,https://github.com/RustCrypto/formats/tree/master/pkcs8,Apache-2.0 OR MIT,RustCrypto Developers
@@ -630,7 +629,6 @@ raw-cpuid,https://github.com/gz/rust-cpuid,MIT,Gerd Zellweger <mail@gerdzellwege
630629
raw-window-handle,https://github.com/rust-windowing/raw-window-handle,MIT OR Apache-2.0 OR Zlib,Osspial <osspial@gmail.com>
631630
rdkafka,https://github.com/fede1024/rust-rdkafka,MIT,Federico Giraud <giraud.federico@gmail.com>
632631
rdkafka-sys,https://github.com/fede1024/rust-rdkafka,MIT,Federico Giraud <giraud.federico@gmail.com>
633-
reactor-trait,https://github.com/amqp-rs/executor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
634632
redis,https://github.com/redis-rs/redis-rs,BSD-3-Clause,The redis Authors
635633
redox_syscall,https://gitlab.redox-os.org/redox-os/syscall,MIT,Jeremy Soller <jackpot51@gmail.com>
636634
redox_users,https://gitlab.redox-os.org/redox-os/users,MIT,"Jose Narvaez <goyox86@gmail.com>, Wesley Hershberger <mggmugginsmc@gmail.com>"

src/amqp.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl AmqpConfig {
7272
};
7373
let identity = if let Some(identity) = &tls.key_file {
7474
let der = tokio::fs::read(identity.to_owned()).await?;
75-
Some(OwnedIdentity {
75+
Some(OwnedIdentity::PKCS12 {
7676
der,
7777
password: tls
7878
.key_pass
@@ -91,6 +91,7 @@ impl AmqpConfig {
9191
&addr,
9292
lapin::ConnectionProperties::default(),
9393
tls_config,
94+
async_rs::Runtime::tokio_current(),
9495
)
9596
.await
9697
}

src/sinks/amqp/channel.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,14 @@ impl deadpool::managed::Manager for AmqpSinkChannelManager {
5151
channel: &mut Self::Type,
5252
_: &deadpool::managed::Metrics,
5353
) -> deadpool::managed::RecycleResult<Self::Error> {
54-
let state = channel.status().state();
55-
if state == lapin::ChannelState::Connected {
54+
let status = channel.status();
55+
if status.connected() {
5656
Ok(())
5757
} else {
58-
Err((AmqpError::ChannelClosed { state }).into())
58+
Err((AmqpError::ChannelClosed {
59+
status: status.clone(),
60+
})
61+
.into())
5962
}
6063
}
6164
}

src/sinks/amqp/integration_tests.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{collections::HashSet, time::Duration};
22

33
use config::AmqpPropertiesConfig;
44
use futures::StreamExt;
5+
use lapin::types::ShortString;
56
use vector_lib::{config::LogNamespace, event::LogEvent};
67

78
use super::*;
@@ -61,7 +62,8 @@ async fn amqp_happy_path() {
6162
let mut config = make_config();
6263
let exchange = format!("test-{}-exchange", random_string(10));
6364
config.exchange = Template::try_from(exchange.as_str()).unwrap();
64-
let queue = format!("test-{}-queue", random_string(10));
65+
let queue: lapin::types::ShortString = format!("test-{}-queue", random_string(10)).into();
66+
let exchange: ShortString = exchange.into();
6567

6668
await_connection(&config.connection).await;
6769
let (_conn, channel) = config.connection.connect().await.unwrap();
@@ -71,7 +73,7 @@ async fn amqp_happy_path() {
7173
};
7274
channel
7375
.exchange_declare(
74-
&exchange,
76+
exchange.clone(),
7577
lapin::ExchangeKind::Fanout,
7678
exchange_opts,
7779
lapin::types::FieldTable::default(),
@@ -89,15 +91,19 @@ async fn amqp_happy_path() {
8991
..Default::default()
9092
};
9193
channel
92-
.queue_declare(&queue, queue_opts, lapin::types::FieldTable::default())
94+
.queue_declare(
95+
queue.clone(),
96+
queue_opts,
97+
lapin::types::FieldTable::default(),
98+
)
9399
.await
94100
.unwrap();
95101

96102
channel
97103
.queue_bind(
98-
&queue,
99-
&exchange,
100-
"",
104+
queue.clone(),
105+
exchange,
106+
"".into(),
101107
lapin::options::QueueBindOptions::default(),
102108
lapin::types::FieldTable::default(),
103109
)
@@ -107,8 +113,8 @@ async fn amqp_happy_path() {
107113
let consumer = format!("test-{}-consumer", random_string(10));
108114
let mut consumer = channel
109115
.basic_consume(
110-
&queue,
111-
&consumer,
116+
queue,
117+
consumer.into(),
112118
lapin::options::BasicConsumeOptions::default(),
113119
lapin::types::FieldTable::default(),
114120
)
@@ -151,6 +157,7 @@ async fn amqp_round_trip() {
151157
let exchange = format!("test-{}-exchange", random_string(10));
152158
config.exchange = Template::try_from(exchange.as_str()).unwrap();
153159
let queue = format!("test-{}-queue", random_string(10));
160+
let exchange: ShortString = exchange.into();
154161

155162
await_connection(&config.connection).await;
156163
let (_conn, channel) = config.connection.connect().await.unwrap();
@@ -160,7 +167,7 @@ async fn amqp_round_trip() {
160167
};
161168
channel
162169
.exchange_declare(
163-
&exchange,
170+
exchange.clone(),
164171
lapin::ExchangeKind::Fanout,
165172
exchange_opts,
166173
lapin::types::FieldTable::default(),
@@ -192,20 +199,25 @@ async fn amqp_round_trip() {
192199
.unwrap();
193200

194201
// prepare server
202+
let queue: ShortString = queue.into();
195203
let queue_opts = lapin::options::QueueDeclareOptions {
196204
auto_delete: true,
197205
..Default::default()
198206
};
199207
channel
200-
.queue_declare(&queue, queue_opts, lapin::types::FieldTable::default())
208+
.queue_declare(
209+
queue.clone(),
210+
queue_opts,
211+
lapin::types::FieldTable::default(),
212+
)
201213
.await
202214
.unwrap();
203215

204216
channel
205217
.queue_bind(
206-
&queue,
207-
&exchange,
208-
"",
218+
queue,
219+
exchange,
220+
"".into(),
209221
lapin::options::QueueBindOptions::default(),
210222
lapin::types::FieldTable::default(),
211223
)
@@ -235,6 +247,7 @@ async fn amqp_priority_with_template(
235247
let mut config = make_config();
236248
let exchange = format!("test-{}-exchange", random_string(10));
237249
config.exchange = Template::try_from(exchange.as_str()).unwrap();
250+
let exchange: ShortString = exchange.into();
238251
config.properties = Some(AmqpPropertiesConfig {
239252
priority: Some(UnsignedIntTemplate::try_from(template).unwrap()),
240253
..Default::default()
@@ -248,7 +261,7 @@ async fn amqp_priority_with_template(
248261
};
249262
channel
250263
.exchange_declare(
251-
&exchange,
264+
exchange.clone(),
252265
lapin::ExchangeKind::Fanout,
253266
exchange_opts,
254267
lapin::types::FieldTable::default(),
@@ -261,7 +274,7 @@ async fn amqp_priority_with_template(
261274
healthcheck.await.expect("Health check failed");
262275

263276
// prepare consumer
264-
let queue = format!("test-{}-queue", random_string(10));
277+
let queue: ShortString = format!("test-{}-queue", random_string(10)).into();
265278
let queue_opts = lapin::options::QueueDeclareOptions {
266279
auto_delete: true,
267280
..Default::default()
@@ -275,15 +288,15 @@ async fn amqp_priority_with_template(
275288
args
276289
};
277290
channel
278-
.queue_declare(&queue, queue_opts, queue_args)
291+
.queue_declare(queue.clone(), queue_opts, queue_args)
279292
.await
280293
.unwrap();
281294

282295
channel
283296
.queue_bind(
284-
&queue,
285-
&exchange,
286-
"",
297+
queue.clone(),
298+
exchange,
299+
"".into(),
287300
lapin::options::QueueBindOptions::default(),
288301
lapin::types::FieldTable::default(),
289302
)
@@ -293,8 +306,8 @@ async fn amqp_priority_with_template(
293306
let consumer = format!("test-{}-consumer", random_string(10));
294307
let mut consumer = channel
295308
.basic_consume(
296-
&queue,
297-
&consumer,
309+
queue.clone(),
310+
consumer.into(),
298311
lapin::options::BasicConsumeOptions::default(),
299312
lapin::types::FieldTable::default(),
300313
)

src/sinks/amqp/service.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ pub enum AmqpError {
9696
#[snafu(display("Failed to open AMQP channel: {}", error))]
9797
ConnectFailed { error: vector_common::Error },
9898

99-
#[snafu(display("Channel is not writeable: {:?}", state))]
100-
ChannelClosed { state: lapin::ChannelState },
99+
#[snafu(display("Channel is not writeable: {:?}", status))]
100+
ChannelClosed { status: lapin::ChannelStatus },
101101

102102
#[snafu(display("Channel pool error: {}", error))]
103103
PoolError { error: vector_common::Error },
@@ -125,8 +125,8 @@ impl Service<AmqpRequest> for AmqpService {
125125
let byte_size = req.body.len();
126126
let fut = channel
127127
.basic_publish(
128-
&req.exchange,
129-
&req.routing_key,
128+
req.exchange.clone().into(),
129+
req.routing_key.clone().into(),
130130
BasicPublishOptions::default(),
131131
req.body.as_ref(),
132132
req.properties,
@@ -135,7 +135,7 @@ impl Service<AmqpRequest> for AmqpService {
135135

136136
match fut {
137137
Ok(result) => match result.await {
138-
Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack),
138+
Ok(lapin::Confirmation::Nack(_)) => Err(AmqpError::Nack),
139139
Err(error) => Err(AmqpError::AcknowledgementFailed { error }),
140140
Ok(_) => Ok(AmqpResponse {
141141
json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),

0 commit comments

Comments
 (0)