diff --git a/internal/impl/nats/input_kv.go b/internal/impl/nats/input_kv.go index 14cd27b1f5..f398046985 100644 --- a/internal/impl/nats/input_kv.go +++ b/internal/impl/nats/input_kv.go @@ -30,6 +30,7 @@ const ( kviFieldKey = "key" kviFieldIgnoreDeletes = "ignore_deletes" kviFieldIncludeHistory = "include_history" + kviFieldUpdatesOnly = "updates_only" kviFieldMetaOnly = "meta_only" ) @@ -68,6 +69,10 @@ This input adds the following metadata fields to each message: Description("Include all the history per key, not just the last one."). Default(false). Advanced(), + service.NewBoolField(kviFieldUpdatesOnly). + Description("Only receive updates to keys, skip delivering existing values on connect."). + Default(false). + Advanced(), service.NewBoolField(kviFieldMetaOnly). Description("Retrieve only the metadata of the entry"). Default(false). @@ -94,6 +99,7 @@ type kvReader struct { key string ignoreDeletes bool includeHistory bool + updatesOnly bool metaOnly bool log *service.Logger @@ -132,6 +138,10 @@ func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader, return nil, err } + if r.updatesOnly, err = conf.FieldBool(kviFieldUpdatesOnly); err != nil { + return nil, err + } + if r.metaOnly, err = conf.FieldBool(kviFieldMetaOnly); err != nil { return nil, err } @@ -192,6 +202,9 @@ func (r *kvReader) Connect(ctx context.Context) (err error) { if r.includeHistory { watchOpts = append(watchOpts, jetstream.IncludeHistory()) } + if r.updatesOnly { + watchOpts = append(watchOpts, jetstream.UpdatesOnly()) + } if r.metaOnly { watchOpts = append(watchOpts, jetstream.MetaOnly()) } diff --git a/internal/impl/nats/input_kv_test.go b/internal/impl/nats/input_kv_test.go index c002f3a969..877fbd4cb9 100644 --- a/internal/impl/nats/input_kv_test.go +++ b/internal/impl/nats/input_kv_test.go @@ -34,6 +34,7 @@ bucket: testbucket key: testkey ignore_deletes: true include_history: true +updates_only: true meta_only: true max_reconnects: -1 auth: @@ -54,6 +55,7 @@ auth: assert.Equal(t, "testkey", e.key) assert.True(t, e.ignoreDeletes) assert.True(t, e.includeHistory) + assert.True(t, e.updatesOnly) assert.True(t, e.metaOnly) assert.Equal(t, -1, *e.connDetails.maxReconnects) assert.Equal(t, "test auth n key file", e.connDetails.authConf.NKeyFile) diff --git a/internal/impl/nats/integration_kv_test.go b/internal/impl/nats/integration_kv_test.go index dd5fbd3530..a4aed75198 100644 --- a/internal/impl/nats/integration_kv_test.go +++ b/internal/impl/nats/integration_kv_test.go @@ -386,4 +386,56 @@ cache_resources: assert.JSONEq(t, string(expected), string(msg)) }) }) + + t.Run("input", func(t *testing.T) { + t.Run("updates_only skips existing values", func(t *testing.T) { + u4, err := uuid.NewV4() + require.NoError(t, err) + js, err := jetstream.New(natsConn) + require.NoError(t, err) + + bucket, err := js.CreateKeyValue(t.Context(), jetstream.KeyValueConfig{ + Bucket: "bucket-" + u4.String(), + }) + require.NoError(t, err) + + _, err = bucket.PutString(t.Context(), "existing", "old_value") + require.NoError(t, err) + + yaml := fmt.Sprintf(` + bucket: %s + updates_only: true + urls: [%s]`, bucket.Bucket(), fmt.Sprintf("tcp://localhost:%v", resource.GetPort("4222/tcp"))) + + spec := natsKVInputConfig() + parsed, err := spec.ParseYAML(yaml, nil) + require.NoError(t, err) + + input, err := newKVReader(parsed, service.MockResources()) + require.NoError(t, err) + + err = input.Connect(t.Context()) + require.NoError(t, err) + defer input.Close(t.Context()) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + defer cancel() + _, _, err = input.Read(ctx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + _, err = bucket.PutString(t.Context(), "new_key", "new_value") + require.NoError(t, err) + + msg, _, err := input.Read(t.Context()) + require.NoError(t, err) + + bytes, err := msg.AsBytes() + require.NoError(t, err) + assert.Equal(t, []byte("new_value"), bytes) + + key, exists := msg.MetaGet("nats_kv_key") + require.True(t, exists) + assert.Equal(t, "new_key", key) + }) + }) }