feat(inputs.nats_consumer): add durable consumer support for JetStream #18720

Closed
aleksclark wants to merge 2 commits into influxdata:master from aleksclark:feat/nats-durable-consumer

Conversation

aleksclark commented Apr 15, 2026

Summary

Add durable consumer support to the NATS JetStream input plugin, allowing consumers to survive Telegraf restarts without message loss or replay.

The existing JetStream support uses ephemeral push consumers. When Telegraf restarts, the consumer is lost and messages may be replayed from the beginning of the stream or missed entirely. Durable consumers persist on the NATS server and resume from where they left off.

New Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `jetstream_durable_name` | string | `""` | Durable consumer name (survives restarts) |
| `jetstream_deliver_policy` | string | `"all"` | Where to start: `all`, `last`, `new`, `by_start_sequence`, `by_start_time` |
| `jetstream_start_sequence` | uint64 | `0` | Starting sequence for the `by_start_sequence` policy |
| `jetstream_start_time` | string | `""` | RFC3339 start time for the `by_start_time` policy |
| `jetstream_ack_wait` | duration | `30s` | Server-side ack timeout before redelivery |
| `jetstream_max_deliver` | int | `-1` | Max redelivery attempts (`-1` = unlimited) |
| `jetstream_filter_subjects` | []string | `[]` | Filter subjects within a stream |
| `jetstream_consumer_name` | string | `""` | Explicit consumer name for pre-created consumers |

Behavior Changes

  • When jetstream_durable_name is set, uses Drain() instead of Unsubscribe() on shutdown to preserve the consumer
  • Correctly uses js.Subscribe() when no queue group is set, instead of always using QueueSubscribe()
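
The shutdown rule in the first bullet can be sketched without a NATS server. This is a minimal illustration, not the plugin's code: the `subscription` interface, `recordingSub`, and `stopSubscription` are hypothetical names that only model the choice between Drain() and Unsubscribe().

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for the two shutdown methods
// discussed above; it is not the nats.go type itself.
type subscription interface {
	Drain() error
	Unsubscribe() error
}

// recordingSub is a test double that remembers which method was called.
type recordingSub struct{ calls []string }

func (r *recordingSub) Drain() error {
	r.calls = append(r.calls, "drain")
	return nil
}

func (r *recordingSub) Unsubscribe() error {
	r.calls = append(r.calls, "unsubscribe")
	return nil
}

// stopSubscription drains when a durable name is configured and
// unsubscribes otherwise, mirroring the first bullet above.
func stopSubscription(sub subscription, durableName string) error {
	if durableName != "" {
		return sub.Drain()
	}
	return sub.Unsubscribe()
}

func main() {
	durable, ephemeral := &recordingSub{}, &recordingSub{}
	if err := stopSubscription(durable, "telegraf"); err != nil {
		fmt.Println("drain failed:", err)
	}
	if err := stopSubscription(ephemeral, ""); err != nil {
		fmt.Println("unsubscribe failed:", err)
	}
	fmt.Println(durable.calls, ephemeral.calls) // prints: [drain] [unsubscribe]
}
```

Keeping the decision in one small function makes it easy to unit-test separately from the container-backed integration tests.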

Tests

  • 6 new integration tests covering: durable restart resilience, deliver policies (new, by_start_sequence), max deliver limits, filter subjects, and durable + queue group combinations
  • All tests use testcontainers with the official nats Docker image

Files Changed

  • plugins/inputs/nats_consumer/nats_consumer.go — New config fields + subscription options
  • plugins/inputs/nats_consumer/nats_consumer_test.go — 6 new integration tests (580 lines)
  • plugins/inputs/nats_consumer/sample.conf — New config options documentation
  • plugins/inputs/nats_consumer/README.md — Updated documentation

Checklist

Related issues

resolves #18734

Add support for durable JetStream consumers that survive Telegraf restarts,
preventing message loss or replay. New config options:

- jetstream_durable_name: bind to a named durable consumer
- jetstream_deliver_policy: control where in the stream to start (all, last, new, by_start_sequence, by_start_time)
- jetstream_start_sequence / jetstream_start_time: for sequence/time-based delivery
- jetstream_ack_wait: server-side ack timeout before redelivery
- jetstream_max_deliver: max redelivery attempts per message
- jetstream_filter_subjects: filter subjects within a stream
- jetstream_consumer_name: explicit consumer name for pre-created consumers

When durable_name is set, subscriptions use Drain() instead of
Unsubscribe() on shutdown to preserve the consumer on the server.
The plugin also now uses js.Subscribe() when no queue group is set,
instead of always using QueueSubscribe().

🐨 Generated with Crush

Assisted-by: AWS Claude Opus 4.6 via Crush <crush@charm.land>
@telegraf-tiger
Contributor

Thanks so much for the pull request!
🤝 ✒️ Just a reminder that the CLA has not yet been signed, and we'll need it before merging. Please sign the CLA when you get a chance, then post a comment here saying !signed-cla

telegraf-tiger (bot) added the feat and plugin/input labels Apr 15, 2026
@aleksclark
Author

!signed-cla

🐘 Generated with Crush

Assisted-by: AWS Claude Opus 4.6 via Crush <crush@charm.land>

@srebhan
Member

srebhan commented Apr 17, 2026

@aleksclark can you please restore the PR description template, especially the AI section, as we cannot review your PR otherwise!

srebhan self-assigned this Apr 17, 2026
srebhan added the waiting for response label Apr 17, 2026
@aleksclark
Author

@srebhan sure thing, sorry about that!

telegraf-tiger (bot) removed the waiting for response label Apr 17, 2026
Member

srebhan left a comment

Thanks for your contribution @aleksclark! I do have some comments in the code.
One question that came up is whether jetstream_consumer_name and jetstream_filter_subjects are required for the durable stream feature or are separate things. If they are separate, please move them to dedicated PRs!

JsAckWait config.Duration `toml:"jetstream_ack_wait"`
JsMaxDeliver int `toml:"jetstream_max_deliver"`
JsFilterSubjects []string `toml:"jetstream_filter_subjects"`
JsConsumerName string `toml:"jetstream_consumer_name"`
Member

Does it make sense to move these into their own structure?
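
One way to read this suggestion is to group the JetStream settings in a dedicated struct and embed it in the plugin struct. The sketch below is only an illustration under assumptions: `JetStreamOptions` and `natsConsumer` are hypothetical names, `time.Duration` stands in for `config.Duration`, and whether the TOML keys stay flat after embedding depends on the decoder in use.

```go
package main

import (
	"fmt"
	"time"
)

// JetStreamOptions is a hypothetical grouping of the options added in this PR.
// time.Duration is used here in place of Telegraf's config.Duration.
type JetStreamOptions struct {
	DurableName    string        `toml:"jetstream_durable_name"`
	DeliverPolicy  string        `toml:"jetstream_deliver_policy"`
	StartSequence  uint64        `toml:"jetstream_start_sequence"`
	StartTime      string        `toml:"jetstream_start_time"`
	AckWait        time.Duration `toml:"jetstream_ack_wait"`
	MaxDeliver     int           `toml:"jetstream_max_deliver"`
	FilterSubjects []string      `toml:"jetstream_filter_subjects"`
	ConsumerName   string        `toml:"jetstream_consumer_name"`
}

// natsConsumer is a trimmed-down, hypothetical plugin struct. Embedding
// promotes the option fields, so existing code can keep using c.DurableName.
type natsConsumer struct {
	Subjects []string `toml:"subjects"`
	JetStreamOptions
}

func main() {
	c := natsConsumer{Subjects: []string{"telegraf"}}
	c.DurableName = "telegraf"
	c.AckWait = 30 * time.Second
	fmt.Println(c.DurableName, c.AckWait) // prints: telegraf 30s
}
```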

Comment on lines +99 to +102
validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"}
if !slices.Contains(validPolicies, n.JsDeliverPolicy) {
return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy)
}
Member

The following is much easier to read

Suggested change
- validPolicies := []string{"", "all", "last", "new", "by_start_sequence", "by_start_time"}
- if !slices.Contains(validPolicies, n.JsDeliverPolicy) {
- 	return fmt.Errorf("invalid jetstream_deliver_policy %q, must be one of: all, last, new, by_start_sequence, by_start_time", n.JsDeliverPolicy)
- }
+ switch n.JsDeliverPolicy {
+ case "":
+ 	n.JsDeliverPolicy = "all"
+ case "all", "last", "new", "by_start_sequence", "by_start_time":
+ 	// Do nothing, those are valid
+ default:
+ 	return fmt.Errorf("invalid 'jetstream_deliver_policy' %q", n.JsDeliverPolicy)
+ }

Same for the checks below.

We really should get rid of all other ways of checking this...
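
A self-contained sketch of how the switch style could also absorb the dependent checks ("Same for the checks below"). The `deliverSettings` type and `validate` method are hypothetical, and the RFC3339 parse for `by_start_time` is an assumption based on the option description rather than code shown in this PR.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// deliverSettings is a hypothetical, trimmed-down holder for the three
// options involved in the deliver-policy checks.
type deliverSettings struct {
	Policy        string
	StartSequence uint64
	StartTime     string
}

// validate defaults an empty policy to "all" and rejects unknown policies
// or policies whose companion option is missing or malformed.
func (d *deliverSettings) validate() error {
	switch d.Policy {
	case "":
		d.Policy = "all"
	case "all", "last", "new":
		// Do nothing, those are valid
	case "by_start_sequence":
		if d.StartSequence == 0 {
			return errors.New("'jetstream_start_sequence' must be set for policy 'by_start_sequence'")
		}
	case "by_start_time":
		// Assumption: the start time must parse as RFC3339.
		if _, err := time.Parse(time.RFC3339, d.StartTime); err != nil {
			return fmt.Errorf("invalid 'jetstream_start_time' %q: %w", d.StartTime, err)
		}
	default:
		return fmt.Errorf("invalid 'jetstream_deliver_policy' %q", d.Policy)
	}
	return nil
}

func main() {
	samples := []deliverSettings{
		{Policy: ""},
		{Policy: "by_start_sequence"},
		{Policy: "by_start_time", StartTime: "2026-04-15T00:00:00Z"},
		{Policy: "latest"},
	}
	for i := range samples {
		err := samples[i].validate()
		fmt.Printf("%q -> %v\n", samples[i].Policy, err)
	}
}
```

Folding each dependent check into its own case keeps every rule for a policy in one place, at the cost of a slightly longer switch.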

}

if n.JsDeliverPolicy == "by_start_sequence" && n.JsStartSequence == 0 {
return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"")
Member

I would do

Suggested change
- return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is \"by_start_sequence\"")
+ return errors.New("jetstream_start_sequence must be set when jetstream_deliver_policy is 'by_start_sequence'")

to avoid the escaping. Same below.

Comment on lines +397 to +398
n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
Member

Suggested change
- n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s",
- 	sub.Subject, sub.Queue, err)
+ n.Log.Errorf("Error draining JetStream subject %s in queue %s: %s", sub.Subject, sub.Queue, err)

Comment on lines +402 to +403
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
Member

Suggested change
- n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
- 	sub.Subject, sub.Queue, err)
+ n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", sub.Subject, sub.Queue, err)

}
}

func TestJetStreamDeliverPolicyNew(t *testing.T) {
Member

Same as above!

The question is: Can't we turn this into a table-based test function to avoid the large amount of code duplication?

Comment on lines +25 to +28
## jetstream durable consumer name
## When set, the plugin creates or binds to a durable consumer that
## survives restarts. Without this, an ephemeral consumer is created
## and messages may be replayed or missed on restart.
Member

Please be brief here

Suggested change
- ## jetstream durable consumer name
- ## When set, the plugin creates or binds to a durable consumer that
- ## survives restarts. Without this, an ephemeral consumer is created
- ## and messages may be replayed or missed on restart.
+ ## When set, the plugin creates or binds to a durable consumer surviving restarts

Same below. Please remove the variable naming header line as this does not add any information!

Comment on lines +36 to +39
## jetstream start sequence
## Used with jetstream_deliver_policy = "by_start_sequence".
## The consumer will start receiving messages from this stream sequence number.
# jetstream_start_sequence = 0
Member

How would a user know what to put in here? Is this shown somewhere in NATS?

Comment on lines +52 to +54
## Maximum number of delivery attempts for a single message. Set to -1
## for unlimited redelivery attempts.
# jetstream_max_deliver = -1
Member

Suggested change
- ## Maximum number of delivery attempts for a single message. Set to -1
- ## for unlimited redelivery attempts.
- # jetstream_max_deliver = -1
+ ## Maximum number of delivery attempts for a single message (default: unlimited)
+ # jetstream_max_deliver = 0

Comment on lines +56 to +64
## jetstream filter subjects
## Filter specific subjects within a stream (JetStream 2.10+).
## This allows consuming only a subset of subjects from a stream.
# jetstream_filter_subjects = []

## jetstream consumer name
## Explicit consumer name for named consumers (different from durable_name).
## Used for pre-created consumers or when you want a specific consumer identity.
# jetstream_consumer_name = ""
Member

It seems like these two are unrelated to the durable stream feature, aren't they?

srebhan added the waiting for response label Apr 27, 2026
@telegraf-tiger
Contributor

Hello! I am closing this issue due to inactivity. I hope you were able to resolve your problem. If not, please try posting this question in our Community Slack or Community Forums, or provide additional details in this issue and request that it be re-opened. Thank you!

telegraf-tiger (bot) closed this May 11, 2026

Labels

feat: Improvement on an existing feature such as adding a new setting/mode to an existing plugin
plugin/input: 1. Request for new input plugins 2. Issues/PRs that are related to input plugins
waiting for response: waiting for response from contributor

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(inputs.nats_consumer): add durable consumer support for JetStream

2 participants