Skip to content

Commit 38c6684

Browse files
committed
Revert "feat(data): allow uploading without known size"
This reverts commit 66af7d0.
1 parent 4afa91f commit 38c6684

7 files changed

Lines changed: 44 additions & 81 deletions

File tree

data/src/bin/example.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async fn clean(mut reader: impl Read, mut writer: impl Write, size: u64) -> Resu
9191
FileUploadSession::new(TranslatorConfig::local_config(std::env::current_dir()?)?.into(), None).await?;
9292

9393
let mut size_read = 0;
94-
let mut handle = translator.start_clean(None, Some(size), None).await;
94+
let mut handle = translator.start_clean(None, size, None).await;
9595

9696
loop {
9797
let bytes = reader.read(&mut read_buf)?;

data/src/data_client.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ pub async fn clean_bytes(
249249
bytes: Vec<u8>,
250250
name: Option<Arc<str>>,
251251
) -> errors::Result<(XetFileInfo, DeduplicationMetrics)> {
252-
let mut handle = processor.start_clean(name, Some(bytes.len() as u64), None).await;
252+
let mut handle = processor.start_clean(name, bytes.len() as u64, None).await;
253253
handle.add_data(&bytes).await?;
254254
handle.finish().await
255255
}
@@ -270,11 +270,7 @@ pub async fn clean_file(
270270
let mut buffer = vec![0u8; u64::min(filesize, *xet_config().data.ingestion_block_size) as usize];
271271

272272
let mut handle = processor
273-
.start_clean(
274-
Some(filename.as_ref().to_string_lossy().into()),
275-
Some(filesize),
276-
Sha256::from_hex(sha256.as_ref()).ok(),
277-
)
273+
.start_clean(Some(filename.as_ref().to_string_lossy().into()), filesize, Sha256::from_hex(sha256.as_ref()).ok())
278274
.await;
279275

280276
loop {

data/src/file_upload_session.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,13 @@ impl FileUploadSession {
260260
pub async fn start_clean(
261261
self: &Arc<Self>,
262262
file_name: Option<Arc<str>>,
263-
size: Option<u64>,
263+
size: u64,
264264
sha256: Option<Sha256>,
265265
) -> SingleFileCleaner {
266-
// Get a new file id for the completion tracking.
267-
// When size is unknown (None), register with 0 to disable progress assertions.
266+
// Get a new file id for the completion tracking
268267
let file_id = self
269268
.completion_tracker
270-
.register_new_file(file_name.clone().unwrap_or_default(), size.unwrap_or(0))
269+
.register_new_file(file_name.clone().unwrap_or_default(), size)
271270
.await;
272271

273272
SingleFileCleaner::new(file_name, file_id, sha256, self.clone())
@@ -572,7 +571,7 @@ mod tests {
572571
.unwrap();
573572

574573
let mut cleaner = upload_session
575-
.start_clean(Some("test".into()), Some(read_data.len() as u64), None)
574+
.start_clean(Some("test".into()), read_data.len() as u64, None)
576575
.await;
577576

578577
// Read blocks from the source file and hand them to the cleaning handle

data/src/streaming.rs

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl XetClient {
7070
&self,
7171
progress_updater: Option<Arc<dyn TrackingProgressUpdater>>,
7272
name: Option<Arc<str>>,
73-
size: Option<u64>,
73+
size: u64,
7474
) -> errors::Result<XetWriter> {
7575
XetWriter::new(self.config.clone(), progress_updater, name, size).await
7676
}
@@ -91,12 +91,12 @@ impl XetWriter {
9191
/// Creates a new writer that will upload a single file.
9292
///
9393
/// `size` is the total number of bytes that will be written and is used for
94-
/// progress tracking. Pass `None` if the content length is not known upfront.
94+
/// progress tracking. The caller must know the content length before writing.
9595
pub async fn new(
9696
config: Arc<TranslatorConfig>,
9797
progress_updater: Option<Arc<dyn TrackingProgressUpdater>>,
9898
name: Option<Arc<str>>,
99-
size: Option<u64>,
99+
size: u64,
100100
) -> errors::Result<Self> {
101101
let session = FileUploadSession::new(config, progress_updater).await?;
102102
let handle = session.start_clean(name, size, None).await;
@@ -291,8 +291,8 @@ mod tests {
291291

292292
/// Upload `content` via [`XetWriter`] and download it back via [`XetReader`],
293293
/// asserting the round-tripped bytes match the original.
294-
async fn assert_roundtrip(client: &XetClient, content: &[u8], size: Option<u64>) {
295-
let mut writer = client.write(None, None, size).await.unwrap();
294+
async fn assert_roundtrip(client: &XetClient, content: &[u8]) {
295+
let mut writer = client.write(None, None, content.len() as u64).await.unwrap();
296296
for chunk in content.chunks(4096) {
297297
writer.write(Bytes::copy_from_slice(chunk)).await.unwrap();
298298
}
@@ -326,25 +326,12 @@ mod tests {
326326
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
327327

328328
// Empty.
329-
assert_roundtrip(&client, &[], Some(0)).await;
329+
assert_roundtrip(&client, &[]).await;
330330
// Small.
331-
assert_roundtrip(&client, b"Hello, World!", Some(13)).await;
331+
assert_roundtrip(&client, b"Hello, World!").await;
332332
// Large (1 MB).
333333
let large: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
334-
assert_roundtrip(&client, &large, Some(large.len() as u64)).await;
335-
}
336-
337-
#[tokio::test]
338-
async fn test_roundtrip_unknown_size() {
339-
let temp_dir = tempdir().unwrap();
340-
let endpoint = format!("local://{}", temp_dir.path().display());
341-
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
342-
343-
// Small without explicit size.
344-
assert_roundtrip(&client, b"Hello, World!", None).await;
345-
// Large (1 MB) without explicit size.
346-
let large: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
347-
assert_roundtrip(&client, &large, None).await;
334+
assert_roundtrip(&client, &large).await;
348335
}
349336

350337
#[tokio::test]
@@ -354,7 +341,7 @@ mod tests {
354341
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
355342

356343
let content: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
357-
let mut writer = client.write(None, None, Some(content.len() as u64)).await.unwrap();
344+
let mut writer = client.write(None, None, content.len() as u64).await.unwrap();
358345
for chunk in content.chunks(4096) {
359346
writer.write(Bytes::copy_from_slice(chunk)).await.unwrap();
360347
}
@@ -372,7 +359,7 @@ mod tests {
372359
let endpoint = format!("local://{}", temp_dir.path().display());
373360
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
374361

375-
let mut writer = client.write(None, None, Some(5)).await.unwrap();
362+
let mut writer = client.write(None, None, 5).await.unwrap();
376363
writer.write(Bytes::from_static(b"hello")).await.unwrap();
377364
let file_info = writer.close().await.unwrap();
378365

@@ -429,7 +416,7 @@ mod tests {
429416
let endpoint = format!("local://{}", temp_dir.path().display());
430417
let client = XetClient::new(Some(endpoint), None, None, "test".into()).unwrap();
431418

432-
let mut writer = client.write(None, None, Some(100)).await.unwrap();
419+
let mut writer = client.write(None, None, 100).await.unwrap();
433420
writer.write(Bytes::from_static(b"some data")).await.unwrap();
434421
writer.abort().await.unwrap();
435422

data/tests/test_session_resume.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ mod tests {
6767

6868
// Feed it half the data, and checkpoint.
6969
let mut cleaner = file_upload_session
70-
.start_clean(Some("data".into()), Some(data.len() as u64), None)
70+
.start_clean(Some("data".into()), data.len() as u64, None)
7171
.await;
7272
cleaner.add_data(&data[..half_n]).await.unwrap();
7373
cleaner.checkpoint().await.unwrap();
@@ -85,7 +85,7 @@ mod tests {
8585

8686
// Feed it half the data, and checkpoint.
8787
let mut cleaner = file_upload_session
88-
.start_clean(Some("data".into()), Some(data.len() as u64), None)
88+
.start_clean(Some("data".into()), data.len() as u64, None)
8989
.await;
9090

9191
// Add all the data. Roughly the first half should dedup.
@@ -140,7 +140,7 @@ mod tests {
140140

141141
// Feed it half the data, and checkpoint.
142142
let mut cleaner = file_upload_session
143-
.start_clean(Some("data".into()), Some(data.len() as u64), None)
143+
.start_clean(Some("data".into()), data.len() as u64, None)
144144
.await;
145145
cleaner.add_data(&data[..rn]).await.unwrap();
146146
cleaner.checkpoint().await.unwrap();
@@ -172,7 +172,7 @@ mod tests {
172172

173173
// Feed it half the data, and checkpoint.
174174
let mut cleaner = file_upload_session
175-
.start_clean(Some("data".into()), Some(data.len() as u64), None)
175+
.start_clean(Some("data".into()), data.len() as u64, None)
176176
.await;
177177

178178
// Add all the data. Roughly the first half should dedup.

progress_tracking/src/upload_tracking.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,7 @@ impl CompletionTrackerImpl {
141141
if dep.is_external {
142142
// This is the freebie case, where we can just increment the progress.
143143
file_entry.completed_bytes += dep.n_bytes;
144-
if file_entry.total_bytes > 0 {
145-
debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
146-
}
144+
debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
147145

148146
let progress_update = ItemProgressUpdate {
149147
item_name: file_entry.name.clone(),
@@ -165,9 +163,7 @@ impl CompletionTrackerImpl {
165163
// If the entry has already been completed, then just mark this as completed.
166164
if entry.is_completed {
167165
file_entry.completed_bytes += dep.n_bytes;
168-
if file_entry.total_bytes > 0 {
169-
debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
170-
}
166+
debug_assert_le!(file_entry.completed_bytes, file_entry.total_bytes);
171167

172168
let progress_update = ItemProgressUpdate {
173169
item_name: file_entry.name.clone(),
@@ -190,9 +186,7 @@ impl CompletionTrackerImpl {
190186
// Register that this much has been completed already
191187
self.total_bytes_completed += file_bytes_processed;
192188

193-
if self.total_bytes > 0 {
194-
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
195-
}
189+
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
196190

197191
// There may be a lot of per-file updates, but these don't actually count against the new byte total;
198192
// this is counted only using xorbs.
@@ -307,9 +301,7 @@ impl CompletionTrackerImpl {
307301
self.total_upload_bytes_completed += byte_completion_increment;
308302

309303
self.total_bytes_completed += file_bytes_processed;
310-
if self.total_bytes > 0 {
311-
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
312-
}
304+
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
313305

314306
ProgressUpdate {
315307
item_updates,
@@ -421,9 +413,7 @@ impl CompletionTrackerImpl {
421413
debug_assert_le!(self.total_upload_bytes_completed, self.total_upload_bytes);
422414

423415
self.total_bytes_completed += file_bytes_processed;
424-
if self.total_bytes > 0 {
425-
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
426-
}
416+
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
427417

428418
ProgressUpdate {
429419
item_updates,
@@ -467,14 +457,11 @@ impl CompletionTrackerImpl {
467457
fn assert_complete(&self) {
468458
// Check each file for completeness
469459
for (idx, file) in self.files.iter().enumerate() {
470-
// Skip size check for files registered without a known size (total_bytes == 0).
471-
if file.total_bytes > 0 {
472-
assert_eq!(
473-
file.completed_bytes, file.total_bytes,
474-
"File #{} ({}) is not fully completed: {}/{} bytes",
475-
idx, file.name, file.completed_bytes, file.total_bytes
476-
);
477-
}
460+
assert_eq!(
461+
file.completed_bytes, file.total_bytes,
462+
"File #{} ({}) is not fully completed: {}/{} bytes",
463+
idx, file.name, file.completed_bytes, file.total_bytes
464+
);
478465
assert!(
479466
file.remaining_xorbs_parts.is_empty(),
480467
"File #{} ({}) still has uncompleted xorb parts: {:?}",

progress_tracking/src/verification_wrapper.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,11 @@ impl ProgressUpdaterVerificationWrapper {
5151
let tr = self.tr.lock().await;
5252

5353
for (item_name, data) in tr.items.iter() {
54-
// Skip for files registered without a known size (total_count == 0).
55-
if data.total_count > 0 {
56-
assert_eq!(
57-
data.last_completed, data.total_count,
58-
"Item '{}' is not fully complete: {}/{}",
59-
item_name, data.last_completed, data.total_count
60-
);
61-
}
54+
assert_eq!(
55+
data.last_completed, data.total_count,
56+
"Item '{}' is not fully complete: {}/{}",
57+
item_name, data.last_completed, data.total_count
58+
);
6259
}
6360

6461
assert_eq!(tr.total_transfer_bytes_completed, tr.total_transfer_bytes);
@@ -100,16 +97,13 @@ impl TrackingProgressUpdater for ProgressUpdaterVerificationWrapper {
10097
);
10198

10299
// 2) `completed_count` must not exceed `total_count`
103-
// Skip for files registered without a known size (total_bytes == 0).
104-
if up.total_bytes > 0 {
105-
assert!(
106-
up.bytes_completed <= up.total_bytes,
107-
"Item '{}' completed_count {} exceeds total {}",
108-
up.item_name,
109-
up.bytes_completed,
110-
up.total_bytes
111-
);
112-
}
100+
assert!(
101+
up.bytes_completed <= up.total_bytes,
102+
"Item '{}' completed_count {} exceeds total {}",
103+
up.item_name,
104+
up.bytes_completed,
105+
up.total_bytes
106+
);
113107

114108
// 3) The increment must match the difference
115109
let expected_new = entry.last_completed + up.bytes_completion_increment;

0 commit comments

Comments
 (0)