@@ -11,25 +11,25 @@ package serde
1111
1212import (
1313 "context"
14+ "encoding/json"
1415 "fmt"
1516
16- "github.com/hamba/avro/v2"
17- "github.com/linkedin/goavro/v2"
17+ "github.com/twmb/avro"
1818 "github.com/twmb/franz-go/pkg/sr"
1919)
2020
2121// newAvroEncoder generates a serializer function that can encode the provided
2222// record using the specified schema. If the schema includes references, it
2323// retrieves them using the supplied client. The generated function returns the
2424// record encoded in the Schema Registry wire format.
25- func newAvroEncoder (codec * goavro. Codec , schemaID int ) (serdeFunc , error ) {
25+ func newAvroEncoder (schema * avro. Schema , schemaID int ) (serdeFunc , error ) {
2626 return func (record []byte ) ([]byte , error ) {
27- native , _ , err := codec . NativeFromTextual ( record )
28- if err != nil {
27+ var native any
28+ if err := json . Unmarshal ( record , & native ); err != nil {
2929 return nil , fmt .Errorf ("unable to parse record with the provided schema: %v" , err )
3030 }
3131
32- binary , err := codec . BinaryFromNative ( nil , native )
32+ binary , err := schema . Encode ( native )
3333 if err != nil {
3434 return nil , fmt .Errorf ("unable to binary encode the record: %v" , err )
3535 }
@@ -47,66 +47,54 @@ func newAvroEncoder(codec *goavro.Codec, schemaID int) (serdeFunc, error) {
4747// newAvroDecoder generates a deserializer function that decodes the given
4848// avro-encoded record according to the schema. The generated function expects
4949// the record bytes (without the wire format).
50- func newAvroDecoder (codec * goavro. Codec ) (serdeFunc , error ) {
50+ func newAvroDecoder (schema * avro. Schema ) (serdeFunc , error ) {
5151 return func (record []byte ) ([]byte , error ) {
52- native , _ , err := codec . NativeFromBinary ( record )
53- if err != nil {
52+ var native any
53+ if _ , err := schema . Decode ( record , & native ); err != nil {
5454 return nil , fmt .Errorf ("unable to decode avro-encoded record: %v" , err )
5555 }
56-
57- return codec .TextualFromNative (nil , native )
56+ return json .Marshal (native )
5857 }, nil
5958}
6059
61- // generateAvroCodec will generate an AVRO codec, parsing the references if
62- // there are any.
63- func generateAvroCodec (ctx context.Context , cl * sr.Client , schema * sr.Schema ) (* goavro.Codec , error ) {
64- schemaStr := schema .Schema
65- if len (schema .References ) > 0 {
66- err := parseAvroReferences (ctx , cl , schema )
67- if err != nil {
68- return nil , fmt .Errorf ("unable to parse references: %v" , err )
69- }
70-
71- // We use hamba/avro to for the schema reference resolution.
72- refCodec , err := avro .Parse (schema .Schema )
60+ // generateAvroSchema parses the schema and its references, returning a
61+ // compiled schema that can be used for encoding and decoding.
62+ func generateAvroSchema (ctx context.Context , cl * sr.Client , schema * sr.Schema ) (* avro.Schema , error ) {
63+ if len (schema .References ) == 0 {
64+ s , err := avro .Parse (schema .Schema )
7365 if err != nil {
7466 return nil , fmt .Errorf ("unable to parse schema: %v" , err )
7567 }
76- schemaStr = refCodec . String ()
68+ return s , nil
7769 }
78- codec , err := goavro .NewCodec (schemaStr )
70+ cache := avro .NewSchemaCache ()
71+ if err := parseAvroReferences (ctx , cl , cache , schema ); err != nil {
72+ return nil , fmt .Errorf ("unable to parse references: %v" , err )
73+ }
74+ s , err := cache .Parse (schema .Schema )
7975 if err != nil {
80- return nil , fmt .Errorf ("unable to generate codec for the given schema: %v" , err )
76+ return nil , fmt .Errorf ("unable to parse schema: %v" , err )
8177 }
82- return codec , nil
78+ return s , nil
8379}
8480
85- // parseAvroReferences uses hamba/avro Parse method to parse every reference. We
86- // don't need to store the references since the library already cache these
87- // schemas and use it later for handling references in the parent schema.
88- func parseAvroReferences (ctx context.Context , cl * sr.Client , schema * sr.Schema ) error {
89- if len (schema .References ) == 0 {
90- _ , err := avro .Parse (schema .Schema )
91- if err != nil {
92- return err
93- }
94- return nil
95- }
81+ // parseAvroReferences recursively parses all schema references into the cache
82+ // so they are available when parsing the parent schema.
83+ func parseAvroReferences (ctx context.Context , cl * sr.Client , cache * avro.SchemaCache , schema * sr.Schema ) error {
9684 for _ , ref := range schema .References {
9785 r , err := cl .SchemaByVersion (ctx , ref .Subject , ref .Version )
9886 if err != nil {
9987 return err
10088 }
10189 refSchema := r .Schema
102- err = parseAvroReferences (ctx , cl , & refSchema )
103- if err != nil {
90+ if len (refSchema .References ) > 0 {
91+ if err := parseAvroReferences (ctx , cl , cache , & refSchema ); err != nil {
92+ return fmt .Errorf ("unable to parse schema with subject %q and version %v: %v" , ref .Subject , ref .Version , err )
93+ }
94+ }
95+ if _ , err := cache .Parse (refSchema .Schema ); err != nil {
10496 return fmt .Errorf ("unable to parse schema with subject %q and version %v: %v" , ref .Subject , ref .Version , err )
10597 }
10698 }
107- _ , err := avro .Parse (schema .Schema )
108- if err != nil {
109- return err
110- }
11199 return nil
112100}
0 commit comments