55
66extern " C" {
77#include " redis/crc64.h"
8- #include " redis/listpack.h"
98#include " redis/redis_aux.h"
109#include " redis/zmalloc.h"
1110}
@@ -16,11 +15,14 @@ extern "C" {
1615#include " base/flags.h"
1716#include " base/gtest.h"
1817#include " base/logging.h"
18+ #include " core/bloom.h"
1919#include " facade/facade_test.h" // needed to find operator== for RespExpr.
2020#include " io/file.h"
2121#include " server/engine_shard_set.h"
22+ #include " server/rdb_extensions.h"
2223#include " server/rdb_load.h"
2324#include " server/rdb_save.h"
25+ #include " server/serializer_commons.h"
2426#include " server/test_utils.h"
2527
2628namespace rng = std::ranges;
@@ -1027,6 +1029,7 @@ TEST_F(RdbTest, SnapshotTooBig) {
10271029}
10281030
10291031TEST_F (RdbTest, HugeKeyIssue4497) {
1032+ absl::FlagSaver fs;
10301033 SetTestFlag (" cache_mode" , " true" );
10311034 ResetService ();
10321035
@@ -1039,6 +1042,7 @@ TEST_F(RdbTest, HugeKeyIssue4497) {
10391042}
10401043
10411044TEST_F (RdbTest, HugeKeyIssue4554) {
1045+ absl::FlagSaver fs;
10421046 SetTestFlag (" cache_mode" , " true" );
10431047 // We need to stress one flow/shard such that the others finish early. Lock on hashtags allows
10441048 // that.
@@ -1216,4 +1220,246 @@ TEST_F(RdbTest, TopkSerializationDecayParameter) {
12161220 EXPECT_THAT (resp2, RespArray (ElementsAre (" item3" , " item4" )));
12171221}
12181222
1223+ namespace {
1224+
1225+ // Wraps string in rdb version, eof, checksum, etc so it can be fed to a loader
1226+ std::string WrapInRdb (std::string_view body) {
1227+ std::string out = absl::StrFormat (" REDIS%04d" , RDB_SER_VERSION);
1228+ out.append (body);
1229+ out.push_back (static_cast <char >(RDB_OPCODE_EOF));
1230+ constexpr uint8_t checksum[8 ] = {};
1231+ out.append (reinterpret_cast <const char *>(checksum), sizeof (checksum));
1232+ return out;
1233+ }
1234+
1235+ std::error_code LoadRdbData (Service* service, const std::string& rdb,
1236+ std::optional<uint64_t > journal_offset = std::nullopt ) {
1237+ io::BytesSource src{io::Buffer (rdb)};
1238+ RdbLoadContext load_context;
1239+ RdbLoader loader (service, &load_context);
1240+ auto ec = loader.Load (&src);
1241+ EXPECT_EQ (loader.journal_offset (), journal_offset);
1242+ return ec;
1243+ }
1244+
1245+ void AppendLen (std::string* out, uint64_t len) {
1246+ uint8_t buf[9 ];
1247+ const auto sz = WritePackedUInt (len, {buf, sizeof (buf)});
1248+ out->append (reinterpret_cast <const char *>(buf), sz);
1249+ }
1250+
1251+ void AppendString (std::string* out, std::string_view s) {
1252+ AppendLen (out, s.size ());
1253+ out->append (s);
1254+ }
1255+
1256+ void AddKV (std::string* out, std::string_view key, std::string_view val) {
1257+ AppendString (out, key);
1258+ AppendString (out, val);
1259+ }
1260+
1261+ std::string MakeTaggedChunk (uint32_t id, std::string_view payload) {
1262+ std::string out;
1263+ out.push_back (static_cast <char >(RDB_OPCODE_TAGGED_CHUNK));
1264+
1265+ uint8_t header[8 ];
1266+ absl::little_endian::Store32 (header, id);
1267+ absl::little_endian::Store32 (header + 4 , payload.size ());
1268+ out.append (reinterpret_cast <const char *>(header), sizeof (header));
1269+
1270+ out.append (payload);
1271+ return out;
1272+ }
1273+
1274+ void AppendBinaryDouble (std::string* out, double val) {
1275+ uint64_t bits;
1276+ memcpy (&bits, &val, sizeof (bits));
1277+
1278+ uint8_t buf[8 ];
1279+ absl::little_endian::Store64 (buf, bits);
1280+ out->append (reinterpret_cast <const char *>(buf), sizeof (buf));
1281+ }
1282+
1283+ } // namespace
1284+
1285+ // The following are tests that directly feed byte data to loader to exercise chunk loading.
1286+ // Some of these will become redundant once the saver starts sending chunked data, so instead of
1287+ // hand-crafting data we will be able to load from the db directly.
1288+
1289+ TEST_F (RdbTest, LoadTwoChunks) {
1290+ std::string first;
1291+ first.push_back (RDB_TYPE_HASH);
1292+ AppendString (&first, " h" );
1293+ AppendLen (&first, 2 );
1294+ AddKV (&first, " f1" , " v1" );
1295+
1296+ std::string second;
1297+ AddKV (&second, " f2" , " v2" );
1298+
1299+ std::string body;
1300+ // hash is split across two tagged chunks
1301+ body += MakeTaggedChunk (1 , first);
1302+ body += MakeTaggedChunk (1 , second);
1303+
1304+ const auto ec = pp_->at (0 )->Await ([&] { return LoadRdbData (service_.get (), WrapInRdb (body)); });
1305+ ASSERT_FALSE (ec) << ec.message ();
1306+
1307+ EXPECT_EQ (Run ({" HGET" , " h" , " f1" }), " v1" );
1308+ EXPECT_EQ (Run ({" HGET" , " h" , " f2" }), " v2" );
1309+ }
1310+
1311+ TEST_F (RdbTest, InterleavedLoad) {
1312+ std::string a1;
1313+ a1.push_back (RDB_TYPE_HASH);
1314+ AppendString (&a1, " a" );
1315+ AppendLen (&a1, 2 );
1316+ AddKV (&a1, " f1" , " v1" );
1317+
1318+ std::string b;
1319+ b.push_back (RDB_TYPE_STRING);
1320+ AppendString (&b, " b" );
1321+ AppendString (&b, " plain" );
1322+
1323+ std::string a2;
1324+ AddKV (&a2, " f2" , " v2" );
1325+
1326+ std::string body;
1327+ // interleave - one tag, one plain, then one tag for the same id=1
1328+ body += MakeTaggedChunk (1 , a1);
1329+ body += b;
1330+ body += MakeTaggedChunk (1 , a2);
1331+
1332+ auto ec = pp_->at (0 )->Await ([&] { return LoadRdbData (service_.get (), WrapInRdb (body)); });
1333+ ASSERT_FALSE (ec) << ec.message ();
1334+
1335+ EXPECT_EQ (Run ({" HGET" , " a" , " f1" }), " v1" );
1336+ EXPECT_EQ (Run ({" HGET" , " a" , " f2" }), " v2" );
1337+ EXPECT_EQ (Run ({" GET" , " b" }), " plain" );
1338+ }
1339+
1340+ TEST_F (RdbTest, ChunksAroundJournalOffset) {
1341+ std::string a1;
1342+ a1.push_back (RDB_TYPE_HASH);
1343+ AppendString (&a1, " a" );
1344+ AppendLen (&a1, 2 );
1345+ AddKV (&a1, " f1" , " v1" );
1346+
1347+ std::string a2;
1348+ AddKV (&a2, " f2" , " v2" );
1349+
1350+ std::string body;
1351+ body += MakeTaggedChunk (1 , a1);
1352+
1353+ // put the journal offset in the middle
1354+ body.push_back (static_cast <char >(RDB_OPCODE_JOURNAL_OFFSET));
1355+ uint8_t offset_bytes[8 ];
1356+ absl::little_endian::Store64 (offset_bytes, 1234 );
1357+ body.append (reinterpret_cast <const char *>(offset_bytes), sizeof (offset_bytes));
1358+
1359+ body += MakeTaggedChunk (1 , a2);
1360+
1361+ auto ec = pp_->at (0 )->Await ([&] { return LoadRdbData (service_.get (), WrapInRdb (body), 1234 ); });
1362+ ASSERT_FALSE (ec) << ec.message ();
1363+
1364+ EXPECT_EQ (Run ({" HGET" , " a" , " f1" }), " v1" );
1365+ EXPECT_EQ (Run ({" HGET" , " a" , " f2" }), " v2" );
1366+ }
1367+
1368+ TEST_F (RdbTest, SplitSBF) {
1369+ // this test creates two filter SBF, then splits one of the filters. Since in sbf loading there
1370+ // are two layers of possible splits, intra-filter and inter-filter, this test exercises both
1371+ // splits. A plain string is also added between the split filter.
1372+
1373+ // Creates filter in db to copy the fields from
1374+ auto resp = Run ({" BF.RESERVE" , " bf_src" , " 0.01" , " 10" });
1375+ EXPECT_EQ (resp, " OK" );
1376+ for (size_t i = 0 ; i < 50 ; ++i) {
1377+ resp = Run ({" BF.ADD" , " bf_src" , StrCat (" item" , i)});
1378+ EXPECT_THAT (resp, AnyOf (0 , 1 ));
1379+ }
1380+
1381+ std::string first;
1382+ std::string blob1;
1383+
1384+ // split the blob of the second filter into three chunks. this exercises the loader path where we
1385+ // first try to load the incomplete filter, and return early before that finishes
1386+ constexpr size_t first_split = 17 ;
1387+ constexpr size_t second_split = 13 ;
1388+
1389+ pp_->at (0 )->Await ([&] {
1390+ const DbContext ctx{&namespaces->GetDefaultNamespace (), 0 , GetCurrentTimeMs ()};
1391+ const auto & db = ctx.GetDbSlice (0 );
1392+ auto it = db.FindReadOnly (ctx, " bf_src" , OBJ_SBF);
1393+ ASSERT_TRUE (it.ok ());
1394+
1395+ const SBF* sbf = it.value ()->second .GetSBF ();
1396+ ASSERT_GE (sbf->num_filters (), 2 );
1397+
1398+ const std::string blob0{sbf->data (0 )};
1399+
1400+ blob1 = std::string{sbf->data (1 )};
1401+ ASSERT_GT (blob1.size (), first_split + second_split);
1402+
1403+ first.push_back (RDB_TYPE_SBF2);
1404+ // brand new key whose shape is copied off bf_src
1405+ AppendString (&first, " bf_loaded" );
1406+ AppendLen (&first, 0 );
1407+ AppendBinaryDouble (&first, sbf->grow_factor ());
1408+ AppendBinaryDouble (&first, sbf->fp_probability ());
1409+ AppendLen (&first, sbf->prev_size ());
1410+ AppendLen (&first, sbf->current_size ());
1411+ AppendLen (&first, sbf->max_capacity ());
1412+ AppendLen (&first, sbf->num_filters ());
1413+
1414+ AppendLen (&first, sbf->hashfunc_cnt (0 ));
1415+ // total size of blob0
1416+ AppendLen (&first, blob0.size ());
1417+ // this chunk size (all of blob0 is fit in one chunk)
1418+ AppendLen (&first, blob0.size ());
1419+ first.append (blob0);
1420+
1421+ AppendLen (&first, sbf->hashfunc_cnt (1 ));
1422+ // total size of blob1
1423+ AppendLen (&first, blob1.size ());
1424+ // only 17 bytes from blob1 in this chunk
1425+ AppendLen (&first, first_split);
1426+ first.append (blob1.data (), first_split);
1427+ });
1428+
1429+ // add this plain string between chunks of blob1 filter
1430+ std::string plain;
1431+ plain.push_back (RDB_TYPE_STRING);
1432+ AppendString (&plain, " plain_key" );
1433+ AppendString (&plain, " plain_val" );
1434+
1435+ // p2 of blob1
1436+ std::string second;
1437+ AppendLen (&second, second_split);
1438+ second.append (blob1.data () + first_split, second_split);
1439+
1440+ // p2 of blob1
1441+ std::string third;
1442+ constexpr auto sum = first_split + second_split;
1443+ AppendLen (&third, blob1.size () - sum);
1444+ third.append (blob1.data () + sum, blob1.size () - sum);
1445+
1446+ std::string body;
1447+ body += MakeTaggedChunk (1 , first);
1448+ body += plain;
1449+ body += MakeTaggedChunk (1 , second);
1450+ body += MakeTaggedChunk (1 , third);
1451+
1452+ EXPECT_EQ (Run ({" FLUSHALL" }), " OK" );
1453+
1454+ auto ec = pp_->at (0 )->Await ([&] { return LoadRdbData (service_.get (), WrapInRdb (body)); });
1455+ ASSERT_FALSE (ec) << ec.message ();
1456+
1457+ EXPECT_EQ (Run ({" TYPE" , " bf_loaded" }), " MBbloom--" );
1458+ EXPECT_EQ (Run ({" GET" , " plain_key" }), " plain_val" );
1459+
1460+ for (size_t i = 0 ; i < 50 ; ++i) {
1461+ EXPECT_THAT (Run ({" BF.EXISTS" , " bf_loaded" , StrCat (" item" , i)}), IntArg (1 ));
1462+ }
1463+ }
1464+
12191465} // namespace dfly
0 commit comments