4848 // ErrS3MetaSuffixCollision is returned when a user object key
4949 // collides with the reserved S3MetaSuffixReserved suffix.
5050 ErrS3MetaSuffixCollision = errors .New ("backup: user S3 object key collides with reserved sidecar suffix" )
51+ // ErrS3IncompleteBlobChunks is returned when a manifest declares
52+ // N chunks for some part but the snapshot did not contain all N.
53+ // Without this guard a partial / racy snapshot would silently
54+ // emit a truncated body. Codex P1 #729.
55+ ErrS3IncompleteBlobChunks = errors .New ("backup: incomplete blob chunks for manifest-declared part" )
5156)
5257
58+ // verifyChunkCompleteness checks every (partNo, partVersion) entry in
59+ // declaredParts has the right number of chunkNo values present in
60+ // chunks. Chunks are expected at chunkNo in [0, chunk_count); a
61+ // missing index in that range surfaces as
62+ // ErrS3IncompleteBlobChunks rather than letting the assembler emit a
63+ // truncated body.
64+ //
65+ // declaredParts == nil means "no contract to verify" — used by tests
66+ // that pre-date the manifest-parts feature; production callers
67+ // always populate it via HandleObjectManifest.
68+ func verifyChunkCompleteness (chunks []s3ChunkKey , declaredParts map [s3PartKey ]s3DeclaredPart ) error {
69+ if declaredParts == nil {
70+ return nil
71+ }
72+ // Count present chunks per (partNo, partVersion).
73+ type observed struct {
74+ count uint64
75+ maxIndex uint64
76+ }
77+ got := make (map [s3PartKey ]observed , len (declaredParts ))
78+ for _ , k := range chunks {
79+ pk := s3PartKey {partNo : k .partNo , partVersion : k .partVersion }
80+ o := got [pk ]
81+ o .count ++
82+ if k .chunkNo > o .maxIndex {
83+ o .maxIndex = k .chunkNo
84+ }
85+ got [pk ] = o
86+ }
87+ for pk , want := range declaredParts {
88+ o := got [pk ]
89+ // We accept o.count == want.chunkCount AND
90+ // o.maxIndex == chunkCount-1, because a snapshot with N
91+ // duplicates of chunkNo=0 would satisfy the count check
92+ // alone. Both the count and the highest index must match.
93+ if want .chunkCount > 0 && (o .count != want .chunkCount || o .maxIndex + 1 != want .chunkCount ) {
94+ return errors .Wrapf (ErrS3IncompleteBlobChunks ,
95+ "partNo=%d partVersion=%d declared chunks=%d, observed count=%d maxIndex=%d" ,
96+ pk .partNo , pk .partVersion , want .chunkCount , o .count , o .maxIndex )
97+ }
98+ }
99+ return nil
100+ }
101+
53102// S3Encoder emits per-bucket _bucket.json + assembled object bodies +
54103// .elastickv-meta.json sidecars + KEYMAP.jsonl, per the Phase 0
55104// design (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md).
@@ -120,13 +169,13 @@ type s3ObjectState struct {
120169 // window) cannot be merged into the active body — Codex P1 #500,
121170 // Gemini HIGH #106/#476/#504.
122171 uploadID string
123- // declaredParts is the set of (partNo, partVersion) tuples the
124- // manifest claims belong to this object. When non-nil, the body
125- // assembler restricts chunkPaths to entries matching this set —
126- // Codex P1 #619. nil means "no filter" (used only in tests that
127- // pre-date the manifest-parts feature; production callers always
128- // receive a non- nil set via HandleObjectManifest) .
129- declaredParts map [s3PartKey ]struct {}
172+ // declaredParts maps each manifest-declared (partNo, partVersion)
173+ // to the metadata the assembler needs to validate completeness
174+ // (chunk_count). When non-nil, the body assembler restricts
175+ // chunkPaths to entries matching the keys AND verifies every
176+ // chunk index in [0, chunk_count) is present — Codex P1 #619
177+ // (filter) + #729 (completeness). nil means "no filter" .
178+ declaredParts map [s3PartKey ]s3DeclaredPart
130179 // scratchDirCreated avoids the per-blob MkdirAll syscall flagged
131180 // by Gemini MEDIUM #285. The scratch directory for this object is
132181 // created exactly once on the first HandleBlob call.
@@ -146,14 +195,22 @@ type s3ChunkKey struct {
146195
147196// s3PartKey is the manifest-declared part identifier: a (partNo,
148197// partVersion) tuple. ChunkNo is excluded because the manifest's
149- // per-part chunk_count + chunk_sizes drive how many chunks to expect
150- // per part, but the manifest doesn't enumerate (chunkNo) tuples
151- // directly.
198+ // per-part chunk_count drives how many chunks to expect per part;
199+ // that count is stored on the s3DeclaredPart value, not in the key.
152200type s3PartKey struct {
153201 partNo uint64
154202 partVersion uint64
155203}
156204
205+ // s3DeclaredPart captures what the manifest claims for a part: its
206+ // expected chunk_count. assembleObjectBody verifies that one chunk
207+ // per (partNo, partVersion, chunkNo) in [0, chunk_count) actually
208+ // arrived; a missing chunk surfaces as ErrS3IncompleteBlobChunks
209+ // rather than a silently-truncated body (Codex P1 #729).
210+ type s3DeclaredPart struct {
211+ chunkCount uint64
212+ }
213+
157214// s3PublicBucket is the dump-format projection of s3BucketMeta.
158215type s3PublicBucket struct {
159216 FormatVersion uint32 `json:"format_version"`
@@ -312,9 +369,11 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error {
312369 // behind by overwrite-then-async-cleanup must NOT be merged
313370 // into the body (Codex P1 #619).
314371 st .uploadID = live .UploadID
315- st .declaredParts = make (map [s3PartKey ]struct {} , len (live .Parts ))
372+ st .declaredParts = make (map [s3PartKey ]s3DeclaredPart , len (live .Parts ))
316373 for _ , p := range live .Parts {
317- st .declaredParts [s3PartKey {partNo : p .PartNo , partVersion : p .PartVersion }] = struct {}{}
374+ st .declaredParts [s3PartKey {partNo : p .PartNo , partVersion : p .PartVersion }] = s3DeclaredPart {
375+ chunkCount : p .ChunkCount ,
376+ }
318377 }
319378 st .chunkPaths = ensureChunkPaths (st .chunkPaths )
320379 return nil
@@ -608,11 +667,28 @@ func safeJoinUnderRoot(root, rel string) (string, error) {
608667 if strings .ContainsRune (rel , 0 ) {
609668 return "" , errors .Wrapf (ErrS3MalformedKey , "object name contains NUL: %q" , rel )
610669 }
611- for _ , seg := range strings .Split (rel , "/" ) {
670+ // Split on "/" and inspect every segment. S3 treats "a/", "a",
671+ // and "a//b" as three distinct keys, but filepath.Join collapses
672+ // them onto one filesystem path; without explicit rejection,
673+ // distinct user keys would silently overwrite each other at
674+ // finalize (Codex P1 #614).
675+ segs := strings .Split (rel , "/" )
676+ for i , seg := range segs {
612677 switch seg {
613678 case "." , ".." :
614679 return "" , errors .Wrapf (ErrS3MalformedKey ,
615680 "object name has dot segment %q (S3 treats it literally; rename in S3 first)" , rel )
681+ case "" :
682+ // A leading "/" produces an initial empty segment
683+ // (segs[0] == ""); filepath.Join handles that case
684+ // safely by stripping the prefix, matching the
685+ // already-tested "absolute path collapses under
686+ // bucket dir" behaviour. Reject empty segments
687+ // anywhere else (mid-path "//" or trailing "/").
688+ if i != 0 {
689+ return "" , errors .Wrapf (ErrS3MalformedKey ,
690+ "object name has empty path segment %q" , rel )
691+ }
616692 }
617693 }
618694 cleanRoot := filepath .Clean (root )
@@ -724,6 +800,10 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error {
724800 // of truth; only its uploadID + declaredParts make it into the
725801 // assembled body.
726802 chunks := filterChunksForManifest (obj .chunkPaths , obj .uploadID , obj .declaredParts )
803+ if err := verifyChunkCompleteness (chunks , obj .declaredParts ); err != nil {
804+ _ = tmp .Close ()
805+ return err
806+ }
727807 for _ , k := range chunks {
728808 path := obj .chunkPaths [k ]
729809 if err := appendFile (tmp , path ); err != nil {
@@ -751,7 +831,7 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error {
751831// filter" — used by tests that pre-date these features. Production
752832// callers always pass non-empty/non-nil values via
753833// HandleObjectManifest.
754- func filterChunksForManifest (m map [s3ChunkKey ]string , manifestUploadID string , declaredParts map [s3PartKey ]struct {} ) []s3ChunkKey {
834+ func filterChunksForManifest (m map [s3ChunkKey ]string , manifestUploadID string , declaredParts map [s3PartKey ]s3DeclaredPart ) []s3ChunkKey {
755835 keys := make ([]s3ChunkKey , 0 , len (m ))
756836 for k := range m {
757837 if manifestUploadID != "" && k .uploadID != manifestUploadID {
0 commit comments