From dd8b4251203002389d4caa02b35ce4dc0f4abc1c Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 1 Mar 2026 15:19:55 -0700 Subject: [PATCH] rpk/serde: replace hamba/avro and linkedin/goavro with twmb/avro Replace the two separate avro libraries (hamba/avro for schema reference resolution, linkedin/goavro for encoding/decoding) with the single twmb/avro library, which handles both. This changes union serialization output: decoded union values are now written as bare JSON values rather than the Avro-JSON {"type": value} form used by linkedin/goavro. The encoder accepts both formats as input. Encoding and decoding now use twmb/avro's schema-aware DecodeJSON and EncodeJSON methods, which preserve full int64 precision for Avro long fields (json.Unmarshal loses precision beyond 2^53). Schema reference resolution now uses an explicit SchemaCache rather than relying on hamba/avro's implicit global cache. This also removes the replace directive for the redpanda-data/go-avro fork. (cherry picked from commit 59dcf96edc800bbe9068b941a357bcb0ef0ccab4) --- MODULE.bazel | 3 +- src/go/rpk/go.mod | 11 +-- src/go/rpk/go.sum | 18 ++--- src/go/rpk/pkg/serde/BUILD | 3 +- src/go/rpk/pkg/serde/avro.go | 89 +++++++++++------------- src/go/rpk/pkg/serde/avro_test.go | 112 ++++++++++++++++++++++++++---- src/go/rpk/pkg/serde/serde.go | 6 +- 7 files changed, 149 insertions(+), 93 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index d23bc70aabdde..4b99afbb9991c 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -275,14 +275,12 @@ use_repo( "com_github_fatih_color", "com_github_google_uuid", "com_github_goreleaser_nfpm_v2", - "com_github_hamba_avro_v2", "com_github_hashicorp_go_multierror", "com_github_kballard_go_shellquote", "com_github_kr_pretty", "com_github_kr_text", "com_github_ladicle_tabwriter", "com_github_lestrrat_go_jwx_v2", - "com_github_linkedin_goavro_v2", "com_github_lithammer_go_jump_consistent_hash", "com_github_lorenzosaino_go_sysctl", "com_github_mark3labs_mcp_go", @@ -314,6 +312,7 @@ use_repo( "com_github_stretchr_testify", "com_github_tidwall_sjson", "com_github_tklauser_go_sysconf", + "com_github_twmb_avro", "com_github_twmb_franz_go", "com_github_twmb_franz_go_pkg_kadm", "com_github_twmb_franz_go_pkg_kfake", diff --git a/src/go/rpk/go.mod b/src/go/rpk/go.mod index 5e6158aaf4c59..a1b77e7185427 100644 --- a/src/go/rpk/go.mod +++ b/src/go/rpk/go.mod @@ -2,9 +2,6 @@ module github.com/redpanda-data/redpanda/src/go/rpk go 1.26.2 -// add the git commit hash as the target version and `go mod tidy` will transform it into pseudo-version -replace github.com/hamba/avro/v2 => github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 - require ( buf.build/gen/go/redpandadata/ai-gateway/connectrpc/go v1.19.1-20260203101113-1c7702ddb57a.2 buf.build/gen/go/redpandadata/cloud/connectrpc/go v1.19.1-20251208213618-d95eb1f5bf36.2 @@ -32,12 +29,10 @@ require ( github.com/docker/go-units v0.5.0 github.com/fatih/color v1.18.0 github.com/google/uuid v1.6.0 - github.com/hamba/avro/v2 v2.30.0 github.com/hashicorp/go-multierror v1.1.1 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kr/text v0.2.0 github.com/lestrrat-go/jwx/v2 v2.1.6 - github.com/linkedin/goavro/v2 v2.14.1 github.com/lithammer/go-jump-consistent-hash v1.0.2 github.com/lorenzosaino/go-sysctl v0.3.1 github.com/mark3labs/mcp-go v0.44.0 @@ -63,6 +58,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/tidwall/sjson v1.2.5 github.com/tklauser/go-sysconf v0.3.15 + github.com/twmb/avro v1.3.4 github.com/twmb/franz-go v1.20.4 github.com/twmb/franz-go/pkg/kadm v1.17.1 github.com/twmb/franz-go/pkg/kfake v0.0.0-20251024215757-aea970d4d0d2 @@ -98,7 +94,6 @@ require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cloudflare/cfssl v1.6.5 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -115,7 +110,6 @@ require ( github.com/goccy/go-json v0.10.3 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic-models v0.7.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -123,7 +117,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/lestrrat-go/blackmagic v1.0.3 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc v1.0.6 // indirect @@ -133,7 +127,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/sys/atomicwriter v0.1.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/src/go/rpk/go.sum b/src/go/rpk/go.sum index 9802df3a3329e..edd089e6f6526 100644 --- a/src/go/rpk/go.sum +++ b/src/go/rpk/go.sum @@ -68,8 +68,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM= github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY= -github.com/cloudflare/cfssl v1.6.5 h1:46zpNkm6dlNkMZH/wMW22ejih6gIaJbzL2du6vD7ZeI= -github.com/cloudflare/cfssl v1.6.5/go.mod h1:Bk1si7sq8h2+yVEDrFJiz3d7Aw+pfjjJSZVaD+Taky4= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= @@ -133,9 +131,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -170,8 +165,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -192,8 +187,6 @@ github.com/lestrrat-go/jwx/v2 v2.1.6 h1:hxM1gfDILk/l5ylers6BX/Eq1m/pnxe9NBwW6lVf github.com/lestrrat-go/jwx/v2 v2.1.6/go.mod h1:Y722kU5r/8mV7fYDifjug0r8FK8mZdw0K0GpJw/l8pU= github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= -github.com/linkedin/goavro/v2 v2.14.1 h1:/8VjDpd38PRsy02JS0jflAu7JZPfJcGTwqWgMkFS2iI= -github.com/linkedin/goavro/v2 v2.14.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/lithammer/go-jump-consistent-hash v1.0.2 h1:w74N9XiMa4dWZdoVnfLbnDhfpGOMCxlrudzt2e7wtyk= github.com/lithammer/go-jump-consistent-hash v1.0.2/go.mod h1:4MD1WDikNGnb9D56hAtscaZaOWOiCG+lLbRR5ZN9JL0= github.com/lorenzosaino/go-sysctl v0.3.1 h1:3phX80tdITw2fJjZlwbXQnDWs4S30beNcMbw0cn0HtY= @@ -218,8 +211,6 @@ github.com/miekg/dns v1.1.72 h1:vhmr+TF2A3tuoGNkLDFK9zi36F2LS+hKTRW0Uf8kbzI= github.com/miekg/dns v1.1.72/go.mod h1:+EuEPhdHOsfk6Wk5TT2CzssZdqkmFhf8r+aVyDEToIs= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw= @@ -270,8 +261,6 @@ github.com/redpanda-data/common-go/rpadmin v0.2.5 h1:mKYr5ffO6TmBwNSAHgnMI703bbf github.com/redpanda-data/common-go/rpadmin v0.2.5/go.mod h1:uOAY10WXPtcDPU0aUdpkqHR+b1BqUvRhlvMf0vha73A= github.com/redpanda-data/common-go/rpsr v0.1.4 h1:d9lu5q5wyhZWBYR1GnZkq+eZGKU0qoaSwwybRS9Uk2k= github.com/redpanda-data/common-go/rpsr v0.1.4/go.mod h1:qVa7b0yaCRdZDn5dcZ9CazqVr4jYbgtOJUywI2X3G3I= -github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0= -github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525/go.mod h1:3YqAM7pgS5vW/EH7naCjFqnAajSgi0f0CfMe1HGhLxQ= github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250930092048-a98b94b5957a h1:jNHT6Fcy/rBAFnX8rjbwrJ+lSF2Ufa1jqmnCV6m6RKY= github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250930092048-a98b94b5957a/go.mod h1:bGTBF/Nvx6NQ/R4Jje5lBjTpfWrazOna3udJwx4EAHE= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= @@ -315,7 +304,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -334,6 +322,8 @@ github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8O github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= +github.com/twmb/avro v1.3.4 h1:4tTV207HOUHKTdAMv6fGPyqgwmGfMLUfnFg5R4cfIX0= +github.com/twmb/avro v1.3.4/go.mod h1:TUQS96Ptl8tDRyK0Jw91FXIVCoPKz4sXxvUSShiG5FA= github.com/twmb/franz-go v1.20.4 h1:1wTvyLTOxS0oJh5ro/DVt2JHVdx7/kGNtmtFhbcr0O0= github.com/twmb/franz-go v1.20.4/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw= diff --git a/src/go/rpk/pkg/serde/BUILD b/src/go/rpk/pkg/serde/BUILD index a7fb6b167a987..f52a02192875f 100644 --- a/src/go/rpk/pkg/serde/BUILD +++ b/src/go/rpk/pkg/serde/BUILD @@ -14,9 +14,8 @@ go_library( "//src/go/rpk/pkg/serde/embed", "@com_github_bufbuild_protocompile//:protocompile", "@com_github_bufbuild_protocompile//linker", - "@com_github_hamba_avro_v2//:avro", - "@com_github_linkedin_goavro_v2//:goavro", "@com_github_santhosh_tekuri_jsonschema_v6//:jsonschema", + "@com_github_twmb_avro//:avro", "@com_github_twmb_franz_go_pkg_sr//:sr", "@org_golang_google_protobuf//encoding/protojson", "@org_golang_google_protobuf//proto", diff --git a/src/go/rpk/pkg/serde/avro.go b/src/go/rpk/pkg/serde/avro.go index fe8a83dadc05f..de8cd92eb7284 100644 --- a/src/go/rpk/pkg/serde/avro.go +++ b/src/go/rpk/pkg/serde/avro.go @@ -13,23 +13,21 @@ import ( "context" "fmt" - "github.com/hamba/avro/v2" - "github.com/linkedin/goavro/v2" + "github.com/twmb/avro" "github.com/twmb/franz-go/pkg/sr" ) // newAvroEncoder generates a serializer function that can encode the provided -// record using the specified schema. If the schema includes references, it -// retrieves them using the supplied client. The generated function returns the -// record encoded in the protobuf wire format. -func newAvroEncoder(codec *goavro.Codec, schemaID int) (serdeFunc, error) { +// record using the specified schema. The generated function returns the record +// encoded in the Confluent Schema Registry wire format. +func newAvroEncoder(schema *avro.Schema, schemaID int) (serdeFunc, error) { return func(record []byte) ([]byte, error) { - native, _, err := codec.NativeFromTextual(record) - if err != nil { + var native any + if err := schema.DecodeJSON(record, &native); err != nil { return nil, fmt.Errorf("unable to parse record with the provided schema: %v", err) } - binary, err := codec.BinaryFromNative(nil, native) + binary, err := schema.Encode(native) if err != nil { return nil, fmt.Errorf("unable to binary encode the record: %v", err) } @@ -47,66 +45,61 @@ func newAvroEncoder(codec *goavro.Codec, schemaID int) (serdeFunc, error) { // newAvroDecoder generates a deserializer function that decodes the given // avro-encoded record according to the schema. The generated function expects // the record bytes (without the wire format). -func newAvroDecoder(codec *goavro.Codec) (serdeFunc, error) { +func newAvroDecoder(schema *avro.Schema) (serdeFunc, error) { return func(record []byte) ([]byte, error) { - native, _, err := codec.NativeFromBinary(record) - if err != nil { + var native any + if _, err := schema.Decode(record, &native); err != nil { return nil, fmt.Errorf("unable to decode avro-encoded record: %v", err) } - - return codec.TextualFromNative(nil, native) + return schema.EncodeJSON(native) }, nil } -// generateAvroCodec will generate an AVRO codec, parsing the references if -// there are any. -func generateAvroCodec(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*goavro.Codec, error) { - schemaStr := schema.Schema - if len(schema.References) > 0 { - err := parseAvroReferences(ctx, cl, schema) - if err != nil { - return nil, fmt.Errorf("unable to parse references: %v", err) - } - - // We use hamba/avro to for the schema reference resolution. - refCodec, err := avro.Parse(schema.Schema) +// generateAvroSchema parses the schema and its references, returning a +// compiled schema that can be used for encoding and decoding. +func generateAvroSchema(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*avro.Schema, error) { + if len(schema.References) == 0 { + s, err := avro.Parse(schema.Schema) if err != nil { return nil, fmt.Errorf("unable to parse schema: %v", err) } - schemaStr = refCodec.String() + return s, nil } - codec, err := goavro.NewCodec(schemaStr) + cache := &avro.SchemaCache{} + seen := make(map[string]bool) + if err := parseAvroReferences(ctx, cl, cache, schema, seen); err != nil { + return nil, fmt.Errorf("unable to parse references: %v", err) + } + s, err := cache.Parse(schema.Schema) if err != nil { - return nil, fmt.Errorf("unable to generate codec for the given schema: %v", err) + return nil, fmt.Errorf("unable to parse schema: %v", err) } - return codec, nil + return s, nil } -// parseAvroReferences uses hamba/avro Parse method to parse every reference. We -// don't need to store the references since the library already cache these -// schemas and use it later for handling references in the parent schema. -func parseAvroReferences(ctx context.Context, cl *sr.Client, schema *sr.Schema) error { - if len(schema.References) == 0 { - _, err := avro.Parse(schema.Schema) - if err != nil { - return err - } - return nil - } +// parseAvroReferences recursively parses all schema references into the cache +// so they are available when parsing the parent schema. The seen map tracks +// already-fetched subject+version pairs to avoid redundant registry requests. +func parseAvroReferences(ctx context.Context, cl *sr.Client, cache *avro.SchemaCache, schema *sr.Schema, seen map[string]bool) error { for _, ref := range schema.References { + key := fmt.Sprintf("%s-%d", ref.Subject, ref.Version) + if seen[key] { + continue + } + seen[key] = true r, err := cl.SchemaByVersion(ctx, ref.Subject, ref.Version) if err != nil { - return err + return fmt.Errorf("unable to get reference schema with subject %q and version %v: %v", ref.Subject, ref.Version, err) } refSchema := r.Schema - err = parseAvroReferences(ctx, cl, &refSchema) - if err != nil { + if len(refSchema.References) > 0 { + if err := parseAvroReferences(ctx, cl, cache, &refSchema, seen); err != nil { + return fmt.Errorf("unable to parse schema with subject %q and version %v: %v", ref.Subject, ref.Version, err) + } + } + if _, err := cache.Parse(refSchema.Schema); err != nil { return fmt.Errorf("unable to parse schema with subject %q and version %v: %v", ref.Subject, ref.Version, err) } } - _, err := avro.Parse(schema.Schema) - if err != nil { - return err - } return nil } diff --git a/src/go/rpk/pkg/serde/avro_test.go b/src/go/rpk/pkg/serde/avro_test.go index eb5a552b9884e..2172d5db2903d 100644 --- a/src/go/rpk/pkg/serde/avro_test.go +++ b/src/go/rpk/pkg/serde/avro_test.go @@ -26,13 +26,14 @@ import ( func Test_encodeDecodeAvroRecordNoReferences(t *testing.T) { tests := []struct { - name string - schema string - schemaID int - record string - expRecord string - expErr bool // error building the Serde. - expEncErr bool // error encoding the record. + name string + schema string + schemaID int + record string + expRecord string + rawCompare bool // compare raw JSON output instead of unmarshaling (for precision tests). + expErr bool // error building the Serde. + expEncErr bool // error encoding the record. }{ { name: "Valid record and schema", @@ -94,6 +95,83 @@ func Test_encodeDecodeAvroRecordNoReferences(t *testing.T) { schemaID: 1, record: "{}", expRecord: `{"name":null}`, + }, { + name: "Nullable string union with string value", + schema: ` +{ + "type":"record", + "name":"test", + "fields": + [{ + "name":"value", + "type":["null","string"] + }] +}`, + schemaID: 1, + record: `{"value":"hello"}`, + expRecord: `{"value":"hello"}`, + }, { + name: "Nullable string union with null value", + schema: ` +{ + "type":"record", + "name":"test", + "fields": + [{ + "name":"value", + "type":["null","string"] + }] +}`, + schemaID: 1, + record: `{"value":null}`, + expRecord: `{"value":null}`, + }, { + name: "Multi-branch union", + schema: ` +{ + "type":"record", + "name":"test", + "fields": + [{ + "name":"value", + "type":["null","int","string"] + }] +}`, + schemaID: 1, + record: `{"value":42}`, + expRecord: `{"value":42}`, + }, { + name: "Union with default null and empty input", + schema: ` +{ + "type":"record", + "name":"test", + "fields": + [{ + "name":"value", + "type":["null","string"], + "default":null + }] +}`, + schemaID: 1, + record: `{}`, + expRecord: `{"value":null}`, + }, { + name: "Long value preserves precision beyond float64", + schema: ` +{ + "type":"record", + "name":"test", + "fields": + [{ + "name":"big_id", + "type":"long" + }] +}`, + schemaID: 1, + record: `{"big_id":9007199254740993}`, + expRecord: `{"big_id":9007199254740993}`, + rawCompare: true, }, { name: "Invalid record for a valid schema", schema: ` @@ -163,15 +241,19 @@ func Test_encodeDecodeAvroRecordNoReferences(t *testing.T) { } require.NoError(t, err) - // To avoid any mismatch in the decode order of the elements, we - // better unmarshal and compare the unmarshaled records. - var gotU, expU map[string]any - err = json.Unmarshal(gotDecoded, &gotU) - require.NoError(t, err) - err = json.Unmarshal([]byte(tt.expRecord), &expU) - require.NoError(t, err) + if tt.rawCompare { + require.Equal(t, tt.expRecord, string(gotDecoded)) + } else { + // To avoid any mismatch in the decode order of the elements, we + // better unmarshal and compare the unmarshaled records. + var gotU, expU map[string]any + err = json.Unmarshal(gotDecoded, &gotU) + require.NoError(t, err) + err = json.Unmarshal([]byte(tt.expRecord), &expU) + require.NoError(t, err) - require.Equal(t, expU, gotU) + require.Equal(t, expU, gotU) + } }) } } diff --git a/src/go/rpk/pkg/serde/serde.go b/src/go/rpk/pkg/serde/serde.go index 8d1d80cfd950b..69de239154d55 100644 --- a/src/go/rpk/pkg/serde/serde.go +++ b/src/go/rpk/pkg/serde/serde.go @@ -34,15 +34,15 @@ type Serde struct { func NewSerde(ctx context.Context, cl *sr.Client, schema *sr.Schema, schemaID int, protoFQN string) (*Serde, error) { switch schema.Type { case sr.TypeAvro: - codec, err := generateAvroCodec(ctx, cl, schema) + avroSchema, err := generateAvroSchema(ctx, cl, schema) if err != nil { return nil, fmt.Errorf("unable to parse avro schema: %v", err) } - encFn, err := newAvroEncoder(codec, schemaID) + encFn, err := newAvroEncoder(avroSchema, schemaID) if err != nil { return nil, fmt.Errorf("unable to build avro encoder: %v", err) } - decFn, err := newAvroDecoder(codec) + decFn, err := newAvroDecoder(avroSchema) if err != nil { return nil, fmt.Errorf("unable to build avro decoder: %v", err) }