Skip to content

Commit 8fd40de

Browse files
committed
chore: bump twmb/avro to a v1.7.3 pre-release pseudo-version (v1.7.3-0.20260513193503-1e5c2a3fc070) with field-level logicalType lift
Picks up two upstream landings from the rolling-fix work: 1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our own metadata parser already handles. Pulling it in means the value-side decoder now produces time.Time for sibling-form timestamp-millis (and the rest of the matrix) natively, instead of returning int64 and relying on the iceberg shredder's metadata-driven numeric scaling bridge to reconcile. 2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and spec-compliance pass. Includes "decimal precision/scale, spec form" which changes how decimal-typed values serialise under EncodeJSON: a scale-2 value 0.33 (wire bytes 0x21) now emits as the codepoint-mapped string "!" rather than the numeric 0.33, matching Java's JsonEncoder output. The shredder coerce bridge in iceberg/shredder/temporal.go stays — it's now a safety net rather than load-bearing infrastructure. The metadata-side fix in confluent/ecs_avro.go also stays because it parses schemas into schema.Common independently of twmb (the iceberg output's schema_metadata path uses Common, not twmb's schemaNode). Coverage: - TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the upstream PR #38 behaviour by asserting that sibling-form schemas decode to time.Time end-to-end. If twmb ever regresses on this, the test surfaces it in the package that depends on the contract. - TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson: pos_0_33333333 default-mode expectation updated from `0.33` to `"!"` per the spec form. Preserved-mode expectation unchanged — our preserveLogicalTypeOpts decimal CustomType still produces json.Number, which the SetStructuredMut path preserves through Go's json.Marshal. CHANGELOG: a "Changed (potentially breaking)" entry documents the decimal serialisation shape change for default-mode users and points at preserve_logical_types: true as the migration knob.
1 parent ef50fd0 commit 8fd40de

6 files changed

