Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,12 @@ use_repo(
"com_github_fatih_color",
"com_github_google_uuid",
"com_github_goreleaser_nfpm_v2",
"com_github_hamba_avro_v2",
"com_github_hashicorp_go_multierror",
"com_github_kballard_go_shellquote",
"com_github_kr_pretty",
"com_github_kr_text",
"com_github_ladicle_tabwriter",
"com_github_lestrrat_go_jwx_v2",
"com_github_linkedin_goavro_v2",
"com_github_lithammer_go_jump_consistent_hash",
"com_github_lorenzosaino_go_sysctl",
"com_github_mark3labs_mcp_go",
Expand Down Expand Up @@ -314,6 +312,7 @@ use_repo(
"com_github_stretchr_testify",
"com_github_tidwall_sjson",
"com_github_tklauser_go_sysconf",
"com_github_twmb_avro",
"com_github_twmb_franz_go",
"com_github_twmb_franz_go_pkg_kadm",
"com_github_twmb_franz_go_pkg_kfake",
Expand Down
11 changes: 2 additions & 9 deletions src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ module github.com/redpanda-data/redpanda/src/go/rpk

go 1.26.2

// add the git commit hash as the target version and `go mod tidy` will transform it into pseudo-version
replace github.com/hamba/avro/v2 => github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525

require (
buf.build/gen/go/redpandadata/ai-gateway/connectrpc/go v1.19.1-20260203101113-1c7702ddb57a.2
buf.build/gen/go/redpandadata/cloud/connectrpc/go v1.19.1-20251208213618-d95eb1f5bf36.2
Expand Down Expand Up @@ -32,12 +29,10 @@ require (
github.com/docker/go-units v0.5.0
github.com/fatih/color v1.18.0
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.30.0
github.com/hashicorp/go-multierror v1.1.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kr/text v0.2.0
github.com/lestrrat-go/jwx/v2 v2.1.6
github.com/linkedin/goavro/v2 v2.14.1
github.com/lithammer/go-jump-consistent-hash v1.0.2
github.com/lorenzosaino/go-sysctl v0.3.1
github.com/mark3labs/mcp-go v0.44.0
Expand All @@ -63,6 +58,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/tidwall/sjson v1.2.5
github.com/tklauser/go-sysconf v0.3.15
github.com/twmb/avro v1.3.4
github.com/twmb/franz-go v1.20.4
github.com/twmb/franz-go/pkg/kadm v1.17.1
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251024215757-aea970d4d0d2
Expand Down Expand Up @@ -98,7 +94,6 @@ require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/cfssl v1.6.5 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -115,15 +110,14 @@ require (
github.com/goccy/go-json v0.10.3 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.13.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/lestrrat-go/blackmagic v1.0.3 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/httprc v1.0.6 // indirect
Expand All @@ -133,7 +127,6 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/atomicwriter v0.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
18 changes: 4 additions & 14 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM=
github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY=
github.com/cloudflare/cfssl v1.6.5 h1:46zpNkm6dlNkMZH/wMW22ejih6gIaJbzL2du6vD7ZeI=
github.com/cloudflare/cfssl v1.6.5/go.mod h1:Bk1si7sq8h2+yVEDrFJiz3d7Aw+pfjjJSZVaD+Taky4=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M=
github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE=
Expand Down Expand Up @@ -133,9 +131,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
Expand Down Expand Up @@ -170,8 +165,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand All @@ -192,8 +187,6 @@ github.com/lestrrat-go/jwx/v2 v2.1.6 h1:hxM1gfDILk/l5ylers6BX/Eq1m/pnxe9NBwW6lVf
github.com/lestrrat-go/jwx/v2 v2.1.6/go.mod h1:Y722kU5r/8mV7fYDifjug0r8FK8mZdw0K0GpJw/l8pU=
github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
github.com/linkedin/goavro/v2 v2.14.1 h1:/8VjDpd38PRsy02JS0jflAu7JZPfJcGTwqWgMkFS2iI=
github.com/linkedin/goavro/v2 v2.14.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/lithammer/go-jump-consistent-hash v1.0.2 h1:w74N9XiMa4dWZdoVnfLbnDhfpGOMCxlrudzt2e7wtyk=
github.com/lithammer/go-jump-consistent-hash v1.0.2/go.mod h1:4MD1WDikNGnb9D56hAtscaZaOWOiCG+lLbRR5ZN9JL0=
github.com/lorenzosaino/go-sysctl v0.3.1 h1:3phX80tdITw2fJjZlwbXQnDWs4S30beNcMbw0cn0HtY=
Expand All @@ -218,8 +211,6 @@ github.com/miekg/dns v1.1.72 h1:vhmr+TF2A3tuoGNkLDFK9zi36F2LS+hKTRW0Uf8kbzI=
github.com/miekg/dns v1.1.72/go.mod h1:+EuEPhdHOsfk6Wk5TT2CzssZdqkmFhf8r+aVyDEToIs=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw=
Expand Down Expand Up @@ -270,8 +261,6 @@ github.com/redpanda-data/common-go/rpadmin v0.2.5 h1:mKYr5ffO6TmBwNSAHgnMI703bbf
github.com/redpanda-data/common-go/rpadmin v0.2.5/go.mod h1:uOAY10WXPtcDPU0aUdpkqHR+b1BqUvRhlvMf0vha73A=
github.com/redpanda-data/common-go/rpsr v0.1.4 h1:d9lu5q5wyhZWBYR1GnZkq+eZGKU0qoaSwwybRS9Uk2k=
github.com/redpanda-data/common-go/rpsr v0.1.4/go.mod h1:qVa7b0yaCRdZDn5dcZ9CazqVr4jYbgtOJUywI2X3G3I=
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525 h1:vskZrV6q8W8flL0Ud23AJUYAd8ZgTadO45+loFnG2G0=
github.com/redpanda-data/go-avro/v2 v2.0.0-20240405204525-77b1144dc525/go.mod h1:3YqAM7pgS5vW/EH7naCjFqnAajSgi0f0CfMe1HGhLxQ=
github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250930092048-a98b94b5957a h1:jNHT6Fcy/rBAFnX8rjbwrJ+lSF2Ufa1jqmnCV6m6RKY=
github.com/redpanda-data/protoc-gen-go-mcp v0.0.0-20250930092048-a98b94b5957a/go.mod h1:bGTBF/Nvx6NQ/R4Jje5lBjTpfWrazOna3udJwx4EAHE=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
Expand Down Expand Up @@ -315,7 +304,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
Expand All @@ -334,6 +322,8 @@ github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8O
github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4=
github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso=
github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ=
github.com/twmb/avro v1.3.4 h1:4tTV207HOUHKTdAMv6fGPyqgwmGfMLUfnFg5R4cfIX0=
github.com/twmb/avro v1.3.4/go.mod h1:TUQS96Ptl8tDRyK0Jw91FXIVCoPKz4sXxvUSShiG5FA=
github.com/twmb/franz-go v1.20.4 h1:1wTvyLTOxS0oJh5ro/DVt2JHVdx7/kGNtmtFhbcr0O0=
github.com/twmb/franz-go v1.20.4/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA=
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
Expand Down
3 changes: 1 addition & 2 deletions src/go/rpk/pkg/serde/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ go_library(
"//src/go/rpk/pkg/serde/embed",
"@com_github_bufbuild_protocompile//:protocompile",
"@com_github_bufbuild_protocompile//linker",
"@com_github_hamba_avro_v2//:avro",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_santhosh_tekuri_jsonschema_v6//:jsonschema",
"@com_github_twmb_avro//:avro",
"@com_github_twmb_franz_go_pkg_sr//:sr",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//proto",
Expand Down
89 changes: 41 additions & 48 deletions src/go/rpk/pkg/serde/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,21 @@ import (
"context"
"fmt"

"github.com/hamba/avro/v2"
"github.com/linkedin/goavro/v2"
"github.com/twmb/avro"
"github.com/twmb/franz-go/pkg/sr"
)

// newAvroEncoder generates a serializer function that can encode the provided
// record using the specified schema. If the schema includes references, it
// retrieves them using the supplied client. The generated function returns the
// record encoded in the protobuf wire format.
func newAvroEncoder(codec *goavro.Codec, schemaID int) (serdeFunc, error) {
// record using the specified schema. The generated function returns the record
// encoded in the Confluent Schema Registry wire format.
func newAvroEncoder(schema *avro.Schema, schemaID int) (serdeFunc, error) {
return func(record []byte) ([]byte, error) {
native, _, err := codec.NativeFromTextual(record)
if err != nil {
var native any
if err := schema.DecodeJSON(record, &native); err != nil {
return nil, fmt.Errorf("unable to parse record with the provided schema: %v", err)
}

binary, err := codec.BinaryFromNative(nil, native)
binary, err := schema.Encode(native)
if err != nil {
return nil, fmt.Errorf("unable to binary encode the record: %v", err)
Comment thread
r-vasquez marked this conversation as resolved.
}
Expand All @@ -47,66 +45,61 @@ func newAvroEncoder(codec *goavro.Codec, schemaID int) (serdeFunc, error) {
// newAvroDecoder generates a deserializer function that decodes the given
// avro-encoded record according to the schema. The generated function expects
// the record bytes (without the wire format).
func newAvroDecoder(codec *goavro.Codec) (serdeFunc, error) {
func newAvroDecoder(schema *avro.Schema) (serdeFunc, error) {
return func(record []byte) ([]byte, error) {
native, _, err := codec.NativeFromBinary(record)
if err != nil {
var native any
if _, err := schema.Decode(record, &native); err != nil {
return nil, fmt.Errorf("unable to decode avro-encoded record: %v", err)
}

return codec.TextualFromNative(nil, native)
return schema.EncodeJSON(native)
}, nil
Comment thread
r-vasquez marked this conversation as resolved.
}

// generateAvroCodec will generate an AVRO codec, parsing the references if
// there are any.
func generateAvroCodec(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*goavro.Codec, error) {
schemaStr := schema.Schema
if len(schema.References) > 0 {
err := parseAvroReferences(ctx, cl, schema)
if err != nil {
return nil, fmt.Errorf("unable to parse references: %v", err)
}

// We use hamba/avro to for the schema reference resolution.
refCodec, err := avro.Parse(schema.Schema)
// generateAvroSchema parses the schema and its references, returning a
// compiled schema that can be used for encoding and decoding.
func generateAvroSchema(ctx context.Context, cl *sr.Client, schema *sr.Schema) (*avro.Schema, error) {
if len(schema.References) == 0 {
s, err := avro.Parse(schema.Schema)
if err != nil {
return nil, fmt.Errorf("unable to parse schema: %v", err)
}
schemaStr = refCodec.String()
return s, nil
}
codec, err := goavro.NewCodec(schemaStr)
cache := &avro.SchemaCache{}
seen := make(map[string]bool)
if err := parseAvroReferences(ctx, cl, cache, schema, seen); err != nil {
return nil, fmt.Errorf("unable to parse references: %v", err)
}
s, err := cache.Parse(schema.Schema)
if err != nil {
return nil, fmt.Errorf("unable to generate codec for the given schema: %v", err)
return nil, fmt.Errorf("unable to parse schema: %v", err)
}
return codec, nil
return s, nil
}

// parseAvroReferences uses hamba/avro Parse method to parse every reference. We
// don't need to store the references since the library already cache these
// schemas and use it later for handling references in the parent schema.
func parseAvroReferences(ctx context.Context, cl *sr.Client, schema *sr.Schema) error {
if len(schema.References) == 0 {
_, err := avro.Parse(schema.Schema)
if err != nil {
return err
}
return nil
}
// parseAvroReferences recursively parses all schema references into the cache
// so they are available when parsing the parent schema. The seen map tracks
// already-fetched subject+version pairs to avoid redundant registry requests.
func parseAvroReferences(ctx context.Context, cl *sr.Client, cache *avro.SchemaCache, schema *sr.Schema, seen map[string]bool) error {
for _, ref := range schema.References {
key := fmt.Sprintf("%s-%d", ref.Subject, ref.Version)
if seen[key] {
continue
}
seen[key] = true
r, err := cl.SchemaByVersion(ctx, ref.Subject, ref.Version)
if err != nil {
return err
return fmt.Errorf("unable to get reference schema with subject %q and version %v: %v", ref.Subject, ref.Version, err)
}
refSchema := r.Schema
err = parseAvroReferences(ctx, cl, &refSchema)
if err != nil {
if len(refSchema.References) > 0 {
if err := parseAvroReferences(ctx, cl, cache, &refSchema, seen); err != nil {
return fmt.Errorf("unable to parse schema with subject %q and version %v: %v", ref.Subject, ref.Version, err)
}
}
if _, err := cache.Parse(refSchema.Schema); err != nil {
return fmt.Errorf("unable to parse schema with subject %q and version %v: %v", ref.Subject, ref.Version, err)
}
}
_, err := avro.Parse(schema.Schema)
if err != nil {
return err
}
return nil
}
Loading
Loading