diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc index c0901d0a04f..c401c6129dc 100644 --- a/src/commands/cmd_timeseries.cc +++ b/src/commands/cmd_timeseries.cc @@ -29,6 +29,7 @@ constexpr const char *errBadRetention = "Couldn't parse RETENTION"; constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE"; constexpr const char *errBadEncoding = "unknown ENCODING parameter"; constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY"; +constexpr const char *errBadIgnore = "Couldn't parse IGNORE"; constexpr const char *errInvalidTimestamp = "invalid timestamp"; constexpr const char *errInvalidValue = "invalid value"; constexpr const char *errOldTimestamp = "Timestamp is older than retention"; @@ -252,6 +253,9 @@ class CommandTSCreateBase : public KeywordCommandBase { registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) { return handleDuplicatePolicy(parser, create_option_.duplicate_policy); }); + registerHandler("IGNORE", [this](TSOptionsParser &parser) { + return handleIgnore(parser, create_option_.ignore_max_time_diff, create_option_.ignore_max_val_diff); + }); registerHandler("LABELS", [this](TSOptionsParser &parser) { return handleLabels(parser, create_option_.labels); }); } @@ -315,6 +319,17 @@ class CommandTSCreateBase : public KeywordCommandBase { return Status::OK(); } + static Status handleIgnore(TSOptionsParser &parser, uint64_t &ignore_max_time_diff, double &ignore_max_val_diff) { + auto parse_time_diff = parser.TakeInt(); + auto parse_val_diff = parser.TakeFloat(); + if (!parse_time_diff.IsOK() || !parse_val_diff.IsOK() || parse_val_diff.GetValue() < 0) { + return {Status::RedisParseErr, errBadIgnore}; + } + ignore_max_time_diff = parse_time_diff.GetValue(); + ignore_max_val_diff = parse_val_diff.GetValue(); + return Status::OK(); + } + TSCreateOption create_option_; }; diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index 6945a0a6d1c..2c8ed160bdb 100644 --- 
a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -623,13 +623,15 @@ void TimeSeriesMetadata::Encode(std::string *dst) const { PutFixed8(dst, static_cast<uint8_t>(duplicate_policy)); PutSizedString(dst, source_key); PutFixed64(dst, last_timestamp); + PutFixed64(dst, ignore_max_time_diff); + PutDouble(dst, ignore_max_val_diff); } rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) { if (auto s = Metadata::Decode(input); !s.ok()) { return s; } - if (input->size() < sizeof(uint64_t) * 2 + sizeof(uint8_t) * 2 + sizeof(uint32_t)) { + if (input->size() < sizeof(uint64_t) * 3 + sizeof(uint8_t) * 2 + sizeof(uint32_t) + sizeof(double)) { return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); } @@ -641,6 +643,8 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) { GetSizedString(input, &source_key_slice); source_key = source_key_slice.ToString(); GetFixed64(input, &last_timestamp); + GetFixed64(input, &ignore_max_time_diff); + GetDouble(input, &ignore_max_val_diff); return rocksdb::Status::OK(); } diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 2e79cf23240..8ea5b415cf3 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -415,6 +415,8 @@ class TimeSeriesMetadata : public Metadata { uint64_t chunk_size; ChunkType chunk_type; DuplicatePolicy duplicate_policy; + uint64_t ignore_max_time_diff; + double ignore_max_val_diff; std::string source_key; uint64_t last_timestamp = 0; // Approximate last timestamp, used for compaction filter @@ -423,14 +425,18 @@ class TimeSeriesMetadata : public Metadata { retention_time(0), chunk_size(0), chunk_type(ChunkType::UNCOMPRESSED), - duplicate_policy(DuplicatePolicy::BLOCK) {} + duplicate_policy(DuplicatePolicy::BLOCK), + ignore_max_time_diff(0), + ignore_max_val_diff(0.0) {} TimeSeriesMetadata(uint64_t retention_time, uint64_t chunk_size, ChunkType chunk_type, DuplicatePolicy duplicate_policy, bool generate_version = true) : Metadata(kRedisTimeSeries, 
generate_version), retention_time(retention_time), chunk_size(chunk_size), chunk_type(chunk_type), - duplicate_policy(duplicate_policy) {} + duplicate_policy(duplicate_policy), + ignore_max_time_diff(0), + ignore_max_val_diff(0.0) {} void SetSourceKey(Slice key); diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index cebf453dd64..0a737286d73 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -20,6 +20,7 @@ #include "redis_timeseries.h" +#include <cmath> #include #include "commands/error_constants.h" @@ -515,7 +516,9 @@ TSCreateOption::TSCreateOption() : retention_time(kDefaultRetentionTime), chunk_size(kDefaultChunkSize), chunk_type(kDefaultChunkType), - duplicate_policy(kDefaultDuplicatePolicy) {} + duplicate_policy(kDefaultDuplicatePolicy), + ignore_max_time_diff(0), + ignore_max_val_diff(0.0) {} Status TSMQueryFilterParser::Parse(std::string_view expr) { if (expr.empty()) return Status::OK(); @@ -678,6 +681,8 @@ TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) { metadata.chunk_size = option.chunk_size; metadata.chunk_type = option.chunk_type; metadata.duplicate_policy = option.duplicate_policy; + metadata.ignore_max_time_diff = option.ignore_max_time_diff; + metadata.ignore_max_val_diff = option.ignore_max_val_diff; metadata.SetSourceKey(option.source_key); return metadata; @@ -851,6 +856,44 @@ rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl return createTimeSeries(ctx, ns_key, metadata_out, option); } +rocksdb::Status TimeSeries::filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key, + const TimeSeriesMetadata &metadata, SampleBatch *sample_batch) { + if (!metadata.source_key.empty() || metadata.duplicate_policy != DuplicatePolicy::LAST) { + return rocksdb::Status::OK(); + } + + std::vector<TSSample> latest_samples; + auto s = getCommon(ctx, ns_key, metadata, true, &latest_samples); + if (!s.ok() || latest_samples.empty()) { + return s; + } + + auto 
latest_sample = latest_samples.back(); + auto all_samples = sample_batch->AsSlice(); + auto samples = all_samples.GetSampleSpan(); + auto add_results = all_samples.GetAddResultSpan(); + + for (size_t i = 0; i < samples.size(); i++) { + if (add_results[i].type != TSChunk::AddResultType::kNone) { + continue; + } + + const auto &sample = samples[i]; + if (sample.ts >= latest_sample.ts && sample.ts - latest_sample.ts <= metadata.ignore_max_time_diff && + std::abs(sample.v - latest_sample.v) <= metadata.ignore_max_val_diff) { + add_results[i].type = TSChunk::AddResultType::kSkip; + add_results[i].sample.ts = latest_sample.ts; + continue; + } + + if (sample.ts >= latest_sample.ts) { + latest_sample = sample; + } + } + + return rocksdb::Status::OK(); +} + rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, SampleBatch &sample_batch, DownstreamUpsertArgs *ds_args) { auto batch = storage_->GetWriteBatchBase(); @@ -1930,6 +1973,8 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSS rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option); if (!s.ok()) return s; auto sample_batch = SampleBatch({sample}, on_dup_policy ? 
*on_dup_policy : metadata.duplicate_policy); + s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch); + if (!s.ok()) return s; DownstreamUpsertArgs ds_args; s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args); @@ -1950,6 +1995,8 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, st return s; } auto sample_batch = SampleBatch(std::move(samples), metadata.duplicate_policy); + s = filterSamplesByIgnorePolicy(ctx, ns_key, metadata, &sample_batch); + if (!s.ok()) return s; DownstreamUpsertArgs ds_args; s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args); if (!s.ok()) return s; diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h index 768b8a48f82..e94fc1a4928 100644 --- a/src/types/redis_timeseries.h +++ b/src/types/redis_timeseries.h @@ -151,6 +151,8 @@ struct TSCreateOption { uint64_t chunk_size; TimeSeriesMetadata::ChunkType chunk_type; TimeSeriesMetadata::DuplicatePolicy duplicate_policy; + uint64_t ignore_max_time_diff; + double ignore_max_val_diff; std::string source_key; LabelKVList labels; @@ -257,8 +259,7 @@ enum class TSAlterMode : uint8_t { RETENTION = 1, CHUNK_SIZE = 1 << 1, DUPLICATE_POLICY = 1 << 2, - IGNORE = 1 << 3, - LABELS = 1 << 4, + LABELS = 1 << 3, }; std::vector<TSSample> GroupSamplesAndReduce(const std::vector<std::vector<TSSample>> &all_samples, @@ -317,6 +318,8 @@ class TimeSeries : public SubKeyScanner { const TSCreateOption *options); rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out, const TSCreateOption *option = nullptr); + rocksdb::Status filterSamplesByIgnorePolicy(engine::Context &ctx, const Slice &ns_key, + const TimeSeriesMetadata &metadata, SampleBatch *sample_batch); rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata, LabelKVList *labels); rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, diff --git 
a/src/types/timeseries.cc b/src/types/timeseries.cc index bcec3707bd3..e5190110592 100644 --- a/src/types/timeseries.cc +++ b/src/types/timeseries.cc @@ -165,7 +165,9 @@ std::vector TSChunk::SampleBatch::GetFinalResults() const { res.resize(add_results_.size()); for (size_t idx = 0; idx < add_results_.size(); idx++) { res[indexes_[idx]] = add_results_[idx]; - res[indexes_[idx]].sample.ts = samples_[idx].ts; + if (!(res[indexes_[idx]].type == AddResultType::kSkip && res[indexes_[idx]].sample.ts != 0)) { + res[indexes_[idx]].sample.ts = samples_[idx].ts; + } } return res; } diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go b/tests/gocase/unit/type/timeseries/timeseries_test.go index 1fd08c2aa22..43a6f908e67 100644 --- a/tests/gocase/unit/type/timeseries/timeseries_test.go +++ b/tests/gocase/unit/type/timeseries/timeseries_test.go @@ -201,6 +201,24 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) { require.ErrorContains(t, rdb.Do(ctx, "ts.add", key, "1000", "13.4").Err(), "update is not supported when DUPLICATE_POLICY is set to BLOCK mode") }) + t.Run("TS.ADD Ignore Option", func(t *testing.T) { + ignoreKey := "test_add_ignore_key" + require.NoError(t, rdb.Del(ctx, ignoreKey).Err()) + require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err()) + + require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val()) + require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1003", "11").Val()) + + res := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{}) + require.Equal(t, 1, len(res)) + assert.Equal(t, []interface{}{int64(1000), float64(10)}, res[0]) + + require.Equal(t, int64(1008), rdb.Do(ctx, "ts.add", ignoreKey, "1008", "20").Val()) + res = rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{}) + require.Equal(t, 2, len(res)) + assert.Equal(t, []interface{}{int64(1008), float64(20)}, res[1]) + }) + t.Run("TS.ADD With 
Retention", func(t *testing.T) { require.NoError(t, rdb.Del(ctx, key).Err()) require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "1000").Err()) @@ -231,6 +249,21 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) { assert.Contains(t, res[1], "update is not supported when DUPLICATE_POLICY is set to BLOCK mode") }) + t.Run("TS.MADD Ignore Option", func(t *testing.T) { + ignoreKey := "test_madd_ignore_key" + require.NoError(t, rdb.Del(ctx, ignoreKey).Err()) + require.NoError(t, rdb.Do(ctx, "ts.create", ignoreKey, "duplicate_policy", "last", "ignore", "5", "2").Err()) + require.Equal(t, int64(1000), rdb.Do(ctx, "ts.add", ignoreKey, "1000", "10").Val()) + + res := rdb.Do(ctx, "ts.madd", ignoreKey, "1003", "11", ignoreKey, "1004", "13", ignoreKey, "1007", "14").Val().([]interface{}) + assert.Equal(t, []interface{}{int64(1000), int64(1004), int64(1004)}, res) + + samples := rdb.Do(ctx, "ts.range", ignoreKey, "-", "+").Val().([]interface{}) + require.Equal(t, 2, len(samples)) + assert.Equal(t, []interface{}{int64(1000), float64(10)}, samples[0]) + assert.Equal(t, []interface{}{int64(1004), float64(13)}, samples[1]) + }) + t.Run("TS.MADD Nonexistent Key", func(t *testing.T) { require.NoError(t, rdb.Del(ctx, "nonexistent").Err()) require.NoError(t, rdb.Del(ctx, "existent").Err())