Skip to content

Commit aef6526

Browse files
committed
resolved race condition
1 parent cbf5fe5 commit aef6526

6 files changed

Lines changed: 272 additions & 18 deletions

File tree

crates/fula-cli/src/handlers/batch.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ pub async fn delete_objects(
2222
return Err(ApiError::s3(S3ErrorCode::AccessDenied, "Write access required"));
2323
}
2424

25+
// Serialize same-bucket index mutations (see `put_object` for rationale).
26+
let _bucket_guard = state
27+
.bucket_manager
28+
.bucket_write_lock(&session.hashed_user_id, &bucket_name)
29+
.lock_owned()
30+
.await;
31+
2532
// User-scoped bucket access
2633
let mut bucket = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, &bucket_name).await?;
2734

crates/fula-cli/src/handlers/multipart.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,16 @@ pub async fn complete_multipart_upload(
217217
metadata = metadata.with_user_metadata(k, v);
218218
}
219219

220+
// Serialize same-bucket index mutations (see `put_object` rationale).
221+
// Held across the open → put_object → flush → persist sequence so a
222+
// concurrent put_object/delete_object/copy_object can't race the
223+
// multipart completion and drop its index entry.
224+
let _bucket_guard = state
225+
.bucket_manager
226+
.bucket_write_lock(&session.hashed_user_id, &bucket)
227+
.lock_owned()
228+
.await;
229+
220230
// Store in bucket (user-scoped)
221231
let mut bucket_handle = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, &bucket).await?;
222232
bucket_handle.put_object(key.clone(), metadata).await?;

crates/fula-cli/src/handlers/object.rs

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ pub async fn put_object(
5151
None => body,
5252
};
5353

54+
// Serialize index-mutating operations on the same user-scoped bucket.
55+
// Without this, parallel chunk PUTs (fula-client fans out up to 16) all
56+
// open at the same root_cid, each flushes a tree containing only its
57+
// own key, and DashMap::insert last-writer-wins drops every other
58+
// mapping — leaving the chunk bytes in IPFS but the bucket index
59+
// pointing at only one of them. Held until the end of the handler so
60+
// the open → mutate → flush → persist sequence is atomic per bucket.
61+
let _bucket_guard = state
62+
.bucket_manager
63+
.bucket_write_lock(&session.hashed_user_id, &bucket_name)
64+
.lock_owned()
65+
.await;
66+
5467
// Open bucket first so conditional-write guards can consult the current
5568
// stored ETag without doing extra I/O later. (Moved ahead of put_block.)
5669
tracing::debug!(bucket = %bucket_name, "Opening user-scoped bucket");
@@ -60,15 +73,10 @@ pub async fn put_object(
6073
e
6174
})?;
6275

63-
// RFC 7232 conditional-write preconditions. Used by fula-client forest
64-
// flush to catch concurrent writers (surfaces as ClientError::Concurrent
65-
// Modification on 412). Checked before put_block to avoid an orphan
66-
// block when the precondition fails.
67-
//
68-
// NOTE: This is a best-effort check — get_object + put_object are not
69-
// atomic under concurrent PUTs on the same key (each request opens its
70-
// own bucket snapshot). The client's retry-on-412 loop handles the
71-
// residual commit-window race.
76+
// RFC 7232 conditional-write preconditions. With the per-bucket write
77+
// lock held above, get_object + put_object now observe a consistent
78+
// snapshot for the same bucket; the client still needs retry-on-412
79+
// for cross-device races.
7280
let existing = bucket.get_object(&key).await?;
7381
let current_etag: Option<&str> = existing.as_ref().map(|m| m.etag.as_str());
7482

@@ -495,6 +503,13 @@ pub async fn delete_object(
495503
return Err(ApiError::s3(S3ErrorCode::AccessDenied, "Write access required"));
496504
}
497505

506+
// Serialize same-bucket index mutations (see `put_object` for rationale).
507+
let _bucket_guard = state
508+
.bucket_manager
509+
.bucket_write_lock(&session.hashed_user_id, &bucket_name)
510+
.lock_owned()
511+
.await;
512+
498513
// User-scoped bucket access
499514
let mut bucket = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, &bucket_name).await?;
500515

