Skip to content

Commit 33bff13

Browse files
committed
Merge remote-tracking branch 'origin/feat/backup-phase0a-dynamodb' into feat/backup-phase0a-s3
2 parents 19ae328 + ce5b7da commit 33bff13

6 files changed

Lines changed: 258 additions & 32 deletions

File tree

internal/backup/dynamodb.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -212,31 +212,48 @@ func (d *DDBEncoder) flushTable(st *ddbTableState) error {
212212
hashKey := st.schema.GetPrimaryKey().GetHashKey()
213213
rangeKey := st.schema.GetPrimaryKey().GetRangeKey()
214214
activeGen := st.schema.GetGeneration()
215-
// Only items belonging to the schema's active generation are
216-
// emitted. Older-gen rows (in-flight delete/recreate cleanup) are
217-
// counted into a structured warning so the operator can correlate
218-
// orphans to the cluster's state, but never written under the
219-
// current schema where they would resurrect deleted data.
220-
if stale := totalStaleItems(st.itemsByGen, activeGen); stale > 0 {
215+
migrationSourceGen := st.schema.GetMigratingFromGeneration()
216+
// During a generation migration the live read path falls back to
217+
// migration_source_generation for items not yet copied to the new
218+
// generation (see adapter/dynamodb.go readLogicalItemAt). Both
219+
// generations therefore carry items the user can read at this
220+
// moment, so a backup must include both. Codex P1 #227.
221+
//
222+
// Items present in BOTH generations: the new-gen row is the
223+
// authoritative one (the live code prefers it on read). We emit
224+
// migration-source first, then active gen LAST, so writeFileAtomic's
225+
// tmp+rename leaves the active-gen content on disk per (pk,sk).
226+
emitOrder := []uint64{}
227+
if migrationSourceGen != 0 && migrationSourceGen != activeGen {
228+
emitOrder = append(emitOrder, migrationSourceGen)
229+
}
230+
emitOrder = append(emitOrder, activeGen)
231+
if stale := totalStaleItemsExcluding(st.itemsByGen, emitOrder); stale > 0 {
221232
d.emitWarn("ddb_stale_generation_items",
222233
"table", st.name,
223234
"active_generation", activeGen,
235+
"migration_source_generation", migrationSourceGen,
224236
"stale_count", stale,
225-
"hint", "stale-gen rows are excluded from the dump; restore would otherwise emit them under the new schema")
237+
"hint", "stale-gen rows excluded; restore would otherwise emit them under the new schema")
226238
}
227-
active := st.itemsByGen[activeGen]
228-
for _, item := range active {
229-
if err := writeDDBItem(itemsDir, hashKey, rangeKey, item); err != nil {
230-
return err
239+
for _, gen := range emitOrder {
240+
for _, item := range st.itemsByGen[gen] {
241+
if err := writeDDBItem(itemsDir, hashKey, rangeKey, item); err != nil {
242+
return err
243+
}
231244
}
232245
}
233246
return nil
234247
}
235248

236-
func totalStaleItems(m map[uint64][]*pb.DynamoItem, activeGen uint64) int {
249+
func totalStaleItemsExcluding(m map[uint64][]*pb.DynamoItem, included []uint64) int {
250+
includedSet := make(map[uint64]struct{}, len(included))
251+
for _, g := range included {
252+
includedSet[g] = struct{}{}
253+
}
237254
stale := 0
238255
for gen, items := range m {
239-
if gen != activeGen {
256+
if _, ok := includedSet[gen]; !ok {
240257
stale += len(items)
241258
}
242259
}

internal/backup/dynamodb_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,87 @@ func TestDDB_BundleJSONLNotImplementedYet(t *testing.T) {
335335
}
336336
}
337337

338+
func TestDDB_MigrationSourceGenerationItemsAreEmitted(t *testing.T) {
339+
t.Parallel()
340+
enc, root := newDDBEncoder(t)
341+
// During a live migration, schema.Generation is the new gen and
342+
// schema.MigratingFromGeneration carries the source gen. The live
343+
// read path falls back to the source for items not yet copied.
344+
// The dump must include both — Codex P1 #227.
345+
schema := &pb.DynamoTableSchema{
346+
TableName: "t",
347+
PrimaryKey: &pb.DynamoKeySchema{HashKey: "id"},
348+
AttributeDefinitions: map[string]string{"id": "S"},
349+
Generation: 7,
350+
MigratingFromGeneration: 6,
351+
}
352+
newRow := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
353+
"id": sAttr("a"), "v": sAttr("new"),
354+
}}
355+
migratingRow := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
356+
"id": sAttr("b"), "v": sAttr("not-yet-migrated"),
357+
}}
358+
if err := enc.HandleItem(EncodeDDBItemKey("t", 7, "a", ""), encodeItemValue(t, newRow)); err != nil {
359+
t.Fatal(err)
360+
}
361+
if err := enc.HandleItem(EncodeDDBItemKey("t", 6, "b", ""), encodeItemValue(t, migratingRow)); err != nil {
362+
t.Fatal(err)
363+
}
364+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("t"), encodeSchemaValue(t, schema)); err != nil {
365+
t.Fatal(err)
366+
}
367+
if err := enc.Finalize(); err != nil {
368+
t.Fatal(err)
369+
}
370+
if _, err := os.Stat(filepath.Join(root, "dynamodb", "t", "items", "a.json")); err != nil {
371+
t.Fatalf("active-gen item missing: %v", err)
372+
}
373+
if _, err := os.Stat(filepath.Join(root, "dynamodb", "t", "items", "b.json")); err != nil {
374+
t.Fatalf("migrating-from-gen item must be emitted during live migration: %v", err)
375+
}
376+
}
377+
378+
func TestDDB_NewGenerationWinsOverMigrationSourceForSameKey(t *testing.T) {
379+
t.Parallel()
380+
enc, root := newDDBEncoder(t)
381+
schema := &pb.DynamoTableSchema{
382+
TableName: "t",
383+
PrimaryKey: &pb.DynamoKeySchema{HashKey: "id"},
384+
AttributeDefinitions: map[string]string{"id": "S"},
385+
Generation: 7,
386+
MigratingFromGeneration: 6,
387+
}
388+
// Same primary key in both generations. The live read path
389+
// prefers the new gen; the dump must do the same.
390+
newRow := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
391+
"id": sAttr("k"), "v": sAttr("new-version"),
392+
}}
393+
oldRow := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
394+
"id": sAttr("k"), "v": sAttr("old-version"),
395+
}}
396+
if err := enc.HandleItem(EncodeDDBItemKey("t", 6, "k", ""), encodeItemValue(t, oldRow)); err != nil {
397+
t.Fatal(err)
398+
}
399+
if err := enc.HandleItem(EncodeDDBItemKey("t", 7, "k", ""), encodeItemValue(t, newRow)); err != nil {
400+
t.Fatal(err)
401+
}
402+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("t"), encodeSchemaValue(t, schema)); err != nil {
403+
t.Fatal(err)
404+
}
405+
if err := enc.Finalize(); err != nil {
406+
t.Fatal(err)
407+
}
408+
body, err := os.ReadFile(filepath.Join(root, "dynamodb", "t", "items", "k.json")) //nolint:gosec
409+
if err != nil {
410+
t.Fatal(err)
411+
}
412+
got := readItemMap(t, filepath.Join(root, "dynamodb", "t", "items", "k.json"))
413+
v := mustSubMap(t, got, "v")
414+
if v["S"] != "new-version" {
415+
t.Fatalf("body = %s; new gen must win on conflict, got v.S=%v", body, v["S"])
416+
}
417+
}
418+
338419
func TestDDB_StaleGenerationItemsExcludedAndWarned(t *testing.T) {
339420
t.Parallel()
340421
enc, root := newDDBEncoder(t)

internal/backup/filename.go

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -80,32 +80,70 @@ var ErrShaFallbackNeedsKeymap = errors.New("backup: filename uses SHA fallback;
8080
// filename component. It is the inverse of DecodeSegment for non-fallback
8181
// inputs.
8282
//
83-
// The encoding is deterministic and idempotent given the same input.
83+
// The encoding is deterministic given the same input.
8484
//
85-
// Two short-circuits ensure the encoder never trips its own invariants:
85+
// Three structural short-circuits ensure DecodeSegment cannot
86+
// misclassify a legitimate key:
8687
//
87-
// - If raw is so large that percent-encoding it would always overflow
88-
// maxSegmentBytes (3*len(raw) > maxSegmentBytes), we go straight to
89-
// shaFallback without allocating the full expansion. Without this an
90-
// adversarial caller could force a very large transient allocation
91-
// just to discard it.
92-
// - If the percent-encoded form happens to match the SHA-fallback shape
93-
// (32 hex chars followed by "__"), we promote it to a real
88+
// - If `raw` is longer than maxSegmentBytes, even a fully-unreserved
89+
// encoding (1:1) cannot fit, so we go straight to shaFallback.
90+
// This also caps the percent-encode allocation at
91+
// ~maxSegmentBytes, preventing OOM on adversarial input.
92+
// - If the percent-encoded form happens to match the SHA-fallback
93+
// shape (32 hex chars followed by "__"), we promote it to a real
9494
// SHA-fallback so DecodeSegment's structural detection cannot
95-
// misclassify a legitimate key. Both isShaFallback and shaFallback
96-
// are true on the resulting output, so KEYMAP.jsonl carries the
97-
// original bytes for exact-byte recovery.
95+
// fabricate a wrong original.
96+
// - If the percent-encoded form starts with the binary "b64."
97+
// prefix, we promote to SHA-fallback for the same reason: a
98+
// plain string key like "b64.foo" would otherwise be decoded as
99+
// base64 and produce different bytes on round-trip.
100+
//
101+
// Both promoted-fallback paths leave the original in KEYMAP.jsonl
102+
// (a correctness dependency, per the package doc), so exact-byte
103+
// recovery is preserved.
98104
func EncodeSegment(raw []byte) string {
99-
if len(raw)*percentEncodeMaxExpansion > maxSegmentBytes {
105+
if len(raw) > maxSegmentBytes {
106+
// 1:1 lower bound on encoded length; cannot fit.
100107
return shaFallback(raw)
101108
}
102-
encoded := percentEncode(raw)
103-
if len(encoded) > maxSegmentBytes || isShaFallback(encoded) {
109+
encoded, ok := percentEncodeBounded(raw, maxSegmentBytes)
110+
if !ok || isShaFallback(encoded) || strings.HasPrefix(encoded, binaryPrefix) {
104111
return shaFallback(raw)
105112
}
106113
return encoded
107114
}
108115

116+
// percentEncodeBounded percent-encodes raw, bailing out as soon as the
117+
// in-progress output would exceed maxLen. Returns ("", false) on
118+
// overflow so the caller can take the SHA-fallback path without
119+
// having allocated the full 3*len(raw) buffer that the unbounded
120+
// variant would. Returns (encoded, true) on success.
121+
func percentEncodeBounded(raw []byte, maxLen int) (string, bool) {
122+
const escapeBytes = 3 // len("%HH") -- one escape's worst-case width
123+
cap := escapeBytes * len(raw)
124+
if cap > maxLen+escapeBytes {
125+
cap = maxLen + escapeBytes
126+
}
127+
var b strings.Builder
128+
b.Grow(cap)
129+
for _, c := range raw {
130+
if isUnreserved(c) {
131+
if b.Len()+1 > maxLen {
132+
return "", false
133+
}
134+
b.WriteByte(c)
135+
continue
136+
}
137+
if b.Len()+escapeBytes > maxLen {
138+
return "", false
139+
}
140+
b.WriteByte('%')
141+
b.WriteByte(hexUpper(c >> 4)) //nolint:mnd // 4 == nibble width
142+
b.WriteByte(hexUpper(c & 0x0F)) //nolint:mnd // 0x0F == low-nibble mask
143+
}
144+
return b.String(), true
145+
}
146+
109147
// EncodeBinarySegment encodes a DynamoDB B-attribute (binary) segment as
110148
// "b64.<base64url-no-padding>" so that binary keys never collide with string
111149
// keys whose hex-encoding happens to look like base64.
@@ -152,10 +190,6 @@ func DecodeSegment(seg string) ([]byte, error) {
152190
return percentDecode(seg)
153191
}
154192

155-
// percentEncodeMaxExpansion is the worst-case ratio of encoded length to
156-
// raw length for percentEncode (every byte expands to "%HH").
157-
const percentEncodeMaxExpansion = 3
158-
159193
// IsShaFallback reports whether seg uses the SHA-prefix-and-truncated-original
160194
// form. Such segments cannot be reversed without KEYMAP.jsonl.
161195
func IsShaFallback(seg string) bool {

internal/backup/filename_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,44 @@ func TestEncodeBinarySegment_FuzzRoundTripIfNotShaFallback(t *testing.T) {
309309
})
310310
}
311311

312+
func TestEncodeSegment_LongUnreservedASCIIEncodesAsIs(t *testing.T) {
313+
t.Parallel()
314+
// 200 ASCII letters are all unreserved; the percent-encoding is
315+
// 1:1 (200 bytes), well under the 240-byte ceiling. The encoder
316+
// must NOT take the SHA fallback for such inputs — Codex P1 #100.
317+
raw := []byte(strings.Repeat("a", 200))
318+
enc := EncodeSegment(raw)
319+
if IsShaFallback(enc) {
320+
t.Fatalf("200-byte ASCII unreserved input must NOT take SHA fallback")
321+
}
322+
dec, err := DecodeSegment(enc)
323+
if err != nil {
324+
t.Fatalf("DecodeSegment: %v", err)
325+
}
326+
if string(dec) != string(raw) {
327+
t.Fatalf("round-trip failed for 200-byte ASCII")
328+
}
329+
}
330+
331+
func TestEncodeSegment_KeyStartingWithBinaryPrefixIsPromotedToFallback(t *testing.T) {
332+
t.Parallel()
333+
// A user STRING key like "b64.foo" passed naively through
334+
// EncodeSegment returns "b64.foo" (all unreserved). DecodeSegment
335+
// then sees the b64. prefix, treats it as a binary segment, and
336+
// decodes the base64 — producing the wrong bytes. Codex P1 #146.
337+
// EncodeSegment must promote any input whose encoded form starts
338+
// with the binary prefix to a real SHA fallback so KEYMAP.jsonl
339+
// carries the original.
340+
raw := []byte("b64.foo")
341+
enc := EncodeSegment(raw)
342+
if !IsShaFallback(enc) {
343+
t.Fatalf("expected SHA fallback for b64.-prefixed input, got %q", enc)
344+
}
345+
if _, err := DecodeSegment(enc); !errors.Is(err, ErrShaFallbackNeedsKeymap) {
346+
t.Fatalf("decode err=%v want ErrShaFallbackNeedsKeymap", err)
347+
}
348+
}
349+
312350
func TestEncodeSegment_KeyMatchingShaFallbackShapeIsPromotedToFallback(t *testing.T) {
313351
t.Parallel()
314352
// A user key that is itself made of 32 hex chars + "__" + suffix

internal/backup/manifest.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,17 @@ func (m Manifest) validatePhaseSpecific() error {
294294
if m.Source != nil {
295295
return errors.Wrap(ErrInvalidManifest, "phase1 must not set source")
296296
}
297+
// A phase1 dump's whole point is the cluster-wide read_ts
298+
// pin recorded under Live. A manifest that omits Live cannot
299+
// describe its consistency point and downstream restore /
300+
// audit logic must not silently accept it as valid (Codex
301+
// P1 #295).
302+
if m.Live == nil {
303+
return errors.Wrap(ErrInvalidManifest, "phase1 must set live")
304+
}
305+
if m.Live.ReadTS == 0 {
306+
return errors.Wrap(ErrInvalidManifest, "phase1 live.read_ts must be non-zero")
307+
}
297308
}
298309
return nil
299310
}

internal/backup/manifest_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,51 @@ func TestManifest_Phase0RoundTrip(t *testing.T) {
5050
}
5151
}
5252

53+
func TestManifest_Phase1MustSetLive(t *testing.T) {
54+
t.Parallel()
55+
m := NewPhase0SnapshotManifest(time.Now())
56+
m.Phase = PhasePhase1LivePinned
57+
m.Source = nil
58+
// Live deliberately omitted -- the gap Codex P1 #295 caught.
59+
var buf bytes.Buffer
60+
err := WriteManifest(&buf, m)
61+
if !errors.Is(err, ErrInvalidManifest) {
62+
t.Fatalf("err=%v want ErrInvalidManifest", err)
63+
}
64+
}
65+
66+
func TestManifest_Phase1RejectsZeroReadTS(t *testing.T) {
67+
t.Parallel()
68+
m := NewPhase0SnapshotManifest(time.Now())
69+
m.Phase = PhasePhase1LivePinned
70+
m.Source = nil
71+
m.Live = &Live{ReadTS: 0}
72+
var buf bytes.Buffer
73+
err := WriteManifest(&buf, m)
74+
if !errors.Is(err, ErrInvalidManifest) {
75+
t.Fatalf("err=%v want ErrInvalidManifest for zero read_ts", err)
76+
}
77+
}
78+
79+
func TestManifest_Phase1WithLiveAndNonZeroReadTSIsValid(t *testing.T) {
80+
t.Parallel()
81+
m := NewPhase0SnapshotManifest(time.Now())
82+
m.Phase = PhasePhase1LivePinned
83+
m.Source = nil
84+
m.Live = &Live{ReadTS: 12345}
85+
var buf bytes.Buffer
86+
if err := WriteManifest(&buf, m); err != nil {
87+
t.Fatalf("WriteManifest: %v", err)
88+
}
89+
got, err := ReadManifest(&buf)
90+
if err != nil {
91+
t.Fatalf("ReadManifest: %v", err)
92+
}
93+
if got.Live == nil || got.Live.ReadTS != 12345 {
94+
t.Fatalf("Live mismatch: %+v", got.Live)
95+
}
96+
}
97+
5398
func TestManifest_Phase1MustNotSetSource(t *testing.T) {
5499
t.Parallel()
55100
m := NewPhase0SnapshotManifest(time.Now())

0 commit comments

Comments
 (0)