Open
18 commits
c058fb6
confluent,iceberg: preserve avro logical types end-to-end (#4399)
twmb May 6, 2026
edadc92
confluent: honour field-level avro logicalType on union-typed fields
Jeffail May 12, 2026
11709c7
chore: bump twmb/avro to v1.7.3 with field-level logicalType lift
Jeffail May 14, 2026
ea04106
iceberg: add require_schema_metadata strict-temporal-mode option
twmb May 7, 2026
38e35ce
iceberg: coerce temporal values into existing numeric columns on write
Jeffail May 13, 2026
e482d77
confluent,iceberg: gate logical-type preservation on preserve_logical…
Jeffail May 13, 2026
04eba95
confluent: honour Kafka Connect / Debezium annotations in schema meta…
Jeffail May 14, 2026
f01964b
parquet: close type-coverage and temporal-coercion gaps in parquet_en…
Jeffail May 14, 2026
2781632
iceberg: close type-coverage gap for Map and Union in type_resolver
Jeffail May 14, 2026
147420e
confluent: close json-schema gap for Date/TimeOfDay/UUID
Jeffail May 14, 2026
ac07c1a
chore: add common-schema-audit skill to catch consumer drift
Jeffail May 14, 2026
76a3e16
parquet: guard NaN/Inf in temporal encoders and surface both DATE par…
Jeffail May 14, 2026
7db39d5
confluent: document decimal-on-Any one-way carry-through
Jeffail May 14, 2026
0c35ccf
confluent: resolve named-type references in nullable Avro unions
Jeffail May 14, 2026
7ffa4a5
confluent: resolve named-type references in lame Avro unions too
Jeffail May 14, 2026
bf35c06
confluent: harden Avro name resolution and union error wrapping
Jeffail May 18, 2026
b0edf02
parquet: floor pre-epoch DATE strings instead of truncating
Jeffail May 18, 2026
8490a38
confluent: regression test for annotated inline-primitive nullable un…
Jeffail May 26, 2026
124 changes: 124 additions & 0 deletions .claude/skills/common-schema-audit/SKILL.md
@@ -0,0 +1,124 @@
---
name: common-schema-audit
description: Audit every consumer of the schema.Common metadata format (the format produced by schema_registry_decode's store_schema_metadata, the parquet_decode processor, and CDC sources) for type-coverage drift and value-coercion gaps. Run this whenever a new component starts consuming schema.Common, when a new schema.CommonType variant is added upstream in benthos, or as a periodic maintenance check.
argument-hint: "[--format=md|json] [--component=<name>]"
disable-model-invocation: true
allowed-tools: Bash(go *), Bash(grep *), Bash(find *), Read, Glob, Grep, Task
---

# Common-schema consumer drift audit

`schema.Common` (from `github.com/redpanda-data/benthos/v4/public/schema`) is the canonical type metadata that flows through `meta(schema)` between Avro / Parquet / CDC sources and downstream sinks. Every consumer of this metadata must:

1. **Handle every variant of `schema.CommonType`** — or fail loudly with a useful error that names the missing case, not a generic "unsupported".
2. **Coerce values when the Go type of the message body doesn't match the schema-declared type** — specifically the temporal-to-numeric and numeric-to-temporal bridges that the iceberg shredder implements via `coerceTemporalToNumeric` and the metadata-aware path in `internal/impl/iceberg/shredder/temporal.go:208`.

This skill produces a per-consumer report so reviewers can catch drift before it ships.
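
As a rough illustration of the first requirement, the sketch below shows a type switch whose default arm names the unhandled variant instead of returning a generic "unsupported". `CommonType`, its constants, and `sinkColumnType` are hypothetical stand-ins defined locally for the example; they are not the benthos `schema` package API, and the column names are assumptions.

```go
package main

import "fmt"

// CommonType is a hypothetical stand-in for schema.CommonType, reduced to a
// handful of variants so the example stays self-contained.
type CommonType int

const (
	Boolean CommonType = iota
	Int64
	Timestamp
	Date
	UUID
	Map // deliberately left unhandled below
)

var typeNames = [...]string{"Boolean", "Int64", "Timestamp", "Date", "UUID", "Map"}

// String lets error messages print the variant name rather than a number.
func (t CommonType) String() string {
	if int(t) >= 0 && int(t) < len(typeNames) {
		return typeNames[t]
	}
	return fmt.Sprintf("CommonType(%d)", int(t))
}

// sinkColumnType maps a variant to an (assumed) sink column type. The default
// arm fails loudly and names the variant that has no mapping.
func sinkColumnType(t CommonType) (string, error) {
	switch t {
	case Boolean:
		return "boolean", nil
	case Int64:
		return "bigint", nil
	case Timestamp:
		return "timestamp", nil
	case Date:
		return "date", nil
	case UUID:
		return "uuid", nil
	default:
		return "", fmt.Errorf("no mapping for schema type %v: add a case or coerce upstream", t)
	}
}

func main() {
	for _, t := range []CommonType{Int64, Map} {
		col, err := sinkColumnType(t)
		fmt.Printf("%v -> %q, err=%v\n", t, col, err)
	}
}
```

An error that names the variant is what lets the audit, and a human reading logs, tell a genuine sink limitation apart from a case that was simply never added.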

## Why this matters

The "GF iceberg issue" was a value-vs-metadata mismatch class. Fixes closed the gap in each consumer:

- `iceberg` output → temporal coerce + numeric metadata-aware scaling
- `parquet_encode` → type coverage for Date/TimeOfDay/UUID/Map, temporal coerce bridges
- `confluent` decoder / metadata parser → field-level logicalType, Debezium connect.name, duration
- `confluent` JSON-Schema encoder → Date/TimeOfDay/UUID

A new consumer of `schema.Common`, or a new `schema.CommonType` variant added upstream in benthos, can re-introduce the same bug class without anyone noticing until a customer pipeline breaks. The audit catches the drift mechanically.

## Workflow

1. **Enumerate the type universe.** Read every `schema.CommonType` constant from the benthos source — the authoritative list of variants every consumer must consider.

```bash
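# Resolve the benthos version pinned in go.mod; `ls | tail -1` on the module cache sorts lexically and can pick the wrong version.
benthos_dir=$(go list -m -f '{{.Dir}}' github.com/redpanda-data/benthos/v4)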
grep -E '^\s*(Boolean|Int32|Int64|Float32|Float64|String|ByteArray|Object|Map|Array|Null|Union|Timestamp|Date|TimeOfDay|UUID|Decimal|BigDecimal|Any)\s+CommonType' "$benthos_dir/public/schema/common.go"
```

Cross-check against the current set (as of the GF issue):
`Boolean, Int32, Int64, Float32, Float64, String, ByteArray, Object, Map, Array, Null, Union, Timestamp, Date, TimeOfDay, UUID, Decimal, BigDecimal, Any`.

If new variants appear in benthos that aren't in this list, every consumer below will silently need an additional case — flag it loudly and update the skill's audit list.

2. **Find every consumer.** A "consumer" of `schema.Common` is a code path that reads parsed schema metadata and uses it to drive downstream type decisions. The reliable signal is a `schema.ParseFromAny(...)` call, plus any direct `schema.Common` type switches in encoding/coercion paths.

```bash
grep -rln 'schema\.ParseFromAny\|case schema\.\(Boolean\|Int32\|Int64\|Float32\|Float64\|String\|ByteArray\|Object\|Map\|Array\|Null\|Union\|Timestamp\|Date\|TimeOfDay\|UUID\|Decimal\|BigDecimal\|Any\)\b' internal/impl/ | grep -v _test
```

Producers (CDC schema builders in `mysql/`, `oracledb/`, `postgresql/`, `mongodb/cdc/`, `mssqlserver/`) are *not* consumers in this sense — they construct `schema.Common` from a source database's metadata; the type-coverage question doesn't apply. Filter those out.

3. **Per-consumer audit.** For each consumer, delegate to the Explore agent with the brief below. Run consumers in parallel.

```text
Working dir: <connect repo>

Audit the consumer at <file>:<function> against the full schema.CommonType variant set:
Boolean, Int32, Int64, Float32, Float64, String, ByteArray, Object, Map, Array,
Null, Union, Timestamp, Date, TimeOfDay, UUID, Decimal, BigDecimal, Any.

Report:
(a) Type-coverage table: for each variant, which target type the consumer maps to (or whether it errors). Cite file:line.
(b) Value-coercion handling: when a message value's Go type doesn't match the
schema-declared type, does the consumer coerce or fail loudly? Specifically
check these cross-type cases:
- time.Time value + schema-declared Timestamp + integer-typed target column
- time.Duration value + schema-declared TimeOfDay + integer-typed target column
- Numeric int64 value + schema-declared Timestamp + integer-typed target column (unit-aware scaling)
- Numeric int32 value + schema-declared Date + integer-typed target column
Cite the coercion function and its location.
(c) Verdict: COVERED | PARTIAL | GAP, with one-line justification.

Reference implementations to compare against:
- iceberg shredder's coerceTemporalToNumeric in internal/impl/iceberg/shredder/temporal.go
- iceberg shredder's metadata-aware numeric scaling at temporal.go:208 onwards
- iceberg type_resolver's commonTypeToIcebergTypeRec in internal/impl/iceberg/type_resolver.go

Under 300 words per consumer.
```

4. **Aggregate.** Combine the per-consumer reports into a single matrix:

```
| Consumer | Missing types | Missing coercions | Verdict |
|---|---|---|---|
| iceberg | (none) | (none) | COVERED |
| parquet_encode | … | … | … |
```

5. **Recommend.** For each GAP / PARTIAL row, propose the fix shape (port from iceberg, add cases to switch, etc.). Reference implementations to mirror, by file path so the pointers stay valid as the codebase evolves:

- Type coverage extension pattern: `internal/impl/parquet/processor_encode.go::parquetNodeFromCommonField` and `internal/impl/iceberg/type_resolver.go::commonTypeToIcebergTypeRec` — both have a case for every `schema.CommonType` variant with explicit loud-error arms for shapes the sink cannot express.
- Temporal-to-numeric coerce: `internal/impl/iceberg/shredder/temporal.go::coerceTemporalToNumeric` — the `time.Time → unit-scaled int64` helper used when the iceberg column is integer-typed but the schema metadata says Timestamp.
- Numeric-to-temporal scaling: `internal/impl/iceberg/shredder/temporal.go::convertTimestamp` (the `if n, ok := numericInt64(value); ok && common != nil && common.Type == schema.Timestamp` branch) — the metadata-aware unit interpretation for numeric values flowing into time-typed columns.
- JSON Schema format mapping: `internal/impl/confluent/common_to_json_schema.go::commonToJSONSchemaNode` — the `schema.Date → {format:"date"}` / `TimeOfDay → time` / `UUID → uuid` cases.

## Output format

By default, produce a Markdown report on stdout with these sections, in order:

1. **Variant universe** — the full list of `schema.CommonType` values found, plus a delta vs the canonical list (above) so reviewers spot when benthos adds new variants.
2. **Consumer matrix** — one row per consumer, columns as above.
3. **Detailed findings** — per-consumer block with the Explore agent's report verbatim.
4. **Recommendations** — ranked by impact (a sink that customers actually use comes ahead of an internal-only path).

If `--format=json` is passed, emit a structured JSON document with the same sections; useful for CI.

If `--component=<name>` is passed, audit only that one consumer (matched by directory name under `internal/impl/`).

## Adding new consumers

When adding a new consumer of `schema.Common`:

1. Either add a case for every variant in your type switch, OR explicitly error on unsupported variants with a message that names the variant and points at the upstream coercion that would close the gap.
2. If your consumer accepts user-provided values, implement the temporal-to-numeric coercion bridge analogous to `coerceTemporalToNumeric`. The customer is going to flip `preserve_logical_types: true` and start sending `time.Time` values; without the bridge you'll crash at shred/encode time.
3. Add an integration test analogous to `internal/impl/iceberg/integration/schema_metadata_timestamp_test.go::TestIntegrationCoerceTemporalIntoExistingBigintColumn` that pre-creates the target with a numeric column type, sends a typed value through, and asserts the coerce path fires correctly.

## Notes

- Producers of `schema.Common` (CDC schema builders) are intentionally out of scope. Their type-mapping coverage is a separate question and varies per source database.
- This skill is read-only. It must not write code or commit changes — its job is to produce the report so a human can prioritise fixes.
- If a consumer's type switch is implemented across multiple files (e.g. iceberg has the switch in `type_resolver.go` plus value handling in `shredder/`), evaluate the consumer as a whole.
- When in doubt, run the existing test suites for the suspected consumer (`go test ./internal/impl/<consumer>/...`) to see what's actually exercised. Coverage gaps in production code rarely have corresponding test coverage.
10 changes: 10 additions & 0 deletions docs/modules/components/pages/outputs/iceberg.adoc
@@ -149,6 +149,7 @@ output:
table_location: s3://my-iceberg-bucket/ # No default (optional)
schema_metadata: ""
new_column_type_mapping: "" # No default (optional)
require_schema_metadata: false
commit:
manifest_merge_enabled: true
max_snapshot_age: 24h
@@ -892,6 +893,15 @@ An optional Bloblang mapping to customize column types during schema evolution.
*Type*: `string`


=== `schema_evolution.require_schema_metadata`

When `true`, writing a numeric value into a `timestamp`, `timestamptz`, `date`, or `time` column without `schema_metadata` registered for that column is a hard error. The default `false` permits a fallback path that interprets bare numeric timestamps as Unix seconds and bare numeric times as already being in microseconds. This is convenient, but silently wrong if upstream produced milliseconds. Enable this when you cannot guarantee that the upstream attaches schema metadata and want to fail loudly rather than corrupt dates by ~50,000 years. It has no effect on time-typed columns receiving `time.Time`/`time.Duration` Go values, which carry their own unit unambiguously, and no effect on non-time columns. Requires `schema_metadata` to be set.


*Type*: `bool`

*Default*: `false`

=== `commit`

Commit behavior configuration.
2 changes: 1 addition & 1 deletion go.mod
@@ -170,7 +170,7 @@ require (
 	github.com/timeplus-io/proton-go-driver/v2 v2.1.4
 	github.com/tmc/langchaingo v0.1.14
 	github.com/trinodb/trino-go-client v0.333.0
-	github.com/twmb/avro v1.7.2
+	github.com/twmb/avro v1.7.3-0.20260513193503-1e5c2a3fc070
 	github.com/twmb/franz-go v1.20.7
 	github.com/twmb/franz-go/pkg/kadm v1.17.2
 	github.com/twmb/franz-go/pkg/kmsg v1.12.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1748,8 +1748,8 @@ github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w
 github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
 github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
-github.com/twmb/avro v1.7.2 h1:cmrEBRSbELRqsg/dRkQvVWuOaR2EfGifHIt/2iJ9lfI=
-github.com/twmb/avro v1.7.2/go.mod h1:X0fT1dY2xcbV4YuCE4mYro+qljHl4kUF5uA/2z1rgSk=
+github.com/twmb/avro v1.7.3-0.20260513193503-1e5c2a3fc070 h1:gYa95NoqeXYOMIVIe2/YcbC/l0s5q+wQ70Zo+TYQU4k=
+github.com/twmb/avro v1.7.3-0.20260513193503-1e5c2a3fc070/go.mod h1:X0fT1dY2xcbV4YuCE4mYro+qljHl4kUF5uA/2z1rgSk=
 github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY=
 github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU=
 github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=
6 changes: 4 additions & 2 deletions internal/impl/confluent/avro_walker.go
@@ -28,6 +28,10 @@ import (
 // decoder path:
 // - time-millis/time-micros: time.Duration → time.Time (time-of-day)
 // - duration: avro.Duration → ISO 8601 duration string
+// - decimal: raw []byte → json.Number for the SetStructuredMut path
+// (Go's json.Marshal cannot natively format *big.Rat as a JSON
+// number — it emits "33/100"-style fractional strings — so we
+// pre-convert to json.Number for downstream bloblang/SQL use).
 func preserveLogicalTypeOpts() []avro.SchemaOpt {
 	return []avro.SchemaOpt{
 		avro.NewCustomType[time.Time, int32](
@@ -55,8 +59,6 @@ func preserveLogicalTypeOpts() []avro.SchemaOpt {
 				return avro.DurationFromBytes(b).String(), nil
 			},
 		},
-		// Decimal: raw type is []byte. Convert to json.Number for the
-		// SetStructuredMut path (json.Marshal can't handle *big.Rat).
 		avro.CustomType{
 			LogicalType: "decimal",
 			Decode: func(v any, node *avro.SchemaNode) (any, error) {
65 changes: 63 additions & 2 deletions internal/impl/confluent/common_to_avro.go
@@ -72,9 +72,48 @@ func commonToAvroInner(c schema.Common, recordName, namespace string, isRoot boo
 	case schema.Any:
 		return "bytes", nil
 	case schema.Timestamp:
+		// Honour Logical.Timestamp params if present; legacy nil-Logical
+		// schemas fall through EffectiveTimestamp() to {Millis, UTC},
+		// preserving the pre-PR encoder output exactly.
+		p := c.EffectiveTimestamp()
+		base := "long"
+		logicalName, err := avroTimestampLogicalName(p)
+		if err != nil {
+			return nil, fmt.Errorf("timestamp field %q: %w", c.Name, err)
+		}
+		return map[string]any{
+			"type":        base,
+			"logicalType": logicalName,
+		}, nil
+	case schema.Date:
 		return map[string]any{
-			"type":        "long",
-			"logicalType": "timestamp-millis",
+			"type":        "int",
+			"logicalType": "date",
 		}, nil
+	case schema.TimeOfDay:
+		if c.Logical == nil || c.Logical.TimeOfDay == nil {
+			return nil, fmt.Errorf("time-of-day field %q missing Logical.TimeOfDay parameters", c.Name)
+		}
+		// Avro time-{millis,micros} carry no zone semantics, so a
+		// TimeOfDay{AdjustToUTC=true} cannot be expressed faithfully.
+		// Reject loudly rather than silently dropping that bit.
+		if c.Logical.TimeOfDay.AdjustToUTC {
+			return nil, fmt.Errorf("time-of-day field %q has AdjustToUTC=true; Avro time-millis/time-micros have no UTC-adjust variant — coerce upstream before encoding", c.Name)
+		}
+		// Avro defines only time-millis (int) and time-micros (long); reject
+		// anything else loudly rather than silently downcasting.
+		switch c.Logical.TimeOfDay.Unit {
+		case schema.TimeUnitMillis:
+			return map[string]any{"type": "int", "logicalType": "time-millis"}, nil
+		case schema.TimeUnitMicros:
+			return map[string]any{"type": "long", "logicalType": "time-micros"}, nil
+		default:
+			return nil, fmt.Errorf("time-of-day field %q has unit %v which Avro cannot express; only MILLIS and MICROS are supported", c.Name, c.Logical.TimeOfDay.Unit)
+		}
+	case schema.UUID:
+		return map[string]any{
+			"type":        "string",
+			"logicalType": "uuid",
+		}, nil
 	case schema.Decimal:
 		if c.Logical == nil || c.Logical.Decimal == nil {
@@ -172,6 +211,28 @@ func commonToAvroUnion(c schema.Common) (any, error) {
 	return variants, nil
 }
 
+// avroTimestampLogicalName picks the Avro logical-type name for a
+// TimestampParams value. The mapping mirrors the reverse path in
+// applyAvroLogicalType: AdjustToUTC=true picks `timestamp-*`, false picks
+// `local-timestamp-*`. Avro 1.10+ supports nanos; older brokers may reject it.
+func avroTimestampLogicalName(p schema.TimestampParams) (string, error) {
+	var unit string
+	switch p.Unit {
+	case schema.TimeUnitMillis:
+		unit = "millis"
+	case schema.TimeUnitMicros:
+		unit = "micros"
+	case schema.TimeUnitNanos:
+		unit = "nanos"
+	default:
+		return "", fmt.Errorf("unsupported timestamp unit %v (Avro supports only MILLIS, MICROS, NANOS)", p.Unit)
+	}
+	if p.AdjustToUTC {
+		return "timestamp-" + unit, nil
+	}
+	return "local-timestamp-" + unit, nil
+}
+
 // sanitizeAvroName derives a valid Avro name from an arbitrary subject string.
 // Avro names must match [A-Za-z_][A-Za-z0-9_]*. Invalid characters are replaced
 // with underscores and a leading digit is prefixed with an underscore.