Skip to content

Commit 51300d1

Browse files
twmbclaude
authored andcommitted
confluent,iceberg: preserve avro logical types end-to-end (#4399)
The schema-registry Avro decoder now honours every Avro logical type the spec defines (previously only `decimal`), and the iceberg connector maps the resulting schema metadata to the right column type and interprets numeric values using the schema's declared unit. Decoder (internal/impl/confluent/ecs_avro.go): - Replace the single decimal-only branch with a dispatcher covering timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos}, date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised logicalType annotations and primitive/logical-type mismatches fall back silently to the base primitive. Encoder (internal/impl/confluent/common_to_avro.go): - Symmetric encode for the new common types. Timestamp without explicit Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp, preserving pre-PR output for legacy schemas. Date and TimeOfDay paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day, AdjustToUTC for time-of-day) with field-naming errors rather than silently downcasting. Iceberg type resolver (internal/impl/iceberg/type_resolver.go): - Map schema.Timestamp through EffectiveTimestamp so legacy schemas keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos) with field-naming errors. Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go): - Plumb a fieldID -> *schema.Common map onto RecordShredder via SetFieldSchemaMetadata. The leaf-value converter looks up the metadata for time-typed columns and uses the declared unit to scale numeric inputs into the column's internal representation. Without metadata, the converter accepts time.Time / time.Duration directly and falls back to bloblang.ValueAsTimestamp's seconds-default for bare numerics — preserving existing behaviour for callers that genuinely store unix-seconds. - This closes the silent-corruption case where a numeric millisecond value declared by the schema as timestamp-millis would land ~50,000 years in the future when the column type flipped from BIGINT to TIMESTAMPTZ. The schema's declared unit is now the source of truth for unit interpretation. Iceberg writer (internal/impl/iceberg/writer.go, router.go): - NewWriter accepts the *typeResolver. The writer parses schema_metadata from the first message of a batch and builds the field-ID lookup the shredder consults. Internal API change only — the only call site is the router. Breaking surface is documented in CHANGELOG.md under Unreleased. Pipeline values flow through unchanged in both preserve_logical_types modes; bloblang behaviour and JSON output bytes are unaffected. The breakage is concentrated in (a) iceberg tables that already exist with BIGINT/INT/STRING columns from this bug, which hit Iceberg's schema-evolution wall, and (b) custom code that pattern-matches the historical schema_metadata shape via meta() lookups. Companion to redpanda-data/benthos#429 which adds the new schema.Common types and parameter blocks. The go.mod replace directive is a development crutch and must flip to a tagged release before merge. Closes #4399 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 99c3dee commit 51300d1

13 files changed

Lines changed: 1358 additions & 75 deletions

internal/impl/confluent/common_to_avro.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,48 @@ func commonToAvroInner(c schema.Common, recordName, namespace string, isRoot boo
7272
case schema.Any:
7373
return "bytes", nil
7474
case schema.Timestamp:
75+
// Honour Logical.Timestamp params if present; legacy nil-Logical
76+
// schemas fall through EffectiveTimestamp() to {Millis, UTC},
77+
// preserving the pre-PR encoder output exactly.
78+
p := c.EffectiveTimestamp()
79+
base := "long"
80+
logicalName, err := avroTimestampLogicalName(p)
81+
if err != nil {
82+
return nil, fmt.Errorf("timestamp field %q: %w", c.Name, err)
83+
}
84+
return map[string]any{
85+
"type": base,
86+
"logicalType": logicalName,
87+
}, nil
88+
case schema.Date:
7589
return map[string]any{
76-
"type": "long",
77-
"logicalType": "timestamp-millis",
90+
"type": "int",
91+
"logicalType": "date",
92+
}, nil
93+
case schema.TimeOfDay:
94+
if c.Logical == nil || c.Logical.TimeOfDay == nil {
95+
return nil, fmt.Errorf("time-of-day field %q missing Logical.TimeOfDay parameters", c.Name)
96+
}
97+
// Avro time-{millis,micros} carry no zone semantics, so a
98+
// TimeOfDay{AdjustToUTC=true} cannot be expressed faithfully.
99+
// Reject loudly rather than silently dropping that bit.
100+
if c.Logical.TimeOfDay.AdjustToUTC {
101+
return nil, fmt.Errorf("time-of-day field %q has AdjustToUTC=true; Avro time-millis/time-micros have no UTC-adjust variant — coerce upstream before encoding", c.Name)
102+
}
103+
// Avro defines only time-millis (int) and time-micros (long); reject
104+
// anything else loudly rather than silently downcasting.
105+
switch c.Logical.TimeOfDay.Unit {
106+
case schema.TimeUnitMillis:
107+
return map[string]any{"type": "int", "logicalType": "time-millis"}, nil
108+
case schema.TimeUnitMicros:
109+
return map[string]any{"type": "long", "logicalType": "time-micros"}, nil
110+
default:
111+
return nil, fmt.Errorf("time-of-day field %q has unit %v which Avro cannot express; only MILLIS and MICROS are supported", c.Name, c.Logical.TimeOfDay.Unit)
112+
}
113+
case schema.UUID:
114+
return map[string]any{
115+
"type": "string",
116+
"logicalType": "uuid",
78117
}, nil
79118
case schema.Decimal:
80119
if c.Logical == nil || c.Logical.Decimal == nil {
@@ -172,6 +211,28 @@ func commonToAvroUnion(c schema.Common) (any, error) {
172211
return variants, nil
173212
}
174213

214+
// avroTimestampLogicalName picks the Avro logical-type name for a
215+
// TimestampParams value. The mapping mirrors the reverse path in
216+
// applyAvroLogicalType: AdjustToUTC=true picks `timestamp-*`, false picks
217+
// `local-timestamp-*`. Avro 1.10+ supports nanos; older brokers may reject it.
218+
func avroTimestampLogicalName(p schema.TimestampParams) (string, error) {
219+
var unit string
220+
switch p.Unit {
221+
case schema.TimeUnitMillis:
222+
unit = "millis"
223+
case schema.TimeUnitMicros:
224+
unit = "micros"
225+
case schema.TimeUnitNanos:
226+
unit = "nanos"
227+
default:
228+
return "", fmt.Errorf("unsupported timestamp unit %v (Avro supports only MILLIS, MICROS, NANOS)", p.Unit)
229+
}
230+
if p.AdjustToUTC {
231+
return "timestamp-" + unit, nil
232+
}
233+
return "local-timestamp-" + unit, nil
234+
}
235+
175236
// sanitizeAvroName derives a valid Avro name from an arbitrary subject string.
176237
// Avro names must match [A-Za-z_][A-Za-z0-9_]*. Invalid characters are replaced
177238
// with underscores and a leading digit is prefixed with an underscore.

internal/impl/confluent/common_to_avro_test.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,186 @@ func TestSanitizeAvroName(t *testing.T) {
219219
})
220220
}
221221
}
222+
223+
// TestCommonToAvroLogicalTypeRoundTrip drives both halves of the Avro
224+
// adapter to confirm encode/decode are symmetric for every logical type
225+
// the connector now preserves. A schema.Common produced by a synthetic
226+
// decoder run, re-encoded to Avro JSON, must yield byte-equivalent output
227+
// to the original Avro spec for the same field.
228+
func TestCommonToAvroLogicalTypeRoundTrip(t *testing.T) {
229+
cases := []struct {
230+
name string
231+
input schema.Common
232+
wantOuter map[string]any
233+
}{
234+
{
235+
name: "Timestamp default (millis, UTC)",
236+
input: schema.Common{Type: schema.Timestamp},
237+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-millis"},
238+
},
239+
{
240+
name: "Timestamp millis explicit",
241+
input: schema.Common{
242+
Type: schema.Timestamp,
243+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMillis, AdjustToUTC: true}},
244+
},
245+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-millis"},
246+
},
247+
{
248+
name: "Timestamp micros UTC",
249+
input: schema.Common{
250+
Type: schema.Timestamp,
251+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMicros, AdjustToUTC: true}},
252+
},
253+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-micros"},
254+
},
255+
{
256+
name: "Timestamp nanos UTC",
257+
input: schema.Common{
258+
Type: schema.Timestamp,
259+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitNanos, AdjustToUTC: true}},
260+
},
261+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-nanos"},
262+
},
263+
{
264+
name: "Timestamp local micros",
265+
input: schema.Common{
266+
Type: schema.Timestamp,
267+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMicros, AdjustToUTC: false}},
268+
},
269+
wantOuter: map[string]any{"type": "long", "logicalType": "local-timestamp-micros"},
270+
},
271+
{
272+
name: "Date",
273+
input: schema.Common{Type: schema.Date},
274+
wantOuter: map[string]any{"type": "int", "logicalType": "date"},
275+
},
276+
{
277+
name: "TimeOfDay millis",
278+
input: schema.Common{
279+
Type: schema.TimeOfDay,
280+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMillis}},
281+
},
282+
wantOuter: map[string]any{"type": "int", "logicalType": "time-millis"},
283+
},
284+
{
285+
name: "TimeOfDay micros",
286+
input: schema.Common{
287+
Type: schema.TimeOfDay,
288+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMicros}},
289+
},
290+
wantOuter: map[string]any{"type": "long", "logicalType": "time-micros"},
291+
},
292+
{
293+
name: "UUID",
294+
input: schema.Common{Type: schema.UUID},
295+
wantOuter: map[string]any{"type": "string", "logicalType": "uuid"},
296+
},
297+
}
298+
299+
for _, tc := range cases {
300+
t.Run(tc.name, func(t *testing.T) {
301+
got := avroUnmarshal(t, tc.input, "", "")
302+
m, ok := got.(map[string]any)
303+
require.True(t, ok, "expected object output, got %T", got)
304+
assert.Equal(t, tc.wantOuter["type"], m["type"])
305+
assert.Equal(t, tc.wantOuter["logicalType"], m["logicalType"])
306+
})
307+
}
308+
}
309+
310+
// TestCommonToAvroTimeOfDayRejectsAdjustToUTC verifies that the Avro
311+
// encoder refuses to emit a TimeOfDay schema with AdjustToUTC=true.
312+
// Avro's time-millis / time-micros logical types carry no UTC-adjust bit,
313+
// so silently dropping it would lose the metadata. Fail loud instead.
314+
func TestCommonToAvroTimeOfDayRejectsAdjustToUTC(t *testing.T) {
315+
c := schema.Common{
316+
Name: "shift_start",
317+
Type: schema.TimeOfDay,
318+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMicros, AdjustToUTC: true}},
319+
}
320+
_, err := commonToAvroSchema(c, "Row", "")
321+
require.Error(t, err)
322+
assert.Contains(t, err.Error(), "AdjustToUTC=true")
323+
assert.Contains(t, err.Error(), "shift_start")
324+
}
325+
326+
// TestCommonToAvroTimeOfDayRejectsUnsupportedUnit verifies that the
327+
// encoder refuses TimeOfDay schemas with units Avro doesn't define
328+
// (Seconds, Nanos), with a field-naming error mentioning the supported
329+
// units. Without this guard, an encoder downcast would silently change
330+
// resolution.
331+
func TestCommonToAvroTimeOfDayRejectsUnsupportedUnit(t *testing.T) {
332+
for _, u := range []schema.TimeUnit{schema.TimeUnitSeconds, schema.TimeUnitNanos} {
333+
t.Run(u.String(), func(t *testing.T) {
334+
c := schema.Common{
335+
Name: "open_at",
336+
Type: schema.TimeOfDay,
337+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: u}},
338+
}
339+
_, err := commonToAvroSchema(c, "Row", "")
340+
require.Error(t, err)
341+
assert.Contains(t, err.Error(), "MILLIS and MICROS")
342+
assert.Contains(t, err.Error(), "open_at")
343+
})
344+
}
345+
}
346+
347+
// TestCommonToAvroTimestampRejectsUnsupportedUnit verifies that the
348+
// avroTimestampLogicalName helper rejects timestamp units Avro doesn't
349+
// define (Seconds, and any other invalid TimeUnit). Avro 1.10+ supports
350+
// millis/micros/nanos.
351+
func TestCommonToAvroTimestampRejectsUnsupportedUnit(t *testing.T) {
352+
c := schema.Common{
353+
Name: "event_time",
354+
Type: schema.Timestamp,
355+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitSeconds, AdjustToUTC: true}},
356+
}
357+
_, err := commonToAvroSchema(c, "Row", "")
358+
require.Error(t, err)
359+
assert.Contains(t, err.Error(), "event_time")
360+
assert.Contains(t, err.Error(), "MILLIS, MICROS, NANOS")
361+
}
362+
363+
// TestCommonToAvroDecodeEncodeRoundTrip ensures that decoding an Avro spec
364+
// via ecsAvroParseFromBytes and re-encoding via commonToAvroSchema yields a
365+
// schema with the same logicalType annotation. This is the symmetry contract
366+
// future format adapters should maintain.
367+
func TestCommonToAvroDecodeEncodeRoundTrip(t *testing.T) {
368+
cases := []struct {
369+
name string
370+
fieldType string
371+
}{
372+
{"timestamp-millis", `{"type":"long","logicalType":"timestamp-millis"}`},
373+
{"timestamp-micros", `{"type":"long","logicalType":"timestamp-micros"}`},
374+
{"local-timestamp-millis", `{"type":"long","logicalType":"local-timestamp-millis"}`},
375+
{"date", `{"type":"int","logicalType":"date"}`},
376+
{"time-millis", `{"type":"int","logicalType":"time-millis"}`},
377+
{"time-micros", `{"type":"long","logicalType":"time-micros"}`},
378+
{"uuid", `{"type":"string","logicalType":"uuid"}`},
379+
}
380+
381+
for _, tc := range cases {
382+
t.Run(tc.name, func(t *testing.T) {
383+
spec := []byte(`{
384+
"type":"record","name":"Row",
385+
"fields":[{"name":"f","type":` + tc.fieldType + `}]
386+
}`)
387+
c, err := ecsAvroParseFromBytes(ecsAvroConfig{}, spec)
388+
require.NoError(t, err)
389+
require.Len(t, c.Children, 1)
390+
391+
// Re-encode the decoded child and verify the logicalType is
392+
// the same as what we started with.
393+
f := c.Children[0]
394+
out, err := commonToAvroSchema(f, "Inner", "")
395+
require.NoError(t, err)
396+
var rt map[string]any
397+
require.NoError(t, json.Unmarshal([]byte(out), &rt))
398+
399+
var want map[string]any
400+
require.NoError(t, json.Unmarshal([]byte(tc.fieldType), &want))
401+
assert.Equal(t, want["logicalType"], rt["logicalType"])
402+
})
403+
}
404+
}

0 commit comments

Comments
 (0)