Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 59 additions & 29 deletions src/brpc/redis_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,27 +377,50 @@ size_t RedisCommandParser::ParsedArgsSize() {
ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
std::vector<butil::StringPiece>* args,
butil::Arena* arena) {
ParseError err = PARSE_OK;
do {
RedisCommandConsumeState state = ConsumeImpl(buf, arena, &err);
if (state == CONSUME_STATE_CONTINUE) {
continue;
} else if (state == CONSUME_STATE_DONE) {
break;
} else {
return err;
}
} while (true);

args->swap(_args);
Reset();
return PARSE_OK;
}

RedisCommandConsumeState RedisCommandParser::ConsumeImpl(butil::IOBuf& buf,
butil::Arena* arena,
ParseError* err) {
const auto pfc = static_cast<const char *>(buf.fetch1());
if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
*err = PARSE_ERROR_NOT_ENOUGH_DATA;
return CONSUME_STATE_ERROR;
}
// '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
if (!_parsing_array && *pfc != '*') {
if (!std::isalpha(static_cast<unsigned char>(*pfc))) {
return PARSE_ERROR_TRY_OTHERS;
*err = PARSE_ERROR_TRY_OTHERS;
return CONSUME_STATE_ERROR;
}
const size_t buf_size = buf.size();
const auto copy_str = static_cast<char *>(arena->allocate(buf_size + 1));
buf.copy_to(copy_str, buf_size);
if (*copy_str == ' ') {
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
copy_str[buf_size] = '\0';
const size_t crlf_pos = butil::StringPiece(copy_str, buf_size).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data
return PARSE_ERROR_NOT_ENOUGH_DATA;
*err = PARSE_ERROR_NOT_ENOUGH_DATA;
return CONSUME_STATE_ERROR;
}
args->clear();
size_t offset = 0;
while (offset < crlf_pos && copy_str[offset] != ' ') {
++offset;
Expand All @@ -407,11 +430,11 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
for (size_t i = 0; i < offset; ++i) {
first_arg[i] = tolower(first_arg[i]);
}
args->push_back(butil::StringPiece(first_arg, offset));
_args.push_back(butil::StringPiece(first_arg, offset));
if (offset == crlf_pos) {
// only one argument, directly return
buf.pop_front(crlf_pos + 2);
return PARSE_OK;
return CONSUME_STATE_DONE;
}
size_t arg_start_pos = ++offset;

Expand All @@ -422,7 +445,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
const auto arg_length = offset - arg_start_pos;
const auto arg = static_cast<char *>(arena->allocate(arg_length));
memcpy(arg, copy_str + arg_start_pos, arg_length);
args->push_back(butil::StringPiece(arg, arg_length));
_args.push_back(butil::StringPiece(arg, arg_length));
arg_start_pos = ++offset;
}

Expand All @@ -431,61 +454,69 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
const auto arg_length = crlf_pos - arg_start_pos;
const auto arg = static_cast<char *>(arena->allocate(arg_length));
memcpy(arg, copy_str + arg_start_pos, arg_length);
args->push_back(butil::StringPiece(arg, arg_length));
_args.push_back(butil::StringPiece(arg, arg_length));
}

