Skip to content

Commit 67ded3c

Browse files
committed
confluent: resolve named-type references in nullable Avro unions
Avro JSON schemas may reference a previously-defined record/enum/fixed by name rather than inlining the full definition — the Java/JDBC idiom for any record reused across more than one field, e.g. {"name": "secondary_fee", "type": ["null", "Fee"]} where "Fee" was defined inline by an earlier field. The metadata parser in ecsAvroParseFromBytes was treating the string branch as an unknown type and falling through to schema.Any, so the resulting common-schema metadata reported the field as VARCHAR rather than the registered record structure. Downstream sinks (notably iceberg) then created a string column where the customer expected a nested struct.

Thread a names map through ecsAvroConfig, register every record/enum/fixed by both its simple name and its fully-qualified namespace.name form, and have the string-form type resolver consult the map before falling back to the primitive-name lookup. Also generalise the optional-union helper to accept either ordering -- [null, X] and [X, null] -- since the Avro spec doesn't constrain branch order.

The lexical-scope assumption -- a name must be defined before it is referenced -- is the Avro spec's, so a single forward-only pass suffices. Self-referential records remain unsupported and would need pre-registration with a placeholder; this is flagged in the registration helper's comment.

Tests cover the shapes CON-468's acceptance criteria call out: nullable inline record (already green via #4380), nullable record by name reference, both branch orderings, fully-qualified vs short-name references, and record-with-nested-record where the inner level is itself a name-reference union.
1 parent eac275e commit 67ded3c

2 files changed

Lines changed: 323 additions & 55 deletions

File tree

internal/impl/confluent/ecs_avro.go

Lines changed: 125 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,26 @@ type ecsAvroConfig struct {
7474
// but the metadata still claims Int64 — the exact mismatch fixed
7575
// for sibling-form Avro by [applyAvroLogicalType].
7676
translateKafkaConnectTypes bool
77+
78+
// names accumulates resolved record/enum/fixed definitions during a
79+
// single parse so that string-form references (e.g. "Fee" in
80+
// ["null", "Fee"]) can be expanded to their full Common shape. The
81+
// map is lazily allocated by ecsAvroParseFromBytes and shared by
82+
// reference through recursive ecsAvroFromAnyMap calls; mutations from
83+
// sub-trees propagate to later siblings as the Avro spec requires
84+
// (named types are lexically scoped from the schema root, and a name
85+
// must be defined before it is referenced).
86+
names map[string]schema.Common
7787
}
7888

