Skip to content
16 changes: 15 additions & 1 deletion rsworkspace/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rsworkspace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ clap = { version = "=4.6.0", features = ["derive"] }
opentelemetry = "=0.31.0"
opentelemetry-appender-tracing = "=0.31.1"
opentelemetry-otlp = "=0.31.1"
opentelemetry-semantic-conventions = { version = "=0.31.0", features = ["semconv_experimental"] }
opentelemetry_sdk = "=0.31.0"
tracing = "=0.1.44"
tracing-opentelemetry = "=0.32.1"
Expand Down
3 changes: 1 addition & 2 deletions rsworkspace/crates/acp-nats-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ clap = { workspace = true, features = ["env"] }
futures-util = { workspace = true, features = ["sink"] }
opentelemetry = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "net", "sync", "io-util"] }
tower-http = { workspace = true }
tracing = { workspace = true }
trogon-nats = { workspace = true }
trogon-std = { workspace = true }
trogon-std = { workspace = true, features = ["telemetry-http"] }

[dev-dependencies]
serde_json = { workspace = true }
Expand Down
14 changes: 7 additions & 7 deletions rsworkspace/crates/acp-nats-ws/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use upgrade::{ConnectionRequest, UpgradeState};

#[cfg(not(coverage))]
use {
acp_nats::nats, acp_telemetry::ServiceName, clap::Parser, std::net::SocketAddr,
tower_http::trace::TraceLayer, tracing::error, trogon_std::env::SystemEnv,
trogon_std::fs::SystemFs,
acp_nats::nats, acp_telemetry::ServiceName, clap::Parser, std::net::SocketAddr, tracing::error,
trogon_std::env::SystemEnv, trogon_std::fs::SystemFs,
Comment thread
cursor[bot] marked this conversation as resolved.
};

#[cfg(not(coverage))]
Expand Down Expand Up @@ -47,10 +46,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
shutdown_tx: shutdown_tx.clone(),
};

let app = axum::Router::new()
.route("/ws", axum::routing::get(upgrade::handle))
.layer(TraceLayer::new_for_http())
.with_state(state);
let app = trogon_std::telemetry::http::instrument_router(
axum::Router::new()
.route("/ws", axum::routing::get(upgrade::handle))
.with_state(state),
);

let addr = SocketAddr::from((ws_config.host, ws_config.port));
let listener = tokio::net::TcpListener::bind(addr).await?;
Expand Down
2 changes: 1 addition & 1 deletion rsworkspace/crates/trogon-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trogon-source-linear = { workspace = true }
trogon-source-notion = { workspace = true }
trogon-source-slack = { workspace = true }
trogon-source-telegram = { workspace = true }
trogon-std = { workspace = true, features = ["clap"] }
trogon-std = { workspace = true, features = ["clap", "telemetry-http"] }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion rsworkspace/crates/trogon-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

let app = http::mount_sources(resolved, publisher);
let app =
trogon_std::telemetry::http::instrument_router(http::mount_sources(resolved, publisher));

