Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/xgodev/boost/bootstrap/function"
"github.com/xgodev/boost/model/errors"
"github.com/xgodev/boost/wrapper/log"
"github.com/xgodev/boost/wrapper/log/contrib/rs/zerolog/v1"

"math"
"time"

Expand Down Expand Up @@ -61,7 +63,7 @@ func (l *Subscriber[T]) Subscribe(ctx context.Context) error {

// processMessage processes each message, retries if needed, and applies backoff
func (l *Subscriber[T]) processMessage(ctx context.Context, msg *pubsub.Message) error {
logger := log.FromContext(ctx).WithTypeOf(*l)
ctx = zerolog.NewLogger().ToContext(ctx)

retryCount := 0

Expand All @@ -80,7 +82,7 @@ func (l *Subscriber[T]) processMessage(ctx context.Context, msg *pubsub.Message)
cancel()
retryCount++

logger.Warnf("msgID=%s handler failed (attempt %d/%d): %v\nPayload: %s", msg.ID, retryCount, l.options.RetryLimit, err, string(msg.Data))
log.Ctx(ctx, *l).Warnf("msgID=%s handler failed (attempt %d/%d): %v\nPayload: %s", msg.ID, retryCount, l.options.RetryLimit, err, string(msg.Data))

// Check retry limit
if l.options.RetryLimit != -1 && retryCount >= l.options.RetryLimit {
Expand All @@ -97,7 +99,6 @@ func (l *Subscriber[T]) processMessage(ctx context.Context, msg *pubsub.Message)
}

cancel()
// Acknowledge the message after successful processing
msg.Ack()
break
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package pubsub

import (
"fmt"
"time"

pb "cloud.google.com/go/pubsub/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
"github.com/xgodev/boost/model/errors"
)

func generateCloudEvent(msg *pb.Message, subscription string) (event.Event, error) {
in := event.New()

ce := false
contentType := "application/json"

// Checks attributes and transforms into a CloudEvent if applicable
for key, value := range msg.Attributes {
switch key {
case "content-type":
in.SetDataContentType(value)
contentType = value
case "ce_specversion":
in.SetSpecVersion(value)
ce = true
case "ce_id":
in.SetID(value)
ce = true
case "ce_source":
in.SetSource(value)
ce = true
case "ce_type":
in.SetType(value)
ce = true
case "ce_time":
ce = true
if t, err := time.Parse(time.RFC3339, value); err == nil {
in.SetTime(t)
}
case "ce_subject":
ce = true
in.SetSubject(value)
default:
in.SetExtension(key, value)
}
}

// If the event does not have a time, populate it with the time the message was published
if in.Time().IsZero() {
in.SetTime(msg.PublishTime)
}

// If it's not a CloudEvent, create one manually
if !ce {
in.SetID(uuid.NewString())
in.SetSource(fmt.Sprintf("pubsub://%s", subscription))
in.SetType("pubsub.message")
}

// Set the message body as CloudEvent data
if err := in.SetData(contentType, msg.Data); err != nil {
return event.Event{}, errors.Wrap(err, errors.Internalf("could not set data from pubsub message: %s", err.Error()))
}

return in, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package pubsub

import (
"cloud.google.com/go/pubsub/v2"
co "github.com/spf13/cobra"
"github.com/xgodev/boost/bootstrap/function"
)

// New returns CmdFunc for cloudevents command.
func New[T any](client *pubsub.Client) function.CmdFunc[T] {
return func(fn function.Handler[T]) *co.Command {
return &co.Command{
Use: "gcp_pubsub_v2",
Short: "gcp_pubsub_v2",
Long: "",
RunE: func(cmd *co.Command, args []string) error {
helper := NewHelper[T](client, fn)
helper.Start()
return nil
},
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pubsub

import (
"github.com/xgodev/boost/bootstrap/function/adapter"
"github.com/xgodev/boost/wrapper/config"
)

const (
root = adapter.Root + ".pubsub.v2"
subscriptions = root + ".subscriptions"
concurrency = root + ".concurrency"
)

func init() {
config.Add(subscriptions, []string{"changeme"}, "pubsub listener topics")
config.Add(concurrency, 10, "pubsub max concurrent workers")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package pubsub

import (
"cloud.google.com/go/pubsub/v2"
"context"
"github.com/xgodev/boost/bootstrap/function"
"github.com/xgodev/boost/wrapper/log"
"sync"
)

// Helper assists in creating event handlers for Pub/Sub with multiple topics.
type Helper[T any] struct {
handler function.Handler[T]
options *Options
client *pubsub.Client
}

// NewHelperWithOptions returns a new Helper with custom options.
func NewHelperWithOptions[T any](client *pubsub.Client, handler function.Handler[T], options *Options) *Helper[T] {
return &Helper[T]{
handler: handler,
options: options,
client: client,
}
}

// NewHelper returns a new Helper with default options.
func NewHelper[T any](client *pubsub.Client, handler function.Handler[T]) *Helper[T] {
opt, err := DefaultOptions()
if err != nil {
log.Fatal(err.Error())
}
return NewHelperWithOptions(client, handler, opt)
}

// Start subscribes to the topics and processes messages concurrently.
func (h *Helper[T]) Start() {
logger := log.WithTypeOf(*h)
var wg sync.WaitGroup

for _, subscription := range h.options.Subscriptions {

wg.Go(func() {
subscriber := NewSubscriber[T](h.client, h.handler, subscription, h.options)

if err := subscriber.Subscribe(context.Background()); err != nil {
logger.Errorf("Failed to subscribe to subscription %s: %v", subscription, err)
} else {
logger.Infof("Successfully subscribed to subscription %s", subscription)
}
})
}

// Wait for all subscriptions to complete
wg.Wait()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package pubsub

import (
"github.com/xgodev/boost/wrapper/config"
)

// Options can be used to create customized handler.
type Options struct {
Subscriptions []string
Concurrency int64 // Max concurrent workers
}

// DefaultOptions returns options based in config.
func DefaultOptions() (*Options, error) {
return config.NewOptionsWithPath[Options](root)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pubsub

import (
"context"

pb "cloud.google.com/go/pubsub/v2"
"github.com/xgodev/boost/bootstrap/function"
"github.com/xgodev/boost/model/errors"
"github.com/xgodev/boost/wrapper/log"
"github.com/xgodev/boost/wrapper/log/contrib/rs/zerolog/v1"
)

// Subscriber contains the Pub/Sub client, handler function, and options
type Subscriber[T any] struct {
client *pb.Client
handler function.Handler[T]
subscription string
options *Options
}

// NewSubscriber returns a subscriber listener.
func NewSubscriber[T any](client *pb.Client, handler function.Handler[T], subscription string, options *Options) *Subscriber[T] {
return &Subscriber[T]{
client: client,
handler: handler,
subscription: subscription,
options: options,
}
}

// Subscribe subscribes and consumes messages from multiple Pub/Sub topics concurrently
func (l *Subscriber[T]) Subscribe(ctx context.Context) error {
log.Ctx(ctx, *l).Tracef("pubsub - Subscribing to %s", l.subscription)

subscription := l.client.Subscriber(l.subscription)
subscription.ReceiveSettings.MaxOutstandingMessages = int(l.options.Concurrency)

err := subscription.Receive(ctx, func(ctx context.Context, msg *pb.Message) {
err := l.processMessage(ctx, msg)

if err != nil {
msg.Nack()
}

msg.Ack()
})

if err != nil {
log.Ctx(ctx, *l).Fatalf("Failed to start subscription %s: %v", l.subscription, err)
}

return nil
}

// processMessage processes each message, retries if needed, and applies backoff
func (l *Subscriber[T]) processMessage(ctx context.Context, msg *pb.Message) error {
ctx = zerolog.NewLogger().ToContext(ctx)

in, err := generateCloudEvent(msg, l.subscription)
if err != nil {
log.Ctx(ctx, *l).Errorf("could not generate CloudEvent: %s", err)
return errors.Wrap(err, errors.Internalf("could not generate CloudEvent: %s", err.Error()))
}

if _, err := l.handler(ctx, in); err != nil {
a := 1
if msg.DeliveryAttempt != nil {
a = *msg.DeliveryAttempt
}
log.Ctx(ctx, *l).Warnf("msgID=%s handler failed (attempt %d): %v | Payload: %s", msg.ID, a, err, string(msg.Data))
return err
}

return nil
}
Loading
Loading