feat(inputs.nats_consumer): add durable consumer support for JetStream #18720
aleksclark wants to merge 2 commits
Add support for durable JetStream consumers that survive Telegraf restarts, preventing message loss or replay.
New config options:
- jetstream_durable_name: bind to a named durable consumer
- jetstream_deliver_policy: control where in the stream to start (all, last, new, by_start_sequence, by_start_time)
- jetstream_start_sequence / jetstream_start_time: for sequence/time-based delivery
- jetstream_ack_wait: server-side ack timeout before redelivery
- jetstream_max_deliver: max redelivery attempts per message
- jetstream_filter_subjects: filter subjects within a stream
- jetstream_consumer_name: explicit consumer name for pre-created consumers
When durable_name is set, subscriptions use Drain() instead of Unsubscribe() on shutdown to preserve the consumer on the server. The plugin also now uses js.Subscribe() when no queue group is set, instead of always using QueueSubscribe().
🐨 Generated with Crush
Assisted-by: AWS Claude Opus 4.6 via Crush <crush@charm.land>
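The shutdown rule described above can be sketched without a NATS server. This is a minimal illustration, not the PR's code: the `subscription` interface, `stopSubscription`, and `fakeSub` are hypothetical names, and the interface only stands in for the `Drain()` and `Unsubscribe()` methods that the description mentions.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for the two shutdown-related
// methods of a NATS subscription named in the PR description.
type subscription interface {
	Drain() error
	Unsubscribe() error
}

// stopSubscription applies the rule from the description: if a durable
// name is configured, drain the subscription; otherwise unsubscribe.
func stopSubscription(sub subscription, durableName string) error {
	if durableName != "" {
		if err := sub.Drain(); err != nil {
			return fmt.Errorf("draining subscription: %w", err)
		}
		return nil
	}
	if err := sub.Unsubscribe(); err != nil {
		return fmt.Errorf("unsubscribing: %w", err)
	}
	return nil
}

// fakeSub records which method was called so the rule can be checked
// without a server (illustration only).
type fakeSub struct{ calls []string }

func (f *fakeSub) Drain() error {
	f.calls = append(f.calls, "drain")
	return nil
}

func (f *fakeSub) Unsubscribe() error {
	f.calls = append(f.calls, "unsubscribe")
	return nil
}
```

Calling `stopSubscription(&fakeSub{}, "telegraf")` records a drain, while an empty durable name records an unsubscribe. Whether draining actually preserves the consumer on the server depends on the client library and is not verified by this sketch.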
|
Thanks so much for the pull request! |
|
!signed-cla |
🐘 Generated with Crush Assisted-by: AWS Claude Opus 4.6 via Crush <crush@charm.land>
|
@aleksclark can you please restore the PR description template, especially the AI section, as we cannot review your PR otherwise! |
|
@srebhan sure thing, sorry about that! |
srebhan left a comment
Thanks for your contribution @aleksclark! I do have some comments in the code.
One question came up: are jetstream_consumer_name and jetstream_filter_subjects required for the durable stream feature, or are they separate things? If they are separate, please move them to dedicated PRs!
| JsAckWait config.Duration `toml:"jetstream_ack_wait"` | ||
| JsMaxDeliver int `toml:"jetstream_max_deliver"` | ||
| JsFilterSubjects []string `toml:"jetstream_filter_subjects"` | ||
| JsConsumerName string `toml:"jetstream_consumer_name"` |
Does it make sense to move this to its own structure?
| validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"} | ||
| if !slices.Contains(validPolicies, n.JsDeliverPolicy) { | ||
| return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy) | ||
| } |
The following is much easier to read
| validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"} | |
| if !slices.Contains(validPolicies, n.JsDeliverPolicy) { | |
| return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy) | |
| } | |
| switch n.JsDeliverPolicy { | |
| case "": | |
| n.JsDeliverPolicy = "all" | |
| case "all", "last", "new", "by_start_sequence", "by_start_time": | |
| // Do nothing, those are valid | |
| default: | |
| return fmt.Errorf("invalid 'jetstream_deliver_policy' %q", n.JsDeliverPolicy) | |
| } |
Same for the checks below.
We really should get rid of all other ways of checking this...
| } | ||
| if n.JsDeliverPolicy == "by_start_sequence" && n.JsStartSequence == 0 { | ||
| return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"") |
I would do
| return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"") | |
| return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is 'by_start_sequence'") |
to avoid the escaping. Same below.
| n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s", | ||
| sub.Subject, sub.Queue, err) |
| n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s", | |
| sub.Subject, sub.Queue, err) | |
| n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s", sub.Subject, sub.Queue, err) |
| n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", | ||
| sub.Subject, sub.Queue, err) |
| n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", | |
| sub.Subject, sub.Queue, err) | |
| n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", sub.Subject, sub.Queue, err) |
| } | ||
| } | ||
| func TestJetStreamDeliverPolicyNew(t *testing.T) { |
Same as above!
The question is: Can't we turn this into a table-based test function to avoid the large amount of code duplication?
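As a rough illustration of the table-based pattern, the sketch below applies it to a small validation helper rather than to the PR's integration tests, which need a NATS container. `validateDeliverPolicy` is a hypothetical function that combines the policy check and the start-sequence check discussed in this review; the start-time check is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"testing"
)

// validateDeliverPolicy is a hypothetical helper that checks the policy
// name and, for "by_start_sequence", that a start sequence is set.
func validateDeliverPolicy(policy string, startSeq uint64) error {
	switch policy {
	case "", "all", "last", "new", "by_start_time":
		return nil
	case "by_start_sequence":
		if startSeq == 0 {
			return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is 'by_start_sequence'")
		}
		return nil
	default:
		return fmt.Errorf("invalid 'jetstream_deliver_policy' %q", policy)
	}
}

// TestValidateDeliverPolicy runs every case through the same assertions,
// so adding a policy means adding one table row.
func TestValidateDeliverPolicy(t *testing.T) {
	tests := []struct {
		name     string
		policy   string
		startSeq uint64
		wantErr  bool
	}{
		{name: "empty policy", policy: ""},
		{name: "all", policy: "all"},
		{name: "new", policy: "new"},
		{name: "sequence set", policy: "by_start_sequence", startSeq: 42},
		{name: "sequence missing", policy: "by_start_sequence", wantErr: true},
		{name: "unknown policy", policy: "oldest", wantErr: true},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := validateDeliverPolicy(tt.policy, tt.startSeq)
			if (err != nil) != tt.wantErr {
				t.Errorf("policy %q: got error %v, wantErr %v", tt.policy, err, tt.wantErr)
			}
		})
	}
}
```

The same shape could carry integration cases by adding fields such as the expected number of received messages, with the container setup shared across rows.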
| ## jetstream durable consumer name | ||
| ## When set, the plugin creates or binds to a durable consumer that | ||
| ## survives restarts. Without this, an ephemeral consumer is created | ||
| ## and messages may be replayed or missed on restart. |
Please be brief here
| ## jetstream durable consumer name | |
| ## When set, the plugin creates or binds to a durable consumer that | |
| ## survives restarts. Without this, an ephemeral consumer is created | |
| ## and messages may be replayed or missed on restart. | |
| ## When set, the plugin creates or binds to a durable consumer surviving restarts |
Same below. Please remove the variable naming header line as this does not add any information!
| ## jetstream start sequence | ||
| ## Used with jetstream_deliver_policy = "by_start_sequence". | ||
| ## The consumer will start receiving messages from this stream sequence number. | ||
| # jetstream_start_sequence = 0 |
How would a user know what to put in here? Is this shown somewhere in NATS?
| ## Maximum number of delivery attempts for a single message. Set to -1 | ||
| ## for unlimited redelivery attempts. | ||
| # jetstream_max_deliver = -1 |
| ## Maximum number of delivery attempts for a single message. Set to -1 | |
| ## for unlimited redelivery attempts. | |
| # jetstream_max_deliver = -1 | |
| ## Maximum number of delivery attempts for a single message (default: unlimited) | |
| # jetstream_max_deliver = 0 |
| ## jetstream filter subjects | ||
| ## Filter specific subjects within a stream (JetStream 2.10+). | ||
| ## This allows consuming only a subset of subjects from a stream. | ||
| # jetstream_filter_subjects = [] | ||
|
|
||
| ## jetstream consumer name | ||
| ## Explicit consumer name for named consumers (different from durable_name). | ||
| ## Used for pre-created consumers or when you want a specific consumer identity. | ||
| # jetstream_consumer_name = "" |
It seems like these two are unrelated to the durable stream feature, aren't they?
|
Hello! I am closing this issue due to inactivity. I hope you were able to resolve your problem; if not, please try posting this question in our Community Slack or Community Forums, or provide additional details in this issue and request that it be re-opened. Thank you! |
Summary
Add durable consumer support to the NATS JetStream input plugin, allowing consumers to survive Telegraf restarts without message loss or replay.
The existing JetStream support uses ephemeral push consumers. When Telegraf restarts, the consumer is lost and messages may be replayed from the beginning of the stream or missed entirely. Durable consumers persist on the NATS server and resume from where they left off.
New Configuration Options
Options and defaults:
- jetstream_durable_name: ""
- jetstream_deliver_policy: "all"
- jetstream_start_sequence: 0
- jetstream_start_time: ""
- jetstream_ack_wait: 30s
- jetstream_max_deliver: -1
- jetstream_filter_subjects: []
- jetstream_consumer_name: ""
Behavior Changes
- When durable_name is set, uses Drain() instead of Unsubscribe() on shutdown to preserve the consumer
- Uses js.Subscribe() when no queue group is set, instead of always using QueueSubscribe()
Tests
- nats Docker image
Files Changed
- plugins/inputs/nats_consumer/nats_consumer.go: New config fields + subscription options
- plugins/inputs/nats_consumer/nats_consumer_test.go: 6 new integration tests (580 lines)
- plugins/inputs/nats_consumer/sample.conf: New config options documentation
- plugins/inputs/nats_consumer/README.md: Updated documentation
Checklist
Related issues
resolves #18734