Add fdbcli blobrestore command to start a full cluster restore

Hui Liu 2022-10-26 09:55:55 -07:00
parent 012465ec26
commit 5834517570
18 changed files with 323 additions and 66 deletions
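A minimal usage sketch of the new command (assuming an fdbcli session against a cluster with blob granules configured; the output line mirrors the message added in BlobRestoreCommand.actor.cpp below):

fdb> blobrestore
Started blob restore for the full cluster. Please use the 'status' command to check progress.

Invoking it again while a restore is still pending prints the failure message instead, and the command reports an error back to fdbcli.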

View File

@ -0,0 +1,47 @@
/*
* BlobRestoreCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringRef> tokens) {
if (tokens.size() != 1 && tokens.size() != 2) {
printUsage(tokens[0]);
return false;
}
state bool success = false;
wait(store(success, localDb->blobRestore(normalKeys)));
if (success) {
fmt::print("Started blob restore for the full cluster. Please use 'status' command to check progress.\n");
} else {
fmt::print("Fail to start a new blob restore while there is a pending one.\n");
}
return success;
}
CommandFactory blobRestoreFactory("blobrestore", CommandHelp("blobrestore", "", ""));
} // namespace fdb_cli

View File

@ -1416,6 +1416,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
continue;
}
if (tokencmp(tokens[0], "blobrestore")) {
bool _result = wait(makeInterruptable(blobRestoreCommandActor(localDb, tokens)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "unlock")) {
if ((tokens.size() != 2) || (tokens[1].size() != 32) ||
!std::all_of(tokens[1].begin(), tokens[1].end(), &isxdigit)) {

View File

@ -213,6 +213,9 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
ACTOR Future<bool> blobKeyCommandActor(Database localDb,
Optional<TenantMapEntry> tenantEntry,
std::vector<StringRef> tokens);
// blobrestore command
ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringRef> tokens);
// maintenance command
ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db, StringRef zoneId, double seconds, bool printWarning = false);
ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,

View File

@ -10915,6 +10915,37 @@ Future<Standalone<VectorRef<KeyRangeRef>>> DatabaseContext::listBlobbifiedRanges
return listBlobbifiedRangesActor(Reference<DatabaseContext>::addRef(this), range, rangeLimit, tenantName);
}
ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange range) {
state Database db(cx);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Key key = blobRestoreCommandKeyFor(range);
Optional<Value> value = wait(tr->get(key));
if (value.present()) {
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
if (status.progress < 100) {
return false; // stop if a restore is already in progress.
}
}
Standalone<BlobRestoreStatus> status;
status.progress = 0;
Value newValue = blobRestoreCommandValueFor(status);
tr->set(key, newValue);
wait(tr->commit());
return true;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
Future<bool> DatabaseContext::blobRestore(KeyRange range) {
return blobRestoreActor(Reference<DatabaseContext>::addRef(this), range);
}
int64_t getMaxKeySize(KeyRef const& key) {
return getMaxWriteKeySize(key, true);
}

View File

@ -1660,6 +1660,36 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
return interf;
}
const KeyRangeRef blobRestoreCommandKeys("\xff\x02/blobRestoreCommand/"_sr, "\xff\x02/blobRestoreCommand0"_sr);
const Value blobRestoreCommandKeyFor(const KeyRangeRef range) {
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
wr.serializeBytes(blobRestoreCommandKeys.begin);
wr << range;
return wr.toValue();
}
const KeyRange decodeBlobRestoreCommandKeyFor(const KeyRef key) {
KeyRange range;
BinaryReader reader(key.removePrefix(blobRestoreCommandKeys.begin),
AssumeVersion(ProtocolVersion::withBlobGranule()));
reader >> range;
return range;
}
const Value blobRestoreCommandValueFor(BlobRestoreStatus status) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << status;
return wr.toValue();
}
Standalone<BlobRestoreStatus> decodeBlobRestoreStatus(ValueRef const& value) {
Standalone<BlobRestoreStatus> status;
BinaryReader reader(value, IncludeVersion());
reader >> status;
return status;
}
const KeyRangeRef storageQuotaKeys("\xff/storageQuota/"_sr, "\xff/storageQuota0"_sr);
const KeyRef storageQuotaPrefix = storageQuotaKeys.begin;

View File

@ -313,4 +313,15 @@ struct BlobManifest {
}
};
// Defines blob restore status
struct BlobRestoreStatus {
constexpr static FileIdentifier file_identifier = 378657;
int progress;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, progress);
}
};
#endif

View File

@ -403,6 +403,7 @@ public:
Future<Version> verifyBlobRange(const KeyRange& range,
Optional<Version> version,
Optional<TenantName> tenantName = {});
Future<bool> blobRestore(const KeyRange range);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,

View File

@ -710,6 +710,13 @@ UID decodeBlobWorkerListKey(KeyRef const& key);
const Value blobWorkerListValue(BlobWorkerInterface const& interface);
BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
// Blob restore command
extern const KeyRangeRef blobRestoreCommandKeys;
const Value blobRestoreCommandKeyFor(const KeyRangeRef range);
const KeyRange decodeBlobRestoreCommandKeyFor(const KeyRef key);
const Value blobRestoreCommandValueFor(BlobRestoreStatus status);
Standalone<BlobRestoreStatus> decodeBlobRestoreStatus(ValueRef const& value);
// Storage quota per tenant
// "\xff/storageQuota/[[tenantName]]" := "[[quota]]"
extern const KeyRangeRef storageQuotaKeys;

View File

@ -388,6 +388,8 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
Promise<Void> iAmReplaced;
bool isFullRestoreMode = false;
BlobManagerData(UID id,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Database db,
@ -3537,7 +3539,10 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
bmData->startRecruiting.trigger();
bmData->initBStore();
if (isFullRestoreMode()) {
bool isFullRestore = wait(isFullRestoreMode(bmData->db, normalKeys));
bmData->isFullRestoreMode = isFullRestore;
if (bmData->isFullRestoreMode) {
wait(loadManifest(bmData->db, bmData->bstore));
int64_t epoc = wait(lastBlobEpoc(bmData->db, bmData->bstore));
@ -5297,11 +5302,8 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
bmData->initBStore();
loop {
bool pendingSplit = wait(hasPendingSplit(bmData));
if (!pendingSplit) {
wait(dumpManifest(bmData->db, bmData->bstore, bmData->epoch, bmData->manifestDumperSeqNo));
bmData->manifestDumperSeqNo++;
}
wait(dumpManifest(bmData->db, bmData->bstore, bmData->epoch, bmData->manifestDumperSeqNo));
bmData->manifestDumperSeqNo++;
wait(delay(SERVER_KNOBS->BLOB_MANIFEST_BACKUP_INTERVAL));
}
}
@ -5370,7 +5372,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
if (SERVER_KNOBS->BG_ENABLE_MERGING) {
self->addActor.send(granuleMergeChecker(self));
}
if (SERVER_KNOBS->BLOB_MANIFEST_BACKUP && !isFullRestoreMode()) {
if (SERVER_KNOBS->BLOB_MANIFEST_BACKUP && !self->isFullRestoreMode) {
self->addActor.send(backupManifest(self));
}

View File

@ -60,7 +60,7 @@ struct BlobManifestFile {
int64_t seqNo{ 0 };
BlobManifestFile(const std::string& path) {
if (sscanf(path.c_str(), MANIFEST_FOLDER "/manifest.%" SCNd64 ".%" SCNd64, &epoch, &seqNo) == 2) {
if (sscanf(path.c_str(), MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%" SCNd64 ".%" SCNd64, &epoch, &seqNo) == 2) {
fileName = path;
}
}
@ -76,7 +76,7 @@ struct BlobManifestFile {
BlobManifestFile file(path);
return file.epoch > 0 && file.seqNo > 0;
};
BackupContainerFileSystem::FilesAndSizesT filesAndSizes = wait(reader->listFiles(MANIFEST_FOLDER, filter));
BackupContainerFileSystem::FilesAndSizesT filesAndSizes = wait(reader->listFiles(MANIFEST_FOLDER "/", filter));
std::vector<BlobManifestFile> result;
for (auto& f : filesAndSizes) {
@ -107,6 +107,9 @@ public:
try {
state Standalone<BlobManifest> manifest;
Standalone<VectorRef<KeyValueRef>> rows = wait(getSystemKeys(self));
if (rows.size() == 0) {
return Void();
}
manifest.rows = rows;
Value data = encode(manifest);
wait(writeToFile(self, data));
@ -153,7 +156,8 @@ private:
state std::string fullPath;
std::tie(writer, fullPath) = self->blobConn_->createForWrite(MANIFEST_FOLDER);
state std::string fileName = format(MANIFEST_FOLDER "/manifest.%lld.%lld", self->epoch_, self->seqNo_);
state std::string fileName =
format(MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%lld.%lld", self->epoch_, self->seqNo_);
state Reference<IBackupFile> file = wait(writer->writeFile(fileName));
wait(file->append(data.begin(), data.size()));
wait(file->finish());
@ -453,3 +457,26 @@ ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider
int64_t epoc = wait(BlobManifestLoader::lastBlobEpoc(loader));
return epoc;
}
// Return true if the given key range is being restored
ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
state Transaction tr(db);
loop {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
RangeResult ranges = wait(tr.getRange(blobRestoreCommandKeys, CLIENT_KNOBS->TOO_MANY));
for (auto& r : ranges) {
KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
if (keyRange.contains(keys)) {
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
return status.progress < 100; // restore is still in progress if less than 100%
}
}
return false;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}

View File

@ -21,6 +21,7 @@
#include "flow/ActorCollection.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/BlobConnectionProvider.h"
@ -63,14 +64,7 @@ public:
// Start migration
ACTOR static Future<Void> start(Reference<BlobMigrator> self) {
if (!isFullRestoreMode()) {
return Void();
}
wait(delay(10)); // TODO need to wait for a signal for readiness of blob manager
BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
self->blobGranules_ = granules;
wait(checkIfReadyForMigration(self));
wait(prepare(self, normalKeys));
wait(advanceVersion(self));
wait(serverLoop(self));
@ -78,6 +72,28 @@ public:
}
private:
// Check if blob manifest is loaded so that blob migration can start
ACTOR static Future<Void> checkIfReadyForMigration(Reference<BlobMigrator> self) {
loop {
bool isFullRestore = wait(isFullRestoreMode(self->db_, normalKeys));
if (isFullRestore) {
BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
if (!granules.empty()) {
self->blobGranules_ = granules;
for (BlobGranuleRestoreVersion granule : granules) {
TraceEvent("RestorableGranule")
.detail("GranuleId", granule.granuleID.toString())
.detail("KeyRange", granule.keyRange.toString())
.detail("Version", granule.version)
.detail("SizeInBytes", granule.sizeInBytes);
}
return Void();
}
}
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
}
}
// Prepare for data migration for the given key range.
ACTOR static Future<Void> prepare(Reference<BlobMigrator> self, KeyRangeRef keys) {
// Register as a storage server, so that DataDistributor could start data movement after
@ -136,8 +152,9 @@ private:
}
}
if (owning) {
dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
wait(krmSetRange(&tr, serverKeysPrefixFor(id), keys, serverKeysFalse));
dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
TraceEvent("UnassignKeys").detail("Keys", keys.toString()).detail("From", id.toString());
}
}
wait(tr.commit());
@ -185,8 +202,10 @@ private:
// Calculated progress
int64_t total = sizeInBytes(self);
int progress = (total - incompleted) * 100 / total;
bool done = incompleted == 0;
dprint("Progress {} :{}%. done {}\n", serverID.toString(), progress, done);
state bool done = incompleted == 0;
dprint("Migration progress :{}%. done {}\n", progress, done);
TraceEvent("BlobMigratorProgress").detail("Progress", progress).detail("Done", done);
wait(updateProgress(self, normalKeys, progress));
return done;
} catch (Error& e) {
wait(tr.onError(e));
@ -194,6 +213,32 @@ private:
}
}
// Update restore progress
ACTOR static Future<Void> updateProgress(Reference<BlobMigrator> self, KeyRangeRef range, int progress) {
state Transaction tr(self->db_);
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state Key key = blobRestoreCommandKeyFor(range);
Optional<Value> value = wait(tr.get(key));
if (value.present()) {
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
if (progress > status.progress) {
status.progress = progress;
Value updatedValue = blobRestoreCommandValueFor(status);
tr.set(key, updatedValue);
wait(tr.commit());
}
}
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Advance version, so that future commits will have a larger version than the restored data
ACTOR static Future<Void> advanceVersion(Reference<BlobMigrator> self) {
state Transaction tr(self->db_);
@ -207,6 +252,7 @@ private:
if (currentVersion <= expectedVersion) {
tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expectedVersion + 1, Unversioned()));
dprint("Advance version from {} to {}\n", currentVersion, expectedVersion);
TraceEvent("AdvanceVersion").detail("Current", currentVersion).detail("New", expectedVersion);
wait(tr.commit());
}
return Void();
@ -218,7 +264,7 @@ private:
// Main server loop
ACTOR static Future<Void> serverLoop(Reference<BlobMigrator> self) {
self->actors_.add(waitFailureServer(self->interf_.ssi.waitFailure.getFuture()));
self->actors_.add(waitFailureServer(self->interf_.waitFailure.getFuture()));
self->actors_.add(logProgress(self));
self->actors_.add(handleRequest(self));
self->actors_.add(handleUnsupportedRequest(self));
@ -226,6 +272,7 @@ private:
try {
choose {
when(HaltBlobMigratorRequest req = waitNext(self->interf_.haltBlobMigrator.getFuture())) {
dprint("Stopping blob migrator {}\n", self->interf_.id().toString());
req.reply.send(Void());
TraceEvent("BlobMigratorHalted", self->interf_.id()).detail("ReqID", req.requesterID);
break;
@ -237,6 +284,8 @@ private:
throw;
}
}
self->actors_.clear(true);
dprint("Stopped blob migrator {}\n", self->interf_.id().toString());
return Void();
}
@ -267,7 +316,7 @@ private:
req.reply.send(rep);
}
when(GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
fmt::print("Handle GetStorageMetrics\n");
// fmt::print("Handle GetStorageMetrics\n");
StorageMetrics metrics;
metrics.bytes = sizeInBytes(self);
GetStorageMetricsReply resp;
@ -331,7 +380,7 @@ private:
req.reply.sendError(unsupported_operation());
}
when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) {
dprint("Unsupported UpdateCommitCostRequest\n");
// dprint("Unsupported UpdateCommitCostRequest\n");
req.reply.sendError(unsupported_operation());
}
when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
@ -358,9 +407,9 @@ private:
}
ACTOR static Future<Void> processStorageQueuingMetricsRequest(StorageQueuingMetricsRequest req) {
dprint("Unsupported StorageQueuingMetricsRequest\n");
// FIXME get rid of this delay. it's a temp solution to avoid starvaion scheduling of DD
// processes
// dprint("Unsupported StorageQueuingMetricsRequest\n");
// FIXME get rid of this delay. it's a temp solution to avoid starvaion scheduling of DD
// processes
wait(delay(1));
req.reply.sendError(unsupported_operation());
return Void();
@ -398,7 +447,8 @@ private:
// Main entry point
ACTOR Future<Void> blobMigrator(BlobMigratorInterface interf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
fmt::print("Start blob migrator {} \n", interf.id().toString());
TraceEvent("StartBlobMigrator").detail("Interface", interf.id().toString());
dprint("Starting blob migrator {}\n", interf.id().toString());
try {
Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, interf);
wait(BlobMigrator::start(self));

View File

@ -292,6 +292,8 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
int64_t lastResidentMemory = 0;
double lastResidentMemoryCheckTime = -100.0;
bool isFullRestoreMode = false;
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db)
: id(id), db(db), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
@ -2146,7 +2148,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
// No need to start Change Feed in full restore mode
if (isFullRestoreMode())
if (bwData->isFullRestoreMode)
return Void();
checkMergeCandidate = granuleCheckMergeCandidate(bwData,
@ -3588,7 +3590,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
state Reference<GranuleMetadata> metadata = m;
// state Version granuleBeginVersion = req.beginVersion;
// skip waiting for CF ready for recovery mode
if (!isFullRestoreMode()) {
if (!bwData->isFullRestoreMode) {
choose {
when(wait(metadata->readable.getFuture())) {}
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
@ -3646,7 +3648,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// this is an active granule query
loop {
// skip check since CF doesn't start for bare metal recovery mode
if (isFullRestoreMode()) {
if (bwData->isFullRestoreMode) {
break;
}
if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) {
@ -3689,7 +3691,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// if feed was popped by another worker and BW only got empty versions, it wouldn't itself see that it
// got popped, but we can still reject the read. In theory this should never happen with other protections,
// but it's a useful and inexpensive sanity check
if (!isFullRestoreMode()) {
if (!bwData->isFullRestoreMode) {
Version emptyVersion = metadata->activeCFData.get()->popVersion - 1;
if (req.readVersion > metadata->durableDeltaVersion.get() &&
emptyVersion > metadata->bufferedDeltaVersion) {
@ -3995,6 +3997,9 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
throw granule_assignment_conflict();
}
bool isFullRestore = wait(isFullRestoreMode(bwData->db, req.keyRange));
bwData->isFullRestoreMode = isFullRestore;
Optional<Value> prevLockValue = wait(fLockValue);
state bool hasPrevOwner = prevLockValue.present();
state bool createChangeFeed = false;
@ -4069,7 +4074,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
}
// for recovery mode - don't create change feed, don't create snapshot
if (isFullRestoreMode()) {
if (bwData->isFullRestoreMode) {
createChangeFeed = false;
info.doSnapshot = false;
GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, info.granuleID));
@ -4091,7 +4096,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
}
}
if (createChangeFeed && !isFullRestoreMode()) {
if (createChangeFeed && !bwData->isFullRestoreMode) {
// create new change feed for new version of granule
wait(updateChangeFeed(
&tr, granuleIDToCFKey(info.granuleID), ChangeFeedStatus::CHANGE_FEED_CREATE, req.keyRange));
@ -4103,7 +4108,8 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
// If anything in previousGranules, need to do the handoff logic and set
// ret.previousChangeFeedId, and the previous durable version will come from the previous
// granules
if (info.history.present() && info.history.get().value.parentVersions.size() > 0 && !isFullRestoreMode()) {
if (info.history.present() && info.history.get().value.parentVersions.size() > 0 &&
!bwData->isFullRestoreMode) {
CODE_PROBE(true, "Granule open found parent");
if (info.history.get().value.parentVersions.size() == 1) { // split
state KeyRangeRef parentRange(info.history.get().value.parentBoundaries[0],

View File

@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <set>
#include <tuple>
#include <vector>
#include "fdbclient/FDBTypes.h"
@ -691,7 +692,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
WorkerDetails newMGWorker;
if (self->db.blobGranulesEnabled.get()) {
newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
newMGWorker = findNewProcessForSingleton(self, ProcessClass::BlobMigrator, id_used);
}
}
@ -710,7 +711,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
ProcessClass::Fitness bestFitnessForMG;
if (self->db.blobGranulesEnabled.get()) {
bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
bestFitnessForMG = findBestFitnessForSingleton(self, newMGWorker, ProcessClass::BlobManager);
}
}
@ -744,7 +745,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (self->db.blobGranulesEnabled.get()) {
bmHealthy = isHealthySingleton<BlobManagerInterface>(
self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
mgHealthy = isHealthySingleton<BlobMigratorInterface>(
self, newMGWorker, mgSingleton, bestFitnessForMG, self->recruitingBlobMigratorID);
}
@ -775,7 +776,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (self->db.blobGranulesEnabled.get()) {
currBMProcessId = bmSingleton.interface.get().locality.processId();
newBMProcessId = newBMWorker.interf.locality.processId();
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
currMGProcessId = mgSingleton.interface.get().locality.processId();
newMGProcessId = newMGWorker.interf.locality.processId();
}
@ -792,7 +793,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (self->db.blobGranulesEnabled.get()) {
currPids.emplace_back(currBMProcessId);
newPids.emplace_back(newBMProcessId);
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
currPids.emplace_back(currMGProcessId);
newPids.emplace_back(newMGProcessId);
}
@ -810,7 +811,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
if (!self->db.blobGranulesEnabled.get()) {
ASSERT(currColocMap[currBMProcessId] == 0);
ASSERT(newColocMap[newBMProcessId] == 0);
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
ASSERT(currColocMap[currMGProcessId] == 0);
ASSERT(newColocMap[newMGProcessId] == 0);
}
@ -836,7 +837,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
ddSingleton.recruit(self);
} else if (self->db.blobGranulesEnabled.get() && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
bmSingleton.recruit(self);
} else if (self->db.blobGranulesEnabled.get() && isFullRestoreMode() &&
} else if (self->db.blobGranulesEnabled.get() && self->db.blobRestoreEnabled.get() &&
newColocMap[newMGProcessId] < currColocMap[currMGProcessId]) {
mgSingleton.recruit(self);
} else if (SERVER_KNOBS->ENABLE_ENCRYPTION && newColocMap[newEKPProcessId] < currColocMap[currEKPProcessId]) {
@ -1404,13 +1405,13 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID);
}
if (self->db.blobGranulesEnabled.get() && isFullRestoreMode() && req.blobManagerInterf.present()) {
if (self->db.blobGranulesEnabled.get() && req.blobManagerInterf.present()) {
auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager);
auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf);
haltRegisteringOrCurrentSingleton<BlobManagerInterface>(
self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID);
}
if (req.blobMigratorInterf.present()) {
if (req.blobMigratorInterf.present() && self->db.blobRestoreEnabled.get()) {
auto currSingleton = BlobMigratorSingleton(self->db.serverInfo->get().blobMigrator);
auto registeringSingleton = BlobMigratorSingleton(req.blobMigratorInterf);
haltRegisteringOrCurrentSingleton<BlobMigratorInterface>(
@ -2553,6 +2554,43 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
}
}
ACTOR Future<Void> watchBlobRestoreCommand(ClusterControllerData* self) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
state Key blobRestoreCommandKey = blobRestoreCommandKeyFor(normalKeys);
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> blobRestoreCommand = wait(tr->get(blobRestoreCommandKey));
if (blobRestoreCommand.present()) {
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(blobRestoreCommand.get());
TraceEvent("WatchBlobRestoreCommand").detail("Progress", status.progress);
if (status.progress == 0) {
self->db.blobRestoreEnabled.set(true);
if (self->db.blobGranulesEnabled.get()) {
const auto& blobManager = self->db.serverInfo->get().blobManager;
if (blobManager.present()) {
BlobManagerSingleton(blobManager)
.haltBlobGranules(self, blobManager.get().locality.processId());
}
const auto& blobMigrator = self->db.serverInfo->get().blobMigrator;
if (blobMigrator.present()) {
BlobMigratorSingleton(blobMigrator).halt(self, blobMigrator.get().locality.processId());
}
}
}
}
state Future<Void> watch = tr->watch(blobRestoreCommandKey);
wait(tr->commit());
wait(watch);
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<Void> startBlobMigrator(ClusterControllerData* self, double waitTime) {
// If master fails at the same time, give it a chance to clear master PID.
// Also wait to avoid too many consecutive recruits in a small time window.
@ -2629,9 +2667,8 @@ ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
}
loop {
if (self->db.serverInfo->get().blobMigrator.present() && !self->recruitBlobMigrator.get()) {
state Future<Void> wfClient =
waitFailureClient(self->db.serverInfo->get().blobMigrator.get().ssi.waitFailure,
SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
state Future<Void> wfClient = waitFailureClient(self->db.serverInfo->get().blobMigrator.get().waitFailure,
SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
loop {
choose {
when(wait(wfClient)) {
@ -2643,11 +2680,11 @@ ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
when(wait(self->recruitBlobMigrator.onChange())) {}
}
}
} else if (self->db.blobGranulesEnabled.get() && isFullRestoreMode()) {
} else if (self->db.blobGranulesEnabled.get() && self->db.blobRestoreEnabled.get()) {
// if there is no blob migrator present but blob granules are now enabled, recruit a BM
wait(startBlobMigrator(self, recruitThrottler.newRecruitment()));
} else {
wait(self->db.blobGranulesEnabled.onChange());
wait(self->db.blobGranulesEnabled.onChange() || self->db.blobRestoreEnabled.onChange());
}
}
}
@ -2778,7 +2815,7 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
const auto& blobManager = self->db.serverInfo->get().blobManager;
BlobManagerSingleton(blobManager)
.haltBlobGranules(self, blobManager.get().locality.processId());
if (isFullRestoreMode()) {
if (self->db.blobRestoreEnabled.get()) {
const auto& blobMigrator = self->db.serverInfo->get().blobMigrator;
BlobMigratorSingleton(blobMigrator).halt(self, blobMigrator.get().locality.processId());
}
@ -3079,8 +3116,9 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
self.addActor.send(monitorBlobManager(&self));
self.addActor.send(monitorBlobMigrator(&self));
self.addActor.send(watchBlobGranulesConfigKey(&self));
self.addActor.send(monitorBlobMigrator(&self));
self.addActor.send(watchBlobRestoreCommand(&self));
self.addActor.send(monitorConsistencyScan(&self));
self.addActor.send(metaclusterMetricsUpdater(&self));
self.addActor.send(dbInfoUpdater(&self));

View File

@ -162,10 +162,7 @@ ACTOR Future<Void> loadManifest(Database db, Reference<BlobConnectionProvider> b
ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProvider> blobConn);
ACTOR Future<BlobGranuleRestoreVersionVector> listBlobGranules(Database db, Reference<BlobConnectionProvider> blobConn);
ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider> blobConn);
inline bool isFullRestoreMode() {
return SERVER_KNOBS->BLOB_FULL_RESTORE_MODE;
};
ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef range);
#include "flow/unactorcompiler.h"

View File

@ -30,6 +30,7 @@
struct BlobMigratorInterface {
constexpr static FileIdentifier file_identifier = 869199;
RequestStream<struct HaltBlobMigratorRequest> haltBlobMigrator;
RequestStream<ReplyPromise<Void>> waitFailure;
LocalityData locality;
UID uniqueID;
StorageServerInterface ssi;
@ -48,7 +49,7 @@ struct BlobMigratorInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, locality, uniqueID, haltBlobMigrator);
serializer(ar, locality, uniqueID, haltBlobMigrator, waitFailure);
}
};

View File

@ -144,6 +144,7 @@ public:
Future<Void> clientCounter;
int clientCount;
AsyncVar<bool> blobGranulesEnabled;
AsyncVar<bool> blobRestoreEnabled;
ClusterType clusterType = ClusterType::STANDALONE;
Optional<ClusterName> metaclusterName;
Optional<MetaclusterRegistrationEntry> metaclusterRegistration;
@ -159,7 +160,7 @@ public:
TaskPriority::DefaultEndpoint,
LockAware::True)), // SOMEDAY: Locality!
unfinishedRecoveries(0), logGenerations(0), cachePopulated(false), clientCount(0),
blobGranulesEnabled(config.blobGranulesEnabled) {
blobGranulesEnabled(config.blobGranulesEnabled), blobRestoreEnabled(false) {
clientCounter = countClients(this);
}

View File

@ -6138,6 +6138,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranules(Tra
loop {
try {
Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(tr->readBlobGranules(keys, 0, readVersion));
TraceEvent(SevDebug, "ReadBlobGranules").detail("Keys", keys).detail("Chunks", chunks.size());
return chunks;
} catch (Error& e) {
if (retryCount >= maxRetryCount) {
@ -6169,10 +6170,7 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
for (i = 0; i < chunks.size(); ++i) {
state KeyRangeRef chunkRange = chunks[i].keyRange;
state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
TraceEvent("ReadBlobData")
.detail("Rows", rows.size())
.detail("ChunkRange", chunkRange.toString())
.detail("Keys", keys.toString());
TraceEvent(SevDebug, "ReadBlobData").detail("Rows", rows.size()).detail("ChunkRange", chunkRange);
if (rows.size() == 0) {
rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
}
@ -6185,7 +6183,7 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
} catch (Error& e) {
TraceEvent(SevWarn, "ReadBlobDataFailure")
.suppressFor(5.0)
.detail("Keys", keys.toString())
.detail("Keys", keys)
.detail("FetchVersion", fetchVersion)
.detail("Error", e.what());
tr->reset();
@ -6994,7 +6992,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to ensure
// change feed mutations get applied correctly
state std::vector<Key> changeFeedsToFetch;
if (!isFullRestoreMode()) {
state bool isFullRestore = wait(isFullRestoreMode(data->cx, keys));
if (!isFullRestore) {
std::vector<Key> _cfToFetch = wait(fetchCFMetadata);
changeFeedsToFetch = _cfToFetch;
}
@ -7072,7 +7071,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state PromiseStream<RangeResult> results;
state Future<Void> hold;
if (SERVER_KNOBS->FETCH_USING_BLOB) {
if (isFullRestore) {
hold = tryGetRangeFromBlob(results, &tr, keys, fetchVersion, data->blobConn);
} else {
hold = tryGetRange(results, &tr, keys);
@ -7110,7 +7109,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
data->thisServerID);
}
}
metricReporter.addFetchedBytes(expectedBlockSize, this_block.size());
// Write this_block to storage

View File

@ -2335,6 +2335,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
} else {
startRole(Role::BLOB_MIGRATOR, recruited.id(), interf.id());
DUMPTOKEN(recruited.haltBlobMigrator);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.ssi.getValue);
DUMPTOKEN(recruited.ssi.getKey);
DUMPTOKEN(recruited.ssi.getKeyValues);
@ -2345,7 +2346,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.ssi.getReadHotRanges);
DUMPTOKEN(recruited.ssi.getRangeSplitPoints);
DUMPTOKEN(recruited.ssi.getStorageMetrics);
DUMPTOKEN(recruited.ssi.waitFailure);
DUMPTOKEN(recruited.ssi.getQueuingMetrics);
DUMPTOKEN(recruited.ssi.getKeyValueStoreType);
DUMPTOKEN(recruited.ssi.watchValue);