From fcb0736b3f12f2043ea46e0f6f2eebd7c4bab2e8 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 14 May 2026 14:20:34 +0100 Subject: [PATCH 1/7] confluent: resolve named-type references in nullable Avro unions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avro JSON schemas may reference a previously-defined record/enum/fixed by name rather than inlining the full definition — the Java/JDBC idiom for any record reused across more than one field, e.g. {"name": "secondary_fee", "type": ["null", "Fee"]} where "Fee" was defined inline by an earlier field. The metadata parser in ecsAvroParseFromBytes was treating the string branch as an unknown type and falling through to schema.Any, so the resulting common-schema metadata reported the field as VARCHAR rather than the registered record structure. Downstream sinks (notably iceberg) then created a string column where the customer expected a nested struct. Thread a names map through ecsAvroConfig, register every record/enum/ fixed by both its simple name and its fully-qualified namespace.name form, and have the string-form type resolver consult the map before falling back to the primitive-name lookup. Also generalise the optional- union helper to accept either ordering -- [null, X] and [X, null] -- since the Avro spec doesn't constrain branch order. The lexical-scope assumption -- a name must be defined before it is referenced -- is the Avro spec's, so a single forward-only pass suffices. Self-referential records remain unsupported and would need pre- registration with a placeholder; flagged in the registration helper's comment. Tests cover the four shapes CON-468's acceptance criteria call out: nullable inline record (already green via #4380), nullable record by name reference, both branch orderings, fully-qualified vs short-name references, and record-with-nested-record where the inner level is itself a name-reference union. --- internal/impl/confluent/ecs_avro.go | 180 ++++++++++++++------- internal/impl/confluent/ecs_avro_test.go | 198 +++++++++++++++++++++++ 2 files changed, 323 insertions(+), 55 deletions(-) diff --git a/internal/impl/confluent/ecs_avro.go b/internal/impl/confluent/ecs_avro.go index 09c53977c5..862f56f180 100644 --- a/internal/impl/confluent/ecs_avro.go +++ b/internal/impl/confluent/ecs_avro.go @@ -74,6 +74,16 @@ type ecsAvroConfig struct { // but the metadata still claims Int64 — the exact mismatch fixed // for sibling-form Avro by [applyAvroLogicalType]. translateKafkaConnectTypes bool + + // names accumulates resolved record/enum/fixed definitions during a + // single parse so that string-form references (e.g. "Fee" in + // ["null", "Fee"]) can be expanded to their full Common shape. The + // map is lazily allocated by ecsAvroParseFromBytes and shared by + // reference through recursive ecsAvroFromAnyMap calls; mutations from + // sub-trees propagate to later siblings as the Avro spec requires + // (named types are lexically scoped from the schema root, and a name + // must be defined before it is referenced). + names map[string]schema.Common } // ecsAvroParseFromBytes parses an Avro JSON spec into a schema.Common. The @@ -81,6 +91,9 @@ type ecsAvroConfig struct { // decimal field paths during value normalisation; callers that just want // the metadata copy can call ToAny() on the result. func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common, error) { + if cfg.names == nil { + cfg.names = map[string]schema.Common{} + } var as any if err := json.Unmarshal(specBytes, &as); err != nil { return schema.Common{}, err @@ -109,51 +122,75 @@ func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common, return schema.Common{}, fmt.Errorf("expected either an array or object at root of schema, got %T", as) } -// If the union is actually just a verbose way of defining an optional field -// then we return the real type and true. E.g. if we see: +// ecsAvroResolveOptionalUnion checks whether a 2-element union is just a +// nullable wrapper (one branch is "null") and returns the resolved non-null +// branch as a Common with Optional=true. // -// `"type": [ "null", "string" ]` +// Handles both orderings ([null, X] and [X, null]) and resolves the non-null +// branch in three forms: // -// Then we return string and true. -func ecsAvroIsUnionJustOptional(types []any) (schema.CommonType, bool) { - if len(types) != 2 { - return schema.CommonType(-1), false - } - - firstTypeStr, ok := types[0].(string) - if !ok || firstTypeStr != "null" { - return schema.CommonType(-1), false - } - - secondTypeStr, ok := types[1].(string) - if !ok { - return schema.CommonType(-1), false - } - - return ecsAvroTypeToCommon(secondTypeStr), true -} - -// ecsAvroIsUnionJustOptionalObject mirrors ecsAvroIsUnionJustOptional but -// for the [null, {object}] shape — Avro's idiom for a nullable named or -// logically-typed field. Returns the resolved Common (with any -// LogicalParams populated) and true on match. -func ecsAvroIsUnionJustOptionalObject(cfg ecsAvroConfig, types []any) (schema.Common, bool) { +// - a primitive type name string (e.g. "string"), +// - a previously-defined named-type reference string (e.g. "Fee"), +// resolved via cfg.names per the Avro lexical-scope rule, +// - an inline type definition object (e.g. {"type":"record",...}). +// +// Returns (Common{}, false) when the union doesn't fit this shape (length +// != 2, no "null" branch, two non-null branches, or an inline object that +// fails to parse). +func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common, bool) { if len(types) != 2 { return schema.Common{}, false } - firstStr, ok := types[0].(string) - if !ok || firstStr != "null" { + var other any + for _, t := range types { + if s, ok := t.(string); ok && s == "null" { + continue + } + if other != nil { + // Two non-null branches — not a nullable wrapper. + return schema.Common{}, false + } + other = t + } + if other == nil { return schema.Common{}, false } - secondMap, ok := types[1].(map[string]any) + inner, ok := ecsAvroResolveTypeRef(cfg, other) if !ok { return schema.Common{}, false } - c, err := ecsAvroFromAnyMap(cfg, secondMap) - if err != nil { - return schema.Common{}, false + inner.Optional = true + return inner, true +} + +// ecsAvroResolveTypeRef resolves a single Avro type reference — the value +// of a "type" field, or one branch of a union — to a Common. The reference +// may be a primitive type name string, a previously-defined named-type +// reference string (resolved via cfg.names), or an inline type definition +// object. +// +// Returns (Common{}, false) only when an inline object fails to parse. +// Unknown string names fall back to schema.Any so that downstream sinks +// see a sensible (if structureless) column rather than a parse error. +func ecsAvroResolveTypeRef(cfg ecsAvroConfig, ref any) (schema.Common, bool) { + switch b := ref.(type) { + case string: + // Try the names map first so a name reference takes priority over + // the schema.Any fallback in ecsAvroTypeToCommon. Primitive names + // are never registered in the map, so primitives reach the + // fallback unchanged. + if resolved, ok := cfg.names[b]; ok { + return resolved, true + } + return schema.Common{Type: ecsAvroTypeToCommon(b)}, true + case map[string]any: + inner, err := ecsAvroFromAnyMap(cfg, b) + if err != nil { + return schema.Common{}, false + } + return inner, true } - return c, true + return schema.Common{}, false } // applyAvroLogicalType reads the optional "logicalType" annotation from an @@ -471,38 +508,25 @@ func ecsAvroTypeToCommon(t string) schema.CommonType { } func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) error { - // [null, primitive-name] → Optional . - if t, optional := ecsAvroIsUnionJustOptional(types); optional { - c.Type, c.Optional = t, true - return nil - } - // [null, {object}] → Optional , propagating logical params and - // nested children. This catches the common Avro idiom for nullable - // decimal/timestamp/etc. logical types. - if inner, ok := ecsAvroIsUnionJustOptionalObject(cfg, types); ok { + // [null, X] or [X, null] → Optional X. ecsAvroResolveOptionalUnion + // handles primitive names, named-type references, and inline objects + // in either ordering. + if inner, ok := ecsAvroResolveOptionalUnion(cfg, types); ok { name := c.Name *c = inner if name != "" { c.Name = name } - c.Optional = true return nil } c.Type = schema.Union for i, uObj := range types { - switch ut := uObj.(type) { - case string: - c.Children = append(c.Children, schema.Common{ - Type: ecsAvroTypeToCommon(ut), - }) - case map[string]any: - tmpC, err := ecsAvroFromAnyMap(cfg, ut) - if err != nil { - return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err) - } - c.Children = append(c.Children, tmpC) + child, ok := ecsAvroResolveTypeRef(cfg, uObj) + if !ok { + return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj) } + c.Children = append(c.Children, child) } return nil } @@ -544,6 +568,42 @@ func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) e } func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) { + c, err := ecsAvroFromAnyMapImpl(cfg, as) + if err == nil { + ecsAvroRegisterNamedType(cfg, as, c) + } + return c, err +} + +// ecsAvroRegisterNamedType records a resolved record/enum/fixed Common in +// cfg.names so that later string-form references (e.g. "Fee" instead of an +// inline record definition) can be expanded. Avro's lexical-scope rule +// requires the name to appear before any reference, so a single forward-only +// pass through the schema is sufficient. Recursive types — where a record's +// own children reference it by name before its definition completes — are +// not supported by this approach; the registered entry must not be mutated +// after registration to avoid surprising aliasing with later look-ups. +func ecsAvroRegisterNamedType(cfg ecsAvroConfig, as map[string]any, c schema.Common) { + if cfg.names == nil { + return + } + typeName, _ := as["type"].(string) + switch typeName { + case "record", "enum", "fixed": + default: + return + } + name, _ := as["name"].(string) + if name == "" { + return + } + cfg.names[name] = c + if ns, _ := as["namespace"].(string); ns != "" { + cfg.names[ns+"."+name] = c + } +} + +func ecsAvroFromAnyMapImpl(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) { var c schema.Common c.Name, _ = as["name"].(string) @@ -580,6 +640,16 @@ func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, err } return c, nil case string: + // String form may be a primitive type name OR a name reference to + // a previously-defined record/enum/fixed in lexical scope. + if resolved, ok := cfg.names[t]; ok { + fieldName := c.Name + c = resolved + if fieldName != "" { + c.Name = fieldName + } + return c, nil + } c.Type = ecsAvroTypeToCommon(t) case map[string]any: // The type field is an object (e.g. {"type":"map","values":"long"}). diff --git a/internal/impl/confluent/ecs_avro_test.go b/internal/impl/confluent/ecs_avro_test.go index 9cf658e570..09ffb23217 100644 --- a/internal/impl/confluent/ecs_avro_test.go +++ b/internal/impl/confluent/ecs_avro_test.go @@ -269,6 +269,204 @@ func TestEcsAvroRawUnionNestedRecord(t *testing.T) { assert.Equal(t, schema.String, party.Children[0].Type) } +// TestEcsAvroRawUnionNullableRecordByName is the CON-468 regression for +// nullable record unions where the non-null branch is a string name +// reference to a previously-defined record (rather than an inline +// definition). The Avro JSON spec requires named types to be defined once +// then referenced by name, so any non-trivial customer schema with reused +// records will exercise this path. +func TestEcsAvroRawUnionNullableRecordByName(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [ + {"name": "amount", "type": "long"}, + {"name": "currency", "type": "string"} + ] + } + }, + {"name": "secondary_fee", "type": ["null", "Fee"], "default": null} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Equal(t, schema.Object, c.Type) + require.Len(t, c.Children, 2) + + primary := c.Children[0] + assert.Equal(t, "primary_fee", primary.Name) + require.Equal(t, schema.Object, primary.Type) + require.Len(t, primary.Children, 2) + + secondary := c.Children[1] + assert.Equal(t, "secondary_fee", secondary.Name) + require.Equal(t, schema.Object, secondary.Type, "name reference to Fee should resolve to the same record structure, not VARCHAR") + assert.True(t, secondary.Optional) + require.Len(t, secondary.Children, 2) + assert.Equal(t, "amount", secondary.Children[0].Name) + assert.Equal(t, schema.Int64, secondary.Children[0].Type) + assert.Equal(t, "currency", secondary.Children[1].Name) + assert.Equal(t, schema.String, secondary.Children[1].Type) +} + +// TestEcsAvroRawUnionNullableOrderIndependence covers CON-468 acceptance +// criterion 2: the [, "null"] ordering (null second) must resolve +// identically to ["null", ] (null first), across inline objects, +// primitives, and name references. +func TestEcsAvroRawUnionNullableOrderIndependence(t *testing.T) { + t.Run("inline record, null second", func(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [{ + "name": "fee", + "type": [{ + "type": "record", + "name": "Fee", + "fields": [{"name": "amount", "type": "long"}] + }, "null"] + }] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + fee := c.Children[0] + assert.Equal(t, schema.Object, fee.Type) + assert.True(t, fee.Optional) + require.Len(t, fee.Children, 1) + assert.Equal(t, "amount", fee.Children[0].Name) + }) + + t.Run("primitive, null second", func(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [{"name": "ref", "type": ["string", "null"]}] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + ref := c.Children[0] + assert.Equal(t, schema.String, ref.Type) + assert.True(t, ref.Optional) + }) + + t.Run("name reference, null second", func(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [{"name": "amount", "type": "long"}] + } + }, + {"name": "secondary_fee", "type": ["Fee", "null"]} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + secondary := c.Children[1] + assert.Equal(t, schema.Object, secondary.Type) + assert.True(t, secondary.Optional) + require.Len(t, secondary.Children, 1) + assert.Equal(t, "amount", secondary.Children[0].Name) + }) +} + +// TestEcsAvroRawUnionNullableRecordNamespaced verifies that namespaced +// record names can be referenced either by short name (Fee) or by fully- +// qualified name (com.example.Fee), matching the Avro spec's name +// resolution rules. +func TestEcsAvroRawUnionNullableRecordNamespaced(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "namespace": "com.example", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "Fee", + "namespace": "com.example", + "fields": [{"name": "amount", "type": "long"}] + } + }, + {"name": "by_short_name", "type": ["null", "Fee"]}, + {"name": "by_full_name", "type": ["null", "com.example.Fee"]} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 3) + + short := c.Children[1] + assert.Equal(t, "by_short_name", short.Name) + assert.Equal(t, schema.Object, short.Type) + require.Len(t, short.Children, 1) + + full := c.Children[2] + assert.Equal(t, "by_full_name", full.Name) + assert.Equal(t, schema.Object, full.Type) + require.Len(t, full.Children, 1) +} + +// TestEcsAvroRawUnionNullableRecordNested covers CON-468 acceptance +// criterion 2's "record-with-nested-record" case: a nullable record union +// whose record contains its own nullable record union, both as inline +// definitions and as name references at the inner level. +func TestEcsAvroRawUnionNullableRecordNested(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [ + { + "name": "inner_template", + "type": { + "type": "record", + "name": "Inner", + "fields": [{"name": "code", "type": "string"}] + } + }, + { + "name": "outer", + "type": ["null", { + "type": "record", + "name": "Outer", + "fields": [ + {"name": "label", "type": "string"}, + {"name": "inner", "type": ["null", "Inner"]} + ] + }] + } + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 2) + + outer := c.Children[1] + assert.Equal(t, "outer", outer.Name) + assert.Equal(t, schema.Object, outer.Type) + assert.True(t, outer.Optional) + require.Len(t, outer.Children, 2) + + inner := outer.Children[1] + assert.Equal(t, "inner", inner.Name) + assert.Equal(t, schema.Object, inner.Type, "nested name reference must resolve, not collapse to VARCHAR") + assert.True(t, inner.Optional) + require.Len(t, inner.Children, 1) + assert.Equal(t, "code", inner.Children[0].Name) +} + // TestEcsAvroRecordWithNilFields is a regression test for schemas where a // field's type is a record object without a "fields" key (e.g. back-reference // form {"type":"record","name":"Party"} or "fields":null from some generators). From d196837f6f1beb587d8ef02cb9aa8be15f0aaef8 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 14 May 2026 14:21:12 +0100 Subject: [PATCH 2/7] confluent: resolve named-type references in lame Avro unions too The lame-union path -- raw_unions: false on schema_registry_decode, which is the documented default -- carried the same bug as the raw path: string branches like "Fee" in ["null", "Fee"] went through ecsAvroTypeToCommon directly and collapsed to schema.Any, even when "Fee" was a previously- defined record. The tagged-JSON envelope around each branch then wrapped an Any inner, producing a structureless metadata tree. Reroute the lame hydrator through the same ecsAvroResolveTypeRef helper the raw path now uses, then re-apply the lame-specific wrapping (tagged-Object envelope, type-name preserved as Common.Name to match the wire-form tag). The non-Avro behavior of the lame envelope is unchanged; only the inner Common is now correctly populated for name references. This closes the same CON-468 bug class for the default-config path that commit 4531c5171 closed for the raw-unions path. --- internal/impl/confluent/ecs_avro.go | 21 ++++------ internal/impl/confluent/ecs_avro_test.go | 49 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 13 deletions(-) diff --git a/internal/impl/confluent/ecs_avro.go b/internal/impl/confluent/ecs_avro.go index 862f56f180..affae91ac5 100644 --- a/internal/impl/confluent/ecs_avro.go +++ b/internal/impl/confluent/ecs_avro.go @@ -534,19 +534,14 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) error { c.Type = schema.Union for i, uObj := range types { - var childT schema.Common - - switch ut := uObj.(type) { - case string: - childT = schema.Common{ - Name: ut, - Type: ecsAvroTypeToCommon(ut), - } - case map[string]any: - var err error - if childT, err = ecsAvroFromAnyMap(cfg, ut); err != nil { - return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err) - } + childT, ok := ecsAvroResolveTypeRef(cfg, uObj) + if !ok { + return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj) + } + if s, isStr := uObj.(string); isStr { + // Lame-union children keep the type-name as the Common.Name so + // the tagged-JSON envelope key matches the Avro wire form. + childT.Name = s } if childT.Type == schema.Null { diff --git a/internal/impl/confluent/ecs_avro_test.go b/internal/impl/confluent/ecs_avro_test.go index 09ffb23217..0ee273734b 100644 --- a/internal/impl/confluent/ecs_avro_test.go +++ b/internal/impl/confluent/ecs_avro_test.go @@ -467,6 +467,55 @@ func TestEcsAvroRawUnionNullableRecordNested(t *testing.T) { assert.Equal(t, "code", inner.Children[0].Name) } +// TestEcsAvroLameUnionNameResolution covers the same CON-468 bug class in +// the lame-union (rawUnion=false) path that the schema_registry_decode +// processor takes by default. The lame envelope wraps each branch in a +// tagged Object so the wire shape stays the same, but the inner Common +// must still expand previously-defined named-type references rather than +// collapsing them to schema.Any. +func TestEcsAvroLameUnionNameResolution(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [{"name": "amount", "type": "long"}] + } + }, + {"name": "secondary_fee", "type": ["null", "Fee"]} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{}, spec) + require.NoError(t, err) + + secondary := c.Children[1] + assert.Equal(t, "secondary_fee", secondary.Name) + assert.Equal(t, schema.Union, secondary.Type) + require.Len(t, secondary.Children, 2) + + // One child is the null branch (Common.Type=Null, Name=""). + // The other is the tagged-Object envelope wrapping the resolved Fee. + var feeEnvelope schema.Common + for _, ch := range secondary.Children { + if ch.Type == schema.Object { + feeEnvelope = ch + break + } + } + require.Equal(t, schema.Object, feeEnvelope.Type, "Fee branch should be tagged-Object envelope") + require.Len(t, feeEnvelope.Children, 1) + feeInner := feeEnvelope.Children[0] + assert.Equal(t, "Fee", feeInner.Name) + assert.Equal(t, schema.Object, feeInner.Type, "name reference to Fee should resolve to its record structure, not schema.Any") + require.Len(t, feeInner.Children, 1) + assert.Equal(t, "amount", feeInner.Children[0].Name) + assert.Equal(t, schema.Int64, feeInner.Children[0].Type) +} + // TestEcsAvroRecordWithNilFields is a regression test for schemas where a // field's type is a record object without a "fields" key (e.g. back-reference // form {"type":"record","name":"Party"} or "fields":null from some generators). From b2bf493a0b377b1aec7d856c43c999ba01862f11 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Mon, 18 May 2026 10:08:15 +0100 Subject: [PATCH 3/7] confluent: harden Avro name resolution and union error wrapping Polish pass on the Avro JSON metadata parser following local review: - Implement the Avro spec's namespace-inheritance rule. A name with no dot and no `namespace` field inherits the most-tightly-enclosing namespace; the new ecsAvroAssignFullname helper handles all three spelling forms (dot-in-name, explicit namespace, inheritance) and is mirrored by an inheritance-aware ecsAvroLookupName that tries `.` before the bare name. - Pre-register a structural-stub placeholder before walking a record's children so a self-reference (linked-list style) resolves to a one- level Object stub instead of collapsing to schema.Any. Mutual recursion across distinct records remains out of scope. - Deep-copy entries on retrieval from the names map (cloneCommon) so callers can mutate the returned Common without corrupting later look-ups. Removes a latent aliasing footgun. - Restore the %w-wrapping contract through the union resolvers. ecsAvroResolveTypeRef now returns (Common, error) and ecsAvroResolveOptionalUnion returns (Common, bool, error); both union hydrators wrap the inner cause, so a malformed inline decimal surfaces as "decimal precision: not an integer: ..." rather than the type- stringifier fallback "could not resolve type map[string]interface{}". Tests cover namespace inheritance with short and inherited-FQN refs, dot-form names overriding the namespace field, self-reference stubs, names-map immutability after caller mutation, and %w propagation across raw / lame union paths and nullable / general-union shapes. --- internal/impl/confluent/ecs_avro.go | 220 +++++++++++++++++------ internal/impl/confluent/ecs_avro_test.go | 206 +++++++++++++++++++++ 2 files changed, 367 insertions(+), 59 deletions(-) diff --git a/internal/impl/confluent/ecs_avro.go b/internal/impl/confluent/ecs_avro.go index affae91ac5..36470b51a9 100644 --- a/internal/impl/confluent/ecs_avro.go +++ b/internal/impl/confluent/ecs_avro.go @@ -75,15 +75,19 @@ type ecsAvroConfig struct { // for sibling-form Avro by [applyAvroLogicalType]. translateKafkaConnectTypes bool - // names accumulates resolved record/enum/fixed definitions during a - // single parse so that string-form references (e.g. "Fee" in - // ["null", "Fee"]) can be expanded to their full Common shape. The - // map is lazily allocated by ecsAvroParseFromBytes and shared by - // reference through recursive ecsAvroFromAnyMap calls; mutations from - // sub-trees propagate to later siblings as the Avro spec requires - // (named types are lexically scoped from the schema root, and a name - // must be defined before it is referenced). + // names is the lexical-scope registry of resolved record/enum/fixed + // definitions, keyed by Avro fullname (and short name for + // convenience). Stored values must be treated as immutable — every + // retrieval clones via cloneCommon so callers can mutate freely + // without corrupting later look-ups. names map[string]schema.Common + + // namespace is the enclosing Avro namespace, threaded through the + // recursion by value. It is updated when entering a named-type + // declaration that introduces a new namespace, per the Avro spec's + // inheritance rule (a name with no dots and no `namespace` field + // inherits the most tightly enclosing namespace). + namespace string } // ecsAvroParseFromBytes parses an Avro JSON spec into a schema.Common. The @@ -134,12 +138,14 @@ func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common, // resolved via cfg.names per the Avro lexical-scope rule, // - an inline type definition object (e.g. {"type":"record",...}). // -// Returns (Common{}, false) when the union doesn't fit this shape (length -// != 2, no "null" branch, two non-null branches, or an inline object that -// fails to parse). -func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common, bool) { +// The matched bool reports whether the union has the [null, X] / [X, null] +// shape; the error is non-nil only when the shape matched but resolving the +// non-null branch failed (e.g. a malformed inline decimal). Callers must +// surface the error rather than falling through to the general-union path — +// the fall-through would also fail, with a less informative message. +func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (resolved schema.Common, matched bool, err error) { if len(types) != 2 { - return schema.Common{}, false + return schema.Common{}, false, nil } var other any for _, t := range types { @@ -148,19 +154,19 @@ func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common, } if other != nil { // Two non-null branches — not a nullable wrapper. - return schema.Common{}, false + return schema.Common{}, false, nil } other = t } if other == nil { - return schema.Common{}, false + return schema.Common{}, false, nil } - inner, ok := ecsAvroResolveTypeRef(cfg, other) - if !ok { - return schema.Common{}, false + inner, err := ecsAvroResolveTypeRef(cfg, other) + if err != nil { + return schema.Common{}, true, err } inner.Optional = true - return inner, true + return inner, true, nil } // ecsAvroResolveTypeRef resolves a single Avro type reference — the value @@ -169,28 +175,26 @@ func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common, // reference string (resolved via cfg.names), or an inline type definition // object. // -// Returns (Common{}, false) only when an inline object fails to parse. -// Unknown string names fall back to schema.Any so that downstream sinks -// see a sensible (if structureless) column rather than a parse error. -func ecsAvroResolveTypeRef(cfg ecsAvroConfig, ref any) (schema.Common, bool) { +// Unknown string names fall back to schema.Any so downstream sinks see a +// sensible (if structureless) column. An error is returned only when an +// inline object fails to parse (the wrapped cause flows back to the caller) +// or when ref is neither a string nor a map (a malformed Avro JSON shape +// the upstream parser couldn't reject). +func ecsAvroResolveTypeRef(cfg ecsAvroConfig, ref any) (schema.Common, error) { switch b := ref.(type) { case string: // Try the names map first so a name reference takes priority over // the schema.Any fallback in ecsAvroTypeToCommon. Primitive names // are never registered in the map, so primitives reach the // fallback unchanged. - if resolved, ok := cfg.names[b]; ok { - return resolved, true + if resolved, ok := ecsAvroLookupName(cfg, b); ok { + return resolved, nil } - return schema.Common{Type: ecsAvroTypeToCommon(b)}, true + return schema.Common{Type: ecsAvroTypeToCommon(b)}, nil case map[string]any: - inner, err := ecsAvroFromAnyMap(cfg, b) - if err != nil { - return schema.Common{}, false - } - return inner, true + return ecsAvroFromAnyMap(cfg, b) } - return schema.Common{}, false + return schema.Common{}, fmt.Errorf("expected type reference to be a string or object, got %T", ref) } // applyAvroLogicalType reads the optional "logicalType" annotation from an @@ -477,6 +481,56 @@ func applyKafkaConnectType(cfg ecsAvroConfig, c *schema.Common, as map[string]an } } +// ecsAvroLookupName resolves a string reference to a previously-registered +// named type, applying Avro's name-resolution rules: a reference that +// contains a dot is treated as a fullname; an unqualified reference is +// looked up first against the enclosing namespace, then against the bare +// name as a fallback for root-scope references. +// +// The returned Common is cloned so callers can mutate it freely without +// corrupting the registered entry. +func ecsAvroLookupName(cfg ecsAvroConfig, ref string) (schema.Common, bool) { + if !strings.ContainsRune(ref, '.') && cfg.namespace != "" { + if resolved, ok := cfg.names[cfg.namespace+"."+ref]; ok { + return cloneCommon(resolved), true + } + } + if resolved, ok := cfg.names[ref]; ok { + return cloneCommon(resolved), true + } + return schema.Common{}, false +} + +// cloneCommon deep-copies a schema.Common, allocating fresh slice and +// pointer storage for Children and Logical so the result aliases nothing +// with the source. +func cloneCommon(c schema.Common) schema.Common { + if c.Children != nil { + children := make([]schema.Common, len(c.Children)) + for i := range c.Children { + children[i] = cloneCommon(c.Children[i]) + } + c.Children = children + } + if c.Logical != nil { + l := *c.Logical + if l.Decimal != nil { + d := *l.Decimal + l.Decimal = &d + } + if l.Timestamp != nil { + ts := *l.Timestamp + l.Timestamp = &ts + } + if l.TimeOfDay != nil { + tod := *l.TimeOfDay + l.TimeOfDay = &tod + } + c.Logical = &l + } + return c +} + func ecsAvroTypeToCommon(t string) schema.CommonType { switch t { case "record": @@ -511,7 +565,10 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er // [null, X] or [X, null] → Optional X. ecsAvroResolveOptionalUnion // handles primitive names, named-type references, and inline objects // in either ordering. - if inner, ok := ecsAvroResolveOptionalUnion(cfg, types); ok { + if inner, matched, err := ecsAvroResolveOptionalUnion(cfg, types); matched { + if err != nil { + return fmt.Errorf("union `%v`: %w", c.Name, err) + } name := c.Name *c = inner if name != "" { @@ -522,9 +579,9 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er c.Type = schema.Union for i, uObj := range types { - child, ok := ecsAvroResolveTypeRef(cfg, uObj) - if !ok { - return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj) + child, err := ecsAvroResolveTypeRef(cfg, uObj) + if err != nil { + return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err) } c.Children = append(c.Children, child) } @@ -534,9 +591,9 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) error { c.Type = schema.Union for i, uObj := range types { - childT, ok := ecsAvroResolveTypeRef(cfg, uObj) - if !ok { - return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj) + childT, err := ecsAvroResolveTypeRef(cfg, uObj) + if err != nil { + return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err) } if s, isStr := uObj.(string); isStr { // Lame-union children keep the type-name as the Common.Name so @@ -563,39 +620,84 @@ func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) e } func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) { + // Pre-register a structural placeholder before walking children so a + // self-reference (e.g. a linked-list record with a `next` field of its + // own type) resolves to a one-level stub rather than collapsing to + // schema.Any. The placeholder is overwritten with the fully-resolved + // Common once the walk completes. Mutual recursion across distinct + // records is still not supported — the second record's placeholder + // does not exist while the first is being walked. + typeName, _ := as["type"].(string) + fullname, shortName, childNamespace := ecsAvroAssignFullname(cfg.namespace, typeName, as) + if fullname != "" { + placeholder := ecsAvroPlaceholder(typeName, shortName) + cfg.names[fullname] = placeholder + if shortName != "" && shortName != fullname { + cfg.names[shortName] = placeholder + } + // Inheritable namespace propagates into the child walk; sibling + // scopes are unaffected because cfg is passed by value. + cfg.namespace = childNamespace + } + c, err := ecsAvroFromAnyMapImpl(cfg, as) - if err == nil { - ecsAvroRegisterNamedType(cfg, as, c) + if err == nil && fullname != "" { + cfg.names[fullname] = c + if shortName != "" && shortName != fullname { + cfg.names[shortName] = c + } } return c, err } -// ecsAvroRegisterNamedType records a resolved record/enum/fixed Common in -// cfg.names so that later string-form references (e.g. "Fee" instead of an -// inline record definition) can be expanded. Avro's lexical-scope rule -// requires the name to appear before any reference, so a single forward-only -// pass through the schema is sufficient. Recursive types — where a record's -// own children reference it by name before its definition completes — are -// not supported by this approach; the registered entry must not be mutated -// after registration to avoid surprising aliasing with later look-ups. -func ecsAvroRegisterNamedType(cfg ecsAvroConfig, as map[string]any, c schema.Common) { - if cfg.names == nil { - return - } - typeName, _ := as["type"].(string) +// ecsAvroAssignFullname computes the Avro fullname of a named-type +// declaration ([record, enum, fixed]) from its declaration map and the +// enclosing namespace, alongside the short name and the namespace that +// should be threaded into the child walk. Returns empty fullname when the +// node is not a named-type declaration or lacks a `name` field. +// +// Per the Avro spec (`Names` section): +// 1. If `name` contains a dot, it IS the fullname and any `namespace` +// field is ignored. +// 2. Else if `namespace` is set, the fullname is `namespace.name`. +// 3. Else the fullname inherits the enclosing namespace. +func ecsAvroAssignFullname(enclosing, typeName string, as map[string]any) (fullname, shortName, childNamespace string) { switch typeName { case "record", "enum", "fixed": default: - return + return "", "", enclosing } name, _ := as["name"].(string) if name == "" { - return + return "", "", enclosing + } + if idx := strings.LastIndex(name, "."); idx >= 0 { + return name, name[idx+1:], name[:idx] } - cfg.names[name] = c if ns, _ := as["namespace"].(string); ns != "" { - cfg.names[ns+"."+name] = c + return ns + "." + name, name, ns + } + if enclosing != "" { + return enclosing + "." + name, name, enclosing + } + return name, name, "" +} + +// ecsAvroPlaceholder returns the structural stub that stands in for a +// self-referencing named type while its definition is being walked. The +// placeholder uses the short name (matching what ecsAvroFromAnyMapImpl +// would set) and the closest leaf type so downstream sinks see a coherent +// shape rather than schema.Any. +func ecsAvroPlaceholder(typeName, shortName string) schema.Common { + switch typeName { + case "record": + return schema.Common{Name: shortName, Type: schema.Object} + case "enum": + return schema.Common{Name: shortName, Type: schema.String} + case "fixed": + return schema.Common{Name: shortName, Type: schema.ByteArray} } + return schema.Common{Name: shortName} } func ecsAvroFromAnyMapImpl(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) { @@ -637,7 +739,7 @@ func ecsAvroFromAnyMapImpl(cfg ecsAvroConfig, as map[string]any) (schema.Common, case string: // String form may be a primitive type name OR a name reference to // a previously-defined record/enum/fixed in lexical scope. - if resolved, ok := cfg.names[t]; ok { + if resolved, ok := ecsAvroLookupName(cfg, t); ok { fieldName := c.Name c = resolved if fieldName != "" { diff --git a/internal/impl/confluent/ecs_avro_test.go b/internal/impl/confluent/ecs_avro_test.go index 0ee273734b..9cee176338 100644 --- a/internal/impl/confluent/ecs_avro_test.go +++ b/internal/impl/confluent/ecs_avro_test.go @@ -516,6 +516,212 @@ func TestEcsAvroLameUnionNameResolution(t *testing.T) { assert.Equal(t, schema.Int64, feeInner.Children[0].Type) } +// TestEcsAvroUnionInlineErrorPropagation pins down the %w-wrapping +// contract through the union resolvers. A malformed inline decimal sitting +// inside a nullable union must surface its root-cause error (the precision +// parse failure), not collapse to a generic "could not resolve type +// map[string]interface{}". Covers both the optional-union fast path and the +// general union fall-through, and both raw- and lame-union flavours. +func TestEcsAvroUnionInlineErrorPropagation(t *testing.T) { + tests := []struct { + name string + cfg ecsAvroConfig + spec string + wantMsg string + }{ + { + name: "raw union, nullable shape, malformed decimal precision", + cfg: ecsAvroConfig{rawUnion: true}, + spec: `{ + "type": "record", + "name": "Tx", + "fields": [{ + "name": "amount", + "type": ["null", {"type": "bytes", "logicalType": "decimal", "precision": "not-a-number", "scale": 2}] + }] + }`, + wantMsg: "decimal precision", + }, + { + name: "raw union, three-branch shape, malformed decimal scale", + cfg: ecsAvroConfig{rawUnion: true}, + spec: `{ + "type": "record", + "name": "Tx", + "fields": [{ + "name": "amount", + "type": ["null", "string", {"type": "bytes", "logicalType": "decimal", "precision": 9, "scale": "nope"}] + }] + }`, + wantMsg: "decimal scale", + }, + { + name: "lame union, nullable shape, malformed decimal precision", + cfg: ecsAvroConfig{}, + spec: `{ + "type": "record", + "name": "Tx", + "fields": [{ + "name": "amount", + "type": ["null", {"type": "bytes", "logicalType": "decimal", "precision": "not-a-number", "scale": 2}] + }] + }`, + wantMsg: "decimal precision", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := ecsAvroParseFromBytes(tt.cfg, []byte(tt.spec)) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantMsg, "inner error context must propagate via %%w wrapping, got %q", err.Error()) + assert.NotContains(t, err.Error(), "could not resolve type", "must not collapse to the type-stringifier fallback when an inner error exists") + }) + } +} + +// TestEcsAvroNamespaceInheritance verifies that named types defined inside +// a namespaced record inherit the enclosing namespace when they omit their +// own `namespace` field, per the Avro spec's name-resolution rules. Both +// short-name and fully-qualified references to the inherited fullname must +// resolve, including across deeply nested scopes. +func TestEcsAvroNamespaceInheritance(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "namespace": "com.example", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [{"name": "amount", "type": "long"}] + } + }, + {"name": "by_short_name", "type": ["null", "Fee"]}, + {"name": "by_inherited_full_name", "type": ["null", "com.example.Fee"]} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 3) + + short := c.Children[1] + assert.Equal(t, "by_short_name", short.Name) + assert.Equal(t, schema.Object, short.Type, "unqualified Fee should resolve via inherited com.example namespace") + require.Len(t, short.Children, 1) + assert.Equal(t, "amount", short.Children[0].Name) + + full := c.Children[2] + assert.Equal(t, "by_inherited_full_name", full.Name) + assert.Equal(t, schema.Object, full.Type, "Fee implicitly belongs to com.example and must be reachable by FQN") + require.Len(t, full.Children, 1) +} + +// TestEcsAvroNameWithEmbeddedDot exercises the third form of Avro's named- +// type spelling: the `name` field itself contains a dot, in which case the +// dot-bearing value IS the fullname and any sibling `namespace` field is +// ignored. References must resolve under the embedded fullname. +func TestEcsAvroNameWithEmbeddedDot(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "namespace": "ignored.outer", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "com.example.Fee", + "namespace": "this.is.ignored", + "fields": [{"name": "amount", "type": "long"}] + } + }, + {"name": "by_fqn", "type": ["null", "com.example.Fee"]} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 2) + + fqn := c.Children[1] + assert.Equal(t, "by_fqn", fqn.Name) + assert.Equal(t, schema.Object, fqn.Type) + require.Len(t, fqn.Children, 1) +} + +// TestEcsAvroSelfReferentialRecord verifies that a record whose own field +// references it by name (e.g. a linked-list `next` pointer) resolves to a +// structural stub — a one-level Object carrying the record's short name — +// rather than collapsing to schema.Any (VARCHAR downstream). +// +// True recursive resolution is out of scope: schema.Common is a value type +// with no back-reference machinery, so the self-reference is necessarily a +// stub. The fix exists so downstream sinks see "this is some kind of +// record" rather than an opaque blob. +func TestEcsAvroSelfReferentialRecord(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Node", + "fields": [ + {"name": "value", "type": "long"}, + {"name": "next", "type": ["null", "Node"]} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Equal(t, schema.Object, c.Type) + require.Len(t, c.Children, 2) + assert.Equal(t, "value", c.Children[0].Name) + assert.Equal(t, schema.Int64, c.Children[0].Type) + + next := c.Children[1] + assert.Equal(t, "next", next.Name) + assert.Equal(t, schema.Object, next.Type, "self-reference should resolve to Object stub, not schema.Any") + assert.True(t, next.Optional) + assert.Empty(t, next.Children, "self-reference is a one-level stub — recursive resolution is out of scope") +} + +// TestEcsAvroNamesMapIsolation verifies the clone-on-retrieval contract for +// the names map: mutating a resolved Common (e.g. appending to its Children) +// must not propagate to subsequent lookups of the same named type. Without +// the clone, the second `secondary_fee` resolution would see the mutated +// shape of the first. +func TestEcsAvroNamesMapIsolation(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Transfer", + "fields": [ + { + "name": "primary_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [{"name": "amount", "type": "long"}] + } + }, + {"name": "first_ref", "type": "Fee"}, + {"name": "second_ref", "type": "Fee"} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 3) + + first := c.Children[1] + require.Equal(t, schema.Object, first.Type) + require.Len(t, first.Children, 1) + // Mutate the first resolved copy. If the names map were aliased, the + // next retrieval would see a record with two children. + first.Children = append(first.Children, schema.Common{Name: "poison", Type: schema.String}) + assert.Len(t, first.Children, 2) + + second := c.Children[2] + require.Equal(t, schema.Object, second.Type) + assert.Len(t, second.Children, 1, "later retrieval of Fee must not see mutations applied to earlier resolutions") +} + // TestEcsAvroRecordWithNilFields is a regression test for schemas where a // field's type is a record object without a "fields" key (e.g. back-reference // form {"type":"record","name":"Party"} or "fields":null from some generators). From 03a57a76120ac7251d5e5e6ee027cd584463f9f5 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Mon, 18 May 2026 13:40:44 +0100 Subject: [PATCH 4/7] parquet: floor pre-epoch DATE strings instead of truncating MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit coerceDateForEncode's string path used Go's integer division, which truncates toward zero. For a pre-epoch RFC3339 input with a non-midnight component (e.g. "1969-12-31T23:59:59Z", Unix = -1) this rounded up to day 0 (1970-01-01) instead of the correct day -1 (1969-12-31). The time.Time arm already had the floor-toward-negative-infinity correction; the string arm did not. Extract the rounding logic into a shared unixDaysFloor helper and call it from both arms so the time.Time and string entry points produce bit-identical days-since-epoch for the same instant. The bare-date YYYY-MM-DD form parses to midnight UTC so its Unix is a multiple of 86400 — truncate and floor agree there — but it goes through the helper too for consistency. The regression test covers time.Time and RFC3339 inputs across pre-epoch non-midnight, pre-epoch midnight, epoch, post-epoch non-midnight, and bare-date shapes, asserting all paths agree on the floored day. --- internal/impl/parquet/schema_coercion.go | 23 ++++++++---- .../parquet/schema_coercion_coverage_test.go | 35 +++++++++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/internal/impl/parquet/schema_coercion.go b/internal/impl/parquet/schema_coercion.go index 90d035d6a5..0b511088aa 100644 --- a/internal/impl/parquet/schema_coercion.go +++ b/internal/impl/parquet/schema_coercion.go @@ -172,12 +172,7 @@ func coerceTimestampForEncode(value any, ts *format.TimestampType) (any, error) func coerceDateForEncode(value any) (any, error) { switch v := value.(type) { case time.Time: - secs := v.UTC().Unix() - days := secs / 86400 - if secs < 0 && secs%86400 != 0 { - days-- - } - return int32(days), nil + return int32(unixDaysFloor(v)), nil case string: t, errRFC := time.Parse(time.RFC3339, v) if errRFC != nil { @@ -191,7 +186,7 @@ func coerceDateForEncode(value any) (any, error) { } t = t2 } - return int32(t.UTC().Unix() / 86400), nil + return int32(unixDaysFloor(t)), nil case int32: return v, nil case int64: @@ -210,6 +205,20 @@ func coerceDateForEncode(value any) (any, error) { } } +// unixDaysFloor returns days-since-epoch for t with floor-toward-negative- +// infinity rounding. Go's integer division truncates toward zero, which +// would map a pre-epoch wall-clock like 1969-12-31T23:59:59Z (Unix = -1) to +// day 0 (1970-01-01) instead of the correct day -1 (1969-12-31). Times at +// or after the epoch are unaffected. +func unixDaysFloor(t time.Time) int64 { + secs := t.UTC().Unix() + days := secs / 86400 + if secs < 0 && secs%86400 != 0 { + days-- + } + return days +} + // coerceTimeForEncode converts a value into the parquet physical // representation for a TIME column (int32 millis for millis unit, int64 // otherwise). Accepts time.Duration, time.Time (wall-clock portion), diff --git a/internal/impl/parquet/schema_coercion_coverage_test.go b/internal/impl/parquet/schema_coercion_coverage_test.go index 37acbfb6c4..de2b8f836b 100644 --- a/internal/impl/parquet/schema_coercion_coverage_test.go +++ b/internal/impl/parquet/schema_coercion_coverage_test.go @@ -202,3 +202,38 @@ func TestCoerceDateForEncode_StringErrorSurfacesBothAttempts(t *testing.T) { assert.Contains(t, msg, "RFC3339", "error should mention the RFC3339 attempt") assert.Contains(t, msg, "YYYY-MM-DD", "error should mention the bare-date attempt") } + +// TestCoerceDateForEncode_FloorsTowardNegativeInfinity pins down the +// pre-epoch rounding contract: a time.Time and an RFC3339 string at the +// same instant must produce the same days-since-epoch, and a pre-epoch +// wall clock with a non-midnight component must round down (1969-12-31) +// rather than truncate toward zero (1970-01-01). +// +// Go's integer division truncates, which silently mapped pre-epoch +// RFC3339 strings to the wrong day before the floor adjustment landed. +// The bare-date YYYY-MM-DD form happens to land on midnight UTC, so its +// Unix() is a multiple of 86400 and truncate/floor agree — but it is +// included here to confirm the shared helper hasn't perturbed it. +func TestCoerceDateForEncode_FloorsTowardNegativeInfinity(t *testing.T) { + tests := []struct { + name string + value any + wantDays int32 + }{ + {"time.Time pre-epoch non-midnight", time.Date(1969, 12, 31, 23, 59, 59, 0, time.UTC), -1}, + {"RFC3339 pre-epoch non-midnight", "1969-12-31T23:59:59Z", -1}, + {"RFC3339 pre-epoch midnight", "1969-12-31T00:00:00Z", -1}, + {"bare date pre-epoch", "1969-12-31", -1}, + {"time.Time epoch", time.Unix(0, 0).UTC(), 0}, + {"RFC3339 epoch", "1970-01-01T00:00:00Z", 0}, + {"RFC3339 post-epoch non-midnight", "1970-01-01T12:34:56Z", 0}, + {"bare date post-epoch", "2024-03-14", 19796}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := coerceDateForEncode(tt.value) + require.NoError(t, err) + assert.Equal(t, tt.wantDays, got, "days-since-epoch must be floor-rounded, not truncated") + }) + } +} From 148052091b94806757bd8998e963c52026d78579 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Tue, 26 May 2026 09:35:03 +0100 Subject: [PATCH 5/7] confluent: regression test for annotated inline-primitive nullable unions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kafka Connect / Debezium emit nullable string fields as `[{"type":"string","connect.default":""}, "null"]` — an inline-object non-null branch with extension annotations alongside the primitive type name, rather than the bare `"null","string"` two-string spelling. A field bug report claimed this shape failed to collapse to a nullable STRING under `raw_unions: true`; on the current build it does collapse correctly (the unknown annotations are ignored per Avro spec). Pinning the behaviour with a regression test so any future change to the optional-union resolver or the type-ref dispatcher trips on it. Covers both branch orderings ([inline, null] and [null, inline]) — the former was the shape from the bug report, the latter is the canonical Avro JSON spelling. --- internal/impl/confluent/ecs_avro_test.go | 62 ++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/internal/impl/confluent/ecs_avro_test.go b/internal/impl/confluent/ecs_avro_test.go index 9cee176338..d337a29aca 100644 --- a/internal/impl/confluent/ecs_avro_test.go +++ b/internal/impl/confluent/ecs_avro_test.go @@ -516,6 +516,68 @@ func TestEcsAvroLameUnionNameResolution(t *testing.T) { assert.Equal(t, schema.Int64, feeInner.Children[0].Type) } +// TestEcsAvroRawUnionAnnotatedInlinePrimitive verifies that the +// optional-union collapse handles the [{primitive-with-annotations}, null] +// shape — the form Kafka Connect / Debezium emit for nullable string fields, +// where the non-null branch is an inline object carrying `connect.default` +// (or similar extension properties) alongside `type: "string"`. The +// resolver should ignore the unknown annotations (per Avro spec) and +// collapse the union to Optional STRING the same way it collapses +// `["null", "string"]`. +// +// Both branch orderings are covered: null-second is the shape the bug +// report came in with, null-first is the canonical Avro spelling. +func TestEcsAvroRawUnionAnnotatedInlinePrimitive(t *testing.T) { + tests := []struct { + name string + spec string + }{ + { + name: "null second", + spec: `{ + "type": "record", + "name": "Rec", + "fields": [{ + "name": "category", + "type": [ + {"type": "string", "connect.default": ""}, + "null" + ], + "default": "" + }] + }`, + }, + { + name: "null first", + spec: `{ + "type": "record", + "name": "Rec", + "fields": [{ + "name": "category", + "type": [ + "null", + {"type": "string", "connect.default": ""} + ], + "default": null + }] + }`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, []byte(tt.spec)) + require.NoError(t, err) + require.Equal(t, schema.Object, c.Type) + require.Len(t, c.Children, 1) + category := c.Children[0] + assert.Equal(t, "category", category.Name, "outer field name must be preserved across the union collapse") + assert.Equal(t, schema.String, category.Type, "annotated inline primitive should collapse to STRING, not stay a Union") + assert.True(t, category.Optional, "the null branch must drive Optional=true") + assert.Empty(t, category.Children, "collapsed primitive has no children") + }) + } +} + // TestEcsAvroUnionInlineErrorPropagation pins down the %w-wrapping // contract through the union resolvers. A malformed inline decimal sitting // inside a nullable union must surface its root-cause error (the precision From 042d53e75a2f646c162558f57920bbe00d504898 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 28 May 2026 13:37:16 +0100 Subject: [PATCH 6/7] iceberg: address PR #4427 follow-up review Three review comments on PR #4427 that landed after merge: - shredder: reject the schema/column mismatch where a Timestamp common reaches an Int32 column. coerceTemporalToNumeric returns int64 UnixMilli/Micros/Nanos (~10^12), which the Int32Type arm cast to int32 with no bounds check, silently truncating into a garbage year. The arm is intended for Date / TimeOfDay coercions whose values do fit; bound-check post-coerce and fail loudly when they don't, with a message pointing the operator at BIGINT or a schema-metadata fix. - output_iceberg: drop the redundant conf.Contains guard around require_schema_metadata parsing. The field declares Default(false) in the spec so FieldBool returns false-without-error on absence and the inline form fits cleanly. - shredder: expose StrictTemporalMode as an exported struct field instead of the SetStrictTemporalMode setter. It's a single bool with no validation hook, so the setter added ceremony without value. Updates the sole writer.go caller to assign the field directly. New regression test TestCoerceTemporalInt32OverflowGuard pins the Int32 overflow rejection and confirms in-range Date / TimeOfDay coercions still succeed. --- internal/impl/iceberg/output_iceberg.go | 17 ++++--- internal/impl/iceberg/shredder/shredder.go | 47 +++++++++---------- .../impl/iceberg/shredder/temporal_test.go | 44 +++++++++++++++++ internal/impl/iceberg/writer.go | 6 +-- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/internal/impl/iceberg/output_iceberg.go b/internal/impl/iceberg/output_iceberg.go index 991e947f9b..07ae681df2 100644 --- a/internal/impl/iceberg/output_iceberg.go +++ b/internal/impl/iceberg/output_iceberg.go @@ -484,15 +484,14 @@ func parseSchemaEvolutionConfig(conf *service.ParsedConfig) (SchemaEvolutionConf } } - // Parse require_schema_metadata - if conf.Contains(ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata) { - cfg.RequireSchemaMetadata, err = conf.FieldBool(ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata) - if err != nil { - return cfg, err - } - if cfg.RequireSchemaMetadata && cfg.SchemaMetadata == "" { - return cfg, fmt.Errorf("%s.%s requires %s.%s to be set", ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata, ioFieldSchemaEvolution, ioFieldSchemaEvolutionSchemaMetadata) - } + // Parse require_schema_metadata. The field carries Default(false) in the + // spec so FieldBool returns false-without-error when absent, no Contains + // guard required. + if cfg.RequireSchemaMetadata, err = conf.FieldBool(ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata); err != nil { + return cfg, err + } + if cfg.RequireSchemaMetadata && cfg.SchemaMetadata == "" { + return cfg, fmt.Errorf("%s.%s requires %s.%s to be set", ioFieldSchemaEvolution, ioFieldSchemaEvolutionRequireSchemaMetadata, ioFieldSchemaEvolution, ioFieldSchemaEvolutionSchemaMetadata) } return cfg, nil diff --git a/internal/impl/iceberg/shredder/shredder.go b/internal/impl/iceberg/shredder/shredder.go index d91e7e6f5a..4a0a30fb02 100644 --- a/internal/impl/iceberg/shredder/shredder.go +++ b/internal/impl/iceberg/shredder/shredder.go @@ -87,13 +87,20 @@ type RecordShredder struct { // typed columns instead of guessing. nil entries fall back to the // pre-schema-metadata behavior — see [convertLeafValue]. fieldCommons map[int]*schema.Common - // strictTemporal causes [convertLeafValue] to refuse numeric inputs - // into time-typed columns when no schema metadata has been - // registered for that column. When false (the default), the value - // converter falls back to [bloblang.ValueAsTimestamp]'s seconds - // default — convenient but silently wrong if the upstream produced - // a different unit. When true, the writer fails the batch loudly. - strictTemporal bool + // StrictTemporalMode causes [convertLeafValue] to refuse numeric + // inputs into time-typed columns (TIMESTAMP / TIMESTAMPTZ / DATE / + // TIME) when no schema metadata has been registered for that + // column. When false (the default), the value converter falls back + // to [bloblang.ValueAsTimestamp]'s seconds default — convenient but + // silently wrong if the upstream produced a different unit. When + // true, the writer fails the batch loudly. + // + // No effect on time.Time / time.Duration values, which carry their + // own unit unambiguously, and no effect on non-time columns. Set + // directly on the shredder after construction; operators that + // cannot guarantee schema metadata flows end-to-end flip this on to + // fail loudly instead of silently corrupting dates by ~50,000 years. + StrictTemporalMode bool } // NewRecordShredder creates a new shredder for the given schema. @@ -107,22 +114,6 @@ func NewRecordShredder(schema *iceberg.Schema, caseSensitive bool) *RecordShredd } } -// SetStrictTemporalMode toggles whether numeric inputs into time-typed -// columns require registered schema metadata. With strict mode on, a bare -// int64 / float64 value reaching a TIMESTAMP / TIMESTAMPTZ / DATE / TIME -// column with no [schema.Common] in the field map is rejected with a -// per-field error rather than guessed-as-Unix-seconds. -// -// Strict mode has no effect on time.Time / time.Duration values, which -// carry their own unit unambiguously, and no effect on non-time columns. -// -// Defaults to off (back-compat). Operators that cannot guarantee schema -// metadata flows end-to-end can flip this on to fail loudly instead of -// silently corrupting dates by ~50,000 years. -func (rs *RecordShredder) SetStrictTemporalMode(on bool) { - rs.strictTemporal = on -} - // SetFieldSchemaMetadata supplies a field-ID → schema.Common map that the // leaf value converter consults when it sees a numeric input destined for a // time-typed Iceberg column (TIMESTAMP, TIMESTAMPTZ, TIMESTAMP_NS, @@ -264,7 +255,7 @@ func (rs *RecordShredder) shredValue( default: // Leaf/primitive type. - pqVal, err := convertLeafValue(value, typ, rs.commonForField(fieldID), rs.strictTemporal) + pqVal, err := convertLeafValue(value, typ, rs.commonForField(fieldID), rs.StrictTemporalMode) if err != nil { return err } @@ -478,6 +469,14 @@ func convertLeafValue(value any, typ iceberg.Type, common *schema.Common, strict if strictTemporal { return parquet.NullValue(), fmt.Errorf("int column received %T while schema metadata declares type %v; require_schema_metadata=true demands the existing column type match the schema metadata — recreate the table to migrate", value, common.Type) } + // A Timestamp common's UnixMilli/Micros/Nanos far exceeds the + // int32 range, but the Int32Type arm is intended for Date / + // TimeOfDay coercions whose values do fit. Reject the + // schema/column mismatch loudly rather than silently + // truncating into a garbage year. + if n > math.MaxInt32 || n < math.MinInt32 { + return parquet.NullValue(), fmt.Errorf("int column received %T with schema metadata type %v; coerced value %d overflows int32 — the column should be BIGINT or the schema metadata is wrong", value, common.Type, n) + } return parquet.Int32Value(int32(n)), nil } i, err := bloblang.ValueAsInt64(value) diff --git a/internal/impl/iceberg/shredder/temporal_test.go b/internal/impl/iceberg/shredder/temporal_test.go index f5ddc1117c..cb4aff52ac 100644 --- a/internal/impl/iceberg/shredder/temporal_test.go +++ b/internal/impl/iceberg/shredder/temporal_test.go @@ -498,6 +498,50 @@ func TestCoerceTemporalIntoNumericColumn(t *testing.T) { }) } +// TestCoerceTemporalInt32OverflowGuard covers the schema/column mismatch +// where a TIMESTAMP schema common reaches an Int32 column — coerceTemporal- +// ToNumeric returns a UnixMilli value (~10^12) that vastly exceeds int32 +// range. The Int32 arm is intended for Date / TimeOfDay coercions whose +// values fit in int32; a Timestamp value silently truncating into a garbage +// year is the failure mode this guard exists to prevent. +// +// The complementary Int64 arm has no bounds problem (Timestamp values fit +// comfortably in int64) and is verified separately by +// TestCoerceTemporalIntoNumericColumn. +func TestCoerceTemporalInt32OverflowGuard(t *testing.T) { + const tsMillis = int64(1_700_000_000_000) // 2023-11-14, ~10^12, well beyond int32 + + t.Run("Timestamp(Millis) into Int32 errors with overflow message", func(t *testing.T) { + v := time.UnixMilli(tsMillis).UTC() + _, err := convertLeafValue(v, iceberg.Int32Type{}, tsCommon(schema.TimeUnitMillis, true), false) + require.Error(t, err) + assert.Contains(t, err.Error(), "overflows int32", "must reject the schema/column mismatch loudly, not truncate to a garbage year") + }) + + t.Run("Timestamp(Micros) into Int32 errors with overflow message", func(t *testing.T) { + v := time.UnixMilli(tsMillis).UTC() + _, err := convertLeafValue(v, iceberg.Int32Type{}, tsCommon(schema.TimeUnitMicros, true), false) + require.Error(t, err) + assert.Contains(t, err.Error(), "overflows int32") + }) + + // Sanity check: the in-range coercions the Int32 arm was designed for + // still succeed. + t.Run("Date into Int32 still succeeds", func(t *testing.T) { + v := time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC) + pq, err := convertLeafValue(v, iceberg.Int32Type{}, &schema.Common{Type: schema.Date}, false) + require.NoError(t, err) + assert.Equal(t, int32(19737), pq.Int32()) + }) + + t.Run("TimeOfDay into Int32 still succeeds", func(t *testing.T) { + d := 8*time.Hour + 30*time.Minute + pq, err := convertLeafValue(d, iceberg.Int32Type{}, todCommon(schema.TimeUnitMillis), false) + require.NoError(t, err) + assert.Equal(t, int32(8*3600+30*60)*1000, pq.Int32()) + }) +} + // TestCoerceTemporalRejectedInStrictMode confirms that the temporal->numeric // coerce path is disabled when require_schema_metadata=true. In strict mode // a type disagreement between the existing column and the schema metadata diff --git a/internal/impl/iceberg/writer.go b/internal/impl/iceberg/writer.go index 4ba895f8aa..72088f6215 100644 --- a/internal/impl/iceberg/writer.go +++ b/internal/impl/iceberg/writer.go @@ -55,7 +55,7 @@ type writer struct { // resolver supplies optional per-message schema metadata used by the shredder // to interpret numeric inputs into time-typed columns; pass nil to disable. // requireSchemaMetadata enables shredder strict mode — see -// [shredder.RecordShredder.SetStrictTemporalMode]. +// [shredder.RecordShredder.StrictTemporalMode]. func NewWriter(tbl *table.Table, comm *committer, caseSensitive bool, writerOpts []parquet.WriterOption, resolver *typeResolver, requireSchemaMetadata bool, logger *service.Logger) *writer { return &writer{ table: tbl, @@ -222,9 +222,7 @@ func (w *writer) messagesToParquet(batch service.MessageBatch) ([]partitionFile, // silently for messages 1..N; in that case the writer must be // extended to per-message metadata lookup with a small cache. rs := shredder.NewRecordShredder(schema, w.caseSensitive) - if w.requireSchemaMetadata { - rs.SetStrictTemporalMode(true) - } + rs.StrictTemporalMode = w.requireSchemaMetadata if w.resolver != nil && len(batch) > 0 { if common, err := w.resolver.parseSchemaMetadata(batch[0]); err != nil { w.logger.Warnf("parsing schema metadata for shredder: %v (falling back to schema-agnostic conversion)", err) From 97c74e4c902f8f0ef07dca985608596ca60b6997 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 28 May 2026 20:14:20 +0100 Subject: [PATCH 7/7] confluent: arbitrate Avro short-name collisions across namespaces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ecsAvroFromAnyMap registered each record/enum/fixed under both its fullname and an unqualified short-name shortcut, with the second declaration silently overwriting the first when two types in different namespaces shared a short name. An unqualified reference that missed the enclosing-namespace prefix would then bind to whichever fullname registered last — silent column corruption when downstream sinks key off the resolved structure. Route every registration through a new putName helper backed by a nameOwners map that tracks which fullname currently owns each key. The arbitration rules: - A canonical fullname binding (key == owner) always wins. Two different fullnames can never share a fullname key. - A short-name claim that collides with an existing canonical fullname for the same key is dropped (the fullname keeps the slot, regardless of registration order). - Two short-name claims from different fullnames mark the key as ambiguous: nameOwners becomes "" and the names entry is deleted, so the bare-name lookup falls through to schema.Any rather than guessing. Unqualified references from inside the correct namespace still resolve through the enclosing-prefix lookup (cfg.namespace + "." + ref) unchanged — the ambiguity guard only affects the bare-name shortcut. Two new tests pin the contract: - TestEcsAvroSharedShortNameAcrossNamespaces: com.a.Fee + com.b.Fee + bare "Fee" reference → bare reference resolves to schema.Any (loud ambiguity), fully-qualified references resolve to their respective types. - TestEcsAvroRootShortNameWinsOverNamespacedCollision: root-scope Fee + com.a.Fee, registered in both orders → bare "Fee" always resolves to the canonical root-scope Fee. --- internal/impl/confluent/ecs_avro.go | 77 +++++++++++-- internal/impl/confluent/ecs_avro_test.go | 135 +++++++++++++++++++++++ 2 files changed, 202 insertions(+), 10 deletions(-) diff --git a/internal/impl/confluent/ecs_avro.go b/internal/impl/confluent/ecs_avro.go index 36470b51a9..4860f36250 100644 --- a/internal/impl/confluent/ecs_avro.go +++ b/internal/impl/confluent/ecs_avro.go @@ -76,12 +76,24 @@ type ecsAvroConfig struct { translateKafkaConnectTypes bool // names is the lexical-scope registry of resolved record/enum/fixed - // definitions, keyed by Avro fullname (and short name for - // convenience). Stored values must be treated as immutable — every - // retrieval clones via cloneCommon so callers can mutate freely - // without corrupting later look-ups. + // definitions, keyed by Avro fullname plus an unambiguous short-name + // shortcut for each declaration. Stored values must be treated as + // immutable — every retrieval clones via cloneCommon so callers can + // mutate freely without corrupting later look-ups. names map[string]schema.Common + // nameOwners tracks the fullname that owns each key in [names]. A + // fullname-key is its own owner ("Fee" → "Fee" for a root-scope Fee, + // "com.a.Fee" → "com.a.Fee" for a namespaced Fee). A short-name key + // inherits its owning declaration's fullname ("Fee" → "com.a.Fee"). + // When a second declaration tries to claim a short-name key that a + // different fullname already owns the entry is marked ambiguous — + // owner becomes "" and the [names] entry is deleted — so unqualified + // references fall through to schema.Any instead of silently binding + // to whichever fullname registered last. A canonical fullname binding + // (key equals owner) always wins over a colliding short-name claim. + nameOwners map[string]string + // namespace is the enclosing Avro namespace, threaded through the // recursion by value. It is updated when entering a named-type // declaration that introduces a new namespace, per the Avro spec's @@ -98,6 +110,9 @@ func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common, if cfg.names == nil { cfg.names = map[string]schema.Common{} } + if cfg.nameOwners == nil { + cfg.nameOwners = map[string]string{} + } var as any if err := json.Unmarshal(specBytes, &as); err != nil { return schema.Common{}, err @@ -631,9 +646,9 @@ func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, err fullname, shortName, childNamespace := ecsAvroAssignFullname(cfg.namespace, typeName, as) if fullname != "" { placeholder := ecsAvroPlaceholder(typeName, shortName) - cfg.names[fullname] = placeholder - if shortName != "" && shortName != fullname { - cfg.names[shortName] = placeholder + putName(cfg, fullname, fullname, placeholder) + if shortName != fullname { + putName(cfg, shortName, fullname, placeholder) } // Inheritable namespace propagates into the child walk; sibling // scopes are unaffected because cfg is passed by value. @@ -642,14 +657,56 @@ func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, err c, err := ecsAvroFromAnyMapImpl(cfg, as) if err == nil && fullname != "" { - cfg.names[fullname] = c - if shortName != "" && shortName != fullname { - cfg.names[shortName] = c + putName(cfg, fullname, fullname, c) + if shortName != fullname { + putName(cfg, shortName, fullname, c) } } return c, err } +// putName registers a resolved Common under one lookup key in cfg.names, +// arbitrating short-name collisions across namespaces. A fullname-as-key +// call (`key == owner`) is canonical and always wins. Two short-name +// claims from different fullnames mark the key as ambiguous (`nameOwners` +// becomes "" and the [names] entry is deleted), so unqualified references +// fall through to schema.Any instead of silently binding to whichever +// fullname registered last. A short-name claim that collides with an +// existing canonical fullname binding for the same key is dropped — the +// fullname keeps the slot. +func putName(cfg ecsAvroConfig, key, owner string, c schema.Common) { + if key == "" { + return + } + existing, seen := cfg.nameOwners[key] + if !seen { + cfg.nameOwners[key] = owner + cfg.names[key] = c + return + } + if existing == "" { + return // already ambiguous + } + if existing == owner { + cfg.names[key] = c // same owner; placeholder→final replacement + return + } + if key == owner { + // This call is the canonical fullname registration — take the slot + // over from whoever was using it as a short-name shortcut. + cfg.nameOwners[key] = owner + cfg.names[key] = c + return + } + if key == existing { + // Existing owner's fullname matches the key — canonical, leave it. + return + } + // Both claims are short-name shortcuts from different fullnames. + cfg.nameOwners[key] = "" + delete(cfg.names, key) +} + // ecsAvroAssignFullname computes the Avro fullname of a named-type // declaration ([record, enum, fixed]) from its declaration map and the // enclosing namespace, alongside the short name and the namespace that diff --git a/internal/impl/confluent/ecs_avro_test.go b/internal/impl/confluent/ecs_avro_test.go index d337a29aca..70bec72439 100644 --- a/internal/impl/confluent/ecs_avro_test.go +++ b/internal/impl/confluent/ecs_avro_test.go @@ -516,6 +516,141 @@ func TestEcsAvroLameUnionNameResolution(t *testing.T) { assert.Equal(t, schema.Int64, feeInner.Children[0].Type) } +// TestEcsAvroSharedShortNameAcrossNamespaces pins down the collision- +// detection contract: when two records in different namespaces share the +// same short name, the bare-name shortcut in cfg.names must drop out so +// unqualified references fall through to schema.Any instead of silently +// binding to whichever record registered last. +// +// Fully-qualified references continue to resolve through the fullname +// keys, which are unique by construction. References from inside one of +// the two colliding namespaces also resolve through the +// enclosing-namespace prefix, never relying on the bare-name shortcut. +func TestEcsAvroSharedShortNameAcrossNamespaces(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Container", + "fields": [ + { + "name": "from_a", + "type": { + "type": "record", + "name": "com.a.Fee", + "fields": [{"name": "amount_a", "type": "long"}] + } + }, + { + "name": "from_b", + "type": { + "type": "record", + "name": "com.b.Fee", + "fields": [{"name": "amount_b", "type": "string"}] + } + }, + {"name": "by_qualified_a", "type": "com.a.Fee"}, + {"name": "by_qualified_b", "type": "com.b.Fee"}, + {"name": "by_bare_name", "type": "Fee"} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 5) + + byQualA := c.Children[2] + require.Equal(t, schema.Object, byQualA.Type, "fully-qualified com.a.Fee must resolve") + require.Len(t, byQualA.Children, 1) + assert.Equal(t, "amount_a", byQualA.Children[0].Name) + + byQualB := c.Children[3] + require.Equal(t, schema.Object, byQualB.Type, "fully-qualified com.b.Fee must resolve") + require.Len(t, byQualB.Children, 1) + assert.Equal(t, "amount_b", byQualB.Children[0].Name) + + byBare := c.Children[4] + assert.Equal(t, schema.Any, byBare.Type, "ambiguous short name must not silently bind to whichever namespaced type registered last") + assert.Empty(t, byBare.Children) +} + +// TestEcsAvroRootShortNameWinsOverNamespacedCollision verifies that a +// canonical fullname binding (a record whose fullname equals the colliding +// short name) preserves its slot when a namespaced record tries to claim +// the same bare-name shortcut. The fullname binding is unique per type +// and takes priority regardless of registration order. +func TestEcsAvroRootShortNameWinsOverNamespacedCollision(t *testing.T) { + t.Run("root declared first", func(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Container", + "fields": [ + { + "name": "root_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [{"name": "root_amount", "type": "long"}] + } + }, + { + "name": "ns_fee", + "type": { + "type": "record", + "name": "com.a.Fee", + "fields": [{"name": "ns_amount", "type": "string"}] + } + }, + {"name": "by_bare", "type": "Fee"}, + {"name": "by_qualified", "type": "com.a.Fee"} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 4) + byBare := c.Children[2] + require.Equal(t, schema.Object, byBare.Type, "bare Fee must resolve to the canonical root-scope Fee, not the namespaced shortcut") + require.Len(t, byBare.Children, 1) + assert.Equal(t, "root_amount", byBare.Children[0].Name) + byQualified := c.Children[3] + require.Equal(t, schema.Object, byQualified.Type) + assert.Equal(t, "ns_amount", byQualified.Children[0].Name) + }) + + t.Run("namespaced declared first", func(t *testing.T) { + spec := []byte(`{ + "type": "record", + "name": "Container", + "fields": [ + { + "name": "ns_fee", + "type": { + "type": "record", + "name": "com.a.Fee", + "fields": [{"name": "ns_amount", "type": "string"}] + } + }, + { + "name": "root_fee", + "type": { + "type": "record", + "name": "Fee", + "fields": [{"name": "root_amount", "type": "long"}] + } + }, + {"name": "by_bare", "type": "Fee"}, + {"name": "by_qualified", "type": "com.a.Fee"} + ] + }`) + c, err := ecsAvroParseFromBytes(ecsAvroConfig{rawUnion: true}, spec) + require.NoError(t, err) + require.Len(t, c.Children, 4) + byBare := c.Children[2] + require.Equal(t, schema.Object, byBare.Type, "canonical fullname must win regardless of registration order") + require.Len(t, byBare.Children, 1) + assert.Equal(t, "root_amount", byBare.Children[0].Name) + byQualified := c.Children[3] + assert.Equal(t, "ns_amount", byQualified.Children[0].Name) + }) +} + // TestEcsAvroRawUnionAnnotatedInlinePrimitive verifies that the // optional-union collapse handles the [{primitive-with-annotations}, null] // shape — the form Kafka Connect / Debezium emit for nullable string fields,