-
Notifications
You must be signed in to change notification settings - Fork 5.8k
feat(inputs.nats_consumer): add durable consumer support for JetStream #18720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,14 +4,17 @@ package nats_consumer | |||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||
| _ "embed" | ||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||
| "slices" | ||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||
| "sync" | ||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| "github.com/nats-io/nats.go" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| "github.com/influxdata/telegraf" | ||||||||||||||||||||||||||
| "github.com/influxdata/telegraf/config" | ||||||||||||||||||||||||||
| "github.com/influxdata/telegraf/internal" | ||||||||||||||||||||||||||
| "github.com/influxdata/telegraf/plugins/common/tls" | ||||||||||||||||||||||||||
| "github.com/influxdata/telegraf/plugins/inputs" | ||||||||||||||||||||||||||
|
|
@@ -36,6 +39,14 @@ type NatsConsumer struct { | |||||||||||||||||||||||||
| NkeySeed string `toml:"nkey_seed"` | ||||||||||||||||||||||||||
| JsSubjects []string `toml:"jetstream_subjects"` | ||||||||||||||||||||||||||
| JsStream string `toml:"jetstream_stream"` | ||||||||||||||||||||||||||
| JsDurableName string `toml:"jetstream_durable_name"` | ||||||||||||||||||||||||||
| JsDeliverPolicy string `toml:"jetstream_deliver_policy"` | ||||||||||||||||||||||||||
| JsStartSequence uint64 `toml:"jetstream_start_sequence"` | ||||||||||||||||||||||||||
| JsStartTime string `toml:"jetstream_start_time"` | ||||||||||||||||||||||||||
| JsAckWait config.Duration `toml:"jetstream_ack_wait"` | ||||||||||||||||||||||||||
| JsMaxDeliver int `toml:"jetstream_max_deliver"` | ||||||||||||||||||||||||||
| JsFilterSubjects []string `toml:"jetstream_filter_subjects"` | ||||||||||||||||||||||||||
| JsConsumerName string `toml:"jetstream_consumer_name"` | ||||||||||||||||||||||||||
| PendingMessageLimit int `toml:"pending_message_limit"` | ||||||||||||||||||||||||||
| PendingBytesLimit int `toml:"pending_bytes_limit"` | ||||||||||||||||||||||||||
| MaxUndeliveredMessages int `toml:"max_undelivered_messages"` | ||||||||||||||||||||||||||
|
|
@@ -84,6 +95,29 @@ func (n *NatsConsumer) SetParser(parser telegraf.Parser) { | |||||||||||||||||||||||||
| n.parser = parser | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| func (n *NatsConsumer) Init() error { | ||||||||||||||||||||||||||
| validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"} | ||||||||||||||||||||||||||
| if !slices.Contains(validPolicies, n.JsDeliverPolicy) { | ||||||||||||||||||||||||||
| return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+99
to
+102
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The following is much easier to read
Suggested change
Same for the checks below. We really should get rid of all other ways of checking this... |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if n.JsDeliverPolicy == "by_start_time" && n.JsStartTime != "" { | ||||||||||||||||||||||||||
| if _, err := time.Parse(time.RFC3339, n.JsStartTime); err != nil { | ||||||||||||||||||||||||||
| return fmt.Errorf("invalid jetstream_start_time %q: must be RFC3339 format: %w", n.JsStartTime, err) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if n.JsDeliverPolicy == "by_start_sequence" && n.JsStartSequence == 0 { | ||||||||||||||||||||||||||
| return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"") | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would do
Suggested change
to avoid the escaping. Same below. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if n.JsDeliverPolicy == "by_start_time" && n.JsStartTime == "" { | ||||||||||||||||||||||||||
| return errors.New("jetstream_start_time must be set when jetstream_deliver_policy is \"by_start_time\"") | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Start the nats consumer. Caller must call *NatsConsumer.Stop() to clean up. | ||||||||||||||||||||||||||
| func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { | ||||||||||||||||||||||||||
| n.sem = make(semaphore, n.MaxUndeliveredMessages) | ||||||||||||||||||||||||||
|
|
@@ -157,16 +191,52 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { | |||||||||||||||||||||||||
| if n.JsStream != "" { | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.BindStream(n.JsStream)) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if n.JsDurableName != "" { | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.Durable(n.JsDurableName)) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| switch n.JsDeliverPolicy { | ||||||||||||||||||||||||||
| case "all", "": | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.DeliverAll()) | ||||||||||||||||||||||||||
| case "last": | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.DeliverLast()) | ||||||||||||||||||||||||||
| case "new": | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.DeliverNew()) | ||||||||||||||||||||||||||
| case "by_start_sequence": | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.StartSequence(n.JsStartSequence)) | ||||||||||||||||||||||||||
| case "by_start_time": | ||||||||||||||||||||||||||
| st, err := time.Parse(time.RFC3339, n.JsStartTime) | ||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.StartTime(st)) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if n.JsAckWait > 0 { | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.AckWait(time.Duration(n.JsAckWait))) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if n.JsMaxDeliver > 0 { | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.MaxDeliver(n.JsMaxDeliver)) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if n.JsConsumerName != "" { | ||||||||||||||||||||||||||
| subOptions = append(subOptions, nats.ConsumerName(n.JsConsumerName)) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256)) | ||||||||||||||||||||||||||
| if connErr != nil { | ||||||||||||||||||||||||||
| return connErr | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if n.jsConn != nil { | ||||||||||||||||||||||||||
| for _, jsSub := range n.JsSubjects { | ||||||||||||||||||||||||||
| sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) { | ||||||||||||||||||||||||||
| n.in <- m | ||||||||||||||||||||||||||
| }, subOptions...) | ||||||||||||||||||||||||||
| var sub *nats.Subscription | ||||||||||||||||||||||||||
| var err error | ||||||||||||||||||||||||||
| if n.QueueGroup != "" { | ||||||||||||||||||||||||||
| sub, err = n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) { | ||||||||||||||||||||||||||
| n.in <- m | ||||||||||||||||||||||||||
| }, subOptions...) | ||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||
| sub, err = n.jsConn.Subscribe(jsSub, func(m *nats.Msg) { | ||||||||||||||||||||||||||
| n.in <- m | ||||||||||||||||||||||||||
| }, subOptions...) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
@@ -322,9 +392,16 @@ func (n *NatsConsumer) clean() { | |||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for _, sub := range n.jsSubs { | ||||||||||||||||||||||||||
| if err := sub.Unsubscribe(); err != nil { | ||||||||||||||||||||||||||
| n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", | ||||||||||||||||||||||||||
| sub.Subject, sub.Queue, err) | ||||||||||||||||||||||||||
| if n.JsDurableName != "" { | ||||||||||||||||||||||||||
| if err := sub.Drain(); err != nil { | ||||||||||||||||||||||||||
| n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s", | ||||||||||||||||||||||||||
| sub.Subject, sub.Queue, err) | ||||||||||||||||||||||||||
|
Comment on lines
+397
to
+398
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||
| if err := sub.Unsubscribe(); err != nil { | ||||||||||||||||||||||||||
| n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", | ||||||||||||||||||||||||||
| sub.Subject, sub.Queue, err) | ||||||||||||||||||||||||||
|
Comment on lines
+402
to
+403
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -342,6 +419,8 @@ func init() { | |||||||||||||||||||||||||
| PendingBytesLimit: nats.DefaultSubPendingBytesLimit, | ||||||||||||||||||||||||||
| PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, | ||||||||||||||||||||||||||
| MaxUndeliveredMessages: defaultMaxUndeliveredMessages, | ||||||||||||||||||||||||||
| JsAckWait: config.Duration(30 * time.Second), | ||||||||||||||||||||||||||
| JsMaxDeliver: -1, | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can remove this as this will leave the Go default value ( |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to move this to an own structure?