Skip to content

Commit 80ff730

Browse files
committed
confluent: harden Avro name resolution and union error wrapping
Polish pass on the Avro JSON metadata parser following local review:

- Implement the Avro spec's namespace-inheritance rule. A name with no dot and no `namespace` field inherits the most-tightly-enclosing namespace; the new ecsAvroAssignFullname helper handles all three spelling forms (dot-in-name, explicit namespace, inheritance) and is mirrored by an inheritance-aware ecsAvroLookupName that tries `<enclosing>.<ref>` before the bare name.
- Pre-register a structural-stub placeholder before walking a record's children so a self-reference (linked-list style) resolves to a one-level Object stub instead of collapsing to schema.Any. Mutual recursion across distinct records remains out of scope.
- Deep-copy entries on retrieval from the names map (cloneCommon) so callers can mutate the returned Common without corrupting later look-ups. Removes a latent aliasing footgun.
- Restore the %w-wrapping contract through the union resolvers. ecsAvroResolveTypeRef now returns (Common, error) and ecsAvroResolveOptionalUnion returns (Common, bool, error); both union hydrators wrap the inner cause, so a malformed inline decimal surfaces as "decimal precision: not an integer: ..." rather than the type-stringifier fallback "could not resolve type map[string]interface{}".

Tests cover namespace inheritance with short and inherited-FQN refs, dot-form names overriding the namespace field, self-reference stubs, names-map immutability after caller mutation, and %w propagation across raw / lame union paths and nullable / general-union shapes.
1 parent bbd78f9 commit 80ff730

2 files changed

Lines changed: 367 additions & 59 deletions

File tree

internal/impl/confluent/ecs_avro.go

Lines changed: 161 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,19 @@ type ecsAvroConfig struct {
7575
// for sibling-form Avro by [applyAvroLogicalType].
7676
translateKafkaConnectTypes bool
7777

78-
// names accumulates resolved record/enum/fixed definitions during a
79-
// single parse so that string-form references (e.g. "Fee" in
80-
// ["null", "Fee"]) can be expanded to their full Common shape. The
81-
// map is lazily allocated by ecsAvroParseFromBytes and shared by
82-
// reference through recursive ecsAvroFromAnyMap calls; mutations from
83-
// sub-trees propagate to later siblings as the Avro spec requires
84-
// (named types are lexically scoped from the schema root, and a name
85-
// must be defined before it is referenced).
78+
// names is the lexical-scope registry of resolved record/enum/fixed
79+
// definitions, keyed by Avro fullname (and short name for
80+
// convenience). Stored values must be treated as immutable — every
81+
// retrieval clones via cloneCommon so callers can mutate freely
82+
// without corrupting later look-ups.
8683
names map[string]schema.Common
84+
85+
// namespace is the enclosing Avro namespace, threaded through the
86+
// recursion by value. It is updated when entering a named-type
87+
// declaration that introduces a new namespace, per the Avro spec's
88+
// inheritance rule (a name with no dots and no `namespace` field
89+
// inherits the most tightly enclosing namespace).
90+
namespace string
8791
}
8892

