Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 3a13e94

Browse files
authored
introduce a simple filecounter based on upload requests (#351)
* introduce a simple file counter based on upload requests that can be reused in storage config using `${upload_count}`
1 parent 5c4815f commit 3a13e94

1 file changed

Lines changed: 223 additions & 3 deletions

File tree

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 223 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ async fn request_upload(
9393
let storage_config = task.storage_config;
9494

9595
let mut file_name = request_upload.file_name.to_string();
96+
let mut group_id = None;
97+
9698
if let Some(storage_config) = storage_config {
9799
if let Some(file_name_template) = storage_config.file_name_template {
98100
file_name = generate_file_name(&file_name_template, &request_upload.file_name);
@@ -113,6 +115,7 @@ async fn request_upload(
113115

114116
match plugin.get_node_group(address) {
115117
Ok(Some(group)) => {
118+
group_id = Some(group.id.clone());
116119
file_name = file_name.replace("${node_group_id}", &group.id);
117120
file_name =
118121
file_name.replace("${node_group_size}", &group.nodes.len().to_string());
@@ -133,6 +136,50 @@ async fn request_upload(
133136
}
134137
}
135138

139+
// Create a unique key for this file upload based on address, group_id, and file name
140+
let upload_key = match &group_id {
141+
Some(gid) => format!("upload:{}:{}:{}", address, gid, &request_upload.file_name),
142+
None => format!(
143+
"upload:{}:{}:{}",
144+
address, "no-group", &request_upload.file_name
145+
),
146+
};
147+
let upload_exists: RedisResult<Option<String>> = redis_con.get(&upload_key);
148+
if let Ok(None) = upload_exists {
149+
if let Err(e) = redis_con.set::<_, _, ()>(&upload_key, "pending") {
150+
log::error!("Failed to set upload status in Redis: {}", e);
151+
}
152+
}
153+
let pattern = match &group_id {
154+
Some(gid) => format!("upload:{}:{}:*", address, gid),
155+
None => format!("upload:{}:no-group:*", address),
156+
};
157+
158+
let total_uploads: RedisResult<Vec<String>> = {
159+
let mut keys = Vec::new();
160+
match redis_con.scan_match(&pattern) {
161+
Ok(iter) => {
162+
for key in iter {
163+
keys.push(key);
164+
}
165+
Ok(keys)
166+
}
167+
Err(e) => Err(e),
168+
}
169+
};
170+
171+
let upload_count = match total_uploads {
172+
Ok(keys) => keys.len(),
173+
Err(e) => {
174+
log::error!("Failed to count uploads: {}", e);
175+
0
176+
}
177+
};
178+
179+
if file_name.contains("${upload_count}") {
180+
file_name = file_name.replace("${upload_count}", &upload_count.to_string());
181+
}
182+
136183
let file_size = &request_upload.file_size;
137184
let file_type = &request_upload.file_type;
138185
let sha256 = &request_upload.sha256;
@@ -416,7 +463,7 @@ mod tests {
416463
name: "test-task".to_string(),
417464
storage_config: Some(StorageConfig {
418465
file_name_template: Some(
419-
"model_xyz/dataset_1/${node_group_id}-${node_group_size}-${node_group_index}.parquet".to_string(),
466+
"model_xyz/dataset_1/${node_group_id}-${node_group_size}-${node_group_index}-${upload_count}.parquet".to_string(),
420467
),
421468
}),
422469
..Default::default()
@@ -438,6 +485,35 @@ mod tests {
438485
))
439486
.await;
440487

488+
// First request with test.parquet
489+
let req = test::TestRequest::post()
490+
.uri("/storage/request-upload")
491+
.insert_header(("x-address", node.address.to_string()))
492+
.set_json(&RequestUploadRequest {
493+
file_name: "test.parquet".to_string(),
494+
file_size: 1024,
495+
file_type: "application/octet-stream".to_string(),
496+
sha256: "test_sha256".to_string(),
497+
task_id: task.id.to_string(),
498+
})
499+
.to_request();
500+
501+
let resp = test::call_service(&app, req).await;
502+
let body = test::read_body(resp).await;
503+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
504+
assert_eq!(json["success"], serde_json::Value::Bool(true));
505+
assert_eq!(
506+
json["file_name"],
507+
serde_json::Value::String(format!(
508+
"model_xyz/dataset_1/{}-{}-{}-{}.parquet",
509+
group.id,
510+
group.nodes.len(),
511+
0,
512+
1
513+
))
514+
);
515+
516+
// Second request with same file name - should not increment count
441517
let req = test::TestRequest::post()
442518
.uri("/storage/request-upload")
443519
.insert_header(("x-address", node.address.to_string()))
@@ -457,11 +533,155 @@ mod tests {
457533
assert_eq!(
458534
json["file_name"],
459535
serde_json::Value::String(format!(
460-
"model_xyz/dataset_1/{}-{}-{}.parquet",
536+
"model_xyz/dataset_1/{}-{}-{}-{}.parquet",
537+
group.id,
538+
group.nodes.len(),
539+
0,
540+
1
541+
))
542+
);
543+
544+
// Third request with different file name - should increment count
545+
let req = test::TestRequest::post()
546+
.uri("/storage/request-upload")
547+
.insert_header(("x-address", node.address.to_string()))
548+
.set_json(&RequestUploadRequest {
549+
file_name: "test2.parquet".to_string(),
550+
file_size: 1024,
551+
file_type: "application/octet-stream".to_string(),
552+
sha256: "test_sha256_2".to_string(),
553+
task_id: task.id.to_string(),
554+
})
555+
.to_request();
556+
557+
let resp = test::call_service(&app, req).await;
558+
let body = test::read_body(resp).await;
559+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
560+
assert_eq!(json["success"], serde_json::Value::Bool(true));
561+
assert_eq!(
562+
json["file_name"],
563+
serde_json::Value::String(format!(
564+
"model_xyz/dataset_1/{}-{}-{}-{}.parquet",
461565
group.id,
462566
group.nodes.len(),
463-
0
567+
0,
568+
2
569+
))
570+
);
571+
}
572+
573+
#[actix_web::test]
574+
async fn test_upload_counter_without_node_group() {
575+
let app_state = create_test_app_state_with_nodegroups().await;
576+
577+
let node = OrchestratorNode {
578+
address: Address::ZERO,
579+
ip_address: "127.0.0.1".to_string(),
580+
port: 8080,
581+
p2p_id: Some("test_p2p_id".to_string()),
582+
status: NodeStatus::Healthy,
583+
..Default::default()
584+
};
585+
586+
app_state.store_context.node_store.add_node(node.clone());
587+
588+
let node_from_store = app_state
589+
.store_context
590+
.node_store
591+
.get_node(&node.address)
592+
.unwrap();
593+
assert_eq!(node_from_store.address, node.address);
594+
595+
let task = Task {
596+
id: Uuid::new_v4(),
597+
image: "test-image".to_string(),
598+
name: "test-task".to_string(),
599+
storage_config: Some(StorageConfig {
600+
file_name_template: Some("model_xyz/dataset_1/${upload_count}.parquet".to_string()),
601+
}),
602+
..Default::default()
603+
};
604+
605+
let task_store = app_state.store_context.task_store.clone();
606+
task_store.add_task(task.clone());
607+
608+
assert!(task_store.get_task(&task.id.to_string()).is_some());
609+
610+
if app_state.s3_credentials.is_none() {
611+
println!("S3 credentials not configured");
612+
return;
613+
}
614+
615+
let app =
616+
test::init_service(App::new().app_data(app_state.clone()).service(
617+
web::scope("/storage").route("/request-upload", post().to(request_upload)),
464618
))
619+
.await;
620+
621+
// First request with test.parquet
622+
let req = test::TestRequest::post()
623+
.uri("/storage/request-upload")
624+
.insert_header(("x-address", node.address.to_string()))
625+
.set_json(&RequestUploadRequest {
626+
file_name: "test.parquet".to_string(),
627+
file_size: 1024,
628+
file_type: "application/octet-stream".to_string(),
629+
sha256: "test_sha256".to_string(),
630+
task_id: task.id.to_string(),
631+
})
632+
.to_request();
633+
634+
let resp = test::call_service(&app, req).await;
635+
let body = test::read_body(resp).await;
636+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
637+
assert_eq!(json["success"], serde_json::Value::Bool(true));
638+
assert_eq!(
639+
json["file_name"],
640+
serde_json::Value::String("model_xyz/dataset_1/1.parquet".to_string())
641+
);
642+
643+
// Second request with same file name - should not increment count
644+
let req = test::TestRequest::post()
645+
.uri("/storage/request-upload")
646+
.insert_header(("x-address", node.address.to_string()))
647+
.set_json(&RequestUploadRequest {
648+
file_name: "test.parquet".to_string(),
649+
file_size: 1024,
650+
file_type: "application/octet-stream".to_string(),
651+
sha256: "test_sha256".to_string(),
652+
task_id: task.id.to_string(),
653+
})
654+
.to_request();
655+
656+
let resp = test::call_service(&app, req).await;
657+
let body = test::read_body(resp).await;
658+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
659+
assert_eq!(json["success"], serde_json::Value::Bool(true));
660+
assert_eq!(
661+
json["file_name"],
662+
serde_json::Value::String("model_xyz/dataset_1/1.parquet".to_string())
663+
);
664+
665+
// Third request with different file name - should increment count
666+
let req = test::TestRequest::post()
667+
.uri("/storage/request-upload")
668+
.insert_header(("x-address", node.address.to_string()))
669+
.set_json(&RequestUploadRequest {
670+
file_name: "test2.parquet".to_string(),
671+
file_size: 1024,
672+
file_type: "application/octet-stream".to_string(),
673+
sha256: "test_sha256_2".to_string(),
674+
task_id: task.id.to_string(),
675+
})
676+
.to_request();
677+
678+
let resp = test::call_service(&app, req).await;
679+
let body = test::read_body(resp).await;
680+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
681+
assert_eq!(json["success"], serde_json::Value::Bool(true));
682+
assert_eq!(
683+
json["file_name"],
684+
serde_json::Value::String("model_xyz/dataset_1/2.parquet".to_string())
465685
);
466686
}
467687
}

0 commit comments

Comments
 (0)