From fc7b6226e1c7f125e72061be4b722d2a89b3148c Mon Sep 17 00:00:00 2001 From: var-nan Date: Fri, 14 Nov 2025 17:54:00 -0600 Subject: [PATCH 1/7] feat(ts): add the support of TWA aggregator to Range and MRange rangeCommon() in redis_timeseries.cc appends two extra samples, 'prev_sample' and 'next_sample', to the 'temp_results' vector that is passed to AggregateSamplesByRangeOption() when the aggregator is TWA. These two samples are used to compute the area of the polygons at the front of the first bucket and at the end of the last bucket. prev_sample is the latest sample in the data whose timestamp is less than or equal to the start of the filtered range (start_timestamp), falling back to the first in-range sample when no such sample exists, and next_sample is the earliest sample in the data whose timestamp is greater than or equal to the end of the filtered range (end_timestamp). TODO: test TWA with the FILTER_BY_TS/FILTER_BY_VALUE options. --- src/commands/cmd_timeseries.cc | 3 + src/types/redis_timeseries.cc | 114 ++++++++++++++++- src/types/redis_timeseries.h | 1 + tests/cppunit/types/timeseries_test.cc | 117 ++++++++++++++++++ .../unit/type/timeseries/timeseries_test.go | 14 +++ 5 files changed, 246 insertions(+), 3 deletions(-) diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc index 7b502481048..234908ae280 100644 --- a/src/commands/cmd_timeseries.cc +++ b/src/commands/cmd_timeseries.cc @@ -57,6 +57,7 @@ const std::unordered_map kAggregatorTypeMap {TSAggregatorType::MAX, "max"}, {TSAggregatorType::RANGE, "range"}, {TSAggregatorType::COUNT, "count"}, {TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"}, {TSAggregatorType::STD_P, "std.p"}, {TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, {TSAggregatorType::VAR_S, "var.s"}, + {TSAggregatorType::TWA, "twa"}, }; const std::unordered_map kGroupReducerTypeMap = { {GroupReducerType::AVG, "avg"}, {GroupReducerType::SUM, "sum"}, {GroupReducerType::MIN, "min"}, @@ -556,6 +557,8 @@ class CommandTSAggregatorBase : public KeywordCommandBase { type = TSAggregatorType::VAR_P; } else if (parser.EatEqICase("VAR.S")) { type = 
TSAggregatorType::VAR_S; + } else if (parser.EatEqICase("TWA")) { + type = TSAggregatorType::TWA; } else { return {Status::RedisParseErr, "Invalid aggregator type"}; } diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index cb7dde33fb8..86e591d8a48 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -72,10 +72,31 @@ struct Reducer { [](const TSSample &a, const TSSample &b) { return a.v < b.v; }); return max->v - min->v; } + static inline double Twa(nonstd::span samples) { + // Intra bucket area is 0 for single element. + double result = 0; + for (size_t i = 1; i < samples.size(); i++) { + auto t_diff = static_cast(samples[i].ts - samples[i - 1].ts); + // Area of bottom rectangle + Area of above triangle + result += (t_diff * samples[i - 1].v) + (t_diff * (samples[i].v - samples[i - 1].v) * 0.5); + } + return result; + } }; std::vector AggregateSamplesByRangeOption(std::vector samples, const TSRangeOption &option) { const auto &aggregator = option.aggregator; + // Retrieve prev_sample and next_sample from samples + TSSample prev_sample, next_sample; + bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available, next_available; + if (is_twa_aggregator) { + next_sample = samples.back(); + samples.pop_back(); + prev_sample = samples.back(); + samples.pop_back(); + prev_available = (samples.front().ts != prev_sample.ts); + next_available = (samples.back().ts != next_sample.ts); + } std::vector res; if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { res = std::move(samples); @@ -111,6 +132,7 @@ std::vector AggregateSamplesByRangeOption(std::vector sample if (option.is_return_empty && spans[i].empty()) { switch (aggregator.type) { case TSAggregatorType::SUM: + case TSAggregatorType::TWA: case TSAggregatorType::COUNT: sample.v = 0; break; @@ -126,6 +148,71 @@ std::vector AggregateSamplesByRangeOption(std::vector sample } } else if (!spans[i].empty()) { sample.v = 
aggregator.AggregateSamplesValue(spans[i]); + + if (is_twa_aggregator) { + auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + + /// Computes area of polygon from start of the current bucket to the first sample of the current span. + /// Total Area = Area of bottom rectangle + Area of above triangle. + auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) { + auto x = static_cast(bucket_left - prev.ts); // Distance from + auto y = static_cast(curr.ts - prev.ts); + auto z = curr.v - prev.v; + auto triangle_area = (z * (y - (x * x) / y)) / 2; + auto rect_area = static_cast(y - x) * prev.v; + return triangle_area + rect_area; + }; + /// Computes area of polygon from the last sample of the current span to the end of current bucket. + /// Total Area = Area of bottom rectangle + Area of above triangle. + auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) { + auto x = static_cast(bucket_right - curr.ts); + auto y = static_cast(next.ts - curr.ts); + auto z = next.v - curr.v; + auto rect_area = x * curr.v; + auto triangle_area = (x * x * z) / (2 * y); + return triangle_area + rect_area; + }; + auto non_empty_left_bucket = [&spans](size_t curr) { + while (--curr && spans[curr].empty()); + return curr; + }; + auto non_empty_right_bucket = [&spans](size_t curr) { + while (++curr < spans.size() && spans[curr].empty()); + return curr; + }; + + // Cut left and right empty regions. + bucket_left = std::max(bucket_left, option.start_ts); + bucket_right = std::min(bucket_right, option.end_ts); + uint64_t l = bucket_left, r = bucket_right; + double area = 0.0; + if (spans.size() == 1) { + area += prev_available ? front_area(bucket_left, prev_sample, spans[i].front()) : 0; + area += next_available ? end_area(bucket_right, spans[i].back(), next_sample) : 0; + l = prev_available ? bucket_left : spans[i].front().ts; + r = next_available ? 
bucket_right : spans[i].back().ts; + // Edge case: single bucket with only one element. + area += (!prev_available && !next_available && spans[i].size() == 1) ? spans[i][0].v : 0; + } else if (i == 0) { + size_t p = non_empty_right_bucket(i); + area += spans[i].back().ts != bucket_right ? end_area(bucket_right, spans[i].back(), spans[p].front()) : 0; + area += prev_available ? front_area(bucket_left, prev_sample, spans[i].front()) : 0; + l = prev_available ? bucket_left : spans[i].front().ts; + } else if (i == (spans.size() - 1)) { + size_t p = non_empty_left_bucket(i); + area += spans[i].front().ts != bucket_left ? front_area(bucket_left, spans[p].back(), spans[i].front()) : 0; + area += next_available ? end_area(bucket_right, spans[i].back(), next_sample) : 0; + // Edge case: when last bucket contains one sample and its timestamp equals bucket boundary. + area += (spans[i].size() == 1 && spans[i].front().ts == bucket_left && !next_available) ? spans[i][0].v : 0; + r = next_available ? bucket_right : spans[i].back().ts; + } else { + size_t x = non_empty_left_bucket(i), y = non_empty_right_bucket(i); + area += spans[i].front().ts != bucket_left ? front_area(bucket_left, spans[x].back(), spans[i].front()) : 0; + area += spans[i].back().ts != bucket_right ? 
end_area(bucket_right, spans[i].back(), spans[y].front()) : 0; + } + sample.v += area; + sample.v /= std::max(static_cast(r - l), 1.0); + } } else { continue; } @@ -810,6 +897,9 @@ double TSAggregator::AggregateSamplesValue(nonstd::span samples) case TSAggregatorType::VAR_S: res = Reducer::VarS(samples); break; + case TSAggregatorType::TWA: + res = Reducer::Twa(samples); + break; default: unreachable(); } @@ -1055,18 +1145,24 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke bool has_aggregator = aggregator.type != TSAggregatorType::NONE; if (iter->Valid()) { if (option.count_limit != 0 && !has_aggregator) { - temp_results.reserve(option.count_limit); + temp_results.reserve(option.count_limit + 2); } else { chunk = CreateTSChunkFromData(iter->value()); auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1; auto estimate_chunks = std::min((end_timestamp - start_timestamp) / range, uint64_t(32)); - temp_results.reserve(estimate_chunks * metadata.chunk_size); + temp_results.reserve(estimate_chunks * metadata.chunk_size + 2); } } // Get samples from chunks uint64_t bucket_count = 0; uint64_t last_bucket = 0; bool is_not_enough = true; + // Add these two samples at end when aggregator is TWA. 
+ TSSample prev_sample, next_sample; + prev_sample.ts = TSSample::MAX_TIMESTAMP; + next_sample.ts = TSSample::MAX_TIMESTAMP; + const bool is_twa_aggregator = option.aggregator.type == TSAggregatorType::TWA; + for (; iter->Valid() && is_not_enough; iter->Next()) { chunk = CreateTSChunkFromData(iter->value()); auto it = chunk->CreateIterator(); @@ -1081,7 +1177,12 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke const bool not_time_filtered = option.filter_by_ts.empty() || option.filter_by_ts.count(sample->ts); const bool value_in_range = !option.filter_by_value || (sample->v >= option.filter_by_value->first && sample->v <= option.filter_by_value->second); - + // Record prev and next samples around the filtered range when aggregator is TWA + if (is_twa_aggregator) { + prev_sample = (sample->ts <= start_timestamp) ? *sample : prev_sample; + next_sample = + (sample->ts >= end_timestamp && next_sample.ts == TSSample::MAX_TIMESTAMP) ? *sample : next_sample; + } if (!in_time_range || !not_time_filtered || !value_in_range) { continue; } @@ -1103,6 +1204,13 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke } } + if (is_twa_aggregator) { + // prev_sample might not get initialized, if first element is in first bucket. + prev_sample = prev_sample.ts == TSSample::MAX_TIMESTAMP ? 
temp_results.front() : prev_sample; + temp_results.push_back(prev_sample); + temp_results.push_back(next_sample); + } + // Process compaction logic *res = AggregateSamplesByRangeOption(std::move(temp_results), option); diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h index 92a10465782..3500d1d816d 100644 --- a/src/types/redis_timeseries.h +++ b/src/types/redis_timeseries.h @@ -53,6 +53,7 @@ enum class TSAggregatorType : uint8_t { STD_S = 10, VAR_P = 11, VAR_S = 12, + TWA = 13, }; inline bool IsIncrementalAggregatorType(TSAggregatorType type) { diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc index a268e670657..3092a46d89a 100644 --- a/tests/cppunit/types/timeseries_test.cc +++ b/tests/cppunit/types/timeseries_test.cc @@ -345,6 +345,123 @@ TEST_F(TimeSeriesTest, Range) { EXPECT_EQ(res.size(), 1); } +TEST_F(TimeSeriesTest, Twa) { + redis::TSCreateOption option; + option.labels = {{"type", "readings"}, {"name", "instrument"}}; + auto s = ts_db_->Create(*ctx_, key_, option); + EXPECT_TRUE(s.ok()); + + std::vector samples = {{12, 10}, {40, 12}, {380, 13}, {401, 18}, {595, 12}, {924, 13}}; + std::vector results2; + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, key_, samples, &results2); + EXPECT_TRUE(s.ok()); + + std::vector res; + redis::TSRangeOption range_opt; + range_opt.start_ts = 0; + range_opt.end_ts = TSSample::MAX_TIMESTAMP; + range_opt.aggregator.type = redis::TSAggregatorType::TWA; + range_opt.aggregator.bucket_duration = 1000; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res[0].ts, 0); + EXPECT_NEAR(res[0].v, 13.05482456, 1e-5); + + res.clear(); + range_opt.aggregator.bucket_duration = 100; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 5); + EXPECT_NEAR(res[0].v, 11.7419786, 1e-5); + EXPECT_NEAR(res[1].v, 13.382072, 1e-5); + EXPECT_NEAR(res[2].v, 16.483190, 1e-5); + EXPECT_NEAR(res[3].v, 
13.3959984, 1e-5); + EXPECT_NEAR(res[4].v, 12.963525, 1e-5); + + res.clear(); + range_opt.aggregator.bucket_duration = 10; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 6); + EXPECT_NEAR(res[0].v, 10.28571, 1e-5); + EXPECT_NEAR(res[1].v, 12.01470, 1e-5); + EXPECT_NEAR(res[2].v, 14.19047, 1e-5); + EXPECT_NEAR(res[3].v, 17.86283, 1e-5); + EXPECT_NEAR(res[4].v, 12.04245, 1e-5); + EXPECT_NEAR(res[5].v, 12.99392, 1e-5); + + res.clear(); + range_opt.start_ts = 100; + range_opt.end_ts = 1000; + range_opt.aggregator.bucket_duration = 5; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 4); + EXPECT_NEAR(res[0].v, 13.59523, 1e-5); + EXPECT_NEAR(res[1].v, 17.92670, 1e-5); + EXPECT_NEAR(res[2].v, 12.00759, 1e-5); + EXPECT_NEAR(res[3].v, 12.99392, 1e-5); + + res.clear(); + range_opt.start_ts = 500; + range_opt.end_ts = 713; + range_opt.aggregator.bucket_duration = 10; + s = ts_db_->Range(*ctx_, key_, range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 1); + EXPECT_NEAR(res[0].v, 12.04245, 1e-5); + + // Two more datasets. 
+ option.labels = {{"type", "numbers"}, {"distribution", "random"}}; + s = ts_db_->Create(*ctx_, "s:a", option); + EXPECT_TRUE(s.ok()); + s = ts_db_->Create(*ctx_, "s:b", option); + EXPECT_TRUE(s.ok()); + + samples = {{100, 20}, {200, 21}, {402, 18}, {600, 22}}; + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, "s:a", samples, &results2); + EXPECT_TRUE(s.ok()); + + samples = {{100, 15}, {300, 16}, {400, 17}, {402, 18}}; + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, "s:b", samples, &results2); + EXPECT_TRUE(s.ok()); + + res.clear(); + range_opt.start_ts = 200; + range_opt.end_ts = 512; + range_opt.aggregator.bucket_duration = 100; + s = ts_db_->Range(*ctx_, "s:a", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 2); + EXPECT_NEAR(res[0].v, 20.25742, 1e-5); + EXPECT_NEAR(res[1].v, 18.97039, 1e-5); + + res.clear(); + s = ts_db_->Range(*ctx_, "s:b", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res[0].ts, 300); + EXPECT_NEAR(res[0].v, 16.5, 1e-5); + EXPECT_EQ(res[1].ts, 400); + EXPECT_NEAR(res[1].v, 17.5, 1e-5); + + res.clear(); + range_opt.start_ts = 200; + range_opt.end_ts = 512; + range_opt.aggregator.bucket_duration = 1000; + s = ts_db_->Range(*ctx_, "s:a", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 1); + EXPECT_NEAR(res[0].v, 19.36289, 1e-5); + + res.clear(); + s = ts_db_->Range(*ctx_, "s:b", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_NEAR(res[0].v, 16.13861, 1e-5); +} + TEST_F(TimeSeriesTest, Get) { redis::TSCreateOption option; auto s = ts_db_->Create(*ctx_, key_, option); diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go b/tests/gocase/unit/type/timeseries/timeseries_test.go index d1ddf9405cf..72216ea7fcb 100644 --- a/tests/gocase/unit/type/timeseries/timeseries_test.go +++ b/tests/gocase/unit/type/timeseries/timeseries_test.go @@ -419,6 +419,20 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) { assert.Equal(t, 1, len(res)) }) + t.Run("TS.RANGE 
With TWA Aggregation", func(t *testing.T) { + first_key := "first_twa_key" + second_key := "second_twa_key" + require.NoError(t, rdb.Do(ctx, "ts.create", first_key).Err()) + require.NoError(t, rdb.Do(ctx, "ts.create", second_key).Err()) + require.NoError(t, rdb.Do(ctx, "ts.madd", first_key, 100, 20, first_key, 200, 21, first_key, 402, 18, first_key, 600, 22).Err()) + require.NoError(t, rdb.Do(ctx, "ts.madd", second_key, 100, 15, second_key, 300, 16, second_key, 400, 17, second_key, 402, 18).Err()) + + res := rdb.Do(ctx, "ts.range", first_key, 200, 512, "AGGREGATION", "twa", 100).Val().([]interface{}) + require.Equal(t, 2, len(res)) + val := math.Abs((res[0].([]interface{})[1].(float64)) - 20.25742) + assert.True(t, val < 1e-5) + }) + t.Run("TS.GET Basic", func(t *testing.T) { key := "test_get_key" require.NoError(t, rdb.Del(ctx, key).Err()) From 279c9bdd457782ffa0b12d637d9b59398719abcd Mon Sep 17 00:00:00 2001 From: var-nan Date: Mon, 17 Nov 2025 12:20:44 -0600 Subject: [PATCH 2/7] fix TWA aggregator with FILTER_BY_TS/FILTER_BY_VALUE When filtered with FILTER_BY_TS/FILTER_BY_VALUE, the `next` and `prev` samples are discarded while computing the area. 
--- src/types/redis_timeseries.cc | 6 +++-- tests/cppunit/types/timeseries_test.cc | 35 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index 86e591d8a48..0ee2a97947e 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -90,12 +90,14 @@ std::vector AggregateSamplesByRangeOption(std::vector sample TSSample prev_sample, next_sample; bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available, next_available; if (is_twa_aggregator) { + const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value(); next_sample = samples.back(); samples.pop_back(); prev_sample = samples.back(); samples.pop_back(); - prev_available = (samples.front().ts != prev_sample.ts); - next_available = (samples.back().ts != next_sample.ts); + // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples. + prev_available = discard_boundaries ? false : (samples.front().ts != prev_sample.ts); + next_available = discard_boundaries ? 
false : (samples.back().ts != next_sample.ts); } std::vector res; if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc index 3092a46d89a..e94ff189cfd 100644 --- a/tests/cppunit/types/timeseries_test.cc +++ b/tests/cppunit/types/timeseries_test.cc @@ -460,6 +460,41 @@ TEST_F(TimeSeriesTest, Twa) { s = ts_db_->Range(*ctx_, "s:b", range_opt, &res); EXPECT_TRUE(s.ok()); EXPECT_NEAR(res[0].v, 16.13861, 1e-5); + + // Test with FILTER_BY_TS + samples = {{15, 19}, {24, 13}, {25, 15}, {30, 14}, {35, 12}, {32, 19}}; + s = ts_db_->Create(*ctx_, "s:c", option); + EXPECT_TRUE(s.ok()); + results2.clear(); + results2.resize(samples.size()); + s = ts_db_->MAdd(*ctx_, "s:c", samples, &results2); + res.clear(); + std::set filtered_ts = {24, 30, 35}; + range_opt.filter_by_ts.insert(filtered_ts.begin(), filtered_ts.end()); + range_opt.start_ts = 0; + range_opt.end_ts = TSSample::MAX_TIMESTAMP; + range_opt.aggregator.bucket_duration = 10; + s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(res.size(), 2); + EXPECT_EQ(res[0].ts, 20); + EXPECT_EQ(res[1].ts, 30); + EXPECT_NEAR(res[0].v, 13.5, 1e-5); + EXPECT_NEAR(res[1].v, 13, 1e-5); + + res.clear(); + range_opt.aggregator.bucket_duration = 100; + s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); + EXPECT_EQ(res[0].ts, 0); + EXPECT_NEAR(res[0].v, 13.27272, 1e-5); + + // Test with FILTER_BY_VALUE + res.clear(); + range_opt.filter_by_ts.clear(); + range_opt.filter_by_value = std::make_pair(15, 19); + s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); + EXPECT_EQ(res[0].ts, 0); + EXPECT_NEAR(res[0].v, 17, 1e-5); } TEST_F(TimeSeriesTest, Get) { From 9d8e076e9a2b79f74a4b1964b570af849f0b13cf Mon Sep 17 00:00:00 2001 From: var-nan Date: Mon, 17 Nov 2025 21:28:48 -0600 Subject: [PATCH 3/7] fix Clang-tidy errors --- src/types/redis_timeseries.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index 0ee2a97947e..4b37d8ce0aa 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -88,7 +88,7 @@ std::vector AggregateSamplesByRangeOption(std::vector sample const auto &aggregator = option.aggregator; // Retrieve prev_sample and next_sample from samples TSSample prev_sample, next_sample; - bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available, next_available; + bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false; if (is_twa_aggregator) { const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value(); next_sample = samples.back(); From 9174002252e5ba1e18e69d8e47710b50c1730588 Mon Sep 17 00:00:00 2001 From: var-nan Date: Sun, 23 Nov 2025 22:49:53 -0600 Subject: [PATCH 4/7] fix: correct results when EMPTY flag is specified. some code is refactored for readability. --- src/types/redis_timeseries.cc | 216 +++++++++++++++---------- tests/cppunit/types/timeseries_test.cc | 20 +++ 2 files changed, 155 insertions(+), 81 deletions(-) diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index 4b37d8ce0aa..180241a6728 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -72,7 +72,7 @@ struct Reducer { [](const TSSample &a, const TSSample &b) { return a.v < b.v; }); return max->v - min->v; } - static inline double Twa(nonstd::span samples) { + static inline double Area(nonstd::span samples) { // Intra bucket area is 0 for single element. 
double result = 0; for (size_t i = 1; i < samples.size(); i++) { @@ -86,7 +86,51 @@ struct Reducer { std::vector AggregateSamplesByRangeOption(std::vector samples, const TSRangeOption &option) { const auto &aggregator = option.aggregator; - // Retrieve prev_sample and next_sample from samples + + auto get_bucket_ts = [&](uint64_t left) -> uint64_t { + using BucketTimestampType = TSRangeOption::BucketTimestampType; + switch (option.bucket_timestamp_type) { + case BucketTimestampType::Start: + return left; + case BucketTimestampType::End: + return left + aggregator.bucket_duration; + case BucketTimestampType::Mid: + return left + aggregator.bucket_duration / 2; + default: + unreachable(); + } + return 0; + }; + /// Computes area of polygon from start of the current bucket to the first sample of the current span. + /// Total Area = Area of bottom rectangle + Area of above triangle. + auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) { + auto x = static_cast(bucket_left - prev.ts); // Distance from + auto y = static_cast(curr.ts - prev.ts); + auto z = curr.v - prev.v; + auto triangle_area = (z * (y - (x * x) / y)) / 2; + auto rect_area = static_cast(y - x) * prev.v; + return triangle_area + rect_area; + }; + /// Computes area of polygon from the last sample of the current span to the end of current bucket. + /// Total Area = Area of bottom rectangle + Area of above triangle. + auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) { + auto x = static_cast(bucket_right - curr.ts); + auto y = static_cast(next.ts - curr.ts); + auto z = next.v - curr.v; + auto rect_area = x * curr.v; + auto triangle_area = (x * x * z) / (2 * y); + return triangle_area + rect_area; + }; + // Computes the TWA of empty bucket from its neighbor samples. 
+ auto empty_bucket_twa = [&front_area](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right, + const TSSample &right_nb) { + // Area of empty bucket = Area from left_nb to bucket_right - Area from left_nb to bucket_left + auto f_area = front_area(bucket_left, left_nb, right_nb); + auto s_area = front_area(bucket_right, left_nb, right_nb); + return (f_area - s_area) / static_cast(bucket_right - bucket_left); + }; + + // Retrieve prev_sample and next_sample from samples when TWA aggregation. TSSample prev_sample, next_sample; bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false; if (is_twa_aggregator) { @@ -96,31 +140,75 @@ std::vector AggregateSamplesByRangeOption(std::vector sample prev_sample = samples.back(); samples.pop_back(); // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples. - prev_available = discard_boundaries ? false : (samples.front().ts != prev_sample.ts); - next_available = discard_boundaries ? false : (samples.back().ts != next_sample.ts); + prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts); + next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts); } std::vector res; - if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { + if (is_twa_aggregator && option.is_return_empty && samples.empty()) { + const bool early_return = prev_sample.ts == TSSample::MAX_TIMESTAMP || next_sample.ts == TSSample::MAX_TIMESTAMP || + prev_sample.ts == next_sample.ts; // When filter entire range lies left or right to data. + if (early_return) { + res = std::move(samples); + return res; + } + // Both prev and next should be available. Total range should be in between the prev and next samples. 
+ assert(prev_sample.ts <= option.start_ts && option.end_ts <= next_sample.ts); + + uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration; + res.reserve(n_buckets_estimate + 1); + uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(option.start_ts); + uint64_t bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + for (size_t i = 0; i < n_buckets_estimate; i++) { + bucket_left = std::max(bucket_left, option.start_ts); + bucket_right = std::min(bucket_right, option.end_ts); + TSSample sample; + sample.ts = bucket_left; + sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); + res.push_back(sample); + bucket_left = bucket_right; + bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + } + // Process last bucket. + TSSample sample; + sample.ts = bucket_left; + if (bucket_left == option.end_ts) { // Calculate last sample. + double y_diff = next_sample.v - prev_sample.v; + double x_diff = static_cast(next_sample.ts - prev_sample.ts); + double x_prime_diff = static_cast(option.end_ts - prev_sample.ts); + double y_prime_diff = (x_prime_diff * y_diff) / x_diff; + sample.v = y_prime_diff + prev_sample.v; + } else { + sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); + } + res.push_back(sample); + return res; + } else if (aggregator.type == TSAggregatorType::NONE || samples.empty()) { res = std::move(samples); return res; } + auto spans = aggregator.SplitSamplesToBuckets(samples); + res.reserve(spans.size()); - auto get_bucket_ts = [&](uint64_t left) -> uint64_t { - using BucketTimestampType = TSRangeOption::BucketTimestampType; - switch (option.bucket_timestamp_type) { - case BucketTimestampType::Start: - return left; - case BucketTimestampType::End: - return left + aggregator.bucket_duration; - case BucketTimestampType::Mid: - return left + aggregator.bucket_duration / 2; - default: - unreachable(); - } - return 0; + auto 
non_empty_left_bucket_idx = [&spans](size_t curr) { + while (--curr && spans[curr].empty()); + return curr; }; - res.reserve(spans.size()); + auto non_empty_right_bucket_idx = [&spans](size_t curr) { + while (++curr < spans.size() && spans[curr].empty()); + return curr; + }; + + std::vector> neighbors; + neighbors.reserve(spans.size()); + for (size_t i = 0; i < spans.size(); i++) { + TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample; + TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample; + neighbors.emplace_back(prev, next); + assert(spans[i].empty() || + (neighbors[i].first.ts <= spans[i].front().ts && spans[i].back().ts <= neighbors[i].second.ts)); + } // Should follow: neighbors[i].first <= span[i].front() <= span[i].back() <= neighbors[i].second; + uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts); for (size_t i = 0; i < spans.size(); i++) { if (option.count_limit && res.size() >= option.count_limit) { @@ -134,10 +222,17 @@ std::vector AggregateSamplesByRangeOption(std::vector sample if (option.is_return_empty && spans[i].empty()) { switch (aggregator.type) { case TSAggregatorType::SUM: - case TSAggregatorType::TWA: case TSAggregatorType::COUNT: sample.v = 0; break; + case TSAggregatorType::TWA: + if ((i == 0 && !prev_available) || (i == spans.size() - 1 && !next_available)) { + sample.v = TSSample::NAN_VALUE; + } else { + auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); + sample.v = empty_bucket_twa(neighbors[i].first, bucket_left, bucket_right, neighbors[i].second); + } + break; case TSAggregatorType::LAST: if (i == 0 || spans[i - 1].empty()) { sample.v = TSSample::NAN_VALUE; @@ -153,67 +248,21 @@ std::vector AggregateSamplesByRangeOption(std::vector sample if (is_twa_aggregator) { auto bucket_right = aggregator.CalculateAlignedBucketRight(bucket_left); - - /// Computes area of polygon from start of the current bucket to 
the first sample of the current span. - /// Total Area = Area of bottom rectangle + Area of above triangle. - auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) { - auto x = static_cast(bucket_left - prev.ts); // Distance from - auto y = static_cast(curr.ts - prev.ts); - auto z = curr.v - prev.v; - auto triangle_area = (z * (y - (x * x) / y)) / 2; - auto rect_area = static_cast(y - x) * prev.v; - return triangle_area + rect_area; - }; - /// Computes area of polygon from the last sample of the current span to the end of current bucket. - /// Total Area = Area of bottom rectangle + Area of above triangle. - auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) { - auto x = static_cast(bucket_right - curr.ts); - auto y = static_cast(next.ts - curr.ts); - auto z = next.v - curr.v; - auto rect_area = x * curr.v; - auto triangle_area = (x * x * z) / (2 * y); - return triangle_area + rect_area; - }; - auto non_empty_left_bucket = [&spans](size_t curr) { - while (--curr && spans[curr].empty()); - return curr; - }; - auto non_empty_right_bucket = [&spans](size_t curr) { - while (++curr < spans.size() && spans[curr].empty()); - return curr; - }; - - // Cut left and right empty regions. + // Cut left and right empty regions. In case of first and last bucket. bucket_left = std::max(bucket_left, option.start_ts); bucket_right = std::min(bucket_right, option.end_ts); uint64_t l = bucket_left, r = bucket_right; - double area = 0.0; - if (spans.size() == 1) { - area += prev_available ? front_area(bucket_left, prev_sample, spans[i].front()) : 0; - area += next_available ? end_area(bucket_right, spans[i].back(), next_sample) : 0; - l = prev_available ? bucket_left : spans[i].front().ts; - r = next_available ? bucket_right : spans[i].back().ts; - // Edge case: single bucket with only one element. - area += (!prev_available && !next_available && spans[i].size() == 1) ? 
spans[i][0].v : 0; - } else if (i == 0) { - size_t p = non_empty_right_bucket(i); - area += spans[i].back().ts != bucket_right ? end_area(bucket_right, spans[i].back(), spans[p].front()) : 0; - area += prev_available ? front_area(bucket_left, prev_sample, spans[i].front()) : 0; - l = prev_available ? bucket_left : spans[i].front().ts; - } else if (i == (spans.size() - 1)) { - size_t p = non_empty_left_bucket(i); - area += spans[i].front().ts != bucket_left ? front_area(bucket_left, spans[p].back(), spans[i].front()) : 0; - area += next_available ? end_area(bucket_right, spans[i].back(), next_sample) : 0; - // Edge case: when last bucket contains one sample and its timestamp equals bucket boundary. - area += (spans[i].size() == 1 && spans[i].front().ts == bucket_left && !next_available) ? spans[i][0].v : 0; - r = next_available ? bucket_right : spans[i].back().ts; - } else { - size_t x = non_empty_left_bucket(i), y = non_empty_right_bucket(i); - area += spans[i].front().ts != bucket_left ? front_area(bucket_left, spans[x].back(), spans[i].front()) : 0; - area += spans[i].back().ts != bucket_right ? end_area(bucket_right, spans[i].back(), spans[y].front()) : 0; - } - sample.v += area; - sample.v /= std::max(static_cast(r - l), 1.0); + // Front area available iff prev_sample < bucket_left < span[i].front(). Similarly for end_area. + bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left); + bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts); + double area = 0; + area += front_available ? front_area(bucket_left, neighbors[i].first, spans[i].front()) : 0.0; + area += back_available ? end_area(bucket_right, spans[i].back(), neighbors[i].second) : 0.0; + // Edge case: If single bucket and it contains only one element. + area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0; + l = front_available ? 
bucket_left : spans[i].front().ts; + r = back_available ? bucket_right : spans[i].back().ts; + sample.v = (sample.v + area) / std::max(static_cast(r - l), 1.0); } } else { continue; @@ -900,7 +949,7 @@ double TSAggregator::AggregateSamplesValue(nonstd::span samples) res = Reducer::VarS(samples); break; case TSAggregatorType::TWA: - res = Reducer::Twa(samples); + res = Reducer::Area(samples); break; default: unreachable(); @@ -1207,8 +1256,13 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke } if (is_twa_aggregator) { - // prev_sample might not get initialized, if first element is in first bucket. - prev_sample = prev_sample.ts == TSSample::MAX_TIMESTAMP ? temp_results.front() : prev_sample; + // If the first element of the series is in first bucket, prev_sample might not get initialized. Similarly if the + // last element in the series is in last bucket, next_sample might not get initialized. If the series is empty, + // prev_sample and next_sample points to infinity (MAX_TIMESTAMP) + prev_sample = + prev_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.front() : prev_sample; + next_sample = + next_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? 
temp_results.back() : next_sample; temp_results.push_back(prev_sample); temp_results.push_back(next_sample); } diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc index e94ff189cfd..08bd175117b 100644 --- a/tests/cppunit/types/timeseries_test.cc +++ b/tests/cppunit/types/timeseries_test.cc @@ -495,6 +495,26 @@ TEST_F(TimeSeriesTest, Twa) { s = ts_db_->Range(*ctx_, "s:c", range_opt, &res); EXPECT_EQ(res[0].ts, 0); EXPECT_NEAR(res[0].v, 17, 1e-5); + + // Test with EMPTY filter + samples = {{100, 25}, {1000, 12}}; + res.clear(); + results2.clear(); + redis::TSRangeOption range_opt2; + range_opt2.aggregator.type = redis::TSAggregatorType::TWA; + range_opt2.start_ts = 0; + range_opt2.end_ts = TSSample::MAX_TIMESTAMP; + range_opt2.aggregator.bucket_duration = 100; + range_opt2.is_return_empty = true; + EXPECT_TRUE((ts_db_->MAdd(*ctx_, "s:c", samples, &results2)).ok()); + std::vector> results = { + {0, 17.7941176}, {100, 24.27777}, {200, 22.833333}, {300, 21.388888}, {400, 19.944444}, {500, 18.5}, + {600, 17.0555555}, {700, 15.611111}, {800, 14.1666666}, {900, 12.7222222}, {1000, 12}}; + EXPECT_TRUE((ts_db_->Range(*ctx_, "s:c", range_opt2, &res)).ok()); + for (size_t i = 0; i < results.size(); i++) { + EXPECT_EQ(res[i].ts, results[i].first); + EXPECT_NEAR(res[i].v, results[i].second, 1e-4); + } } TEST_F(TimeSeriesTest, Get) { From ae2268fe9d2e27d2bd1c032c5c4f86d4ffdb499f Mon Sep 17 00:00:00 2001 From: DeEMO Date: Wed, 26 Nov 2025 14:22:29 +0000 Subject: [PATCH 5/7] fix clang-tidy --- src/types/redis_timeseries.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index 742a2f9c584..f3444fd70a0 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -173,8 +173,8 @@ std::vector AggregateSamplesByRangeOption(std::vector sample sample.ts = bucket_left; if (bucket_left == option.end_ts) { // Calculate last sample. 
double y_diff = next_sample.v - prev_sample.v; - double x_diff = static_cast(next_sample.ts - prev_sample.ts); - double x_prime_diff = static_cast(option.end_ts - prev_sample.ts); + auto x_diff = static_cast(next_sample.ts - prev_sample.ts); + auto x_prime_diff = static_cast(option.end_ts - prev_sample.ts); double y_prime_diff = (x_prime_diff * y_diff) / x_diff; sample.v = y_prime_diff + prev_sample.v; } else { @@ -251,7 +251,6 @@ std::vector AggregateSamplesByRangeOption(std::vector sample // Cut left and right empty regions. In case of first and last bucket. bucket_left = std::max(bucket_left, option.start_ts); bucket_right = std::min(bucket_right, option.end_ts); - uint64_t l = bucket_left, r = bucket_right; // Front area available iff prev_sample < bucket_left < span[i].front(). Similarly for end_area. bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left); bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts); @@ -260,8 +259,8 @@ std::vector AggregateSamplesByRangeOption(std::vector sample area += back_available ? end_area(bucket_right, spans[i].back(), neighbors[i].second) : 0.0; // Edge case: If single bucket and it contains only one element. area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0; - l = front_available ? bucket_left : spans[i].front().ts; - r = back_available ? bucket_right : spans[i].back().ts; + uint64_t l = front_available ? bucket_left : spans[i].front().ts; + uint64_t r = back_available ? 
bucket_right : spans[i].back().ts; sample.v = (sample.v + area) / std::max(static_cast(r - l), 1.0); } } else { From 021bae7e6dc0160e4f4b6fa59bb6d3de3170a30a Mon Sep 17 00:00:00 2001 From: var-nan Date: Thu, 27 Nov 2025 09:28:10 -0600 Subject: [PATCH 6/7] TWA code refactor --- src/types/redis_timeseries.cc | 68 +++++++++++++++-------------------- 1 file changed, 29 insertions(+), 39 deletions(-) diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index f3444fd70a0..28222fbf586 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -101,33 +101,23 @@ std::vector AggregateSamplesByRangeOption(std::vector sample } return 0; }; - /// Computes area of polygon from start of the current bucket to the first sample of the current span. - /// Total Area = Area of bottom rectangle + Area of above triangle. - auto front_area = [](uint64_t bucket_left, const TSSample &prev, const TSSample &curr) { - auto x = static_cast(bucket_left - prev.ts); // Distance from - auto y = static_cast(curr.ts - prev.ts); - auto z = curr.v - prev.v; - auto triangle_area = (z * (y - (x * x) / y)) / 2; - auto rect_area = static_cast(y - x) * prev.v; - return triangle_area + rect_area; - }; - /// Computes area of polygon from the last sample of the current span to the end of current bucket. - /// Total Area = Area of bottom rectangle + Area of above triangle. - auto end_area = [](uint64_t bucket_right, const TSSample &curr, const TSSample &next) { - auto x = static_cast(bucket_right - curr.ts); - auto y = static_cast(next.ts - curr.ts); - auto z = next.v - curr.v; - auto rect_area = x * curr.v; - auto triangle_area = (x * x * z) / (2 * y); - return triangle_area + rect_area; + // Linear interpolation. 
+ auto interpolate_sample = [](const TSSample &left_nb, uint64_t ts, const TSSample &right_nb) { + auto y_diff = right_nb.v - left_nb.v; + auto x_diff = static_cast(right_nb.ts - left_nb.ts); + auto x_diff_prime = static_cast(ts - left_nb.ts); + auto y_diff_prime = (x_diff_prime * y_diff) / x_diff; + TSSample sample; + sample.ts = ts; + sample.v = y_diff_prime + left_nb.v; + return sample; }; // Computes the TWA of empty bucket from its neighbor samples. - auto empty_bucket_twa = [&front_area](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right, - const TSSample &right_nb) { - // Area of empty bucket = Area from left_nb to bucket_right - Area from left_nb to bucket_left - auto f_area = front_area(bucket_left, left_nb, right_nb); - auto s_area = front_area(bucket_right, left_nb, right_nb); - return (f_area - s_area) / static_cast(bucket_right - bucket_left); + auto empty_bucket_twa = [&interpolate_sample](const TSSample &left_nb, uint64_t bucket_left, uint64_t bucket_right, + const TSSample &right_nb) { + auto left = interpolate_sample(left_nb, bucket_left, right_nb); + auto right = interpolate_sample(left_nb, bucket_right, right_nb); + return Reducer::Area(std::array{left, right}) / static_cast(bucket_right - bucket_left); }; // Retrieve prev_sample and next_sample from samples when TWA aggregation. @@ -151,8 +141,6 @@ std::vector AggregateSamplesByRangeOption(std::vector sample res = std::move(samples); return res; } - // Both prev and next should be available. Total range should be in between the prev and next samples. - assert(prev_sample.ts <= option.start_ts && option.end_ts <= next_sample.ts); uint64_t n_buckets_estimate = (option.end_ts - option.start_ts) / option.aggregator.bucket_duration; res.reserve(n_buckets_estimate + 1); @@ -172,11 +160,7 @@ std::vector AggregateSamplesByRangeOption(std::vector sample TSSample sample; sample.ts = bucket_left; if (bucket_left == option.end_ts) { // Calculate last sample. 
- double y_diff = next_sample.v - prev_sample.v; - auto x_diff = static_cast(next_sample.ts - prev_sample.ts); - auto x_prime_diff = static_cast(option.end_ts - prev_sample.ts); - double y_prime_diff = (x_prime_diff * y_diff) / x_diff; - sample.v = y_prime_diff + prev_sample.v; + sample.v = interpolate_sample(prev_sample, option.end_ts, next_sample).v; } else { sample.v = empty_bucket_twa(prev_sample, bucket_left, bucket_right, next_sample); } @@ -205,9 +189,7 @@ std::vector AggregateSamplesByRangeOption(std::vector sample TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample; TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample; neighbors.emplace_back(prev, next); - assert(spans[i].empty() || - (neighbors[i].first.ts <= spans[i].front().ts && spans[i].back().ts <= neighbors[i].second.ts)); - } // Should follow: neighbors[i].first <= span[i].front() <= span[i].back() <= neighbors[i].second; + } uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts); for (size_t i = 0; i < spans.size(); i++) { @@ -255,12 +237,20 @@ std::vector AggregateSamplesByRangeOption(std::vector sample bool front_available = (spans[i].front().ts != bucket_left) && (neighbors[i].first.ts <= bucket_left); bool back_available = (spans[i].back().ts != bucket_right) && (bucket_right <= neighbors[i].second.ts); double area = 0; - area += front_available ? front_area(bucket_left, neighbors[i].first, spans[i].front()) : 0.0; - area += back_available ? 
end_area(bucket_right, spans[i].back(), neighbors[i].second) : 0.0; + uint64_t l = spans[i].front().ts; + uint64_t r = spans[i].back().ts; + if (front_available) { + TSSample left_sample = interpolate_sample(neighbors[i].first, bucket_left, spans[i].front()); + area += Reducer::Area(std::array{left_sample, spans[i].front()}); + l = bucket_left; + } + if (back_available) { + TSSample right_sample = interpolate_sample(spans[i].back(), bucket_right, neighbors[i].second); + area += Reducer::Area(std::array{spans[i].back(), right_sample}); + r = bucket_right; + } // Edge case: If single bucket and it contains only one element. area += !front_available && !back_available && spans[i].size() == 1 ? spans[i][0].v : 0; - uint64_t l = front_available ? bucket_left : spans[i].front().ts; - uint64_t r = back_available ? bucket_right : spans[i].back().ts; sample.v = (sample.v + area) / std::max(static_cast(r - l), 1.0); } } else { From 45ff6fa9961170add40a278005f2a6f91e4ab48c Mon Sep 17 00:00:00 2001 From: var-nan Date: Fri, 5 Dec 2025 10:19:11 -0600 Subject: [PATCH 7/7] refactored twa-bounds struct --- src/types/redis_timeseries.cc | 35 ++++++++++++++++++++--------------- src/types/redis_timeseries.h | 5 +++++ 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc index 28222fbf586..09f796e50bf 100644 --- a/src/types/redis_timeseries.cc +++ b/src/types/redis_timeseries.cc @@ -84,7 +84,8 @@ struct Reducer { } }; -std::vector AggregateSamplesByRangeOption(std::vector samples, const TSRangeOption &option) { +std::vector AggregateSamplesByRangeOption(std::vector samples, const TSRangeOption &option, + const TWABounds &twa_bounds) { const auto &aggregator = option.aggregator; auto get_bucket_ts = [&](uint64_t left) -> uint64_t { @@ -120,15 +121,12 @@ std::vector AggregateSamplesByRangeOption(std::vector sample return Reducer::Area(std::array{left, right}) / static_cast(bucket_right - bucket_left); }; - // 
Retrieve prev_sample and next_sample from samples when TWA aggregation. TSSample prev_sample, next_sample; bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false; if (is_twa_aggregator) { const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value(); - next_sample = samples.back(); - samples.pop_back(); - prev_sample = samples.back(); - samples.pop_back(); + next_sample = twa_bounds.next_sample; + prev_sample = twa_bounds.prev_sample; // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples. prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts); next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts); @@ -183,12 +181,18 @@ std::vector AggregateSamplesByRangeOption(std::vector sample return curr; }; - std::vector> neighbors; - neighbors.reserve(spans.size()); - for (size_t i = 0; i < spans.size(); i++) { - TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample; - TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample; - neighbors.emplace_back(prev, next); + size_t sz = spans.size() - 1; + std::vector> neighbors(spans.size()); + neighbors[0].first = prev_sample; + neighbors[sz].second = next_sample; + if (spans.size() > 1) { + neighbors[0].second = spans[non_empty_right_bucket_idx(0)].front(); + neighbors[sz].first = spans[non_empty_left_bucket_idx(sz)].back(); + } + sz--; + for (size_t i = 1; i < spans.size() - 1; i++, sz--) { + neighbors[i].first = spans[i - 1].empty() ? neighbors[i - 1].first : spans[i - 1].back(); + neighbors[sz].second = spans[sz + 1].empty() ? 
neighbors[sz + 1].second : spans[sz + 1].front(); } uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts); @@ -1244,6 +1248,7 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke } } + TWABounds twa_bounds; if (is_twa_aggregator) { // If the first element of the series is in first bucket, prev_sample might not get initialized. Similarly if the // last element in the series is in last bucket, next_sample might not get initialized. If the series is empty, @@ -1252,12 +1257,12 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_ke prev_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.front() : prev_sample; next_sample = next_sample.ts == TSSample::MAX_TIMESTAMP && !temp_results.empty() ? temp_results.back() : next_sample; - temp_results.push_back(prev_sample); - temp_results.push_back(next_sample); + twa_bounds.prev_sample = prev_sample; + twa_bounds.next_sample = next_sample; } // Process compaction logic - *res = AggregateSamplesByRangeOption(std::move(temp_results), option); + *res = AggregateSamplesByRangeOption(std::move(temp_results), option, twa_bounds); return rocksdb::Status::OK(); } diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h index f770ffac904..9ba4e3e84a3 100644 --- a/src/types/redis_timeseries.h +++ b/src/types/redis_timeseries.h @@ -89,6 +89,11 @@ struct TSAggregator { double AggregateSamplesValue(nonstd::span samples) const; }; +struct TWABounds { + TSSample prev_sample; + TSSample next_sample; +}; + struct TSDownStreamMeta { TSAggregator aggregator; uint64_t latest_bucket_idx;