Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
buf.build/gen/go/redpandadata/common/connectrpc/go v1.19.1-20260323171043-6e06f84ad823.2
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.36.11-20260323171043-6e06f84ad823.1
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20260323171043-3635d3966b23.1
buf.build/go/hyperpb v0.1.3
cloud.google.com/go/aiplatform v1.121.0
cloud.google.com/go/bigquery v1.74.0
cloud.google.com/go/pubsub v1.50.1
Expand Down Expand Up @@ -319,6 +320,7 @@ require (
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/timandy/routine v1.1.5 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/twpayne/go-geom v1.6.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ atomicgo.dev/keyboard v0.2.9 h1:tOsIid3nlPLZ3lwgG8KZMp/SFmr7P0ssEN5JUsm78K8=
atomicgo.dev/keyboard v0.2.9/go.mod h1:BC4w9g00XkxH/f1HXhW2sXmJFOCWbKn9xrOunSFtExQ=
atomicgo.dev/schedule v0.1.0 h1:nTthAbhZS5YZmgYbb2+DH8uQIZcTlIrd4eYr3UQxEjs=
atomicgo.dev/schedule v0.1.0/go.mod h1:xeUa3oAkiuHYh8bKiQBRojqAMq3PXXbJujjb0hw8pEU=
buf.build/gen/go/bufbuild/hyperpb-examples/protocolbuffers/go v1.36.7-20250725192734-0dd56aa9cbbc.1 h1:bFnppdLYActzr2F0iomSrkjUnGgVufb0DtZxjKgTLGc=
buf.build/gen/go/bufbuild/hyperpb-examples/protocolbuffers/go v1.36.7-20250725192734-0dd56aa9cbbc.1/go.mod h1:x7jYNX5/7EPnsKHEq596krkOGzvR97/MsZw2fw3Mrq0=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 h1:PMmTMyvHScV9Mn8wc6ASge9uRcHy0jtqPd+fM35LmsQ=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1/go.mod h1:tvtbpgaVXZX4g6Pn+AnzFycuRK3MOz5HJfEGeEllXYM=
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 h1:vK2m7N3SPeHRqfVBj4FpmjlNCBEhR05OgCgJ+xIGfAs=
Expand All @@ -18,6 +20,10 @@ buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.36.11-20260323171043-
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.36.11-20260323171043-6e06f84ad823.1/go.mod h1:3w7EzexwlL6PIFGbbeKZ0yHfUlAmI0aBVzF/QoFb8Cg=
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20260323171043-3635d3966b23.1 h1:2lHER4ZVxYLGq0ZCrs9ckz5qzwoAryh71L1+CUve0uY=
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20260323171043-3635d3966b23.1/go.mod h1:akvBCH3f6fL10sDu4NppgjHrQITLe1m5YWLt/yiLEKI=
buf.build/go/hyperpb v0.1.3 h1:wiw2F7POvAe2VA2kkB0TAsFwj91lXbFrKM41D3ZgU1w=
buf.build/go/hyperpb v0.1.3/go.mod h1:IHXAM5qnS0/Fsnd7/HGDghFNvUET646WoHmq1FDZXIE=
buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A=
buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA=
cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4=
cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down Expand Up @@ -910,6 +916,8 @@ github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/cel-go v0.26.0 h1:DPGjXackMpJWH680oGY4lZhYjIameYmR+/6RBdDGmaI=
github.com/google/cel-go v0.26.0/go.mod h1:A9O8OU9rdvrK5MQyrqfIxo1a0u4g3sF8KB6PUIaryMM=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs=
Expand Down Expand Up @@ -1513,6 +1521,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc=
github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo=
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9 h1:arwj11zP0yJIxIRiDn22E0H8PxfF7TsTrc2wIPFIsf4=
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9/go.mod h1:SKZx6stCn03JN3BOWTwvVIO2ajMkb/zQdTceXYhKw/4=
github.com/protocolbuffers/txtpbfmt v0.0.0-20251016062345-16587c79cd91 h1:s1LvMaU6mVwoFtbxv/rCZKE7/fwDmDY684FfUe4c1Io=
github.com/protocolbuffers/txtpbfmt v0.0.0-20251016062345-16587c79cd91/go.mod h1:JSbkp0BviKovYYt9XunS95M3mLPibE9bGg+Y95DsEEY=
github.com/pterm/pterm v0.12.27/go.mod h1:PhQ89w4i95rhgE+xedAoqous6K9X+r6aSOI2eFF7DZI=
Expand Down Expand Up @@ -1645,6 +1655,8 @@ github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3A
github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo=
github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stoewer/go-strcase v1.3.1 h1:iS0MdW+kVTxgMoE1LAZyMiYJFKlOzLooE4MxjirtkAs=
github.com/stoewer/go-strcase v1.3.1/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
Expand Down Expand Up @@ -1710,6 +1722,8 @@ github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
github.com/tilt-dev/fsnotify v1.4.8-0.20220602155310-fff9c274a375 h1:QB54BJwA6x8QU9nHY3xJSZR2kX9bgpZekRKGkLTmEXA=
github.com/tilt-dev/fsnotify v1.4.8-0.20220602155310-fff9c274a375/go.mod h1:xRroudyp5iVtxKqZCrA6n2TLFRBf8bmnjr1UD4x+z7g=
github.com/timandy/routine v1.1.5 h1:LSpm7Iijwb9imIPlucl4krpr2EeCeAUvifiQ9Uf5X+M=
github.com/timandy/routine v1.1.5/go.mod h1:kXslgIosdY8LW0byTyPnenDgn4/azt2euufAq9rK51w=
github.com/timeplus-io/proton-go-driver/v2 v2.1.4 h1:gSuIvv827cOgYh/6mRUl4THT+bG3DbOCVrr2RNKfOYE=
github.com/timeplus-io/proton-go-driver/v2 v2.1.4/go.mod h1:rUs4zvXvKsmuyFpzdJnnid6p8IvRJTa/n/jNQ2B6Dfw=
github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA=
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/confluent/serde_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(
defer mu.Unlock()
if msgDesc.FullName() != cachedMessageName {
cachedMessageName = msgDesc.FullName()
cachedDecoder = common.NewDynamicPbDecoder(msgDesc)
cachedDecoder = common.NewHyperPbDecoder(msgDesc, common.DefaultProfilingOptions)
}
return cachedDecoder
}
Expand Down
63 changes: 60 additions & 3 deletions internal/impl/protobuf/common/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func loadTestFileDescriptorSet(t testing.TB) (protoreflect.MessageDescriptor, *p

// BenchmarkProtobufToMessage benchmarks the complete pipeline of decoding protobuf
// and converting to a Benthos message, testing the matrix of:
// - Decoding: dynamicpb
// - Decoding: dynamicpb vs hyperpb (with PGO)
// - Conversion: Fast (SetStructuredMut) vs Slow (SetBytes)
func BenchmarkProtobufToMessage(b *testing.B) {
md, types := loadTestFileDescriptorSet(b)
Expand Down Expand Up @@ -109,8 +109,18 @@ func BenchmarkProtobufToMessage(b *testing.B) {
},
}

// Create decoder
dynamicpbDecoder := NewDynamicPbDecoder(md)
b.StopTimer()
// Profile-guided optimization settings for hyperpb
pgoOpts := ProfilingOptions{
Rate: 0.01, // Profile 1% of messages during priming
RecompileInterval: 100000, // Recompile after 100,000 messages
}

// Create decoders
dynamicpbDecoder := NewDynamicPbDecoder(md, ProfilingOptions{})
hyperpbDecoder := NewHyperPbDecoder(md, pgoOpts)

b.StartTimer()

marshalOpts := protojson.MarshalOptions{Resolver: types}

Expand All @@ -127,6 +137,18 @@ func BenchmarkProtobufToMessage(b *testing.B) {
b.Fatal(err)
}

// Prime the hyperpb decoder with sample data to build profile
// Run with enough iterations to trigger at least one recompilation
for range pgoOpts.RecompileInterval * 2 {
err := hyperpbDecoder.WithDecoded(pbBytes, func(proto.Message) error {
return nil
})
if err != nil {
b.Fatal(err)
}
}
b.StartTimer()

// Benchmark: dynamicpb decode + fast conversion + read
b.Run(tc.name+"/dynamicpb/fast", func(b *testing.B) {
b.ReportAllocs()
Expand Down Expand Up @@ -163,5 +185,40 @@ func BenchmarkProtobufToMessage(b *testing.B) {
}
})

// Benchmark: hyperpb decode + fast conversion + read
b.Run(tc.name+"/hyperpb/fast", func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
msg := service.NewMessage(nil)
err := hyperpbDecoder.WithDecoded(pbBytes, func(decoded proto.Message) error {
return ToMessageFast(decoded.(protoreflect.Message), marshalOpts, msg)
})
if err != nil {
b.Fatal(err)
}
_, err = msg.AsStructured()
if err != nil {
b.Fatal(err)
}
}
})

