diff --git a/rsworkspace/Cargo.lock b/rsworkspace/Cargo.lock index 0da57058f..b5567dd6d 100644 --- a/rsworkspace/Cargo.lock +++ b/rsworkspace/Cargo.lock @@ -3337,7 +3337,9 @@ dependencies = [ "sha2", "testcontainers-modules", "tokio", + "tower", "tracing", + "tracing-subscriber", "trogon-nats", "trogon-std", ] diff --git a/rsworkspace/crates/trogon-source-linear/Cargo.toml b/rsworkspace/crates/trogon-source-linear/Cargo.toml index 18daf6343..50ee2c4b2 100644 --- a/rsworkspace/crates/trogon-source-linear/Cargo.toml +++ b/rsworkspace/crates/trogon-source-linear/Cargo.toml @@ -26,6 +26,9 @@ trogon-std = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tower = "0.5" +tracing-subscriber = { workspace = true } +trogon-nats = { workspace = true, features = ["test-support"] } trogon-std = { workspace = true, features = ["test-support"] } testcontainers-modules = { version = "0.8", features = ["nats"] } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } diff --git a/rsworkspace/crates/trogon-source-linear/src/constants.rs b/rsworkspace/crates/trogon-source-linear/src/constants.rs index 6ebea1a33..dc2ab75ef 100644 --- a/rsworkspace/crates/trogon-source-linear/src/constants.rs +++ b/rsworkspace/crates/trogon-source-linear/src/constants.rs @@ -9,3 +9,5 @@ pub const DEFAULT_TIMESTAMP_TOLERANCE_SECS: u64 = 60; /// Default JetStream ACK timeout: 10 seconds. pub const DEFAULT_NATS_ACK_TIMEOUT_MS: u64 = 10_000; pub const DEFAULT_NATS_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); + +pub const NATS_HEADER_REJECT_REASON: &str = "X-Linear-Reject-Reason"; diff --git a/rsworkspace/crates/trogon-source-linear/src/lib.rs b/rsworkspace/crates/trogon-source-linear/src/lib.rs index 4679e5ece..9f8c46cd4 100644 --- a/rsworkspace/crates/trogon-source-linear/src/lib.rs +++ b/rsworkspace/crates/trogon-source-linear/src/lib.rs @@ -17,6 +17,10 @@ //! - **Headers**: `Nats-Msg-Id` (set to Linear's `webhookId` for dedup) //! - **Payload**: raw JSON body from Linear //! +//! Payloads that pass signature verification but fail validation (invalid JSON, +//! missing/invalid `type` or `action`, stale timestamp) are published to +//! `{LINEAR_SUBJECT_PREFIX}.unroutable` with an `X-Linear-Reject-Reason` header. +//! //! ## Configuration (env vars) //! //! | Variable | Default | Description | diff --git a/rsworkspace/crates/trogon-source-linear/src/server.rs b/rsworkspace/crates/trogon-source-linear/src/server.rs index 01498e4bf..4c156e31e 100644 --- a/rsworkspace/crates/trogon-source-linear/src/server.rs +++ b/rsworkspace/crates/trogon-source-linear/src/server.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::time::Duration; use crate::config::LinearConfig; +use crate::constants::NATS_HEADER_REJECT_REASON; use crate::signature; #[cfg(not(coverage))] use async_nats::jetstream::context::CreateStreamError; @@ -52,6 +53,31 @@ impl From for ServeError { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RejectReason { + InvalidJson, + MissingWebhookTimestamp, + StaleWebhookTimestamp, + MissingType, + InvalidType, + MissingAction, + InvalidAction, +} + +impl RejectReason { + fn as_str(self) -> &'static str { + match self { + Self::InvalidJson => "invalid_json", + Self::MissingWebhookTimestamp => "missing_webhook_timestamp", + Self::StaleWebhookTimestamp => "stale_webhook_timestamp", + Self::MissingType => "missing_type", + Self::InvalidType => "invalid_type", + Self::MissingAction => "missing_action", + Self::InvalidAction => "invalid_action", + } + } +} + fn outcome_to_status(outcome: PublishOutcome) -> StatusCode { if outcome.is_ok() { info!("Published Linear event to NATS"); @@ -62,6 +88,26 @@ fn outcome_to_status(outcome: PublishOutcome) -> StatusCode } } +async fn publish_unroutable( + js: &P, + subject_prefix: &NatsToken, + reason: RejectReason, + body: Bytes, + ack_timeout: Duration, +) -> StatusCode { + let subject = format!("{subject_prefix}.unroutable"); + let mut headers = async_nats::HeaderMap::new(); + headers.insert(NATS_HEADER_REJECT_REASON, reason.as_str()); + + let outcome = publish_event(js, subject, headers, body, ack_timeout).await; + if outcome.is_ok() { + StatusCode::BAD_REQUEST + } else { + outcome.log_on_error("linear.unroutable"); + StatusCode::INTERNAL_SERVER_ERROR + } +} + #[derive(Clone)] struct AppState { js: P, @@ -173,14 +219,28 @@ async fn handle_webhook_inner( Ok(v) => v, Err(e) => { warn!(error = %e, "Failed to parse Linear webhook body as JSON"); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::InvalidJson, + body, + state.nats_ack_timeout, + ) + .await; } }; if let Some(tolerance) = state.timestamp_tolerance { let Some(ts_ms) = parsed.get("webhookTimestamp").and_then(|v| v.as_u64()) else { warn!("Missing or malformed 'webhookTimestamp' field"); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::MissingWebhookTimestamp, + body, + state.nats_ack_timeout, + ) + .await; }; let now_ms = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -193,26 +253,61 @@ async fn handle_webhook_inner( tolerance_ms = tolerance.as_millis() as u64, "Stale webhookTimestamp — potential replay attack" ); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::StaleWebhookTimestamp, + body, + state.nats_ack_timeout, + ) + .await; } } let Some(raw_type) = parsed.get("type").and_then(|v| v.as_str()) else { warn!("Missing 'type' field in Linear webhook payload"); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::MissingType, + body, + state.nats_ack_timeout, + ) + .await; }; let Ok(event_type) = NatsToken::new(raw_type) else { warn!("Invalid 'type' field in Linear webhook payload"); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::InvalidType, + body, + state.nats_ack_timeout, + ) + .await; }; let Some(raw_action) = parsed.get("action").and_then(|v| v.as_str()) else { warn!("Missing 'action' field in Linear webhook payload"); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::MissingAction, + body, + state.nats_ack_timeout, + ) + .await; }; let Ok(action) = NatsToken::new(raw_action) else { warn!("Invalid 'action' field in Linear webhook payload"); - return StatusCode::BAD_REQUEST; + return publish_unroutable( + &state.js, + &state.subject_prefix, + RejectReason::InvalidAction, + body, + state.nats_ack_timeout, + ) + .await; }; let webhook_id = parsed @@ -243,3 +338,400 @@ async fn handle_webhook_inner( outcome_to_status(outcome) } + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::Body; + use axum::http::Request; + use hmac::{Hmac, Mac}; + use sha2::Sha256; + use tower::ServiceExt; + use tracing_subscriber::util::SubscriberInitExt; + use trogon_nats::jetstream::MockJetStreamPublisher; + + type HmacSha256 = Hmac; + + const TEST_SECRET: &str = "test-secret"; + + fn compute_sig(secret: &str, body: &[u8]) -> String { + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("valid key length"); + mac.update(body); + hex::encode(mac.finalize().into_bytes()) + } + + fn test_config() -> LinearConfig { + LinearConfig { + webhook_secret: TEST_SECRET.to_string(), + port: 0, + subject_prefix: NatsToken::new("linear").expect("valid token"), + stream_name: NatsToken::new("LINEAR").expect("valid token"), + stream_max_age: Duration::from_secs(3600), + timestamp_tolerance: Some(Duration::from_secs(60)), + nats_ack_timeout: Duration::from_secs(10), + nats: trogon_nats::NatsConfig::from_env(&trogon_std::env::InMemoryEnv::new()), + } + } + + fn tracing_guard() -> tracing::subscriber::DefaultGuard { + tracing_subscriber::fmt().with_test_writer().set_default() + } + + fn mock_app(publisher: MockJetStreamPublisher) -> Router { + router(publisher, &test_config()) + } + + fn webhook_request(body: &[u8], sig: Option<&str>) -> Request { + let mut builder = Request::builder().method("POST").uri("/webhook"); + + if let Some(s) = sig { + builder = builder.header("linear-signature", s); + } + + builder + .body(Body::from(body.to_vec())) + .expect("valid request") + } + + fn valid_body() -> Vec { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time after epoch") + .as_millis() as u64; + + serde_json::to_vec(&serde_json::json!({ + "type": "Issue", + "action": "create", + "webhookId": "wh_test_123", + "webhookTimestamp": now_ms, + "data": {} + })) + .expect("valid JSON") + } + + fn assert_unroutable(publisher: &MockJetStreamPublisher, expected_reason: &str) { + let messages = publisher.published_messages(); + assert_eq!(messages.len(), 1, "expected exactly one unroutable publish"); + assert_eq!(messages[0].subject, "linear.unroutable"); + assert_eq!( + messages[0] + .headers + .get(NATS_HEADER_REJECT_REASON) + .map(|v| v.as_str()), + Some(expected_reason), + ); + } + + fn assert_no_publishes(publisher: &MockJetStreamPublisher) { + assert!( + publisher.published_messages().is_empty(), + "expected no publishes" + ); + } + + #[tokio::test] + async fn valid_webhook_returns_200() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let body = valid_body(); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::OK); + let messages = publisher.published_messages(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].subject, "linear.Issue.create"); + } + + #[tokio::test] + async fn missing_signature_returns_401() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let body = valid_body(); + + let resp = app + .oneshot(webhook_request(&body, None)) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + assert_no_publishes(&publisher); + } + + #[tokio::test] + async fn invalid_signature_returns_401() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let body = valid_body(); + + let resp = app + .oneshot(webhook_request(&body, Some("bad-sig"))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + assert_no_publishes(&publisher); + } + + #[tokio::test] + async fn invalid_json_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let body = b"not json"; + let sig = compute_sig(TEST_SECRET, body); + + let resp = app + .oneshot(webhook_request(body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "invalid_json"); + } + + #[tokio::test] + async fn missing_webhook_timestamp_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let body = serde_json::to_vec(&serde_json::json!({ + "type": "Issue", + "action": "create" + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "missing_webhook_timestamp"); + } + + #[tokio::test] + async fn stale_webhook_timestamp_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let body = serde_json::to_vec(&serde_json::json!({ + "type": "Issue", + "action": "create", + "webhookTimestamp": 1_000_000_000_000_u64 + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "stale_webhook_timestamp"); + } + + #[tokio::test] + async fn missing_type_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time after epoch") + .as_millis() as u64; + let body = serde_json::to_vec(&serde_json::json!({ + "action": "create", + "webhookTimestamp": now_ms + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "missing_type"); + } + + #[tokio::test] + async fn invalid_type_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time after epoch") + .as_millis() as u64; + let body = serde_json::to_vec(&serde_json::json!({ + "type": "has space", + "action": "create", + "webhookTimestamp": now_ms + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "invalid_type"); + } + + #[tokio::test] + async fn missing_action_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time after epoch") + .as_millis() as u64; + let body = serde_json::to_vec(&serde_json::json!({ + "type": "Issue", + "webhookTimestamp": now_ms + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "missing_action"); + } + + #[tokio::test] + async fn invalid_action_publishes_unroutable() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let app = mock_app(publisher.clone()); + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time after epoch") + .as_millis() as u64; + let body = serde_json::to_vec(&serde_json::json!({ + "type": "Issue", + "action": "has.dot", + "webhookTimestamp": now_ms + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + assert_unroutable(&publisher, "invalid_action"); + } + + #[tokio::test] + async fn publish_failure_returns_500() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + publisher.fail_next_js_publish(); + let app = mock_app(publisher.clone()); + let body = valid_body(); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + #[cfg(not(coverage))] + #[test] + fn serve_error_display_and_source() { + use async_nats::jetstream::context::{CreateStreamError, CreateStreamErrorKind}; + + let io_err = ServeError::Io(std::io::Error::new( + std::io::ErrorKind::AddrInUse, + "port taken", + )); + assert_eq!( + io_err.to_string(), + "stream provisioning failed: port taken" + .replace("stream provisioning failed", "IO error") + ); + assert!(std::error::Error::source(&io_err).is_some()); + + let prov_err = + ServeError::Provision(CreateStreamError::new(CreateStreamErrorKind::TimedOut)); + let display = prov_err.to_string(); + assert!(display.contains("stream provisioning failed"), "{display}"); + assert!(std::error::Error::source(&prov_err).is_some()); + } + + #[cfg(not(coverage))] + #[test] + fn serve_error_from_io() { + let io = std::io::Error::other("boom"); + let err: ServeError = io.into(); + assert!(matches!(err, ServeError::Io(_))); + } + + #[tokio::test] + async fn dlq_publish_failure_returns_500() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + publisher.fail_next_js_publish(); + let app = mock_app(publisher.clone()); + let body = b"not json"; + let sig = compute_sig(TEST_SECRET, body); + + let resp = app + .oneshot(webhook_request(body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + #[tokio::test] + async fn timestamp_tolerance_disabled_skips_check() { + let _guard = tracing_guard(); + let publisher = MockJetStreamPublisher::new(); + let mut config = test_config(); + config.timestamp_tolerance = None; + let app = router(publisher.clone(), &config); + + let body = serde_json::to_vec(&serde_json::json!({ + "type": "Issue", + "action": "create", + "webhookId": "wh_no_ts", + "webhookTimestamp": 1_000_000_000_000_u64 + })) + .expect("valid JSON"); + let sig = compute_sig(TEST_SECRET, &body); + + let resp = app + .oneshot(webhook_request(&body, Some(&sig))) + .await + .expect("request should succeed"); + + assert_eq!(resp.status(), StatusCode::OK); + let messages = publisher.published_messages(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].subject, "linear.Issue.create"); + } +}