@@ -94,14 +94,33 @@ type s3BucketState struct {
9494 objects map [string ]* s3ObjectState // keyed by "object\x00generation"
9595 keymap * KeymapWriter
9696 keymapDir string
97+ // incompleteUploadsJL is opened lazily on the first
98+ // !s3|upload|meta or !s3|upload|part record under
99+ // --include-incomplete-uploads, then reused for every subsequent
100+ // record in the same bucket and closed in flushBucket. Without
101+ // this caching, the prior code re-opened (truncating!) the file
102+ // on every record, leaving only the last record on disk and
103+ // silently losing forensic data — flagged as Codex P2 #318.
104+ incompleteUploadsJL * jsonlFile
97105}
98106
99107type s3ObjectState struct {
100108 bucket string
101109 generation uint64
102110 object string
103- manifest * s3PublicManifest
104- // chunkPaths maps (uploadID, partNo, chunkNo) -> scratch path.
111+ // uploadID is the manifest's `upload_id`. Set by HandleObjectManifest;
112+ // consumed by assembleObjectBody to filter chunkPaths so a stale
113+ // upload's blob chunks (still in the snapshot during a delete/retry
114+ // window) cannot be merged into the active body — Codex P1 #500,
115+ // Gemini HIGH #106/#476/#504.
116+ uploadID string
117+ // scratchDirCreated avoids the per-blob MkdirAll syscall flagged
118+ // by Gemini MEDIUM #285. The scratch directory for this object is
119+ // created exactly once on the first HandleBlob call.
120+ scratchDirCreated bool
121+ manifest * s3PublicManifest
122+ // chunkPaths maps (uploadID, partNo, chunkNo, partVersion) ->
123+ // scratch path.
105124 chunkPaths map [s3ChunkKey ]string
106125}
107126
@@ -261,12 +280,13 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error {
261280 ContentDisposition : live .ContentDisposition ,
262281 UserMetadata : live .UserMetadata ,
263282 }
264- // Persist the parts list on the object state so Finalize knows
265- // what to assemble. We attach the live parts directly because
266- // that's purely structural data — the public sidecar has no need
267- // for them.
283+ // Capture the manifest's uploadID so assembleObjectBody can
284+ // filter blob chunks belonging to other (stale or in-flight)
285+ // upload attempts. The live parts list is purely structural —
286+ // the public sidecar has no need for it, but its uploadID is
287+ // the load-bearing detail.
288+ st .uploadID = live .UploadID
268289 st .chunkPaths = ensureChunkPaths (st .chunkPaths )
269- st .attachManifestParts (live .UploadID , live .Parts )
270290 return nil
271291}
272292
@@ -280,8 +300,11 @@ func (s *S3Encoder) HandleBlob(key, value []byte) error {
280300 }
281301 st := s .objectState (bucket , gen , object )
282302 dir := filepath .Join (s .scratchRoot , EncodeSegment ([]byte (bucket )), EncodeSegment ([]byte (object )))
283- if err := os .MkdirAll (dir , 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
284- return errors .WithStack (err )
303+ if ! st .scratchDirCreated {
304+ if err := os .MkdirAll (dir , 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
305+ return errors .WithStack (err )
306+ }
307+ st .scratchDirCreated = true
285308 }
286309 path := filepath .Join (dir , blobScratchName (uploadID , partNo , chunkNo , partVersion ))
287310 if err := writeFileAtomic (path , value ); err != nil {
@@ -293,8 +316,14 @@ func (s *S3Encoder) HandleBlob(key, value []byte) error {
293316}
294317
295318// HandleIncompleteUpload routes !s3|upload|meta|/!s3|upload|part|
296- // records to <bucket>/_incomplete_uploads/ when the include flag is
297- // on; otherwise drops them.
319+ // records to <bucket>/_incomplete_uploads/records.jsonl when the
320+ // include flag is on; otherwise drops them.
321+ //
322+ // The output writer is opened once per bucket on the first record and
323+ // cached on s3BucketState. Re-opening per record (the prior
324+ // implementation) used create/truncate semantics, so each call wiped
325+ // the file and only the last record survived — Codex P2 #318 / Gemini
326+ // HIGH+MEDIUM #318.
298327func (s * S3Encoder ) HandleIncompleteUpload (prefix string , key , value []byte ) error {
299328 if ! s .includeIncompleteUploads {
300329 return nil
@@ -303,25 +332,24 @@ func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) err
303332 if ! ok {
304333 return errors .Wrapf (ErrS3MalformedKey , "upload-family key: %q" , key )
305334 }
306- dir := filepath . Join ( s . outRoot , "s3" , EncodeSegment ([] byte ( bucket )), "_incomplete_uploads" )
307- if err := os . MkdirAll ( dir , 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
308- return errors . WithStack ( err )
309- }
310- // Phase 0a stores upload-family records as opaque key/value pairs
311- // (one JSON line per record) rather than reconstructing the
312- // in-flight upload state. Restoring incomplete uploads is itself
313- // a follow-up; this artifact preserves the bytes for forensics.
314- jl , err := openJSONL ( filepath . Join ( dir , "records.jsonl" ))
315- if err != nil {
316- return err
335+ b := s . bucketState ( bucket )
336+ if b . incompleteUploadsJL == nil {
337+ dir := filepath . Join ( s . outRoot , "s3" , EncodeSegment ([] byte ( bucket )), "_incomplete_uploads" )
338+ if err := os . MkdirAll ( dir , 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
339+ return errors . WithStack ( err )
340+ }
341+ jl , err := openJSONL ( filepath . Join ( dir , "records.jsonl" ))
342+ if err != nil {
343+ return err
344+ }
345+ b . incompleteUploadsJL = jl
317346 }
318- defer func () { _ = closeJSONL (jl ) }()
319347 rec := struct {
320348 Prefix string `json:"prefix"`
321349 KeyB64 []byte `json:"key"`
322350 ValueB64 []byte `json:"value"`
323351 }{Prefix : prefix , KeyB64 : key , ValueB64 : value }
324- if err := jl .enc .Encode (rec ); err != nil {
352+ if err := b . incompleteUploadsJL .enc .Encode (rec ); err != nil {
325353 return errors .WithStack (err )
326354 }
327355 return nil
@@ -361,34 +389,47 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error {
361389 return err
362390 }
363391 }
364- if b .keymap != nil {
365- if err := b .keymap .Close (); err != nil {
392+ // closeJSONL errors must surface — they are the canonical "data
393+ // did not flush to disk" signal for a writable resource (Gemini
394+ // MEDIUM #318).
395+ if err := closeBucketKeymap (b ); err != nil {
396+ return err
397+ }
398+ if b .incompleteUploadsJL != nil {
399+ if err := closeJSONL (b .incompleteUploadsJL ); err != nil {
366400 return err
367401 }
368- // If no rename was recorded, drop the empty file so the
369- // dump tree omits it (per the spec: keymaps are absent when
370- // empty).
371- if b .keymap .Count () == 0 && b .keymapDir != "" {
372- _ = os .Remove (filepath .Join (b .keymapDir , "KEYMAP.jsonl" ))
373- }
402+ }
403+ return nil
404+ }
405+
406+ // closeBucketKeymap closes the per-bucket KEYMAP.jsonl writer (if
407+ // opened) and removes the file when no rename was recorded.
408+ func closeBucketKeymap (b * s3BucketState ) error {
409+ if b .keymap == nil {
410+ return nil
411+ }
412+ if err := b .keymap .Close (); err != nil {
413+ return err
414+ }
415+ if b .keymap .Count () == 0 && b .keymapDir != "" {
416+ _ = os .Remove (filepath .Join (b .keymapDir , "KEYMAP.jsonl" ))
374417 }
375418 return nil
376419}
377420
378421func (s * S3Encoder ) flushObject (b * s3BucketState , bucketDir string , obj * s3ObjectState ) error {
379422 if obj .manifest == nil {
380- s .emitWarn ("s3_orphan_chunks" ,
381- "bucket" , b .name ,
382- "object" , obj .object ,
383- "chunks" , len (obj .chunkPaths ),
384- "hint" , "blob chunks present but no !s3|obj|head record matched" )
385- return nil
423+ return s .flushOrphanObject (b , bucketDir , obj )
386424 }
387425 objectName , kind , err := s .resolveObjectFilename (b , obj )
388426 if err != nil {
389427 return err
390428 }
391- bodyPath := filepath .Join (bucketDir , objectName )
429+ bodyPath , err := safeJoinUnderRoot (bucketDir , objectName )
430+ if err != nil {
431+ return err
432+ }
392433 if err := os .MkdirAll (filepath .Dir (bodyPath ), 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
393434 return errors .WithStack (err )
394435 }
@@ -407,6 +448,59 @@ func (s *S3Encoder) flushObject(b *s3BucketState, bucketDir string, obj *s3Objec
407448 return nil
408449}
409450
451+ // flushOrphanObject handles objects with chunks but no manifest. By
452+ // default they emit only a warning. With --include-orphans on, the
453+ // chunks are written under <bucket>/_orphans/<encoded-object>/ as
454+ // per-chunk .bin files so the operator can recover bytes manually
455+ // (Gemini MEDIUM #386).
456+ func (s * S3Encoder ) flushOrphanObject (b * s3BucketState , bucketDir string , obj * s3ObjectState ) error {
457+ s .emitWarn ("s3_orphan_chunks" ,
458+ "bucket" , b .name ,
459+ "object" , obj .object ,
460+ "chunks" , len (obj .chunkPaths ),
461+ "hint" , "blob chunks present but no !s3|obj|head record matched" )
462+ if ! s .includeOrphans {
463+ return nil
464+ }
465+ if len (obj .chunkPaths ) == 0 {
466+ return nil
467+ }
468+ dir := filepath .Join (bucketDir , "_orphans" , EncodeSegment ([]byte (obj .object )))
469+ if err := os .MkdirAll (dir , 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
470+ return errors .WithStack (err )
471+ }
472+ for k , scratchPath := range obj .chunkPaths {
473+ out := filepath .Join (dir , blobScratchName (k .uploadID , k .partNo , k .chunkNo , k .partVersion ))
474+ body , err := os .ReadFile (scratchPath ) //nolint:gosec // scratchPath composed from scratch root
475+ if err != nil {
476+ return errors .WithStack (err )
477+ }
478+ if err := writeFileAtomic (out , body ); err != nil {
479+ return err
480+ }
481+ }
482+ return nil
483+ }
484+
485+ // safeJoinUnderRoot composes <root>/<rel> and asserts the result is
486+ // still rooted under <root>. S3 object keys are user-controlled and
487+ // can contain "..", absolute paths, or NUL bytes; without this guard
488+ // a key like "../etc/passwd" would escape the dump tree and overwrite
489+ // host files (Codex P1 #425).
490+ func safeJoinUnderRoot (root , rel string ) (string , error ) {
491+ if rel == "" {
492+ return "" , errors .Wrap (ErrS3MalformedKey , "empty object name" )
493+ }
494+ cleanRoot := filepath .Clean (root )
495+ joined := filepath .Clean (filepath .Join (cleanRoot , rel ))
496+ rootSep := cleanRoot + string (filepath .Separator )
497+ if joined != cleanRoot && ! strings .HasPrefix (joined , rootSep ) {
498+ return "" , errors .Wrapf (ErrS3MalformedKey ,
499+ "object name %q escapes bucket directory" , rel )
500+ }
501+ return joined , nil
502+ }
503+
410504// resolveObjectFilename returns the relative path of the assembled
411505// body within the bucket directory, plus the keymap "kind" when a
412506// rename took place ("" when the object writes at its natural path).
@@ -421,7 +515,9 @@ func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState)
421515 // Object path taken at face value. Path collisions (`path/to`
422516 // vs `path/to/sub`) are deferred until the master pipeline
423517 // detects them across multiple manifests; this PR's per-object
424- // flush trusts the caller's collision detection.
518+ // flush trusts the caller's collision detection. Path-traversal
519+ // sanitisation runs in safeJoinUnderRoot, downstream of this
520+ // function, where the bucket-directory root is in scope.
425521 return obj .object , "" , nil
426522}
427523
@@ -464,17 +560,6 @@ func (s *S3Encoder) objectState(bucket string, gen uint64, object string) *s3Obj
464560 return st
465561}
466562
467- // attachManifestParts is a placeholder that records the part list on
468- // the object state. The current implementation walks the manifest's
469- // part order at Finalize time, so this method just memoises the upload
470- // ID for reference; future extensions (e.g., versioned parts) can
471- // surface here.
472- func (st * s3ObjectState ) attachManifestParts (_ string , _ []s3LivePart ) {
473- // Intentionally empty: assembleObjectBody consumes the manifest
474- // directly via st.manifest at Finalize. Kept as a hook so the
475- // callsite reads symmetrically with HandleBlob.
476- }
477-
478563// assembleObjectBody concatenates the blob chunks per the manifest's
479564// (PartNo, ChunkNo) order into outPath. The encoder buffers chunks on
480565// disk during the scan, so this copy walk is bounded by the object's
@@ -494,11 +579,20 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error {
494579 _ = os .Remove (tmpPath )
495580 }
496581 }()
497- chunks := sortChunkKeys (obj .chunkPaths )
582+ // Filter chunks by the manifest's uploadID. A snapshot taken
583+ // during a delete/recreate or a retry-after-failed-CompleteUpload
584+ // can legitimately contain blob chunks for multiple upload
585+ // attempts under the same (bucket, generation, object). Mixing
586+ // them produces corrupted bytes — Codex P1 #500 / Gemini HIGH
587+ // #504. The manifest is the single source of truth; only its
588+ // uploadID's chunks belong in the assembled body.
589+ chunks := filterChunksForManifest (obj .chunkPaths , obj .uploadID )
498590 for _ , k := range chunks {
499591 path := obj .chunkPaths [k ]
500592 if err := appendFile (tmp , path ); err != nil {
501- _ = tmp .Close ()
593+ if closeErr := tmp .Close (); closeErr != nil {
594+ return errors .Wrap (err , "tmp.Close after appendFile failure: " + closeErr .Error ())
595+ }
502596 return err
503597 }
504598 }
@@ -511,25 +605,21 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error {
511605 return nil
512606}
513607
514- func appendFile (dst io.Writer , srcPath string ) error {
515- f , err := os .Open (srcPath ) //nolint:gosec // srcPath composed from scratch root
516- if err != nil {
517- return errors .WithStack (err )
518- }
519- defer f .Close ()
520- if _ , err := io .Copy (dst , f ); err != nil {
521- return errors .WithStack (err )
522- }
523- return nil
524- }
525-
526- func sortChunkKeys (m map [s3ChunkKey ]string ) []s3ChunkKey {
527- out := make ([]s3ChunkKey , 0 , len (m ))
608+ // filterChunksForManifest returns the chunk keys belonging to
609+ // manifestUploadID, sorted by (partNo, partVersion, chunkNo). An empty
610+ // manifestUploadID matches every chunk — useful for tests that
611+ // pre-date the uploadID feature, but production callers always have a
612+ // non-empty uploadID via HandleObjectManifest.
613+ func filterChunksForManifest (m map [s3ChunkKey ]string , manifestUploadID string ) []s3ChunkKey {
614+ keys := make ([]s3ChunkKey , 0 , len (m ))
528615 for k := range m {
529- out = append (out , k )
616+ if manifestUploadID != "" && k .uploadID != manifestUploadID {
617+ continue
618+ }
619+ keys = append (keys , k )
530620 }
531- sort .SliceStable (out , func (i , j int ) bool {
532- a , b := out [i ], out [j ]
621+ sort .SliceStable (keys , func (i , j int ) bool {
622+ a , b := keys [i ], keys [j ]
533623 switch {
534624 case a .partNo != b .partNo :
535625 return a .partNo < b .partNo
@@ -539,7 +629,19 @@ func sortChunkKeys(m map[s3ChunkKey]string) []s3ChunkKey {
539629 return a .chunkNo < b .chunkNo
540630 }
541631 })
542- return out
632+ return keys
633+ }
634+
635+ func appendFile (dst io.Writer , srcPath string ) error {
636+ f , err := os .Open (srcPath ) //nolint:gosec // srcPath composed from scratch root
637+ if err != nil {
638+ return errors .WithStack (err )
639+ }
640+ defer f .Close ()
641+ if _ , err := io .Copy (dst , f ); err != nil {
642+ return errors .WithStack (err )
643+ }
644+ return nil
543645}
544646
545647func ensureChunkPaths (m map [s3ChunkKey ]string ) map [s3ChunkKey ]string {
0 commit comments