Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/impl/nats/input_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
kviFieldKey = "key"
kviFieldIgnoreDeletes = "ignore_deletes"
kviFieldIncludeHistory = "include_history"
kviFieldUpdatesOnly = "updates_only"
kviFieldMetaOnly = "meta_only"
)

Expand Down Expand Up @@ -68,6 +69,10 @@ This input adds the following metadata fields to each message:
Description("Include all the history per key, not just the last one.").
Default(false).
Advanced(),
service.NewBoolField(kviFieldUpdatesOnly).
Description("Only receive updates to keys, skip delivering existing values on connect.").
Default(false).
Advanced(),
service.NewBoolField(kviFieldMetaOnly).
Description("Retrieve only the metadata of the entry").
Default(false).
Expand All @@ -94,6 +99,7 @@ type kvReader struct {
key string
ignoreDeletes bool
includeHistory bool
updatesOnly bool
metaOnly bool

log *service.Logger
Expand Down Expand Up @@ -132,6 +138,10 @@ func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader,
return nil, err
}

if r.updatesOnly, err = conf.FieldBool(kviFieldUpdatesOnly); err != nil {
return nil, err
}

if r.metaOnly, err = conf.FieldBool(kviFieldMetaOnly); err != nil {
return nil, err
}
Expand Down Expand Up @@ -192,6 +202,9 @@ func (r *kvReader) Connect(ctx context.Context) (err error) {
if r.includeHistory {
watchOpts = append(watchOpts, jetstream.IncludeHistory())
}
if r.updatesOnly {
watchOpts = append(watchOpts, jetstream.UpdatesOnly())
}
if r.metaOnly {
watchOpts = append(watchOpts, jetstream.MetaOnly())
}
Expand Down
2 changes: 2 additions & 0 deletions internal/impl/nats/input_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ bucket: testbucket
key: testkey
ignore_deletes: true
include_history: true
updates_only: true
meta_only: true
max_reconnects: -1
auth:
Expand All @@ -54,6 +55,7 @@ auth:
assert.Equal(t, "testkey", e.key)
assert.True(t, e.ignoreDeletes)
assert.True(t, e.includeHistory)
assert.True(t, e.updatesOnly)
assert.True(t, e.metaOnly)
assert.Equal(t, -1, *e.connDetails.maxReconnects)
assert.Equal(t, "test auth n key file", e.connDetails.authConf.NKeyFile)
Expand Down
52 changes: 52 additions & 0 deletions internal/impl/nats/integration_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,56 @@ cache_resources:
assert.JSONEq(t, string(expected), string(msg))
})
})

t.Run("input", func(t *testing.T) {
t.Run("updates_only skips existing values", func(t *testing.T) {
u4, err := uuid.NewV4()
require.NoError(t, err)
js, err := jetstream.New(natsConn)
require.NoError(t, err)

bucket, err := js.CreateKeyValue(t.Context(), jetstream.KeyValueConfig{
Bucket: "bucket-" + u4.String(),
})
require.NoError(t, err)

_, err = bucket.PutString(t.Context(), "existing", "old_value")
require.NoError(t, err)

yaml := fmt.Sprintf(`
bucket: %s
updates_only: true
urls: [%s]`, bucket.Bucket(), fmt.Sprintf("tcp://localhost:%v", resource.GetPort("4222/tcp")))

spec := natsKVInputConfig()
parsed, err := spec.ParseYAML(yaml, nil)
require.NoError(t, err)

input, err := newKVReader(parsed, service.MockResources())
require.NoError(t, err)

err = input.Connect(t.Context())
require.NoError(t, err)
defer input.Close(t.Context())

ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
defer cancel()
_, _, err = input.Read(ctx)
require.ErrorIs(t, err, context.DeadlineExceeded)

_, err = bucket.PutString(t.Context(), "new_key", "new_value")
require.NoError(t, err)

msg, _, err := input.Read(t.Context())
require.NoError(t, err)

bytes, err := msg.AsBytes()
require.NoError(t, err)
assert.Equal(t, []byte("new_value"), bytes)

key, exists := msg.MetaGet("nats_kv_key")
require.True(t, exists)
assert.Equal(t, "new_key", key)
})
})
}