7989
// ecsAvroParseFromBytes parses an Avro JSON spec into a schema.Common. The
8090
// schema-registry decoder uses the parsed form directly so it can walk
8191
// decimal field paths during value normalisation; callers that just want
8292
// the metadata copy can call ToAny() on the result.
8393
func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common, error) {
94+
if cfg.names == nil {
95+
cfg.names = map[string]schema.Common{}
96+
}
8497
var as any
8598
if err := json.Unmarshal(specBytes, &as); err != nil {
8699
return schema.Common{}, err
@@ -109,51 +122,75 @@ func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common,
109122
return schema.Common{}, fmt.Errorf("expected either an array or object at root of schema, got %T", as)
110123
}
111124

112-
// If the union is actually just a verbose way of defining an optional field
113-
// then we return the real type and true. E.g. if we see:
125+
// ecsAvroResolveOptionalUnion checks whether a 2-element union is just a
126+
// nullable wrapper (one branch is "null") and returns the resolved non-null
127+
// branch as a Common with Optional=true.
114128
//
115-
// `"type": [ "null", "string" ]`
129+
// Handles both orderings ([null, X] and [X, null]) and resolves the non-null
130+
// branch in three forms:
116131
//
117-
// Then we return string and true.
118-
func ecsAvroIsUnionJustOptional(types []any) (schema.CommonType, bool) {
119-
if len(types) != 2 {
120-
return schema.CommonType(-1), false
121-
}
122-
123-
firstTypeStr, ok := types[0].(string)
124-
if !ok || firstTypeStr != "null" {
125-
return schema.CommonType(-1), false
126-
}
127-
128-
secondTypeStr, ok := types[1].(string)
129-
if !ok {
130-
return schema.CommonType(-1), false
131-
}
132-
133-
return ecsAvroTypeToCommon(secondTypeStr), true
134-
}
135-
136-
// ecsAvroIsUnionJustOptionalObject mirrors ecsAvroIsUnionJustOptional but
137-
// for the [null, {object}] shape — Avro's idiom for a nullable named or
138-
// logically-typed field. Returns the resolved Common (with any
139-
// LogicalParams populated) and true on match.
140-
func ecsAvroIsUnionJustOptionalObject(cfg ecsAvroConfig, types []any) (schema.Common, bool) {
132+
// - a primitive type name string (e.g. "string"),
133+
// - a previously-defined named-type reference string (e.g. "Fee"),
134+
// resolved via cfg.names per the Avro lexical-scope rule,
135+
// - an inline type definition object (e.g. {"type":"record",...}).
136+
//
137+
// Returns (Common{}, false) when the union doesn't fit this shape (length
138+
// != 2, both branches non-null, both branches "null", or a non-null branch
139+
// that ecsAvroResolveTypeRef cannot resolve).
140+
func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common, bool) {
141141
if len(types) != 2 {
142142
return schema.Common{}, false
143143
}
144-
firstStr, ok := types[0].(string)
145-
if !ok || firstStr != "null" {
144+
var other any
145+
for _, t := range types {
146+
if s, ok := t.(string); ok && s == "null" {
147+
continue
148+
}
149+
if other != nil {
150+
// Two non-null branches — not a nullable wrapper.
151+
return schema.Common{}, false
152+
}
153+
other = t
154+
}
155+
if other == nil {
146156
return schema.Common{}, false
147157
}
148-
secondMap, ok := types[1].(map[string]any)
158+
inner, ok := ecsAvroResolveTypeRef(cfg, other)
149159
if !ok {
150160
return schema.Common{}, false
151161
}
152-
c, err := ecsAvroFromAnyMap(cfg, secondMap)
153-
if err != nil {
154-
return schema.Common{}, false
162+
inner.Optional = true
163+
return inner, true
164+
}
165+
166+
// ecsAvroResolveTypeRef resolves a single Avro type reference — the value
167+
// of a "type" field, or one branch of a union — to a Common. The reference
168+
// may be a primitive type name string, a previously-defined named-type
169+
// reference string (resolved via cfg.names), or an inline type definition
170+
// object.
171+
//
172+
// Returns (Common{}, false) when an inline object fails to parse or when
// the reference is neither a string nor an object.
173+
// Unknown string names fall back to schema.Any so that downstream sinks
174+
// see a sensible (if structureless) column rather than a parse error.
175+
func ecsAvroResolveTypeRef(cfg ecsAvroConfig, ref any) (schema.Common, bool) {
176+
switch b := ref.(type) {
177+
case string:
178+
// Try the names map first so a name reference takes priority over
179+
// the schema.Any fallback in ecsAvroTypeToCommon. Primitive names
180+
// are never registered in the map, so primitives reach the
181+
// fallback unchanged.
182+
if resolved, ok := cfg.names[b]; ok {
183+
return resolved, true
184+
}
185+
return schema.Common{Type: ecsAvroTypeToCommon(b)}, true
186+
case map[string]any:
187+
inner, err := ecsAvroFromAnyMap(cfg, b)
188+
if err != nil {
189+
return schema.Common{}, false
190+
}
191+
return inner, true
155192
}
156-
return c, true
193+
return schema.Common{}, false
157194
}
158195

159196
// applyAvroLogicalType reads the optional "logicalType" annotation from an
@@ -471,38 +508,25 @@ func ecsAvroTypeToCommon(t string) schema.CommonType {
471508
}
472509

473510
func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) error {
474-
// [null, primitive-name] → Optional <primitive>.
475-
if t, optional := ecsAvroIsUnionJustOptional(types); optional {
476-
c.Type, c.Optional = t, true
477-
return nil
478-
}
479-
// [null, {object}] → Optional <object>, propagating logical params and
480-
// nested children. This catches the common Avro idiom for nullable
481-
// decimal/timestamp/etc. logical types.
482-
if inner, ok := ecsAvroIsUnionJustOptionalObject(cfg, types); ok {
511+
// [null, X] or [X, null] → Optional X. ecsAvroResolveOptionalUnion
512+
// handles primitive names, named-type references, and inline objects
513+
// in either ordering.
514+
if inner, ok := ecsAvroResolveOptionalUnion(cfg, types); ok {
483515
name := c.Name
484516
*c = inner
485517
if name != "" {
486518
c.Name = name
487519
}
488-
c.Optional = true
489520
return nil
490521
}
491522

492523
c.Type = schema.Union
493524
for i, uObj := range types {
494-
switch ut := uObj.(type) {
495-
case string:
496-
c.Children = append(c.Children, schema.Common{
497-
Type: ecsAvroTypeToCommon(ut),
498-
})
499-
case map[string]any:
500-
tmpC, err := ecsAvroFromAnyMap(cfg, ut)
501-
if err != nil {
502-
return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err)
503-
}
504-
c.Children = append(c.Children, tmpC)
525+
child, ok := ecsAvroResolveTypeRef(cfg, uObj)
526+
if !ok {
527+
return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj)
505528
}
529+
c.Children = append(c.Children, child)
506530
}
507531
return nil
508532
}
@@ -544,6 +568,42 @@ func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) e
544568
}
545569

