Commit 5cfd559

twmb and claude committed
confluent,iceberg: preserve avro logical types end-to-end (#4399)
The schema-registry Avro decoder now honours every Avro logical type the spec defines (previously only `decimal`), and the iceberg connector maps the resulting schema metadata to the right column type and interprets numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos}, date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised logicalType annotations and primitive/logical-type mismatches fall back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp, preserving pre-PR output for legacy schemas. Date and TimeOfDay paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day, AdjustToUTC for time-of-day) with field-naming errors rather than silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos) with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via SetFieldSchemaMetadata. The leaf-value converter looks up the metadata for time-typed columns and uses the declared unit to scale numeric inputs into the column's internal representation. Without metadata, the converter accepts time.Time / time.Duration directly and falls back to bloblang.ValueAsTimestamp's seconds-default for bare numerics — preserving existing behaviour for callers that genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond value declared by the schema as timestamp-millis would land ~50,000 years in the future when the column type flipped from BIGINT to TIMESTAMPTZ. The schema's declared unit is now the source of truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses schema_metadata from the first message of a batch and builds the field-ID lookup the shredder consults. Internal API change only — the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased. Pipeline values flow through unchanged in both preserve_logical_types modes; bloblang behaviour and JSON output bytes are unaffected. The breakage is concentrated in (a) iceberg tables that already exist with BIGINT/INT/STRING columns from this bug, which hit Iceberg's schema-evolution wall, and (b) custom code that pattern-matches the historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common types and parameter blocks. The go.mod replace directive is a development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
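
The type-resolver rule in the commit message (unit plus AdjustToUTC selects one of four Iceberg timestamp types) can be sketched roughly as follows. This is an illustration only, not the code in type_resolver.go: `TimeUnit`, `icebergTimestampType`, and the string return values are hypothetical stand-ins, and treating micros the same as millis is an assumption (the commit message confirms only that the legacy millis/UTC default lands on TimestampTzType).

```go
package main

import (
	"errors"
	"fmt"
)

// TimeUnit is a hypothetical stand-in for the schema package's unit enum.
type TimeUnit int

const (
	Millis TimeUnit = iota
	Micros
	Nanos
)

// icebergTimestampType returns the name of the Iceberg type a timestamp
// field would resolve to. Assumption: millis and micros share the
// non-nanosecond types; nanos selects the Ns variants.
func icebergTimestampType(unit TimeUnit, adjustToUTC bool) (string, error) {
	switch unit {
	case Millis, Micros:
		if adjustToUTC {
			return "TimestampTzType", nil
		}
		return "TimestampType", nil
	case Nanos:
		if adjustToUTC {
			return "TimestampTzNsType", nil
		}
		return "TimestampNsType", nil
	default:
		return "", errors.New("unsupported timestamp unit")
	}
}

func main() {
	cases := []struct {
		unit TimeUnit
		utc  bool
	}{
		{Millis, true}, // the legacy default described in the commit message
		{Micros, false},
		{Nanos, true},
		{Nanos, false},
	}
	for _, tc := range cases {
		name, err := icebergTimestampType(tc.unit, tc.utc)
		fmt.Println(name, err)
	}
}
```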
1 parent 4c25d02 commit 5cfd559

15 files changed

Lines changed: 1096 additions & 77 deletions

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
@@ -3,6 +3,23 @@ Changelog
 
 All notable changes to this project will be documented in this file.
 
+## Unreleased
+
+### Fixed
+
+- **BREAKING:** schema_registry_decode (avro): Avro logical types — `timestamp-{millis,micros,nanos}`, `local-timestamp-{millis,micros,nanos}`, `date`, `time-{millis,micros}`, and `uuid` — are now preserved end-to-end in the schema metadata produced by the schema-registry decoder. Previously only `decimal` was honoured; every other logical type silently degraded to its base primitive (`long`, `int`, or `string`). Downstream sinks that consume `schema_metadata` (notably `iceberg`) now create the correct column type. ([#4399](https://github.com/redpanda-data/connect/issues/4399))
+
+Pipeline values flow through unchanged in both `preserve_logical_types=false` (default — values stay numeric) and `preserve_logical_types=true` (values stay rich Go time types). Bloblang behaviour and JSON-output bytes are unaffected.
+
+**What this changes for existing pipelines:**
+- **iceberg with existing tables that have BIGINT / INT / STRING columns from this bug**: the connector now wants to create or evolve those columns to TIMESTAMP / TIMESTAMPTZ / DATE / TIME / UUID. Iceberg disallows BIGINT → TIMESTAMP schema evolution, so the first write after upgrade will fail loudly. Drop and re-create the table, or use Iceberg-native column-rename + add-new-column tooling to migrate before upgrading.
+- **Pipelines whose own code reads the `schema_metadata` bytes via `meta()`** and pattern-matches the historical INT64 shape: schemas now contain `TIMESTAMP` / `DATE` / `TIME_OF_DAY` / `UUID` along with new `unit` and `adjust_to_utc` fields. Update the pattern.
+- **iceberg shredder** is now schema-aware for numeric inputs: a numeric millisecond value declared by the schema as `timestamp-millis` is correctly interpreted as milliseconds rather than as Unix seconds. This closes a previously-silent corruption case where an int64 millis input into a TIMESTAMPTZ column would land ~50,000 years in the future.
+
+### Changed
+
+- iceberg: `NewWriter` now takes a `*typeResolver` argument so the writer can use schema metadata to interpret numeric inputs into time-typed columns at shredding time. Internal API change only.
+
 ## 4.90.0 - 2026-04-30
 
 ### Added
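
The "~50,000 years in the future" figure in the shredder entry above follows from reading a millisecond count as seconds. A minimal sketch of that arithmetic, using only the standard library: `Unit` and `toTime` are hypothetical names for illustration and are not the shredder's actual converter.

```go
package main

import (
	"fmt"
	"time"
)

// Unit is a hypothetical stand-in for the unit declared in schema metadata.
type Unit int

const (
	Seconds Unit = iota // the bare-numeric default when no metadata is present
	Millis
	Micros
	Nanos
)

// toTime interprets a raw integer as a Unix timestamp in the given unit.
func toTime(v int64, u Unit) time.Time {
	switch u {
	case Millis:
		return time.UnixMilli(v).UTC()
	case Micros:
		return time.UnixMicro(v).UTC()
	case Nanos:
		return time.Unix(0, v).UTC()
	default:
		return time.Unix(v, 0).UTC()
	}
}

func main() {
	// A timestamp-millis value from November 2023.
	const raw = int64(1_700_000_000_000)

	right := toTime(raw, Millis)  // unit taken from the schema
	wrong := toTime(raw, Seconds) // unit guessed as the seconds default

	fmt.Println("as millis, year:", right.Year())
	fmt.Println("as seconds, year:", wrong.Year())
}
```

Reading the same integer as seconds inflates it by a factor of 1000, which is what pushes the result tens of thousands of years past the intended date.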

go.mod

Lines changed: 4 additions & 0 deletions
@@ -4,6 +4,10 @@ go 1.26.2
 
 replace github.com/99designs/keyring => github.com/Jeffail/keyring v1.2.3
 
+// Local development against the Avro logical-types schema extension.
+// Replace with a tagged release once redpanda-data/benthos#429 is merged.
+replace github.com/redpanda-data/benthos/v4 => ../benthos-4
+
 ignore (
 	./bin
 	./config

go.sum

Lines changed: 0 additions & 2 deletions
@@ -1553,8 +1553,6 @@ github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8A
 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
 github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
-github.com/redpanda-data/benthos/v4 v4.72.0 h1:Aj63fY6nZBGL17YDvHZV88caN2/jHlza9ZS57BGtxhs=
-github.com/redpanda-data/benthos/v4 v4.72.0/go.mod h1:if/3gnj/gIz3mKIiz2MGF7gNag/gv7ak0snVxP81BM4=
 github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8 h1:hZTIp81OUDNOTCTD0gM01b1t821pDbToU9jWnZRnd/E=
 github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8/go.mod h1:sHhzCYf64ZYUBi7snbopQl+wQaKySbFsKCvGhmSckhk=
 github.com/redpanda-data/common-go/license v0.0.0-20260318014216-2bbd72bde0a0 h1:xL2THs63tUTZmTiBfBm/mrjFMrwQaHKduvgQ6gIizXg=

internal/impl/confluent/common_to_avro.go

Lines changed: 57 additions & 2 deletions
@@ -72,9 +72,42 @@ func commonToAvroInner(c schema.Common, recordName, namespace string, isRoot boo
 	case schema.Any:
 		return "bytes", nil
 	case schema.Timestamp:
+		// Honour Logical.Timestamp params if present; legacy nil-Logical
+		// schemas fall through EffectiveTimestamp() to {Millis, UTC},
+		// preserving the pre-PR encoder output exactly.
+		p := c.EffectiveTimestamp()
+		base := "long"
+		logicalName, err := avroTimestampLogicalName(p)
+		if err != nil {
+			return nil, fmt.Errorf("timestamp field %q: %w", c.Name, err)
+		}
+		return map[string]any{
+			"type": base,
+			"logicalType": logicalName,
+		}, nil
+	case schema.Date:
+		return map[string]any{
+			"type": "int",
+			"logicalType": "date",
+		}, nil
+	case schema.TimeOfDay:
+		if c.Logical == nil || c.Logical.TimeOfDay == nil {
+			return nil, fmt.Errorf("time-of-day field %q missing Logical.TimeOfDay parameters", c.Name)
+		}
+		// Avro defines only time-millis (int) and time-micros (long); reject
+		// anything else loudly rather than silently downcasting.
+		switch c.Logical.TimeOfDay.Unit {
+		case schema.TimeUnitMillis:
+			return map[string]any{"type": "int", "logicalType": "time-millis"}, nil
+		case schema.TimeUnitMicros:
+			return map[string]any{"type": "long", "logicalType": "time-micros"}, nil
+		default:
+			return nil, fmt.Errorf("time-of-day field %q has unit %v which Avro cannot express; only MILLIS and MICROS are supported", c.Name, c.Logical.TimeOfDay.Unit)
+		}
+	case schema.UUID:
 		return map[string]any{
-			"type": "long",
-			"logicalType": "timestamp-millis",
+			"type": "string",
+			"logicalType": "uuid",
 		}, nil
 	case schema.Decimal:
 		if c.Logical == nil || c.Logical.Decimal == nil {
@@ -172,6 +205,28 @@ func commonToAvroUnion(c schema.Common) (any, error) {
 	return variants, nil
 }
 
+// avroTimestampLogicalName picks the Avro logical-type name for a
+// TimestampParams value. The mapping mirrors the reverse path in
+// applyAvroLogicalType: AdjustToUTC=true picks `timestamp-*`, false picks
+// `local-timestamp-*`. Avro 1.10+ supports nanos; older brokers may reject it.
+func avroTimestampLogicalName(p schema.TimestampParams) (string, error) {
+	var unit string
+	switch p.Unit {
+	case schema.TimeUnitMillis:
+		unit = "millis"
+	case schema.TimeUnitMicros:
+		unit = "micros"
+	case schema.TimeUnitNanos:
+		unit = "nanos"
+	default:
+		return "", fmt.Errorf("unsupported timestamp unit %v (Avro supports only MILLIS, MICROS, NANOS)", p.Unit)
+	}
+	if p.AdjustToUTC {
+		return "timestamp-" + unit, nil
+	}
+	return "local-timestamp-" + unit, nil
+}
+
 // sanitizeAvroName derives a valid Avro name from an arbitrary subject string.
 // Avro names must match [A-Za-z_][A-Za-z0-9_]*. Invalid characters are replaced
 // with underscores and a leading digit is prefixed with an underscore.
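
The comment on avroTimestampLogicalName refers to a reverse path in applyAvroLogicalType, which is not shown in this view. A rough sketch of what such a decode-side mapping could look like, including the silent fallback to the base primitive that the commit message describes: `timestampParams` and `parseTimestampLogical` are hypothetical names, and the real decoder in ecs_avro.go may be structured differently.

```go
package main

import (
	"fmt"
	"strings"
)

// timestampParams is a hypothetical stand-in for schema.TimestampParams.
type timestampParams struct {
	Unit        string // "millis", "micros", or "nanos"
	AdjustToUTC bool
}

// parseTimestampLogical maps an Avro (base type, logicalType) pair to
// timestamp parameters. ok is false when the annotation is unrecognised or
// sits on the wrong primitive; a caller would then keep the base primitive.
func parseTimestampLogical(base, logical string) (p timestampParams, ok bool) {
	if base != "long" {
		return timestampParams{}, false
	}

	var unit string
	adjust := true
	switch {
	case strings.HasPrefix(logical, "local-timestamp-"):
		unit = strings.TrimPrefix(logical, "local-timestamp-")
		adjust = false
	case strings.HasPrefix(logical, "timestamp-"):
		unit = strings.TrimPrefix(logical, "timestamp-")
	default:
		return timestampParams{}, false
	}

	switch unit {
	case "millis", "micros", "nanos":
		return timestampParams{Unit: unit, AdjustToUTC: adjust}, true
	default:
		return timestampParams{}, false
	}
}

func main() {
	for _, logical := range []string{"timestamp-millis", "local-timestamp-micros", "duration"} {
		p, ok := parseTimestampLogical("long", logical)
		fmt.Printf("%s: params=%+v recognised=%v\n", logical, p, ok)
	}
}
```

Checking the `local-timestamp-` prefix before `timestamp-` keeps the two families distinct, and returning `ok=false` rather than an error matches the fall-back-silently behaviour described for the decoder.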

internal/impl/confluent/common_to_avro_test.go

Lines changed: 130 additions & 0 deletions
@@ -219,3 +219,133 @@ func TestSanitizeAvroName(t *testing.T) {
 		})
 	}
 }
+
+// TestCommonToAvroLogicalTypeRoundTrip drives both halves of the Avro
+// adapter to confirm encode/decode are symmetric for every logical type
+// the connector now preserves. A schema.Common produced by a synthetic
+// decoder run, re-encoded to Avro JSON, must yield byte-equivalent output
+// to the original Avro spec for the same field.
+func TestCommonToAvroLogicalTypeRoundTrip(t *testing.T) {
+	cases := []struct {
+		name string
+		input schema.Common
+		wantOuter map[string]any
+	}{
+		{
+			name: "Timestamp default (millis, UTC)",
+			input: schema.Common{Type: schema.Timestamp},
+			wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-millis"},
+		},
+		{
+			name: "Timestamp millis explicit",
+			input: schema.Common{
+				Type: schema.Timestamp,
+				Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMillis, AdjustToUTC: true}},
+			},
+			wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-millis"},
+		},
+		{
+			name: "Timestamp micros UTC",
+			input: schema.Common{
+				Type: schema.Timestamp,
+				Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMicros, AdjustToUTC: true}},
+			},
+			wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-micros"},
+		},
+		{
+			name: "Timestamp nanos UTC",
+			input: schema.Common{
+				Type: schema.Timestamp,
+				Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitNanos, AdjustToUTC: true}},
+			},
+			wantOuter: map[string]any{"type": "long", "logicalType": "timestamp-nanos"},
+		},
+		{
+			name: "Timestamp local micros",
+			input: schema.Common{
+				Type: schema.Timestamp,
+				Logical: &schema.LogicalParams{Timestamp: &schema.TimestampParams{Unit: schema.TimeUnitMicros, AdjustToUTC: false}},
+			},
+			wantOuter: map[string]any{"type": "long", "logicalType": "local-timestamp-micros"},
+		},
+		{
+			name: "Date",
+			input: schema.Common{Type: schema.Date},
+			wantOuter: map[string]any{"type": "int", "logicalType": "date"},
+		},
+		{
+			name: "TimeOfDay millis",
+			input: schema.Common{
+				Type: schema.TimeOfDay,
+				Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMillis}},
+			},
+			wantOuter: map[string]any{"type": "int", "logicalType": "time-millis"},
+		},
+		{
+			name: "TimeOfDay micros",
+			input: schema.Common{
+				Type: schema.TimeOfDay,
+				Logical: &schema.LogicalParams{TimeOfDay: &schema.TimeOfDayParams{Unit: schema.TimeUnitMicros}},
+			},
+			wantOuter: map[string]any{"type": "long", "logicalType": "time-micros"},
+		},
+		{
+			name: "UUID",
+			input: schema.Common{Type: schema.UUID},
+			wantOuter: map[string]any{"type": "string", "logicalType": "uuid"},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := avroUnmarshal(t, tc.input, "", "")
+			m, ok := got.(map[string]any)
+			require.True(t, ok, "expected object output, got %T", got)
+			assert.Equal(t, tc.wantOuter["type"], m["type"])
+			assert.Equal(t, tc.wantOuter["logicalType"], m["logicalType"])
+		})
+	}
+}
+
+// TestCommonToAvroDecodeEncodeRoundTrip ensures that decoding an Avro spec
+// via ecsAvroParseFromBytes and re-encoding via commonToAvroSchema yields a
+// schema with the same logicalType annotation. This is the symmetry contract
+// future format adapters should maintain.
+func TestCommonToAvroDecodeEncodeRoundTrip(t *testing.T) {
+	cases := []struct {
+		name string
+		fieldType string
+	}{
+		{"timestamp-millis", `{"type":"long","logicalType":"timestamp-millis"}`},
+		{"timestamp-micros", `{"type":"long","logicalType":"timestamp-micros"}`},
+		{"local-timestamp-millis", `{"type":"long","logicalType":"local-timestamp-millis"}`},
+		{"date", `{"type":"int","logicalType":"date"}`},
+		{"time-millis", `{"type":"int","logicalType":"time-millis"}`},
+		{"time-micros", `{"type":"long","logicalType":"time-micros"}`},
+		{"uuid", `{"type":"string","logicalType":"uuid"}`},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			spec := []byte(`{
+				"type":"record","name":"Row",
+				"fields":[{"name":"f","type":` + tc.fieldType + `}]
+			}`)
+			c, err := ecsAvroParseFromBytes(ecsAvroConfig{}, spec)
+			require.NoError(t, err)
+			require.Len(t, c.Children, 1)
+
+			// Re-encode the decoded child and verify the logicalType is
+			// the same as what we started with.
+			f := c.Children[0]
+			out, err := commonToAvroSchema(f, "Inner", "")
+			require.NoError(t, err)
+			var rt map[string]any
+			require.NoError(t, json.Unmarshal([]byte(out), &rt))
+
+			var want map[string]any
+			require.NoError(t, json.Unmarshal([]byte(tc.fieldType), &want))
+			assert.Equal(t, want["logicalType"], rt["logicalType"])
+		})
+	}
+}
