diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 3e594c21886..0b35d243822 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -306,6 +306,7 @@ void ClientKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( RESTORE_RANGES_READ_BATCH, 10000 ); init( BACKUP_RANGE_PARTITIONED_VDIR_INTERVAL, 100000 * 1000000LL ); + init( BACKUP_NUM_OF_PARTITIONS, 100 ); init( BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH, false ); init( ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS, false ); if( randomize && buggify() ) { ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS = true; } init( BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS, -2 ); // Do consistency check based on all available storage replicas diff --git a/fdbclient/FileBackupAgent.cpp b/fdbclient/FileBackupAgent.cpp index 68e435a0e54..2d282293945 100644 --- a/fdbclient/FileBackupAgent.cpp +++ b/fdbclient/FileBackupAgent.cpp @@ -2105,6 +2105,11 @@ static Future clearBackupStartID(Reference tr, if (ids.empty()) { TraceEvent("ClearBackup").detail("BackupID", backupUid); tr->clear(backupStartedKey); + // Last backup just finished. If outgoing backup was range-partitioned, ask DD to clear the partition list (request type 2 = cleanup). + Optional logType = co_await BackupConfig(backupUid).mutationLogType().get(tr); + if (logType.present() && logType.get() == MutationLogType::RANGE_PARTITIONED_LOG) { + tr->set(backupPartitionRequiredKey, backupPartitionRequiredValue(2)); + } } else { tr->set(backupStartedKey, encodeBackupStartedValue(ids)); } @@ -4286,6 +4291,12 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase { if (started.get().present()) { ids = decodeBackupStartedValue(started.get().get()); } + + // First range-partitioned backup on this cluster: ask DD to compute the partition list (request type 1). NOTE(review): the guard that follows tests ids.empty(), i.e. the list decoded from `started` has no entries at all; presumably a range-partitioned backup started while another backup is already listed issues no request - confirm this is intended. NOTE(review): the guard also calls mutationLogType.get().get() without a present() check, whereas clearBackupStartID checks present() first - confirm the value is always set here. 
+ if (ids.empty() && mutationLogType.get().get() == MutationLogType::RANGE_PARTITIONED_LOG) { + tr->set(backupPartitionRequiredKey, backupPartitionRequiredValue(1)); + } + const UID uid = config.getUid(); auto it = std::find_if( ids.begin(), ids.end(), [uid](const std::pair& p) { return p.first == uid; }); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 07ca49a33f3..335313a5fef 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1074,6 +1074,39 @@ std::vector> decodeBackupStartedValue(const ValueRef& va return ids; } +const KeyRef backupPartitionRequiredKey = "\xff\x02/backupPartitionRequired"_sr; +const KeyRef backupPartitionListKey = "\xff\x02/backupPartitionList"_sr; + +/* Serializes requestType with an Unversioned BinaryWriter and returns the resulting Value. */ Value backupPartitionRequiredValue(int8_t requestType) { + BinaryWriter wr(Unversioned()); + wr << requestType; + return wr.toValue(); +} + +/* Reads the request type back with an Unversioned BinaryReader; an empty value yields 0. */ int8_t decodeBackupPartitionRequiredValue(const ValueRef& value) { + int8_t requestType = 0; + if (!value.empty()) { + BinaryReader reader(value, Unversioned()); + reader >> requestType; + } + return requestType; +} + +/* Serializes the partition vector with an IncludeVersion BinaryWriter and returns the resulting Value. */ Value encodeBackupPartitionListValue(const std::vector& partitions) { + BinaryWriter wr(IncludeVersion()); + wr << partitions; + return wr.toValue(); +} + +/* Reads the partition vector back with an IncludeVersion BinaryReader; an empty value yields an empty vector. */ std::vector decodeBackupPartitionListValue(const ValueRef& value) { + std::vector partitions; + if (!value.empty()) { + BinaryReader reader(value, IncludeVersion()); + reader >> partitions; + } + return partitions; +} + bool mutationForKey(const MutationRef& m, const KeyRef& key) { return isSingleKeyMutation((MutationRef::Type)m.type) && m.param1 == key; } diff --git a/fdbclient/include/fdbclient/Knobs.h b/fdbclient/include/fdbclient/Knobs.h index 493506f69d5..dc05686f1cc 100644 --- a/fdbclient/include/fdbclient/Knobs.h +++ b/fdbclient/include/fdbclient/Knobs.h @@ -208,6 +208,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ClientKnobs : public KnobsImpl> decodeBackupStartedValue(const ValueRef& va // 1 = Send a signal to pause/already paused. 
extern const KeyRef backupPausedKey; +// The key BackupAgent writes to request DataDistributor to (re)compute partitions for +// range-partitioned backup, or to clear the partition state on backup stop. DD watches +// this key, performs the requested action, and clears the key (an absent or empty value decodes as 0). +// "\xff\x02/backupPartitionRequired" := "[[0|1|2]]" +// 0 = cleared / no pending request. +// 1 = initial partition or manual/adaptive re-partition. +// 2 = cleanup partitionMap (issued on backup abort/stop when the last backup leaves and it was range-partitioned). +extern const KeyRef backupPartitionRequiredKey; +Value backupPartitionRequiredValue(int8_t requestType); +int8_t decodeBackupPartitionRequiredValue(const ValueRef& value); + +// The key DataDistributor writes the computed partition list to. CommitProxy will read this +// in a later change to construct the PartitionMap. +// "\xff\x02/backupPartitionList" := "[[vector]]" +extern const KeyRef backupPartitionListKey; +Value encodeBackupPartitionListValue(const std::vector& partitions); +std::vector decodeBackupPartitionListValue(const ValueRef& value); + // "\xff/previousCoordinators" = "[[ClusterConnectionString]]" // Set to the encoded structure of the cluster's previous set of coordinators. // Changed when performing quorumChange. diff --git a/fdbserver/core/BackupPartitionMap.cpp b/fdbserver/core/BackupPartitionMap.cpp index 3f976a4e651..e1baf3fd95f 100644 --- a/fdbserver/core/BackupPartitionMap.cpp +++ b/fdbserver/core/BackupPartitionMap.cpp @@ -21,6 +21,7 @@ #include "fdbserver/core/BackupPartitionMap.h" #include "fdbclient/JsonBuilder.h" #include "fdbclient/KeyRangeMap.h" +#include "fdbclient/Knobs.h" #include "fdbclient/SystemData.h" std::string serializePartitionListJSON(PartitionMap const& partitionMap) { @@ -42,8 +43,7 @@ std::string serializePartitionListJSON(PartitionMap const& partitionMap) { // KeyRangeMap guarantees that key ranges are contiguous with no gaps in shards. 
Future> calculateBackupPartitionKeyRanges(KeyRangeMap* shards) { - // TODO akanksha: Hardcoded for now. - const int NUM_PARTITIONS = 100; + const int NUM_PARTITIONS = CLIENT_KNOBS->BACKUP_NUM_OF_PARTITIONS; /* NOTE(review): the knob value is used as-is on this line and the rest of the function is outside this hunk - confirm it tolerates values <= 0. */ std::vector> userShards; // Pair of shard key range and shard size in bytes. int64_t totalBytes = 0; diff --git a/fdbserver/core/include/fdbserver/core/BackupPartitionMap.h b/fdbserver/core/include/fdbserver/core/BackupPartitionMap.h index 56b776bce36..9471dc655ee 100644 --- a/fdbserver/core/include/fdbserver/core/BackupPartitionMap.h +++ b/fdbserver/core/include/fdbserver/core/BackupPartitionMap.h @@ -21,6 +21,7 @@ #pragma once #include "fdbclient/FDBTypes.h" +#include "fdbclient/KeyRangeMap.h" #include "fdbserver/core/ShardMetrics.h" #include #include @@ -38,10 +39,13 @@ struct Partition { } }; -typedef std::vector PartitionList; +using PartitionList = std::vector; // NOTE: PartitionMap is ordered by Tag so that multiple backup workers can upload the same content and overwrite to // blob storage at the same time without conflicts. If the map is not ordered, then there can be conflicts in blob // storage when multiple backup workers upload the partition map at the same time. -typedef std::map PartitionMap; +using PartitionMap = std::map; std::string serializePartitionListJSON(PartitionMap const& PartitionMap); + +// Partitions the user keyspace into balanced contiguous KeyRanges using shard byte sizes. The partition count is read from CLIENT_KNOBS->BACKUP_NUM_OF_PARTITIONS. 
+Future> calculateBackupPartitionKeyRanges(KeyRangeMap* shards); diff --git a/fdbserver/datadistributor/DataDistribution.cpp b/fdbserver/datadistributor/DataDistribution.cpp index b251cae0d51..565c836c027 100644 --- a/fdbserver/datadistributor/DataDistribution.cpp +++ b/fdbserver/datadistributor/DataDistribution.cpp @@ -30,6 +30,7 @@ #include "fdbclient/RunRYWTransaction.h" #include "fdbclient/StorageServerInterface.h" #include "fdbclient/SystemData.h" +#include "fdbserver/core/BackupPartitionMap.h" #include "fdbserver/core/BulkDumpUtil.h" #include "fdbserver/core/BulkLoadUtil.h" #include "fdbserver/datadistributor/DataDistributor.h" @@ -307,6 +308,79 @@ Future remoteRecovered(Reference const> db) { } } +// Watches backupPartitionRequiredKey. +// On value=1, computes the user keyspace partitions and writes them to backupPartitionListKey. +// On value=2 (and, as written, any other non-zero value), clears backupPartitionListKey. +// In both cases the request key is cleared in the same commit. +Future monitorBackupPartitionRequired(Database cx, KeyRangeMap* shards, UID ddId) { + // The partition computation can wait arbitrarily long on shard-metrics tracking, so it runs OUTSIDE any + // transaction to avoid transaction_too_old. A short re-read in the write transaction protects against + // the race where a value=2 (cleanup) arrives while we are computing for a value=1. + while (true) { + // Phase 1: peek the request key in a loop. If nothing pending, park on watch and wait, then re-read. + int8_t requestType = 0; + while (requestType == 0) { + ReadYourWritesTransaction tr(cx); + Error err; + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + Optional value = co_await tr.get(backupPartitionRequiredKey); + requestType = value.present() ? 
decodeBackupPartitionRequiredValue(value.get()) : 0; + if (requestType == 0) { + Future watchFuture = tr.watch(backupPartitionRequiredKey); + co_await tr.commit(); + co_await watchFuture; + } + continue; + } catch (Error& e) { + err = e; + } + co_await tr.onError(err); + } + + // Phase 2: compute outside any transaction (may wait long on shard metrics). + std::vector partitions; + if (requestType == 1) { + partitions = co_await calculateBackupPartitionKeyRanges(shards); + } + + // Phase 3: short txn to re-check the request value and write the result. + { + ReadYourWritesTransaction tr(cx); + while (true) { + Error err; + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + Optional value = co_await tr.get(backupPartitionRequiredKey); + int8_t currentType = value.present() ? decodeBackupPartitionRequiredValue(value.get()) : 0; + if (currentType != requestType) { + // Someone wrote a new request (or cleared the key, which reads back as 0) while we were computing partitions; restart the outer loop + // so the next iteration acts on the new value. 
+ break; + } + if (requestType == 1) { + tr.set(backupPartitionListKey, encodeBackupPartitionListValue(partitions)); + tr.clear(backupPartitionRequiredKey); + co_await tr.commit(); + TraceEvent("DDBackupPartitionsComputed", ddId).detail("NumPartitions", partitions.size()); + } else { /* Reached for every non-zero request type other than 1, not only the documented cleanup value 2. */ + tr.clear(backupPartitionListKey); + tr.clear(backupPartitionRequiredKey); + co_await tr.commit(); + TraceEvent("DDBackupPartitionsCleared", ddId); + } + break; + } catch (Error& e) { + err = e; + } + co_await tr.onError(err); + } + } + } +} + // Ensures that the serverKeys key space is properly coalesced // This method is only used for testing and is not implemented in a manner that is safe for large databases Future debugCheckCoalescing(Database cx) { @@ -2814,6 +2888,7 @@ Future dataDistribution(Reference self, } actors.push_back(self->pollMoveKeysLock()); + actors.push_back(monitorBackupPartitionRequired(self->txnProcessor->context(), &shards, self->ddId)); self->context->tracker = makeReference( DataDistributionTrackerInitParams{ .db = self->txnProcessor,