@@ -596,7 +611,7 @@ pub async fn copy_object(
596611
.split_once('/')
597612
.ok_or_else(|| ApiError::s3(S3ErrorCode::InvalidArgument, "Invalid copy source format"))?;
598613

599-
// Get source object (user-scoped)
614+
// Get source object (user-scoped). Read-only, so no write lock needed here.
600615
let source_bucket_handle = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, source_bucket).await?;
601616

602617
let source_metadata = source_bucket_handle.get_object(source_key).await?
@@ -605,12 +620,23 @@ pub async fn copy_object(
605620
"Source object not found",
606621
copy_source,
607622
))?;
623+
drop(source_bucket_handle);
608624

609625
// Copy to destination (user-scoped)
610626
let mut dest_metadata = source_metadata.clone();
611627
dest_metadata.last_modified = chrono::Utc::now();
612628
dest_metadata.owner_id = Some(session.hashed_user_id.clone());
613629

630+
// Serialize same-bucket index mutations on the destination (see
631+
// `put_object` for rationale). Acquired after the source read so a copy
632+
// within the same bucket can still proceed without the reader holding
633+
// its own handle through the write.
634+
let _bucket_guard = state
635+
.bucket_manager
636+
.bucket_write_lock(&session.hashed_user_id, &dest_bucket)
637+
.lock_owned()
638+
.await;
639+
614640
let mut dest_bucket_handle = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, &dest_bucket).await?;
615641

616642
dest_bucket_handle.put_object(dest_key, dest_metadata.clone()).await?;

crates/fula-cli/src/handlers/tagging.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,16 @@ pub async fn put_object_tagging(
6969
return Err(ApiError::s3(S3ErrorCode::AccessDenied, "Write access required"));
7070
}
7171

72+
// Serialize same-bucket index mutations (see `put_object` for rationale).
73+
let _bucket_guard = state
74+
.bucket_manager
75+
.bucket_write_lock(&session.hashed_user_id, &bucket_name)
76+
.lock_owned()
77+
.await;
78+
7279
// User-scoped bucket access
7380
let mut bucket = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, &bucket_name).await?;
74-
81+
7582
let mut metadata = bucket.get_object(&key).await?
7683
.ok_or_else(|| ApiError::s3_with_resource(
7784
S3ErrorCode::NoSuchKey,
@@ -82,10 +89,10 @@ pub async fn put_object_tagging(
8289
// Parse tags from XML body
8390
let body_str = String::from_utf8_lossy(&body);
8491
let tags = parse_tagging_xml(&body_str)?;
85-
92+
8693
// Update tags
8794
metadata.tags = tags;
88-
95+
8996
bucket.put_object(key, metadata).await?;
9097
bucket.flush().await?;
9198

@@ -102,9 +109,16 @@ pub async fn delete_object_tagging(
102109
return Err(ApiError::s3(S3ErrorCode::AccessDenied, "Write access required"));
103110
}
104111

112+
// Serialize same-bucket index mutations (see `put_object` for rationale).
113+
let _bucket_guard = state
114+
.bucket_manager
115+
.bucket_write_lock(&session.hashed_user_id, &bucket_name)
116+
.lock_owned()
117+
.await;
118+
105119
// User-scoped bucket access
106120
let mut bucket = state.bucket_manager.open_bucket_for_user(&session.hashed_user_id, &bucket_name).await?;
107-
121+
108122
let mut metadata = bucket.get_object(&key).await?
109123
.ok_or_else(|| ApiError::s3_with_resource(
110124
S3ErrorCode::NoSuchKey,
@@ -114,7 +128,7 @@ pub async fn delete_object_tagging(
114128

115129
// Clear tags
116130
metadata.tags.clear();
117-
131+
118132
bucket.put_object(key, metadata).await?;
119133
bucket.flush().await?;
120134

0 commit comments

Comments
 (0)