Skip to content

public/schema: add Date, TimeOfDay, UUID and timestamp/time-of-day params#429

Merged
twmb merged 1 commit into
mainfrom
schema/avro-logical-types
May 6, 2026
Merged

public/schema: add Date, TimeOfDay, UUID and timestamp/time-of-day params#429
twmb merged 1 commit into
mainfrom
schema/avro-logical-types

Conversation

@twmb
Copy link
Copy Markdown
Contributor

@twmb twmb commented May 6, 2026

Summary

Extend the neutral schema language so format adapters can round-trip rich logical-type information without loss.

  • Adds CommonType variants Date, TimeOfDay, UUID.
  • Adds TimeUnit enum (Seconds/Millis/Micros/Nanos).
  • Adds TimestampParams and TimeOfDayParams under LogicalParams, each carrying a unit and an AdjustToUTC flag.
  • Adds Common.EffectiveTimestamp() helper for legacy nil-Logical defaulting (millis, UTC).
  • ToAny / ParseFromAny / Validate / fingerprint emit the new fields only when non-nil, so existing schemas keep their byte-identical serialisation and SchemaCache fingerprints.

Why

Companion to redpanda-data/connect#4399. The Confluent Schema Registry Avro decoder currently drops every Avro logicalType annotation other than decimal because there's no neutral-schema vocabulary for them. After this PR it has somewhere to put that information, and downstream sinks (e.g. Iceberg) can map it to the correct column type instead of silently bottoming out at BIGINT/INT/STRING.

Backwards compatibility

Scenario Behavior
Type == Timestamp with nil Logical Permitted. Treated as legacy default {Millis, AdjustToUTC: true} via EffectiveTimestamp(). ToAny output and fingerprint unchanged from today.
Type == TimeOfDay with nil Logical Rejected — no historical default to fall back to (the type is new).
LogicalParams.Timestamp or TimeOfDay on the wrong top-level type Rejected at Validate() and ParseFromAny().
SchemaCache keys for pre-PR schemas Unchanged. New TestFingerprintLegacyStability snapshots representative cases to prevent drift.

Test plan

  • task test passes — extends common_test.go, fingerprint_test.go, infer_from_any_test.go.
  • task lint is clean.
  • New TestFingerprintLegacyStability snapshots fingerprints for String, Int64 optional, Timestamp (legacy), Object, and parameterised Decimal. Drift here means a SchemaCache stampede for every consumer; the test fails loudly.
  • New TestParameterisedRoundTrip covers Date, UUID, every Timestamp/TimeOfDay unit × AdjustToUTC combo, and an Object containing a mix of parameterised children.
  • New TestParseFromAnyRejectsMisplacedParams ensures stray unit/adjust_to_utc on non-temporal types is rejected.

🤖 Generated with Claude Code

…rams

Extend the neutral schema language so logical-type information from formats
like Avro can round-trip without loss. Adds three new CommonType variants
(Date, TimeOfDay, UUID), a TimeUnit enum, and TimestampParams /
TimeOfDayParams blocks under LogicalParams that carry a unit and an
AdjustToUTC flag. EffectiveTimestamp() supplies the legacy default
(millis, UTC) when a Timestamp-typed schema has no Logical set, preserving
fingerprint stability for pre-existing schemas.

ToAny / ParseFromAny / Validate / fingerprint emit the new fields only when
non-nil so unaffected schemas keep their existing serialised bytes and
SchemaCache fingerprints. A new TestFingerprintLegacyStability locks down
the canonical form for representative pre-parameterised schemas.

Backwards-compat invariants:
- Type == Timestamp with nil Logical: permitted, treated as legacy default.
- Type == TimeOfDay with nil Logical: rejected (no historical default exists).
- LogicalParams.Timestamp / TimeOfDay on the wrong top-level type: rejected.

Companion to redpanda-data/connect#4399, which uses these new types to
preserve Avro logicalType annotations end-to-end through the schema-registry
decode -> Iceberg path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented May 6, 2026

Commits
LGTM

Review
PR adds three new CommonType variants (Date, TimeOfDay, UUID), a TimeUnit enum, and TimestampParams/TimeOfDayParams blocks under LogicalParams. Backwards-compatibility for pre-parameterised Timestamp schemas is preserved by emitting params only when non-nil, and the new TestFingerprintLegacyStability locks in canonical fingerprint bytes for representative legacy schemas.

Validation correctly rejects cross-pollution (params on the wrong top-level type), parseFromAnyNoValidate cleanly rejects misplaced/incomplete unit/adjust_to_utc fields, and the round-trip tests cover all new variants including nested in containers.

LGTM

@twmb twmb merged commit b13320b into main May 6, 2026
4 checks passed
twmb added a commit to redpanda-data/connect that referenced this pull request May 6, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
twmb added a commit to redpanda-data/connect that referenced this pull request May 7, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
twmb added a commit to redpanda-data/connect that referenced this pull request May 7, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
twmb added a commit to redpanda-data/connect that referenced this pull request May 7, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail pushed a commit to redpanda-data/connect that referenced this pull request May 14, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail pushed a commit to redpanda-data/connect that referenced this pull request May 14, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail pushed a commit to redpanda-data/connect that referenced this pull request May 18, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Jeffail pushed a commit to redpanda-data/connect that referenced this pull request May 28, 2026
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants