Skip to content

Commit ed2e97e

Browse files
author
Nathan Monfils
committed
Added retry mechanism for file uploads
Prevents zip files from being lost if the upload failed for whichever reason. Signed-off-by: Nathan Monfils <nathan.monfils@destiny.eu>
1 parent 8baeab0 commit ed2e97e

1 file changed

Lines changed: 63 additions & 48 deletions

File tree

core-dump-agent/src/main.rs

Lines changed: 63 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,15 @@ async fn main() -> Result<(), anyhow::Error> {
9191
};
9292
let p = Path::new(&file);
9393
info!("Uploading {}", file);
94-
process_file(p, &bucket).await;
94+
match process_file(p, &bucket).await {
95+
Ok(()) => (),
96+
Err(e) => {
97+
error!("File processing failed: {e}");
98+
}
99+
};
95100
} else {
96101
info!("Uploading all content in {}", core_dir_command);
97-
run_polling_agent().await;
102+
run_polling_agent(false).await;
98103
}
99104
process::exit(0);
100105
}
@@ -149,7 +154,7 @@ async fn main() -> Result<(), anyhow::Error> {
149154
std::thread::sleep(Duration::from_millis(1000));
150155
}
151156
} else {
152-
run_polling_agent().await;
157+
run_polling_agent(use_inotify == "true").await;
153158
}
154159

155160
if !interval.is_empty() && !schedule.is_empty() {
@@ -190,7 +195,7 @@ async fn main() -> Result<(), anyhow::Error> {
190195
match next_tick {
191196
Ok(Some(ts)) => {
192197
info!("Next scheduled run {:?}", ts);
193-
run_polling_agent().await;
198+
run_polling_agent(false).await;
194199
}
195200
_ => warn!("Could not get next tick for job"),
196201
}
@@ -255,22 +260,14 @@ async fn main() -> Result<(), anyhow::Error> {
255260
if event.mask.contains(EventMask::ISDIR) {
256261
warn!("Unknown Directory created: {:?}", event.name);
257262
} else {
258-
let bucket = match get_bucket() {
259-
Ok(v) => v,
260-
Err(e) => {
261-
error!("Bucket creation failed in event: {}", e);
262-
continue;
263-
}
264-
};
265263
match event.name {
266264
Some(s) => {
267265
let file = format!(
268266
"{}/{}",
269267
core_dir_command,
270268
s.to_str().unwrap_or_default()
271269
);
272-
let p = Path::new(&file);
273-
process_file(p, &bucket).await
270+
tokio::spawn(process_file_or_retry(PathBuf::from(file), 0));
274271
}
275272
None => {
276273
continue;
@@ -287,7 +284,32 @@ async fn main() -> Result<(), anyhow::Error> {
287284
Ok(())
288285
}
289286

290-
async fn process_file(zip_path: &Path, bucket: &Bucket) {
287+
async fn process_file_or_retry(file: PathBuf, iteration: usize) {
288+
let bucket = match get_bucket() {
289+
Ok(v) => v,
290+
Err(e) => {
291+
error!("Bucket creation failed in event: {}", e);
292+
return;
293+
}
294+
};
295+
296+
match process_file(&file, &bucket).await {
297+
Ok(()) => (),
298+
Err(e) => {
299+
let backoff = Duration::from_secs(60).mul_f32((iteration as f32 + 1.0).powf(1.5));
300+
301+
error!(
302+
"Core dump file processing failed: {e}. Retrying in {} s.",
303+
backoff.as_secs()
304+
);
305+
tokio::time::sleep(backoff).await;
306+
307+
Box::pin(process_file_or_retry(file, iteration + 1)).await;
308+
}
309+
}
310+
}
311+
312+
async fn process_file(zip_path: &Path, bucket: &Bucket) -> Result<(), String> {
291313
info!("Uploading: {}", zip_path.display());
292314

293315
let f = File::open(zip_path).expect("no file found");
@@ -303,53 +325,37 @@ async fn process_file(zip_path: &Path, bucket: &Bucket) {
303325
} else {
304326
error!("File locked on INotify shouldn't happen as we are waiting for file close events.\nPlease recycling pod to perform sweep\n{}", e);
305327
}
306-
return;
328+
return Err("File locked".into());
307329
}
308330
}
309331

310-
let metadata = fs::metadata(zip_path).expect("unable to read metadata");
332+
let metadata = fs::metadata(zip_path).map_err(|e| format!("unable to read metadata: {e}"))?;
311333
info!("zip size is {}", metadata.len());
312-
let path_str = match zip_path.to_str() {
313-
Some(v) => v,
314-
None => {
315-
error!("Failed to extract path");
316-
return;
317-
}
318-
};
319-
let upload_file_name: &str = match zip_path.file_name().unwrap().to_str() {
320-
Some(v) => v,
321-
None => {
322-
error!("Failed to get file name for upload");
323-
return;
324-
}
325-
};
334+
let path_str = zip_path.to_str().ok_or("Failed to extract path")?;
335+
let upload_file_name: &str = zip_path
336+
.file_name()
337+
.ok_or("Failed to get file name for upload")?
338+
.to_str()
339+
.ok_or("Invalid encoding of file name for upload")?;
326340

327341
let mut fasync = tokio::fs::File::open(zip_path)
328342
.await
329-
.expect("file was removed");
343+
.map_err(|e| format!("file became unavailable while processing: {e}"))?;
330344

331-
let code = match bucket
345+
let code = bucket
332346
.put_object_stream(&mut fasync, upload_file_name)
333347
.await
334-
{
335-
Ok(v) => v,
336-
Err(e) => {
337-
error!("Upload Failed {}", e);
338-
return;
339-
}
340-
};
341-
match fs::remove_file(path_str) {
342-
Ok(v) => v,
343-
Err(e) => {
344-
error!("File delete failed: {}", e);
345-
return;
346-
}
347-
};
348+
.map_err(|e| format!("Upload Failed: {e:?}"))?;
349+
350+
fs::remove_file(path_str).map_err(|e| format!("File delete failed: {}", e))?;
351+
348352
info!(
349353
"S3 Returned: status_code: {} uploaded_bytes: {}",
350354
code.status_code(),
351355
code.uploaded_bytes()
352356
);
357+
358+
Ok(())
353359
}
354360

355361
fn get_bucket() -> Result<Box<Bucket>, anyhow::Error> {
@@ -395,7 +401,7 @@ fn get_bucket() -> Result<Box<Bucket>, anyhow::Error> {
395401
Ok(Bucket::new(&s3.bucket, s3.region, s3.credentials)?.with_path_style())
396402
}
397403

398-
async fn run_polling_agent() {
404+
async fn run_polling_agent(retry: bool) {
399405
let core_location = env::var("CORE_DIR").unwrap_or_else(|_| DEFAULT_CORE_DIR.to_string());
400406
info!("Executing Agent with location : {}", core_location);
401407

@@ -418,7 +424,16 @@ async fn run_polling_agent() {
418424

419425
info!("Dir Content {:?}", paths);
420426
for zip_path in paths {
421-
process_file(&zip_path, &bucket).await;
427+
if retry {
428+
process_file_or_retry(zip_path, 0).await;
429+
} else {
430+
match process_file(&zip_path, &bucket).await {
431+
Ok(()) => (),
432+
Err(e) => {
433+
error!("File processing failed: {e}");
434+
}
435+
};
436+
}
422437
}
423438
}
424439

0 commit comments

Comments
 (0)