Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details.
## thus jetstream_subjects won't work
jetstream_stream = ""

## jetstream durable consumer name
## When set, the plugin creates or binds to a durable consumer that
## survives restarts. Without this, an ephemeral consumer is created
## and messages may be replayed or missed on restart.
# jetstream_durable_name = ""

## jetstream deliver policy
## Controls where in the stream the consumer starts receiving messages.
## One of: "all", "last", "new", "by_start_sequence", "by_start_time"
# jetstream_deliver_policy = "all"

## jetstream start sequence
## Used with jetstream_deliver_policy = "by_start_sequence".
## The consumer will start receiving messages from this stream sequence number.
# jetstream_start_sequence = 0

## jetstream start time (RFC3339)
## Used with jetstream_deliver_policy = "by_start_time".
## The consumer will start receiving messages from this timestamp.
# jetstream_start_time = ""

## jetstream ack wait duration
## How long the server waits for an acknowledgement before redelivering
## a message to the consumer.
# jetstream_ack_wait = "30s"

## jetstream max deliver
## Maximum number of delivery attempts for a single message. Set to -1
## for unlimited redelivery attempts.
# jetstream_max_deliver = -1

## jetstream filter subjects
## Filter specific subjects within a stream (JetStream 2.10+).
## This allows consuming only a subset of subjects from a stream.
# jetstream_filter_subjects = []

## jetstream consumer name
## Explicit consumer name for named consumers (different from durable_name).
## Used for pre-created consumers or when you want a specific consumer identity.
# jetstream_consumer_name = ""

## name a queue group
queue_group = "telegraf_consumers"

Expand Down
91 changes: 85 additions & 6 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ package nats_consumer
import (
"context"
_ "embed"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/nats-io/nats.go"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
Expand All @@ -36,6 +39,14 @@ type NatsConsumer struct {
NkeySeed string `toml:"nkey_seed"`
JsSubjects []string `toml:"jetstream_subjects"`
JsStream string `toml:"jetstream_stream"`
JsDurableName string `toml:"jetstream_durable_name"`
JsDeliverPolicy string `toml:"jetstream_deliver_policy"`
JsStartSequence uint64 `toml:"jetstream_start_sequence"`
JsStartTime string `toml:"jetstream_start_time"`
JsAckWait config.Duration `toml:"jetstream_ack_wait"`
JsMaxDeliver int `toml:"jetstream_max_deliver"`
JsFilterSubjects []string `toml:"jetstream_filter_subjects"`
JsConsumerName string `toml:"jetstream_consumer_name"`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to move this to an own structure?

PendingMessageLimit int `toml:"pending_message_limit"`
PendingBytesLimit int `toml:"pending_bytes_limit"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Expand Down Expand Up @@ -84,6 +95,29 @@ func (n *NatsConsumer) SetParser(parser telegraf.Parser) {
n.parser = parser
}

func (n *NatsConsumer) Init() error {
validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"}
if !slices.Contains(validPolicies, n.JsDeliverPolicy) {
return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy)
}
Comment on lines +99 to +102
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following is much easier to read

Suggested change
validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"}
if !slices.Contains(validPolicies, n.JsDeliverPolicy) {
return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy)
}
switch n.JsDeliverPolicy {
case "":
n.JsDeliverPolicy = "all"
case "all", "last", "new", "by_start_sequence", "by_start_time":
// Do nothing, those are valid
default:
return fmt.Errorf("invalid 'jetstream_deliver_policy' %q", n.JsDeliverPolicy)
}

Same for the checks below.

We really should get rid of all other ways of checking this...


if n.JsDeliverPolicy == "by_start_time" && n.JsStartTime != "" {
if _, err := time.Parse(time.RFC3339, n.JsStartTime); err != nil {
return fmt.Errorf("invalid jetstream_start_time %q: must be RFC3339 format: %w", n.JsStartTime, err)
}
}

if n.JsDeliverPolicy == "by_start_sequence" && n.JsStartSequence == 0 {
return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do

Suggested change
return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"")
return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is 'by_start_sequence'")

to avoid the escaping. Same below.

}

if n.JsDeliverPolicy == "by_start_time" && n.JsStartTime == "" {
return errors.New("jetstream_start_time must be set when jetstream_deliver_policy is \"by_start_time\"")
}

return nil
}

// Start the nats consumer. Caller must call *NatsConsumer.Stop() to clean up.
func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
n.sem = make(semaphore, n.MaxUndeliveredMessages)
Expand Down Expand Up @@ -157,16 +191,52 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
if n.JsStream != "" {
subOptions = append(subOptions, nats.BindStream(n.JsStream))
}
if n.JsDurableName != "" {
subOptions = append(subOptions, nats.Durable(n.JsDurableName))
}
switch n.JsDeliverPolicy {
case "all", "":
subOptions = append(subOptions, nats.DeliverAll())
case "last":
subOptions = append(subOptions, nats.DeliverLast())
case "new":
subOptions = append(subOptions, nats.DeliverNew())
case "by_start_sequence":
subOptions = append(subOptions, nats.StartSequence(n.JsStartSequence))
case "by_start_time":
st, err := time.Parse(time.RFC3339, n.JsStartTime)
if err != nil {
return err
}
subOptions = append(subOptions, nats.StartTime(st))
}
if n.JsAckWait > 0 {
subOptions = append(subOptions, nats.AckWait(time.Duration(n.JsAckWait)))
}
if n.JsMaxDeliver > 0 {
subOptions = append(subOptions, nats.MaxDeliver(n.JsMaxDeliver))
}
if n.JsConsumerName != "" {
subOptions = append(subOptions, nats.ConsumerName(n.JsConsumerName))
}
n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256))
if connErr != nil {
return connErr
}

if n.jsConn != nil {
for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
}, subOptions...)
var sub *nats.Subscription
var err error
if n.QueueGroup != "" {
sub, err = n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
}, subOptions...)
} else {
sub, err = n.jsConn.Subscribe(jsSub, func(m *nats.Msg) {
n.in <- m
}, subOptions...)
}
if err != nil {
return err
}
Expand Down Expand Up @@ -322,9 +392,16 @@ func (n *NatsConsumer) clean() {
}

for _, sub := range n.jsSubs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
if n.JsDurableName != "" {
if err := sub.Drain(); err != nil {
n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
Comment on lines +397 to +398
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s", sub.Subject, sub.Queue, err)

}
} else {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
Comment on lines +402 to +403
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", sub.Subject, sub.Queue, err)

}
}
}

Expand All @@ -342,6 +419,8 @@ func init() {
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
JsAckWait: config.Duration(30 * time.Second),
JsMaxDeliver: -1,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this as this will leave the Go default value (0) because you check for > 0 in the code above.

}
})
}
Loading
Loading