// Benchmark: hyperpb decode + slow conversion + read
b.Run(tc.name+"/hyperpb/slow", func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
msg := service.NewMessage(nil)
err := hyperpbDecoder.WithDecoded(pbBytes, func(decoded proto.Message) error {
return ToMessageSlow(decoded.(protoreflect.Message), marshalOpts, msg)
})
if err != nil {
b.Fatal(err)
}
_, err = msg.AsStructured()
if err != nil {
b.Fatal(err)
}
}
})
}
}
16 changes: 16 additions & 0 deletions internal/impl/protobuf/common/decode_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import "google.golang.org/protobuf/proto"

// ProtobufDecoder is an interface for different methods to parse protobuf
// (the binary format) in a dynamic and reflective way.
//
// Currently, there are two supported approaches: dynamicpb and hyperpb.
type ProtobufDecoder interface {
// Decode the buffer into a proto message that is passed into the callback.
//
Expand All @@ -23,3 +25,17 @@ type ProtobufDecoder interface {
// the provided callback.
WithDecoded(buf []byte, cb func(msg proto.Message) error) error
}

// ProfilingOptions specifies the profiling rate and how often we recompile
// for ProtobufDecoders that support profile-guided optimizations in flight (PGO).
//
// Decoders without PGO support (the dynamicpb decoder) accept these options
// and ignore them.
type ProfilingOptions struct {
// Rate is the sampling rate forwarded to hyperpb.WithRecordProfile on every
// decode. NOTE(review): presumably a fraction in [0, 1] — confirm against
// the hyperpb documentation.
Rate float64
// RecompileInterval is the number of successfully decoded, profiled
// messages between profile-guided recompiles in the hyperpb decoder.
// Values <= 0 disable recompilation.
RecompileInterval int64
}

// DefaultProfilingOptions are the standard profiling settings used across all
// hyperpb decoder call sites.
//
// NOTE(review): this is a mutable package-level variable; callers are
// presumably expected to treat it as read-only.
var DefaultProfilingOptions = ProfilingOptions{
// Sampling rate passed to the profiler on each decode.
Rate: 0.01,
// Trigger a recompile every 100,000 profiled messages.
RecompileInterval: 100_000,
}
7 changes: 5 additions & 2 deletions internal/impl/protobuf/common/decode_dynamicpb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
)

