Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit d36d2a2

Browse files
authored
Toploc Integration for groups & Core storage rewrite (#353)
* implement group validation calls for toploc * large scale storage rewrite, introduce mock storage provider for better testing * update deployment, load toploc configs from env * setup toploc group e2e tests
1 parent 1fc9f8a commit d36d2a2

18 files changed

Lines changed: 2372 additions & 985 deletions

File tree

Cargo.lock

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ watch-check:
8787

8888
watch-validator:
8989
set -a; source ${ENV_FILE}; set +a; \
90-
cargo watch -w crates/validator/src -x "run --bin validator -- --validator-key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL} --pool-id $${WORKER_COMPUTE_POOL_ID} --toploc-configs '$${TOPLOC_CONFIGS:-[]}' --bucket-name $${BUCKET_NAME} -l $${LOG_LEVEL:-info} --toploc-grace-interval $${TOPLOC_GRACE_INTERVAL:-30}"
90+
cargo watch -w crates/validator/src -x "run --bin validator -- --validator-key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL} --pool-id $${WORKER_COMPUTE_POOL_ID} --bucket-name $${BUCKET_NAME} -l $${LOG_LEVEL:-info} --toploc-grace-interval $${TOPLOC_GRACE_INTERVAL:-30}"
9191

9292
watch-orchestrator:
9393
set -a; source ${ENV_FILE}; set +a; \

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ use actix_web::{
44
HttpRequest, HttpResponse, Scope,
55
};
66
use redis::{Commands, RedisResult};
7-
use shared::{
8-
models::storage::RequestUploadRequest,
9-
utils::google_cloud::{generate_mapping_file, generate_upload_signed_url},
10-
};
7+
use shared::models::storage::RequestUploadRequest;
118
use std::time::Duration;
129

1310
const MAX_FILE_SIZE: u64 = 100 * 1024 * 1024;
@@ -192,31 +189,16 @@ async fn request_upload(
192189
sha256
193190
);
194191

195-
// Get credentials from app state
196-
let credentials = match &app_state.s3_credentials {
197-
Some(creds) => creds,
198-
None => {
199-
return HttpResponse::InternalServerError().json(serde_json::json!({
200-
"success": false,
201-
"error": "Storage credentials not configured"
202-
}))
203-
}
204-
};
205-
206192
log::info!(
207-
"Generating mapping file for sha256: {} to file: {} in bucket: {}",
193+
"Generating mapping file for sha256: {} to file: {}",
208194
sha256,
209195
file_name,
210-
app_state.bucket_name.clone().unwrap_or_default()
211196
);
212197

213-
if let Err(e) = generate_mapping_file(
214-
app_state.bucket_name.clone().unwrap().as_str(),
215-
credentials,
216-
sha256,
217-
&file_name,
218-
)
219-
.await
198+
if let Err(e) = app_state
199+
.storage_provider
200+
.generate_mapping_file(sha256, &file_name)
201+
.await
220202
{
221203
log::error!("Failed to generate mapping file: {}", e);
222204
return HttpResponse::InternalServerError().json(serde_json::json!({
@@ -231,15 +213,15 @@ async fn request_upload(
231213
);
232214

233215
// Generate signed upload URL
234-
match generate_upload_signed_url(
235-
app_state.bucket_name.clone().unwrap().as_str(),
236-
&file_name,
237-
credentials,
238-
Some(file_type.to_string()),
239-
Duration::from_secs(3600), // 1 hour expiry
240-
Some(*file_size),
241-
)
242-
.await
216+
match app_state
217+
.storage_provider
218+
.generate_upload_signed_url(
219+
&file_name,
220+
Some(file_type.to_string()),
221+
Duration::from_secs(3600), // 1 hour expiry
222+
Some(*file_size),
223+
)
224+
.await
243225
{
244226
Ok(signed_url) => {
245227
// Increment rate limit counter after successful URL generation
@@ -347,11 +329,6 @@ mod tests {
347329

348330
assert!(task_store.get_task(&task.id.to_string()).is_some());
349331

350-
if app_state.s3_credentials.is_none() {
351-
println!("S3 credentials not configured");
352-
return;
353-
}
354-
355332
let app =
356333
test::init_service(App::new().app_data(app_state.clone()).service(
357334
web::scope("/storage").route("/request-upload", post().to(request_upload)),
@@ -384,11 +361,6 @@ mod tests {
384361
async fn test_request_upload_invalid_task() {
385362
let app_state = create_test_app_state().await;
386363

387-
if app_state.s3_credentials.is_none() {
388-
println!("S3 credentials not configured");
389-
return;
390-
}
391-
392364
let app =
393365
test::init_service(App::new().app_data(app_state.clone()).service(
394366
web::scope("/storage").route("/request-upload", post().to(request_upload)),
@@ -474,11 +446,6 @@ mod tests {
474446

475447
assert!(task_store.get_task(&task.id.to_string()).is_some());
476448

477-
if app_state.s3_credentials.is_none() {
478-
println!("S3 credentials not configured");
479-
return;
480-
}
481-
482449
let app =
483450
test::init_service(App::new().app_data(app_state.clone()).service(
484451
web::scope("/storage").route("/request-upload", post().to(request_upload)),
@@ -607,11 +574,6 @@ mod tests {
607574

608575
assert!(task_store.get_task(&task.id.to_string()).is_some());
609576

610-
if app_state.s3_credentials.is_none() {
611-
println!("S3 credentials not configured");
612-
return;
613-
}
614-
615577
let app =
616578
test::init_service(App::new().app_data(app_state.clone()).service(
617579
web::scope("/storage").route("/request-upload", post().to(request_upload)),

crates/orchestrator/src/api/server.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ use log::info;
1616
use serde_json::json;
1717
use shared::security::api_key_middleware::ApiKeyMiddleware;
1818
use shared::security::auth_signature_middleware::{ValidateSignature, ValidatorState};
19+
use shared::utils::StorageProvider;
1920
use shared::web3::contracts::core::builder::Contracts;
2021
use shared::web3::wallet::Wallet;
2122
use std::sync::Arc;
2223

2324
pub struct AppState {
2425
pub store_context: Arc<StoreContext>,
2526
pub wallet: Arc<Wallet>,
26-
pub s3_credentials: Option<String>,
27-
pub bucket_name: Option<String>,
27+
pub storage_provider: Arc<dyn StorageProvider>,
2828
pub heartbeats: Arc<LoopHeartbeats>,
2929
pub redis_store: Arc<RedisStore>,
3030
pub hourly_upload_limit: i64,
@@ -41,8 +41,7 @@ pub async fn start_server(
4141
store_context: Arc<StoreContext>,
4242
wallet: Arc<Wallet>,
4343
admin_api_key: String,
44-
s3_credentials: Option<String>,
45-
bucket_name: Option<String>,
44+
storage_provider: Arc<dyn StorageProvider>,
4645
heartbeats: Arc<LoopHeartbeats>,
4746
redis_store: Arc<RedisStore>,
4847
hourly_upload_limit: i64,
@@ -56,8 +55,7 @@ pub async fn start_server(
5655
let app_state = Data::new(AppState {
5756
store_context,
5857
wallet,
59-
s3_credentials,
60-
bucket_name,
58+
storage_provider,
6159
heartbeats,
6260
redis_store,
6361
hourly_upload_limit,

crates/orchestrator/src/api/tests/helper.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use url::Url;
1717

1818
#[cfg(test)]
1919
pub async fn create_test_app_state() -> Data<AppState> {
20+
use shared::utils::MockStorageProvider;
21+
2022
use crate::{scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats, ServerMode};
2123

2224
let store = Arc::new(RedisStore::new_test());
@@ -35,8 +37,9 @@ pub async fn create_test_app_state() -> Data<AppState> {
3537
let store_context = Arc::new(StoreContext::new(store.clone()));
3638
let mode = ServerMode::Full;
3739
let scheduler = Scheduler::new(store_context.clone(), vec![]);
38-
let s3_credentials = std::env::var("S3_CREDENTIALS").ok();
39-
let bucket_name = std::env::var("BUCKET_NAME").ok();
40+
41+
let mock_storage = MockStorageProvider::new();
42+
let storage_provider = Arc::new(mock_storage);
4043

4144
Data::new(AppState {
4245
store_context: store_context.clone(),
@@ -49,8 +52,7 @@ pub async fn create_test_app_state() -> Data<AppState> {
4952
)
5053
.unwrap(),
5154
),
52-
s3_credentials,
53-
bucket_name,
55+
storage_provider,
5456
heartbeats: Arc::new(LoopHeartbeats::new(&mode)),
5557
hourly_upload_limit: 12,
5658
redis_store: store.clone(),
@@ -61,6 +63,8 @@ pub async fn create_test_app_state() -> Data<AppState> {
6163

6264
#[cfg(test)]
6365
pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
66+
use shared::utils::MockStorageProvider;
67+
6468
use crate::{
6569
plugins::node_groups::{NodeGroupConfiguration, NodeGroupsPlugin},
6670
scheduler::Scheduler,
@@ -84,8 +88,6 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
8488
let store_context = Arc::new(StoreContext::new(store.clone()));
8589
let mode = ServerMode::Full;
8690
let scheduler = Scheduler::new(store_context.clone(), vec![]);
87-
let s3_credentials = std::env::var("S3_CREDENTIALS").ok();
88-
let bucket_name = std::env::var("BUCKET_NAME").ok();
8991

9092
let config = NodeGroupConfiguration {
9193
name: "test-config".to_string(),
@@ -100,6 +102,9 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
100102
store_context.clone(),
101103
)));
102104

105+
let mock_storage = MockStorageProvider::new();
106+
let storage_provider = Arc::new(mock_storage);
107+
103108
Data::new(AppState {
104109
store_context: store_context.clone(),
105110
contracts: None,
@@ -111,8 +116,7 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
111116
)
112117
.unwrap(),
113118
),
114-
s3_credentials,
115-
bucket_name,
119+
storage_provider,
116120
heartbeats: Arc::new(LoopHeartbeats::new(&mode)),
117121
hourly_upload_limit: 12,
118122
redis_store: store.clone(),

crates/orchestrator/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use plugins::node_groups::NodeGroupsPlugin;
2828
use plugins::webhook::WebhookPlugin;
2929
use plugins::SchedulerPlugin;
3030
use plugins::StatusUpdatePlugin;
31+
use shared::utils::google_cloud::GcsStorageProvider;
3132
use shared::web3::contracts::core::builder::ContractBuilder;
3233
use shared::web3::contracts::structs::compute_pool::PoolStatus;
3334
use shared::web3::wallet::Wallet;
@@ -286,6 +287,12 @@ async fn main() -> Result<()> {
286287
let server_store_context = store_context.clone();
287288

288289
let s3_credentials = std::env::var("S3_CREDENTIALS").ok();
290+
291+
let gcs_storage = GcsStorageProvider::new(&args.bucket_name.unwrap(), &s3_credentials.unwrap())
292+
.await
293+
.unwrap();
294+
let storage_provider = Arc::new(gcs_storage);
295+
289296
// Always start server regardless of mode
290297
tokio::select! {
291298
res = start_server(
@@ -294,8 +301,7 @@ async fn main() -> Result<()> {
294301
server_store_context.clone(),
295302
server_wallet,
296303
args.admin_api_key,
297-
s3_credentials,
298-
args.bucket_name,
304+
storage_provider,
299305
heartbeats.clone(),
300306
store.clone(),
301307
args.hourly_s3_upload_limit,

crates/shared/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ rand = "0.9.0"
3131
google-cloud-storage = "0.24.0"
3232
base64 = "0.22.1"
3333
chrono = { workspace = true, features = ["serde"] }
34+
async-trait = "0.1.88"

0 commit comments

Comments
 (0)