public/schema: add Date, TimeOfDay, UUID and timestamp/time-of-day params#429
Merged
Conversation
…rams Extend the neutral schema language so logical-type information from formats like Avro can round-trip without loss. Adds three new CommonType variants (Date, TimeOfDay, UUID), a TimeUnit enum, and TimestampParams / TimeOfDayParams blocks under LogicalParams that carry a unit and an AdjustToUTC flag. EffectiveTimestamp() supplies the legacy default (millis, UTC) when a Timestamp-typed schema has no Logical set, preserving fingerprint stability for pre-existing schemas. ToAny / ParseFromAny / Validate / fingerprint emit the new fields only when non-nil so unaffected schemas keep their existing serialised bytes and SchemaCache fingerprints. A new TestFingerprintLegacyStability locks down the canonical form for representative pre-parameterised schemas. Backwards-compat invariants: - Type == Timestamp with nil Logical: permitted, treated as legacy default. - Type == TimeOfDay with nil Logical: rejected (no historical default exists). - LogicalParams.Timestamp / TimeOfDay on the wrong top-level type: rejected. Companion to redpanda-data/connect#4399, which uses these new types to preserve Avro logicalType annotations end-to-end through the schema-registry decode -> Iceberg path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Commits Review Validation correctly rejects cross-pollution (params on the wrong top-level type), LGTM |
4 tasks
josephwoodward
approved these changes
May 6, 2026
squiidz
approved these changes
May 6, 2026
twmb
added a commit
to redpanda-data/connect
that referenced
this pull request
May 6, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
twmb
added a commit
to redpanda-data/connect
that referenced
this pull request
May 7, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
twmb
added a commit
to redpanda-data/connect
that referenced
this pull request
May 7, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
twmb
added a commit
to redpanda-data/connect
that referenced
this pull request
May 7, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail
pushed a commit
to redpanda-data/connect
that referenced
this pull request
May 14, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail
pushed a commit
to redpanda-data/connect
that referenced
this pull request
May 14, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail
pushed a commit
to redpanda-data/connect
that referenced
this pull request
May 18, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail
pushed a commit
to redpanda-data/connect
that referenced
this pull request
May 28, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Extend the neutral schema language so format adapters can round-trip rich logical-type information without loss.
CommonTypevariantsDate,TimeOfDay,UUID.TimeUnitenum (Seconds/Millis/Micros/Nanos).TimestampParamsandTimeOfDayParamsunderLogicalParams, each carrying a unit and anAdjustToUTCflag.Common.EffectiveTimestamp()helper for legacy nil-Logicaldefaulting (millis, UTC).ToAny/ParseFromAny/Validate/fingerprintemit the new fields only when non-nil, so existing schemas keep their byte-identical serialisation and SchemaCache fingerprints.Why
Companion to redpanda-data/connect#4399. The Confluent Schema Registry Avro decoder currently drops every Avro
logicalTypeannotation other thandecimalbecause there's no neutral-schema vocabulary for them. After this PR it has somewhere to put that information, and downstream sinks (e.g. Iceberg) can map it to the correct column type instead of silently bottoming out at BIGINT/INT/STRING.Backwards compatibility
Type == Timestampwithnil Logical{Millis, AdjustToUTC: true}viaEffectiveTimestamp(). ToAny output and fingerprint unchanged from today.Type == TimeOfDaywithnil LogicalLogicalParams.TimestamporTimeOfDayon the wrong top-level typeValidate()andParseFromAny().SchemaCachekeys for pre-PR schemasTestFingerprintLegacyStabilitysnapshots representative cases to prevent drift.Test plan
task testpasses — extendscommon_test.go,fingerprint_test.go,infer_from_any_test.go.task lintis clean.TestFingerprintLegacyStabilitysnapshots fingerprints forString,Int64 optional,Timestamp(legacy),Object, and parameterisedDecimal. Drift here means a SchemaCache stampede for every consumer; the test fails loudly.TestParameterisedRoundTripcoversDate,UUID, everyTimestamp/TimeOfDayunit ×AdjustToUTCcombo, and anObjectcontaining a mix of parameterised children.TestParseFromAnyRejectsMisplacedParamsensures strayunit/adjust_to_utcon non-temporal types is rejected.🤖 Generated with Claude Code