let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr).await?;
Expand Down
1 change: 1 addition & 0 deletions rsworkspace/crates/trogon-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-nats = { workspace = true, features = ["ring", "nkeys", "jetstream", "kv",
bytes = { workspace = true }
futures = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["time", "io-util"] }
Expand Down
1 change: 1 addition & 0 deletions rsworkspace/crates/trogon-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub mod lease;
pub mod messaging;
pub mod nats_token;
pub mod subject_token_violation;
pub(crate) mod telemetry;
pub(crate) mod token;

#[cfg(any(test, feature = "test-support"))]
Expand Down
135 changes: 123 additions & 12 deletions rsworkspace/crates/trogon-nats/src/messaging.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::client::{FlushClient, PublishClient, RequestClient};
use crate::telemetry::messaging::{
MessagingError, MessagingOperation, set_client_operation_span_attributes, set_span_error,
};
use async_nats::header::HeaderMap;
use opentelemetry::propagation::Injector;
use serde::{Serialize, de::DeserializeOwned};
use std::time::Duration;
use tracing::Span;
use tracing::{Span, instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER};
Expand Down Expand Up @@ -35,6 +38,7 @@ pub fn build_request_headers() -> HeaderMap {
headers
}

#[instrument(name = "nats.request", skip(client, request), fields(subject = %subject))]
pub async fn request_with_timeout<N: RequestClient, Req, Res>(
client: &N,
subject: &str,
Expand All @@ -45,28 +49,45 @@ where
Req: Serialize,
Res: DeserializeOwned,
{
let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?;
let span = Span::current();
set_client_operation_span_attributes(&span, MessagingOperation::Request, subject);

let payload = serde_json::to_vec(request).map_err(|error| {
set_span_error(&span, MessagingError::Serialize);
NatsError::Serialize(error)
})?;
let headers = build_request_headers();

let response = tokio::time::timeout(
timeout,
client.request_with_headers(subject.to_string(), headers, payload.into()),
)
.await
.map_err(|_| NatsError::Timeout {
subject: subject.to_string(),
.map_err(|_| {
set_span_error(&span, MessagingError::Timeout);
NatsError::Timeout {
subject: subject.to_string(),
}
})?
.map_err(|e| NatsError::Request {
subject: subject.to_string(),
error: e.to_string(),
.map_err(|error| {
set_span_error(&span, MessagingError::Request);
NatsError::Request {
subject: subject.to_string(),
error: error.to_string(),
}
})?;

let payload_str = String::from_utf8_lossy(&response.payload);
tracing::debug!(payload = %payload_str, "Received NATS response");

serde_json::from_slice(&response.payload).map_err(|e| {
tracing::error!(payload = %payload_str, error = %e, "Failed to deserialize NATS response");
NatsError::Deserialize(e)
serde_json::from_slice(&response.payload).map_err(|error| {
set_span_error(&span, MessagingError::Deserialize);
tracing::error!(
error = %error,
subject = %subject,
"Failed to deserialize NATS response"
);
Comment thread
yordis marked this conversation as resolved.
NatsError::Deserialize(error)
})
}

Expand Down Expand Up @@ -246,6 +267,7 @@ impl PublishOptionsBuilder {
}
}

#[instrument(name = "nats.publish", skip(client, request, options), fields(subject = %subject))]
pub async fn publish<N: PublishClient + FlushClient, Req>(
client: &N,
subject: &str,
Expand All @@ -255,7 +277,13 @@ pub async fn publish<N: PublishClient + FlushClient, Req>(
where
Req: Serialize,
{
let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?;
let span = Span::current();
set_client_operation_span_attributes(&span, MessagingOperation::Publish, subject);

let payload = serde_json::to_vec(request).map_err(|error| {
set_span_error(&span, MessagingError::Serialize);
NatsError::Serialize(error)
})?;
let headers = headers_with_trace_context();

options
Expand All @@ -274,7 +302,10 @@ where
"publish",
subject,
)
.await?;
.await
.inspect_err(|_error| {
set_span_error(&span, MessagingError::PublishOperation);
})?;

let Some(flush_policy) = options.flush else {
return Ok(());
Expand All @@ -296,6 +327,9 @@ where
subject,
)
.await
.inspect_err(|_error| {
set_span_error(&span, MessagingError::FlushOperation);
})
}

#[derive(Debug)]
Expand Down Expand Up @@ -388,6 +422,22 @@ mod tests {
result: String,
}

#[cfg(feature = "test-support")]
struct FailingSerialize;

#[cfg(feature = "test-support")]
impl serde::Serialize for FailingSerialize {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
Err(serde::ser::Error::custom(format!(
"{} cannot be serialized",
std::any::type_name::<S>()
)))
}
}

#[test]
fn test_retry_policy_no_retries() {
let policy = RetryPolicy::no_retries();
Expand Down Expand Up @@ -580,6 +630,17 @@ mod tests {
}
}

#[tokio::test]
#[cfg(feature = "test-support")]
async fn test_request_serialize_error() {
let mock = AdvancedMockNatsClient::new();

let result: Result<TestResponse, NatsError> =
request(&mock, "test.subject", &FailingSerialize).await;

assert!(matches!(result, Err(NatsError::Serialize(_))));
}

#[tokio::test]
#[cfg(feature = "test-support")]
async fn test_publish_simple() {
Expand All @@ -594,6 +655,23 @@ mod tests {
assert_eq!(mock.published_messages(), vec!["test.subject"]);
}

#[tokio::test]
#[cfg(feature = "test-support")]
async fn test_publish_serialize_error() {
let mock = AdvancedMockNatsClient::new();

let result = publish(
&mock,
"test.subject",
&FailingSerialize,
PublishOptions::simple(),
)
.await;

assert!(matches!(result, Err(NatsError::Serialize(_))));
assert!(mock.published_messages().is_empty());
}

#[tokio::test]
#[cfg(feature = "test-support")]
async fn test_publish_with_flush() {
Expand All @@ -612,6 +690,39 @@ mod tests {
assert_eq!(mock.published_messages(), vec!["test.subject"]);
}

#[tokio::test]
#[cfg(feature = "test-support")]
async fn test_publish_returns_error_when_publish_fails() {
let mock = AdvancedMockNatsClient::new();
mock.fail_next_publish();
let data = TestRequest {
message: "test".to_string(),
};

let result = publish(&mock, "test.subject", &data, PublishOptions::simple()).await;

assert!(matches!(result, Err(NatsError::PublishOperation(_))));
assert!(mock.published_messages().is_empty());
}

#[tokio::test]
#[cfg(feature = "test-support")]
async fn test_publish_returns_error_when_flush_fails() {
let mock = AdvancedMockNatsClient::new();
mock.fail_next_flush();
let data = TestRequest {
message: "test".to_string(),
};
let options = PublishOptions::builder()
.flush_policy(FlushPolicy::no_retries())
.build();

let result = publish(&mock, "test.subject", &data, options).await;

assert!(matches!(result, Err(NatsError::PublishOperation(_))));
assert_eq!(mock.published_messages(), vec!["test.subject"]);
}

#[test]
fn test_publish_operation_error_display() {
let err = PublishOperationError("test error".to_string());
Expand Down
Loading
Loading