Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 11 additions & 39 deletions src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,53 +144,25 @@ Status ParseHashExpireFields(const std::vector<std::string> &args, size_t start,
HashFieldExpireCondition *condition_out, std::vector<std::string> *fields) {
*condition_out = HashFieldExpireCondition::kNone;
fields->clear();
bool fields_seen = false;

for (size_t i = start; i < args.size();) {
if (util::EqualICase(args[i], "FIELDS")) {
if (fields_seen) {
return {Status::RedisParseErr, errInvalidSyntax};
}
fields_seen = true;
if (i + 1 >= args.size()) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

auto num_fields = ParseInt<int64_t>(args[i + 1], 10);
if (!num_fields || *num_fields < 1) {
return {Status::RedisParseErr, errValueNotInteger};
}

size_t first_field = i + 2;
auto field_count = static_cast<size_t>(*num_fields);
if (field_count > args.size() - first_field) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
size_t i = start;
if (i < args.size()) {
auto condition = ParseHashExpireCondition(args[i]);
if (condition) {
*condition_out = *condition;
i++;

fields->clear();
fields->reserve(field_count);
for (size_t j = 0; j < field_count; j++) {
fields->emplace_back(args[first_field + j]);
if (i < args.size() && ParseHashExpireCondition(args[i])) {
return {Status::RedisParseErr, errInvalidSyntax};
}
i = first_field + field_count;
continue;
}

auto condition = ParseHashExpireCondition(args[i]);
if (!condition) {
return {Status::RedisParseErr, errInvalidSyntax};
}
if (*condition_out != HashFieldExpireCondition::kNone && *condition_out != *condition) {
return {Status::RedisParseErr, errInvalidSyntax};
}
*condition_out = *condition;
i++;
}

if (!fields_seen) {
CommandParser parser(args, i);
if (!parser.EatEqICase("FIELDS")) {
return {Status::RedisParseErr, errInvalidSyntax};
}
return Status::OK();
return ParseHashFieldListTail(parser, fields);
Comment on lines -147 to +165

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally refactored the HFE parser while trying to align the parsing style with the new XDELEX implementation, but it unnecessarily broadened the scope of this PR.I will fix this in the next push.Thanks for you review.

}

int64_t FormatHashFieldExpireResult(int64_t expire_at, uint64_t now, HashFieldExpireTimeMode time_mode) {
Expand Down
98 changes: 98 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <limits>
#include <memory>
#include <stdexcept>
#include <string_view>

#include "command_parser.h"
#include "commander.h"
Expand Down Expand Up @@ -49,6 +50,14 @@ CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint3
range.last_key = range.first_key + stream_size - 1;
return range;
}

bool IsXDelExNumIDs(std::string_view input) {
if (input.empty() || input[0] < '1' || input[0] > '9') {
return false;
}

return std::all_of(input.begin() + 1, input.end(), [](char c) { return c >= '0' && c <= '9'; });
}
} // namespace

void AddStreamEntriesToResponse(std::string *output, const std::vector<StreamEntry> &entries) {
Expand Down Expand Up @@ -268,6 +277,94 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};

class CommandXDelEx : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
stream_name_ = GET_OR_RET(parser.TakeStr());

option_ = redis::StreamDeleteOption::KeepRef;
bool has_option = false;
bool has_ids = false;

while (parser.Good()) {
if (parser.EatEqICase("KEEPREF")) {
if (has_option) {
return parser.InvalidSyntax();
}
has_option = true;
option_ = redis::StreamDeleteOption::KeepRef;
} else if (parser.EatEqICase("DELREF")) {
if (has_option) {
return parser.InvalidSyntax();
}
has_option = true;
option_ = redis::StreamDeleteOption::DelRef;
} else if (parser.EatEqICase("ACKED")) {
if (has_option) {
return parser.InvalidSyntax();
}
has_option = true;
option_ = redis::StreamDeleteOption::Acked;
} else if (parser.EatEqICase("IDS")) {
has_ids = true;

if (!parser.Good() || !IsXDelExNumIDs(parser.RawPeek())) {
return {Status::RedisParseErr, errValueNotInteger};
}
auto numids_result = parser.TakeInt<int64_t>();
if (!numids_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
int64_t numids = numids_result.GetValue();
if (numids <= 0) {
return {Status::RedisParseErr, "numids must be positive"};
}

std::vector<redis::StreamEntryID> ids;
for (int64_t i = 0; i < numids; i++) {
auto id_str = GET_OR_RET(parser.TakeStr());
redis::StreamEntryID id;
auto s = ParseStreamEntryID(id_str, &id);
if (!s.IsOK()) return s;
ids.emplace_back(id);
}
entry_ids_ = std::move(ids);
} else {
return parser.InvalidSyntax();
}
}

if (!has_ids) {
return {Status::RedisParseErr, "syntax error, expected IDS keyword"};
}

return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<int> results;

auto s = stream_db.DeleteEntriesWithOption(ctx, stream_name_, entry_ids_, option_, &results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

output->append(redis::MultiLen(results.size()));
for (int r : results) {
output->append(redis::Integer(r));
}

return Status::OK();
}

private:
std::string stream_name_;
redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef;
std::vector<redis::StreamEntryID> entry_ids_;
};

class CommandXClaim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1907,6 +2004,7 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXDelEx>("xdelex", -5, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
Expand Down
94 changes: 91 additions & 3 deletions src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@

#include "batch_extractor.h"

#include <utility>

#include "cluster/redis_slot.h"
#include "logging.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "types/redis_bitmap.h"
#include "types/redis_stream_base.h"

void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
log_data_ = redis::WriteBatchLogData();
seen_xdelex_entry_keys_.clear();

// Currently, we only have two kinds of log data
if (ServerLogData::IsServerLogData(blob.data())) {
ServerLogData server_log;
Expand Down Expand Up @@ -266,6 +272,21 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
break;
}
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
InternalKey ikey(key, is_slot_id_encoded_);
Slice entry_id_check = ikey.GetSubKey();
uint64_t delimiter = 0;
GetFixed64(&entry_id_check, &delimiter);
if (delimiter == UINT64_MAX) {
return rocksdb::Status::OK();
}

user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args);
if (!s.IsOK()) {
ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
Expand Down Expand Up @@ -396,9 +417,76 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
InternalKey ikey(key, is_slot_id_encoded_);
Slice encoded_id = ikey.GetSubKey();
redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);
GetFixed64(&encoded_id, &entry_id.seq);
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
if (!GetFixed64(&encoded_id, &entry_id.ms)) {
return rocksdb::Status::OK();
}

if (entry_id.ms == UINT64_MAX) {
// DELREF may remove dangling PELs without a stream entry deletion.
auto args = log_data_.GetArguments();
if (!args->empty() && (*args)[0] == "XDELEX" && args->size() >= 2 && (*args)[1] == "DELREF") {
uint8_t type_delimiter = 0;
if (!GetFixed8(&encoded_id, &type_delimiter)) {
return rocksdb::Status::OK();
}
if (type_delimiter == static_cast<uint8_t>(redis::StreamSubkeyType::StreamPelEntry)) {
uint64_t group_name_len = 0;
if (!GetFixed64(&encoded_id, &group_name_len)) {
return rocksdb::Status::OK();
}
if (group_name_len > encoded_id.size() || encoded_id.size() - group_name_len < 16) {
return rocksdb::Status::OK();
}
encoded_id.remove_prefix(group_name_len);

if (!GetFixed64(&encoded_id, &entry_id.ms) || !GetFixed64(&encoded_id, &entry_id.seq)) {
return rocksdb::Status::OK();
}
std::string entry_id_str = entry_id.ToString();

std::string user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str;
if (seen_xdelex_entry_keys_.insert(std::move(dedup_key)).second) {
command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
}
}
return rocksdb::Status::OK();
}

if (!GetFixed64(&encoded_id, &entry_id.seq)) {
return rocksdb::Status::OK();
}
std::string entry_id_str = entry_id.ToString();
std::string user_key = ikey.GetKey().ToString();

auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

auto args = log_data_.GetArguments();
if (!args->empty()) {
if ((*args)[0] == "XDELEX" && args->size() >= 2) {
std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str;
if (seen_xdelex_entry_keys_.insert(std::move(dedup_key)).second) {
std::string option = (*args)[1] == "ACKED" ? "KEEPREF" : (*args)[1];
command_args = {(*args)[0], user_key, option, "IDS", "1", entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
}

if (!command_args.empty()) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/batch_extractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <map>
#include <string>
#include <unordered_set>
#include <vector>

#include "cluster/cluster_defs.h"
Expand Down Expand Up @@ -54,4 +55,5 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
bool is_slot_id_encoded_ = false;
SlotRange slot_range_;
bool to_redis_;
std::unordered_set<std::string> seen_xdelex_entry_keys_;
};
Loading
Loading