From 19893aaebd57d31b38c76f63d2369c5b3737fcce Mon Sep 17 00:00:00 2001 From: Nick Chomey Date: Wed, 10 Sep 2025 20:18:17 +0000 Subject: [PATCH 1/2] update nats.go dep --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index ad58f50d3d..7b42c4e1d3 100644 --- a/go.mod +++ b/go.mod @@ -99,8 +99,8 @@ require ( github.com/microcosm-cc/bluemonday v1.0.27 github.com/microsoft/go-mssqldb v1.8.2 github.com/microsoft/gocosmos v1.1.1 - github.com/nats-io/nats.go v1.37.0 - github.com/nats-io/nkeys v0.4.7 + github.com/nats-io/nats.go v1.45.0 + github.com/nats-io/nkeys v0.4.11 github.com/nats-io/stan.go v0.10.4 github.com/neo4j/neo4j-go-driver/v5 v5.24.0 github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 diff --git a/go.sum b/go.sum index 2a4ebd9ee6..2db2881f77 100644 --- a/go.sum +++ b/go.sum @@ -1662,11 +1662,11 @@ github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/ github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= -github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= +github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0= From 524646f1afddbaaf381bb7fb896f0ffb0358069b Mon Sep 17 00:00:00 2001 From: Nick Chomey Date: Wed, 10 Sep 2025 20:25:03 +0000 Subject: [PATCH 2/2] add createBucket setting to kv components --- internal/impl/nats/cache_kv.go | 37 ++++++++++++++++++++----- internal/impl/nats/docs.go | 9 ++++-- internal/impl/nats/input_kv.go | 25 +++++++++++++++-- internal/impl/nats/output_kv.go | 33 ++++++++++++++++++---- internal/impl/nats/processor_kv.go | 44 +++++++++++++++++++++++------- 5 files changed, 121 insertions(+), 27 deletions(-) diff --git a/internal/impl/nats/cache_kv.go b/internal/impl/nats/cache_kv.go index 34cbe84443..4f8f5b2680 100644 --- a/internal/impl/nats/cache_kv.go +++ b/internal/impl/nats/cache_kv.go @@ -17,6 +17,7 @@ package nats import ( "context" "errors" + "fmt" "sync" "time" @@ -34,7 +35,12 @@ func natsKVCacheConfig() *service.ConfigSpec { Version("4.27.0"). Summary("Cache key/values in a NATS key-value bucket."). Description(connectionNameDescription() + authDescription()). - Fields(kvDocs()...) + Fields(Docs("KV", + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), + )...) } func init() { @@ -47,8 +53,9 @@ func init() { } type kvCache struct { - connDetails connectionDetails - bucket string + connDetails connectionDetails + bucket string + createBucket bool log *service.Logger @@ -56,6 +63,7 @@ type kvCache struct { connMut sync.RWMutex natsConn *nats.Conn + js jetstream.JetStream kv jetstream.KeyValue } @@ -74,6 +82,10 @@ func newKVCache(conf *service.ParsedConfig, mgr *service.Resources) (*kvCache, e return nil, err } + if p.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + err = p.connect(context.Background()) return p, err } @@ -86,6 +98,7 @@ func (p *kvCache) disconnect() { p.natsConn.Close() p.natsConn = nil } + p.js = nil p.kv = nil } @@ -109,13 +122,23 @@ func (p *kvCache) connect(ctx context.Context) error { } }() - var js jetstream.JetStream - if js, err = jetstream.New(p.natsConn); err != nil { + if p.js, err = jetstream.New(p.natsConn); err != nil { return err } - if p.kv, err = js.KeyValue(ctx, p.bucket); err != nil { - return err + // Check if bucket exists first, create only if config allows + p.kv, err = p.js.KeyValue(ctx, p.bucket) + if err != nil { + if p.createBucket { + if p.kv, err = p.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: p.bucket, + }); err != nil { + return fmt.Errorf("failed to create bucket %s: %w", p.bucket, err) + } + p.log.Infof("Created bucket %s", p.bucket) + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", p.bucket) + } } return nil } diff --git a/internal/impl/nats/docs.go b/internal/impl/nats/docs.go index d91885ef56..539aeba1bb 100644 --- a/internal/impl/nats/docs.go +++ b/internal/impl/nats/docs.go @@ -46,13 +46,18 @@ func outputTracingDocs() *service.ConfigField { return service.NewInjectTracingSpanMappingField().Version(tracingVersion) } -func kvDocs(extraFields ...*service.ConfigField) []*service.ConfigField { +func Docs(natsComponentType string, extraFields ...*service.ConfigField) []*service.ConfigField { // TODO: Use `slices.Concat()` after switching to Go 1.22 + bucketName := "my_bucket" + if natsComponentType == "KV" { + bucketName = "my_kv_bucket" + } + fields := append( connectionHeadFields(), []*service.ConfigField{ service.NewStringField(kvFieldBucket). - Description("The name of the KV bucket.").Example("my_kv_bucket"), + Description("The name of the " + natsComponentType + " bucket.").Example(bucketName), }..., ) fields = append(fields, extraFields...) diff --git a/internal/impl/nats/input_kv.go b/internal/impl/nats/input_kv.go index 63625faaf8..32195b198c 100644 --- a/internal/impl/nats/input_kv.go +++ b/internal/impl/nats/input_kv.go @@ -16,6 +16,7 @@ package nats import ( "context" + "fmt" "sync" "github.com/nats-io/nats.go" @@ -54,11 +55,15 @@ This input adds the following metadata fields to each message: ` + "```" + ` ` + connectionNameDescription() + authDescription()). - Fields(kvDocs([]*service.ConfigField{ + Fields(Docs("KV", []*service.ConfigField{ service.NewStringField(kviFieldKey). Description("Key to watch for updates, can include wildcards."). Default(">"). Example("foo.bar.baz").Example("foo.*.baz").Example("foo.bar.*").Example("foo.>"), + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), service.NewAutoRetryNacksToggleField(), service.NewBoolField(kviFieldIgnoreDeletes). Description("Do not send delete markers as messages."). @@ -92,6 +97,7 @@ type kvReader struct { connDetails connectionDetails bucket string key string + createBucket bool ignoreDeletes bool includeHistory bool metaOnly bool @@ -120,6 +126,10 @@ func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader, return nil, err } + if r.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + if r.key, err = conf.FieldString(kviFieldKey); err != nil { return nil, err } @@ -167,9 +177,20 @@ func (r *kvReader) Connect(ctx context.Context) (err error) { return err } + // Check if bucket exists first, create only if config allows kv, err := js.KeyValue(ctx, r.bucket) if err != nil { - return err + if r.createBucket { + kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: r.bucket, + }) + if err != nil { + return fmt.Errorf("failed to create bucket %s: %w", r.bucket, err) + } + r.log.Infof("Created bucket %s", r.bucket) + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", r.bucket) + } } var watchOpts []jetstream.WatchOpt diff --git a/internal/impl/nats/output_kv.go b/internal/impl/nats/output_kv.go index 84a78cf9c8..60474aa1e3 100644 --- a/internal/impl/nats/output_kv.go +++ b/internal/impl/nats/output_kv.go @@ -16,6 +16,7 @@ package nats import ( "context" + "fmt" "sync" "github.com/nats-io/nats.go" @@ -42,12 +43,16 @@ xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions], you to create a unique key for each message. ` + connectionNameDescription() + authDescription()). - Fields(kvDocs([]*service.ConfigField{ + Fields(Docs("KV", []*service.ConfigField{ service.NewInterpolatedStringField(kvoFieldKey). Description("The key for each message."). Example("foo"). Example("foo.bar.baz"). Example(`foo.${! json("meta.type") }`), + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), service.NewOutputMaxInFlightField().Default(1024), }...)...) } @@ -68,10 +73,11 @@ func init() { //------------------------------------------------------------------------------ type kvOutput struct { - connDetails connectionDetails - bucket string - key *service.InterpolatedString - keyRaw string + connDetails connectionDetails + bucket string + key *service.InterpolatedString + keyRaw string + createBucket bool log *service.Logger @@ -97,6 +103,10 @@ func newKVOutput(conf *service.ParsedConfig, mgr *service.Resources) (*kvOutput, return nil, err } + if kv.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + if kv.keyRaw, err = conf.FieldString(kvoFieldKey); err != nil { return nil, err } @@ -134,9 +144,20 @@ func (kv *kvOutput) Connect(ctx context.Context) (err error) { return err } + // Check if bucket exists first, create only if config allows kv.keyValue, err = jsc.KeyValue(ctx, kv.bucket) if err != nil { - return err + if kv.createBucket { + kv.keyValue, err = jsc.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: kv.bucket, + }) + if err != nil { + return fmt.Errorf("failed to create bucket %s: %w", kv.bucket, err) + } + kv.log.Infof("Created bucket %s", kv.bucket) + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", kv.bucket) + } } kv.natsConn = natsConn diff --git a/internal/impl/nats/processor_kv.go b/internal/impl/nats/processor_kv.go index 9a5725bb0d..ef1a8655a4 100644 --- a/internal/impl/nats/processor_kv.go +++ b/internal/impl/nats/processor_kv.go @@ -16,6 +16,7 @@ package nats import ( "context" + "errors" "fmt" "strconv" "sync" @@ -101,7 +102,7 @@ This processor adds the following metadata fields to each message, depending on ` + "```" + ` ` + connectionNameDescription() + authDescription()). - Fields(kvDocs([]*service.ConfigField{ + Fields(Docs("KV", []*service.ConfigField{ service.NewStringAnnotatedEnumField(kvpFieldOperation, kvpOperations). Description("The operation to perform on the KV bucket."), service.NewInterpolatedStringField(kvpFieldKey). @@ -111,6 +112,10 @@ This processor adds the following metadata fields to each message, depending on Example("foo.*"). Example("foo.>"). Example(`foo.${! json("meta.type") }`).LintRule(`if this == "" {[ "'key' must be set to a non-empty string" ]}`), + service.NewBoolField("create_bucket"). + Description("Whether to automatically create the bucket if it doesn't exist."). + Advanced(). + Default(false), service.NewInterpolatedStringField(kvpFieldRevision). Description("The revision of the key to operate on. Used for `get_revision` and `update` operations."). Example("42"). @@ -137,12 +142,13 @@ func init() { } type kvProcessor struct { - connDetails connectionDetails - bucket string - operation kvpOperationType - key *service.InterpolatedString - revision *service.InterpolatedString - timeout time.Duration + connDetails connectionDetails + bucket string + operation kvpOperationType + key *service.InterpolatedString + revision *service.InterpolatedString + timeout time.Duration + createBucket bool log *service.Logger @@ -168,6 +174,10 @@ func newKVProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*kvProc return nil, err } + if p.createBucket, err = conf.FieldBool("create_bucket"); err != nil { + return nil, err + } + if operation, err := conf.FieldString(kvpFieldOperation); err != nil { return nil, err } else { @@ -395,9 +405,23 @@ func (p *kvProcessor) Connect(ctx context.Context) (err error) { return err } - p.kv, err = js.KeyValue(ctx, p.bucket) - if err != nil { - return err + // Try to get existing bucket first + if p.kv, err = js.KeyValue(ctx, p.bucket); err != nil { + if errors.Is(err, jetstream.ErrBucketNotFound) { + if p.createBucket { + // Create the bucket if it doesn't exist + p.log.Infof("Creating KV bucket: %s", p.bucket) + if p.kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: p.bucket, + }); err != nil { + return fmt.Errorf("failed to create bucket %s: %w", p.bucket, err) + } + } else { + return fmt.Errorf("bucket %s does not exist and create_bucket is false", p.bucket) + } + } else { + return err + } } return nil }