Skip to content

Commit 9e5e929

Browse files
feat: automatically compress archive if profile folder is above a certain threshold
1 parent a5ddb78 commit 9e5e929

2 files changed

Lines changed: 96 additions & 18 deletions

File tree

src/run/uploader/profile_archive.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub struct ProfileArchive {
1313
pub enum ProfileArchiveContent {
1414
CompressedInMemory { data: Vec<u8> },
1515
UncompressedOnDisk { path: PathBuf },
16+
CompressedOnDisk { path: PathBuf },
1617
}
1718

1819
impl ProfileArchive {
@@ -39,13 +40,30 @@ impl ProfileArchive {
3940
content: ProfileArchiveContent::UncompressedOnDisk { path },
4041
})
4142
}
43+
44+
pub fn new_compressed_on_disk(path: PathBuf) -> Result<Self> {
45+
let metadata = std::fs::metadata(&path)?;
46+
if !metadata.is_file() {
47+
return Err(anyhow!("The provided path is not a file"));
48+
}
49+
let mut file = std::fs::File::open(&path)?;
50+
let mut buffer = Vec::new();
51+
use std::io::Read;
52+
file.read_to_end(&mut buffer)?;
53+
let hash = general_purpose::STANDARD.encode(md5::compute(&buffer).0);
54+
Ok(ProfileArchive {
55+
hash,
56+
content: ProfileArchiveContent::CompressedOnDisk { path },
57+
})
58+
}
4259
}
4360

