Skip to content

Commit 223ee6a

Browse files
twmbclaude
andcommitted
confluent,iceberg: preserve avro logical types end-to-end (#4399)
The schema-registry Avro decoder now honours every Avro logical type the spec defines (previously only `decimal`), and the iceberg connector maps the resulting schema metadata to the right column type and interprets numeric values using the schema's declared unit. Decoder (internal/impl/confluent/ecs_avro.go): - Replace the single decimal-only branch with a dispatcher covering timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos}, date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised logicalType annotations and primitive/logical-type mismatches fall back silently to the base primitive. Encoder (internal/impl/confluent/common_to_avro.go): - Symmetric encode for the new common types. Timestamp without explicit Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp, preserving pre-PR output for legacy schemas. Date and TimeOfDay paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day, AdjustToUTC for time-of-day) with field-naming errors rather than silently downcasting. Iceberg type resolver (internal/impl/iceberg/type_resolver.go): - Map schema.Timestamp through EffectiveTimestamp so legacy schemas keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos) with field-naming errors. Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go): - Plumb a fieldID -> *schema.Common map onto RecordShredder via SetFieldSchemaMetadata. The leaf-value converter looks up the metadata for time-typed columns and uses the declared unit to scale numeric inputs into the column's internal representation. Without metadata, the converter accepts time.Time / time.Duration directly and falls back to bloblang.ValueAsTimestamp's seconds-default for bare numerics — preserving existing behaviour for callers that genuinely store unix-seconds. - This closes the silent-corruption case where a numeric millisecond value declared by the schema as timestamp-millis would land ~50,000 years in the future when the column type flipped from BIGINT to TIMESTAMPTZ. The schema's declared unit is now the source of truth for unit interpretation. Iceberg writer (internal/impl/iceberg/writer.go, router.go): - NewWriter accepts the *typeResolver. The writer parses schema_metadata from the first message of a batch and builds the field-ID lookup the shredder consults. Internal API change only — the only call site is the router. Breaking surface is documented in CHANGELOG.md under Unreleased. Pipeline values flow through unchanged in both preserve_logical_types modes; bloblang behaviour and JSON output bytes are unaffected. The breakage is concentrated in (a) iceberg tables that already exist with BIGINT/INT/STRING columns from this bug, which hit Iceberg's schema-evolution wall, and (b) custom code that pattern-matches the historical schema_metadata shape via meta() lookups. Companion to redpanda-data/benthos#429 which adds the new schema.Common types and parameter blocks. The go.mod replace directive is a development crutch and must flip to a tagged release before merge. Closes #4399 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ad74617 commit 223ee6a

16 files changed

Lines changed: 1307 additions & 78 deletions

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,23 @@ Changelog
33

44
All notable changes to this project will be documented in this file.
55

6+
## Unreleased
7+
8+
### Fixed
9+
10+
- **BREAKING:** schema_registry_decode (avro): Avro logical types — `timestamp-{millis,micros,nanos}`, `local-timestamp-{millis,micros,nanos}`, `date`, `time-{millis,micros}`, and `uuid` — are now preserved end-to-end in the schema metadata produced by the schema-registry decoder. Previously only `decimal` was honoured; every other logical type silently degraded to its base primitive (`long`, `int`, or `string`). Downstream sinks that consume `schema_metadata` (notably `iceberg`) now create the correct column type. ([#4399](https://github.com/redpanda-data/connect/issues/4399))
11+
12+
Pipeline values flow through unchanged in both `preserve_logical_types=false` (default — values stay numeric) and `preserve_logical_types=true` (values stay rich Go time types). Bloblang behaviour and JSON-output bytes are unaffected.
13+
14+
**What this changes for existing pipelines:**
15+
- **iceberg with existing tables that have BIGINT / INT / STRING columns from this bug**: the connector now wants to create or evolve those columns to TIMESTAMP / TIMESTAMPTZ / DATE / TIME / UUID. Iceberg disallows BIGINT → TIMESTAMP schema evolution, so the first write after upgrade will fail loudly. Drop and re-create the table, or use Iceberg-native column-rename + add-new-column tooling to migrate before upgrading.
16+
- **Pipelines whose own code reads the `schema_metadata` bytes via `meta()`** and pattern-matches the historical INT64 shape: schemas now contain `TIMESTAMP` / `DATE` / `TIME_OF_DAY` / `UUID` along with new `unit` and `adjust_to_utc` fields. Update the pattern.
17+
- **iceberg shredder** is now schema-aware for numeric inputs: a numeric millisecond value declared by the schema as `timestamp-millis` is correctly interpreted as milliseconds rather than as Unix seconds. This closes a previously-silent corruption case where an int64 millis input into a TIMESTAMPTZ column would land ~50,000 years in the future.
18+
19+
### Changed
20+
21+
- iceberg: `NewWriter` now takes a `*typeResolver` argument so the writer can use schema metadata to interpret numeric inputs into time-typed columns at shredding time. Internal API change only.
22+
623
## 4.90.0 - 2026-04-30
724

825
### Added

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ require (
144144
github.com/rabbitmq/amqp091-go v1.10.0
145145
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
146146
github.com/redis/go-redis/v9 v9.18.0
147-
github.com/redpanda-data/benthos/v4 v4.72.0
147+
github.com/redpanda-data/benthos/v4 v4.72.1-0.20260506203543-b13320be0389
148148
github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8
149149
github.com/redpanda-data/common-go/license v0.0.0-20260318014216-2bbd72bde0a0
150150
github.com/redpanda-data/common-go/redpanda-otel-exporter v0.4.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,8 +1551,8 @@ github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8A
15511551
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
15521552
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
15531553
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
1554-
github.com/redpanda-data/benthos/v4 v4.72.0 h1:Aj63fY6nZBGL17YDvHZV88caN2/jHlza9ZS57BGtxhs=
1555-
github.com/redpanda-data/benthos/v4 v4.72.0/go.mod h1:if/3gnj/gIz3mKIiz2MGF7gNag/gv7ak0snVxP81BM4=
1554+
github.com/redpanda-data/benthos/v4 v4.72.1-0.20260506203543-b13320be0389 h1:Zd/+nUlzUlEh1xpLuX1JHw1XDoqccnfTOKQfqCnXdLk=
1555+
github.com/redpanda-data/benthos/v4 v4.72.1-0.20260506203543-b13320be0389/go.mod h1:if/3gnj/gIz3mKIiz2MGF7gNag/gv7ak0snVxP81BM4=
15561556
github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8 h1:hZTIp81OUDNOTCTD0gM01b1t821pDbToU9jWnZRnd/E=
15571557
github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8/go.mod h1:sHhzCYf64ZYUBi7snbopQl+wQaKySbFsKCvGhmSckhk=
15581558
github.com/redpanda-data/common-go/license v0.0.0-20260318014216-2bbd72bde0a0 h1:xL2THs63tUTZmTiBfBm/mrjFMrwQaHKduvgQ6gIizXg=

internal/impl/confluent/common_to_avro.go

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,48 @@ func commonToAvroInner(c schema.Common, recordName, namespace string, isRoot boo
7272
case schema.Any:
7373
return "bytes", nil
7474
case schema.Timestamp:
75+
// Honour Logical.Timestamp params if present; legacy nil-Logical
76+
// schemas fall through EffectiveTimestamp() to {Millis, UTC},
77+
// preserving the pre-PR encoder output exactly.
78+
p := c.EffectiveTimestamp()
79+
base := "long"
80+
logicalName, err := avroTimestampLogicalName(p)
81+
if err != nil {
82+
return nil, fmt.Errorf("timestamp field %q: %w", c.Name, err)
83+
}
84+
return map[string]any{
85+
"type": base,
86+
"logicalType": logicalName,
87+
}, nil
88+
case schema.Date:
7589
return map[string]any{
76-
"type": "long",
77-
"logicalType": "timestamp-millis",
90+
"type": "int",
91+
"logicalType": "date",
92+
}, nil
93+
case schema.TimeOfDay:
94+
if c.Logical == nil || c.Logical.TimeOfDay == nil {
95+
return nil, fmt.Errorf("time-of-day field %q missing Logical.TimeOfDay parameters", c.Name)
96+
}
97+
// Avro time-{millis,micros} carry no zone semantics, so a
98+
// TimeOfDay{AdjustToUTC=true} cannot be expressed faithfully.
99+
// Reject loudly rather than silently dropping that bit.
100+
if c.Logical.TimeOfDay.AdjustToUTC {
101+
return nil, fmt.Errorf("time-of-day field %q has AdjustToUTC=true; Avro time-millis/time-micros have no UTC-adjust variant — coerce upstream before encoding", c.Name)
102+
}
103+
// Avro defines only time-millis (int) and time-micros (long); reject
104+
// anything else loudly rather than silently downcasting.
105+
switch c.Logical.TimeOfDay.Unit {
106+
case schema.TimeUnitMillis:
107+
return map[string]any{"type": "int", "logicalType": "time-millis"}, nil
108+
case schema.TimeUnitMicros:
109+
return map[string]any{"type": "long", "logicalType": "time-micros"}, nil
110+
default:
111+
return nil, fmt.Errorf("time-of-day field %q has unit %v which Avro cannot express; only MILLIS and MICROS are supported", c.Name, c.Logical.TimeOfDay.Unit)
112+
}
113+
case schema.UUID:
114+
return map[string]any{
115+
"type": "string",
116+
"logicalType": "uuid",
78117
}, nil
79118
case schema.Decimal:
80119
if c.Logical == nil || c.Logical.Decimal == nil {
@@ -172,6 +211,28 @@ func commonToAvroUnion(c schema.Common) (any, error) {
172211
return variants, nil
173212
}
174213

214+
// avroTimestampLogicalName picks the Avro logical-type name for a
215+
// TimestampParams value. The mapping mirrors the reverse path in
216+
// applyAvroLogicalType: AdjustToUTC=true picks `timestamp-*`, false picks
217+
// `local-timestamp-*`. Avro 1.10+ supports nanos; older brokers may reject it.
218+
func avroTimestampLogicalName(p schema.TimestampParams) (string, error) {
219+
var unit string
220+
switch p.Unit {
221+
case schema.TimeUnitMillis:
222+
unit = "millis"
223+
case schema.TimeUnitMicros:
224+
unit = "micros"
225+
case schema.TimeUnitNanos:
226+
unit = "nanos"
227+
default:
228+
return "", fmt.Errorf("unsupported timestamp unit %v (Avro supports only MILLIS, MICROS, NANOS)", p.Unit)
229+
}
230+
if p.AdjustToUTC {
231+
return "timestamp-" + unit, nil
232+
}
233+
return "local-timestamp-" + unit, nil
234+
}
235+
175236
// sanitizeAvroName derives a valid Avro name from an arbitrary subject string.
176237
// Avro names must match [A-Za-z_][A-Za-z0-9_]*. Invalid characters are replaced
177238
// with underscores and a leading digit is prefixed with an underscore.

internal/impl/confluent/common_to_avro_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,149 @@ func TestSanitizeAvroName(t *testing.T) {
219219
})
220220
}
221221
}
222+
223+
// TestCommonToAvroLogicalTypeRoundTrip drives both halves of the Avro
224+
// adapter to confirm encode/decode are symmetric for every logical type
225+
// the connector now preserves. A schema.Common produced by a synthetic
226+
// decoder run, re-encoded to Avro JSON, must yield byte-equivalent output
227+
// to the original Avro spec for the same field.
228+
func TestCommonToAvroLogicalTypeRoundTrip(t *testing.T) {
229+
cases := []struct {
230+
name string
231+
input schema.Common
232+
wantOuter map[string]any
233+
}{
234+
{
235+
name: "Timestamp default (millis, UTC)",
236+
input: schema.Common{Type: schema.Timestamp},
237+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-millis"},
238+
},
239+
{
240+
name: "Timestamp millis explicit",
241+
input: schema.Common{
242+
Type: schema.Timestamp,
243+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMillis, AdjustToUTC: true}},
244+
},
245+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-millis"},
246+
},
247+
{
248+
name: "Timestamp micros UTC",
249+
input: schema.Common{
250+
Type: schema.Timestamp,
251+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMicros, AdjustToUTC: true}},
252+
},
253+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-micros"},
254+
},
255+
{
256+
name: "Timestamp nanos UTC",
257+
input: schema.Common{
258+
Type: schema.Timestamp,
259+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitNanos, AdjustToUTC: true}},
260+
},
261+
wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-nanos"},
262+
},
263+
{
264+
name: "Timestamp local micros",
265+
input: schema.Common{
266+
Type: schema.Timestamp,
267+
Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMicros, AdjustToUTC: false}},
268+
},
269+
wantOuter: map[string]any{"type": "long", "logicalType": "local-timestamp-micros"},
270+
},
271+
{
272+
name: "Date",
273+
input: schema.Common{Type: schema.Date},
274+
wantOuter: map[string]any{"type": "int", "logicalType": "date"},
275+
},
276+
{
277+
name: "TimeOfDay millis",
278+
input: schema.Common{
279+
Type: schema.TimeOfDay,
280+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMillis}},
281+
},
282+
wantOuter: map[string]any{"type": "int", "logicalType": "time-millis"},
283+
},
284+
{
285+
name: "TimeOfDay micros",
286+
input: schema.Common{
287+
Type: schema.TimeOfDay,
288+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMicros}},
289+
},
290+
wantOuter: map[string]any{"type": "long", "logicalType": "time-micros"},
291+
},
292+
{
293+
name: "UUID",
294+
input: schema.Common{Type: schema.UUID},
295+
wantOuter: map[string]any{"type": "string", "logicalType": "uuid"},
296+
},
297+
}
298+
299+
for _, tc := range cases {
300+
t.Run(tc.name, func(t *testing.T) {
301+
got := avroUnmarshal(t, tc.input, "", "")
302+
m, ok := got.(map[string]any)
303+
require.True(t, ok, "expected object output, got %T", got)
304+
assert.Equal(t, tc.wantOuter["type"], m["type"])
305+
assert.Equal(t, tc.wantOuter["logicalType"], m["logicalType"])
306+
})
307+
}
308+
}
309+
310+
// TestCommonToAvroTimeOfDayRejectsAdjustToUTC verifies that the Avro
311+
// encoder refuses to emit a TimeOfDay schema with AdjustToUTC=true.
312+
// Avro's time-millis / time-micros logical types carry no UTC-adjust bit,
313+
// so silently dropping it would lose the metadata. Fail loud instead.
314+
func TestCommonToAvroTimeOfDayRejectsAdjustToUTC(t *testing.T) {
315+
c := schema.Common{
316+
Name: "shift_start",
317+
Type: schema.TimeOfDay,
318+
Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMicros, AdjustToUTC: true}},
319+
}
320+
_, err := commonToAvroSchema(c, "Row", "")
321+
require.Error(t, err)
322+
assert.Contains(t, err.Error(), "AdjustToUTC=true")
323+
assert.Contains(t, err.Error(), "shift_start")
324+
}
325+
326+
// TestCommonToAvroDecodeEncodeRoundTrip ensures that decoding an Avro spec
327+
// via ecsAvroParseFromBytes and re-encoding via commonToAvroSchema yields a
328+
// schema with the same logicalType annotation. This is the symmetry contract
329+
// future format adapters should maintain.
330+
func TestCommonToAvroDecodeEncodeRoundTrip(t *testing.T) {
331+
cases := []struct {
332+
name string
333+
fieldType string
334+
}{
335+
{"timestamp-millis", `{"type":"long","logicalType":"timestamp-millis"}`},
336+
{"timestamp-micros", `{"type":"long","logicalType":"timestamp-micros"}`},
337+
{"local-timestamp-millis", `{"type":"long","logicalType":"local-timestamp-millis"}`},
338+
{"date", `{"type":"int","logicalType":"date"}`},
339+
{"time-millis", `{"type":"int","logicalType":"time-millis"}`},
340+
{"time-micros", `{"type":"long","logicalType":"time-micros"}`},
341+
{"uuid", `{"type":"string","logicalType":"uuid"}`},
342+
}
343+
344+
for _, tc := range cases {
345+
t.Run(tc.name, func(t *testing.T) {
346+
spec := []byte(`{
347+
"type":"record","name":"Row",
348+
"fields":[{"name":"f","type":` + tc.fieldType + `}]
349+
}`)
350+
c, err := ecsAvroParseFromBytes(ecsAvroConfig{}, spec)
351+
require.NoError(t, err)
352+
require.Len(t, c.Children, 1)
353+
354+
// Re-encode the decoded child and verify the logicalType is
355+
// the same as what we started with.
356+
f := c.Children[0]
357+
out, err := commonToAvroSchema(f, "Inner", "")
358+
require.NoError(t, err)
359+
var rt map[string]any
360+
require.NoError(t, json.Unmarshal([]byte(out), &rt))
361+
362+
var want map[string]any
363+
require.NoError(t, json.Unmarshal([]byte(tc.fieldType), &want))
364+
assert.Equal(t, want["logicalType"], rt["logicalType"])
365+
})
366+
}
367+
}

0 commit comments

Comments
 (0)