Skip to content

Commit 75c2c48

Browse files
authored
s3keys: export ParseBlobKey for offline blob-key consumers (#717)
## Summary Tiny addition to `internal/s3keys`. Exports `ParseBlobKey` so the offline Phase 0a logical-backup decoder (next stacked PR) can route `!s3|blob|` records to their assembled object without holding a live cluster. The package already exports parsers for object manifests and upload parts; blob keys are constructable via `BlobKey` / `VersionedBlobKey` but were not parseable from the outside. ## What `ParseBlobKey` does Returns all components: `bucket`, `generation`, `object`, `uploadID`, `partNo`, `chunkNo`, `partVersion`. Both the 6-component (un-versioned) and 7-component (versioned) shapes are decoded. `partVersion=0` reverts to the un-versioned shape, matching `VersionedBlobKey`'s documented fallback. Truncated keys, malformed segments, and trailers that aren't either zero bytes or exactly one u64 are rejected with `ok=false`. ## Test plan - [x] `go test -race ./internal/s3keys/...` — pass. - [x] `golangci-lint run ./internal/s3keys/...` — clean. - [x] 5 new tests: un-versioned round-trip, versioned round-trip, `partVersion=0` fallback, rejection of bucket-meta / object-manifest / upload-part / junk inputs, rejection of trailing-garbage keys. - [x] Implementation split into `parseBlobKeyHead` + `parseOptionalPartVersion` so cyclomatic complexity stays under the package cap (no `//nolint`). ## Why this is its own PR Branch stack discipline. The S3 backup encoder uses this; landing the parser separately keeps the encoder PR focused on the dump-format mapping.
2 parents 5c7ccce + 0e15f02 commit 75c2c48

8 files changed

Lines changed: 2632 additions & 0 deletions

File tree

internal/backup/dynamodb.go

Lines changed: 482 additions & 0 deletions
Large diffs are not rendered by default.

internal/backup/dynamodb_test.go

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
package backup
2+
3+
import (
4+
"encoding/json"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
9+
pb "github.com/bootjp/elastickv/proto"
10+
"github.com/cockroachdb/errors"
11+
gproto "google.golang.org/protobuf/proto"
12+
)
13+
14+
func encodeSchemaValue(t *testing.T, schema *pb.DynamoTableSchema) []byte {
15+
t.Helper()
16+
body, err := gproto.Marshal(schema)
17+
if err != nil {
18+
t.Fatalf("marshal schema: %v", err)
19+
}
20+
out := append([]byte{}, storedDDBSchemaMagic...)
21+
return append(out, body...)
22+
}
23+
24+
func encodeItemValue(t *testing.T, item *pb.DynamoItem) []byte {
25+
t.Helper()
26+
body, err := gproto.Marshal(item)
27+
if err != nil {
28+
t.Fatalf("marshal item: %v", err)
29+
}
30+
out := append([]byte{}, storedDDBItemMagic...)
31+
return append(out, body...)
32+
}
33+
34+
func sAttr(s string) *pb.DynamoAttributeValue {
35+
return &pb.DynamoAttributeValue{Value: &pb.DynamoAttributeValue_S{S: s}}
36+
}
37+
38+
func nAttr(n string) *pb.DynamoAttributeValue {
39+
return &pb.DynamoAttributeValue{Value: &pb.DynamoAttributeValue_N{N: n}}
40+
}
41+
42+
func bAttr(b []byte) *pb.DynamoAttributeValue {
43+
return &pb.DynamoAttributeValue{Value: &pb.DynamoAttributeValue_B{B: b}}
44+
}
45+
46+
func boolAttr(b bool) *pb.DynamoAttributeValue {
47+
return &pb.DynamoAttributeValue{Value: &pb.DynamoAttributeValue_BoolValue{BoolValue: b}}
48+
}
49+
50+
func newDDBEncoder(t *testing.T) (*DDBEncoder, string) {
51+
t.Helper()
52+
root := t.TempDir()
53+
return NewDDBEncoder(root), root
54+
}
55+
56+
func readPublicSchema(t *testing.T, path string) ddbPublicSchema {
57+
t.Helper()
58+
body, err := os.ReadFile(path) //nolint:gosec // test path
59+
if err != nil {
60+
t.Fatalf("read schema: %v", err)
61+
}
62+
var got ddbPublicSchema
63+
if err := json.Unmarshal(body, &got); err != nil {
64+
t.Fatalf("unmarshal schema: %v", err)
65+
}
66+
return got
67+
}
68+
69+
// readItemMap loads the file at path and JSON-decodes it into a generic
// map[string]any. A read or decode failure aborts the calling test.
func readItemMap(t *testing.T, path string) map[string]any {
	t.Helper()
	raw, readErr := os.ReadFile(path) //nolint:gosec // test path
	if readErr != nil {
		t.Fatalf("read item: %v", readErr)
	}
	var decoded map[string]any
	decodeErr := json.Unmarshal(raw, &decoded)
	if decodeErr != nil {
		t.Fatalf("unmarshal item: %v", decodeErr)
	}
	return decoded
}
81+
82+
// mustSubMap returns m[key] as a map[string]any. If the entry is absent or
// is any other type, the calling test is aborted with the offending value.
func mustSubMap(t *testing.T, m map[string]any, key string) map[string]any {
	t.Helper()
	raw := m[key]
	if sub, isMap := raw.(map[string]any); isMap {
		return sub
	}
	t.Fatalf("field %q wrong shape: %v", key, raw)
	return nil // not reached: Fatalf stops the test goroutine
}
90+
91+
func TestDDB_HashOnlyTableRoundTrip(t *testing.T) {
92+
t.Parallel()
93+
enc, root := newDDBEncoder(t)
94+
schema := &pb.DynamoTableSchema{
95+
TableName: "sessions",
96+
PrimaryKey: &pb.DynamoKeySchema{HashKey: "session_id"},
97+
AttributeDefinitions: map[string]string{"session_id": "S"},
98+
Generation: 1,
99+
}
100+
item := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
101+
"session_id": sAttr("sess-abc123"),
102+
"user_id": sAttr("alice"),
103+
"flags": boolAttr(true),
104+
"count": nAttr("42"),
105+
}}
106+
if err := enc.HandleItem(EncodeDDBItemKey("sessions", 1, "sess-abc123", ""), encodeItemValue(t, item)); err != nil {
107+
t.Fatalf("HandleItem: %v", err)
108+
}
109+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("sessions"), encodeSchemaValue(t, schema)); err != nil {
110+
t.Fatalf("HandleTableMeta: %v", err)
111+
}
112+
if err := enc.Finalize(); err != nil {
113+
t.Fatalf("Finalize: %v", err)
114+
}
115+
116+
got := readPublicSchema(t, filepath.Join(root, "dynamodb", "sessions", "_schema.json"))
117+
if got.TableName != "sessions" {
118+
t.Fatalf("table_name = %q", got.TableName)
119+
}
120+
if got.PrimaryKey.HashKey.Name != "session_id" || got.PrimaryKey.HashKey.Type != "S" {
121+
t.Fatalf("primary_key = %+v", got.PrimaryKey)
122+
}
123+
if got.PrimaryKey.RangeKey.Name != "" {
124+
t.Fatalf("hash-only table must have empty range_key, got %+v", got.PrimaryKey.RangeKey)
125+
}
126+
127+
asMap := readItemMap(t, filepath.Join(root, "dynamodb", "sessions", "items", "sess-abc123.json"))
128+
if mustSubMap(t, asMap, "session_id")["S"] != "sess-abc123" {
129+
t.Fatalf("session_id.S = %v", asMap["session_id"])
130+
}
131+
if mustSubMap(t, asMap, "flags")["BOOL"] != true {
132+
t.Fatalf("flags.BOOL = %v", asMap["flags"])
133+
}
134+
}
135+
136+
func TestDDB_CompositeKeyTableRoundTrip(t *testing.T) {
137+
t.Parallel()
138+
enc, root := newDDBEncoder(t)
139+
schema := &pb.DynamoTableSchema{
140+
TableName: "orders",
141+
PrimaryKey: &pb.DynamoKeySchema{
142+
HashKey: "customer_id",
143+
RangeKey: "order_ts",
144+
},
145+
AttributeDefinitions: map[string]string{
146+
"customer_id": "S",
147+
"order_ts": "S",
148+
},
149+
Generation: 1,
150+
}
151+
item := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
152+
"customer_id": sAttr("customer-7421"),
153+
"order_ts": sAttr("2026-04-29T12:00:00Z"),
154+
"total": nAttr("129.50"),
155+
}}
156+
if err := enc.HandleItem(EncodeDDBItemKey("orders", 1, "customer-7421", "2026-04-29T12:00:00Z"), encodeItemValue(t, item)); err != nil {
157+
t.Fatal(err)
158+
}
159+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("orders"), encodeSchemaValue(t, schema)); err != nil {
160+
t.Fatal(err)
161+
}
162+
if err := enc.Finalize(); err != nil {
163+
t.Fatal(err)
164+
}
165+
want := filepath.Join(root, "dynamodb", "orders", "items", "customer-7421", "2026-04-29T12%3A00%3A00Z.json")
166+
if _, err := os.Stat(want); err != nil {
167+
t.Fatalf("expected %s, stat err=%v", want, err)
168+
}
169+
}
170+
171+
func TestDDB_BinaryHashKeyRendersAsB64Prefix(t *testing.T) {
172+
t.Parallel()
173+
enc, root := newDDBEncoder(t)
174+
schema := &pb.DynamoTableSchema{
175+
TableName: "blobs",
176+
PrimaryKey: &pb.DynamoKeySchema{
177+
HashKey: "id",
178+
},
179+
AttributeDefinitions: map[string]string{"id": "B"},
180+
}
181+
item := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
182+
"id": bAttr([]byte{0x00, 0x01, 0x02}),
183+
"data": sAttr("v"),
184+
}}
185+
if err := enc.HandleItem(EncodeDDBItemKey("blobs", 1, "doesnt-matter", ""), encodeItemValue(t, item)); err != nil {
186+
t.Fatal(err)
187+
}
188+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("blobs"), encodeSchemaValue(t, schema)); err != nil {
189+
t.Fatal(err)
190+
}
191+
if err := enc.Finalize(); err != nil {
192+
t.Fatal(err)
193+
}
194+
want := filepath.Join(root, "dynamodb", "blobs", "items", "b64.AAEC.json")
195+
if _, err := os.Stat(want); err != nil {
196+
t.Fatalf("expected %s, stat err=%v", want, err)
197+
}
198+
}
199+
200+
func TestDDB_OrphanItemsWithoutSchemaWarn(t *testing.T) {
201+
t.Parallel()
202+
enc, _ := newDDBEncoder(t)
203+
var events []string
204+
enc.WithWarnSink(func(event string, _ ...any) {
205+
events = append(events, event)
206+
})
207+
item := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
208+
"id": sAttr("orphan"),
209+
}}
210+
if err := enc.HandleItem(EncodeDDBItemKey("ghost", 1, "orphan", ""), encodeItemValue(t, item)); err != nil {
211+
t.Fatal(err)
212+
}
213+
if err := enc.Finalize(); err != nil {
214+
t.Fatal(err)
215+
}
216+
if len(events) != 1 || events[0] != "ddb_orphan_items" {
217+
t.Fatalf("events = %v", events)
218+
}
219+
}
220+
221+
func TestDDB_RejectsValueWithoutMagic(t *testing.T) {
222+
t.Parallel()
223+
t.Run("schema", func(t *testing.T) {
224+
enc, _ := newDDBEncoder(t)
225+
err := enc.HandleTableMeta(EncodeDDBTableMetaKey("t"), []byte("not-magic"))
226+
if !errors.Is(err, ErrDDBInvalidSchema) {
227+
t.Fatalf("err=%v", err)
228+
}
229+
})
230+
t.Run("item", func(t *testing.T) {
231+
enc, _ := newDDBEncoder(t)
232+
err := enc.HandleItem(EncodeDDBItemKey("t", 1, "h", ""), []byte("not-magic"))
233+
if !errors.Is(err, ErrDDBInvalidItem) {
234+
t.Fatalf("err=%v", err)
235+
}
236+
})
237+
}
238+
239+
func TestDDB_RejectsItemMissingHashKeyAttribute(t *testing.T) {
240+
t.Parallel()
241+
enc, _ := newDDBEncoder(t)
242+
schema := &pb.DynamoTableSchema{
243+
TableName: "t", PrimaryKey: &pb.DynamoKeySchema{HashKey: "id"},
244+
AttributeDefinitions: map[string]string{"id": "S"},
245+
}
246+
item := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
247+
// "id" is missing
248+
"other": sAttr("v"),
249+
}}
250+
if err := enc.HandleItem(EncodeDDBItemKey("t", 1, "x", ""), encodeItemValue(t, item)); err != nil {
251+
t.Fatal(err)
252+
}
253+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("t"), encodeSchemaValue(t, schema)); err != nil {
254+
t.Fatal(err)
255+
}
256+
err := enc.Finalize()
257+
if !errors.Is(err, ErrDDBInvalidItem) {
258+
t.Fatalf("Finalize err=%v want ErrDDBInvalidItem", err)
259+
}
260+
}
261+
262+
func TestDDB_GSIRowsIgnored(t *testing.T) {
263+
t.Parallel()
264+
enc, _ := newDDBEncoder(t)
265+
if err := enc.HandleGSIRow([]byte("!ddb|gsi|whatever"), []byte("opaque")); err != nil {
266+
t.Fatalf("HandleGSIRow should be a no-op, err=%v", err)
267+
}
268+
}
269+
270+
func TestDDB_AllAttributeKindsRoundTripThroughJSON(t *testing.T) {
271+
t.Parallel()
272+
enc, root := newDDBEncoder(t)
273+
schema := &pb.DynamoTableSchema{
274+
TableName: "kitchensink", PrimaryKey: &pb.DynamoKeySchema{HashKey: "id"},
275+
AttributeDefinitions: map[string]string{"id": "S"},
276+
}
277+
item := &pb.DynamoItem{Attributes: map[string]*pb.DynamoAttributeValue{
278+
"id": sAttr("k"),
279+
"s": sAttr("hi"),
280+
"n": nAttr("1.5"),
281+
"b": bAttr([]byte{0xff, 0x01}),
282+
"bool_t": boolAttr(true),
283+
"null_a": {Value: &pb.DynamoAttributeValue_NullValue{NullValue: true}},
284+
"ss": {Value: &pb.DynamoAttributeValue_Ss{Ss: &pb.DynamoStringSet{Values: []string{"a", "b"}}}},
285+
"ns": {Value: &pb.DynamoAttributeValue_Ns{Ns: &pb.DynamoNumberSet{Values: []string{"1", "2"}}}},
286+
"bs": {Value: &pb.DynamoAttributeValue_Bs{Bs: &pb.DynamoBinarySet{Values: [][]byte{{0x01}, {0x02}}}}},
287+
"l": {Value: &pb.DynamoAttributeValue_L{L: &pb.DynamoAttributeValueList{Values: []*pb.DynamoAttributeValue{sAttr("x"), nAttr("9")}}}},
288+
"m": {Value: &pb.DynamoAttributeValue_M{M: &pb.DynamoAttributeValueMap{Values: map[string]*pb.DynamoAttributeValue{"k1": sAttr("v1")}}}},
289+
}}
290+
if err := enc.HandleItem(EncodeDDBItemKey("kitchensink", 1, "k", ""), encodeItemValue(t, item)); err != nil {
291+
t.Fatal(err)
292+
}
293+
if err := enc.HandleTableMeta(EncodeDDBTableMetaKey("kitchensink"), encodeSchemaValue(t, schema)); err != nil {
294+
t.Fatal(err)
295+
}
296+
if err := enc.Finalize(); err != nil {
297+
t.Fatal(err)
298+
}
299+
got := readItemMap(t, filepath.Join(root, "dynamodb", "kitchensink", "items", "k.json"))
300+
// Spot-check a few attributes; full per-attribute assertions live
301+
// in the dedicated attributeValueToPublic tests below.
302+
if mustSubMap(t, got, "s")["S"] != "hi" {
303+
t.Fatalf("s = %v", got["s"])
304+
}
305+
if mustSubMap(t, got, "bool_t")["BOOL"] != true {
306+
t.Fatalf("bool_t = %v", got["bool_t"])
307+
}
308+
lInner, ok := mustSubMap(t, got, "l")["L"].([]any)
309+
if !ok {
310+
t.Fatalf("l[\"L\"] wrong shape: %v", mustSubMap(t, got, "l")["L"])
311+
}
312+
if len(lInner) != 2 {
313+
t.Fatalf("l[\"L\"] len = %d want 2", len(lInner))
314+
}
315+
}
316+
317+
func TestDDB_AttributeValueToPublic_EmptyOneofSurfacedAsNull(t *testing.T) {
318+
t.Parallel()
319+
got := attributeValueToPublic(&pb.DynamoAttributeValue{})
320+
if got["NULL"] != true {
321+
t.Fatalf("got %v want NULL=true", got)
322+
}
323+
}
324+
325+
func TestDDB_BundleJSONLNotImplementedYet(t *testing.T) {
326+
t.Parallel()
327+
enc, _ := newDDBEncoder(t)
328+
enc.WithBundleJSONL(true)
329+
err := enc.Finalize()
330+
if err == nil {
331+
t.Fatalf("expected not-implemented error from Finalize on bundle mode")
332+
}
333+
}
334+
335+
func TestDDB_RejectsKeyWithMissingTableSegment(t *testing.T) {
336+
t.Parallel()
337+
enc, _ := newDDBEncoder(t)
338+
// Missing the table segment entirely.
339+
err := enc.HandleItem([]byte(DDBItemPrefix), []byte("ignored"))
340+
if !errors.Is(err, ErrDDBMalformedKey) {
341+
t.Fatalf("err=%v", err)
342+
}
343+
}

0 commit comments

Comments
 (0)