Skip to content

confluent,iceberg,parquet: preserve avro logical types end-to-end across schema_metadata consumers#4427

Open
Jeffail wants to merge 13 commits into
mainfrom
fix/avro-logical-types-end-to-end
Open

confluent,iceberg,parquet: preserve avro logical types end-to-end across schema_metadata consumers#4427
Jeffail wants to merge 13 commits into
mainfrom
fix/avro-logical-types-end-to-end

Conversation

@Jeffail
Copy link
Copy Markdown
Contributor

@Jeffail Jeffail commented May 14, 2026

Summary

Closes the GF iceberg-pipeline issue — time.Time values from schema_registry_decode failing the iceberg shredder with "expected number value, got timestamp" against BIGINT columns — and cascades the same fix class through every other consumer of the schema.Common metadata format. Adds a Claude skill to catch the same drift in future.

The underlying bug class is a value-vs-metadata mismatch: the value-side decoder honours an Avro annotation but the metadata-side parser doesn't, so downstream sinks (notably iceberg) create the wrong column type, then fail to write the typed value into it. Four distinct annotations all triggered this shape:

  1. Field-level (sibling-of-type) logicalType — the Java/JDBC Avro idiom emitted by Oracle CDC and most JDBC tooling.
  2. Debezium connect.name temporal annotations.
  3. Avro duration logical type.
  4. Cross-consumer gaps (parquet, JSON Schema, iceberg Map/Union) found by the post-fix audit.

Commit narrative

11 commits, intentionally ordered as five narrative phases:

