Skip to content

Commit 5e0e282

Browse files
committed
Tolerate per-user pin failures and add wire helpers
Make users-index publishing tolerant of individual per-user pin failures: collect per-user pin results without aborting the tick, surface a failed_users count in TickOutcome/PublishNowResponse, emit per-user and tick-level warnings, and add comprehensive tests (including a FaultyBlockStore) for partial/failing/all-fail/retry scenarios. Extract and unit-test HTTP-layer helpers for Phase 1.2: control-header filtering and parse_bucket_lookup_h_header (with explicit error enum) so the lookup_h header is handled cleanly and not persisted as user metadata. Enhance ful a-client block cache: add KEY_TO_CID mapping for offline-fallback, resolver hot-start METADATA rows and accessors, debug impls, and store/load helpers for users_index state. Update ful a-client Cargo.toml to add serde_ipld_dagcbor and sha3 (tests). Misc: wire failed_users through publish_now response and small cleanup/refactors to object header handling.
1 parent 8d14a20 commit 5e0e282

21 files changed

Lines changed: 6103 additions & 100 deletions

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fula-cli/src/handlers/internal.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ pub struct PublishNowResponse {
6363
pub global_cid: String,
6464
pub sequence: u64,
6565
pub changed_users: usize,
66+
/// Number of users whose per-user CBOR pin failed this tick.
67+
/// Surfaces the per-user-error-tolerance count from
68+
/// `TickOutcome.failed_users` so an operator clicking
69+
/// "publish now" in the admin UI sees per-user pin failures
70+
/// without tailing logs. A non-zero value means the published
71+
/// global may exclude one or more users (or carry their prior
72+
/// CIDs). The per-user `warn!` lines inside `run_tick` identify
73+
/// WHICH users failed; this field is the count for surfacing.
74+
pub failed_users: usize,
6675
pub total_users: usize,
6776
pub global_rebuilt: bool,
6877
}
@@ -159,6 +168,7 @@ pub async fn publish_now(
159168
global_cid: outcome.global_cid.to_string(),
160169
sequence: outcome.sequence,
161170
changed_users: outcome.changed_users,
171+
failed_users: outcome.failed_users,
162172
total_users: outcome.total_users,
163173
global_rebuilt: outcome.global_rebuilt,
164174
};

crates/fula-cli/src/handlers/object.rs

Lines changed: 260 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,13 @@ pub async fn put_object(
130130
metadata = metadata.with_content_type(ct);
131131
}
132132

133-
// Extract user metadata (x-amz-meta-*).
134-
// Internal Fula control headers (consumed by the handler, not stored as
135-
// object metadata) are filtered out — they would otherwise pollute every
136-
// object's persisted metadata.
137-
const FULA_CONTROL_HEADERS: &[&str] = &["fula-bucket-lookup-h"];
133+
// Extract user metadata (x-amz-meta-*). Internal Fula control
134+
// headers (consumed by the handler, not stored as object metadata)
135+
// are filtered out via `is_fula_control_header` — they would
136+
// otherwise pollute every object's persisted metadata.
138137
for (name, value) in headers.iter() {
139138
if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
140-
if FULA_CONTROL_HEADERS.contains(&key) {
139+
if is_fula_control_header(key) {
141140
continue;
142141
}
143142
if let Ok(v) = value.to_str() {
@@ -180,10 +179,8 @@ pub async fn put_object(
180179
.get("x-amz-meta-fula-bucket-lookup-h")
181180
.and_then(|v| v.to_str().ok())
182181
{
183-
match hex::decode(hex_str) {
184-
Ok(bytes) if bytes.len() == 16 => {
185-
let mut lookup_h = [0u8; 16];
186-
lookup_h.copy_from_slice(&bytes);
182+
match parse_bucket_lookup_h_header(hex_str) {
183+
Ok(lookup_h) => {
187184
match state.bucket_manager.populate_lookup_h_if_missing(
188185
&session.hashed_user_id,
189186
&bucket_name,
@@ -205,12 +202,12 @@ pub async fn put_object(
205202
),
206203
}
207204
}
208-
Ok(other) => tracing::warn!(
209-
actual_len = other.len(),
205+
Err(BucketLookupHError::WrongLength { actual }) => tracing::warn!(
206+
actual_len = actual,
210207
"x-amz-meta-fula-bucket-lookup-h: expected 16-byte hex (32 chars), got {} bytes",
211-
other.len()
208+
actual
212209
),
213-
Err(e) => tracing::warn!(
210+
Err(BucketLookupHError::InvalidHex(e)) => tracing::warn!(
214211
error = %e,
215212
"Failed to hex-decode x-amz-meta-fula-bucket-lookup-h"
216213
),
@@ -768,6 +765,255 @@ fn parse_etag_list(s: &str) -> impl Iterator<Item = String> + '_ {
768765
})
769766
}
770767

768+
// ============================================================
769+
// Phase 1.2 wire-path helpers (master-side)
770+
// ============================================================
771+
//
772+
// These are extracted out of the put_object handler so the
773+
// header-parsing + control-header-filter logic can be unit-tested
774+
// without spinning up the full HTTP server stack. Audit follow-up
775+
// item #5: cover the wire path beyond the BucketManager-direct
776+
// integration test in users_index_publisher.rs.
777+
778+
/// Internal Fula control headers (consumed by handler logic, NOT
779+
/// persisted as object metadata). The list is `pub(crate)` so it
780+
/// can be referenced from sibling modules; tests below assert it
781+
/// stays in lockstep with the handler's filtering.
782+
pub(crate) const FULA_CONTROL_HEADERS: &[&str] = &["fula-bucket-lookup-h"];
783+
784+
/// Returns `true` if the given x-amz-meta key (already stripped of
785+
/// the `x-amz-meta-` prefix) is a Fula control header — meaning it
786+
/// should NOT end up in `ObjectMetadata.user_metadata` even though
787+
/// it's a perfectly valid `x-amz-meta-*` name.
788+
pub(crate) fn is_fula_control_header(stripped_key: &str) -> bool {
789+
FULA_CONTROL_HEADERS.contains(&stripped_key)
790+
}
791+
792+
/// Parse error for the `x-amz-meta-fula-bucket-lookup-h` header
793+
/// value. Three failure modes today; expanding this enum is
794+
/// backward-compatible (the handler matches exhaustively).
795+
#[derive(Debug)]
796+
pub(crate) enum BucketLookupHError {
797+
/// hex::decode failed — non-hex characters in the value.
798+
InvalidHex(hex::FromHexError),
799+
/// Decoded byte length wasn't 16 (the only legal width per
800+
/// Phase 1.2 spec — `userKey`-equivalent 128-bit blinded key).
801+
WrongLength { actual: usize },
802+
}
803+
804+
impl From<hex::FromHexError> for BucketLookupHError {
805+
fn from(e: hex::FromHexError) -> Self {
806+
BucketLookupHError::InvalidHex(e)
807+
}
808+
}
809+
810+
/// Parse `x-amz-meta-fula-bucket-lookup-h` header value into a
811+
/// 16-byte fixed array. Pure: no I/O, no allocations beyond the
812+
/// transient hex::decode buffer. Used by `put_object` to convert
813+
/// the wire-format string into the format
814+
/// `BucketManager::populate_lookup_h_if_missing` expects.
815+
pub(crate) fn parse_bucket_lookup_h_header(
816+
hex_str: &str,
817+
) -> Result<[u8; 16], BucketLookupHError> {
818+
let bytes = hex::decode(hex_str)?;
819+
if bytes.len() != 16 {
820+
return Err(BucketLookupHError::WrongLength { actual: bytes.len() });
821+
}
822+
let mut out = [0u8; 16];
823+
out.copy_from_slice(&bytes);
824+
Ok(out)
825+
}
826+
827+
#[cfg(test)]
828+
mod phase_1_2_wire_tests {
829+
//! Phase 1.2 wire-path tests. Covers what the existing
830+
//! `users_index_publisher::test_run_tick_legacy_to_blinded_replaces_entry`
831+
//! test does NOT cover: the HTTP-layer header extraction +
832+
//! parsing logic that sits between an SDK request and a
833+
//! `populate_lookup_h_if_missing` call.
834+
835+
use super::*;
836+
use axum::http::{HeaderMap, HeaderName, HeaderValue};
837+
838+
#[test]
839+
fn control_header_filter_includes_lookup_h() {
840+
// Audit gold: the lookup_h header IS recognized as a control
841+
// header. If someone removes it from FULA_CONTROL_HEADERS the
842+
// header would leak into user_metadata storage on every PUT.
843+
assert!(is_fula_control_header("fula-bucket-lookup-h"));
844+
}
845+
846+
#[test]
847+
fn control_header_filter_excludes_arbitrary_user_metadata() {
848+
// Defensive: an app's own metadata keys must NOT be filtered.
849+
assert!(!is_fula_control_header("content-language"));
850+
assert!(!is_fula_control_header("x-fula-encrypted"));
851+
assert!(!is_fula_control_header(""));
852+
}
853+
854+
#[test]
855+
fn parse_lookup_h_accepts_valid_32_char_hex() {
856+
// Mirrors what `compute_bucket_lookup_h_hex` produces in the
857+
// SDK: 32 lowercase hex chars = 16 bytes.
858+
let valid = "deadbeefcafebabefeedfacef00dbabe";
859+
let parsed = parse_bucket_lookup_h_header(valid).expect("valid 32-char hex");
860+
assert_eq!(parsed.len(), 16);
861+
assert_eq!(parsed[0], 0xde);
862+
assert_eq!(parsed[15], 0xbe);
863+
}
864+
865+
#[test]
866+
fn parse_lookup_h_accepts_uppercase_hex() {
867+
// hex::decode is case-insensitive; we don't normalize.
868+
let valid = "DEADBEEFCAFEBABEFEEDFACEF00DBABE";
869+
let parsed = parse_bucket_lookup_h_header(valid).expect("uppercase ok");
870+
assert_eq!(parsed[0], 0xde);
871+
}
872+
873+
#[test]
874+
fn parse_lookup_h_rejects_too_short() {
875+
// 30 hex chars = 15 bytes — one short.
876+
let too_short = "deadbeefcafebabefeedfacef00dba";
877+
match parse_bucket_lookup_h_header(too_short) {
878+
Err(BucketLookupHError::WrongLength { actual: 15 }) => {}
879+
other => panic!("expected WrongLength{{actual:15}}, got {:?}", other),
880+
}
881+
}
882+
883+
#[test]
884+
fn parse_lookup_h_rejects_too_long() {
885+
// 34 hex chars = 17 bytes — one byte over.
886+
let too_long = "deadbeefcafebabefeedfacef00dbabe11";
887+
match parse_bucket_lookup_h_header(too_long) {
888+
Err(BucketLookupHError::WrongLength { actual: 17 }) => {}
889+
other => panic!("expected WrongLength{{actual:17}}, got {:?}", other),
890+
}
891+
}
892+
893+
#[test]
894+
fn parse_lookup_h_rejects_non_hex_chars() {
895+
// 'z' is not a valid hex char; even at correct length this
896+
// fails with InvalidHex.
897+
let bad_chars = "zzadbeefcafebabefeedfacef00dbabe";
898+
match parse_bucket_lookup_h_header(bad_chars) {
899+
Err(BucketLookupHError::InvalidHex(_)) => {}
900+
other => panic!("expected InvalidHex, got {:?}", other),
901+
}
902+
}
903+
904+
#[test]
905+
fn parse_lookup_h_rejects_empty_string() {
906+
// An empty header value reaches us as "" — must not parse
907+
// to a zero-byte array.
908+
match parse_bucket_lookup_h_header("") {
909+
Err(BucketLookupHError::WrongLength { actual: 0 }) => {}
910+
other => panic!("expected WrongLength{{actual:0}}, got {:?}", other),
911+
}
912+
}
913+
914+
#[test]
915+
fn parse_lookup_h_rejects_odd_length_hex() {
916+
// 31 chars — odd-length is invalid per hex spec; hex::decode
917+
// returns OddLength, which we surface as InvalidHex.
918+
let odd = "deadbeefcafebabefeedfacef00dbab";
919+
match parse_bucket_lookup_h_header(odd) {
920+
Err(BucketLookupHError::InvalidHex(_)) => {}
921+
other => panic!("expected InvalidHex (odd length), got {:?}", other),
922+
}
923+
}
924+
925+
/// End-to-end-ish wire-path simulation: from a real `HeaderMap`
926+
/// (as the put_object handler would receive), extract:
927+
/// - the user_metadata that should be persisted (lookup_h MUST
928+
/// NOT appear there)
929+
/// - the parsed lookup_h bytes (MUST equal what the SDK sent)
930+
///
931+
/// This is the critical regression guard for "old client uploads
932+
/// without header → no populate" vs "new client uploads with
933+
/// header → populate fires with correct bytes". The integration
934+
/// with `BucketManager` and the publisher is already covered by
935+
/// `users_index_publisher::test_run_tick_legacy_to_blinded_replaces_entry`.
936+
#[test]
937+
fn old_client_no_header_means_no_populate() {
938+
let mut headers = HeaderMap::new();
939+
// Old client sends content-type and a user metadata key; no
940+
// lookup_h header.
941+
headers.insert(
942+
HeaderName::from_static("content-type"),
943+
HeaderValue::from_static("image/jpeg"),
944+
);
945+
headers.insert(
946+
HeaderName::from_static("x-amz-meta-myapp-tag"),
947+
HeaderValue::from_static("vacation"),
948+
);
949+
950+
// Wire-path step 1: lookup_h header absent → handler skips populate.
951+
let lookup_h_present = headers.get("x-amz-meta-fula-bucket-lookup-h").is_some();
952+
assert!(!lookup_h_present, "no header on old-client PUT");
953+
954+
// Wire-path step 2: user_metadata extraction filters control
955+
// headers (none to filter here, but the loop must include the
956+
// app's own tag).
957+
let mut user_meta: Vec<(String, String)> = Vec::new();
958+
for (name, value) in headers.iter() {
959+
if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
960+
if is_fula_control_header(key) {
961+
continue;
962+
}
963+
if let Ok(v) = value.to_str() {
964+
user_meta.push((key.to_string(), v.to_string()));
965+
}
966+
}
967+
}
968+
assert_eq!(user_meta, vec![("myapp-tag".to_string(), "vacation".to_string())]);
969+
}
970+
971+
#[test]
972+
fn new_client_header_parses_and_does_not_leak_into_user_metadata() {
973+
let mut headers = HeaderMap::new();
974+
headers.insert(
975+
HeaderName::from_static("content-type"),
976+
HeaderValue::from_static("image/jpeg"),
977+
);
978+
headers.insert(
979+
HeaderName::from_static("x-amz-meta-fula-bucket-lookup-h"),
980+
HeaderValue::from_static("aabbccddeeff00112233445566778899"),
981+
);
982+
headers.insert(
983+
HeaderName::from_static("x-amz-meta-myapp-tag"),
984+
HeaderValue::from_static("vacation"),
985+
);
986+
987+
// Wire-path step 1: lookup_h header parses to expected bytes.
988+
let hex_str = headers
989+
.get("x-amz-meta-fula-bucket-lookup-h")
990+
.and_then(|v| v.to_str().ok())
991+
.expect("present");
992+
let parsed = parse_bucket_lookup_h_header(hex_str).expect("valid hex");
993+
assert_eq!(parsed, [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11,
994+
0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99]);
995+
996+
// Wire-path step 2: user_metadata extraction MUST drop the
997+
// lookup_h header and keep the app's own tag.
998+
let mut user_meta: Vec<(String, String)> = Vec::new();
999+
for (name, value) in headers.iter() {
1000+
if let Some(key) = name.as_str().strip_prefix("x-amz-meta-") {
1001+
if is_fula_control_header(key) {
1002+
continue;
1003+
}
1004+
if let Ok(v) = value.to_str() {
1005+
user_meta.push((key.to_string(), v.to_string()));
1006+
}
1007+
}
1008+
}
1009+
assert_eq!(
1010+
user_meta,
1011+
vec![("myapp-tag".to_string(), "vacation".to_string())],
1012+
"lookup_h header must NOT leak into user_metadata"
1013+
);
1014+
}
1015+
}
1016+
7711017
#[cfg(test)]
7721018
mod conditional_tests {
7731019
use super::{match_if_match, match_if_none_match};

0 commit comments

Comments
 (0)