Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions src/commands/cmd_topk.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_topk.h"

namespace {
constexpr const char *errBadK = "Bad K";
constexpr const char *errBadWidth = "Bad width";
constexpr const char *errBadDepth = "Bad depth";
constexpr const char *errBadDecay = "Bad decay";
constexpr const char *errInvalidDecay = "Decay must be between 0 and 1";
} // namespace

namespace redis {

class CommandTopKReserve final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
auto parse_k = ParseInt<uint32_t>(args[2], 10);
if (!parse_k) {
return {Status::RedisParseErr, errBadK};
}
k_ = *parse_k;
if (args_.size() >= 4) {
auto parse_width = ParseInt<uint32_t>(args[3], 10);
if (!parse_width) {
return {Status::RedisParseErr, errBadWidth};
}
width_ = *parse_width;
}
if (args_.size() >= 5) {
auto parse_depth = ParseInt<uint32_t>(args[4], 10);
if (!parse_depth) {
return {Status::RedisParseErr, errBadDepth};
}
depth_ = *parse_depth;
}
if (args_.size() >= 6) {
auto parse_decay = ParseFloat<double>(args[5]);
if (!parse_decay) {
return {Status::RedisParseErr, errBadDecay};
}
decay_ = *parse_decay;
if (decay_ <= 0.0 || decay_ >= 1.0) {
return {Status::RedisParseErr, errInvalidDecay};
}
}
if (args_.size() > 6) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());

auto s = topk.Reserve(ctx, args_[1], k_, width_, depth_, decay_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::RESP_OK;
return Status::OK();
}

private:
uint32_t k_;
uint32_t width_ = 7;
uint32_t depth_ = 8;
double decay_ = 0.9;
};

class CommandTopKAdd final : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 3);

auto s = topk.Add(ctx, args_[1], args_[2]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::RESP_OK;
return Status::OK();
}
};

class CommandTopKIncrBy final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args_.size() != 4) {
return {Status::InvalidArgument, "invalid argument"};
}
auto parse_incr = ParseInt<uint32_t>(args[3], 10);
if (!parse_incr) {
return {Status::InvalidArgument, "invalid argument"};
}
incr_ = *parse_incr;
return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 4);

auto s = topk.IncrBy(ctx, args_[1], args_[2], incr_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::RESP_OK;
return Status::OK();
}

private:
uint32_t incr_;
};

class CommandTopKList final : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 2);

std::vector<std::string> items;
auto s = topk.List(ctx, args_[1], items);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = MultiBulkString(redis::RESP::v2, items);
return Status::OK();
}
};

class CommandTopKInfo final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() > 3) {
return {Status::InvalidArgument, errWrongNumOfArguments};
}

CommandParser parser(args, 2);
if (parser.Good()) {
if (args.size() == 3) {
if (args[2] == "topk") {
type_ = TopKInfoType::kTopK;
} else if (args[2] == "width") {
type_ = TopKInfoType::kWidth;
} else if (args[2] == "depth") {
type_ = TopKInfoType::kDepth;
} else if (args[2] == "decay") {
type_ = TopKInfoType::kDecay;
} else {
return {Status::InvalidArgument, "Invalid info type"};
}
}
}

return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override {
redis::TopK topk_db(srv->storage, conn->GetNamespace());
TopKInfo info;

auto s = topk_db.Info(ctx, args_[1], &info);
if (s.IsNotFound()) return {Status::RedisExecErr, "key does not exist"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

switch (type_) {
case TopKInfoType::kAll:
*output = redis::MultiLen(2 * 4);
*output += redis::SimpleString("K");
*output += redis::Integer(info.k);
*output += redis::SimpleString("Width");
*output += redis::Integer(info.width);
*output += redis::SimpleString("Depth");
*output += redis::Integer(info.depth);
*output += redis::SimpleString("Decay");
*output += redis::Double(redis::RESP::v2, info.decay);
break;
case TopKInfoType::kTopK:
*output = redis::Integer(info.k);
break;
case TopKInfoType::kWidth:
*output = redis::Integer(info.width);
break;
case TopKInfoType::kDepth:
*output = redis::Integer(info.depth);
break;
case TopKInfoType::kDecay:
*output = redis::Double(redis::RESP::v2, info.decay);
break;
}
return Status::OK();
}

private:
TopKInfoType type_ = TopKInfoType::kAll;
};

class CommandTopKQuery final : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 3);

bool is_exists = false;
auto s = topk.Query(ctx, args_[1], args_[2], &is_exists);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::Bool(redis::RESP::v2, is_exists);
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(TopK, MakeCmdAttr<CommandTopKAdd>("topk.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandTopKList>("topk.list", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTopKInfo>("topk.info", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTopKQuery>("topk.query", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTopKReserve>("topk.reserve", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTopKIncrBy>("topk.incrby", 4, "write", 1, 1, 1));

} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ enum class CommandCategory : uint8_t {
Txn,
ZSet,
Timeseries,
TopK,
// this is a special category for disabling commands,
// basically can be used for version releasing or debugging
Disabled,
Expand Down
26 changes: 25 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type()

bool Metadata::IsEmptyableType() const {
return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog ||
Type() == kRedisTDigest || Type() == kRedisTimeSeries;
Type() == kRedisTDigest || Type() == kRedisTimeSeries || Type() == kRedisTopK;
}

bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
Expand Down Expand Up @@ -569,3 +569,27 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}

void TopKMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);
PutFixed32(dst, top_k);
PutFixed16(dst, width);
PutFixed32(dst, depth);
PutDouble(dst, decay);
}

rocksdb::Status TopKMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}
if (input->size() < sizeof(double) + sizeof(uint32_t) * 2 + sizeof(uint16_t)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

GetFixed32(input, &top_k);
GetFixed16(input, &width);
GetFixed32(input, &depth);
GetDouble(input, &decay);

return rocksdb::Status::OK();
}
22 changes: 20 additions & 2 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ enum RedisType : uint8_t {
kRedisHyperLogLog = 11,
kRedisTDigest = 12,
kRedisTimeSeries = 13,
kRedisTopK = 14,
kRedisTypeMax
};

inline constexpr const std::array<std::string_view, kRedisTypeMax> RedisTypeNames = {
"none", "string", "hash", "list", "set", "zset", "bitmap",
"sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"};
"none", "string", "hash", "list", "set", "zset", "bitmap", "sortedint",
"stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "topk"};

struct RedisTypes {
RedisTypes(std::initializer_list<RedisType> list) {
Expand Down Expand Up @@ -409,3 +410,20 @@ class TimeSeriesMetadata : public Metadata {
void Encode(std::string *dst) const override;
rocksdb::Status Decode(Slice *input) override;
};

class TopKMetadata : public Metadata {
public:
uint32_t top_k;
uint16_t width;
uint32_t depth;
double decay;

explicit TopKMetadata(bool generate_version = true) : Metadata(kRedisTopK, generate_version) {}

explicit TopKMetadata(uint64_t top_k, uint64_t width = 7, uint64_t depth = 8, double decay = 0.9,
bool generate_version = true)
: Metadata(kRedisTopK, generate_version), top_k(top_k), width(width), depth(depth), decay(decay) {}

void Encode(std::string *dst) const override;
rocksdb::Status Decode(Slice *input) override;
};
Loading