diff --git a/Cargo.lock b/Cargo.lock index fa00d3f..6331848 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -448,7 +448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -468,6 +468,17 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -796,10 +807,10 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls", + "rustls 0.23.27", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", "webpki-roots", ] @@ -997,6 +1008,7 @@ dependencies = [ "config", "deadpool-amqprs", "futures", + "rumqttc", "secrecy", "serde", "serde-aux", @@ -1385,16 +1397,16 @@ dependencies = [ [[package]] name = "proxy-http-client" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "async-stream", - "deadpool-amqprs", "futures", "intersect-ingress-proxy-common", "jemallocator", "reqwest", "reqwest-eventsource", + "rumqttc", "secrecy", "serde", "serde-aux", @@ -1407,16 +1419,16 @@ dependencies = [ [[package]] name = "proxy-http-server" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "async-stream", "axum", "axum-extra", - "deadpool-amqprs", "futures", "intersect-ingress-proxy-common", "jemallocator", + "rumqttc", "secrecy", "serde", "serde-aux", @@ -1442,7 +1454,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.27", "socket2", "thiserror 2.0.12", "tokio", @@ -1462,7 +1474,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls", + "rustls 0.23.27", "rustls-pki-types", "slab", "thiserror 2.0.12", @@ -1610,7 +1622,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.27", "rustls-pki-types", "serde", "serde_json", @@ -1618,7 +1630,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-util", "tower", "tower-http", @@ -1661,6 +1673,24 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rumqttc" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1568e15fab2d546f940ed3a21f48bbbd1c494c90c99c4481339364a497f94a9" +dependencies = [ + "bytes", + "flume", + "futures-util", + "log", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki 0.102.8", + "thiserror 1.0.69", + "tokio", + "tokio-rustls 0.25.0", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1683,7 +1713,21 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", +] + +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", ] [[package]] @@ -1695,11 +1739,33 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.3", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -1710,6 +1776,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.3" @@ -1926,6 +2003,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2000,7 +2086,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2148,13 +2234,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.27", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 4674ce1..548f1d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ amqprs = { version = "2", features = ["traces"] } config = { version = "0.14", default-features = false, features = ["yaml"] } # TODO check to see why this can't be upgraded to v15 deadpool-amqprs = "0.3" futures = "0.3" +rumqttc = "0.24.0" secrecy = { version = "0.10", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde-aux = "4" diff --git a/README.md b/README.md index a3ec36d..c92e95e 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Caveats: - while you can have a `client` and a `server` application for both systems, you should only have one of the clients talk to the other's server. Don't connect both clients to both servers. -Currently only supports AMQP 0-9-1 as the broker protocol but can potentially support others in the future +Currently only supports AMQP 0-9-1 and MQTT 3-1-1 as the broker protocols, but can potentially support others in the future ## Why Rust? @@ -84,7 +84,7 @@ Specific configuration structs are in `proxy-http-server/src/configuration.rs` a ## Setup -### Using the RabbitMQ web management UIs +### Using the RabbitMQ web management UIs (AMQP) These instructions assume you are using the docker compose configuration and the default `conf.yaml` configurations for each. @@ -92,7 +92,10 @@ These instructions assume you are using the docker compose configuration and the 2) Make sure that you have both applications started (do NOT start more than 1 of each). Each application should be connected to a separate broker. 3) To login to the broker that the server instance uses, go to localhost:15672, username `intersect_username`, password `intersect_password` 4) To login to the broker that the client instance uses, go to localhost:15673, username `intersect_username`, password `intersect_password` -5) On each application, click on the `Exchanges` tab, and click on the `intersect-messages` exchange. +5) + - IF AMQP: On each application, click on the `Exchanges` tab, and click on the `intersect-messages` exchange. + - IF MQTT: On each application, click on the `Queues and Streams` tab, then click on the queue (it should look like `mqtt-subscription-proxy-http-clientqos1` or `mqtt-subscription-proxy-http-serverqos1`). + 6) Make sure that the `Publish message` dropdown is expanded, select the large text area which is labeled with `Payload:` For the application on `localhost:15672`, set the payload to below (no newlines): @@ -145,3 +148,7 @@ Now it's advisable to [run some INTERSECT-SDK examples](https://github.com/INTER - one exchange for all messages for each application (see `shared-deps/src/protocols/amqp/mod.rs` to get name) - routing keys will match SDK naming schematics (SOS hierarchy, "." as separator, end with ".{userspace|lifecycle|events}"). The routing key will roughly correspond to the `destination` field in an INTERSECT message, but the `destination` field only exists on userspace messages (event/lifecycle messages do not have a specific destination in mind). - The queue name is hardcoded to match the name of the application. + +## MQTT setup + +- follows similar rationale to AMQP, except does not utilize exchanges. diff --git a/charts/proxy-http-client/Chart.yaml b/charts/proxy-http-client/Chart.yaml index 3915e2b..e3bb195 100644 --- a/charts/proxy-http-client/Chart.yaml +++ b/charts/proxy-http-client/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 name: proxy-http-client description: "Subscribe to events over HTTP from a proxy-http-server instance, and publish them on a local broker" -version: 0.1.1 +version: 0.1.2 dependencies: - name: common repository: oci://registry-1.docker.io/bitnamicharts diff --git a/charts/proxy-http-client/templates/deployment.yaml b/charts/proxy-http-client/templates/deployment.yaml index 310a9cb..a09c9cc 100644 --- a/charts/proxy-http-client/templates/deployment.yaml +++ b/charts/proxy-http-client/templates/deployment.yaml @@ -107,6 +107,8 @@ spec: value: {{ .Values.app.broker.host | quote }} - name: PROXYAPP_BROKER__PORT value: {{ .Values.app.broker.port | quote }} + - name: PROXYAPP_BROKER__PROTOCOL + value: {{ .Values.app.broker.protocol | quote }} - name: PROXYAPP_OTHER_PROXY__URL value: {{ .Values.app.other_proxy.url | quote }} - name: PROXYAPP_OTHER_PROXY__USERNAME diff --git a/charts/proxy-http-client/values.yaml b/charts/proxy-http-client/values.yaml index 9bef1a6..121ee14 100644 --- a/charts/proxy-http-client/values.yaml +++ b/charts/proxy-http-client/values.yaml @@ -136,6 +136,7 @@ app: broker: host: "127.0.0.1" port: "5672" + protocol: "amqp" # or "mqtt" username: "" password: isSecret: false diff --git a/charts/proxy-http-server/Chart.yaml b/charts/proxy-http-server/Chart.yaml index 0be705e..e586244 100644 --- a/charts/proxy-http-server/Chart.yaml +++ b/charts/proxy-http-server/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v2 name: proxy-http-server description: "Subscribe to broker messages and broadcast them to HTTP listeners" -version: 0.1.1 +version: 0.1.2 dependencies: - name: common repository: oci://registry-1.docker.io/bitnamicharts diff --git a/charts/proxy-http-server/templates/deployment.yaml b/charts/proxy-http-server/templates/deployment.yaml index a0927df..aedbc62 100644 --- a/charts/proxy-http-server/templates/deployment.yaml +++ b/charts/proxy-http-server/templates/deployment.yaml @@ -112,6 +112,8 @@ spec: value: {{ .Values.app.broker.port | quote }} - name: PROXYAPP_APP_PORT value: {{ .Values.containerPort | quote }} + - name: PROXYAPP_BROKER__PROTOCOL + value: {{ .Values.app.broker.protocol | quote }} - name: PROXYAPP_TOPIC_PREFIX value: {{ required "app.topic_prefix not set (i.e. 'organization.facility.system.')" .Values.app.topic_prefix | quote }} - name: PROXYAPP_USERNAME diff --git a/charts/proxy-http-server/values.yaml b/charts/proxy-http-server/values.yaml index 59f1753..85a71f3 100644 --- a/charts/proxy-http-server/values.yaml +++ b/charts/proxy-http-server/values.yaml @@ -166,6 +166,7 @@ app: broker: host: "127.0.0.1" port: "5672" + protocol: "amqp" # or "mqtt" username: "" password: isSecret: false diff --git a/proxy-http-client/Cargo.toml b/proxy-http-client/Cargo.toml index f56849b..f31d793 100644 --- a/proxy-http-client/Cargo.toml +++ b/proxy-http-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "proxy-http-client" -version = "0.1.0" +version = "0.2.0" edition = "2021" publish = false default-run = "proxy-http-client" @@ -15,8 +15,8 @@ name = "proxy-http-client" [dependencies] anyhow = { workspace = true } async-stream = { workspace = true } -deadpool-amqprs = { workspace = true } futures = { workspace = true } +rumqttc = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } serde-aux = { workspace = true } diff --git a/proxy-http-client/conf.yaml b/proxy-http-client/conf.yaml index 298eb80..3fa9751 100644 --- a/proxy-http-client/conf.yaml +++ b/proxy-http-client/conf.yaml @@ -8,7 +8,12 @@ broker: password: intersect_password host: "127.0.0.1" # note: differs from other config file (two separate brokers used) + # -- IF AMQP -- port: 5673 + protocol: "amqp" + # -- IF MQTT -- + #port: 1884 + #protocol: "mqtt" # use amqp topic notation, note that the system is different from the other configuration topic_prefix: "organization.facility.system2" # CHANGE THIS PER DEPLOYMENT!!! log_level: "debug" diff --git a/proxy-http-client/src/event_source.rs b/proxy-http-client/src/event_source.rs index edf7276..6d46e79 100644 --- a/proxy-http-client/src/event_source.rs +++ b/proxy-http-client/src/event_source.rs @@ -1,57 +1,40 @@ -use deadpool_amqprs::Pool; use futures::StreamExt; use reqwest_eventsource::{Event, EventSource}; use secrecy::ExposeSecret; use intersect_ingress_proxy_common::intersect_messaging::extract_eventsource_data; -use intersect_ingress_proxy_common::protocols::amqp::{ - get_channel, is_routing_key_compliant, publish::amqp_publish_message, -}; +use intersect_ingress_proxy_common::protocols::interfaces::PublishProtoHandler; use intersect_ingress_proxy_common::server_paths::SUBSCRIBE_URL; use intersect_ingress_proxy_common::signals::wait_for_os_signal; use crate::configuration::ExternalProxy; /// Return Err only if we weren't able to publish a correct message to the broker, invalid messages are ignored -async fn send_message(message: String, connection_pool: &Pool) -> Result<(), String> { +async fn send_message( + message: String, + proto_handler: &impl PublishProtoHandler, +) -> Result<(), &str> { let es_data_result = extract_eventsource_data(&message); if es_data_result.is_err() { return Ok(()); } let (topic, data) = es_data_result.unwrap(); - if !is_routing_key_compliant(&topic) { - tracing::warn!( - "{} is not a valid AMQP topic name, will not attempt publish", - topic - ); - return Ok(()); - } - tracing::debug!("Publishing message with topic: {}", &topic); - - let connection = connection_pool.get().await.map_err(|_| { - "WARNING: Couldn't get connection, message received from other proxy was NOT published on our own broker." - .to_string() - })?; - let channel = get_channel(&connection).await.map_err(|_| { - "WARNING: Couldn't get channel, message received from other proxy was NOT published on our own broker." - .to_string() - })?; - - match amqp_publish_message(channel, &topic, data).await { - Ok(()) => Ok(()), - Err(()) => Err( - "WARNING: message received from other proxy was NOT published on our own broker." - .into(), - ), + if let Err(e) = proto_handler.preverify_publish(&topic) { + tracing::warn!("invalid topic name -- {e}"); + return Ok(()); } + proto_handler.publish_message(&topic, data).await } /// Return value - exit code to use /// /// # Panics /// - Inner API could potentially panic but is currently not expected to do so -pub async fn event_source_loop(other_proxy: ExternalProxy, connection_pool: Pool) -> i32 { +pub async fn event_source_loop( + other_proxy: ExternalProxy, + proto_handler: impl PublishProtoHandler, +) -> i32 { let mut es = EventSource::new( reqwest::Client::new() .get(format!("{}{}", &other_proxy.url, SUBSCRIBE_URL)) @@ -79,7 +62,7 @@ pub async fn event_source_loop(other_proxy: ExternalProxy, connection_pool: Pool tracing::info!("connected to {}", &other_proxy.url); }, Ok(Event::Message(message)) => { - if let Err(e) = send_message(message.data, &connection_pool).await { + if let Err(e) = send_message(message.data, &proto_handler).await { tracing::error!(e); }; }, diff --git a/proxy-http-client/src/lib.rs b/proxy-http-client/src/lib.rs index faa3836..d0c9dc0 100644 --- a/proxy-http-client/src/lib.rs +++ b/proxy-http-client/src/lib.rs @@ -1,3 +1,5 @@ pub mod configuration; pub mod event_source; pub mod poster; + +pub const APPLICATION_NAME: &str = "proxy-http-client"; diff --git a/proxy-http-client/src/main.rs b/proxy-http-client/src/main.rs index eeb60ba..04ef4fd 100644 --- a/proxy-http-client/src/main.rs +++ b/proxy-http-client/src/main.rs @@ -3,46 +3,29 @@ use std::sync::Arc; use tokio::sync::oneshot; use intersect_ingress_proxy_common::configuration::get_configuration; -use intersect_ingress_proxy_common::protocols::amqp::{ - get_connection_pool, subscribe::broker_consumer_loop, verify_connection_pool, +use intersect_ingress_proxy_common::protocols::{ + amqp::init::init_amqp_proto_handlers, + interfaces::{PublishProtoHandler, SubscribeProtoHandler}, + mqtt::init::init_mqtt_proto_handlers, }; use intersect_ingress_proxy_common::telemetry::{ get_json_subscriber, get_pretty_subscriber, init_subscriber, }; -use proxy_http_client::{configuration::Settings, event_source::event_source_loop, poster::Poster}; - -const APPLICATION_NAME: &str = "proxy-http-client"; +use proxy_http_client::{ + configuration::Settings, event_source::event_source_loop, poster::Poster, APPLICATION_NAME, +}; // Muslc has a slow allocator, but we can only use jemalloc on 64-bit systems since jemalloc doesn't support i686. #[cfg(all(target_env = "musl", target_pointer_width = "64"))] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -#[tokio::main] -pub async fn main() -> anyhow::Result<()> { - let configuration = get_configuration::().expect("Failed to read configuration"); - - // Start logging - if configuration.production { - let subscriber = get_json_subscriber( - APPLICATION_NAME.into(), - configuration.log_level.to_string(), - std::io::stderr, - ); - init_subscriber(subscriber); - } else { - let subscriber = get_pretty_subscriber(configuration.log_level.to_string()); - init_subscriber(subscriber); - } - - // set up broker connection pool - let pool = get_connection_pool(&configuration.broker); - if let Err(msg) = verify_connection_pool(&pool, APPLICATION_NAME).await { - tracing::error!(msg); - std::process::exit(1); - } - +async fn begin_execution( + configuration: Settings, + publish_proto_handler: impl PublishProtoHandler, + subscribe_proto_handler: impl SubscribeProtoHandler, +) -> anyhow::Result<()> { // How this works: // - Pass in the receiver to the broker consumer loop // - In the broker consumer loop, use tokio::select! to wait for rx.recv() at key points @@ -51,10 +34,8 @@ pub async fn main() -> anyhow::Result<()> { // - This allows us to "finish up" publishing a message to our broker before killing the application. let (tx, rx) = oneshot::channel(); - let broker_join_handle = broker_consumer_loop( - pool.clone(), + let broker_join_handle = subscribe_proto_handler.begin_subscribe_loop( configuration.topic_prefix.clone(), - APPLICATION_NAME.into(), Arc::new(Poster::new(&configuration.other_proxy)), rx, ); @@ -63,7 +44,7 @@ pub async fn main() -> anyhow::Result<()> { drop(configuration); // this will run until we get an event source error or we catch an OS signal - let rc = event_source_loop(other_proxy, pool).await; + let rc = event_source_loop(other_proxy, publish_proto_handler).await; tracing::info!("Attempting graceful shutdown: No longer listening for events over HTTP"); drop(tx); @@ -71,3 +52,56 @@ pub async fn main() -> anyhow::Result<()> { std::process::exit(rc); } + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let configuration = get_configuration::().expect("Failed to read configuration"); + + // Start logging + if configuration.production { + let subscriber = get_json_subscriber( + APPLICATION_NAME.into(), + configuration.log_level.to_string(), + std::io::stderr, + ); + init_subscriber(subscriber); + } else { + let subscriber = get_pretty_subscriber(configuration.log_level.to_string()); + init_subscriber(subscriber); + } + + match configuration.broker.protocol { + intersect_ingress_proxy_common::configuration::Protocol::Amqp => { + match init_amqp_proto_handlers(&configuration.broker, APPLICATION_NAME).await { + Ok((publish_proto_handler, subscribe_proto_handler)) => { + begin_execution( + configuration, + publish_proto_handler, + subscribe_proto_handler, + ) + .await + } + Err(err) => { + tracing::error!("AMQP broker: initial verification problem -- {err}"); + std::process::exit(1); + } + } + } + intersect_ingress_proxy_common::configuration::Protocol::Mqtt => { + match init_mqtt_proto_handlers(&configuration.broker, APPLICATION_NAME).await { + Ok((publish_proto_handler, subscribe_proto_handler)) => { + begin_execution( + configuration, + publish_proto_handler, + subscribe_proto_handler, + ) + .await + } + Err(err) => { + tracing::error!("MQTT broker: initial verification problem -- {err}"); + std::process::exit(1); + } + } + } + } +} diff --git a/proxy-http-client/src/poster.rs b/proxy-http-client/src/poster.rs index 0615f67..17f762c 100644 --- a/proxy-http-client/src/poster.rs +++ b/proxy-http-client/src/poster.rs @@ -1,6 +1,6 @@ use secrecy::ExposeSecret; -use intersect_ingress_proxy_common::protocols::amqp::subscribe::HttpBroadcast; +use intersect_ingress_proxy_common::protocols::interfaces::HttpBroadcast; use intersect_ingress_proxy_common::server_paths::PUBLISH_URL; use crate::configuration::ExternalProxy; @@ -29,10 +29,9 @@ impl HttpBroadcast for Poster { .body(event) .send() .await; - if result.is_ok() { - let result = result.unwrap(); - let status = result.status().as_u16(); - match result.bytes().await { + if let Ok(response) = result { + let status = response.status().as_u16(); + match response.bytes().await { Ok(bytes) => tracing::debug!("{:?}", bytes), Err(err) => tracing::debug!("ERROR: {}", err.to_string()), } diff --git a/proxy-http-server/Cargo.toml b/proxy-http-server/Cargo.toml index b498f1f..fc0d807 100644 --- a/proxy-http-server/Cargo.toml +++ b/proxy-http-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "proxy-http-server" -version = "0.1.0" +version = "0.2.0" edition = "2021" publish = false default-run = "proxy-http-server" @@ -15,8 +15,8 @@ name = "proxy-http-server" [dependencies] anyhow = { workspace = true } async-stream = { workspace = true } -deadpool-amqprs = { workspace = true } futures = { workspace = true } +rumqttc = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } serde-aux = { workspace = true } diff --git a/proxy-http-server/conf.yaml b/proxy-http-server/conf.yaml index 57577a6..f40475a 100644 --- a/proxy-http-server/conf.yaml +++ b/proxy-http-server/conf.yaml @@ -5,7 +5,12 @@ broker: password: intersect_password host: "127.0.0.1" # note: differs from other config file (two separate brokers used) + # -- IF AMQP -- port: 5672 + protocol: "amqp" + # -- IF MQTT -- + #port: 1883 + #protocol: "mqtt" # use amqp topic notation topic_prefix: "organization.facility.system" # CHANGE THIS PER DEPLOYMENT!!! log_level: "debug" diff --git a/proxy-http-server/src/broadcaster.rs b/proxy-http-server/src/broadcaster.rs index 06d3135..d9ea150 100644 --- a/proxy-http-server/src/broadcaster.rs +++ b/proxy-http-server/src/broadcaster.rs @@ -2,7 +2,7 @@ use axum::response::sse::Event; use std::sync::Arc; use tokio::sync::broadcast; -use intersect_ingress_proxy_common::protocols::amqp::subscribe::HttpBroadcast; +use intersect_ingress_proxy_common::protocols::interfaces::HttpBroadcast; /// This broadcaster is an optimized implementation of a single-producer, multi-consumer channel. /// The Broadcaster is effectively the "link" between the broker and the HTTP gateway. diff --git a/proxy-http-server/src/lib.rs b/proxy-http-server/src/lib.rs index 6759e27..9c7ecfe 100644 --- a/proxy-http-server/src/lib.rs +++ b/proxy-http-server/src/lib.rs @@ -1,4 +1,7 @@ pub mod broadcaster; pub mod configuration; pub mod routes; -pub mod webapp; +pub mod webapp_server; +pub mod webapp_state; + +pub const APPLICATION_NAME: &str = "proxy-http-server"; diff --git a/proxy-http-server/src/main.rs b/proxy-http-server/src/main.rs index 03504c5..f8fc308 100644 --- a/proxy-http-server/src/main.rs +++ b/proxy-http-server/src/main.rs @@ -1,53 +1,34 @@ +use std::sync::Arc; + use tokio::sync::oneshot; use intersect_ingress_proxy_common::configuration::get_configuration; -use intersect_ingress_proxy_common::protocols::amqp::subscribe::broker_consumer_loop; -use intersect_ingress_proxy_common::protocols::amqp::{ - get_connection_pool, verify_connection_pool, +use intersect_ingress_proxy_common::protocols::{ + amqp::init::init_amqp_proto_handlers, interfaces::SubscribeProtoHandler, + mqtt::init::init_mqtt_proto_handlers, }; use intersect_ingress_proxy_common::telemetry::{ get_json_subscriber, get_pretty_subscriber, init_subscriber, }; use proxy_http_server::{ - broadcaster::Broadcaster, configuration::Settings, webapp::WebApplication, + broadcaster::Broadcaster, + configuration::Settings, + webapp_server::{AmqpWebApplication, MqttWebApplication, WebApplication}, + APPLICATION_NAME, }; -const APPLICATION_NAME: &str = "proxy-http-server"; - // Muslc has a slow allocator, but we can only use jemalloc on 64-bit systems since jemalloc doesn't support i686. #[cfg(all(target_env = "musl", target_pointer_width = "64"))] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let configuration = get_configuration::().expect("Failed to read configuration"); - - // Start logging - if configuration.production { - let subscriber = get_json_subscriber( - APPLICATION_NAME.into(), - configuration.log_level.to_string(), - std::io::stderr, - ); - init_subscriber(subscriber); - } else { - let subscriber = get_pretty_subscriber(configuration.log_level.to_string()); - init_subscriber(subscriber); - } - - // set up broker connection pool - let pool = get_connection_pool(&configuration.broker); - if let Err(msg) = verify_connection_pool(&pool, APPLICATION_NAME).await { - tracing::error!(msg); - std::process::exit(1); - } - - let broadcaster = Broadcaster::new(); - let application = - WebApplication::build(&configuration, broadcaster.clone(), pool.clone()).await?; - +async fn begin_execution( + configuration: Settings, + subscribe_proto_handler: impl SubscribeProtoHandler, + application: impl WebApplication, + broadcaster: Arc, +) -> anyhow::Result<()> { // How this works: // - Pass in the receiver to the broker consumer loop // - In the broker consumer loop, use tokio::select! to wait for rx.recv() at key points @@ -55,10 +36,8 @@ async fn main() -> anyhow::Result<()> { // - This allows us to "finish up" publishing a message to our broker before killing the application. let (tx, rx) = oneshot::channel(); - let broker_join_handle = broker_consumer_loop( - pool, + let broker_join_handle = subscribe_proto_handler.begin_subscribe_loop( configuration.topic_prefix.clone(), - APPLICATION_NAME.into(), broadcaster, rx, ); @@ -73,3 +52,72 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let configuration = get_configuration::().expect("Failed to read configuration"); + + // Start logging + if configuration.production { + let subscriber = get_json_subscriber( + APPLICATION_NAME.into(), + configuration.log_level.to_string(), + std::io::stderr, + ); + init_subscriber(subscriber); + } else { + let subscriber = get_pretty_subscriber(configuration.log_level.to_string()); + init_subscriber(subscriber); + } + + let broadcaster = Broadcaster::new(); + + match configuration.broker.protocol { + intersect_ingress_proxy_common::configuration::Protocol::Amqp => { + match init_amqp_proto_handlers(&configuration.broker, APPLICATION_NAME).await { + Ok((publish_proto_handler, subscribe_proto_handler)) => { + let web_server = AmqpWebApplication::build( + &configuration, + broadcaster.clone(), + publish_proto_handler, + ) + .await?; + begin_execution( + configuration, + subscribe_proto_handler, + web_server, + broadcaster, + ) + .await + } + Err(err) => { + tracing::error!("AMQP broker: initial verification problem -- {err}"); + std::process::exit(1); + } + } + } + intersect_ingress_proxy_common::configuration::Protocol::Mqtt => { + match init_mqtt_proto_handlers(&configuration.broker, APPLICATION_NAME).await { + Ok((publish_proto_handler, subscribe_proto_handler)) => { + let web_server = MqttWebApplication::build( + &configuration, + broadcaster.clone(), + publish_proto_handler, + ) + .await?; + begin_execution( + configuration, + subscribe_proto_handler, + web_server, + broadcaster, + ) + .await + } + Err(err) => { + tracing::error!("MQTT broker: initial verification problem -- {err}"); + std::process::exit(1); + } + } + } + } +} diff --git a/proxy-http-server/src/routes/publish.rs b/proxy-http-server/src/routes/publish.rs index d6f1207..a4310ec 100644 --- a/proxy-http-server/src/routes/publish.rs +++ b/proxy-http-server/src/routes/publish.rs @@ -5,15 +5,13 @@ use axum_extra::{ headers::{authorization::Basic, Authorization}, TypedHeader, }; +use intersect_ingress_proxy_common::protocols::interfaces::PublishProtoHandler; use secrecy::ExposeSecret; use std::sync::Arc; use intersect_ingress_proxy_common::intersect_messaging::extract_eventsource_data; -use intersect_ingress_proxy_common::protocols::amqp::{ - get_channel, is_routing_key_compliant, publish::amqp_publish_message, -}; -use crate::webapp::WebApplicationState; +use crate::webapp_state::WebApplicationState; /// HTTP POST endpoint which will publish a message meeting the INTERSECT specification /// @@ -22,12 +20,12 @@ use crate::webapp::WebApplicationState; /// - Sends back a 400 if the message body is improperly formatted /// - Sends back a 500 if the server was unable to send the message pub async fn publish_message( - State(app_state): State>, + State(app_state): State>, TypedHeader(authorization): TypedHeader>, request: Request, ) -> Result<(StatusCode, String), (StatusCode, String)> { - if authorization.username() != app_state.username - || authorization.password() != app_state.password.expose_secret() + if authorization.username() != app_state.get_username() + || authorization.password() != app_state.get_password().expose_secret() { return Err((StatusCode::UNAUTHORIZED, "unauthorized".to_string())); } @@ -48,41 +46,22 @@ pub async fn publish_message( "body is not valid INTERSECT format".to_string(), ) })?; - if !is_routing_key_compliant(&topic) { - tracing::warn!( - "{} is not a valid AMQP topic name, will not attempt publish", - topic - ); - return Err(( - StatusCode::BAD_REQUEST, - format!("{topic} is not a valid AMQP topic name"), - )); + + if let Err(e) = app_state.get_proto_handler().preverify_publish(&topic) { + return Err((StatusCode::BAD_REQUEST, e)); } - tracing::debug!("Publishing message with topic: {}", &topic); - let connection = app_state.amqp_connection_pool.get().await.map_err(|e| { - tracing::error!(error = ?e, "cannot connect to broker"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "server fault, message not published".to_string(), - ) - })?; + tracing::debug!("Publishing message with topic: {}", &topic); - let channel = get_channel(&connection).await.map_err(|e| { - tracing::error!(error = ?e, "cannot create channel on broker"); - ( + match app_state + .get_proto_handler() + .publish_message(&topic, data) + .await + { + Ok(()) => Ok((StatusCode::CREATED, "Success".to_string())), + Err(_) => Err(( StatusCode::INTERNAL_SERVER_ERROR, "server fault, message not published".to_string(), - ) - })?; - amqp_publish_message(channel, &topic, data) - .await - .map_err(|()| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - "server fault, message not published".to_string(), - ) - })?; - - Ok((StatusCode::CREATED, "Success".to_string())) + )), + } } diff --git a/proxy-http-server/src/routes/subscribe.rs b/proxy-http-server/src/routes/subscribe.rs index 8133b69..cc0f5f3 100644 --- a/proxy-http-server/src/routes/subscribe.rs +++ b/proxy-http-server/src/routes/subscribe.rs @@ -11,14 +11,14 @@ use secrecy::ExposeSecret; use std::convert::Infallible; use std::sync::Arc; -use crate::webapp::WebApplicationState; +use crate::webapp_state::WebApplicationState; use intersect_ingress_proxy_common::signals::wait_for_os_signal; #[allow(clippy::needless_pass_by_value)] fn sse_response( - app_state: Arc, + app_state: Arc, ) -> Sse>> { - let mut rx = app_state.broadcaster.add_client(); + let mut rx = app_state.get_broadcaster().add_client(); let stream = async_stream::stream! { loop { @@ -57,11 +57,11 @@ fn sse_response( /// `` /// `` pub async fn sse_handler( - State(app_state): State>, + State(app_state): State>, TypedHeader(authorization): TypedHeader>, ) -> impl IntoResponse { - if authorization.username() != app_state.username - || authorization.password() != app_state.password.expose_secret() + if authorization.username() != app_state.get_username() + || authorization.password() != app_state.get_password().expose_secret() { return (StatusCode::UNAUTHORIZED, "unauthorized").into_response(); } diff --git a/proxy-http-server/src/webapp.rs b/proxy-http-server/src/webapp.rs deleted file mode 100644 index d1919af..0000000 --- a/proxy-http-server/src/webapp.rs +++ /dev/null @@ -1,121 +0,0 @@ -use axum::{routing::get, routing::post, serve::Serve, Router}; -use deadpool_amqprs::Pool; -use secrecy::SecretString; -use std::sync::Arc; -use tokio::net::TcpListener; -use tower::ServiceBuilder; -use tower_http::{ - request_id::MakeRequestUuid, - trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, - ServiceBuilderExt, -}; -use tracing::Level; - -use intersect_ingress_proxy_common::server_paths::{PUBLISH_URL, SUBSCRIBE_URL}; -use intersect_ingress_proxy_common::signals::wait_for_os_signal; - -use crate::{ - broadcaster::Broadcaster, - configuration::Settings, - routes::{ - health_check::health_check, not_found::handler_404, publish::publish_message, - subscribe::sse_handler, - }, -}; - -/// This is state that can be accessed by any endpoint on the server. -pub struct WebApplicationState { - /// AMQP connection pool for publishing to the broker. - pub amqp_connection_pool: Pool, - /// this broadcaster gets messages published to it from one source and can publish many messages from it. Use this if an HTTP endpoint needs to react to a subscription. - pub broadcaster: Arc, - /// basic auth username - pub username: String, - /// basic auth password - pub password: SecretString, -} - -type WebAppServer = Serve; -pub struct WebApplication { - port: u16, - server: WebAppServer, -} - -impl WebApplication { - /// - /// # Errors - /// - errors if unable to bind to provided TCP port - pub async fn build( - configuration: &Settings, - broadcaster: Arc, - amqp_pool: Pool, - ) -> Result { - let address = format!( - "{}:{}", - if configuration.production { - "0.0.0.0" - } else { - "127.0.0.1" - }, - configuration.app_port - ); - let listener = TcpListener::bind(address).await?; - let port = listener.local_addr()?.port(); - let server = create_axum(listener, configuration, broadcaster, amqp_pool); - - tracing::info!("Web server is running on port {}", port); - - Ok(Self { port, server }) - } - - pub fn port(&self) -> u16 { - self.port - } - - /// - /// # Errors - /// - Errors if unable to initialize web server - pub async fn run_until_stopped(self) -> Result<(), std::io::Error> { - // the return type of "with_graceful_shutdown" is unstable, so set it up here - self.server - .with_graceful_shutdown(wait_for_os_signal()) - .await - } -} - -fn create_axum( - listener: TcpListener, - configuration: &Settings, - broadcaster: Arc, - amqp_pool: Pool, -) -> WebAppServer { - let middleware = ServiceBuilder::new() - .set_x_request_id(MakeRequestUuid) - .layer( - TraceLayer::new_for_http() - .make_span_with( - DefaultMakeSpan::new() - .include_headers(true) - .level(Level::INFO), - ) - .on_response(DefaultOnResponse::new().include_headers(true)), - ) - .propagate_x_request_id(); - - let app_state = Arc::new(WebApplicationState { - amqp_connection_pool: amqp_pool, - broadcaster, - username: configuration.username.clone(), - password: configuration.password.clone(), - }); - - let app = Router::new() - .route(SUBSCRIBE_URL, get(sse_handler)) - .route(PUBLISH_URL, post(publish_message)) - .layer(middleware) // routes added before this layer will be logged, after this layer will not be logged - .with_state(app_state) - .route("/healthcheck", get(health_check)) - .fallback(handler_404); - - axum::serve(listener, app) -} diff --git a/proxy-http-server/src/webapp_server.rs b/proxy-http-server/src/webapp_server.rs new file mode 100644 index 0000000..59ec8e8 --- /dev/null +++ b/proxy-http-server/src/webapp_server.rs @@ -0,0 +1,171 @@ +use axum::{ + routing::{get, post}, + serve::Serve, + Router, +}; +use std::sync::Arc; +use tokio::net::TcpListener; +use tower::ServiceBuilder; +use tower_http::{ + request_id::MakeRequestUuid, + trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, + ServiceBuilderExt, +}; +use tracing::Level; + +use intersect_ingress_proxy_common::protocols::{ + amqp::publish::AmqpPublishProtoHandler, mqtt::publish::MqttPublishProtoHandler, +}; +use intersect_ingress_proxy_common::server_paths::{PUBLISH_URL, SUBSCRIBE_URL}; +use intersect_ingress_proxy_common::signals::wait_for_os_signal; + +use crate::{ + broadcaster::Broadcaster, + configuration::Settings, + routes::{ + health_check::health_check, not_found::handler_404, publish::publish_message, + subscribe::sse_handler, + }, + webapp_state::{AmqpWebApplicationState, MqttWebApplicationState, WebApplicationState}, +}; + +type WebAppServer = Serve; + +fn get_router(initial_state: S) -> Router { + let middleware = ServiceBuilder::new() + .set_x_request_id(MakeRequestUuid) + .layer( + TraceLayer::new_for_http() + .make_span_with( + DefaultMakeSpan::new() + .include_headers(true) + .level(Level::INFO), + ) + .on_response(DefaultOnResponse::new().include_headers(true)), + ) + .propagate_x_request_id(); + Router::new() + .route(SUBSCRIBE_URL, get(sse_handler)) + .route(PUBLISH_URL, post(publish_message)) + .layer(middleware) + .with_state(Arc::new(initial_state)) + .route("/healthcheck", get(health_check)) + .fallback(handler_404) +} + +pub trait WebApplication { + fn port(&self) -> u16; + fn run_until_stopped( + self, + ) -> impl std::future::Future> + Send; +} + +pub struct AmqpWebApplication { + port: u16, + server: WebAppServer, +} + +impl AmqpWebApplication { + /// + /// # Errors + /// - errors if unable to bind to provided TCP port + pub async fn build( + configuration: &Settings, + broadcaster: Arc, + proto_handler: AmqpPublishProtoHandler, + ) -> Result { + let address = format!( + "{}:{}", + if configuration.production { + "0.0.0.0" + } else { + "127.0.0.1" + }, + configuration.app_port + ); + let listener = TcpListener::bind(address).await?; + let port = listener.local_addr()?.port(); + let router = get_router(AmqpWebApplicationState { + proto_handler: proto_handler.clone(), + broadcaster, + username: configuration.username.clone(), + password: configuration.password.clone(), + }); + let server = axum::serve(listener, router); + + tracing::info!("Web server is running on port {}", port); + + Ok(Self { port, server }) + } +} + +impl WebApplication for AmqpWebApplication { + fn port(&self) -> u16 { + self.port + } + + /// + /// # Errors + /// - Errors if unable to initialize web server + async fn run_until_stopped(self) -> Result<(), std::io::Error> { + // the return type of "with_graceful_shutdown" is unstable, so set it up here + self.server + .with_graceful_shutdown(wait_for_os_signal()) + .await + } +} + +pub struct MqttWebApplication { + port: u16, + server: WebAppServer, +} + +impl MqttWebApplication { + /// + /// # Errors + /// - errors if unable to bind to provided TCP port + pub async fn build( + configuration: &Settings, + broadcaster: Arc, + proto_handler: MqttPublishProtoHandler, + ) -> Result { + let address = format!( + "{}:{}", + if configuration.production { + "0.0.0.0" + } else { + "127.0.0.1" + }, + configuration.app_port + ); + let listener = TcpListener::bind(address).await?; + let port = listener.local_addr()?.port(); + let router = get_router(MqttWebApplicationState { + proto_handler: proto_handler.clone(), + broadcaster, + username: configuration.username.clone(), + password: configuration.password.clone(), + }); + let server = axum::serve(listener, router); + + tracing::info!("Web server is running on port {}", port); + + Ok(Self { port, server }) + } +} + +impl WebApplication for MqttWebApplication { + fn port(&self) -> u16 { + self.port + } + + /// + /// # Errors + /// - Errors if unable to initialize web server + async fn run_until_stopped(self) -> Result<(), std::io::Error> { + // the return type of "with_graceful_shutdown" is unstable, so set it up here + self.server + .with_graceful_shutdown(wait_for_os_signal()) + .await + } +} diff --git a/proxy-http-server/src/webapp_state.rs b/proxy-http-server/src/webapp_state.rs new file mode 100644 index 0000000..baa56f5 --- /dev/null +++ b/proxy-http-server/src/webapp_state.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; + +use secrecy::SecretString; + +use intersect_ingress_proxy_common::protocols::interfaces::PublishProtoHandler; +use intersect_ingress_proxy_common::protocols::{ + amqp::publish::AmqpPublishProtoHandler, mqtt::publish::MqttPublishProtoHandler, +}; + +use crate::broadcaster::Broadcaster; + +/// This is state that can be accessed by any endpoint on the server. +pub trait WebApplicationState { + fn get_proto_handler(&self) -> &impl PublishProtoHandler; + fn get_broadcaster(&self) -> &Arc; + fn get_username(&self) -> &str; + fn get_password(&self) -> &SecretString; +} + +/// AMQP specific state +#[derive(Clone)] +pub struct AmqpWebApplicationState { + /// protocol handler which can publish to the message broker + pub proto_handler: AmqpPublishProtoHandler, + /// this broadcaster gets messages published to it from one source and can publish many messages from it. Use this if an HTTP endpoint needs to react to a subscription. + pub broadcaster: Arc, + /// basic auth username + pub username: String, + /// basic auth password + pub password: SecretString, +} + +impl WebApplicationState for AmqpWebApplicationState { + fn get_proto_handler(&self) -> &impl PublishProtoHandler { + &self.proto_handler + } + + fn get_broadcaster(&self) -> &Arc { + &self.broadcaster + } + + fn get_username(&self) -> &str { + &self.username + } + + fn get_password(&self) -> &SecretString { + &self.password + } +} + +/// MQTT specific state +pub struct MqttWebApplicationState { + /// protocol handler which can publish to the message broker + pub proto_handler: MqttPublishProtoHandler, + /// this broadcaster gets messages published to it from one source and can publish many messages from it. Use this if an HTTP endpoint needs to react to a subscription. + pub broadcaster: Arc, + /// basic auth username + pub username: String, + /// basic auth password + pub password: SecretString, +} + +impl WebApplicationState for MqttWebApplicationState { + fn get_proto_handler(&self) -> &impl PublishProtoHandler { + &self.proto_handler + } + + fn get_broadcaster(&self) -> &Arc { + &self.broadcaster + } + + fn get_username(&self) -> &str { + &self.username + } + + fn get_password(&self) -> &SecretString { + &self.password + } +} diff --git a/shared-deps/Cargo.toml b/shared-deps/Cargo.toml index 4484d4c..53cd00f 100644 --- a/shared-deps/Cargo.toml +++ b/shared-deps/Cargo.toml @@ -14,6 +14,7 @@ async-stream = { workspace = true } config = { workspace = true } deadpool-amqprs = { workspace = true } futures = { workspace = true } +rumqttc = { workspace = true } secrecy = { workspace = true } serde = { workspace = true } serde-aux = { workspace = true } diff --git a/shared-deps/src/configuration.rs b/shared-deps/src/configuration.rs index 4e80709..219fd94 100644 --- a/shared-deps/src/configuration.rs +++ b/shared-deps/src/configuration.rs @@ -7,6 +7,34 @@ use secrecy::SecretString; use serde::{de::Error as deError, Deserialize, Deserializer}; use serde_aux::field_attributes::deserialize_number_from_string; +#[derive(serde::Deserialize, Copy, Clone, Debug)] +#[repr(u8)] +pub enum Protocol { + Amqp, + Mqtt, +} + +impl FromStr for Protocol { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "amqp" => Ok(Protocol::Amqp), + "mqtt" => Ok(Protocol::Mqtt), + wrong => Err(format!("'{wrong}' is not a valid protocol")), + } + } +} + +impl Display for Protocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Protocol::Amqp => f.write_str("amqp"), + Protocol::Mqtt => f.write_str("mqtt"), + } + } +} + #[derive(serde::Deserialize, Clone, Debug)] pub struct BrokerSettings { /// broker username @@ -18,6 +46,7 @@ pub struct BrokerSettings { pub port: u16, /// broker hostname pub host: String, + pub protocol: Protocol, } #[derive(serde::Deserialize, Clone, Debug)] diff --git a/shared-deps/src/protocols/amqp/init.rs b/shared-deps/src/protocols/amqp/init.rs new file mode 100644 index 0000000..4fde425 --- /dev/null +++ b/shared-deps/src/protocols/amqp/init.rs @@ -0,0 +1,24 @@ +use crate::{ + configuration::BrokerSettings, + protocols::amqp::{ + publish::AmqpPublishProtoHandler, + subscribe::AmqpSubscribeProtoHandler, + utils::{get_connection_pool, verify_connection_pool}, + }, +}; + +/// Sets up the AMQP proto handlers, and verifies that we can connect to the AMQP broker. +/// +/// # Errors +/// - If we can't make an initial connection to the broker, return Err. +pub async fn init_amqp_proto_handlers( + broker_config: &BrokerSettings, + application_name: &'static str, +) -> Result<(AmqpPublishProtoHandler, AmqpSubscribeProtoHandler), String> { + let pool = get_connection_pool(broker_config); + verify_connection_pool(&pool, application_name).await?; + + let publish_handler = AmqpPublishProtoHandler::new(pool.clone(), application_name); + let subscribe_handler = AmqpSubscribeProtoHandler::new(pool, application_name); + Ok((publish_handler, subscribe_handler)) +} diff --git a/shared-deps/src/protocols/amqp/mod.rs b/shared-deps/src/protocols/amqp/mod.rs index d5fed5f..4397e20 100644 --- a/shared-deps/src/protocols/amqp/mod.rs +++ b/shared-deps/src/protocols/amqp/mod.rs @@ -1,118 +1,4 @@ -use amqprs::{ - callbacks::DefaultChannelCallback, - channel::{Channel, ExchangeDeclareArguments, QueueBindArguments, QueueDeclareArguments}, - connection::{Connection, OpenConnectionArguments}, -}; -use deadpool_amqprs::{Config, Pool}; -use secrecy::ExposeSecret; - -use crate::{configuration::BrokerSettings, intersect_messaging::INTERSECT_MESSAGE_EXCHANGE}; - +pub mod init; pub mod publish; pub mod subscribe; - -/// Get an AMQP connection pool, this pool will manage all connections to the broker -/// -/// NOTE: calling this function cannot fail on its own, you have to call pool.get().await for it to fail -#[must_use] -pub fn get_connection_pool(connection_details: &BrokerSettings) -> Pool { - let mut args = OpenConnectionArguments::new( - &connection_details.host, - connection_details.port, - &connection_details.username, - connection_details.password.expose_secret(), - ); - args.virtual_host("/"); - - let config = Config::new_with_con_args(args); - config.create_pool() -} - -/// This is our initial verification step. We will make sure that we can connect and that the exchanges/queues are set up. -/// -/// # Errors -/// - Errors if can't connect to server or has invalid permissions -pub async fn verify_connection_pool(pool: &Pool, queue_name_src: &str) -> Result<(), String> { - let connection = pool - .get() - .await - .map_err(|_| "Couldn't connect to broker, check your credentials.".to_string())?; - - let channel = get_channel(&connection) - .await - .map_err(|_| "Couldn't make verification channel")?; - - make_exchange(&channel).await.map_err(|_| { - "Couldn't make the INTERSECT exchange or confirm that it exists.".to_string() - })?; - - // we'll use a persistent queue named "proxy-http-client", as there should only be one proxy-http-server deployment per System - // TODO - note that we should probably name queues larger than 127 characters with a hashed key - let Some((queue_name, _, _)) = channel - .queue_declare(QueueDeclareArguments::durable_client_named(queue_name_src)) - .await - .map_err(|_| format!("Couldn't declare the {queue_name_src} queue"))? - else { - // unlikely this pops - return Err("didn't get correct args back from queue declaration".to_string()); - }; - - // listen for every single message on the exchange, we must do this due to the way userspace messages work - channel - .queue_bind(QueueBindArguments::new( - &queue_name, - INTERSECT_MESSAGE_EXCHANGE, - "#", - )) - .await - .map_err(|_| { - format!( - "Couldn't bind the {} exchange to the {} queue", - INTERSECT_MESSAGE_EXCHANGE, &queue_name - ) - })?; - channel - .close() - .await - .map_err(|_| "Couldn't close the setup channel".to_string())?; - - Ok(()) -} - -/// open a channel on the provided connection -/// -/// Returns: -/// - the channel -/// -/// # Errors -/// - Errors if failure to communicate with broker server -pub async fn get_channel(connection: &Connection) -> Result { - let channel = connection.open_channel(None).await?; - channel.register_callback(DefaultChannelCallback).await?; - Ok(channel) -} - -/// logic for declaring the INTERSECT exchange - need to do this in case no services/systems have declared it -/// -/// Returns: -/// - result of whether or not the exchange declaration was successful -/// -/// # Errors -/// - Errors if failure to communicate with broker server -pub async fn make_exchange(channel: &Channel) -> Result<(), amqprs::error::Error> { - channel - .exchange_declare( - ExchangeDeclareArguments::new(INTERSECT_MESSAGE_EXCHANGE, "topic") - .durable(true) - .finish(), - ) - .await -} - -/// make sure that the routing key is valid for AMQP -/// we do not permit publishing on wildcards -#[must_use] -pub fn is_routing_key_compliant(key: &str) -> bool { - !key.chars() - .any(|c| !c.is_alphanumeric() && c != '-' && c != '_' && c != '.' && c != ':') -} +pub mod utils; diff --git a/shared-deps/src/protocols/amqp/publish.rs b/shared-deps/src/protocols/amqp/publish.rs index ec893d8..ff07328 100644 --- a/shared-deps/src/protocols/amqp/publish.rs +++ b/shared-deps/src/protocols/amqp/publish.rs @@ -1,36 +1,77 @@ -use amqprs::{ - channel::{BasicPublishArguments, Channel}, - BasicProperties, +use amqprs::{channel::BasicPublishArguments, BasicProperties}; +use deadpool_amqprs::Pool; + +use crate::protocols::amqp::utils::get_channel; +use crate::protocols::proxy::is_routing_key_compliant; +use crate::{ + intersect_messaging::INTERSECT_MESSAGE_EXCHANGE, protocols::interfaces::PublishProtoHandler, }; -use crate::intersect_messaging::INTERSECT_MESSAGE_EXCHANGE; - -/// publish an INTERSECT message on the broker; if unable to publish, return an error -/// -/// this closes and consumes the publishing channel -/// -/// # Errors -/// - errors if the message couldn't be published. Note that it does NOT error if it has trouble closing the channel. -pub async fn amqp_publish_message(channel: Channel, topic: &str, data: String) -> Result<(), ()> { - let args = BasicPublishArguments::new(INTERSECT_MESSAGE_EXCHANGE, topic); - tracing::debug!("Preparing to publish message on broker: {}", &data); - match channel - .basic_publish( - BasicProperties::default().with_persistence(true).finish(), - data.into_bytes(), - args, - ) - .await - { - Ok(()) => tracing::debug!("message published successfully"), - Err(e) => { - tracing::error!(error = ?e, "could not publish message"); - return Err(()); +#[derive(Clone)] +pub struct AmqpPublishProtoHandler { + pool: Pool, + /// `application_name` is used for the hardcoded queue name and for debugging purposes + application_name: &'static str, +} + +impl std::fmt::Debug for AmqpPublishProtoHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AmqpPublishProtoHandler") + .field("application_name", &self.application_name) + .finish() + } +} + +impl AmqpPublishProtoHandler { + #[must_use] + pub fn new(pool: Pool, application_name: &'static str) -> Self { + Self { + pool, + application_name, } } - if let Err(e) = channel.close().await { - tracing::warn!(error = ?e, "could not close channel after publishing message"); +} + +impl PublishProtoHandler for AmqpPublishProtoHandler { + fn preverify_publish(&self, topic: &str) -> Result<(), String> { + if !is_routing_key_compliant(topic) { + return Err(format!( + "'{topic}' does not meet the INTERSECT proxy-app routing key specification." + )); + } + Ok(()) } - Ok(()) + async fn publish_message(&self, topic: &str, data: String) -> Result<(), &str> { + let connection = self.pool.get().await.map_err(|e| { + tracing::error!(error = ?e, "cannot connect to broker"); + "cannot connect to broker" + })?; + let channel = get_channel(&connection).await.map_err(|e| { + tracing::error!(error = ?e, "cannot create channel on broker"); + "cannot create channel on broker" + })?; + + let args = BasicPublishArguments::new(INTERSECT_MESSAGE_EXCHANGE, topic); + tracing::debug!("Preparing to publish message on broker: {}", &data); + match channel + .basic_publish( + BasicProperties::default().with_persistence(true).finish(), + data.into_bytes(), + args, + ) + .await + { + Ok(()) => tracing::debug!("message published successfully"), + Err(e) => { + tracing::error!(error = ?e, "could not publish message"); + return Err("could not publish message"); + } + } + if let Err(e) = channel.close().await { + tracing::warn!(error = ?e, "could not close channel after publishing message"); + } + + Ok(()) + } } diff --git a/shared-deps/src/protocols/amqp/subscribe.rs b/shared-deps/src/protocols/amqp/subscribe.rs index ddd6729..e1b71d2 100644 --- a/shared-deps/src/protocols/amqp/subscribe.rs +++ b/shared-deps/src/protocols/amqp/subscribe.rs @@ -9,41 +9,57 @@ use tokio::sync::oneshot::Receiver; use uuid::Uuid; use crate::intersect_messaging::{make_eventsource_data, should_message_passthrough}; -use crate::protocols::amqp::{get_channel, verify_connection_pool}; - -/// Trait which should be implemented by the application to handle a formatted message, ready to send to a server or clients -pub trait HttpBroadcast { - /// Return true if we can consider the event to be successfully "published" - /// note that this does not *have* to have an asynchronous internal implementation, it should just allow for one - fn publish_event_to_http( - &self, - event: String, - ) -> impl std::future::Future + Send; +use crate::protocols::amqp::utils::{get_channel, verify_connection_pool}; +use crate::protocols::interfaces::{HttpBroadcast, SubscribeProtoHandler}; + +pub struct AmqpSubscribeProtoHandler { + pool: Pool, + /// `application_name` is used for the hardcoded queue name and for debugging purposes + application_name: &'static str, } -pub fn broker_consumer_loop( - amqp_connection_pool: Pool, - config_topic: String, - queue_name_src: String, - broadcaster: Arc, - killswitch: Receiver<()>, -) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - broker_consumer_loop_inner( - amqp_connection_pool, - config_topic, - queue_name_src, - broadcaster, - killswitch, - ) - .await; - }) +impl std::fmt::Debug for AmqpSubscribeProtoHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AmqpSubscribeProtoHandler") + .field("application_name", &self.application_name) + .finish() + } +} + +impl AmqpSubscribeProtoHandler { + #[must_use] + pub fn new(pool: Pool, application_name: &'static str) -> Self { + Self { + pool, + application_name, + } + } +} + +impl SubscribeProtoHandler for AmqpSubscribeProtoHandler { + fn begin_subscribe_loop( + self, + config_topic: String, + broadcaster: Arc, + killswitch: Receiver<()>, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + broker_consumer_loop_inner( + self.pool, + config_topic, + self.application_name, + broadcaster, + killswitch, + ) + .await; + }) + } } async fn broker_consumer_loop_inner( amqp_connection_pool: Pool, config_topic: String, - queue_name_src: String, + queue_name_src: &str, broadcaster: Arc, // calls recv() once the EventSource loop or HTTP server catches an OS signal mut killswitch: Receiver<()>, @@ -69,7 +85,7 @@ async fn broker_consumer_loop_inner( } if needs_reverification { - if let Err(e) = verify_connection_pool(&amqp_connection_pool, &queue_name_src).await { + if let Err(e) = verify_connection_pool(&amqp_connection_pool, queue_name_src).await { tracing::warn!(error = ?e, "Couldn't fully recover broker setup"); continue; } @@ -84,7 +100,7 @@ async fn broker_consumer_loop_inner( let channel = channel_result.unwrap(); // Do NOT automatically acknowledge messages, we may not be able to forward them. - let args = BasicConsumeArguments::new(&queue_name_src, &Uuid::new_v4().to_string()) + let args = BasicConsumeArguments::new(queue_name_src, &Uuid::new_v4().to_string()) .manual_ack(true) // only ack messages we should actually publish, we will nack the others .finish(); diff --git a/shared-deps/src/protocols/amqp/utils.rs b/shared-deps/src/protocols/amqp/utils.rs new file mode 100644 index 0000000..fc9303b --- /dev/null +++ b/shared-deps/src/protocols/amqp/utils.rs @@ -0,0 +1,110 @@ +use amqprs::{ + callbacks::DefaultChannelCallback, + channel::{Channel, ExchangeDeclareArguments, QueueBindArguments, QueueDeclareArguments}, + connection::{Connection, OpenConnectionArguments}, +}; +use deadpool_amqprs::{Config, Pool}; +use secrecy::ExposeSecret; + +use crate::{configuration::BrokerSettings, intersect_messaging::INTERSECT_MESSAGE_EXCHANGE}; + +/// Get an AMQP connection pool, this pool will manage all connections to the broker +/// +/// NOTE: calling this function cannot fail on its own, you have to call pool.get().await for it to fail +#[must_use] +pub(crate) fn get_connection_pool(connection_details: &BrokerSettings) -> Pool { + let mut args = OpenConnectionArguments::new( + &connection_details.host, + connection_details.port, + &connection_details.username, + connection_details.password.expose_secret(), + ); + args.virtual_host("/"); + + let config = Config::new_with_con_args(args); + config.create_pool() +} + +/// This is our initial verification step. We will make sure that we can connect and that the exchanges/queues are set up. +/// +/// # Errors +/// - Errors if can't connect to server or has invalid permissions +pub(crate) async fn verify_connection_pool( + pool: &Pool, + queue_name_src: &str, +) -> Result<(), String> { + let connection = pool + .get() + .await + .map_err(|_| "Couldn't connect to broker, check your credentials.".to_string())?; + + let channel = get_channel(&connection) + .await + .map_err(|_| "Couldn't make verification channel")?; + + make_exchange(&channel).await.map_err(|_| { + "Couldn't make the INTERSECT exchange or confirm that it exists.".to_string() + })?; + + // we'll use a persistent queue named "proxy-http-client", as there should only be one proxy-http-server deployment per System + // TODO - note that we should probably name queues larger than 127 characters with a hashed key + let Some((queue_name, _, _)) = channel + .queue_declare(QueueDeclareArguments::durable_client_named(queue_name_src)) + .await + .map_err(|_| format!("Couldn't declare the {queue_name_src} queue"))? + else { + // unlikely this pops + return Err("didn't get correct args back from queue declaration".to_string()); + }; + + // listen for every single message on the exchange, we must do this due to the way userspace messages work + channel + .queue_bind(QueueBindArguments::new( + &queue_name, + INTERSECT_MESSAGE_EXCHANGE, + "#", + )) + .await + .map_err(|_| { + format!( + "Couldn't bind the {} exchange to the {} queue", + INTERSECT_MESSAGE_EXCHANGE, &queue_name + ) + })?; + channel + .close() + .await + .map_err(|_| "Couldn't close the setup channel".to_string())?; + + Ok(()) +} + +/// open a channel on the provided connection +/// +/// Returns: +/// - the channel +/// +/// # Errors +/// - Errors if failure to communicate with broker server +pub(crate) async fn get_channel(connection: &Connection) -> Result { + let channel = connection.open_channel(None).await?; + channel.register_callback(DefaultChannelCallback).await?; + Ok(channel) +} + +/// logic for declaring the INTERSECT exchange - need to do this in case no services/systems have declared it +/// +/// Returns: +/// - result of whether or not the exchange declaration was successful +/// +/// # Errors +/// - Errors if failure to communicate with broker server +async fn make_exchange(channel: &Channel) -> Result<(), amqprs::error::Error> { + channel + .exchange_declare( + ExchangeDeclareArguments::new(INTERSECT_MESSAGE_EXCHANGE, "topic") + .durable(true) + .finish(), + ) + .await +} diff --git a/shared-deps/src/protocols/interfaces.rs b/shared-deps/src/protocols/interfaces.rs new file mode 100644 index 0000000..5acf912 --- /dev/null +++ b/shared-deps/src/protocols/interfaces.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; + +use tokio::sync::oneshot::Receiver; + +/// Trait which should be implemented by the application to handle a formatted message, ready to send to a server or clients +pub trait HttpBroadcast { + /// Return true if we can consider the event to be successfully "published" + /// note that this does not *have* to have an asynchronous internal implementation, it should just allow for one + fn publish_event_to_http( + &self, + event: String, + ) -> impl std::future::Future + Send; +} + +/// Trait which determines how to publish a message. Should usually show up as a reaction to receiving an HTTP event or request. +/// Note that whatever implements `PublishProtoHandler` should generally implement Clone as well. +pub trait PublishProtoHandler { + /// this is meant to verify errors in the message before publishing + /// + /// # Errors + /// - return an error message if message verification failed. + fn preverify_publish(&self, topic: &str) -> Result<(), String>; + /// the assumption is that once this function is called, all faults lie in the broker (and not the parameters) + fn publish_message( + &self, + topic: &str, + data: String, + ) -> impl std::future::Future> + Send; +} + +/// Trait which determines how to subscribe to a message. Usually runs in its own thread and uses an [`HttpBroadcast`] to send the message over an HTTP channel. +pub trait SubscribeProtoHandler { + /// this should start a subscribe loop in a [`tokio::spawn`] thread, and return the [`tokio::task::JoinHandle`] . + /// this consumes the `SubscribeProtoHandler` itself after being called. + fn begin_subscribe_loop( + self, + config_topic: String, + broadcaster: Arc, + killswitch: Receiver<()>, + ) -> tokio::task::JoinHandle<()>; +} diff --git a/shared-deps/src/protocols/mod.rs b/shared-deps/src/protocols/mod.rs index 3c5bbbc..b28f2da 100644 --- a/shared-deps/src/protocols/mod.rs +++ b/shared-deps/src/protocols/mod.rs @@ -1 +1,4 @@ pub mod amqp; +pub mod interfaces; +pub mod mqtt; +pub mod proxy; diff --git a/shared-deps/src/protocols/mqtt/init.rs b/shared-deps/src/protocols/mqtt/init.rs new file mode 100644 index 0000000..20a5e9b --- /dev/null +++ b/shared-deps/src/protocols/mqtt/init.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use rumqttc::{AsyncClient, MqttOptions}; +use secrecy::ExposeSecret; + +use crate::{ + configuration::BrokerSettings, + protocols::mqtt::{ + publish::MqttPublishProtoHandler, subscribe::MqttSubscribeProtoHandler, + utils::subscribe_all, + }, +}; + +/// Sets up the MQTT proto handlers, and verifies that we can connect to the MQTT broker. +/// +/// # Errors +/// - If we can't make an initial connection to the broker, return Err. +pub async fn init_mqtt_proto_handlers( + broker_config: &BrokerSettings, + application_name: &'static str, +) -> Result<(MqttPublishProtoHandler, MqttSubscribeProtoHandler), String> { + let mut mqtt_options = MqttOptions::new( + application_name, + broker_config.host.clone(), + broker_config.port, + ); + mqtt_options.set_credentials( + broker_config.username.clone(), + broker_config.password.expose_secret(), + ); + // TODO may want to handle message sizes >= 4 GiB + mqtt_options.set_max_packet_size(1 << 32, 1 << 32); + mqtt_options.set_clean_session(false); + // TODO - implement manual acks + // mqtt_options.set_manual_acks(true); + mqtt_options.set_keep_alive(Duration::from_secs(60)); + + let (mqtt_client, mut mqtt_event_loop) = AsyncClient::new(mqtt_options, 256); + + // verify initial connection + match mqtt_event_loop.poll().await { + Ok(event) => { + match event { + rumqttc::Event::Incoming(packet) => { + match packet { + rumqttc::Packet::ConnAck(conn_ack) => { + // expected + tracing::info!("Connected -- {conn_ack:?}"); + } + p => { + // unexpected incoming event, however it shouldn't be fatal + tracing::warn!("unexpected initial incoming event -- {p:?}"); + } + } + } + rumqttc::Event::Outgoing(outgoing) => { + // all of these are unexpected, this should probably not be seen but shouldn't be fatal + tracing::warn!("unexpected initial outgoing event -- {outgoing:?}"); + } + } + } + Err(err) => { + // should be fatal + return Err(err.to_string()); + } + } + + // listen for every single message on the exchange, we must do this due to the way userspace messages work + subscribe_all(&mqtt_client).await?; + + let publish_handler = MqttPublishProtoHandler::new(application_name, mqtt_client.clone()); + let subscribe_handler = + MqttSubscribeProtoHandler::new(application_name, mqtt_client, mqtt_event_loop); + + Ok((publish_handler, subscribe_handler)) +} diff --git a/shared-deps/src/protocols/mqtt/mod.rs b/shared-deps/src/protocols/mqtt/mod.rs new file mode 100644 index 0000000..4397e20 --- /dev/null +++ b/shared-deps/src/protocols/mqtt/mod.rs @@ -0,0 +1,4 @@ +pub mod init; +pub mod publish; +pub mod subscribe; +pub mod utils; diff --git a/shared-deps/src/protocols/mqtt/publish.rs b/shared-deps/src/protocols/mqtt/publish.rs new file mode 100644 index 0000000..11ebb12 --- /dev/null +++ b/shared-deps/src/protocols/mqtt/publish.rs @@ -0,0 +1,49 @@ +use rumqttc::{AsyncClient, QoS}; + +use crate::protocols::interfaces::PublishProtoHandler; +use crate::protocols::proxy::is_routing_key_compliant; + +#[derive(Clone)] +pub struct MqttPublishProtoHandler { + mqtt_client: AsyncClient, + /// `application_name` is used for the hardcoded queue name and for debugging purposes + application_name: &'static str, +} + +impl std::fmt::Debug for MqttPublishProtoHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MqttPublishProtoHandler") + .field("application_name", &self.application_name) + .finish() + } +} + +impl MqttPublishProtoHandler { + #[must_use] + pub fn new(application_name: &'static str, mqtt_client: AsyncClient) -> Self { + MqttPublishProtoHandler { + mqtt_client, + application_name, + } + } +} + +impl PublishProtoHandler for MqttPublishProtoHandler { + fn preverify_publish(&self, topic: &str) -> Result<(), String> { + if !is_routing_key_compliant(topic) { + return Err(format!( + "'{topic}' does not meet the INTERSECT proxy-app routing key specification." + )); + } + Ok(()) + } + async fn publish_message(&self, topic: &str, data: String) -> Result<(), &str> { + self.mqtt_client + .publish(topic, QoS::AtLeastOnce, true, data.into_bytes()) + .await + .map_err(|err| { + tracing::error!("Could not publish message -- {err}"); + "could not publish message" + }) + } +} diff --git a/shared-deps/src/protocols/mqtt/subscribe.rs b/shared-deps/src/protocols/mqtt/subscribe.rs new file mode 100644 index 0000000..57b5e83 --- /dev/null +++ b/shared-deps/src/protocols/mqtt/subscribe.rs @@ -0,0 +1,193 @@ +use std::sync::Arc; + +use tokio::sync::oneshot::Receiver; + +use rumqttc::{AsyncClient, EventLoop, Publish}; + +use crate::{ + intersect_messaging::{make_eventsource_data, should_message_passthrough}, + protocols::interfaces::{HttpBroadcast, SubscribeProtoHandler}, + protocols::mqtt::utils::mqtt_topic_to_proxy_topic, +}; + +pub struct MqttSubscribeProtoHandler { + mqtt_client: AsyncClient, + mqtt_event_loop: EventLoop, + /// `application_name` is used for the hardcoded queue name and for debugging purposes + application_name: &'static str, +} + +impl std::fmt::Debug for MqttSubscribeProtoHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MqttSubscribeProtoHandler") + .field("application_name", &self.application_name) + .finish() + } +} + +impl MqttSubscribeProtoHandler { + pub fn new( + application_name: &'static str, + mqtt_client: AsyncClient, + mqtt_event_loop: EventLoop, + ) -> Self { + MqttSubscribeProtoHandler { + mqtt_client, + mqtt_event_loop, + application_name, + } + } +} + +impl SubscribeProtoHandler for MqttSubscribeProtoHandler { + fn begin_subscribe_loop( + self, + config_topic: String, + broadcaster: Arc, + killswitch: Receiver<()>, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + broker_consumer_loop_inner( + self.mqtt_client, + self.mqtt_event_loop, + config_topic, + broadcaster, + killswitch, + ) + .await; + }) + } +} + +async fn broker_consumer_loop_inner( + mqtt_client: AsyncClient, + mut mqtt_event_loop: EventLoop, + config_topic: String, + broadcaster: Arc, + // calls recv() once the EventSource loop or HTTP server catches an OS signal + mut killswitch: Receiver<()>, +) { + let still_connected = loop { + tokio::select! { + _ = &mut killswitch => { + break true; + }, + event_loop_pool = mqtt_event_loop.poll() => { + match event_loop_pool { + Ok(event) => { + match event { + rumqttc::Event::Incoming(packet) => { + match packet { + rumqttc::Packet::Publish(publish_packet) => { + consume_message( + publish_packet, + &mqtt_client, + &config_topic, + &broadcaster, + &mut killswitch, + ).await; + }, + packet => { + tracing::debug!("Incoming packet -- {packet:?}"); + }, + } + }, + rumqttc::Event::Outgoing(outgoing) => { + tracing::debug!("Outgoing packet -- {outgoing:?}"); + }, + } + }, + Err(conn_err) => { + tracing::warn!("Consumer lost connection to broker, retrying in 5 seconds. Specifics: {conn_err}"); + let retry_wait = tokio::time::sleep(std::time::Duration::from_secs(5)); + tokio::pin!(retry_wait); + tokio::select! { + _ = &mut killswitch => { + // HTTP component has been shut down, no point in waiting + tracing::warn!("Shutting down while attempting to reconnect to broker"); + break false; + }, + () = &mut retry_wait => { + // no-op + }, + } + }, + } + }, + } + }; + if still_connected { + let _ = mqtt_client.disconnect().await; + } +} + +async fn consume_message( + publish_packet: Publish, + // TODO - the client will eventually manually ACK messages + _mqtt_client: &AsyncClient, + config_topic: &str, + broadcaster: &Arc, + killswitch: &mut Receiver<()>, +) { + let mut should_ack = true; + if publish_packet.dup { + tracing::warn!("message was redelivered"); + } + + match String::from_utf8(publish_packet.payload.to_vec()) { + Ok(utf8_data) => { + tracing::debug!("got raw message data from broker: {}", &utf8_data); + match should_message_passthrough(&utf8_data, config_topic) { + Err(e) => { + // This should generally not be seen, so log it as a warning + tracing::warn!(error = ?e, "message is valid UTF-8 but not INTERSECT JSON"); + } + Ok(false) => { + tracing::debug!( + "message source is not from this system, will not broadcast it" + ); + } + Ok(true) => { + let topic = mqtt_topic_to_proxy_topic(&publish_packet.topic); + match make_eventsource_data(&topic, &utf8_data) { + Err(_) => {} + Ok(event) => { + tracing::debug!( + "Consume message {}, data: {}", + publish_packet.pkid, + event + ); + // TODO handle this better later, see broadcast() documentation for details. + tokio::select! { + _ = killswitch => { + // WARNING: in the client implementation, this may happen while waiting on a response, resulting in us rejecting a message we actually passed through successfully + // this would only happen if we actually call publish_event_to_http(), if the killswitch was toggled before reaching here we will always do the killswitch branch. + tracing::warn!("Got message from broker but did not send it over HTTP, the message will be rejected."); + should_ack = false; + }, + http_result = broadcaster.publish_event_to_http(event) => { + if !http_result { + tracing::warn!("Some clients may not have gotten a message, the message will be rejected."); + should_ack = false; + } + }, + } + } + } + } + } + } + Err(e) => { + // this should generally not be seen, so log as an error + tracing::error!(error = ?e, "message data is not UTF-8, cannot be forwarded over SSE"); + } + } + + // TODO implement acknowledgments + if !should_ack { + tracing::warn!( + "We SHOULD be rejecting message {}, ACK is not implemented yet though", + publish_packet.pkid + ); + } +} diff --git a/shared-deps/src/protocols/mqtt/utils.rs b/shared-deps/src/protocols/mqtt/utils.rs new file mode 100644 index 0000000..a2e66ed --- /dev/null +++ b/shared-deps/src/protocols/mqtt/utils.rs @@ -0,0 +1,15 @@ +use rumqttc::{AsyncClient, QoS}; + +pub(crate) async fn subscribe_all(mqtt_client: &AsyncClient) -> Result<(), String> { + mqtt_client + .subscribe("#", QoS::AtLeastOnce) + .await + .map_err(|e| e.to_string()) +} + +/// convert an MQTT topic representation to the proxy's expected topic representation (which mirrors AMQP) +/// +/// this should generally happen at some point in the subscribe loop +pub(crate) fn mqtt_topic_to_proxy_topic(topic: &str) -> String { + topic.replace('/', ".") +} diff --git a/shared-deps/src/protocols/proxy.rs b/shared-deps/src/protocols/proxy.rs new file mode 100644 index 0000000..9a20d98 --- /dev/null +++ b/shared-deps/src/protocols/proxy.rs @@ -0,0 +1,9 @@ +/// Make sure that the routing key matches the format expected to be used throughout the proxy app. Note that this format is expected to handle multiple protocols; +/// for example, we allow for a proxy-http-client and a proxy-http-server instance to be talking to brokers with different protocols. +/// +/// Also note that we do not permit publishing on wildcards. +#[must_use] +pub fn is_routing_key_compliant(key: &str) -> bool { + !key.chars() + .any(|c| !c.is_alphanumeric() && c != '-' && c != '_' && c != '.') +}