Skip to content

Commit 3cb9f9b

Browse files
committed
corrected large file issue
1 parent 0f0f92c commit 3cb9f9b

15 files changed

Lines changed: 1140 additions & 74 deletions

File tree

crates/fula-blockstore/src/ipfs.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,15 @@ impl IpfsBlockStore {
158158
}
159159

160160
/// Put a raw block
161+
/// Note: Raw blocks (file chunks) are NOT pinned inline to avoid timeouts.
162+
/// They are protected by the bucket root's recursive pin after flush_forest.
161163
#[instrument(skip(self, data), fields(size = data.len()))]
162164
pub async fn put_block_raw(&self, data: &[u8]) -> Result<Cid> {
165+
// Don't pin raw blocks inline - even small blocks can cause timeouts under load.
166+
// All blocks are protected by recursive pinning of the bucket root CID at flush time.
163167
let url = format!(
164168
"{}/api/v0/block/put?cid-codec=raw&mhtype=blake3",
165-
self.config.api_url
169+
self.config.api_url,
166170
);
167171

168172
let part = multipart::Part::bytes(data.to_vec())
@@ -267,6 +271,8 @@ impl BlockStore for IpfsBlockStore {
267271

268272
tracing::debug!(bytes_len = bytes.len(), "Serialized IPLD data to CBOR");
269273

274+
// Don't pin inline - it delays the response body and causes timeouts.
275+
// All data is protected by bucket root recursive pin at flush_forest time.
270276
let url = format!(
271277
"{}/api/v0/dag/put?store-codec=dag-cbor&input-codec=dag-cbor",
272278
self.config.api_url
@@ -294,10 +300,14 @@ impl BlockStore for IpfsBlockStore {
294300
)));
295301
}
296302

297-
let result: DagPutResponse = response
298-
.json()
299-
.await
300-
.map_err(|e| BlockStoreError::IpfsApi(e.to_string()))?;
303+
// Read response body as text first for debugging
304+
let body_text = response.text().await
305+
.map_err(|e| BlockStoreError::IpfsApi(format!("Failed to read response body: {}", e)))?;
306+
307+
tracing::debug!(body = %body_text, "IPFS dag/put response body");
308+
309+
let result: DagPutResponse = serde_json::from_str(&body_text)
310+
.map_err(|e| BlockStoreError::IpfsApi(format!("Failed to parse response: {} - body: {}", e, body_text)))?;
301311

302312
tracing::debug!(cid = %result.cid.root_cid, "DAG stored successfully");
303313

crates/fula-blockstore/src/ipfs_pinning.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,14 @@ impl BlockStore for IpfsPinningBlockStore {
242242
self.cache.insert(cid, Bytes::copy_from_slice(data));
243243
}
244244

245-
// Pin for persistence
246-
if let Err(e) = self.pin_cid(&cid, None).await {
247-
warn!(cid = %cid, error = %e, "Failed to pin CID, data may not be persistent");
248-
}
245+
// NOTE: We deliberately DO NOT pin individual blocks here.
246+
// Instead, the root CID should be pinned at upload completion.
247+
// IPFS recursive pinning will automatically pin all referenced blocks.
248+
// This optimization reduces N pin requests to 1 for chunked uploads.
249+
//
250+
// Pinning should happen at:
251+
// - put_object() completion for single objects
252+
// - complete_multipart_upload() for multipart uploads
249253

250254
Ok(cid)
251255
}
@@ -302,10 +306,8 @@ impl BlockStore for IpfsPinningBlockStore {
302306
async fn put_ipld<T: serde::Serialize + Send + Sync>(&self, data: &T) -> Result<Cid> {
303307
let cid = self.ipfs.put_ipld(data).await?;
304308

305-
// Pin for persistence
306-
if let Err(e) = self.pin_cid(&cid, None).await {
307-
warn!(cid = %cid, error = %e, "Failed to pin IPLD, data may not be persistent");
308-
}
309+
// NOTE: We deliberately DO NOT pin IPLD nodes inline.
310+
// Pin the root CID at upload completion for recursive pinning.
309311

310312
Ok(cid)
311313
}
@@ -491,6 +493,47 @@ impl BlockStore for FlexibleBlockStore {
491493
}
492494
}
493495

496+
#[async_trait]
497+
impl PinStore for FlexibleBlockStore {
498+
async fn pin(&self, cid: &Cid, name: Option<&str>) -> Result<()> {
499+
match self {
500+
Self::IpfsPinning(store) => store.pin(cid, name).await,
501+
Self::Memory(_) => {
502+
// Memory store doesn't need pinning, just succeed silently
503+
Ok(())
504+
}
505+
}
506+
}
507+
508+
async fn unpin(&self, cid: &Cid) -> Result<()> {
509+
match self {
510+
Self::IpfsPinning(store) => store.unpin(cid).await,
511+
Self::Memory(_) => Ok(()),
512+
}
513+
}
514+
515+
async fn is_pinned(&self, cid: &Cid) -> Result<bool> {
516+
match self {
517+
Self::IpfsPinning(store) => store.is_pinned(cid).await,
518+
Self::Memory(_) => Ok(true), // Memory store "pins" everything
519+
}
520+
}
521+
522+
async fn list_pins(&self) -> Result<Vec<Cid>> {
523+
match self {
524+
Self::IpfsPinning(store) => store.list_pins().await,
525+
Self::Memory(_) => Ok(Vec::new()),
526+
}
527+
}
528+
529+
async fn pin_status(&self, cid: &Cid) -> Result<crate::PinStatus> {
530+
match self {
531+
Self::IpfsPinning(store) => store.pin_status(cid).await,
532+
Self::Memory(_) => Ok(crate::PinStatus::Pinned), // Memory store "pins" everything
533+
}
534+
}
535+
}
536+
494537
#[cfg(test)]
495538
mod tests {
496539
use super::*;

crates/fula-cli/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ impl IntoResponse for ApiError {
189189
ApiError::S3Error { request_id, .. } => request_id.clone(),
190190
_ => uuid::Uuid::new_v4().to_string(),
191191
};
192+
193+
// Log the actual error for debugging
194+
tracing::error!(
195+
error_code = %code.as_str(),
196+
status = %status,
197+
error = %self,
198+
"API error response"
199+
);
192200

193201
let xml = format!(
194202
r#"<?xml version="1.0" encoding="UTF-8"?>

crates/fula-cli/src/handlers/multipart.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Multipart upload handlers
22
33
use crate::{AppState, ApiError, S3ErrorCode};
4+
use crate::pinning::pin_for_user;
45
use crate::state::UserSession;
56
use crate::multipart::UploadPart;
67
use crate::xml;
@@ -10,7 +11,7 @@ use axum::{
1011
response::{IntoResponse, Response},
1112
};
1213
use bytes::Bytes;
13-
use fula_blockstore::BlockStore;
14+
use fula_blockstore::{BlockStore, PinStore};
1415
use fula_core::metadata::ObjectMetadata;
1516
use fula_crypto::hashing::md5_hash;
1617
use serde::Deserialize;
@@ -139,6 +140,7 @@ pub async fn complete_multipart_upload(
139140
Extension(session): Extension<UserSession>,
140141
Path((bucket, key)): Path<(String, String)>,
141142
Query(params): Query<MultipartParams>,
143+
headers: HeaderMap,
142144
_body: Bytes,
143145
) -> Result<Response, ApiError> {
144146
if !session.can_write() {
@@ -167,10 +169,15 @@ pub async fn complete_multipart_upload(
167169
// Calculate total size
168170
let total_size: u64 = upload.parts.values().map(|p| p.size).sum();
169171

172+
// Collect the CIDs of all parts for pinning; a part whose CID string fails to parse is silently skipped.
173+
let part_cids: Vec<cid::Cid> = upload.parts.values()
174+
.filter_map(|p| p.cid.parse().ok())
175+
.collect();
176+
170177
// Create the final object metadata
171178
// In a real implementation, we'd create a DAG linking all parts
172-
let first_part_cid: cid::Cid = upload.parts.values().next()
173-
.map(|p| p.cid.parse().unwrap())
179+
let first_part_cid: cid::Cid = part_cids.first()
180+
.copied()
174181
.ok_or_else(|| ApiError::s3(S3ErrorCode::InvalidPart, "No parts uploaded"))?;
175182

176183
let mut metadata = ObjectMetadata::new(first_part_cid, total_size, final_etag.clone())
@@ -187,7 +194,27 @@ pub async fn complete_multipart_upload(
187194
// Store in bucket
188195
let mut bucket_handle = state.bucket_manager.open_bucket(&bucket).await?;
189196
bucket_handle.put_object(key.clone(), metadata).await?;
190-
bucket_handle.flush().await?;
197+
let bucket_root_cid = bucket_handle.flush().await?;
198+
199+
// Pin the BUCKET ROOT CID to ensure tree structure survives GC.
200+
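// This recursively pins all tree nodes AND all object data reachable from the root. NOTE(review): the metadata above is built from the first part CID only, so confirm that the remaining parts are actually reachable from the root and covered by this pin.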
201+
// NOTE: Pinning runs in a detached task (fire-and-forget) so it does not block the response; a pin failure is only logged as a warning and does not fail the request.
202+
{
203+
let block_store = Arc::clone(&state.block_store);
204+
let pin_bucket = bucket.clone();
205+
tokio::spawn(async move {
206+
let pin_name = format!("bucket:{}", pin_bucket);
207+
if let Err(e) = block_store.pin(&bucket_root_cid, Some(&pin_name)).await {
208+
tracing::warn!(cid = %bucket_root_cid, error = %e, "Failed to pin bucket root CID");
209+
} else {
210+
tracing::info!(cid = %bucket_root_cid, bucket = %pin_name, "Bucket root CID pinned (recursive)");
211+
}
212+
});
213+
}
214+
215+
// Also pin to user's external pinning service if credentials provided
216+
// Only the first part CID is passed here, as a representative of the object; the other parts are not pinned by this call.
217+
pin_for_user(&headers, &first_part_cid, Some(&key)).await;
191218

192219
let location = format!("/{}/{}", bucket, key);
193220
let xml_response = xml::complete_multipart_upload_result(

crates/fula-cli/src/handlers/object.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use axum::{
1111
response::{IntoResponse, Response},
1212
};
1313
use bytes::Bytes;
14-
use fula_blockstore::BlockStore;
14+
use fula_blockstore::{BlockStore, PinStore};
1515
use fula_core::metadata::ObjectMetadata;
1616
use fula_crypto::hashing::md5_hash;
1717
use serde::Deserialize;
@@ -91,13 +91,28 @@ pub async fn put_object(
9191
})?;
9292

9393
tracing::debug!("Flushing bucket");
94-
bucket.flush().await
94+
let bucket_root_cid = bucket.flush().await
9595
.map_err(|e| {
9696
tracing::error!(error = %e, "Failed to flush bucket");
9797
e
9898
})?;
9999

100-
// Pin to user's pinning service if credentials provided
100+
// Pin the BUCKET ROOT CID to ensure tree structure survives GC.
101+
// This recursively pins all tree nodes AND all referenced object data.
102+
// NOTE: Pinning runs in a detached task (fire-and-forget) so it does not block the response; a pin failure is only logged as a warning and does not fail the request.
103+
{
104+
let block_store = Arc::clone(&state.block_store);
105+
let pin_name = format!("bucket:{}", bucket_name);
106+
tokio::spawn(async move {
107+
if let Err(e) = block_store.pin(&bucket_root_cid, Some(&pin_name)).await {
108+
tracing::warn!(cid = %bucket_root_cid, error = %e, "Failed to pin bucket root CID");
109+
} else {
110+
tracing::info!(cid = %bucket_root_cid, bucket = %pin_name, "Bucket root CID pinned (recursive)");
111+
}
112+
});
113+
}
114+
115+
// Also pin to user's external pinning service if credentials provided
101116
// Headers: X-Pinning-Service, X-Pinning-Token
102117
pin_for_user(&headers, &cid, Some(&key)).await;
103118

crates/fula-cli/src/routes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async fn object_post_handler(
220220
if query.uploads.is_some() {
221221
handlers::create_multipart_upload(state, session, path, headers).await
222222
} else if query.upload_id.is_some() {
223-
handlers::complete_multipart_upload(state, session, path, query, body).await
223+
handlers::complete_multipart_upload(state, session, path, query, headers, body).await
224224
} else {
225225
Err(crate::ApiError::s3(
226226
crate::S3ErrorCode::InvalidRequest,

0 commit comments

Comments
 (0)