Skip to content

Commit afce4b9

Browse files
committed
server: tests for rdb loading with chunks
Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
1 parent 2c339bb commit afce4b9

1 file changed

Lines changed: 247 additions & 1 deletion

File tree

src/server/rdb_test.cc

Lines changed: 247 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
extern "C" {
77
#include "redis/crc64.h"
8-
#include "redis/listpack.h"
98
#include "redis/redis_aux.h"
109
#include "redis/zmalloc.h"
1110
}
@@ -16,11 +15,14 @@ extern "C" {
1615
#include "base/flags.h"
1716
#include "base/gtest.h"
1817
#include "base/logging.h"
18+
#include "core/bloom.h"
1919
#include "facade/facade_test.h" // needed to find operator== for RespExpr.
2020
#include "io/file.h"
2121
#include "server/engine_shard_set.h"
22+
#include "server/rdb_extensions.h"
2223
#include "server/rdb_load.h"
2324
#include "server/rdb_save.h"
25+
#include "server/serializer_commons.h"
2426
#include "server/test_utils.h"
2527

2628
namespace rng = std::ranges;
@@ -1027,6 +1029,7 @@ TEST_F(RdbTest, SnapshotTooBig) {
10271029
}
10281030

10291031
TEST_F(RdbTest, HugeKeyIssue4497) {
1032+
absl::FlagSaver fs;
10301033
SetTestFlag("cache_mode", "true");
10311034
ResetService();
10321035

@@ -1039,6 +1042,7 @@ TEST_F(RdbTest, HugeKeyIssue4497) {
10391042
}
10401043

10411044
TEST_F(RdbTest, HugeKeyIssue4554) {
1045+
absl::FlagSaver fs;
10421046
SetTestFlag("cache_mode", "true");
10431047
// We need to stress one flow/shard such that the others finish early. Lock on hashtags allows
10441048
// that.
@@ -1216,4 +1220,246 @@ TEST_F(RdbTest, TopkSerializationDecayParameter) {
12161220
EXPECT_THAT(resp2, RespArray(ElementsAre("item3", "item4")));
12171221
}
12181222

1223+
namespace {
1224+
1225+
// Wraps string in rdb version, eof, checksum, etc so it can be fed to a loader
1226+
std::string WrapInRdb(std::string_view body) {
1227+
std::string out = absl::StrFormat("REDIS%04d", RDB_SER_VERSION);
1228+
out.append(body);
1229+
out.push_back(static_cast<char>(RDB_OPCODE_EOF));
1230+
constexpr uint8_t checksum[8] = {};
1231+
out.append(reinterpret_cast<const char*>(checksum), sizeof(checksum));
1232+
return out;
1233+
}
1234+
1235+
// Runs an RdbLoader over the in-memory bytes of `rdb` and returns the error code from Load().
// Also records a non-fatal expectation that the journal offset reported by the loader equals
// `journal_offset` (std::nullopt unless the caller says otherwise).
std::error_code LoadRdbData(Service* service, const std::string& rdb,
                            std::optional<uint64_t> journal_offset = std::nullopt) {
  io::BytesSource source{io::Buffer(rdb)};
  RdbLoadContext context;
  RdbLoader rdb_loader(service, &context);

  const auto load_status = rdb_loader.Load(&source);
  EXPECT_EQ(rdb_loader.journal_offset(), journal_offset);
  return load_status;
}
1244+
1245+
void AppendLen(std::string* out, uint64_t len) {
1246+
uint8_t buf[9];
1247+
const auto sz = WritePackedUInt(len, {buf, sizeof(buf)});
1248+
out->append(reinterpret_cast<const char*>(buf), sz);
1249+
}
1250+
1251+
// Appends `s` to `out` as a length-prefixed string: the packed length first, then the raw bytes.
void AppendString(std::string* out, std::string_view s) {
  const size_t byte_count = s.size();
  AppendLen(out, byte_count);
  out->append(s.data(), byte_count);
}
1255+
1256+
// Appends a key/value pair to `out` as two consecutive length-prefixed strings, key first.
void AddKV(std::string* out, std::string_view key, std::string_view val) {
  for (const std::string_view part : {key, val}) {
    AppendString(out, part);
  }
}
1260+
1261+
std::string MakeTaggedChunk(uint32_t id, std::string_view payload) {
1262+
std::string out;
1263+
out.push_back(static_cast<char>(RDB_OPCODE_TAGGED_CHUNK));
1264+
1265+
uint8_t header[8];
1266+
absl::little_endian::Store32(header, id);
1267+
absl::little_endian::Store32(header + 4, payload.size());
1268+
out.append(reinterpret_cast<const char*>(header), sizeof(header));
1269+
1270+
out.append(payload);
1271+
return out;
1272+
}
1273+
1274+
// Appends the 8-byte bit pattern of `val` to `out` in little-endian byte order.
void AppendBinaryDouble(std::string* out, double val) {
  static_assert(sizeof(double) == sizeof(uint64_t), "double must be exactly 64 bits wide");

  // Copy the bits out through memcpy so the double is never reinterpreted in place.
  uint64_t raw = 0;
  memcpy(&raw, &val, sizeof(raw));

  uint8_t le_bytes[sizeof(raw)];
  absl::little_endian::Store64(le_bytes, raw);
  out->append(reinterpret_cast<const char*>(le_bytes), sizeof(le_bytes));
}
1282+
1283+
} // namespace
1284+
1285+
// The following tests feed hand-crafted bytes directly to the loader to exercise chunk loading.
// Some of them will become redundant once the saver starts sending chunked data: at that point we
// will be able to load from the db directly instead of hand-crafting the data.
1288+
1289+
// Loads a two-field hash whose serialized form is split across two tagged chunks sharing id 1,
// then checks that both fields are readable.
TEST_F(RdbTest, LoadTwoChunks) {
  // First chunk: type byte, key, declared length of 2 and only the first field.
  std::string head;
  head.push_back(RDB_TYPE_HASH);
  AppendString(&head, "h");
  AppendLen(&head, 2);
  AddKV(&head, "f1", "v1");

  // Second chunk: just the remaining field.
  std::string tail;
  AddKV(&tail, "f2", "v2");

  const std::string body = MakeTaggedChunk(1, head) + MakeTaggedChunk(1, tail);

  const auto ec = pp_->at(0)->Await([&] { return LoadRdbData(service_.get(), WrapInRdb(body)); });
  ASSERT_FALSE(ec) << ec.message();

  EXPECT_EQ(Run({"HGET", "h", "f1"}), "v1");
  EXPECT_EQ(Run({"HGET", "h", "f2"}), "v2");
}
1310+
1311+
// Loads a hash split across two tagged chunks (id 1) with a complete, untagged string entry
// sitting between them, then checks that both keys are readable.
TEST_F(RdbTest, InterleavedLoad) {
  // First part of hash "a": type byte, key, declared length of 2 and the first field.
  std::string hash_head;
  hash_head.push_back(RDB_TYPE_HASH);
  AppendString(&hash_head, "a");
  AppendLen(&hash_head, 2);
  AddKV(&hash_head, "f1", "v1");

  // Second part of hash "a": just the remaining field.
  std::string hash_tail;
  AddKV(&hash_tail, "f2", "v2");

  // A whole string entry that is not wrapped in a tagged chunk.
  std::string plain_entry;
  plain_entry.push_back(RDB_TYPE_STRING);
  AppendString(&plain_entry, "b");
  AppendString(&plain_entry, "plain");

  // interleave - one tag, one plain, then one tag for the same id=1
  const std::string body =
      MakeTaggedChunk(1, hash_head) + plain_entry + MakeTaggedChunk(1, hash_tail);

  auto ec = pp_->at(0)->Await([&] { return LoadRdbData(service_.get(), WrapInRdb(body)); });
  ASSERT_FALSE(ec) << ec.message();

  EXPECT_EQ(Run({"HGET", "a", "f1"}), "v1");
  EXPECT_EQ(Run({"HGET", "a", "f2"}), "v2");
  EXPECT_EQ(Run({"GET", "b"}), "plain");
}
1339+
1340+
// Loads a hash split across two tagged chunks (id 1) with a journal-offset record placed between
// them, and expects the loader to report that offset as well as both hash fields.
TEST_F(RdbTest, ChunksAroundJournalOffset) {
  constexpr uint64_t kJournalOffset = 1234;

  // First part of hash "a": type byte, key, declared length of 2 and the first field.
  std::string hash_head;
  hash_head.push_back(RDB_TYPE_HASH);
  AppendString(&hash_head, "a");
  AppendLen(&hash_head, 2);
  AddKV(&hash_head, "f1", "v1");

  // Second part of hash "a": just the remaining field.
  std::string hash_tail;
  AddKV(&hash_tail, "f2", "v2");

  // Journal offset record: the opcode byte followed by the offset as 8 little-endian bytes.
  uint8_t encoded_offset[8];
  absl::little_endian::Store64(encoded_offset, kJournalOffset);

  // put the journal offset in the middle
  std::string body = MakeTaggedChunk(1, hash_head);
  body.push_back(static_cast<char>(RDB_OPCODE_JOURNAL_OFFSET));
  body.append(reinterpret_cast<const char*>(encoded_offset), sizeof(encoded_offset));
  body += MakeTaggedChunk(1, hash_tail);

  auto ec = pp_->at(0)->Await(
      [&] { return LoadRdbData(service_.get(), WrapInRdb(body), kJournalOffset); });
  ASSERT_FALSE(ec) << ec.message();

  EXPECT_EQ(Run({"HGET", "a", "f1"}), "v1");
  EXPECT_EQ(Run({"HGET", "a", "f2"}), "v2");
}
1367+
1368+
TEST_F(RdbTest, SplitSBF) {
  // This test creates an SBF with (at least) two filters, then splits one of the filters. Since
  // in SBF loading there are two layers of possible splits, intra-filter and inter-filter, this
  // test exercises both. A plain string is also placed between the chunks of the split filter.

  // Creates filter in db to copy the fields from
  auto resp = Run({"BF.RESERVE", "bf_src", "0.01", "10"});
  EXPECT_EQ(resp, "OK");
  for (size_t i = 0; i < 50; ++i) {
    resp = Run({"BF.ADD", "bf_src", StrCat("item", i)});
    EXPECT_THAT(resp, AnyOf(0, 1));
  }

  std::string first;
  std::string blob1;

  // split the blob of the second filter into three chunks. this exercises the loader path where we
  // first try to load the incomplete filter, and return early before that finishes
  constexpr size_t first_split = 17;
  constexpr size_t second_split = 13;

  pp_->at(0)->Await([&] {
    const DbContext ctx{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()};
    const auto& db = ctx.GetDbSlice(0);
    auto it = db.FindReadOnly(ctx, "bf_src", OBJ_SBF);
    ASSERT_TRUE(it.ok());

    const SBF* sbf = it.value()->second.GetSBF();
    ASSERT_GE(sbf->num_filters(), 2);

    const std::string blob0{sbf->data(0)};

    blob1 = std::string{sbf->data(1)};
    ASSERT_GT(blob1.size(), first_split + second_split);

    first.push_back(RDB_TYPE_SBF2);
    // brand new key whose shape is copied off bf_src
    AppendString(&first, "bf_loaded");
    AppendLen(&first, 0);
    AppendBinaryDouble(&first, sbf->grow_factor());
    AppendBinaryDouble(&first, sbf->fp_probability());
    AppendLen(&first, sbf->prev_size());
    AppendLen(&first, sbf->current_size());
    AppendLen(&first, sbf->max_capacity());
    // NOTE(review): the stream declares num_filters() filters, but only filters 0 and 1 are
    // encoded below. This presumably relies on bf_src ending up with exactly two filters -
    // confirm.
    AppendLen(&first, sbf->num_filters());

    AppendLen(&first, sbf->hashfunc_cnt(0));
    // total size of blob0
    AppendLen(&first, blob0.size());
    // this chunk size (all of blob0 is fit in one chunk)
    AppendLen(&first, blob0.size());
    first.append(blob0);

    AppendLen(&first, sbf->hashfunc_cnt(1));
    // total size of blob1
    AppendLen(&first, blob1.size());
    // only the first `first_split` bytes of blob1 go in this chunk
    AppendLen(&first, first_split);
    first.append(blob1.data(), first_split);
  });

  // An ASSERT_* that fires inside the lambda above only returns from the lambda, not from the
  // test. Stop here in that case: blob1 may be empty or too short, and the slicing below would
  // then read out of bounds and underflow `blob1.size() - sum`.
  if (HasFatalFailure()) {
    return;
  }

  // add this plain string between chunks of blob1 filter
  std::string plain;
  plain.push_back(RDB_TYPE_STRING);
  AppendString(&plain, "plain_key");
  AppendString(&plain, "plain_val");

  // p2 of blob1
  std::string second;
  AppendLen(&second, second_split);
  second.append(blob1.data() + first_split, second_split);

  // p3 of blob1: whatever remains after the first two splits
  std::string third;
  constexpr auto sum = first_split + second_split;
  AppendLen(&third, blob1.size() - sum);
  third.append(blob1.data() + sum, blob1.size() - sum);

  std::string body;
  body += MakeTaggedChunk(1, first);
  body += plain;
  body += MakeTaggedChunk(1, second);
  body += MakeTaggedChunk(1, third);

  EXPECT_EQ(Run({"FLUSHALL"}), "OK");

  auto ec = pp_->at(0)->Await([&] { return LoadRdbData(service_.get(), WrapInRdb(body)); });
  ASSERT_FALSE(ec) << ec.message();

  EXPECT_EQ(Run({"TYPE", "bf_loaded"}), "MBbloom--");
  EXPECT_EQ(Run({"GET", "plain_key"}), "plain_val");

  for (size_t i = 0; i < 50; ++i) {
    EXPECT_THAT(Run({"BF.EXISTS", "bf_loaded", StrCat("item", i)}), IntArg(1));
  }
}
1464+
12191465
} // namespace dfly

0 commit comments

Comments
 (0)