diff --git a/relay-server/src/endpoints/upload.rs b/relay-server/src/endpoints/upload.rs
index 90b07c52820..0ac2eb06fbd 100644
--- a/relay-server/src/endpoints/upload.rs
+++ b/relay-server/src/endpoints/upload.rs
@@ -30,7 +30,9 @@ use crate::service::ServiceState;
 #[cfg(feature = "processing")]
 use crate::services::objectstore;
 use crate::services::projects::cache::Project;
-use crate::services::upload::{self, ByteStream, SignedLocation};
+use crate::services::upload::{
+    self, ByteStream, Final, LocationQueryParams, Provisional, SignedLocation, UploadLength,
+};
 use crate::services::upstream::UpstreamRequestError;
 use crate::utils::{ApiErrorResponse, MeteredStream};
 use crate::utils::{BoundedStream, find_error_source, tus};
@@ -75,7 +77,6 @@ impl IntoResponse for Error {
         }
 
         let status = match self {
-            Error::Tus(tus::Error::DeferLengthNotAllowed) => StatusCode::FORBIDDEN,
             Error::Tus(_) => StatusCode::BAD_REQUEST,
             Error::Request(error) => return error.into_response(),
             Error::SendError(_) => StatusCode::INTERNAL_SERVER_ERROR,
@@ -128,7 +129,7 @@ impl IntoResponse for Error {
     }
 }
 
-impl IntoResponse for SignedLocation {
+impl<L: UploadLength> IntoResponse for SignedLocation<L> {
     fn into_response(self) -> Response {
         let mut headers = tus::response_headers();
         match self.into_header_value() {
@@ -152,8 +153,7 @@ async fn handle_post(
     check_kill_switch(&state)?;
 
     relay_log::trace!("Validating headers");
-    let upload_length = tus::validate_post_headers(&headers, meta.request_trust().is_trusted())
-        .map_err(Error::from)?;
+    let upload_length = tus::validate_post_headers(&headers).map_err(Error::from)?;
 
     let config = state.config();
     if upload_length.is_some_and(|len| len > config.max_upload_size()) {
@@ -194,7 +194,7 @@ async fn handle_patch(
     meta: RequestMeta,
     headers: HeaderMap,
     Path(upload::LocationPath { project_id, key }): Path<upload::LocationPath>,
-    Query(upload::LocationQueryParams { length, signature }): Query<upload::LocationQueryParams>,
+    Query(LocationQueryParams { length, signature }): Query<LocationQueryParams<Provisional>>,
     body: Body,
 ) -> axum::response::Result<impl IntoResponse> {
     check_kill_switch(&state)?;
@@ -219,7 +219,7 @@ async fn handle_patch(
     })?;
 
     relay_log::trace!("Checking request");
-    let scoping = check_request(&state, meta, length, project).await?;
+    let scoping = check_request(&state, meta, length.value(), project).await?;
 
     let stream = body
         .into_data_stream()
@@ -227,7 +227,7 @@ async fn handle_patch(
         .boxed();
     let stream = MeteredStream::new(stream, "upload");
 
-    let (lower_bound, upper_bound) = match length {
+    let (lower_bound, upper_bound) = match length.value() {
         None => (1, config.max_upload_size()),
         Some(u) => (u, u),
     };
@@ -280,7 +280,7 @@ async fn create(
     state: &ServiceState,
     scoping: Scoping,
     upload_length: Option<usize>,
-) -> Result<SignedLocation, Error> {
+) -> Result<SignedLocation<Provisional>, Error> {
     let location = state
         .upload()
         .send(upload::Create {
@@ -295,9 +295,9 @@
 async fn upload(
     state: &ServiceState,
     scoping: Scoping,
-    location: SignedLocation,
+    location: SignedLocation<Provisional>,
     stream: BoundedStream<MeteredStream<ByteStream>>,
-) -> Result<SignedLocation, Error> {
+) -> Result<SignedLocation<Final>, Error> {
     let location = state
         .upload()
         .send(upload::Stream {
diff --git a/relay-server/src/processing/utils/attachments.rs b/relay-server/src/processing/utils/attachments.rs
index f5a08e10883..89d3a5be63a 100644
--- a/relay-server/src/processing/utils/attachments.rs
+++ b/relay-server/src/processing/utils/attachments.rs
@@ -40,6 +40,8 @@ fn validate(item: &Item, config: &Config) -> Result<(), ProcessingError> {
 
     #[cfg(feature = "processing")]
     {
+        use crate::services::upload::{Final, SignedLocation};
+
         if !item.is_attachment_ref() {
             return Ok(());
         }
@@ -47,16 +49,13 @@ fn validate(item: &Item, config: &Config) -> Result<(), ProcessingError> {
         let payload = item.payload();
         let payload: AttachmentPlaceholder =
             serde_json::from_slice(&payload).map_err(|_| ProcessingError::InvalidAttachmentRef)?;
-        let signed_location =
-            crate::services::upload::SignedLocation::try_from_str(payload.location)
-                .ok_or(ProcessingError::InvalidAttachmentRef)?;
+        let signed_location: SignedLocation<Final> = SignedLocation::try_from_str(payload.location)
+            .ok_or(ProcessingError::InvalidAttachmentRef)?;
 
         // NOTE: Using the received timestamp here breaks tests without a pop-relay.
         let location = signed_location
             .verify(chrono::Utc::now(), config)
             .map_err(|_| ProcessingError::InvalidAttachmentRef)?;
-        let signed_length = location
-            .length
-            .ok_or(ProcessingError::InvalidAttachmentRef)?;
+        let signed_length = location.length.into_inner();
 
         match item.attachment_body_size() == signed_length {
             true => Ok(()),
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 19b3aa401fe..7c789c3d0fb 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -39,7 +39,7 @@ use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
 use crate::service::ServiceError;
 use crate::services::global_config::GlobalConfigHandle;
 use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
-use crate::services::upload::SignedLocation;
+use crate::services::upload::{Final, SignedLocation};
 use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
 use crate::utils::{self, FormDataIter};
 
@@ -1031,7 +1031,7 @@ impl StoreService {
         let payload = item.payload();
         let placeholder: AttachmentPlaceholder<'_> =
             serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
-        let location = SignedLocation::try_from_str(placeholder.location)
+        let location = SignedLocation::<Final>::try_from_str(placeholder.location)
            .ok_or(StoreError::InvalidAttachmentRef)?
             .verify(Utc::now(), &self.config)
             .map_err(|_| StoreError::InvalidAttachmentRef)?;
diff --git a/relay-server/src/services/upload.rs b/relay-server/src/services/upload.rs
index c0ea2447ed0..ffe1ebcc211 100644
--- a/relay-server/src/services/upload.rs
+++ b/relay-server/src/services/upload.rs
@@ -96,12 +96,12 @@ pub enum Upload {
     /// Creates an upload resource.
     ///
     /// Returns the trusted identifier of the upload.
-    Create(Create, InstrumentedSender),
+    Create(Create, InstrumentedSender<Provisional>),
     /// Upload a stream of bytes for a given location.
     ///
     /// The service also returns the signed location. This is redundant, but creates a simpler
     /// flow for the caller side.
-    Upload(Stream, InstrumentedSender),
+    Upload(Stream, InstrumentedSender<Final>),
 }
 
 impl Interface for Upload {}
@@ -126,15 +129,18 @@ pub struct Stream {
     /// The organization & project that the stream belongs to.
     pub scoping: Scoping,
     /// The location to upload to.
-    pub location: SignedLocation,
+    pub location: SignedLocation<Provisional>,
     /// The body to be uploaded to objectstore, with length validation.
     pub stream: BoundedStream<MeteredStream<ByteStream>>,
 }
 
 impl FromMessage<Create> for Upload {
-    type Response = AsyncResponse<Result<SignedLocation, Error>>;
+    type Response = AsyncResponse<Result<SignedLocation<Provisional>, Error>>;
 
-    fn from_message(message: Create, sender: Sender<Result<SignedLocation, Error>>) -> Self {
+    fn from_message(
+        message: Create,
+        sender: Sender<Result<SignedLocation<Provisional>, Error>>,
+    ) -> Self {
         Self::Create(
             message,
             InstrumentedSender {
@@ -146,9 +149,9 @@
 }
 
 impl FromMessage<Stream> for Upload {
-    type Response = AsyncResponse<Result<SignedLocation, Error>>;
+    type Response = AsyncResponse<Result<SignedLocation<Final>, Error>>;
 
-    fn from_message(message: Stream, sender: Sender<Result<SignedLocation, Error>>) -> Self {
+    fn from_message(message: Stream, sender: Sender<Result<SignedLocation<Final>, Error>>) -> Self {
         Self::Upload(
             message,
             InstrumentedSender {
@@ -207,13 +210,13 @@ pub struct Service {
 }
 
 /// A response channel that emits a metric for each response.
-pub struct InstrumentedSender {
+pub struct InstrumentedSender<L: UploadLength> {
     metric: RelayCounters,
-    inner: Sender<Result<SignedLocation, Error>>,
+    inner: Sender<Result<SignedLocation<L>, Error>>,
 }
 
-impl InstrumentedSender {
-    fn send(self, result: Result<SignedLocation, Error>) {
+impl<L: UploadLength> InstrumentedSender<L> {
+    fn send(self, result: Result<SignedLocation<L>, Error>) {
         let result_msg = match &result {
             Ok(_) => "success",
             Err(e) => e.variant(),
@@ -236,7 +239,10 @@ enum Backend {
 }
 
 impl Service {
-    async fn create(&self, Create { scoping, length }: Create) -> Result<SignedLocation, Error> {
+    async fn create(
+        &self,
+        Create { scoping, length }: Create,
+    ) -> Result<SignedLocation<Provisional>, Error> {
         match &self.backend {
             Backend::Upstream { addr } => {
                 let (request, rx) = UploadRequest::create(scoping, length);
@@ -251,14 +257,14 @@ impl Service {
                 Location {
                     project_id: scoping.project_id,
                     key,
-                    length,
+                    length: Provisional(length),
                 }
                 .try_sign(config)
             }
         }
     }
 
-    async fn upload(&self, stream: Stream) -> Result<SignedLocation, Error> {
+    async fn upload(&self, stream: Stream) -> Result<SignedLocation<Final>, Error> {
         match &self.backend {
             Backend::Upstream { addr } => {
                 let (request, rx) = UploadRequest::upload(stream);
@@ -281,7 +287,7 @@ impl Service {
                 } = location.verify(received, config)?;
 
                 debug_assert_eq!(scoping.project_id, project_id);
-                debug_assert!(stream.length().is_none_or(|l| Some(l) == length));
+                debug_assert!(stream.length().is_none_or(|l| Some(l) == length.value()));
 
                 let byte_counter = stream.byte_counter();
                 let key = addr
@@ -294,7 +300,7 @@ impl Service {
                     .await
                     .map_err(Error::ObjectstoreServiceUnavailable)??
                     .into_inner();
-                let length = Some(byte_counter.get());
+                let length = Final(byte_counter.get());
 
                 Location {
                     project_id,
@@ -306,10 +312,11 @@ impl Service {
         }
     }
 
-    async fn timeout<F: IntoFuture<Output = Result<SignedLocation, Error>>>(
-        &self,
-        future: F,
-    ) -> Result<SignedLocation, Error> {
+    async fn timeout<L, F>(&self, future: F) -> Result<SignedLocation<L>, Error>
+    where
+        L: UploadLength,
+        F: IntoFuture<Output = Result<SignedLocation<L>, Error>>,
+    {
         tokio::time::timeout(self.timeout, future).await?
     }
 }
@@ -332,11 +339,57 @@ impl SimpleService for Service {
 impl LoadShed for Service {
     fn handle_loadshed(&self, message: Upload) {
         match message {
-            Upload::Create(_, tx) | Upload::Upload(_, tx) => tx.send(Err(Error::LoadShed)),
+            Upload::Create(_, tx) => tx.send(Err(Error::LoadShed)),
+            Upload::Upload(_, tx) => tx.send(Err(Error::LoadShed)),
         }
     }
 }
 
+/// An interface for known or unknown upload lengths.
+///
+/// This allows code sharing between [`Provisional`] and [`Final`] upload locations.
+pub trait UploadLength: for<'de> Deserialize<'de> {
+    fn value(&self) -> Option<usize>;
+}
+
+/// A provisional upload length which may or may not yet be known.
+///
+/// See also [`Final`].
+#[derive(Clone, Copy, Deserialize)]
+pub struct Provisional(Option<usize>);
+
+impl Provisional {
+    /// Defines a provisional upload length.
+    pub fn new(value: Option<usize>) -> Self {
+        Self(value)
+    }
+}
+
+impl UploadLength for Provisional {
+    fn value(&self) -> Option<usize> {
+        self.0
+    }
+}
+
+/// A final upload length that represents the actual number of bytes uploaded to objectstore.
+///
+/// See also [`Provisional`].
+#[derive(Clone, Copy, Deserialize)]
+pub struct Final(usize);
+
+impl Final {
+    /// Returns the inner value.
+    pub fn into_inner(self) -> usize {
+        self.0
+    }
+}
+
+impl UploadLength for Final {
+    fn value(&self) -> Option<usize> {
+        Some(self.0)
+    }
+}
+
 /// An identifier for the upload.
 ///
 /// The location can be converted into a URI to be put in the `Location` HTTP header
@@ -345,30 +398,30 @@ impl LoadShed for Service {
 /// Calling [`Self::try_sign`] appends a `&signature=` query parameter that can later be used
 /// to validate whether the URI (especially the length) has been tampered with.
 #[derive(Debug)]
-pub struct Location {
+pub struct Location<L: UploadLength> {
     /// Sentry project ID.
     pub project_id: ProjectId,
     /// Objectstore identifier.
     pub key: String,
     /// Value of the `Upload-Length` header. `None` if `Upload-Defer-Length: 1`.
-    pub length: Option<usize>,
+    pub length: L,
 }
 
-impl Location {
+impl<L: UploadLength> Location<L> {
     fn as_uri(&self) -> String {
         let Location {
             project_id,
             key,
-            length,
+            length: _,
         } = self;
-        match length {
+        match self.length.value() {
             Some(length) => format!("/api/{project_id}/upload/{key}/?length={length}"),
             None => format!("/api/{project_id}/upload/{key}/"),
         }
     }
 
     #[cfg(feature = "processing")]
-    fn try_sign(self, config: &Config) -> Result<SignedLocation, Error> {
+    fn try_sign(self, config: &Config) -> Result<SignedLocation<L>, Error> {
         let uri = self.as_uri();
         let signature = config
             .credentials()
@@ -397,29 +450,53 @@ pub struct LocationPath {
 }
 
 /// Query parameters for the upload endpoint.
-#[derive(Debug, Deserialize)]
-pub struct LocationQueryParams {
-    pub length: Option<usize>,
+#[derive(Debug)]
+pub struct LocationQueryParams<L: UploadLength> {
+    pub length: L,
     pub signature: String,
 }
 
+#[derive(Deserialize)]
+struct Helper(LocationQueryParams<Provisional>);
+
+impl<'de> Deserialize<'de> for LocationQueryParams<Provisional> {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        Helper::deserialize(deserializer).map(|helper| helper.0)
+    }
+}
+
+impl<'de> Deserialize<'de> for LocationQueryParams<Final> {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let LocationQueryParams::<Provisional> { length, signature } =
+            Helper::deserialize(deserializer)?.0;
+        let Some(length) = length.value() else {
+            return Err(serde::de::Error::custom("missing length"));
+        };
+        Ok(Self {
+            length: Final(length),
+            signature,
+        })
+    }
+}
+
 /// A verifiable [`Location`] signed by this Relay or an upstream Relay.
 #[derive(Debug)]
-pub struct SignedLocation {
-    location: Location,
+pub struct SignedLocation<L: UploadLength> {
+    location: Location<L>,
     signature: Signature,
 }
 
-impl SignedLocation {
+impl<L: UploadLength> SignedLocation<L> {
     /// Creates an unverified location from path and query params.
     ///
     /// Call `verify` to make sure the signature is correct.
-    pub fn from_parts(
-        project_id: ProjectId,
-        key: String,
-        length: Option<usize>,
-        signature: String,
-    ) -> Self {
+    pub fn from_parts(project_id: ProjectId, key: String, length: L, signature: String) -> Self {
         // TODO: forward compat: allow other query params?
         Self {
             location: Location {
@@ -442,7 +519,11 @@ impl SignedLocation {
             signature,
         } = self;
         let mut uri = location.as_uri();
-        uri.push(if location.length.is_some() { '&' } else { '?' }); // TODO: brittle.
+        uri.push(if location.length.value().is_some() {
+            '&'
+        } else {
+            '?'
+        }); // TODO: brittle.
         uri.push_str("signature=");
         uri.push_str(&signature.to_string());
         uri
@@ -452,7 +533,7 @@ impl SignedLocation {
     ///
     /// Fails if the signature is outdated or incorrect.
     #[cfg(feature = "processing")]
-    pub fn verify(self, received: DateTime<Utc>, config: &Config) -> Result<Location, Error> {
+    pub fn verify(self, received: DateTime<Utc>, config: &Config) -> Result<Location<L>, Error> {
         let public_key = config.public_key().ok_or(Error::SigningFailed)?;
         let is_valid = self.signature.verify(
             self.location.as_uri().as_bytes(),
@@ -465,7 +546,12 @@ impl SignedLocation {
             false => Err(Error::InvalidSignature),
         }
     }
+}
 
+impl<L: UploadLength> SignedLocation<L>
+where
+    LocationQueryParams<L>: for<'de> Deserialize<'de>,
+{
     fn try_from_response(response: Response) -> Result<Self, UpstreamRequestError> {
         match response.0.error_for_status() {
             Ok(response) => {
@@ -510,7 +596,7 @@ enum RequestKind {
         length: Option<usize>,
     },
     Upload {
-        location: SignedLocation,
+        location: SignedLocation<Provisional>,
         stream: TakeOnce<BoundedStream<MeteredStream<ByteStream>>>,
         length: Option<usize>,
         encoding: HttpEncoding,
diff --git a/relay-server/src/utils/tus.rs b/relay-server/src/utils/tus.rs
index aa3582c8519..333592ac1f4 100644
--- a/relay-server/src/utils/tus.rs
+++ b/relay-server/src/utils/tus.rs
@@ -29,9 +29,6 @@ pub enum Error {
     /// The `Upload-Offset` header is missing or invalid.
     #[error("expected Upload-Offset: 0, got: {0:?}")]
     UploadOffset(Option<String>),
-    /// The `Upload-Defer-Length` header is not allowed for external/untrusted requests.
-    #[error("Upload-Defer-Length not allowed")]
-    DeferLengthNotAllowed,
     /// The `Content-Type` header is not what TUS expects.
     #[error("expected Content-Type: {expected}, got: {received}")]
     ContentType {
@@ -78,10 +75,7 @@ const EXPECTED_CONTENT_TYPE_STR: &str = "application/offset+octet-stream";
 /// Validates TUS protocol headers and returns a subset of parsed values.
 ///
 /// Returns the declared `Upload-Length`.
-pub fn validate_post_headers(
-    headers: &HeaderMap,
-    allow_defer_length: bool,
-) -> Result<Option<usize>, Error> {
+pub fn validate_post_headers(headers: &HeaderMap) -> Result<Option<usize>, Error> {
     let tus_version = headers.get(TUS_RESUMABLE);
     if tus_version != Some(&TUS_VERSION) {
         return Err(Error::Version(
@@ -102,10 +96,9 @@ pub fn validate_post_headers(
     // Exactly one of Upload-Length and Upload-Defer-Length must be present.
-    // Upload-Defer-Length is only accepted if its value is 1 (as demanded by the TUS protocol)
-    // and `allow_defer_length` is true (i.e. the sender is trusted/internal).
-    let upload_length = match (upload_length, upload_defer_length, allow_defer_length) {
-        (Some(u), None, _) => Ok(Some(u)),
-        (None, Some(1), true) => Ok(None),
-        (None, Some(1), false) => Err(Error::DeferLengthNotAllowed),
+    // Upload-Defer-Length is only accepted if its value is 1,
+    // as demanded by the TUS protocol.
+    let upload_length = match (upload_length, upload_defer_length) {
+        (Some(u), None) => Ok(Some(u)),
+        (None, Some(1)) => Ok(None),
         _ => Err(Error::UploadLength {
             upload_length,
             upload_defer_length,
@@ -183,7 +176,7 @@ mod tests {
     #[test]
     fn test_validate_tus_headers_missing_version() {
         let headers = HeaderMap::new();
-        let result = validate_post_headers(&headers, false);
+        let result = validate_post_headers(&headers);
         assert!(matches!(result, Err(Error::Version(_))));
     }
 
@@ -191,7 +184,7 @@ mod tests {
     fn test_validate_tus_headers_missing_length() {
         let mut headers = HeaderMap::new();
         headers.insert(TUS_RESUMABLE, HeaderValue::from_static("1.0.0"));
-        let result = validate_post_headers(&headers, false);
+        let result = validate_post_headers(&headers);
         assert!(matches!(result, Err(Error::UploadLength { .. })));
     }
 
@@ -202,7 +195,7 @@ mod tests {
         headers.insert(hyper::header::CONTENT_LENGTH, 1024.into());
         headers.insert(hyper::header::CONTENT_TYPE, EXPECTED_CONTENT_TYPE);
         headers.insert(UPLOAD_LENGTH, HeaderValue::from_static("1024"));
-        let result = validate_post_headers(&headers, false);
+        let result = validate_post_headers(&headers);
         assert!(matches!(result, Err(Error::ContentType { .. })));
     }
 
@@ -211,7 +204,7 @@ mod tests {
         let mut headers = HeaderMap::new();
         headers.insert(TUS_RESUMABLE, HeaderValue::from_static("1.0.0"));
         headers.insert(UPLOAD_LENGTH, HeaderValue::from_static("1024"));
-        let result = validate_post_headers(&headers, false);
+        let result = validate_post_headers(&headers);
         assert_eq!(result.unwrap(), Some(1024));
     }
 
@@ -224,7 +217,7 @@ mod tests {
         headers.insert(
             hyper::header::TRANSFER_ENCODING,
             HeaderValue::from_static("chunked"),
         );
         headers.insert(UPLOAD_LENGTH, HeaderValue::from_static("1024"));
-        let result = validate_post_headers(&headers, false);
+        let result = validate_post_headers(&headers);
         assert_eq!(result.unwrap(), Some(1024));
     }
 
@@ -233,12 +226,12 @@ mod tests {
         let mut headers = HeaderMap::new();
         headers.insert(TUS_RESUMABLE, HeaderValue::from_static("0.2.0"));
         headers.insert(UPLOAD_LENGTH, HeaderValue::from_static("1024"));
-        let result = validate_post_headers(&headers, false);
+        let result = validate_post_headers(&headers);
         assert!(matches!(result, Err(Error::Version(_))));
     }
 
     #[test]
-    fn test_validate_tus_headers_valid_deferred_length_from_trusted_source() {
+    fn test_validate_tus_headers_valid_deferred_length() {
         let mut headers = HeaderMap::new();
         headers.insert(TUS_RESUMABLE, HeaderValue::from_static("1.0.0"));
         headers.insert(
@@ -246,25 +239,16 @@ mod tests {
             hyper::header::TRANSFER_ENCODING,
             HeaderValue::from_static("chunked"),
         );
         headers.insert(UPLOAD_DEFER_LENGTH, HeaderValue::from_static("1"));
-        let result = validate_post_headers(&headers, true);
+        let result = validate_post_headers(&headers);
         assert!(matches!(result, Ok(None)));
     }
 
-    #[test]
-    fn test_validate_tus_headers_valid_deferred_length_from_untrusted_source() {
-        let mut headers = HeaderMap::new();
-        headers.insert(TUS_RESUMABLE, HeaderValue::from_static("1.0.0"));
-        headers.insert(UPLOAD_DEFER_LENGTH, HeaderValue::from_static("1"));
-        let result = validate_post_headers(&headers, false);
-        assert!(matches!(result, Err(Error::DeferLengthNotAllowed)));
-    }
-
     #[test]
     fn test_validate_tus_headers_invalid_deferred_length() {
         let mut headers = HeaderMap::new();
         headers.insert(TUS_RESUMABLE, HeaderValue::from_static("1.0.0"));
         headers.insert(UPLOAD_DEFER_LENGTH, HeaderValue::from_static("2"));
-        let result = validate_post_headers(&headers, true);
+        let result = validate_post_headers(&headers);
         assert!(matches!(result, Err(Error::UploadLength { .. })));
     }
diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py
index aeb4cb9aa12..9365148b5f9 100644
--- a/tests/integration/test_minidump.py
+++ b/tests/integration/test_minidump.py
@@ -985,6 +985,46 @@ def test_minidump_objectstore_uploads(
     assert minidump.payload.bytes == minidump_content
 
 
+def test_minidump_objectstore_uploads_external_chain(
+    mini_sentry,
+    relay,
+    relay_with_processing,
+    attachments_consumer,
+):
+    project_id = 42
+    minidump_content = b"MDMP content"
+    log_content = b"Some log file content"
+
+    project_config = mini_sentry.add_full_project_config(project_id)
+    project_config["config"].setdefault("features", []).extend(
+        [
+            # "projects:relay-minidump-uploads",  # TODO: why 400 here (though not related to PR)
+            "projects:relay-minidump-attachment-uploads",  # TODO: why 400 here (though not related to PR)
+            "projects:relay-upload-endpoint",
+        ]
+    )
+    mini_sentry.global_config["options"]["relay.endpoint-fetch-config.enabled"] = True
+
+    relay = relay(relay_with_processing(), external=True)
+    project_config["config"]["trustedRelays"] = list(relay.iter_public_keys())
+
+    attachments_consumer = attachments_consumer()
+
+    response = relay.send_minidump(
+        project_id=project_id,
+        files=[
+            (MINIDUMP_ATTACHMENT_NAME, "minidump.dmp", minidump_content),
+            ("logs", "log.txt", log_content),
+        ],
+    )
+    assert response.ok
+
+    attachment, _ = attachments_consumer.get_individual_attachment()
+    attachment, _ = attachments_consumer.get_individual_attachment()
+    event, _ = attachments_consumer.get_event()
+    assert UUID(event["event_id"]) == UUID(response.text)
+
+
 def test_minidump_objectstore_errors(
     mini_sentry,
     relay,
@@ -1024,7 +1064,6 @@ def create(**opts):
         files=[
             ("logs", "log.txt", log_content),
         ],
     )
-    mini_sentry.captured_envelopes.get()
 
     assert mini_sentry.get_aggregated_outcomes() == [
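
---

Note (not part of the patch): the snippet below is a minimal, self-contained sketch of the typestate pattern this patch applies to `Location` and `SignedLocation`: `Provisional` models a length that may still be deferred, `Final` a length measured once the bytes have reached objectstore. The trait and type names mirror the patch; the `Location` shape is simplified, and the `finalize` helper and `main` function are hypothetical, added only for illustration.

```rust
/// Mirrors the patch's `UploadLength` trait: a uniform view over known
/// and unknown upload lengths.
trait UploadLength {
    fn value(&self) -> Option<usize>;
}

/// Length declared at creation time; `None` models `Upload-Defer-Length: 1`.
#[derive(Clone, Copy)]
struct Provisional(Option<usize>);

/// Length measured after the upload completed; always known.
#[derive(Clone, Copy)]
struct Final(usize);

impl UploadLength for Provisional {
    fn value(&self) -> Option<usize> {
        self.0
    }
}

impl UploadLength for Final {
    fn value(&self) -> Option<usize> {
        Some(self.0)
    }
}

/// A location parameterized by how much is known about its length.
struct Location<L: UploadLength> {
    key: String,
    length: L,
}

impl Location<Provisional> {
    /// Completing an upload converts the provisional location into a final
    /// one, so consumers no longer have to handle a missing length.
    fn finalize(self, bytes_written: usize) -> Location<Final> {
        Location {
            key: self.key,
            length: Final(bytes_written),
        }
    }
}

fn main() {
    let provisional = Location {
        key: "abc123".to_owned(),
        length: Provisional(None), // length deferred at POST time
    };
    // After streaming the body, the byte counter yields the real length.
    let finalized = provisional.finalize(1024);
    // `Final::value` is always `Some`, so no fallible lookup is needed.
    assert_eq!(finalized.length.value(), Some(1024));
}
```

With this encoding, consumers such as `store.rs` and the processing pipeline can demand `SignedLocation<Final>` at the type level and read the length infallibly via `into_inner`, while the endpoint code keeps accepting deferred lengths as `SignedLocation<Provisional>`; passing one where the other is expected becomes a compile error instead of a runtime `Option` check.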