Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fdbclient/ClientKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ void ClientKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( RESTORE_RANGES_READ_BATCH, 10000 );

init( BACKUP_RANGE_PARTITIONED_VDIR_INTERVAL, 100000 * 1000000LL );
init( BACKUP_NUM_OF_PARTITIONS, 100 );
init( BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH, false );
init( ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS, false ); if( randomize && buggify() ) { ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS = true; }
init( BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS, -2 ); // Do consistency check based on all available storage replicas
Expand Down
11 changes: 11 additions & 0 deletions fdbclient/FileBackupAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,11 @@ static Future<Void> clearBackupStartID(Reference<ReadYourWritesTransaction> tr,
if (ids.empty()) {
TraceEvent("ClearBackup").detail("BackupID", backupUid);
tr->clear(backupStartedKey);
// Last backup just finished. If outgoing backup was range-partitioned, ask DD to clear the partition list.
Optional<MutationLogType> logType = co_await BackupConfig(backupUid).mutationLogType().get(tr);
if (logType.present() && logType.get() == MutationLogType::RANGE_PARTITIONED_LOG) {
tr->set(backupPartitionRequiredKey, backupPartitionRequiredValue(2));
}
Comment thread
akankshamahajan15 marked this conversation as resolved.
} else {
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
}
Expand Down Expand Up @@ -4286,6 +4291,12 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase {
if (started.get().present()) {
ids = decodeBackupStartedValue(started.get().get());
}

// First range-partitioned backup on this cluster: ask DD to compute the partition list.
if (ids.empty() && mutationLogType.get().get() == MutationLogType::RANGE_PARTITIONED_LOG) {
Comment thread
akankshamahajan15 marked this conversation as resolved.
tr->set(backupPartitionRequiredKey, backupPartitionRequiredValue(1));
}

const UID uid = config.getUid();
auto it = std::find_if(
ids.begin(), ids.end(), [uid](const std::pair<UID, Version>& p) { return p.first == uid; });
Expand Down
33 changes: 33 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,39 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
return ids;
}

const KeyRef backupPartitionRequiredKey = "\xff\x02/backupPartitionRequired"_sr;
const KeyRef backupPartitionListKey = "\xff\x02/backupPartitionList"_sr;

// Encodes a backupPartitionRequiredKey request type as an unversioned int8_t value.
Value backupPartitionRequiredValue(int8_t requestType) {
    return BinaryWriter::toValue(requestType, Unversioned());
}

int8_t decodeBackupPartitionRequiredValue(const ValueRef& value) {
int8_t requestType = 0;
if (!value.empty()) {
BinaryReader reader(value, Unversioned());
reader >> requestType;
}
return requestType;
}

// Serializes the partition key ranges, prefixed with the protocol version, for storage under
// backupPartitionListKey.
Value encodeBackupPartitionListValue(const std::vector<KeyRange>& partitions) {
    return BinaryWriter::toValue(partitions, IncludeVersion());
}

std::vector<KeyRange> decodeBackupPartitionListValue(const ValueRef& value) {
std::vector<KeyRange> partitions;
if (!value.empty()) {
BinaryReader reader(value, IncludeVersion());
reader >> partitions;
}
return partitions;
}
Comment thread
akankshamahajan15 marked this conversation as resolved.

// Returns true when the mutation's type passes isSingleKeyMutation() and its param1 equals `key`.
bool mutationForKey(const MutationRef& m, const KeyRef& key) {
    if (!isSingleKeyMutation((MutationRef::Type)m.type)) {
        return false;
    }
    return m.param1 == key;
}
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/Knobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ClientKnobs : public KnobsImpl<ClientKno

// interval for version directory bucketing in range-partitioned backup.
int64_t BACKUP_RANGE_PARTITIONED_VDIR_INTERVAL;
// Number of contiguous user keyspace partitions for range-partitioned backup.
int BACKUP_NUM_OF_PARTITIONS;
bool BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH;
bool ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS;
int BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS;
Expand Down
18 changes: 18 additions & 0 deletions fdbclient/include/fdbclient/SystemData.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,24 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
// 1 = Send a signal to pause/already paused.
extern const KeyRef backupPausedKey;

// The key BackupAgent writes to request that DataDistributor (re)compute partitions for
// range-partitioned backup, or clear the partition state on backup stop. DD watches
// this key, performs the requested action, and clears it (an absent key is treated as 0).
// "\xff\x02/backupPartitionRequired" := "[[0|1|2]]"
// 0 = cleared / no pending request.
// 1 = initial partition or manual/adaptive re-partition.
// 2 = clean up the partitionMap (issued on backup abort/stop when the last backup leaves).
extern const KeyRef backupPartitionRequiredKey;
Value backupPartitionRequiredValue(int8_t requestType);
int8_t decodeBackupPartitionRequiredValue(const ValueRef& value);

// The key DataDistributor writes the computed partition list to. CommitProxy will read this
// in a later change to construct the PartitionMap.
// "\xff\x02/backupPartitionList" := "[[vector<KeyRange>]]"
extern const KeyRef backupPartitionListKey;
Value encodeBackupPartitionListValue(const std::vector<KeyRange>& partitions);
std::vector<KeyRange> decodeBackupPartitionListValue(const ValueRef& value);

// "\xff/previousCoordinators" = "[[ClusterConnectionString]]"
// Set to the encoded structure of the cluster's previous set of coordinators.
// Changed when performing quorumChange.
Expand Down
4 changes: 2 additions & 2 deletions fdbserver/core/BackupPartitionMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "fdbserver/core/BackupPartitionMap.h"
#include "fdbclient/JsonBuilder.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h"

std::string serializePartitionListJSON(PartitionMap const& partitionMap) {
Expand All @@ -42,8 +43,7 @@ std::string serializePartitionListJSON(PartitionMap const& partitionMap) {

// KeyRangeMap guarantees that key ranges are contiguous with no gaps in shards.
Future<std::vector<KeyRange>> calculateBackupPartitionKeyRanges(KeyRangeMap<ShardTrackedData>* shards) {
// TODO akanksha: Hardcoded for now.
const int NUM_PARTITIONS = 100;
const int NUM_PARTITIONS = CLIENT_KNOBS->BACKUP_NUM_OF_PARTITIONS;
std::vector<std::pair<KeyRange, int64_t>> userShards; // Pair of shard key range and shard size in bytes.
int64_t totalBytes = 0;

Expand Down
8 changes: 6 additions & 2 deletions fdbserver/core/include/fdbserver/core/BackupPartitionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#pragma once

#include "fdbclient/FDBTypes.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbserver/core/ShardMetrics.h"
#include <map>
#include <vector>
Expand All @@ -38,10 +39,13 @@ struct Partition {
}
};

typedef std::vector<Partition> PartitionList;
using PartitionList = std::vector<Partition>;
// NOTE: PartitionMap is ordered by Tag so that multiple backup workers can upload the same content and overwrite to
// blob storage at the same time without conflicts. If the map is not ordered, then there can be conflicts in blob
// storage when multiple backup workers upload the partition map at the same time.
typedef std::map<Tag, PartitionList> PartitionMap;
using PartitionMap = std::map<Tag, PartitionList>;

// Returns the serialized form of `partitionMap` as a string (defined in BackupPartitionMap.cpp).
// The parameter is named `partitionMap` so it no longer shadows the PartitionMap type.
std::string serializePartitionListJSON(PartitionMap const& partitionMap);

// Partitions the user keyspace into balanced contiguous KeyRanges using shard byte sizes.
Future<std::vector<KeyRange>> calculateBackupPartitionKeyRanges(KeyRangeMap<ShardTrackedData>* shards);
75 changes: 75 additions & 0 deletions fdbserver/datadistributor/DataDistribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "fdbclient/RunRYWTransaction.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/core/BackupPartitionMap.h"
#include "fdbserver/core/BulkDumpUtil.h"
#include "fdbserver/core/BulkLoadUtil.h"
#include "fdbserver/datadistributor/DataDistributor.h"
Expand Down Expand Up @@ -307,6 +308,79 @@ Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
}
}

// Watches backupPartitionRequiredKey and services the request written there.
//   value == 1:              computes the user keyspace partitions and writes them to backupPartitionListKey.
//   any other nonzero value: clears backupPartitionListKey (2 is the documented cleanup request).
// In both cases the request key is cleared in the same commit as the result.
//
// Parameters:
//   cx     - database on which the request/result transactions are run.
//   shards - shard map passed through to calculateBackupPartitionKeyRanges().
//   ddId   - id attached to the TraceEvents emitted here.
//
// The loop never exits on its own. Transaction errors are retried through onError(); an error thrown by
// calculateBackupPartitionKeyRanges() is not caught here and propagates to the caller.
Future<Void> monitorBackupPartitionRequired(Database cx, KeyRangeMap<ShardTrackedData>* shards, UID ddId) {
    // The partition computation can wait arbitrarily long on shard-metrics tracking, so it runs OUTSIDE any
    // transaction to avoid transaction_too_old. A short re-read in the write transaction protects against
    // the race where a value=2 (cleanup) arrives while we are computing for a value=1.
    while (true) {
        // Phase 1: read the request key. If nothing is pending, park on a watch, then read again with a
        // fresh transaction.
        int8_t requestType = 0;
        while (requestType == 0) {
            // One transaction per read attempt, reused across retries so that the backoff accumulated by
            // onError() is not lost on every failure (same shape as the Phase 3 retry loop below).
            ReadYourWritesTransaction tr(cx);
            while (true) {
                // co_await is not permitted inside a catch handler, so the error is carried out of the
                // handler and onError() is awaited after it.
                Error err;
                try {
                    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                    tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                    Optional<Value> value = co_await tr.get(backupPartitionRequiredKey);
                    requestType = value.present() ? decodeBackupPartitionRequiredValue(value.get()) : 0;
                    if (requestType == 0) {
                        // The watch is registered by the commit; wait for the key to change.
                        Future<Void> watchFuture = tr.watch(backupPartitionRequiredKey);
                        co_await tr.commit();
                        co_await watchFuture;
                    }
                    break;
                } catch (Error& e) {
                    err = e;
                }
                co_await tr.onError(err);
            }
        }

        // Phase 2: compute outside any transaction (may wait long on shard metrics).
        const bool computeRequested = (requestType == 1);
        std::vector<KeyRange> partitions;
        if (computeRequested) {
            partitions = co_await calculateBackupPartitionKeyRanges(shards);
        }

        // Phase 3: short txn to re-check the request value and write the result.
        {
            ReadYourWritesTransaction tr(cx);
            while (true) {
                Error err;
                try {
                    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                    tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                    Optional<Value> value = co_await tr.get(backupPartitionRequiredKey);
                    const int8_t currentType = value.present() ? decodeBackupPartitionRequiredValue(value.get()) : 0;
                    if (currentType != requestType) {
                        // Someone wrote a new request while we were computing partitions; restart the outer
                        // loop so the next iteration acts on the new value.
                        break;
                    }
                    if (computeRequested) {
                        tr.set(backupPartitionListKey, encodeBackupPartitionListValue(partitions));
                    } else {
                        tr.clear(backupPartitionListKey);
                    }
                    // The request is acknowledged atomically with the result.
                    tr.clear(backupPartitionRequiredKey);
                    co_await tr.commit();
                    if (computeRequested) {
                        TraceEvent("DDBackupPartitionsComputed", ddId).detail("NumPartitions", partitions.size());
                    } else {
                        TraceEvent("DDBackupPartitionsCleared", ddId);
                    }
                    break;
                } catch (Error& e) {
                    err = e;
                }
                co_await tr.onError(err);
            }
        }
    }
}

// Ensures that the serverKeys key space is properly coalesced
// This method is only used for testing and is not implemented in a manner that is safe for large databases
Future<Void> debugCheckCoalescing(Database cx) {
Expand Down Expand Up @@ -2814,6 +2888,7 @@ Future<Void> dataDistribution(Reference<DataDistributor> self,
}

actors.push_back(self->pollMoveKeysLock());
actors.push_back(monitorBackupPartitionRequired(self->txnProcessor->context(), &shards, self->ddId));

self->context->tracker = makeReference<DataDistributionTracker>(
DataDistributionTrackerInitParams{ .db = self->txnProcessor,
Expand Down
Loading