confluent,iceberg,parquet: preserve avro logical types end-to-end across schema_metadata consumers #4427
Jeffail wants to merge 13 commits
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
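
A minimal sketch of the dispatch-with-fallback rule described above. The `logicalRule` table, the kind names, and `resolveKind` are hypothetical illustrations, not the identifiers used in ecs_avro.go; the primitive each logical type annotates follows the Avro spec.

```go
package main

import "fmt"

// logicalRule pairs the primitive a logical type may annotate with the
// semantic kind it promotes to. Kind names are illustrative only.
type logicalRule struct {
	primitive string
	kind      string
}

var logicalRules = map[string]logicalRule{
	"timestamp-millis":       {"long", "timestamp"},
	"timestamp-micros":       {"long", "timestamp"},
	"timestamp-nanos":        {"long", "timestamp"},
	"local-timestamp-millis": {"long", "timestamp"},
	"local-timestamp-micros": {"long", "timestamp"},
	"local-timestamp-nanos":  {"long", "timestamp"},
	"date":                   {"int", "date"},
	"time-millis":            {"int", "time_of_day"},
	"time-micros":            {"long", "time_of_day"},
	"uuid":                   {"string", "uuid"},
}

// resolveKind promotes a primitive to its logical kind. Unknown annotations
// and primitive/logical-type mismatches fall back to the base primitive.
func resolveKind(primitive, logicalType string) string {
	if rule, ok := logicalRules[logicalType]; ok && rule.primitive == primitive {
		return rule.kind
	}
	return primitive
}

func main() {
	fmt.Println(resolveKind("long", "timestamp-millis")) // timestamp
	fmt.Println(resolveKind("int", "timestamp-millis"))  // int (mismatch)
	fmt.Println(resolveKind("long", "made-up"))          // long (unknown)
}
```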
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
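
The resolver's choice can be sketched as a small decision function. This is an illustration under assumptions: `Unit`, `timestampColumn`, and `timeColumn` are hypothetical names, the returned strings are the Iceberg spec's primitive type names rather than the Go types the resolver returns, and it assumes millisecond and microsecond units both land on the microsecond-precision column types.

```go
package main

import "fmt"

// Unit is a hypothetical stand-in for the unit carried in schema metadata.
type Unit int

const (
	Millis Unit = iota
	Micros
	Nanos
)

// timestampColumn picks an Iceberg timestamp type from unit and UTC adjustment.
func timestampColumn(unit Unit, adjustToUTC bool) string {
	switch {
	case unit == Nanos && adjustToUTC:
		return "timestamptz_ns"
	case unit == Nanos:
		return "timestamp_ns"
	case adjustToUTC:
		return "timestamptz"
	default:
		return "timestamp"
	}
}

// timeColumn rejects time-of-day shapes that have no faithful Iceberg
// representation, naming the field in the error.
func timeColumn(field string, unit Unit, adjustToUTC bool) (string, error) {
	if adjustToUTC {
		return "", fmt.Errorf("field %q: time-of-day with AdjustToUTC has no Iceberg equivalent", field)
	}
	if unit == Nanos {
		return "", fmt.Errorf("field %q: nanosecond time-of-day has no Iceberg equivalent", field)
	}
	return "time", nil
}

func main() {
	fmt.Println(timestampColumn(Millis, true)) // timestamptz
	fmt.Println(timestampColumn(Nanos, false)) // timestamp_ns
	_, err := timeColumn("opened_at", Nanos, false)
	fmt.Println(err)
}
```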
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
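
The corruption is easy to reproduce with the standard library alone. The sketch below is illustrative (the `interpret` helper is hypothetical, not the shredder's converter): it reads the same raw integer once with the declared unit and once with the seconds default.

```go
package main

import (
	"fmt"
	"time"
)

// interpret converts a raw epoch count to a UTC time using the given unit.
func interpret(raw int64, unit string) time.Time {
	switch unit {
	case "millis":
		return time.UnixMilli(raw).UTC()
	case "micros":
		return time.UnixMicro(raw).UTC()
	case "nanos":
		return time.Unix(0, raw).UTC()
	default: // "seconds": the legacy default for bare numerics
		return time.Unix(raw, 0).UTC()
	}
}

func main() {
	const raw = 1_700_000_000_000 // 2023-11-14T22:13:20Z expressed in milliseconds
	fmt.Println(interpret(raw, "millis").Year())  // 2023
	fmt.Println(interpret(raw, "seconds").Year()) // tens of thousands of years in the future
}
```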
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a record field declares a union as its type, ecsAvroFromAnyMap
previously returned without consulting applyAvroLogicalType on the
field-level object. This silently dropped sibling-of-type annotations,
which is the Java/JDBC Avro idiom -- a nullable timestamp written as
{"type": ["null", "long"], "logicalType": "timestamp-millis"} rather
than nesting the annotation inside the union element.
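
To make the two idioms concrete, here is a small sketch that finds the annotation in either position using only `encoding/json`. `fieldLogicalType` and `nestedLogicalType` are hypothetical helpers for illustration, not the functions in ecs_avro.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fieldLogicalType returns the logicalType declared for a record field,
// checking the sibling-of-type position first and then the "type" value.
func fieldLogicalType(fieldJSON string) (string, error) {
	var field map[string]any
	if err := json.Unmarshal([]byte(fieldJSON), &field); err != nil {
		return "", err
	}
	if lt, ok := field["logicalType"].(string); ok {
		return lt, nil
	}
	return nestedLogicalType(field["type"]), nil
}

// nestedLogicalType looks inside a type object or any branch of a union.
func nestedLogicalType(t any) string {
	switch v := t.(type) {
	case map[string]any:
		if lt, ok := v["logicalType"].(string); ok {
			return lt
		}
	case []any:
		for _, branch := range v {
			if lt := nestedLogicalType(branch); lt != "" {
				return lt
			}
		}
	}
	return ""
}

func main() {
	sibling := `{"name":"ts","type":["null","long"],"logicalType":"timestamp-millis"}`
	nested := `{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-millis"}]}`
	a, _ := fieldLogicalType(sibling)
	b, _ := fieldLogicalType(nested)
	fmt.Println(a, b) // timestamp-millis timestamp-millis
}
```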
The value-side decoder honours both idioms, so the schema-metadata side
must agree; otherwise the resulting Common reports the base primitive
and downstream consumers (e.g. the iceberg output) pick column types
that mismatch the decoded values, producing the "field <name>: expected
number value, got timestamp" shredder error against a BIGINT column.
The unit-test file covers every logical type (timestamp-{millis,micros},
local-timestamp-{millis,micros}, date, time-{millis,micros}, uuid,
decimal) under both nested-in-type and sibling-of-type idioms. An
encode/decode round-trip exercises the symmetric path through
commonToAvroNode to confirm the encoder remains consistent.
The iceberg integration test asserts that, given the @Schema shape
that schema_registry_decode now produces, the iceberg router
auto-creates a timestamp column rather than a BIGINT, and that both
time.Time and raw int64 values round-trip to the correct calendar
date via the shredder's metadata-driven unit scaling.
Picks up two upstream landings from the rolling-fix work:

1. twmb/avro PR #38 (Jeffail): the field-level logicalType lift our own
   metadata parser already handles. Pulling it in means the value-side
   decoder now produces time.Time for sibling-form timestamp-millis (and
   the rest of the matrix) natively, instead of returning int64 and
   relying on the iceberg shredder's metadata-driven numeric scaling
   bridge to reconcile.
2. twmb/avro PR #39 (twmb): a cumulative perf, parity, and
   spec-compliance pass. Includes "decimal precision/scale, spec form",
   which changes how decimal-typed values serialise under EncodeJSON: a
   scale-2 value 0.33 (wire bytes 0x21) now emits as the codepoint-mapped
   string "!" rather than the numeric 0.33, matching Java's JsonEncoder
   output.

The shredder coerce bridge in iceberg/shredder/temporal.go stays; it is
now a safety net rather than load-bearing infrastructure. The
metadata-side fix in confluent/ecs_avro.go also stays because it parses
schemas into schema.Common independently of twmb (the iceberg output's
schema_metadata path uses Common, not twmb's schemaNode).

Coverage:
- TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the upstream
  PR #38 behaviour by asserting that sibling-form schemas decode to
  time.Time end-to-end. If twmb ever regresses on this, the test surfaces
  it in the package that depends on the contract.
- TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson:
  pos_0_33333333 default-mode expectation updated from `0.33` to `"!"`
  per the spec form. Preserved-mode expectation unchanged: our
  preserveLogicalTypeOpts decimal CustomType still produces json.Number,
  which the SetStructuredMut path preserves through Go's json.Marshal.

CHANGELOG: a "Changed (potentially breaking)" entry documents the decimal
serialisation shape change for default-mode users and points at
preserve_logical_types: true as the migration knob.
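
The decimal shape change above follows from how the Avro JSON encoding renders bytes: each byte becomes the Unicode code point with the same value. A minimal sketch, assuming a non-negative unscaled value (negative values need two's-complement bytes, which this does not handle); `avroJSONBytes` and `encodeUnscaled` are hypothetical helpers, not twmb/avro API.

```go
package main

import (
	"fmt"
	"math/big"
)

// avroJSONBytes maps each byte to the code point with the same value (0-255).
func avroJSONBytes(b []byte) string {
	runes := make([]rune, len(b))
	for i, c := range b {
		runes[i] = rune(c)
	}
	return string(runes)
}

// encodeUnscaled renders a non-negative unscaled decimal value as the
// big-endian bytes of the integer, then as the code-point string.
func encodeUnscaled(unscaled int64) string {
	return avroJSONBytes(big.NewInt(unscaled).Bytes())
}

func main() {
	// 0.33 at scale 2 has unscaled value 33, i.e. the single byte 0x21.
	fmt.Printf("%q\n", encodeUnscaled(33)) // "!"
}
```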
Addresses review comment #9 on PR #4402: a downstream mapping that drops
schema_metadata between the schema-registry decoder and the iceberg sink
would silently reintroduce the year-50000 corruption that the rest of the
PR closes. The type-resolver picks TIMESTAMPTZ for the column based on
metadata seen at table-creation time, but per-message metadata is what
the shredder needs to interpret each numeric value's unit.

When schema_evolution.require_schema_metadata is true (default false),
the shredder rejects numeric inputs into time-typed columns when no
schema.Common has been registered for that field. time.Time /
time.Duration native inputs are unaffected because they carry their own
unit unambiguously. Non-time columns are unaffected. The flag is gated to
require schema_metadata also be set; setting strict mode without
configuring metadata at all is a config error caught at startup.

Plumbing:
- config.go: new field with operator-facing description.
- output_iceberg.go: parse and validate the require/has-metadata pair.
- router.go: add to SchemaEvolutionConfig and pass to writer.
- writer.go: extend NewWriter signature; flip shredder strict mode when
  the flag is set.
- shredder/shredder.go: new SetStrictTemporalMode(bool) and a
  strictTemporal field; thread through convertLeafValue.
- shredder/temporal.go: convertDate / convertTime / convertTimestamp
  return field-naming errors instead of falling through when strict mode
  is on and metadata is absent.

Tests cover:
(a) numeric without metadata under strict mode is rejected with a
    require_schema_metadata=true error message,
(b) native time.Time and time.Duration inputs are unaffected by strict
    mode,
(c) numeric with metadata under strict mode succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
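
The strict-mode rule can be sketched as a single converter. This is an illustration under assumptions, not the code in shredder/temporal.go: `Unit` and this `convertTimestamp` signature are hypothetical, and a nil unit stands in for "no schema.Common registered for the field".

```go
package main

import (
	"fmt"
	"time"
)

// Unit is a hypothetical stand-in for the unit carried in schema metadata.
type Unit int

const (
	Millis Unit = iota
	Micros
	Nanos
)

// convertTimestamp converts a leaf value for a time-typed column.
// unit == nil means no metadata was registered for the field.
func convertTimestamp(field string, v any, unit *Unit, strict bool) (time.Time, error) {
	switch x := v.(type) {
	case time.Time:
		return x, nil // native values carry their own unit
	case int64:
		if unit == nil {
			if strict {
				return time.Time{}, fmt.Errorf("field %q: numeric value without schema metadata rejected (require_schema_metadata=true)", field)
			}
			return time.Unix(x, 0), nil // legacy seconds default
		}
		switch *unit {
		case Millis:
			return time.UnixMilli(x), nil
		case Micros:
			return time.UnixMicro(x), nil
		default:
			return time.Unix(0, x), nil
		}
	default:
		return time.Time{}, fmt.Errorf("field %q: unsupported value type %T", field, v)
	}
}

func main() {
	_, err := convertTimestamp("ts", int64(1_700_000_000_000), nil, true)
	fmt.Println(err)

	ms := Millis
	t, _ := convertTimestamp("ts", int64(1_700_000_000_000), &ms, true)
	fmt.Println(t.UTC().Year()) // 2023
}
```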
When a pipeline previously produced int64-millisecond values into a
BIGINT column (the pre-fix-#4399 metadata bug), an operator who upgrades
schema_registry_decode to faithfully report Timestamp metadata now finds
time.Time values reaching the shredder against the same BIGINT column.
The shredder's Int64Type / Int32Type arms hand the temporal value to
bloblang.ValueAsInt64 which fails with "expected number value, got
timestamp" -- breaking every rolling upgrade against a table that
pre-dates the fix.

The symmetric numeric->temporal bridge already exists in temporal.go:208
for forward-compatibility; this change adds the reverse temporal->numeric
direction so a rolling upgrade can simply work without dropping any
affected table. The wire-equivalent integer is computed per the schema's
declared Unit (UnixMilli/Micro/Nano for Timestamp, days-since-epoch for
Date, micros/millis/etc for TimeOfDay), preserving exactly what the
operator's pre-fix pipeline had been storing.

The writer emits one INFO-level log per (writer, fieldID) the first time
it observes a metadata-vs-column type divergence, so operators can see
which columns are silently coercing on write and choose to rebuild the
affected tables when they want native temporal columns.

Coverage:
- shredder/temporal.go: new coerceTemporalToNumeric helper.
- shredder/shredder.go: Int32Type / Int64Type arms call it before the
  bloblang fallback. Non-temporal values continue to flow unchanged.
- shredder/temporal_test.go: unit tests for each temporal/Unit/column
  combination, plus no-op cases (plain int, no metadata, mismatch).
- writer.go: logCoerceDecisions detects divergences at shredder setup
  time and logs once per (writer, field).
- integration/schema_metadata_timestamp_test.go: end-to-end test
  pre-creates a table with id:STRING, ts:BIGINT, sends a message with a
  time.Time value plus Timestamp(Millis) metadata, asserts the column
  remains BIGINT and the row stores the millisecond integer.
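
A rough sketch of the temporal-to-numeric direction, covering only the Timestamp and Date cases. The function shares a name with the helper mentioned above purely for readability; its signature, the `kind` strings, `Unit`, and `mustRFC3339` are assumptions made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// Unit is a hypothetical stand-in for the unit carried in schema metadata.
type Unit int

const (
	Millis Unit = iota
	Micros
	Nanos
)

// coerceTemporalToNumeric returns the wire-equivalent integer for a time.Time
// value. The second result is false when no coercion applies, so
// non-temporal values keep flowing through the existing numeric path.
func coerceTemporalToNumeric(v any, kind string, unit Unit) (int64, bool) {
	t, ok := v.(time.Time)
	if !ok {
		return 0, false
	}
	switch kind {
	case "timestamp":
		switch unit {
		case Millis:
			return t.UnixMilli(), true
		case Micros:
			return t.UnixMicro(), true
		default:
			return t.UnixNano(), true
		}
	case "date":
		// Floor division so instants before 1970 round toward earlier days.
		secs := t.Unix()
		days := secs / 86400
		if secs%86400 < 0 {
			days--
		}
		return days, true
	}
	return 0, false
}

// mustRFC3339 is a small helper for the examples below.
func mustRFC3339(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	ts := mustRFC3339("2024-01-02T03:04:05Z")
	fmt.Println(coerceTemporalToNumeric(ts, "timestamp", Millis)) // 1704164645000 true
	fmt.Println(coerceTemporalToNumeric(ts, "date", Millis))      // 19724 true
	fmt.Println(coerceTemporalToNumeric(int64(7), "timestamp", Millis))
}
```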
Future work (Phase 1d): require_schema_metadata=true should turn a coerce situation into a hard error rather than coercing silently. Not in scope here; the INFO log is the interim signal.
…_types

The original #4399 fix promoted every Avro logical type (timestamp-*,
local-timestamp-*, date, time-*, uuid) to its semantic schema.Common type
in the metadata produced by the schema-registry decoder. Defaulting that
behaviour on shipped a real breaking change: pipelines whose downstream
iceberg tables had BIGINT / INT / STRING columns from the pre-fix bug
suddenly wanted to evolve those columns to TIMESTAMP / DATE / UUID, which
iceberg refuses (BIGINT->TIMESTAMP is not an allowed alter).

This commit makes the metadata-side preservation contingent on
preserve_logical_types: the same flag that controls whether values
surface as time.Time / uuid.UUID on the message body. The two sides now
move in lock-step:
- preserve_logical_types: false (default): value side emits raw
  long/int/string from twmb/avro; metadata side keeps the base primitive.
  Identical to pre-#4399 behaviour byte-for-byte.
- preserve_logical_types: true: value side emits the rich Go types;
  metadata side surfaces TIMESTAMP / DATE / TIME_OF_DAY / UUID with the
  correct unit / adjust-to-utc parameters. This is the shape the iceberg
  output needs to auto-create correctly-typed columns.

Decimal stays unconditional. It pre-dates the preserve_logical_types
contract and is load-bearing for normaliseAvroDecimals on the value side,
which keys on Common.Type == Decimal regardless of any flag. Gating it
would silently regress decimal handling for pipelines that have never
opted into preserve_logical_types. The field comment on
ecsAvroConfig.preserveLogicalTypes documents the carve-out.

Strict mode (require_schema_metadata=true) is also tightened in the same
commit: the temporal-to-numeric coerce bridge in shredder/shredder.go's
Int32Type / Int64Type arms now returns a hard error in strict mode
instead of silently converting, and the writer's logCoerceDecisions emits
a stronger notice ("writes will be rejected") when strict mode is
enabled.
Together with the coerce path itself, operators have four rolling-upgrade
options:
- leave preserve_logical_types alone: zero change.
- flip preserve_logical_types: true on a fresh table: native timestamp
  columns from auto-create.
- flip preserve_logical_types: true against pre-existing BIGINT tables:
  the coerce-on-write path stores wire-equivalent millis in the existing
  columns until you choose to rebuild, with an INFO log naming each
  divergent column.
- additionally flip require_schema_metadata: true: same as above but type
  disagreements between metadata and existing columns become a hard write
  error, so operators who want loud failure on schema drift get it.

Coverage:
- ecs_avro_test.go / common_to_avro_test.go: existing logical-type tests
  pass preserveLogicalTypes: true explicitly.
- ecs_avro_field_level_logical_type_test.go: new
  TestEcsAvroFieldLevelLogicalType_LegacyShapeWithoutFlag matrix asserts
  the pre-#4399 shape (Int64+optional with nil Logical) is preserved
  exactly under preserveLogicalTypes: false, for every logical type the
  gate covers. Decimal stays preserved in both modes.
- shredder/temporal_test.go: new TestCoerceTemporalRejectedInStrictMode
  confirms strict mode disables the coerce path and returns an error
  naming the require_schema_metadata flag.
- iceberg/integration/schema_metadata_timestamp_test.go: new
  TestIntegrationStrictModeRejectsCoerceOnExistingBigintColumn drives a
  full Route() against a pre-existing BIGINT table under strict mode and
  asserts the write errors with require_schema_metadata=true in the
  message.

CHANGELOG: the BREAKING entry has been reframed. The default path is
unchanged for every user who has not opted into
preserve_logical_types: true. The Added section enumerates the new
shredder behaviours (schema-aware numeric scaling, temporal->numeric
coerce bridge with INFO log, require_schema_metadata strict mode).
…data
Direct analogue of the field-level logicalType fix. The
schema_registry_decode processor's `translate_kafka_connect_types: true`
flag has long
caused the value side to recognise Debezium temporal annotations
(io.debezium.time.{Date, Year, Timestamp, Time, MicroTimestamp,
MicroTime, NanoTimestamp, NanoTime, ZonedTimestamp}) via the
kafkaConnectTypeOpt CustomType registration in avro_walker.go and emit
time.Time values. The metadata side however ignored the connect.name
annotation entirely, so @Schema continued to claim INT64/INT32/STRING.
The result for a Debezium-sourced iceberg pipeline running with both
`translate_kafka_connect_types: true` and `schema_metadata: schema`:
twmb emits time.Time for the field, ecsAvroParseFromBytes builds
@Schema with the base primitive, iceberg auto-creates a BIGINT column,
and the shredder rejects the typed value with the same "expected
number value, got timestamp" symptom that surfaced this whole work
stream for the customer's sibling-form Avro case.
Changes:
- Add `translateKafkaConnectTypes` to ecsAvroConfig and plumb it
through serde_avro.go's metadata-storage path. Mirrors the
`preserveLogicalTypes` plumbing pattern from Phase 1a.
- Add applyKafkaConnectType(cfg, c, as) next to applyAvroLogicalType.
Reads `connect.name` from the parsed aobject's properties and maps
to the matching schema.Common type:
  - Date                      → schema.Date
  - Year                      → schema.Date (value side returns time.Time
                                at Jan 1; days-since-epoch round-trips
                                faithfully through DATE)
  - Timestamp, Time           → schema.Timestamp(Millis, AdjustToUTC)
  - MicroTimestamp, MicroTime → schema.Timestamp(Micros, AdjustToUTC)
  - NanoTimestamp, NanoTime   → schema.Timestamp(Nanos, AdjustToUTC)
  - ZonedTimestamp            → schema.Timestamp(Millis, AdjustToUTC)
- Call applyKafkaConnectType from the same two sites
applyAvroLogicalType is already called from — the bottom of
ecsAvroFromAnyMap (primitive case) and the union case (after
ecsAvroHydrateRawUnion). Both flat and Debezium-nested forms
therefore get the annotation lifted into the parsed schema.Common.
Time and Timestamp are intentionally both mapped to schema.Timestamp
despite Time being semantically a time-of-day. The value side returns
time.Time for both (kafkaConnectTypeOpt's case "Timestamp", "Time"),
so the metadata side matches that shape to keep the value/metadata
contract symmetric. Operators who need a distinct TIME column can
override via the iceberg output's new_column_type_mapping.
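
The mapping above amounts to a lookup keyed on `connect.name`. A minimal sketch, where `connectMapping`, `connectTypes`, and `lookupConnectType` are hypothetical names and the kind/unit strings stand in for the schema.Common values:

```go
package main

import "fmt"

// connectMapping is an illustrative stand-in for the schema.Common shape a
// Debezium annotation resolves to.
type connectMapping struct {
	Kind string // "date" or "timestamp"
	Unit string // empty for date
}

var connectTypes = map[string]connectMapping{
	"io.debezium.time.Date":           {"date", ""},
	"io.debezium.time.Year":           {"date", ""},
	"io.debezium.time.Timestamp":      {"timestamp", "millis"},
	"io.debezium.time.Time":           {"timestamp", "millis"},
	"io.debezium.time.MicroTimestamp": {"timestamp", "micros"},
	"io.debezium.time.MicroTime":      {"timestamp", "micros"},
	"io.debezium.time.NanoTimestamp":  {"timestamp", "nanos"},
	"io.debezium.time.NanoTime":       {"timestamp", "nanos"},
	"io.debezium.time.ZonedTimestamp": {"timestamp", "millis"},
}

// lookupConnectType resolves a connect.name annotation. With translation
// off, or for annotations the value side does not translate, it reports
// false and the caller keeps the base primitive.
func lookupConnectType(connectName string, translate bool) (connectMapping, bool) {
	if !translate {
		return connectMapping{}, false
	}
	m, ok := connectTypes[connectName]
	return m, ok
}

func main() {
	fmt.Println(lookupConnectType("io.debezium.time.MicroTimestamp", true))
	fmt.Println(lookupConnectType("io.debezium.time.MicroTimestamp", false))
	fmt.Println(lookupConnectType("io.debezium.data.Uuid", true))
}
```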
Also closes the audit-found duration gap. Avro `duration` (fixed[12]
with logicalType: duration) is decoded by the value side to an ISO
8601 string under preserve_logical_types. The metadata side now maps
it to schema.String, matching that output and giving iceberg a
VARCHAR column instead of BINARY. Same gate as the rest of
applyAvroLogicalType: preserve_logical_types must be true.
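
For readers unfamiliar with the wire layout: an Avro `duration` is a 12-byte fixed holding three little-endian unsigned 32-bit integers (months, days, milliseconds). The sketch below decodes that layout and renders one plausible ISO 8601 form; the exact string the value side produces is not shown in this PR, so the output format here is an assumption, as is the `durationToISO8601` name.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"strconv"
)

// durationToISO8601 decodes an Avro duration (fixed[12]) and formats it as
// an ISO 8601 duration. The formatting choices are illustrative only.
func durationToISO8601(fixed []byte) (string, error) {
	if len(fixed) != 12 {
		return "", fmt.Errorf("avro duration must be 12 bytes, got %d", len(fixed))
	}
	months := binary.LittleEndian.Uint32(fixed[0:4])
	days := binary.LittleEndian.Uint32(fixed[4:8])
	millis := binary.LittleEndian.Uint32(fixed[8:12])
	seconds := strconv.FormatFloat(float64(millis)/1000, 'f', -1, 64)
	return fmt.Sprintf("P%dM%dDT%sS", months, days, seconds), nil
}

func main() {
	// 1 month, 2 days, 3500 ms (0x0DAC little-endian).
	raw := []byte{1, 0, 0, 0, 2, 0, 0, 0, 0xAC, 0x0D, 0, 0}
	s, err := durationToISO8601(raw)
	fmt.Println(s, err) // P1M2DT3.5S <nil>
}
```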
Coverage:
- TestEcsAvroKafkaConnectTypes: matrix of all 9 Debezium types
under both translate-on (asserts the lift fires) and translate-off
(asserts the parser stays agnostic — same as the value side
under that mode).
- TestEcsAvroDurationLogicalType: preserve-on / preserve-off
matrix for the duration logical type.
Audit notes:
Cross-referenced every CustomType registration in
preserveLogicalTypeOpts() and kafkaConnectTypeOpt() against the
metadata side. After this commit, every annotation that the value
side translates has a matching metadata-side mapping. The only
remaining theoretical gap — annotations the value side doesn't
translate either (e.g. io.debezium.data.{Uuid, Json, Enum}) — is
internally consistent (both sides surface the base primitive) and
doesn't trigger the value/metadata-mismatch bug class.
…code

parquet_encode was the largest remaining hole in the schema.Common
consumer audit. A pipeline using the same shape that took GF down (CDC
source → schema_registry_decode with preserve_logical_types: true →
@Schema metadata → parquet sink) hit two distinct walls today:

1. Type coverage. parquetNodeFromCommonField rejected Date, TimeOfDay,
   UUID, Map, Union, and Null with a hostile "unsupported by this
   processor" error.
2. Temporal value coercion. encodingCoercionVisitor required
   RFC3339-formatted strings for any TIMESTAMP column, with the error
   message literally telling the operator to upstream-convert. time.Time
   values flowing from twmb/avro went unrecognised.

This commit ports the same approach the iceberg shredder uses (Phase
1b/1c, coerceTemporalToNumeric in iceberg/shredder/temporal.go) into the
parquet encoder:

- parquetNodeFromCommonField now handles Date → parquet.Date(), TimeOfDay
  → parquet.Time(unit) with millis/micros/nanos interpretation from the
  Common's Logical.TimeOfDay.Unit, UUID → parquet.UUID(), and Map →
  parquet.Map(String(), value-node).
- Null and Union remain errors because parquet has no faithful
  representation. The error messages now name the constraint and point at
  the upstream-coercion remedy, rather than producing the generic "not
  supported by this processor" wording.
- encodingCoercionVisitor grows three new bridges:
  - coerceTimestampForEncode: accepts time.Time (scaled to the column's
    declared unit), numeric (pre-scaled passthrough), and the historical
    RFC3339-string path.
  - coerceDateForEncode: accepts time.Time (UTC-floored to days since
    epoch), date string (RFC3339 or bare YYYY-MM-DD), and numeric.
  - coerceTimeForEncode: accepts time.Duration, time.Time (wall-clock
    portion only), and numeric. Returns int32 for the millis unit, int64
    otherwise, matching parquet's physical representation for TIME
    columns.
Coverage:
- TestParquetNodeFromCommonField_NewTypes: every new variant (Date,
  TimeOfDay millis/micros, UUID, Map) plus the loud-error cases (Union,
  Null).
- TestEncodingCoercionVisitor_Temporal: round-trip-style assertions for
  time.Time/time.Duration/numeric/string inputs into TIMESTAMP(millis),
  TIMESTAMP(micros), DATE, TIME(millis), TIME(micros). Includes the
  "unsupported type errors clearly" case so the diagnostic stays useful.

No existing tests required changes.
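
The date bridge described above can be sketched as follows. This is a simplified illustration, not the parquet encoder's code: `coerceDate` and `daysSinceEpoch` are hypothetical names, only int32/int64 numerics are handled, and the error wording is invented.

```go
package main

import (
	"fmt"
	"time"
)

// daysSinceEpoch floors an instant to whole UTC days since 1970-01-01.
func daysSinceEpoch(t time.Time) int32 {
	secs := t.Unix()
	days := secs / 86400
	if secs%86400 < 0 {
		days--
	}
	return int32(days)
}

// coerceDate accepts time.Time, an RFC3339 or bare YYYY-MM-DD string, or a
// pre-scaled integer, and returns days since the Unix epoch.
func coerceDate(v any) (int32, error) {
	switch x := v.(type) {
	case time.Time:
		return daysSinceEpoch(x), nil
	case string:
		t, errRFC := time.Parse(time.RFC3339, x)
		if errRFC != nil {
			var errDate error
			t, errDate = time.Parse("2006-01-02", x)
			if errDate != nil {
				// Report both attempts so a malformed bare date is not
				// blamed only on a missing time component.
				return 0, fmt.Errorf("parse date %q: as RFC3339: %v; as YYYY-MM-DD: %v", x, errRFC, errDate)
			}
		}
		return daysSinceEpoch(t), nil
	case int32:
		return x, nil
	case int64:
		return int32(x), nil
	default:
		return 0, fmt.Errorf("unsupported value type %T for DATE column", v)
	}
}

func main() {
	fmt.Println(coerceDate("2024-01-02"))           // 19724 <nil>
	fmt.Println(coerceDate("2024-01-02T03:04:05Z")) // 19724 <nil>
	_, err := coerceDate("2024-13-99")
	fmt.Println(err)
}
```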
Running the common-schema-audit skill against the existing iceberg output
surfaced a gap I'd missed: commonTypeToIcebergType handled 16 of 18
schema.CommonType variants but fell through to the generic "unsupported
common schema type" error for Map and Union. A pipeline whose @Schema
declares a record field as `Map<string, V>` or as a Union (both natural
in Avro) would error at table-create / column-add time with a message
that doesn't tell the operator what to do. parquet_encode and JSON Schema
both got matching coverage in earlier commits on this branch; iceberg was
the actual originator of the consumer pattern but had the same hole.

Map maps to iceberg.MapType<String, ValueType>, mirroring the Avro /
parquet conventions where map keys are always strings. The single
schema.Common child describes the value type; we recurse through
commonTypeToIcebergTypeRec to resolve it, allocating a fresh
KeyID/ValueID via the shared type inferrer to keep field IDs unique
across the table schema.

Union is left as an explicit loud error because iceberg has no native
union type, the same decision parquet_encode landed on. The message names
the constraint and points at the upstream remediation ("flatten to a
single branch, typically by projecting to the non-null variant") rather
than the previous generic "unsupported".

The shredder already had MapType handling at
internal/impl/iceberg/shredder/shredder.go:262, and the writer's
buildShredderFieldCommons already descended into MapType to register
value-leaf metadata at internal/impl/iceberg/writer.go (the
recordOrRecurseIcebergField helper). So no value-side changes needed; the
schema/value paths were just waiting for the type_resolver to start
producing MapType columns.
Coverage:
- TestCommonTypeToIcebergType_MapAndUnion pins the new behaviour: Map of
  String->Int64, Map of String->Timestamp (validates the value-type
  recursion handles logically-typed values), the wrong-child-arity error
  path, and the Union loud-error case asserting the error message
  includes the "flatten" remediation pointer.

No production behaviour change for pipelines that did not exercise Map or
Union in their schema metadata; the previous error path is replaced by a
successful conversion for Map and a more helpful error for Union.
The smaller half of the schema.Common consumer-coverage audit:
common_to_json_schema.go rejected schema.Date, schema.TimeOfDay, and
schema.UUID with the generic "unsupported schema type" error, even
though JSON Schema draft 2019-09 has well-defined `format` keywords
for each.
Maps:
- Date → {"type":"string","format":"date"}
- TimeOfDay → {"type":"string","format":"time"}
- UUID → {"type":"string","format":"uuid"}
Symmetric with the iceberg / parquet_encode fixes earlier on this
branch: every consumer of schema.Common now handles every variant
the type system defines, so a producer schema declaring any of these
three types can be encoded as JSON Schema without an upstream
projection step.
Coverage:
- TestCommonToJSONSchemaDateTimeUUID asserts each format keyword
end-to-end.
No production behaviour change for pipelines that did not exercise
these types; the previous error path is replaced by a successful
encoding.
The schema.Common metadata format flows between source-side producers
(schema_registry_decode, parquet_decode, CDC sources) and downstream
sinks (iceberg, parquet_encode, schema_registry_encode). Every consumer
of this metadata must (a) handle every variant of schema.CommonType and
(b) coerce values when the message body's Go type doesn't match the
schema-declared type — the same bug class that caused the GF iceberg
issue and its cascade across confluent / parquet / JSON Schema fixes
earlier on this branch.
A new consumer can re-introduce the gap without anyone noticing until
a customer pipeline breaks. The new audit skill catches the drift
mechanically:
- Enumerates the schema.CommonType variant universe from the live
benthos source, so new variants surface as needed-but-missing rows
rather than silent omissions.
- Finds every consumer via `schema.ParseFromAny` and direct type
switches, filtering out the CDC schema producers (intentionally
out of scope).
- Per-consumer Explore-agent brief covers both the type-coverage
matrix and the temporal-coercion bridges that distinguish a
rigorous consumer (iceberg post-Phase-1b/1c) from a fragile one.
- Produces a Markdown matrix plus a JSON variant for CI consumption.
- References the prior fix commits on this branch as templates:
bfcad32 (parquet type-coverage + coercion), e2b2c86
(coerceTemporalToNumeric), 8120621 (metadata-aware scaling),
8e80cb4 (JSON Schema format keywords).
Read-only skill — it produces the report so a human can prioritise
fixes, never commits changes itself.
Lives at .claude/skills/common-schema-audit/SKILL.md alongside the
existing review skill, with matching frontmatter shape
(disable-model-invocation: true; tool allowlist scoped to the
read-only subset Bash/Read/Glob/Grep/Task).
Also adds a "adding new consumers" section listing the three
contract obligations a new consumer must meet, with citations to
the reference implementations.
    case float64:
        return int64(v), nil
The float64 arms in coerceTimestampForEncode (here), coerceDateForEncode (lines 191-192), and coerceTimeForEncode (lines 238-239) cast NaN / ±Inf directly via int64(v) / int32(v). Go leaves the result of a float-to-integer conversion that the target type cannot represent implementation-dependent, so the stored value is garbage: on some architectures 0, i.e. silent year-1970 corruption. The iceberg-side analogues added in this same PR explicitly guard against this in convertTimestamp, convertDate, convertTime, and numericInt64 (see internal/impl/iceberg/shredder/temporal.go#L2479-L2483 and the TestTemporalRejectsNaNInf regression). Please mirror the math.IsNaN(v) || math.IsInf(v, 0) guard here so the two sibling pipelines fail loudly in the same shape rather than producing 1970-stamped rows.
…se attempts

Mirrors the iceberg shredder defense for the same bug class: the float64
arms of coerceTimestampForEncode / coerceDateForEncode /
coerceTimeForEncode now reject NaN and +/-Inf rather than int-casting
them to implementation-defined garbage that would silently land as year
1970 (or worse) in the column.

coerceDateForEncode string-parse error path now reports both the RFC3339
and YYYY-MM-DD attempts instead of only the RFC3339 one. A malformed bare
date like "2024-13-99" was previously surfaced with an RFC3339-only error
that misleadingly suggested adding a time component.

Adds TestEncodingCoercionVisitor_RejectsNaNInf (parallel to the iceberg
shredder TestTemporalRejectsNaNInf) and
TestCoerceDateForEncode_StringErrorSurfacesBothAttempts.
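
A minimal sketch of the guard, assuming a hypothetical `floatToInt64` helper rather than the actual arms in the parquet encoder. It rejects only NaN and infinities, as the commit describes; finite values outside the int64 range are not checked here.

```go
package main

import (
	"fmt"
	"math"
)

// floatToInt64 converts a float64 to int64, rejecting NaN and +/-Inf
// instead of letting the conversion produce an implementation-dependent
// value.
func floatToInt64(field string, v float64) (int64, error) {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return 0, fmt.Errorf("field %q: cannot convert non-finite value %v to an integer", field, v)
	}
	return int64(v), nil
}

func main() {
	fmt.Println(floatToInt64("ts", 1700000000000)) // 1700000000000 <nil>
	_, err := floatToInt64("ts", math.NaN())
	fmt.Println(err)
	_, err = floatToInt64("ts", math.Inf(-1))
	fmt.Println(err)
}
```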
When applyAvroLogicalType promotes a `fixed`-backed Avro decimal to schema.Decimal, the fact that the source was fixed[N] rather than bytes is discarded — schema.Common has no field to retain it. This is lossless for every downstream consumer of schema.Common (iceberg, parquet, JSON Schema, value-side normaliseAvroDecimals), all of which pick their own physical encoding from precision/scale. The one place it leaks is common_to_avro.go re-emit, which will always produce a bytes-backed decimal even when the source schema was fixed-backed: semantically equivalent but a different Avro schema shape, which can matter for Schema Registry compatibility checks that compare by equality. Comment-only; the trade-off was already implicit but unwritten, so future readers tracing the path do not have to rederive it.
Commits Review: LGTM
Summary
Closes the GF iceberg-pipeline issue (`time.Time` values from `schema_registry_decode` failing the iceberg shredder with `"expected number value, got timestamp"` against `BIGINT` columns) and cascades the same fix class through every other consumer of the `schema.Common` metadata format. Adds a Claude skill to catch the same drift in future.

The underlying bug class is a value-vs-metadata mismatch: the value-side decoder honours an Avro annotation but the metadata-side parser doesn't, so downstream sinks (notably iceberg) create the wrong column type, then fail to write the typed value into it. Four distinct annotations all triggered this shape:
- … `logicalType`: the Java/JDBC Avro idiom emitted by Oracle CDC and most JDBC tooling.
- … `connect.name` temporal annotations.
- … `duration` logical type.

Commit narrative
11 commits, intentionally ordered as five narrative phases:
- …: 1a68504fb
- …: 7ea489942, d84edb87a
- … `preserve_logical_types`): 821468858, 008c03bf8, 0e771a9db
- … `schema.Common` consumers: 9879e3b79 (Debezium), e25f69a5b (parquet), 25fd6455d (iceberg Map/Union), bda5ab073 (JSON Schema)
- …: 69275f6c0 (audit skill)

Each commit is self-contained, passes its own test surface, and reads top-to-bottom as a story.
What this changes for users
Default users (`preserve_logical_types: false`): zero behaviour change.

Users on `preserve_logical_types: true`:
- `schema_metadata` now faithfully reports `TIMESTAMP` / `DATE` / `TIME_OF_DAY` / `UUID` for fields whose Avro schemas declare those logical types (including the Java/JDBC sibling-of-type idiom).
- The iceberg output coerces `time.Time` values into existing `BIGINT` / `INT` columns by scaling to the wire-equivalent integer using the schema metadata's declared `Unit`, so operators with affected tables can upgrade without dropping them. An `INFO`-level log fires once per divergent column for traceability.
- `require_schema_metadata: true` flips the coerce into a hard error, for operators who want loud failure on schema drift rather than silent reconciliation.

Users on `translate_kafka_connect_types: true`: Debezium `connect.name` temporal annotations (`io.debezium.time.Date`, `Timestamp`, `Time`, `MicroTimestamp`, `MicroTime`, `NanoTimestamp`, `NanoTime`, `ZonedTimestamp`, `Year`) are now also recognised on the metadata side, matching the value-side translation.

Parquet-encode users: schemas declaring `Date`, `TimeOfDay`, `UUID`, or `Map` no longer fail with the generic `unsupported by this processor` error. `Union` and `Null` still error but with messages that name the constraint and point at the upstream coercion remedy. `time.Time` / `time.Duration` values now flow through to TIMESTAMP / DATE / TIME columns without requiring upstream conversion to RFC3339 strings.

Schema Registry encode (JSON Schema): `Date`, `TimeOfDay`, and `UUID` now produce the matching JSON Schema draft 2019-09 `format` keywords.

Drift prevention
`.claude/skills/common-schema-audit/SKILL.md` codifies the audit workflow for every `schema.Common` consumer. Invoke as `/common-schema-audit` to enumerate variants from the live benthos source, find every consumer via `schema.ParseFromAny`, audit type-coverage and value-coercion per consumer in parallel, and produce a matrix with per-consumer GAP / PARTIAL / COVERED verdicts. The skill caught a real iceberg `Map` / `Union` gap on its first run, which is included as a fix in this PR.

Upstream dependency bump
`github.com/twmb/avro` bumped to `v1.7.3-0.20260513193503-1e5c2a3fc070` (pseudo-version pinning the merge SHA of twmb/avro#38). The upstream PR adds the field-level logicalType lift on the value-decode side, so `time.Time` flows through natively for sibling-form schemas. A proper `v1.7.3` tag is pending from Travis; we'll bump again to the tag once it lands.

The bump also picks up twmb/avro#39's spec-compliance pass, which changed the default-mode JSON serialisation of decimal values to the spec-blessed codepoint-mapped string form (matching Java's JsonEncoder). Test expectations updated accordingly; users on `preserve_logical_types: true` are unaffected because our `decimal` CustomType still produces `json.Number`.

Test plan
- `go test ./internal/impl/iceberg/... ./internal/impl/confluent/... ./internal/impl/parquet/...`: 12 packages, all green.
- `task lint`: 0 issues.
- … `-tags integration` against testcontainers (MinIO + iceberg-rest-fixture): green.
- … `origin/main` (`afff20cc5`).