Lines changed: 90 additions & 14 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ require (
170170
github.com/timeplus-io/proton-go-driver/v2 v2.1.4
171171
github.com/tmc/langchaingo v0.1.14
172172
github.com/trinodb/trino-go-client v0.333.0
173-
github.com/twmb/avro v1.7.2
173+
github.com/twmb/avro v1.7.3-0.20260513193503-1e5c2a3fc070
174174
github.com/twmb/franz-go v1.20.7
175175
github.com/twmb/franz-go/pkg/kadm v1.17.2
176176
github.com/twmb/franz-go/pkg/kmsg v1.12.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,8 +1748,8 @@ github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w
17481748
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
17491749
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
17501750
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
1751-
github.com/twmb/avro v1.7.2 h1:cmrEBRSbELRqsg/dRkQvVWuOaR2EfGifHIt/2iJ9lfI=
1752-
github.com/twmb/avro v1.7.2/go.mod h1:X0fT1dY2xcbV4YuCE4mYro+qljHl4kUF5uA/2z1rgSk=
1751+
github.com/twmb/avro v1.7.3-0.20260513193503-1e5c2a3fc070 h1:gYa95NoqeXYOMIVIe2/YcbC/l0s5q+wQ70Zo+TYQU4k=
1752+
github.com/twmb/avro v1.7.3-0.20260513193503-1e5c2a3fc070/go.mod h1:X0fT1dY2xcbV4YuCE4mYro+qljHl4kUF5uA/2z1rgSk=
17531753
github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY=
17541754
github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU=
17551755
github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=

internal/impl/confluent/avro_walker.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import (
2828
// decoder path:
2929
// - time-millis/time-micros: time.Duration → time.Time (time-of-day)
3030
// - duration: avro.Duration → ISO 8601 duration string
31+
// - decimal: raw []byte → json.Number for the SetStructuredMut path
32+
// (Go's json.Marshal cannot natively format *big.Rat as a JSON
33+
// number — it emits "33/100"-style fractional strings — so we
34+
// pre-convert to json.Number for downstream bloblang/SQL use).
3135
func preserveLogicalTypeOpts() []avro.SchemaOpt {
3236
return []avro.SchemaOpt{
3337
avro.NewCustomType[time.Time, int32](
@@ -55,8 +59,6 @@ func preserveLogicalTypeOpts() []avro.SchemaOpt {
5559
return avro.DurationFromBytes(b).String(), nil
5660
},
5761
},
58-
// Decimal: raw type is []byte. Convert to json.Number for the
59-
// SetStructuredMut path (json.Marshal can't handle *big.Rat).
6062
avro.CustomType{
6163
LogicalType: "decimal",
6264
Decode: func(v any, node *avro.SchemaNode) (any, error) {

internal/impl/confluent/ecs_avro_field_level_logical_type_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"encoding/json"
77
"fmt"
88
"testing"
9+
"time"
910

1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
13+
"github.com/twmb/avro"
1214

1315
"github.com/redpanda-data/benthos/v4/public/schema"
1416
)
@@ -211,3 +213,66 @@ func mustJSON(v any) string {
211213
b, _ := json.MarshalIndent(v, "", " ")
212214
return string(b)
213215
}
216+
217+
// TestUpstreamTwmbHonoursSiblingFormLogicalType is a regression guard
218+
// for the upstream twmb/avro dependency. After the bump that landed our
219+
// own field-level logicalType fix upstream (twmb/avro PR #38), the
220+
// value-side decoder now produces time.Time for sibling-form schemas
221+
// natively — previously it returned int64 because the parser silently
222+
// dropped the field-level annotation.
223+
//
224+
// If twmb ever regresses on this, the shredder's metadata-driven
225+
// numeric scaling path in iceberg/shredder/temporal.go would become
226+
// load-bearing again. Pinning the upstream behaviour here makes any
227+
// such regression surface immediately, in the package that depends on
228+
// it, rather than as a customer report.
229+
func TestUpstreamTwmbHonoursSiblingFormLogicalType(t *testing.T) {
230+
cases := []struct {
231+
name string
232+
schema string
233+
}{
234+
{
235+
"primitive timestamp-millis",
236+
`{"type":"record","name":"R","fields":[
237+
{"name":"ts","type":"long","logicalType":"timestamp-millis"}
238+
]}`,
239+
},
240+
{
241+
"union timestamp-millis (null first)",
242+
`{"type":"record","name":"R","fields":[
243+
{"name":"ts","type":["null","long"],"logicalType":"timestamp-millis"}
244+
]}`,
245+
},
246+
}
247+
248+
for _, tc := range cases {
249+
t.Run(tc.name, func(t *testing.T) {
250+
s, err := avro.Parse(tc.schema)
251+
require.NoError(t, err)
252+
253+
// Encode a value via the binary-shape side schema to avoid
254+
// requiring time.Time on the encode path; this is the same
255+
// trick our integration test uses, and what a Java/JDBC
256+
// producer would do — write raw long bytes on the wire and
257+
// rely on the schema's logicalType for interpretation.
258+
bin := avro.MustParse(`{"type":"record","name":"R","fields":[
259+
{"name":"ts","type":["null","long"]}
260+
]}`)
261+
tsMillis := int64(1700000000000)
262+
payload, err := bin.Encode(&struct {
263+
TS *int64 `avro:"ts"`
264+
}{TS: &tsMillis})
265+
require.NoError(t, err)
266+
267+
var native any
268+
_, err = s.Decode(payload, &native)
269+
require.NoError(t, err)
270+
271+
row, ok := native.(map[string]any)
272+
require.True(t, ok, "expected map, got %T", native)
273+
_, isTime := row["ts"].(time.Time)
274+
assert.True(t, isTime,
275+
"upstream twmb must decode sibling-form timestamp-millis as time.Time; got %T (regression in twmb/avro PR #38)", row["ts"])
276+
})
277+
}
278+
}

internal/impl/confluent/processor_schema_registry_decode_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,16 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
290290
output: `{"Name":"foo","MaybeHobby":null,"Address": null}`,
291291
},
292292
{
293-
schemaID: 4,
294-
name: "successful message with logical types",
295-
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
296-
output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":0.33}}`,
293+
schemaID: 4,
294+
name: "successful message with logical types",
295+
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
296+
// pos_0_33333333 emits "!" — the spec-blessed codepoint-mapped
297+
// string form for decimal under Avro JSON encoding (the single
298+
// byte 0x21 = "!"). twmb/avro v1.7.3+ matches Java's
299+
// JsonEncoder here; users who want a numeric form should set
300+
// preserve_logical_types: true, which routes through our
301+
// json.Number CustomType registration instead.
302+
output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":"!"}}`,
297303
preservedOutput: `{"int_time_millis":{"int.time-millis":"0001-01-01T09:47:25Z"},"long_time_micros":{"long.time-micros":"0001-08-22T16:53:20Z"},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":0.33}}`,
298304
},
299305
{
@@ -503,10 +509,12 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
503509
output: `{"Name":"foo","MaybeHobby":null,"Address": null}`,
504510
},
505511
{
506-
schemaID: 4,
507-
name: "successful message with logical types",
508-
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
509-
output: `{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":0.33}`,
512+
schemaID: 4,
513+
name: "successful message with logical types",
514+
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
515+
// pos_0_33333333 emits "!" in default mode — see the matching
516+
// note on TestSchemaRegistryDecodeAvro for why.
517+
output: `{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":"!"}`,
510518
preservedOutput: `{"int_time_millis":"0001-01-01T09:47:25Z","long_time_micros":"0001-08-22T16:53:20Z","long_timestamp_micros":"3939-01-01T00:00:00Z","pos_0_33333333":0.33}`,
511519
},
512520
{

internal/impl/confluent/serde_avro.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, aschema fran
201201

202202
// Build parse options for preserve_logical_types: register custom
203203
// types that convert time.Duration→time.Time for time-of-day fields,
204-
// avro.Duration→string, and optionally Kafka Connect types.
204+
// avro.Duration→string, decimal bytes→json.Number, and optionally
205+
// Kafka Connect (Debezium) types.
205206
var parseOpts []avro.SchemaOpt
206207
if s.cfg.avro.preserveLogicalTypes {
207208
parseOpts = append(parseOpts, preserveLogicalTypeOpts()...)

0 commit comments

Comments
 (0)