buf.pop_front(crlf_pos + 2);
return PARSE_OK;
return CONSUME_STATE_DONE;
}
// '$' stands for bulk string "$<length>\r\n<string>\r\n"
if (_parsing_array && *pfc != '$') {
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
char intbuf[32]; // enough for fc + 64-bit decimal + \r\n
const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
intbuf[ncopied] = '\0';
const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data
return PARSE_ERROR_NOT_ENOUGH_DATA;
*err = PARSE_ERROR_NOT_ENOUGH_DATA;
return CONSUME_STATE_ERROR;
}
char* endptr = NULL;
int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
if (endptr != intbuf + crlf_pos) {
LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
if (value < 0) {
LOG(ERROR) << "Invalid len=" << value << " in redis command";
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
if (!_parsing_array) {
if (value > (int64_t)(FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece))) {
LOG(ERROR) << "command array size exceeds limit! max="
<< (FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece))
LOG(ERROR) << "command array size exceeds limit! max="
<< (FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece))
<< ", actually=" << value;
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
_parsing_array = true;
_length = value;
_index = 0;
_args.resize(value);
return Consume(buf, args, arena);
return CONSUME_STATE_CONTINUE;
}
CHECK(_index < _length) << "a complete command has been parsed. "
"impl of RedisCommandParser::Parse is buggy";
"impl of RedisCommandParser::Parse is buggy";
const int64_t len = value; // `value' is length of the string
if (len < 0) {
LOG(ERROR) << "string in command is nil!";
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
if (len > FLAGS_redis_max_allocation_size) {
LOG(ERROR) << "command string exceeds max allocation size! max="
LOG(ERROR) << "command string exceeds max allocation size! max="
<< FLAGS_redis_max_allocation_size << ", actually=" << len;
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
*err = PARSE_ERROR_NOT_ENOUGH_DATA;
return CONSUME_STATE_ERROR;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
char* d = (char*)arena->allocate((len/8 + 1) * 8);
Expand All @@ -502,14 +533,13 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') {
LOG(ERROR) << "string in command is not ended with CRLF";
return PARSE_ERROR_ABSOLUTELY_WRONG;
*err = PARSE_ERROR_ABSOLUTELY_WRONG;
return CONSUME_STATE_ERROR;
}
if (++_index < _length) {
return Consume(buf, args, arena);
if (++_index == _length) {
return CONSUME_STATE_DONE;
}
args->swap(_args);
Reset();
return PARSE_OK;
return CONSUME_STATE_CONTINUE;
}

void RedisCommandParser::Reset() {
Expand Down
14 changes: 14 additions & 0 deletions src/brpc/redis_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf,
const butil::StringPiece* components,
size_t num_components);

enum RedisCommandConsumeState {
CONSUME_STATE_CONTINUE,
CONSUME_STATE_DONE,
CONSUME_STATE_ERROR,
};

// A parser used to parse redis raw command.
class RedisCommandParser {
public:
Expand All @@ -59,6 +65,14 @@ class RedisCommandParser {
// Reset parser to the initial state.
void Reset();

// Consume one arg from `buf'.
// Return CONSUME_STATE_CONTINUE if the parser needs more data.
// Return CONSUME_STATE_DONE if the parser has parsed a complete command.
// Return CONSUME_STATE_ERROR if the parser meets an error.
RedisCommandConsumeState ConsumeImpl(butil::IOBuf& buf,
butil::Arena* arena,
ParseError* err);

bool _parsing_array; // if the parser has met array indicator '*'
int _length; // array length
int _index; // current parsing array index
Expand Down
19 changes: 19 additions & 0 deletions test/brpc_redis_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,25 @@ TEST_F(RedisTest, memory_allocation_limits) {
brpc::ParseError err = parser.Consume(buf, &args, &arena);
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, err);
}

{
// Test large command array work
int32_t original_limit_tmp = brpc::FLAGS_redis_max_allocation_size;
brpc::FLAGS_redis_max_allocation_size = 1024 * 1024;
brpc::RedisCommandParser parser;
butil::IOBuf buf;
int32_t large_array_size = brpc::FLAGS_redis_max_allocation_size / sizeof(butil::StringPiece);
std::string large_array_cmd = "*" + std::to_string(large_array_size) + "\r\n";
for(int i = 0; i < large_array_size; i++){
large_array_cmd.append("$1\r\n1\r\n");
}
buf.append(large_array_cmd);

std::vector<butil::StringPiece> args;
brpc::ParseError err = parser.Consume(buf, &args, &arena);
ASSERT_EQ(brpc::PARSE_OK, err);
brpc::FLAGS_redis_max_allocation_size = original_limit_tmp;
}

// Test valid cases within limits
{
Expand Down
Loading