// NewDynamicPbDecoder returns a new ProtobufDecoder based on standard proto reflection
// in the official protobuf library.
func NewDynamicPbDecoder(md protoreflect.MessageDescriptor) ProtobufDecoder {
// in the official protobuf library.
func NewDynamicPbDecoder(
md protoreflect.MessageDescriptor,
_ ProfilingOptions,
) ProtobufDecoder {
return &dynamicPbParser{dynamicpb.NewMessageType(md)}
}

Expand Down
94 changes: 94 additions & 0 deletions internal/impl/protobuf/common/decode_hyperpb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2025 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

//go:build arm64 || amd64

package common

import (
"fmt"
"sync"
"sync/atomic"

"buf.build/go/hyperpb"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

// NewHyperPbDecoder builds a ProtobufDecoder that parses messages described by
// md using hyperpb.
//
// The descriptor is compiled once up front and paired with a fresh profile;
// opts controls how that profile is sampled and how often the decoder
// recompiles itself from the collected profile.
func NewHyperPbDecoder(
	md protoreflect.MessageDescriptor,
	opts ProfilingOptions,
) ProtobufDecoder {
	compiled := hyperpb.CompileMessageDescriptor(md)
	initial := newHyperPbParserState(compiled, compiled.NewProfile())

	decoder := &hyperPbParser{opts: opts}
	decoder.state.Store(initial)
	return decoder
}

// hyperPbParserState bundles a compiled MessageType, its active Profile, and
// the Shared pool that goroutines use while this state is current.
//
// Keeping the pool inside the state means that when the state is atomically
// replaced after a profile-guided recompile, the old pool travels with the old
// state. Once every goroutine that loaded the old state has finished its call
// (returning its Shared back to the old pool), the old state — together with
// its pool and all its pooled Shared values — becomes unreachable and can be
// collected by the GC as a unit.
type hyperPbParserState struct {
// msgType is the compiled message type used to allocate messages and as
// the base for the next recompile.
msgType *hyperpb.MessageType
// profile receives profiling samples during Unmarshal. It is nil in the
// temporary state installed while a recompile is running.
profile *hyperpb.Profile
// pool hands out *hyperpb.Shared values; empty pools allocate a new one.
pool *sync.Pool
}

// newHyperPbParserState creates a parser state for msgType and profile with its
// own, initially empty, pool of hyperpb.Shared values. profile may be nil,
// which is how the decoder marks a state whose profiling is paused.
func newHyperPbParserState(msgType *hyperpb.MessageType, profile *hyperpb.Profile) *hyperPbParserState {
	sharedPool := &sync.Pool{
		New: func() any { return new(hyperpb.Shared) },
	}
	return &hyperPbParserState{msgType: msgType, profile: profile, pool: sharedPool}
}

// hyperPbParser is the hyperpb-backed ProtobufDecoder. It periodically
// recompiles its message type from the profile gathered while decoding.
type hyperPbParser struct {
// state is the current type/profile/pool bundle. It is replaced atomically
// when a recompile starts and again when it finishes.
state atomic.Pointer[hyperPbParserState]
// opts holds the profiling settings supplied at construction.
opts ProfilingOptions
// seen counts successfully decoded messages while profiling is active; a
// recompile is triggered each time it reaches a multiple of
// opts.RecompileInterval.
seen atomic.Int64
}

// Compile-time check that *hyperPbParser satisfies ProtobufDecoder.
var _ ProtobufDecoder = (*hyperPbParser)(nil)

// WithDecoded implements ProtobufDecoder.
//
// It decodes buf with the currently active compiled message type and passes
// the result to cb, returning cb's error. The message is allocated from a
// pooled hyperpb.Shared that is freed and handed back to its pool once cb has
// returned, so cb should not hold on to the message afterwards.
//
// After every opts.RecompileInterval successfully decoded messages (while a
// profile is active) a profile-guided recompile is started in the background.
func (p *hyperPbParser) WithDecoded(buf []byte, cb func(msg proto.Message) error) error {
	// Snapshot the state exactly once: the Shared taken below must go back to
	// the pool it came from even if another goroutine swaps the state while
	// this call is in flight.
	cur := p.state.Load()
	scratch := cur.pool.Get().(*hyperpb.Shared)
	defer func() {
		scratch.Free()
		cur.pool.Put(scratch)
	}()

	decoded := scratch.NewMessage(cur.msgType)
	recordOpt := hyperpb.WithRecordProfile(cur.profile, p.opts.Rate)
	if err := decoded.Unmarshal(buf, recordOpt); err != nil {
		return fmt.Errorf("unmarshalling protobuf message: '%v': %w", cur.msgType.Descriptor().FullName(), err)
	}

	// The counter is only advanced while profiling is active and recompiles
	// are enabled, so the short-circuit order of this condition matters.
	interval := p.opts.RecompileInterval
	if cur.profile != nil && interval > 0 && p.seen.Add(1)%interval == 0 {
		// Install a profile-less placeholder first. Only the goroutine that
		// wins the swap starts a recompile, which keeps concurrent callers
		// from launching duplicate recompiles.
		paused := newHyperPbParserState(cur.msgType, nil)
		if p.state.CompareAndSwap(cur, paused) {
			// Recompiling can be slow; keep it off the decode path.
			go func() {
				optimized := cur.msgType.Recompile(cur.profile)
				p.state.Store(newHyperPbParserState(optimized, optimized.NewProfile()))
			}()
		}
	}

	return cb(decoded)
}
25 changes: 25 additions & 0 deletions internal/impl/protobuf/common/decode_hyperpb_fallback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2025 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

//go:build !(arm64 || amd64)

package common

import (
"google.golang.org/protobuf/reflect/protoreflect"
)

// NewHyperPbDecoder falls back to using NewDynamicPbDecoder
// on platforms where hyperpb is not supported.
//
// This variant is selected by the build constraint at the top of the file
// (any architecture other than arm64 or amd64). opts is forwarded unchanged
// to NewDynamicPbDecoder.
func NewHyperPbDecoder(
md protoreflect.MessageDescriptor,
opts ProfilingOptions,
) ProtobufDecoder {
return NewDynamicPbDecoder(md, opts)
}
4 changes: 2 additions & 2 deletions internal/impl/protobuf/processor_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func newProtobufToJSONOperator(
if err != nil {
return nil, fmt.Errorf("unable to find protobuf type %q: %w", msg, err)
}
decoder := common.NewDynamicPbDecoder(msgType.Descriptor())
decoder := common.NewHyperPbDecoder(msgType.Descriptor(), common.DefaultProfilingOptions)
opts.Resolver = types
return func(part *service.Message) error {
partBytes, err := part.AsBytes()
Expand Down Expand Up @@ -398,7 +398,7 @@ func newProtobufToJSONBSROperator(
if err != nil {
return nil, fmt.Errorf("unable to find message '%v' definition: %w", msg, err)
}
decoder := common.NewDynamicPbDecoder(d.Descriptor())
decoder := common.NewHyperPbDecoder(d.Descriptor(), common.DefaultProfilingOptions)
opts.Resolver = multiModuleWatcher
return func(part *service.Message) error {
partBytes, err := part.AsBytes()
Expand Down
1 change: 1 addition & 0 deletions public/bundle/enterprise/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 // indirect
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 // indirect
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.11-20240117202343-bf8f65e8876c.1 // indirect
buf.build/go/hyperpb v0.1.3 // indirect
cel.dev/expr v0.25.1 // indirect
cloud.google.com/go/aiplatform v1.121.0 // indirect
cloud.google.com/go/bigquery v1.74.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions public/bundle/free/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 // indirect
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 // indirect
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.11-20240117202343-bf8f65e8876c.1 // indirect
buf.build/go/hyperpb v0.1.3 // indirect
cel.dev/expr v0.25.1 // indirect
cloud.google.com/go/aiplatform v1.121.0 // indirect
cloud.google.com/go/bigquery v1.74.0 // indirect
Expand Down
Loading