Skip to content
This repository was archived by the owner on Jul 28, 2025. It is now read-only.

Commit 79f9a35

Browse files
committed
api: Update 'missing_content' to give upload endpoint(s)
1 parent af2b057 commit 79f9a35

4 files changed

Lines changed: 103 additions & 64 deletions

File tree

crates/api/src/v1/package.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use warg_protocol::{
1212

1313
/// Represents the supported kinds of content sources.
1414
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
15-
#[serde(tag = "type", rename_all = "camelCase")]
15+
#[serde(tag = "type", rename_all = "kebab-case")]
1616
pub enum ContentSource {
1717
/// The content is located at an anonymous HTTP URL.
1818
Http {
@@ -21,6 +21,26 @@ pub enum ContentSource {
2121
},
2222
}
2323

24+
/// Represents the supported kinds of content upload endpoints.
25+
#[derive(Clone, Debug, Serialize, Deserialize)]
26+
#[serde(tag = "type", rename_all = "kebab-case")]
27+
pub enum UploadEndpoint {
28+
/// Content may be uploaded via HTTP POST to the given URL.
29+
/// If the endpoint responds with "201 Created" and a Location header, that
30+
/// header's value will be the content source.
31+
HttpPost {
32+
/// The URL to POST content to.
33+
url: String,
34+
},
35+
}
36+
37+
/// Information about missing content.
38+
#[derive(Clone, Debug, Serialize, Deserialize)]
39+
pub struct MissingContent {
40+
/// Upload endpoint(s) that may be used to provide missing content.
41+
pub upload: Vec<UploadEndpoint>,
42+
}
43+
2444
/// Represents a request to publish a record to a package log.
2545
#[derive(Serialize, Deserialize)]
2646
#[serde(rename = "camelCase")]
@@ -55,13 +75,13 @@ impl PackageRecord {
5575
}
5676
}
5777

58-
/// Gets the missing content digests of the record.
59-
pub fn missing_content(&self) -> &[AnyHash] {
78+
/// Gets the missing content of the record.
79+
pub fn missing_content(&self) -> impl Iterator<Item = (&AnyHash, &MissingContent)> {
6080
match &self.state {
6181
PackageRecordState::Sourcing {
6282
missing_content, ..
63-
} => missing_content,
64-
_ => &[],
83+
} => Box::new(missing_content.iter()) as Box<dyn Iterator<Item = _>>,
84+
_ => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = _>>,
6585
}
6686
}
6787
}
@@ -78,7 +98,7 @@ pub enum PackageRecordState {
7898
/// The package record needs content sources.
7999
Sourcing {
80100
/// The digests of the missing content.
81-
missing_content: Vec<AnyHash>,
101+
missing_content: HashMap<AnyHash, MissingContent>,
82102
},
83103
/// The package record is processing.
84104
Processing,

crates/client/src/api.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,9 @@ impl Client {
349349
/// Uploads package content to the registry.
350350
pub async fn upload_content(
351351
&self,
352-
log_id: &LogId,
353-
record_id: &RecordId,
354-
digest: &AnyHash,
352+
url: &str,
355353
content: impl Into<Body>,
356354
) -> Result<String, ClientError> {
357-
let url = self
358-
.url
359-
.join(&paths::package_record_content(log_id, record_id, digest))
360-
.unwrap();
361355
tracing::debug!("uploading content to `{url}`");
362356

363357
let response = self.client.post(url).body(content).send().await?;

crates/client/src/lib.rs

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use storage::{
1313
use thiserror::Error;
1414
use warg_api::v1::{
1515
fetch::{FetchError, FetchLogsRequest, FetchLogsResponse},
16-
package::{PackageError, PackageRecord, PackageRecordState, PublishRecordRequest},
16+
package::{
17+
MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
18+
UploadEndpoint,
19+
},
1720
proof::{ConsistencyRequest, InclusionRequest},
1821
};
1922
use warg_crypto::{
@@ -154,34 +157,33 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
154157
})
155158
})?;
156159

157-
let missing = record.missing_content();
158-
if !missing.is_empty() {
159-
// Upload the missing content
160-
// TODO: parallelize this
161-
for digest in record.missing_content() {
162-
self.api
163-
.upload_content(
164-
&log_id,
165-
&record.id,
166-
digest,
167-
Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
168-
|| ClientError::ContentNotFound {
169-
digest: digest.clone(),
170-
},
171-
)?),
172-
)
173-
.await
174-
.map_err(|e| match e {
175-
api::ClientError::Package(PackageError::Rejection(reason)) => {
176-
ClientError::PublishRejected {
177-
id: package.id.clone(),
178-
record_id: record.id.clone(),
179-
reason,
180-
}
160+
// TODO: parallelize this
161+
for (digest, MissingContent { upload }) in record.missing_content() {
162+
// Upload the missing content, if the registry supports it
163+
let Some(UploadEndpoint::HttpPost {url}) = upload.first() else {
164+
continue;
165+
};
166+
167+
self.api
168+
.upload_content(
169+
url,
170+
Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
171+
|| ClientError::ContentNotFound {
172+
digest: digest.clone(),
173+
},
174+
)?),
175+
)
176+
.await
177+
.map_err(|e| match e {
178+
api::ClientError::Package(PackageError::Rejection(reason)) => {
179+
ClientError::PublishRejected {
180+
id: package.id.clone(),
181+
record_id: record.id.clone(),
182+
reason,
181183
}
182-
_ => e.into(),
183-
})?;
184-
}
184+
}
185+
_ => e.into(),
186+
})?;
185187
}
186188

