Skip to content

Commit ccc15fd

Browse files
committed
feat(stream): support XDELEX command
Add XDELEX to delete stream entries with KEEPREF/DELREF options. Includes refactoring for unified entry/PEL deletion. Also fix a consumer pending count bug when destroying a consumer.
1 parent 4cf6824 commit ccc15fd

7 files changed

Lines changed: 745 additions & 1 deletion

File tree

src/commands/cmd_stream.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,73 @@ class CommandXDel : public Commander {
268268
std::vector<redis::StreamEntryID> ids_;
269269
};
270270

271+
class CommandXDelEx : public Commander {
272+
public:
273+
Status Parse(const std::vector<std::string> &args) override {
274+
CommandParser parser(args, 1);
275+
stream_name_ = GET_OR_RET(parser.TakeStr());
276+
277+
option_ = redis::StreamDeleteOption::KeepRef;
278+
279+
while (parser.Good() && !util::EqualICase(parser.RawPeek(), "IDS")) {
280+
if (parser.EatEqICase("KEEPREF")) {
281+
option_ = redis::StreamDeleteOption::KeepRef;
282+
} else if (parser.EatEqICase("DELREF")) {
283+
option_ = redis::StreamDeleteOption::DelRef;
284+
} else if (parser.EatEqICase("ACKED")) {
285+
option_ = redis::StreamDeleteOption::Acked;
286+
} else {
287+
return parser.InvalidSyntax();
288+
}
289+
}
290+
291+
if (!parser.EatEqICase("IDS")) {
292+
return {Status::RedisParseErr, "syntax error, expected IDS keyword"};
293+
}
294+
295+
auto numids_result = parser.TakeInt<int64_t>();
296+
if (!numids_result.IsOK()) {
297+
return {Status::RedisParseErr, errValueNotInteger};
298+
}
299+
int64_t numids = numids_result.GetValue();
300+
if (numids <= 0) {
301+
return {Status::RedisParseErr, "numids must be positive"};
302+
}
303+
304+
for (int64_t i = 0; i < numids; i++) {
305+
auto id_str = GET_OR_RET(parser.TakeStr());
306+
redis::StreamEntryID id;
307+
auto s = ParseStreamEntryID(id_str, &id);
308+
if (!s.IsOK()) return s;
309+
entry_ids_.emplace_back(id);
310+
}
311+
312+
return Status::OK();
313+
}
314+
315+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
316+
redis::Stream stream_db(srv->storage, conn->GetNamespace());
317+
std::vector<int> results;
318+
319+
auto s = stream_db.DeleteEntriesWithOption(ctx, stream_name_, entry_ids_, option_, &results);
320+
if (!s.ok()) {
321+
return {Status::RedisExecErr, s.ToString()};
322+
}
323+
324+
output->append(redis::MultiLen(results.size()));
325+
for (int r : results) {
326+
output->append(redis::Integer(r));
327+
}
328+
329+
return Status::OK();
330+
}
331+
332+
private:
333+
std::string stream_name_;
334+
redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef;
335+
std::vector<redis::StreamEntryID> entry_ids_;
336+
};
337+
271338
class CommandXClaim : public Commander {
272339
public:
273340
Status Parse(const std::vector<std::string> &args) override {
@@ -1907,6 +1974,7 @@ class CommandXSetId : public Commander {
19071974
REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
19081975
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
19091976
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
1977+
MakeCmdAttr<CommandXDelEx>("xdelex", -5, "write no-dbsize-check", 1, 1, 1),
19101978
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
19111979
MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
19121980
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),

src/storage/batch_extractor.cc

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,14 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
266266
break;
267267
}
268268
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
269+
InternalKey ikey(key, is_slot_id_encoded_);
270+
Slice entry_id_check = ikey.GetSubKey();
271+
uint64_t delimiter = 0;
272+
GetFixed64(&entry_id_check, &delimiter);
273+
if (delimiter == UINT64_MAX) {
274+
return rocksdb::Status::OK();
275+
}
276+
269277
auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args);
270278
if (!s.IsOK()) {
271279
ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
@@ -397,8 +405,25 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
397405
Slice encoded_id = ikey.GetSubKey();
398406
redis::StreamEntryID entry_id;
399407
GetFixed64(&encoded_id, &entry_id.ms);
408+
409+
if (entry_id.ms == UINT64_MAX) {
410+
return rocksdb::Status::OK();
411+
}
412+
400413
GetFixed64(&encoded_id, &entry_id.seq);
401-
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
414+
std::string entry_id_str = entry_id.ToString();
415+
std::string user_key = ikey.GetKey().ToString();
416+
417+
auto args = log_data_.GetArguments();
418+
if (!args->empty()) {
419+
if ((*args)[0] == "XDELEX" && args->size() >= 2) {
420+
command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str};
421+
} else {
422+
command_args = {"XDEL", user_key, entry_id_str};
423+
}
424+
} else {
425+
command_args = {"XDEL", user_key, entry_id_str};
426+
}
402427
}
403428

404429
if (!command_args.empty()) {

0 commit comments

Comments
 (0)