Phase Commits
Foundation (Travis's prior #4399 work) 1a68504fb
Customer fix (field-level lift + upstream bump) 7ea489942, d84edb87a
Rolling-upgrade support (opt-in strict mode, then the coerce bridge that needs it, then the gate that makes the whole thing opt-in via preserve_logical_types) 821468858, 008c03bf8, 0e771a9db
Audit-found gaps in other schema.Common consumers 9879e3b79 (Debezium), e25f69a5b (parquet), 25fd6455d (iceberg Map/Union), bda5ab073 (JSON Schema)
Drift prevention 69275f6c0 (audit skill)

Each commit is self-contained, passes its own test surface, and reads top-to-bottom as a story.

What this changes for users

Default users (preserve_logical_types: false): zero behaviour change.

Users on preserve_logical_types: true:

  • schema_metadata now faithfully reports TIMESTAMP / DATE / TIME_OF_DAY / UUID for fields whose Avro schemas declare those logical types (including the Java/JDBC sibling-of-type idiom).
  • Iceberg auto-creates the correct column types on fresh tables.
  • Iceberg coerce-on-write writes time.Time values into existing BIGINT/INT columns by scaling to the wire-equivalent integer using the schema metadata's declared Unit — operators with affected tables can upgrade without dropping them. An INFO-level log fires once per divergent column for traceability.
  • Setting require_schema_metadata: true flips the coerce into a hard error — for operators who want loud failure on schema drift rather than silent reconciliation.

Users on translate_kafka_connect_types: true: Debezium connect.name temporal annotations (io.debezium.time.Date, Timestamp, Time, MicroTimestamp, MicroTime, NanoTimestamp, NanoTime, ZonedTimestamp, Year) are now also recognised on the metadata side, matching the value-side translation.

Parquet-encode users: schemas declaring Date, TimeOfDay, UUID, or Map no longer fail with the generic unsupported by this processor error. Union and Null still error but with messages that name the constraint and point at the upstream coercion remedy. time.Time / time.Duration values now flow through to TIMESTAMP / DATE / TIME columns without requiring upstream conversion to RFC3339 strings.

Schema Registry encode (JSON Schema): Date, TimeOfDay, and UUID now produce the matching JSON Schema draft 2019-09 format keywords.

Drift prevention

.claude/skills/common-schema-audit/SKILL.md codifies the audit workflow for every schema.Common consumer. Invoke as /common-schema-audit to enumerate variants from the live benthos source, find every consumer via schema.ParseFromAny, audit type-coverage and value-coercion per consumer in parallel, and produce a matrix with per-consumer GAP / PARTIAL / COVERED verdicts. The skill caught a real iceberg Map/Union gap on its first run, which is included as a fix in this PR.

Upstream dependency bump

github.com/twmb/avro bumped to v1.7.3-0.20260513193503-1e5c2a3fc070 (pseudo-version pinning the merge SHA of twmb/avro#38). The upstream PR adds the field-level logicalType lift on the value-decode side, so time.Time flows through natively for sibling-form schemas. A proper v1.7.3 tag is pending from Travis; we'll bump again to the tag once it lands.

The bump also picks up twmb/avro#39's spec-compliance pass, which changed the default-mode JSON serialisation of decimal values to the spec-blessed codepoint-mapped string form (matching Java's JsonEncoder). Test expectations updated accordingly; users on preserve_logical_types: true are unaffected because our decimal CustomType still produces json.Number.

Test plan

  • go test ./internal/impl/iceberg/... ./internal/impl/confluent/... ./internal/impl/parquet/... — 12 packages, all green.
  • task lint0 issues.
  • Integration tests under -tags integration against testcontainers (MinIO + iceberg-rest-fixture) — green.
  • Rebased on current origin/main (afff20cc5).

twmb and others added 11 commits May 14, 2026 11:01
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a record field declares a union as its type, ecsAvroFromAnyMap
previously returned without consulting applyAvroLogicalType on the
field-level object. This silently dropped sibling-of-type annotations,
which is the Java/JDBC Avro idiom -- a nullable timestamp written as
{"type": ["null", "long"], "logicalType": "timestamp-millis"} rather
than nesting the annotation inside the union element.

The value-side decoder honours both idioms, so the schema-metadata side
must agree; otherwise the resulting Common reports the base primitive
and downstream consumers (e.g. the iceberg output) pick column types
that mismatch the decoded values, producing the "field <name>: expected
number value, got timestamp" shredder error against a BIGINT column.

The unit-test file covers every logical type (timestamp-{millis,micros},
local-timestamp-{millis,micros}, date, time-{millis,micros}, uuid,
decimal) under both nested-in-type and sibling-of-type idioms. An
encode/decode round-trip exercises the symmetric path through
commonToAvroNode to confirm the encoder remains consistent.

The iceberg integration test asserts that, given the @Schema shape
that schema_registry_decode now produces, the iceberg router
auto-creates a timestamp column rather than a BIGINT, and that both
time.Time and raw int64 values round-trip to the correct calendar
date via the shredder's metadata-driven unit scaling.
Picks up two upstream landings from the rolling-fix work:

  1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our
     own metadata parser already handles. Pulling it in means the
     value-side decoder now produces time.Time for sibling-form
     timestamp-millis (and the rest of the matrix) natively, instead
     of returning int64 and relying on the iceberg shredder's
     metadata-driven numeric scaling bridge to reconcile.

  2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and
     spec-compliance pass. Includes "decimal precision/scale, spec
     form" which changes how decimal-typed values serialise under
     EncodeJSON: a scale-2 value 0.33 (wire bytes 0x21) now emits as
     the codepoint-mapped string "!" rather than the numeric 0.33,
     matching Java's JsonEncoder output.

The shredder coerce bridge in iceberg/shredder/temporal.go stays —
it's now a safety net rather than load-bearing infrastructure. The
metadata-side fix in confluent/ecs_avro.go also stays because it
parses schemas into schema.Common independently of twmb (the iceberg
output's schema_metadata path uses Common, not twmb's schemaNode).

Coverage:
  - TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the
    upstream PR #38 behaviour by asserting that sibling-form schemas
    decode to time.Time end-to-end. If twmb ever regresses on this,
    the test surfaces it in the package that depends on the contract.

  - TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson:
    pos_0_33333333 default-mode expectation updated from `0.33` to
    `"!"` per the spec form. Preserved-mode expectation unchanged —
    our preserveLogicalTypeOpts decimal CustomType still produces
    json.Number, which the SetStructuredMut path preserves through
    Go's json.Marshal.

CHANGELOG: a "Changed (potentially breaking)" entry documents the
decimal serialisation shape change for default-mode users and points
at preserve_logical_types: true as the migration knob.
Addresses review comment #9 on PR #4402: a downstream mapping that drops
schema_metadata between the schema-registry decoder and the iceberg sink
would silently reintroduce the year-50000 corruption that the rest of
the PR closes — the type-resolver picks TIMESTAMPTZ for the column based
on metadata seen at table-creation time, but per-message metadata is
what the shredder needs to interpret each numeric value's unit.

When schema_evolution.require_schema_metadata is true (default false),
the shredder rejects numeric inputs into time-typed columns when no
schema.Common has been registered for that field. time.Time /
time.Duration native inputs are unaffected — they carry their own unit
unambiguously. Non-time columns are unaffected.

The flag is gated to require schema_metadata also be set; setting strict
mode without configuring metadata at all is a config error caught at
startup.

Plumbing:
- config.go: new field with operator-facing description.
- output_iceberg.go: parse and validate the require/has-metadata pair.
- router.go: add to SchemaEvolutionConfig and pass to writer.
- writer.go: extend NewWriter signature; flip shredder strict mode when
  the flag is set.
- shredder/shredder.go: new SetStrictTemporalMode(bool) and a
  strictTemporal field; thread through convertLeafValue.
- shredder/temporal.go: convertDate / convertTime / convertTimestamp
  return field-naming errors instead of falling through when strict
  mode is on and metadata is absent.

Tests cover (a) numeric without metadata under strict mode is rejected
with a require_schema_metadata=true error message, (b) native time.Time
and time.Duration inputs are unaffected by strict mode, (c) numeric with
metadata under strict mode succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a pipeline previously produced int64-millisecond values into a BIGINT
column (the pre-fix-#4399 metadata bug), an operator who upgrades
schema_registry_decode to faithfully report Timestamp metadata now finds
time.Time values reaching the shredder against the same BIGINT column. The
shredder's Int64Type / Int32Type arms hand the temporal value to
bloblang.ValueAsInt64 which fails with "expected number value, got
timestamp" -- breaking every rolling upgrade against a table that pre-dates
the fix.

The symmetric numeric->temporal bridge already exists in temporal.go:208
for forward-compatibility; this change adds the reverse temporal->numeric
direction so a rolling upgrade can simply work without dropping any
affected table. The wire-equivalent integer is computed per the schema's
declared Unit (UnixMilli/Micro/Nano for Timestamp, days-since-epoch for
Date, micros/millis/etc for TimeOfDay), preserving exactly what the
operator's pre-fix pipeline had been storing.

The writer emits one INFO-level log per (writer, fieldID) the first time
it observes a metadata-vs-column type divergence, so operators can see
which columns are silently coercing on write and choose to rebuild the
affected tables when they want native temporal columns.

Coverage:
  - shredder/temporal.go: new coerceTemporalToNumeric helper.
  - shredder/shredder.go: Int32Type / Int64Type arms call it before the
    bloblang fallback. Non-temporal values continue to flow unchanged.
  - shredder/temporal_test.go: unit tests for each temporal/Unit/column
    combination, plus no-op cases (plain int, no metadata, mismatch).
  - writer.go: logCoerceDecisions detects divergences at shredder setup
    time and logs once per (writer, field).
  - integration/schema_metadata_timestamp_test.go: end-to-end test
    pre-creates a table with id:STRING, ts:BIGINT, sends a message with a
    time.Time value plus Timestamp(Millis) metadata, asserts the column
    remains BIGINT and the row stores the millisecond integer.

Future work (Phase 1d): require_schema_metadata=true should turn a coerce
situation into a hard error rather than coercing silently. Not in scope
here; the INFO log is the interim signal.
…_types

The original #4399 fix promoted every Avro logical type (timestamp-*,
local-timestamp-*, date, time-*, uuid) to its semantic schema.Common type
in the metadata produced by the schema-registry decoder. Defaulting that
behaviour on shipped a real breaking change: pipelines whose downstream
iceberg tables had BIGINT / INT / STRING columns from the pre-fix bug
suddenly wanted to evolve those columns to TIMESTAMP / DATE / UUID, which
iceberg refuses (BIGINT->TIMESTAMP is not an allowed alter).

This commit makes the metadata-side preservation contingent on
preserve_logical_types: the same flag that controls whether values
surface as time.Time / uuid.UUID on the message body. The two sides now
move in lock-step:

  - preserve_logical_types: false (default): value side emits raw
    long/int/string from twmb/avro; metadata side keeps the base
    primitive. Identical to pre-#4399 behaviour byte-for-byte.
  - preserve_logical_types: true: value side emits the rich Go types;
    metadata side surfaces TIMESTAMP / DATE / TIME_OF_DAY / UUID with
    the correct unit / adjust-to-utc parameters. This is the shape the
    iceberg output needs to auto-create correctly-typed columns.

Decimal stays unconditional. It pre-dates the preserve_logical_types
contract and is load-bearing for normaliseAvroDecimals on the value
side, which keys on Common.Type == Decimal regardless of any flag.
Gating it would silently regress decimal handling for pipelines that
have never opted into preserve_logical_types. The field comment on
ecsAvroConfig.preserveLogicalTypes documents the carve-out.

Strict mode (require_schema_metadata=true) is also tightened in the
same commit: the temporal-to-numeric coerce bridge in shredder/
shredder.go's Int32Type / Int64Type arms now returns a hard error in
strict mode instead of silently converting, and the writer's
logCoerceDecisions emits a stronger notice ("writes will be rejected")
when strict mode is enabled. Together with the coerce path itself,
operators have three rolling-upgrade options:

  - leave preserve_logical_types alone: zero change.
  - flip preserve_logical_types: true on a fresh table: native timestamp
    columns from auto-create.
  - flip preserve_logical_types: true against pre-existing BIGINT
    tables: the coerce-on-write path stores wire-equivalent millis in
    the existing columns until you choose to rebuild, with an INFO log
    naming each divergent column.
  - additionally flip require_schema_metadata: true: same as above but
    type disagreements between metadata and existing columns become a
    hard write error, so operators who want loud failure on schema
    drift get it.

Coverage:
  - ecs_avro_test.go / common_to_avro_test.go: existing logical-type
    tests pass preserveLogicalTypes: true explicitly.
  - ecs_avro_field_level_logical_type_test.go: new
    TestEcsAvroFieldLevelLogicalType_LegacyShapeWithoutFlag matrix
    asserts the pre-#4399 shape (Int64+optional with nil Logical) is
    preserved exactly under preserveLogicalTypes: false, for every
    logical type the gate covers. Decimal stays preserved in both
    modes.
  - shredder/temporal_test.go: new TestCoerceTemporalRejectedInStrictMode
    confirms strict mode disables the coerce path and returns an error
    naming the require_schema_metadata flag.
  - iceberg/integration/schema_metadata_timestamp_test.go: new
    TestIntegrationStrictModeRejectsCoerceOnExistingBigintColumn drives
    a full Route() against a pre-existing BIGINT table under strict
    mode and asserts the write errors with require_schema_metadata=true
    in the message.

CHANGELOG: the BREAKING entry has been reframed. The default path is
unchanged for every user who has not opted into preserve_logical_types:
true. The Added section enumerates the new shredder behaviours (
schema-aware numeric scaling, temporal->numeric coerce bridge with
INFO log, require_schema_metadata strict mode).
…data

Direct analogue of the field-level logicalType fix. The schema_registry_
decode processor's `translate_kafka_connect_types: true` flag has long
caused the value side to recognise Debezium temporal annotations
(io.debezium.time.{Date, Year, Timestamp, Time, MicroTimestamp,
MicroTime, NanoTimestamp, NanoTime, ZonedTimestamp}) via the
kafkaConnectTypeOpt CustomType registration in avro_walker.go and emit
time.Time values. The metadata side however ignored the connect.name
annotation entirely, so @Schema continued to claim INT64/INT32/STRING.

The result for a Debezium-sourced iceberg pipeline running with both
`translate_kafka_connect_types: true` and `schema_metadata: schema`:
twmb emits time.Time for the field, ecsAvroParseFromBytes builds
@Schema with the base primitive, iceberg auto-creates a BIGINT column,
and the shredder rejects the typed value with the same "expected
number value, got timestamp" symptom that surfaced this whole work
stream for the customer's sibling-form Avro case.

Changes:

  - Add `translateKafkaConnectTypes` to ecsAvroConfig and plumb it
    through serde_avro.go's metadata-storage path. Mirrors the
    `preserveLogicalTypes` plumbing pattern from Phase 1a.

  - Add applyKafkaConnectType(cfg, c, as) next to applyAvroLogicalType.
    Reads `connect.name` from the parsed aobject's properties and maps
    to the matching schema.Common type:
      - Date         → schema.Date
      - Year         → schema.Date  (value side returns time.Time at
                                     Jan 1; days-since-epoch round-
                                     trips faithfully through DATE)
      - Timestamp,   → schema.Timestamp(Millis, AdjustToUTC)
        Time
      - MicroTimestamp,
        MicroTime    → schema.Timestamp(Micros, AdjustToUTC)
      - NanoTimestamp,
        NanoTime     → schema.Timestamp(Nanos, AdjustToUTC)
      - ZonedTimestamp → schema.Timestamp(Millis, AdjustToUTC)

  - Call applyKafkaConnectType from the same two sites
    applyAvroLogicalType is already called from — the bottom of
    ecsAvroFromAnyMap (primitive case) and the union case (after
    ecsAvroHydrateRawUnion). Both flat and Debezium-nested forms
    therefore get the annotation lifted into the parsed schema.Common.

Time and Timestamp are intentionally both mapped to schema.Timestamp
despite Time being semantically a time-of-day. The value side returns
time.Time for both (kafkaConnectTypeOpt's case "Timestamp", "Time"),
so the metadata side matches that shape to keep the value/metadata
contract symmetric. Operators who need a distinct TIME column can
override via the iceberg output's new_column_type_mapping.

Also closes the audit-found duration gap. Avro `duration` (fixed[12]
with logicalType: duration) is decoded by the value side to an ISO
8601 string under preserve_logical_types. The metadata side now maps
it to schema.String, matching that output and giving iceberg a
VARCHAR column instead of BINARY. Same gate as the rest of
applyAvroLogicalType: preserve_logical_types must be true.

Coverage:

  - TestEcsAvroKafkaConnectTypes: matrix of all 9 Debezium types
    under both translate-on (asserts the lift fires) and translate-off
    (asserts the parser stays agnostic — same as the value side
    under that mode).

  - TestEcsAvroDurationLogicalType: preserve-on / preserve-off
    matrix for the duration logical type.

Audit notes:

  Cross-referenced every CustomType registration in
  preserveLogicalTypeOpts() and kafkaConnectTypeOpt() against the
  metadata side. After this commit, every annotation that the value
  side translates has a matching metadata-side mapping. The only
  remaining theoretical gap — annotations the value side doesn't
  translate either (e.g. io.debezium.data.{Uuid, Json, Enum}) — is
  internally consistent (both sides surface the base primitive) and
  doesn't trigger the value/metadata-mismatch bug class.
…code

parquet_encode was the largest remaining hole in the schema.Common
consumer audit. A pipeline using the same shape that took GF down (CDC
source → schema_registry_decode with preserve_logical_types: true →
@Schema metadata → parquet sink) hit two distinct walls today:

  1. Type coverage. parquetNodeFromCommonField rejected Date, TimeOfDay,
     UUID, Map, Union, and Null with a hostile "unsupported by this
     processor" error.

  2. Temporal value coercion. encodingCoercionVisitor required
     RFC3339-formatted strings for any TIMESTAMP column, with the
     error message literally telling the operator to upstream-convert.
     time.Time values flowing from twmb/avro went unrecognised.

This commit ports the same approach the iceberg shredder uses (Phase
1b/1c, coerceTemporalToNumeric in iceberg/shredder/temporal.go) into
the parquet encoder:

  - parquetNodeFromCommonField now handles Date → parquet.Date(),
    TimeOfDay → parquet.Time(unit) with millis/micros/nanos
    interpretation from the Common's Logical.TimeOfDay.Unit, UUID →
    parquet.UUID(), and Map → parquet.Map(String(), value-node).

  - Null and Union remain errors because parquet has no faithful
    representation. The error messages now name the constraint and
    point at the upstream-coercion remedy, rather than producing the
    generic "not supported by this processor" wording.

  - encodingCoercionVisitor grows three new bridges:
      - coerceTimestampForEncode: accepts time.Time (scaled to the
        column's declared unit), numeric (pre-scaled passthrough), and
        the historical RFC3339-string path.
      - coerceDateForEncode: accepts time.Time (UTC-floored to days
        since epoch), date string (RFC3339 or bare YYYY-MM-DD), and
        numeric.
      - coerceTimeForEncode: accepts time.Duration, time.Time
        (wall-clock portion only), and numeric. Returns int32 for the
        millis unit, int64 otherwise — matching parquet's physical
        representation for TIME columns.

Coverage:

  - TestParquetNodeFromCommonField_NewTypes: every new variant
    (Date, TimeOfDay millis/micros, UUID, Map) plus the loud-error
    cases (Union, Null).

  - TestEncodingCoercionVisitor_Temporal: round-trip-style assertions
    for time.Time/time.Duration/numeric/string inputs into
    TIMESTAMP(millis), TIMESTAMP(micros), DATE, TIME(millis),
    TIME(micros). Includes the "unsupported type errors clearly"
    case so the diagnostic stays useful.

No existing tests required changes.
Running the common-schema-audit skill against the existing iceberg
output surfaced a gap I'd missed: commonTypeToIcebergType handled 16
of 18 schema.CommonType variants but fell through to the generic
"unsupported common schema type" error for Map and Union.

A pipeline whose @Schema declares a record field as `Map<string, V>`
or as a Union — both natural in Avro — would error at table-create /
column-add time with a message that doesn't tell the operator what to
do. parquet_encode and JSON Schema both got matching coverage in
earlier commits on this branch; iceberg was the actual originator of
the consumer pattern but had the same hole.

Map maps to iceberg.MapType<String, ValueType>, mirroring the Avro /
parquet conventions where map keys are always strings. The single
schema.Common child describes the value type; we recurse through
commonTypeToIcebergTypeRec to resolve it, allocating a fresh
KeyID/ValueID via the shared type inferrer to keep field IDs unique
across the table schema.

Union is left as an explicit loud error because iceberg has no native
union type — the same decision parquet_encode landed on. The message
names the constraint and points at the upstream remediation
("flatten to a single branch, typically by projecting to the
non-null variant") rather than the previous generic "unsupported".

The shredder already had MapType handling at internal/impl/iceberg/
shredder/shredder.go:262, and the writer's buildShredderFieldCommons
already descended into MapType to register value-leaf metadata at
internal/impl/iceberg/writer.go (the recordOrRecurseIcebergField
helper). So no value-side changes needed; the schema/value paths
were just waiting for the type_resolver to start producing MapType
columns.

Coverage:

  - TestCommonTypeToIcebergType_MapAndUnion pins the new behaviour:
    Map of String->Int64, Map of String->Timestamp (validates the
    value-type recursion handles logically-typed values), the wrong-
    child-arity error path, and the Union loud-error case asserting
    the error message includes the "flatten" remediation pointer.

No production behaviour change for pipelines that did not exercise
Map or Union in their schema metadata; the previous error path is
replaced by a successful conversion for Map and a more helpful error
for Union.
The smaller half of the schema.Common consumer-coverage audit:
common_to_json_schema.go rejected schema.Date, schema.TimeOfDay, and
schema.UUID with the generic "unsupported schema type" error, even
though JSON Schema draft 2019-09 has well-defined `format` keywords
for each.

Maps:
  - Date      → {"type":"string","format":"date"}
  - TimeOfDay → {"type":"string","format":"time"}
  - UUID      → {"type":"string","format":"uuid"}

Symmetric with the iceberg / parquet_encode fixes earlier on this
branch: every consumer of schema.Common now handles every variant
the type system defines, so a producer schema declaring any of these
three types can be encoded as JSON Schema without an upstream
projection step.

Coverage:

  - TestCommonToJSONSchemaDateTimeUUID asserts each format keyword
    end-to-end.

No production behaviour change for pipelines that did not exercise
these types; the previous error path is replaced by a successful
encoding.
The schema.Common metadata format flows between source-side producers
(schema_registry_decode, parquet_decode, CDC sources) and downstream
sinks (iceberg, parquet_encode, schema_registry_encode). Every consumer
of this metadata must (a) handle every variant of schema.CommonType and
(b) coerce values when the message body's Go type doesn't match the
schema-declared type — the same bug class that caused the GF iceberg
issue and its cascade across confluent / parquet / JSON Schema fixes
earlier on this branch.

A new consumer can re-introduce the gap without anyone noticing until
a customer pipeline breaks. The new audit skill catches the drift
mechanically:

  - Enumerates the schema.CommonType variant universe from the live
    benthos source, so new variants surface as needed-but-missing rows
    rather than silent omissions.

  - Finds every consumer via `schema.ParseFromAny` and direct type
    switches, filtering out the CDC schema producers (intentionally
    out of scope).

  - Per-consumer Explore-agent brief covers both the type-coverage
    matrix and the temporal-coercion bridges that distinguish a
    rigorous consumer (iceberg post-Phase-1b/1c) from a fragile one.

  - Produces a Markdown matrix plus a JSON variant for CI consumption.

  - References the prior fix commits on this branch as templates:
    bfcad32 (parquet type-coverage + coercion), e2b2c86
    (coerceTemporalToNumeric), 8120621 (metadata-aware scaling),
    8e80cb4 (JSON Schema format keywords).

Read-only skill — it produces the report so a human can prioritise
fixes, never commits changes itself.

Lives at .claude/skills/common-schema-audit/SKILL.md alongside the
existing review skill, with matching frontmatter shape
(disable-model-invocation: true; tool allowlist scoped to the
read-only subset Bash/Read/Glob/Grep/Task).

Also adds a "adding new consumers" section listing the three
contract obligations a new consumer must meet, with citations to
the reference implementations.
Comment on lines +154 to +155
case float64:
return int64(v), nil
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The float64 arms in coerceTimestampForEncode (here), coerceDateForEncode (line 191-192), and coerceTimeForEncode (line 238-239) cast NaN / ±Inf directly via int64(v) / int32(v), which is implementation-defined garbage in Go — typically 0, i.e. silent year-1970 corruption. The iceberg-side analogues added in this same PR explicitly guard against this in convertTimestamp, convertDate, convertTime, and numericInt64 (see internal/impl/iceberg/shredder/temporal.go#L2479-L2483 and the TestTemporalRejectsNaNInf regression). Please mirror the math.IsNaN(v) || math.IsInf(v, 0) guard here so the two sibling pipelines fail loudly in the same shape rather than producing 1970-stamped rows.

@claude
Copy link
Copy Markdown

claude Bot commented May 14, 2026

Commits
LGTM

Review
Large, well-staged PR (11 commits) that closes the schema.Common consumer-drift bug class end-to-end across confluent / iceberg / parquet / JSON Schema, with strong test coverage and clear commit narrative. One real concern flagged inline.

  1. NaN / ±Inf guards are missing on the float64 arms of the new coerceTimestampForEncode / coerceDateForEncode / coerceTimeForEncode helpers in internal/impl/parquet/schema_coercion.go#L154-L155, while the iceberg-side analogues added in the same PR explicitly guard against this and a regression test (TestTemporalRejectsNaNInf) pins it. Asymmetric defense for the same bug class — please propagate the guard.

Jeffail added 2 commits May 14, 2026 17:21
…se attempts

Mirrors the iceberg shredder defense for the same bug class: the
float64 arms of coerceTimestampForEncode / coerceDateForEncode /
coerceTimeForEncode now reject NaN and +/-Inf rather than int-casting
them to implementation-defined garbage that would silently land as
year 1970 (or worse) in the column.

coerceDateForEncode string-parse error path now reports both the
RFC3339 and YYYY-MM-DD attempts instead of only the RFC3339 one. A
malformed bare date like "2024-13-99" was previously surfaced with an
RFC3339-only error that misleadingly suggested adding a time
component.

Adds TestEncodingCoercionVisitor_RejectsNaNInf (parallel to the
iceberg shredder TestTemporalRejectsNaNInf) and
TestCoerceDateForEncode_StringErrorSurfacesBothAttempts.
When applyAvroLogicalType promotes a `fixed`-backed Avro decimal to
schema.Decimal, the fact that the source was fixed[N] rather than
bytes is discarded — schema.Common has no field to retain it. This is
lossless for every downstream consumer of schema.Common (iceberg,
parquet, JSON Schema, value-side normaliseAvroDecimals), all of which
pick their own physical encoding from precision/scale. The one place
it leaks is common_to_avro.go re-emit, which will always produce a
bytes-backed decimal even when the source schema was fixed-backed:
semantically equivalent but a different Avro schema shape, which can
matter for Schema Registry compatibility checks that compare by
equality.

Comment-only; the trade-off was already implicit but unwritten, so
future readers tracing the path do not have to rederive it.
@claude
Copy link
Copy Markdown

claude Bot commented May 14, 2026

Commits
LGTM

Review
This is a thorough cascade fix for the schema.Common value-vs-metadata mismatch class. Each commit reads as a self-contained story; tests cover both the value-side and metadata-side; defense-in-depth guards (NaN/Inf, strict mode, panic-on-invalid-unit) are in place; license headers match the project's existing convention.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants