Skip to content

Commit 92ee22e

Browse files
committed
backup: address codex review on S3 encoder (PR #718, round 2)
Three follow-up issues from Codex on top of round 1. #619 Codex P1 -- assemble only manifest-declared parts. filterChunksForManifest previously matched on uploadID only; the manifest declares specific (partNo, partVersion) tuples, and S3's overwrite-then-async-cleanup window can leave older partVersion chunks present under the same (bucket, generation, object, uploadID). Mixing them produces corrupted bytes. s3ObjectState gains declaredParts map[s3PartKey]struct{}, populated by HandleObjectManifest. filterChunksForManifest takes the set and restricts emission to declared (partNo, partVersion) tuples. Test TestS3_StalePartVersionExcludedFromAssembledBody asserts a stale partVersion=7 cannot leak when the manifest declares partVersion=9. #497 Codex P2 -- preserve dot segments. safeJoinUnderRoot used filepath.Clean which collapses "a/../b" to "b". S3 treats those bytes literally; "a/../b" and "b" are distinct keys that would have silently merged into one output file. Now explicitly rejects any object key whose segments are "." or ".." with ErrS3MalformedKey. NUL bytes also rejected. Test TestS3_DotSegmentObjectKeyRejected covers the four forms ("a/../b", "a/./b", "..", "."). #521 Codex P2 -- cross-generation collision. s3BucketState gains activeGen captured from the bucket-meta record. flushBucket suppresses objects whose generation differs from activeGen (under --include-orphans, those flow to _orphans/; by default they're dropped with an s3_stale_generation_objects warning). Tests TestS3_StaleGenerationObjectExcluded. flushBucket cyclomatic complexity stayed under the cap by extracting flushBucketObjects.
1 parent 33bff13 commit 92ee22e

2 files changed

Lines changed: 239 additions & 26 deletions

File tree

internal/backup/s3.go

Lines changed: 132 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,14 @@ type S3Encoder struct {
8989
}
9090

9191
type s3BucketState struct {
92-
name string
93-
meta *s3PublicBucket
92+
name string
93+
meta *s3PublicBucket
94+
// activeGen is the bucket's current generation, captured from the
95+
// bucket-meta record. Used at flush time to suppress objects
96+
// belonging to older incarnations of the same bucket name (Codex
97+
// P2 #521). Zero means "no bucket meta seen yet"; in that state
98+
// every object flushes (the prior orphan-warning path covers it).
99+
activeGen uint64
94100
objects map[string]*s3ObjectState // keyed by "object\x00generation"
95101
keymap *KeymapWriter
96102
keymapDir string
@@ -114,6 +120,13 @@ type s3ObjectState struct {
114120
// window) cannot be merged into the active body — Codex P1 #500,
115121
// Gemini HIGH #106/#476/#504.
116122
uploadID string
123+
// declaredParts is the set of (partNo, partVersion) tuples the
124+
// manifest claims belong to this object. When non-nil, the body
125+
// assembler restricts chunkPaths to entries matching this set —
126+
// Codex P1 #619. nil means "no filter" (used only in tests that
127+
// pre-date the manifest-parts feature; production callers always
128+
// receive a non-nil set via HandleObjectManifest).
129+
declaredParts map[s3PartKey]struct{}
117130
// scratchDirCreated avoids the per-blob MkdirAll syscall flagged
118131
// by Gemini MEDIUM #285. The scratch directory for this object is
119132
// created exactly once on the first HandleBlob call.
@@ -131,6 +144,16 @@ type s3ChunkKey struct {
131144
partVersion uint64
132145
}
133146

147+
// s3PartKey is the manifest-declared part identifier: a (partNo,
148+
// partVersion) tuple. ChunkNo is excluded because the manifest's
149+
// per-part chunk_count + chunk_sizes drive how many chunks to expect
150+
// per part, but the manifest doesn't enumerate (chunkNo) tuples
151+
// directly.
152+
type s3PartKey struct {
153+
partNo uint64
154+
partVersion uint64
155+
}
156+
134157
// s3PublicBucket is the dump-format projection of s3BucketMeta.
135158
type s3PublicBucket struct {
136159
FormatVersion uint32 `json:"format_version"`
@@ -245,6 +268,7 @@ func (s *S3Encoder) HandleBucketMeta(key, value []byte) error {
245268
Region: live.Region,
246269
ACL: live.Acl,
247270
}
271+
st.activeGen = live.Generation
248272
return nil
249273
}
250274

@@ -282,10 +306,16 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error {
282306
}
283307
// Capture the manifest's uploadID so assembleObjectBody can
284308
// filter blob chunks belonging to other (stale or in-flight)
285-
// upload attempts. The live parts list is purely structural —
286-
// the public sidecar has no need for it, but its uploadID is
287-
// the load-bearing detail.
309+
// upload attempts. Also capture the manifest's declared
310+
// (partNo, partVersion) set so the assembler restricts itself
311+
// to canonically-declared parts — older partVersions left
312+
// behind by overwrite-then-async-cleanup must NOT be merged
313+
// into the body (Codex P1 #619).
288314
st.uploadID = live.UploadID
315+
st.declaredParts = make(map[s3PartKey]struct{}, len(live.Parts))
316+
for _, p := range live.Parts {
317+
st.declaredParts[s3PartKey{partNo: p.PartNo, partVersion: p.PartVersion}] = struct{}{}
318+
}
289319
st.chunkPaths = ensureChunkPaths(st.chunkPaths)
290320
return nil
291321
}
@@ -384,10 +414,16 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error {
384414
return err
385415
}
386416
}
387-
for _, obj := range b.objects {
388-
if err := s.flushObject(b, bucketDir, obj); err != nil {
389-
return err
390-
}
417+
staleCount, err := s.flushBucketObjects(b, bucketDir)
418+
if err != nil {
419+
return err
420+
}
421+
if staleCount > 0 {
422+
s.emitWarn("s3_stale_generation_objects",
423+
"bucket", b.name,
424+
"active_generation", b.activeGen,
425+
"stale_count", staleCount,
426+
"hint", "stale-gen objects excluded; restore would otherwise emit them under the new bucket")
391427
}
392428
// closeJSONL errors must surface — they are the canonical "data
393429
// did not flush to disk" signal for a writable resource (Gemini
@@ -403,6 +439,37 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error {
403439
return nil
404440
}
405441

442+
// flushBucketObjects walks the bucket's object set, routes stale-gen
443+
// objects to the orphan path (under --include-orphans) or drops them
444+
// with a warning counter, and flushes active-gen objects normally.
445+
// Split out of flushBucket to keep cyclomatic complexity within the
446+
// package cap.
447+
func (s *S3Encoder) flushBucketObjects(b *s3BucketState, bucketDir string) (int, error) {
448+
stale := 0
449+
for _, obj := range b.objects {
450+
// Suppress objects from older bucket incarnations: when a
451+
// bucket is deleted and recreated the generation bumps, but
452+
// snapshots taken mid-cleanup can still carry the previous
453+
// generation's manifests + chunks. Routing both to the same
454+
// natural path is non-deterministic last-write-wins (Codex
455+
// P2 #521). When a bucket-meta record is present, only its
456+
// active generation flushes.
457+
if b.activeGen != 0 && obj.generation != b.activeGen {
458+
stale++
459+
if s.includeOrphans {
460+
if err := s.flushOrphanObject(b, bucketDir, obj); err != nil {
461+
return stale, err
462+
}
463+
}
464+
continue
465+
}
466+
if err := s.flushObject(b, bucketDir, obj); err != nil {
467+
return stale, err
468+
}
469+
}
470+
return stale, nil
471+
}
472+
406473
// closeBucketKeymap closes the per-bucket KEYMAP.jsonl writer (if
407474
// opened) and removes the file when no rename was recorded.
408475
func closeBucketKeymap(b *s3BucketState) error {
@@ -484,15 +551,42 @@ func (s *S3Encoder) flushOrphanObject(b *s3BucketState, bucketDir string, obj *s
484551

485552
// safeJoinUnderRoot composes <root>/<rel> and asserts the result is
486553
// still rooted under <root>. S3 object keys are user-controlled and
487-
// can contain "..", absolute paths, or NUL bytes; without this guard
488-
// a key like "../etc/passwd" would escape the dump tree and overwrite
489-
// host files (Codex P1 #425).
554+
// can contain "..", absolute paths, NUL bytes, or "." segments;
555+
// without this guard a key like "../etc/passwd" would escape the
556+
// dump tree and overwrite host files (Codex P1 #425).
557+
//
558+
// We refuse keys whose path-segment components include "." or ".."
559+
// rather than filepath.Clean'ing them. S3 treats those bytes
560+
// literally — `aws s3 put-object` accepts a key like "a/../b" as
561+
// distinct from "b" — so collapsing them via filepath.Clean would
562+
// silently merge two distinct user keys into one output file
563+
// (Codex P2 #497). Operators with such keys must rename them in
564+
// S3, then re-take the dump; the spec's rename-collisions path
565+
// does not currently cover this.
566+
//
567+
// NUL bytes are also refused: POSIX cannot represent them in a
568+
// path component, and they have no legitimate meaning in S3 keys
569+
// transmitted over HTTP.
490570
func safeJoinUnderRoot(root, rel string) (string, error) {
491571
if rel == "" {
492572
return "", errors.Wrap(ErrS3MalformedKey, "empty object name")
493573
}
574+
if strings.ContainsRune(rel, 0) {
575+
return "", errors.Wrapf(ErrS3MalformedKey, "object name contains NUL: %q", rel)
576+
}
577+
for _, seg := range strings.Split(rel, "/") {
578+
switch seg {
579+
case ".", "..":
580+
return "", errors.Wrapf(ErrS3MalformedKey,
581+
"object name has dot segment %q (S3 treats it literally; rename in S3 first)", rel)
582+
}
583+
}
494584
cleanRoot := filepath.Clean(root)
495-
joined := filepath.Clean(filepath.Join(cleanRoot, rel))
585+
// Use filepath.Join here — its only behavioural change vs. raw
586+
// concatenation after the dot-segment guard above is normalising
587+
// a leading "/" off `rel` (which is what we want: absolute-path
588+
// keys collapse safely under bucketDir).
589+
joined := filepath.Join(cleanRoot, rel)
496590
rootSep := cleanRoot + string(filepath.Separator)
497591
if joined != cleanRoot && !strings.HasPrefix(joined, rootSep) {
498592
return "", errors.Wrapf(ErrS3MalformedKey,
@@ -579,14 +673,17 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error {
579673
_ = os.Remove(tmpPath)
580674
}
581675
}()
582-
// Filter chunks by the manifest's uploadID. A snapshot taken
583-
// during a delete/recreate or a retry-after-failed-CompleteUpload
584-
// can legitimately contain blob chunks for multiple upload
585-
// attempts under the same (bucket, generation, object). Mixing
586-
// them produces corrupted bytes — Codex P1 #500 / Gemini HIGH
587-
// #504. The manifest is the single source of truth; only its
588-
// uploadID's chunks belong in the assembled body.
589-
chunks := filterChunksForManifest(obj.chunkPaths, obj.uploadID)
676+
// Filter chunks by the manifest's uploadID AND its declared
677+
// (partNo, partVersion) set. A snapshot taken during
678+
// delete/recreate, retry-after-failed-CompleteUpload, or
679+
// part-overwrite-before-cleanup can legitimately contain blob
680+
// chunks for multiple upload attempts and/or multiple part
681+
// versions under the same (bucket, generation, object). Mixing
682+
// them produces corrupted bytes — Codex P1 #500 (uploadID),
683+
// Codex P1 #619 (partVersion). The manifest is the single source
684+
// of truth; only its uploadID + declaredParts make it into the
685+
// assembled body.
686+
chunks := filterChunksForManifest(obj.chunkPaths, obj.uploadID, obj.declaredParts)
590687
for _, k := range chunks {
591688
path := obj.chunkPaths[k]
592689
if err := appendFile(tmp, path); err != nil {
@@ -606,16 +703,25 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error {
606703
}
607704

608705
// filterChunksForManifest returns the chunk keys belonging to
609-
// manifestUploadID, sorted by (partNo, partVersion, chunkNo). An empty
610-
// manifestUploadID matches every chunk — useful for tests that
611-
// pre-date the uploadID feature, but production callers always have a
612-
// non-empty uploadID via HandleObjectManifest.
613-
func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string) []s3ChunkKey {
706+
// manifestUploadID AND whose (partNo, partVersion) appears in
707+
// declaredParts. Returned keys are sorted by (partNo, partVersion,
708+
// chunkNo) for byte-deterministic body assembly.
709+
//
710+
// An empty manifestUploadID and a nil declaredParts both mean "no
711+
// filter" — used by tests that pre-date these features. Production
712+
// callers always pass non-empty/non-nil values via
713+
// HandleObjectManifest.
714+
func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string, declaredParts map[s3PartKey]struct{}) []s3ChunkKey {
614715
keys := make([]s3ChunkKey, 0, len(m))
615716
for k := range m {
616717
if manifestUploadID != "" && k.uploadID != manifestUploadID {
617718
continue
618719
}
720+
if declaredParts != nil {
721+
if _, ok := declaredParts[s3PartKey{partNo: k.partNo, partVersion: k.partVersion}]; !ok {
722+
continue
723+
}
724+
}
619725
keys = append(keys, k)
620726
}
621727
sort.SliceStable(keys, func(i, j int) bool {

internal/backup/s3_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,113 @@ func TestS3_OrphanChunksWrittenWhenIncludeOrphans(t *testing.T) {
397397
}
398398
}
399399

400+
func TestS3_StalePartVersionExcludedFromAssembledBody(t *testing.T) {
401+
t.Parallel()
402+
enc, root := newS3Encoder(t)
403+
bucket := "b"
404+
gen := uint64(1)
405+
object := "obj"
406+
uploadID := "u"
407+
if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{
408+
"bucket_name": bucket, "generation": gen,
409+
})); err != nil {
410+
t.Fatal(err)
411+
}
412+
// Manifest declares partNo=1 partVersion=9. A stale chunk at
413+
// partVersion=7 (a previous overwrite still uncleaned) must NOT
414+
// be merged — Codex P1 #619.
415+
if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{
416+
"upload_id": uploadID, "size_bytes": 5, "parts": []map[string]any{
417+
{"part_no": 1, "size_bytes": 5, "chunk_count": 1, "part_version": 9},
418+
},
419+
})); err != nil {
420+
t.Fatal(err)
421+
}
422+
if err := enc.HandleBlob(s3keys.VersionedBlobKey(bucket, gen, object, uploadID, 1, 0, 7), []byte("STALE")); err != nil {
423+
t.Fatal(err)
424+
}
425+
if err := enc.HandleBlob(s3keys.VersionedBlobKey(bucket, gen, object, uploadID, 1, 0, 9), []byte("OKBYE")); err != nil {
426+
t.Fatal(err)
427+
}
428+
if err := enc.Finalize(); err != nil {
429+
t.Fatal(err)
430+
}
431+
got, err := os.ReadFile(filepath.Join(root, "s3", bucket, object)) //nolint:gosec
432+
if err != nil {
433+
t.Fatal(err)
434+
}
435+
if string(got) != "OKBYE" {
436+
t.Fatalf("body=%q want %q (stale partVersion leaked)", got, "OKBYE")
437+
}
438+
}
439+
440+
func TestS3_DotSegmentObjectKeyRejected(t *testing.T) {
441+
t.Parallel()
442+
cases := []string{"a/../b", "a/./b", "..", "."}
443+
for _, key := range cases {
444+
t.Run(key, func(t *testing.T) {
445+
enc, _ := newS3Encoder(t)
446+
emitObject(t, enc, "b", 1, key, []byte("x"), "")
447+
err := enc.Finalize()
448+
if !errors.Is(err, ErrS3MalformedKey) {
449+
t.Fatalf("err=%v want ErrS3MalformedKey for key %q", err, key)
450+
}
451+
})
452+
}
453+
}
454+
455+
// emitObjectAtGen is a helper for cross-generation tests: emits a
456+
// manifest + single chunk under an explicit (gen, uploadID) instead
457+
// of the bucket's active gen.
458+
func emitObjectAtGen(t *testing.T, enc *S3Encoder, bucket string, gen uint64, object, uploadID string, body []byte) {
459+
t.Helper()
460+
if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{
461+
"upload_id": uploadID, "size_bytes": int64(len(body)), "parts": []map[string]any{
462+
{"part_no": 1, "size_bytes": int64(len(body)), "chunk_count": 1},
463+
},
464+
})); err != nil {
465+
t.Fatal(err)
466+
}
467+
if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), body); err != nil {
468+
t.Fatal(err)
469+
}
470+
}
471+
472+
func TestS3_StaleGenerationObjectExcluded(t *testing.T) {
473+
t.Parallel()
474+
enc, root := newS3Encoder(t)
475+
var events []string
476+
enc.WithWarnSink(func(e string, _ ...any) { events = append(events, e) })
477+
if err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), encodeS3BucketMetaValue(t, map[string]any{
478+
"bucket_name": "b", "generation": 7,
479+
})); err != nil {
480+
t.Fatal(err)
481+
}
482+
emitObjectAtGen(t, enc, "b", 6, "stale-obj", "us", []byte("STALE"))
483+
emitObjectAtGen(t, enc, "b", 7, "live-obj", "ul", []byte("LIVE"))
484+
if err := enc.Finalize(); err != nil {
485+
t.Fatal(err)
486+
}
487+
if _, err := os.Stat(filepath.Join(root, "s3", "b", "live-obj")); err != nil {
488+
t.Fatalf("live-gen object missing: %v", err)
489+
}
490+
if _, err := os.Stat(filepath.Join(root, "s3", "b", "stale-obj")); !os.IsNotExist(err) {
491+
t.Fatalf("stale-gen object must NOT flush, stat err=%v", err)
492+
}
493+
if !sliceContains(events, "s3_stale_generation_objects") {
494+
t.Fatalf("expected s3_stale_generation_objects warning, got %v", events)
495+
}
496+
}
497+
498+
func sliceContains(haystack []string, needle string) bool {
499+
for _, s := range haystack {
500+
if s == needle {
501+
return true
502+
}
503+
}
504+
return false
505+
}
506+
400507
func TestS3_VersionedBlobAssembledByPartVersionOrder(t *testing.T) {
401508
t.Parallel()
402509
enc, root := newS3Encoder(t)

0 commit comments

Comments
 (0)