8993
// ecsAvroParseFromBytes parses an Avro JSON spec into a schema.Common. The
@@ -134,12 +138,14 @@ func ecsAvroParseFromBytes(cfg ecsAvroConfig, specBytes []byte) (schema.Common,
134138
// resolved via cfg.names per the Avro lexical-scope rule,
135139
// - an inline type definition object (e.g. {"type":"record",...}).
136140
//
137-
// Returns (Common{}, false) when the union doesn't fit this shape (length
138-
// != 2, no "null" branch, two non-null branches, or an inline object that
139-
// fails to parse).
140-
func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common, bool) {
141+
// The matched bool reports whether the union has the [null, X] / [X, null]
142+
// shape; the error is non-nil only when the shape matched but resolving the
143+
// non-null branch failed (e.g. a malformed inline decimal). Callers must
144+
// surface the error rather than falling through to the general-union path —
145+
// the fall-through would also fail, with a less informative message.
146+
func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (resolved schema.Common, matched bool, err error) {
141147
if len(types) != 2 {
142-
return schema.Common{}, false
148+
return schema.Common{}, false, nil
143149
}
144150
var other any
145151
for _, t := range types {
@@ -148,19 +154,19 @@ func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common,
148154
}
149155
if other != nil {
150156
// Two non-null branches — not a nullable wrapper.
151-
return schema.Common{}, false
157+
return schema.Common{}, false, nil
152158
}
153159
other = t
154160
}
155161
if other == nil {
156-
return schema.Common{}, false
162+
return schema.Common{}, false, nil
157163
}
158-
inner, ok := ecsAvroResolveTypeRef(cfg, other)
159-
if !ok {
160-
return schema.Common{}, false
164+
inner, err := ecsAvroResolveTypeRef(cfg, other)
165+
if err != nil {
166+
return schema.Common{}, true, err
161167
}
162168
inner.Optional = true
163-
return inner, true
169+
return inner, true, nil
164170
}
165171

166172
// ecsAvroResolveTypeRef resolves a single Avro type reference — the value
@@ -169,28 +175,26 @@ func ecsAvroResolveOptionalUnion(cfg ecsAvroConfig, types []any) (schema.Common,
169175
// reference string (resolved via cfg.names), or an inline type definition
170176
// object.
171177
//
172-
// Returns (Common{}, false) only when an inline object fails to parse.
173-
// Unknown string names fall back to schema.Any so that downstream sinks
174-
// see a sensible (if structureless) column rather than a parse error.
175-
func ecsAvroResolveTypeRef(cfg ecsAvroConfig, ref any) (schema.Common, bool) {
178+
// Unknown string names fall back to schema.Any so downstream sinks see a
179+
// sensible (if structureless) column. An error is returned only when an
180+
// inline object fails to parse (the wrapped cause flows back to the caller)
181+
// or when ref is neither a string nor a map (a malformed Avro JSON shape
182+
// the upstream parser couldn't reject).
183+
func ecsAvroResolveTypeRef(cfg ecsAvroConfig, ref any) (schema.Common, error) {
176184
switch b := ref.(type) {
177185
case string:
178186
// Try the names map first so a name reference takes priority over
179187
// the schema.Any fallback in ecsAvroTypeToCommon. Primitive names
180188
// are never registered in the map, so primitives reach the
181189
// fallback unchanged.
182-
if resolved, ok := cfg.names[b]; ok {
183-
return resolved, true
190+
if resolved, ok := ecsAvroLookupName(cfg, b); ok {
191+
return resolved, nil
184192
}
185-
return schema.Common{Type: ecsAvroTypeToCommon(b)}, true
193+
return schema.Common{Type: ecsAvroTypeToCommon(b)}, nil
186194
case map[string]any:
187-
inner, err := ecsAvroFromAnyMap(cfg, b)
188-
if err != nil {
189-
return schema.Common{}, false
190-
}
191-
return inner, true
195+
return ecsAvroFromAnyMap(cfg, b)
192196
}
193-
return schema.Common{}, false
197+
return schema.Common{}, fmt.Errorf("expected type reference to be a string or object, got %T", ref)
194198
}
195199

196200
// applyAvroLogicalType reads the optional "logicalType" annotation from an
@@ -477,6 +481,56 @@ func applyKafkaConnectType(cfg ecsAvroConfig, c *schema.Common, as map[string]an
477481
}
478482
}
479483

484+
// ecsAvroLookupName resolves a string reference to a previously-registered
485+
// named type, applying Avro's name-resolution rules: a reference that
486+
// contains a dot is treated as a fullname; an unqualified reference is
487+
// looked up first against the enclosing namespace, then against the bare
488+
// name as a fallback for root-scope references.
489+
//
490+
// The returned Common is cloned so callers can mutate it freely without
491+
// corrupting the registered entry.
492+
func ecsAvroLookupName(cfg ecsAvroConfig, ref string) (schema.Common, bool) {
493+
if !strings.ContainsRune(ref, '.') && cfg.namespace != "" {
494+
if resolved, ok := cfg.names[cfg.namespace+"."+ref]; ok {
495+
return cloneCommon(resolved), true
496+
}
497+
}
498+
if resolved, ok := cfg.names[ref]; ok {
499+
return cloneCommon(resolved), true
500+
}
501+
return schema.Common{}, false
502+
}
503+
504+
// cloneCommon deep-copies a schema.Common, allocating fresh slice and
505+
// pointer storage for Children and Logical so the result aliases nothing
506+
// with the source.
507+
func cloneCommon(c schema.Common) schema.Common {
508+
if c.Children != nil {
509+
children := make([]schema.Common, len(c.Children))
510+
for i := range c.Children {
511+
children[i] = cloneCommon(c.Children[i])
512+
}
513+
c.Children = children
514+
}
515+
if c.Logical != nil {
516+
l := *c.Logical
517+
if l.Decimal != nil {
518+
d := *l.Decimal
519+
l.Decimal = &d
520+
}
521+
if l.Timestamp != nil {
522+
ts := *l.Timestamp
523+
l.Timestamp = &ts
524+
}
525+
if l.TimeOfDay != nil {
526+
tod := *l.TimeOfDay
527+
l.TimeOfDay = &tod
528+
}
529+
c.Logical = &l
530+
}
531+
return c
532+
}
533+
480534
func ecsAvroTypeToCommon(t string) schema.CommonType {
481535
switch t {
482536
case "record":
@@ -511,7 +565,10 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er
511565
// [null, X] or [X, null] → Optional X. ecsAvroResolveOptionalUnion
512566
// handles primitive names, named-type references, and inline objects
513567
// in either ordering.
514-
if inner, ok := ecsAvroResolveOptionalUnion(cfg, types); ok {
568+
if inner, matched, err := ecsAvroResolveOptionalUnion(cfg, types); matched {
569+
if err != nil {
570+
return fmt.Errorf("union `%v`: %w", c.Name, err)
571+
}
515572
name := c.Name
516573
*c = inner
517574
if name != "" {
@@ -522,9 +579,9 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er
522579

523580
c.Type = schema.Union
524581
for i, uObj := range types {
525-
child, ok := ecsAvroResolveTypeRef(cfg, uObj)
526-
if !ok {
527-
return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj)
582+
child, err := ecsAvroResolveTypeRef(cfg, uObj)
583+
if err != nil {
584+
return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err)
528585
}
529586
c.Children = append(c.Children, child)
530587
}
@@ -534,9 +591,9 @@ func ecsAvroHydrateRawUnion(cfg ecsAvroConfig, c *schema.Common, types []any) er
534591
func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) error {
535592
c.Type = schema.Union
536593
for i, uObj := range types {
537-
childT, ok := ecsAvroResolveTypeRef(cfg, uObj)
538-
if !ok {
539-
return fmt.Errorf("union `%v` child '%v': could not resolve type %T", c.Name, i, uObj)
594+
childT, err := ecsAvroResolveTypeRef(cfg, uObj)
595+
if err != nil {
596+
return fmt.Errorf("union `%v` child '%v': %w", c.Name, i, err)
540597
}
541598
if s, isStr := uObj.(string); isStr {
542599
// Lame-union children keep the type-name as the Common.Name so
@@ -563,39 +620,84 @@ func ecsAvroHydrateLameUnion(cfg ecsAvroConfig, c *schema.Common, types []any) e
563620
}
564621

565622
func ecsAvroFromAnyMap(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) {
623+
// Pre-register a structural placeholder before walking children so a
624+
// self-reference (e.g. a linked-list record with a `next` field of its
625+
// own type) resolves to a one-level stub rather than collapsing to
626+
// schema.Any. The placeholder is overwritten with the fully-resolved
627+
// Common once the walk completes. Mutual recursion across distinct
628+
// records is still not supported — the second record's placeholder
629+
// does not exist while the first is being walked.
630+
typeName, _ := as["type"].(string)
631+
fullname, shortName, childNamespace := ecsAvroAssignFullname(cfg.namespace, typeName, as)
632+
if fullname != "" {
633+
placeholder := ecsAvroPlaceholder(typeName, shortName)
634+
cfg.names[fullname] = placeholder
635+
if shortName != "" && shortName != fullname {
636+
cfg.names[shortName] = placeholder
637+
}
638+
// Inheritable namespace propagates into the child walk; sibling
639+
// scopes are unaffected because cfg is passed by value.
640+
cfg.namespace = childNamespace
641+
}
642+
566643
c, err := ecsAvroFromAnyMapImpl(cfg, as)
567-
if err == nil {
568-
ecsAvroRegisterNamedType(cfg, as, c)
644+
if err == nil && fullname != "" {
645+
cfg.names[fullname] = c
646+
if shortName != "" && shortName != fullname {
647+
cfg.names[shortName] = c
648+
}
569649
}
570650
return c, err
571651
}
572652

573-
// ecsAvroRegisterNamedType records a resolved record/enum/fixed Common in
574-
// cfg.names so that later string-form references (e.g. "Fee" instead of an
575-
// inline record definition) can be expanded. Avro's lexical-scope rule
576-
// requires the name to appear before any reference, so a single forward-only
577-
// pass through the schema is sufficient. Recursive types — where a record's
578-
// own children reference it by name before its definition completes — are
579-
// not supported by this approach; the registered entry must not be mutated
580-
// after registration to avoid surprising aliasing with later look-ups.
581-
func ecsAvroRegisterNamedType(cfg ecsAvroConfig, as map[string]any, c schema.Common) {
582-
if cfg.names == nil {
583-
return
584-
}
585-
typeName, _ := as["type"].(string)
// ecsAvroAssignFullname computes the Avro fullname of a named-type
// declaration (record, enum or fixed) from its declaration map and the
// enclosing namespace. It also returns the short name and the namespace that
// should be threaded into the walk of the declaration's children.
//
// When typeName is not record/enum/fixed, or the `name` field is missing,
// empty, or not a string, it returns an empty fullname and short name and
// passes enclosing through unchanged as childNamespace.
//
// Resolution order, following the Avro spec's `Names` section:
//  1. If `name` contains a dot it is the fullname; the namespace is
//     everything before the last dot and any `namespace` field is ignored.
//  2. Else if a string `namespace` field is present, the fullname is
//     `namespace.name`. An explicitly empty namespace selects the null
//     namespace: the fullname is the bare name and nothing is inherited.
//  3. Else the name inherits the enclosing namespace, if there is one.
func ecsAvroAssignFullname(enclosing, typeName string, as map[string]any) (fullname, shortName, childNamespace string) {
	switch typeName {
	case "record", "enum", "fixed":
	default:
		return "", "", enclosing
	}
	name, _ := as["name"].(string)
	if name == "" {
		return "", "", enclosing
	}
	if idx := strings.LastIndex(name, "."); idx >= 0 {
		return name, name[idx+1:], name[:idx]
	}
	if ns, declared := as["namespace"].(string); declared {
		if ns == "" {
			// Explicit "" means the null namespace; do not fall through to
			// inheritance from the enclosing scope.
			return name, name, ""
		}
		return ns + "." + name, name, ns
	}
	if enclosing != "" {
		return enclosing + "." + name, name, enclosing
	}
	return name, name, ""
}
685+
686+
// ecsAvroPlaceholder returns the structural stub that stands in for a
687+
// self-referencing named type while its definition is being walked. The
688+
// placeholder uses the short name (matching what ecsAvroFromAnyMapImpl
689+
// would set) and the closest leaf type so downstream sinks see a coherent
690+
// shape rather than schema.Any.
691+
func ecsAvroPlaceholder(typeName, shortName string) schema.Common {
692+
switch typeName {
693+
case "record":
694+
return schema.Common{Name: shortName, Type: schema.Object}
695+
case "enum":
696+
return schema.Common{Name: shortName, Type: schema.String}
697+
case "fixed":
698+
return schema.Common{Name: shortName, Type: schema.ByteArray}
598699
}
700+
return schema.Common{Name: shortName}
599701
}
600702

601703
func ecsAvroFromAnyMapImpl(cfg ecsAvroConfig, as map[string]any) (schema.Common, error) {
@@ -637,7 +739,7 @@ func ecsAvroFromAnyMapImpl(cfg ecsAvroConfig, as map[string]any) (schema.Common,
637739
case string:
638740
// String form may be a primitive type name OR a name reference to
639741
// a previously-defined record/enum/fixed in lexical scope.
640-
if resolved, ok := cfg.names[t]; ok {
742+
if resolved, ok := ecsAvroLookupName(cfg, t); ok {
641743
fieldName := c.Name
642744
c = resolved
643745
if fieldName != "" {

0 commit comments

Comments
 (0)