187189
Ok(record.id)

crates/server/src/api/v1/package.rs

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ use axum::{
1616
Router,
1717
};
1818
use futures::StreamExt;
19-
use std::path::PathBuf;
2019
use std::sync::Arc;
20+
use std::{collections::HashMap, path::PathBuf};
2121
use tempfile::NamedTempFile;
2222
use tokio::io::AsyncWriteExt;
2323
use url::Url;
2424
use warg_api::v1::package::{
25-
ContentSource, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
25+
ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState,
26+
PublishRecordRequest, UploadEndpoint,
2627
};
2728
use warg_crypto::hash::{AnyHash, Sha256};
2829
use warg_protocol::{
@@ -83,13 +84,32 @@ impl Config {
8384
self.files_dir.join(self.content_file_name(digest))
8485
}
8586

86-
fn content_url(&self, digest: &AnyHash) -> String {
87-
self.content_base_url
88-
.join("content/")
89-
.unwrap()
90-
.join(&self.content_file_name(digest))
91-
.unwrap()
92-
.to_string()
87+
fn content_url(&self, log_id: &LogId, record_id: &RecordId, digest: &AnyHash) -> String {
88+
format!(
89+
"{url}/{log_id}/record/{record_id}/content/{digest}",
90+
url = self.content_base_url,
91+
)
92+
}
93+
94+
fn missing_content<'a>(
95+
&self,
96+
log_id: &LogId,
97+
record_id: &RecordId,
98+
missing_digests: impl IntoIterator<Item = &'a AnyHash>,
99+
) -> HashMap<AnyHash, MissingContent> {
100+
missing_digests
101+
.into_iter()
102+
.map(|digest| {
103+
(
104+
digest.clone(),
105+
MissingContent {
106+
upload: vec![UploadEndpoint::HttpPost {
107+
url: self.content_url(log_id, record_id, digest),
108+
}],
109+
},
110+
)
111+
})
112+
.collect()
93113
}
94114
}
95115

@@ -231,13 +251,12 @@ async fn publish_record(
231251
));
232252
}
233253

254+
let missing_content = config.missing_content(&log_id, &record_id, missing);
234255
Ok((
235256
StatusCode::ACCEPTED,
236257
Json(PackageRecord {
237258
id: record_id,
238-
state: PackageRecordState::Sourcing {
239-
missing_content: missing.into_iter().cloned().collect(),
240-
},
259+
state: PackageRecordState::Sourcing { missing_content },
241260
}),
242261
))
243262
}
@@ -254,12 +273,13 @@ async fn get_record(
254273
.await?;
255274

256275
match record.status {
257-
RecordStatus::MissingContent(missing) => Ok(Json(PackageRecord {
258-
id: record_id,
259-
state: PackageRecordState::Sourcing {
260-
missing_content: missing,
261-
},
262-
})),
276+
RecordStatus::MissingContent(missing) => {
277+
let missing_content = config.missing_content(&log_id, &record_id, &missing);
278+
Ok(Json(PackageRecord {
279+
id: record_id,
280+
state: PackageRecordState::Sourcing { missing_content },
281+
}))
282+
}
263283
// Validated is considered still processing until included in a checkpoint
264284
RecordStatus::Pending | RecordStatus::Validated => Ok(Json(PackageRecord {
265285
id: record_id,
@@ -275,11 +295,11 @@ async fn get_record(
275295
.as_ref()
276296
.contents()
277297
.into_iter()
278-
.map(|d| {
298+
.map(|digest| {
279299
(
280-
d.clone(),
300+
digest.clone(),
281301
vec![ContentSource::Http {
282-
url: config.content_url(d),
302+
url: config.content_url(&log_id, &record_id, digest),
283303
}],
284304
)
285305
})
@@ -361,13 +381,16 @@ async fn upload_content(
361381
{
362382
config
363383
.core_service
364-
.submit_package_record(log_id, record_id.clone())
384+
.submit_package_record(log_id.clone(), record_id.clone())
365385
.await;
366386
}
367387

368388
Ok((
369389
StatusCode::CREATED,
370-
[(axum::http::header::LOCATION, config.content_url(&digest))],
390+
[(
391+
axum::http::header::LOCATION,
392+
config.content_url(&log_id, &record_id, &digest),
393+
)],
371394
))
372395
}
373396

0 commit comments

Comments
 (0)