4461
impl ProfileArchiveContent {
4562
pub async fn size(&self) -> Result<u64> {
4663
match &self {
4764
ProfileArchiveContent::CompressedInMemory { data } => Ok(data.len() as u64),
48-
ProfileArchiveContent::UncompressedOnDisk { path } => {
65+
ProfileArchiveContent::UncompressedOnDisk { path }
66+
| ProfileArchiveContent::CompressedOnDisk { path } => {
4967
let metadata = tokio::fs::metadata(path).await?;
5068
Ok(metadata.len())
5169
}
@@ -55,14 +73,17 @@ impl ProfileArchiveContent {
5573
pub fn encoding(&self) -> Option<String> {
5674
match self {
5775
ProfileArchiveContent::CompressedInMemory { .. } => Some("gzip".to_string()),
76+
ProfileArchiveContent::CompressedOnDisk { .. } => Some("gzip".to_string()),
5877
_ => None,
5978
}
6079
}
6180
}
6281

6382
impl Drop for ProfileArchiveContent {
6483
fn drop(&mut self) {
65-
if let ProfileArchiveContent::UncompressedOnDisk { path } = self {
84+
if let ProfileArchiveContent::UncompressedOnDisk { path }
85+
| ProfileArchiveContent::CompressedOnDisk { path } = self
86+
{
6687
if path.exists() {
6788
let _ = std::fs::remove_file(path);
6889
}

src/run/uploader/upload.rs

Lines changed: 73 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,39 @@ use tokio_tar::Builder;
1919
use super::interfaces::{UploadData, UploadMetadata};
2020
use super::profile_archive::ProfileArchive;
2121

22+
fn bytes_to_mib(bytes: u64) -> u64 {
23+
bytes / (1024 * 1024)
24+
}
25+
26+
/// Maximum uncompressed profile folder size in bytes; at or above this size the archive is compressed
27+
const MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES: u64 = 1024 * 1024 * 1024 * 5; // 5 GiB
28+
29+
/// Calculate the total size of a directory in bytes
30+
async fn calculate_folder_size(path: &std::path::Path) -> Result<u64> {
31+
let mut total_size = 0u64;
32+
let mut dirs_to_process = vec![path.to_path_buf()];
33+
34+
while let Some(current_dir) = dirs_to_process.pop() {
35+
let mut entries = tokio::fs::read_dir(&current_dir).await?;
36+
37+
while let Some(entry) = entries.next_entry().await? {
38+
let metadata = entry.metadata().await?;
39+
if metadata.is_file() {
40+
total_size += metadata.len();
41+
} else if metadata.is_dir() {
42+
dirs_to_process.push(entry.path());
43+
}
44+
}
45+
}
46+
47+
Ok(total_size)
48+
}
49+
2250
/// Create a profile archive from the profile folder and return its md5 hash encoded in base64
2351
///
2452
/// For Valgrind, we create a gzip-compressed tar archive of the entire profile folder.
25-
/// For WallTime, we create an uncompressed tar archive of the entire profile folder.
53+
/// For WallTime, we check the folder size and create either a compressed or uncompressed tar archive
54+
/// based on the MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES threshold.
2655
async fn create_profile_archive(
2756
run_data: &RunData,
2857
executor_name: ExecutorName,
@@ -41,23 +70,47 @@ async fn create_profile_archive(
4170
ProfileArchive::new_compressed_in_memory(data)
4271
}
4372
ExecutorName::WallTime => {
44-
debug!("Creating uncompressed tar archive for WallTime on disk");
73+
// Check folder size to decide on compression
74+
let folder_size_bytes = calculate_folder_size(&run_data.profile_folder).await?;
75+
let should_compress = folder_size_bytes >= MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES;
76+
4577
let temp_file = tempfile::NamedTempFile::new()?;
4678
let temp_path = temp_file.path().to_path_buf();
4779

4880
// Create a tokio File handle to the temporary file
4981
let file = File::create(&temp_path).await?;
50-
{
82+
83+
// Persist the temporary file to prevent deletion when temp_file goes out of scope
84+
let persistent_path = temp_file.into_temp_path().keep()?;
85+
86+
if should_compress {
87+
debug!(
88+
"Profile folder size ({} MiB) exceeds threshold ({} MiB), creating compressed tar.gz archive on disk",
89+
bytes_to_mib(folder_size_bytes),
90+
bytes_to_mib(MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES)
91+
);
92+
let enc = GzipEncoder::new(file);
93+
let mut tar = Builder::new(enc);
94+
tar.append_dir_all(".", run_data.profile_folder.clone())
95+
.await?;
96+
let mut gzip_encoder = tar.into_inner().await?;
97+
gzip_encoder.shutdown().await?;
98+
gzip_encoder.into_inner().sync_all().await?;
99+
100+
ProfileArchive::new_compressed_on_disk(persistent_path)?
101+
} else {
102+
debug!(
103+
"Profile folder size ({} MiB) is below threshold ({} MiB), creating uncompressed tar archive on disk",
104+
bytes_to_mib(folder_size_bytes),
105+
bytes_to_mib(MAX_UNCOMPRESSED_PROFILE_SIZE_BYTES)
106+
);
51107
let mut tar = Builder::new(file);
52108
tar.append_dir_all(".", run_data.profile_folder.clone())
53109
.await?;
54110
tar.into_inner().await?.sync_all().await?;
55-
}
56-
57-
// Persist the temporary file to prevent deletion when temp_file goes out of scope
58-
let persistent_path = temp_file.into_temp_path().keep()?;
59111

60-
ProfileArchive::new_uncompressed_on_disk(persistent_path)?
112+
ProfileArchive::new_uncompressed_on_disk(persistent_path)?
113+
}
61114
}
62115
};
63116

@@ -130,36 +183,40 @@ async fn upload_profile_archive(
130183
let archive_hash = profile_archive.hash;
131184

132185
let response = match &profile_archive.content {
133-
ProfileArchiveContent::CompressedInMemory { data } => {
186+
content @ ProfileArchiveContent::CompressedInMemory { data } => {
134187
// Use regular client with retry middleware for compressed data
135188
let mut request = REQUEST_CLIENT
136189
.put(upload_data.upload_url.clone())
137190
.header("Content-Type", "application/x-tar")
138191
.header("Content-Length", archive_size)
139192
.header("Content-MD5", archive_hash);
140193

141-
if let Some(encoding) = profile_archive.content.encoding() {
194+
if let Some(encoding) = content.encoding() {
142195
request = request.header("Content-Encoding", encoding);
143196
}
144197

145198
request.body(data.clone()).send().await?
146199
}
147-
ProfileArchiveContent::UncompressedOnDisk { path } => {
200+
content @ ProfileArchiveContent::UncompressedOnDisk { path }
201+
| content @ ProfileArchiveContent::CompressedOnDisk { path } => {
148202
// Use streaming client without retry middleware for file streams
149203
let file = File::open(path)
150204
.await
151205
.context(format!("Failed to open file at path: {}", path.display()))?;
152206
let stream = tokio_util::io::ReaderStream::new(file);
153207
let body = reqwest::Body::wrap_stream(stream);
154208

155-
STREAMING_CLIENT
209+
let mut request = STREAMING_CLIENT
156210
.put(upload_data.upload_url.clone())
157211
.header("Content-Type", "application/x-tar")
158212
.header("Content-Length", archive_size)
159-
.header("Content-MD5", archive_hash)
160-
.body(body)
161-
.send()
162-
.await?
213+
.header("Content-MD5", archive_hash);
214+
215+
if let Some(encoding) = content.encoding() {
216+
request = request.header("Content-Encoding", encoding);
217+
}
218+
219+
request.body(body).send().await?
163220
}
164221
};
165222

0 commit comments

Comments
 (0)