Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions relay-server/src/endpoints/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use crate::service::ServiceState;
#[cfg(feature = "processing")]
use crate::services::objectstore;
use crate::services::projects::cache::Project;
use crate::services::upload::{self, ByteStream, SignedLocation};
use crate::services::upload::{
self, ByteStream, Final, LocationQueryParams, Provisional, SignedLocation, UploadLength,
};
use crate::services::upstream::UpstreamRequestError;
use crate::utils::{ApiErrorResponse, MeteredStream};
use crate::utils::{BoundedStream, find_error_source, tus};
Expand Down Expand Up @@ -75,7 +77,6 @@ impl IntoResponse for Error {
}

let status = match self {
Error::Tus(tus::Error::DeferLengthNotAllowed) => StatusCode::FORBIDDEN,
Error::Tus(_) => StatusCode::BAD_REQUEST,
Error::Request(error) => return error.into_response(),
Error::SendError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -128,7 +129,7 @@ impl IntoResponse for Error {
}
}

impl IntoResponse for SignedLocation {
impl<L: UploadLength> IntoResponse for SignedLocation<L> {
fn into_response(self) -> Response {
let mut headers = tus::response_headers();
match self.into_header_value() {
Expand All @@ -152,8 +153,7 @@ async fn handle_post(
check_kill_switch(&state)?;

relay_log::trace!("Validating headers");
let upload_length = tus::validate_post_headers(&headers, meta.request_trust().is_trusted())
.map_err(Error::from)?;
let upload_length = tus::validate_post_headers(&headers).map_err(Error::from)?;
let config = state.config();

if upload_length.is_some_and(|len| len > config.max_upload_size()) {
Expand Down Expand Up @@ -194,7 +194,7 @@ async fn handle_patch(
meta: RequestMeta,
headers: HeaderMap,
Path(upload::LocationPath { project_id, key }): Path<upload::LocationPath>,
Query(upload::LocationQueryParams { length, signature }): Query<upload::LocationQueryParams>,
Query(LocationQueryParams { length, signature }): Query<LocationQueryParams<Provisional>>,
body: Body,
) -> axum::response::Result<impl IntoResponse> {
check_kill_switch(&state)?;
Expand All @@ -219,15 +219,15 @@ async fn handle_patch(
})?;

relay_log::trace!("Checking request");
let scoping = check_request(&state, meta, length, project).await?;
let scoping = check_request(&state, meta, length.value(), project).await?;

let stream = body
.into_data_stream()
.map(|result| result.map_err(io::Error::other))
.boxed();
let stream = MeteredStream::new(stream, "upload");

let (lower_bound, upper_bound) = match length {
let (lower_bound, upper_bound) = match length.value() {
None => (1, config.max_upload_size()),
Some(u) => (u, u),
};
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn create(
state: &ServiceState,
scoping: Scoping,
upload_length: Option<usize>,
) -> Result<SignedLocation, Error> {
) -> Result<SignedLocation<Provisional>, Error> {
let location = state
.upload()
.send(upload::Create {
Expand All @@ -295,9 +295,9 @@ async fn create(
async fn upload(
state: &ServiceState,
scoping: Scoping,
location: SignedLocation,
location: SignedLocation<Provisional>,
stream: BoundedStream<MeteredStream<ByteStream>>,
) -> Result<SignedLocation, Error> {
) -> Result<SignedLocation<Final>, Error> {
let location = state
.upload()
.send(upload::Stream {
Expand Down
11 changes: 5 additions & 6 deletions relay-server/src/processing/utils/attachments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,22 @@ fn validate(item: &Item, config: &Config) -> Result<(), ProcessingError> {

#[cfg(feature = "processing")]
{
use crate::services::upload::{Final, SignedLocation};

if !item.is_attachment_ref() {
return Ok(());
}

let payload = item.payload();
let payload: AttachmentPlaceholder =
serde_json::from_slice(&payload).map_err(|_| ProcessingError::InvalidAttachmentRef)?;
let signed_location =
crate::services::upload::SignedLocation::try_from_str(payload.location)
.ok_or(ProcessingError::InvalidAttachmentRef)?;
let signed_location: SignedLocation<Final> = SignedLocation::try_from_str(payload.location)
.ok_or(ProcessingError::InvalidAttachmentRef)?;
// NOTE: Using the received timestamp here breaks tests without a pop-relay.
let location = signed_location
.verify(chrono::Utc::now(), config)
.map_err(|_| ProcessingError::InvalidAttachmentRef)?;
let signed_length = location
.length
.ok_or(ProcessingError::InvalidAttachmentRef)?;
let signed_length = location.length.into_inner();

match item.attachment_body_size() == signed_length {
true => Ok(()),
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::upload::SignedLocation;
use crate::services::upload::{Final, SignedLocation};
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

Expand Down Expand Up @@ -1031,7 +1031,7 @@ impl StoreService {
let payload = item.payload();
let placeholder: AttachmentPlaceholder<'_> =
serde_json::from_slice(&payload).map_err(|_| StoreError::InvalidAttachmentRef)?;
let location = SignedLocation::try_from_str(placeholder.location)
let location = SignedLocation::<Final>::try_from_str(placeholder.location)
.ok_or(StoreError::InvalidAttachmentRef)?
.verify(Utc::now(), &self.config)
.map_err(|_| StoreError::InvalidAttachmentRef)?;
Expand Down
Loading
Loading