546570
func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) {
571+
c, err := ecsAvroFromAnyMapImpl(cfg, as)
572+
if err == nil {
573+
ecsAvroRegisterNamedType(cfg, as, c)
574+
}
575+
return c, err
576+
}
577+
578+
// ecsAvroRegisterNamedType records a resolved record/enum/fixed Common in
579+
// cfg.names so that later string-form references (e.g. "Fee" instead of an
580+
// inline record definition) can be expanded. Avro's lexical-scope rule
581+
// requires the name to appear before any reference, so a single forward-only
582+
// pass through the schema is sufficient. Recursive types — where a record's
583+
// own children reference it by name before its definition completes — are
584+
// not supported by this approach; the registered entry must not be mutated
585+
// after registration to avoid surprising aliasing with later look-ups.
586+
func ecsAvroRegisterNamedType(cfg ecsAvroConfig, as map[string]any, c schema.Common) {
587+
if cfg.names == nil {
588+
return
589+
}
590+
typeName, _ := as["type"].(string)
591+
switch typeName {
592+
case "record", "enum", "fixed":
593+
default:
594+
return
595+
}
596+
name, _ := as["name"].(string)
597+
if name == "" {
598+
return
599+
}
600+
cfg.names[name] = c
601+
if ns, _ := as["namespace"].(string); ns != "" {
602+
cfg.names[ns+"."+name] = c
603+
}
604+
}
605+
606+
func ecsAvroFromAnyMapImpl(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) {
547607
var c schema.Common
548608
c.Name, _ = as["name"].(string)
549609

@@ -580,6 +640,16 @@ func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, err
580640
}
581641
return c, nil
582642
case string:
643+
// String form may be a primitive type name OR a name reference to
644+
// a previously-defined record/enum/fixed in lexical scope.
645+
if resolved, ok := cfg.names[t]; ok {
646+
fieldName := c.Name
647+
c = resolved
648+
if fieldName != "" {
649+
c.Name = fieldName
650+
}
651+
return c, nil
652+
}
583653
c.Type = ecsAvroTypeToCommon(t)
584654
case map[string]any:
585655
// The type field is an object (e.g. {"type":"map","values":"long"}).

0 commit comments

Comments
 (0)