Merge pull request #8984 from sfc-gh-huliu/restoretest
Add correctness test for blob restore
Commit 46d92bbf3f
@@ -11026,11 +11026,12 @@ ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange rang
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Key key = blobRestoreCommandKeyFor(range);
Optional<Value> value = wait(tr->get(key));
if (value.present()) {
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
if (status.progress < 100) {
if (status.phase != BlobRestorePhase::DONE) {
return false; // stop if there is in-progress restore.
}
}
@@ -1039,7 +1039,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_FULL_RESTORE_MODE, false );
init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0);
init( BLOB_MANIFEST_RW_ROWS, isSimulated ? 10 : 1000);
init( BLOB_RESTORE_MLOGS_URL, isSimulated ? "file://simfdb/fdbblob/mlogs" : "");
init( BLOB_RESTORE_MLOGS_URL, isSimulated ? "file://simfdb/backups/" : "");
init( BLOB_MIGRATOR_ERROR_RETRIES, 20);

init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );
@@ -314,19 +314,28 @@ struct BlobManifest {
};

// Defines blob restore status
enum BlobRestorePhase { INIT = 0, LOAD_MANIFEST = 1, MANIFEST_DONE = 2, MIGRATE = 3, APPLY_MLOGS = 4, DONE = 5 };
enum BlobRestorePhase {
INIT = 0,
STARTING_MIGRATOR = 1,
LOADING_MANIFEST = 2,
LOADED_MANIFEST = 3,
COPYING_DATA = 4,
APPLYING_MLOGS = 5,
DONE = 6,
ERROR = 7
};
struct BlobRestoreStatus {
constexpr static FileIdentifier file_identifier = 378657;
BlobRestorePhase phase;
int progress;
int status;

BlobRestoreStatus() : phase(BlobRestorePhase::INIT){};
BlobRestoreStatus(BlobRestorePhase pha) : phase(pha), progress(0){};
BlobRestoreStatus(BlobRestorePhase pha, int prog) : phase(pha), progress(prog){};
BlobRestoreStatus(BlobRestorePhase pha) : phase(pha), status(0){};
BlobRestoreStatus(BlobRestorePhase pha, int prog) : phase(pha), status(prog){};

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, phase, progress);
serializer(ar, phase, status);
}
};
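The hunk above replaces the coarse progress field of BlobRestoreStatus with a generic status code and expands the restore phases into an explicit state machine that terminates in either DONE or ERROR. Elsewhere in this diff, updateRestoreStatus gains an Optional<BlobRestorePhase> expectedPhase argument so a transition is applied only when the record is still in the phase the caller expects. Below is a minimal, self-contained C++ sketch of that style of guard, for illustration only; tryAdvance is a hypothetical helper and not part of the change:

#include <cstdio>
#include <optional>

// Mirrors the phases introduced in this change (values as in BlobGranuleCommon.h).
enum BlobRestorePhase {
    INIT = 0,
    STARTING_MIGRATOR = 1,
    LOADING_MANIFEST = 2,
    LOADED_MANIFEST = 3,
    COPYING_DATA = 4,
    APPLYING_MLOGS = 5,
    DONE = 6,
    ERROR = 7
};

// Hypothetical stand-in for the expectedPhase guard in updateRestoreStatus():
// the transition is applied only if the current phase matches the caller's expectation.
bool tryAdvance(BlobRestorePhase& current, std::optional<BlobRestorePhase> expected, BlobRestorePhase next) {
    if (expected.has_value() && current != expected.value()) {
        return false; // analogous to throwing restore_error() in the real code
    }
    current = next;
    return true;
}

int main() {
    BlobRestorePhase phase = STARTING_MIGRATOR;
    // The blob manager moves STARTING_MIGRATOR/LOADING_MANIFEST -> LOADED_MANIFEST ...
    tryAdvance(phase, std::nullopt, LOADING_MANIFEST);
    tryAdvance(phase, LOADING_MANIFEST, LOADED_MANIFEST);
    // ... and a stale writer that still expects an older phase is rejected.
    bool ok = tryAdvance(phase, LOADING_MANIFEST, COPYING_DATA);
    std::printf("phase=%d staleUpdateAccepted=%d\n", phase, ok);
    return 0;
}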
@@ -1009,6 +1009,7 @@ public:
double BLOB_MIGRATOR_CHECK_INTERVAL;
int BLOB_MANIFEST_RW_ROWS;
std::string BLOB_RESTORE_MLOGS_URL;
int BLOB_MIGRATOR_ERROR_RETRIES;

// Blob metadata
int64_t BLOB_METADATA_CACHE_TTL;
@@ -289,6 +289,8 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
return ProcessClass::GoodFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
case ProcessClass::BlobWorkerClass:
return ProcessClass::OkayFit;
default:
return ProcessClass::NeverAssign;
}
@ -1174,8 +1174,6 @@ ACTOR Future<Void> checkManagerLock(Transaction* tr, Reference<BlobManagerData>
|
|||
ASSERT(currentLockValue.present());
|
||||
int64_t currentEpoch = decodeBlobManagerEpochValue(currentLockValue.get());
|
||||
if (currentEpoch != bmData->epoch) {
|
||||
ASSERT(currentEpoch > bmData->epoch);
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print(
|
||||
"BM {0} found new epoch {1} > {2} in lock check\n", bmData->id.toString(), currentEpoch, bmData->epoch);
|
||||
|
@ -3547,16 +3545,27 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
|
|||
bool isFullRestore = wait(isFullRestoreMode(bmData->db, normalKeys));
|
||||
bmData->isFullRestoreMode = isFullRestore;
|
||||
if (bmData->isFullRestoreMode) {
|
||||
BlobRestoreStatus initStatus(BlobRestorePhase::LOAD_MANIFEST);
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, initStatus));
|
||||
|
||||
wait(loadManifest(bmData->db, bmData->bstore));
|
||||
|
||||
int64_t epoc = wait(lastBlobEpoc(bmData->db, bmData->bstore));
|
||||
wait(updateEpoch(bmData, epoc + 1));
|
||||
|
||||
BlobRestoreStatus completedStatus(BlobRestorePhase::MANIFEST_DONE);
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, completedStatus));
|
||||
Optional<BlobRestoreStatus> status = wait(getRestoreStatus(bmData->db, normalKeys));
|
||||
ASSERT(status.present());
|
||||
state BlobRestorePhase phase = status.get().phase;
|
||||
if (phase == BlobRestorePhase::STARTING_MIGRATOR || phase == BlobRestorePhase::LOADING_MANIFEST) {
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, BlobRestoreStatus(LOADING_MANIFEST), {}));
|
||||
try {
|
||||
wait(loadManifest(bmData->db, bmData->bstore));
|
||||
int64_t epoc = wait(lastBlobEpoc(bmData->db, bmData->bstore));
|
||||
wait(updateEpoch(bmData, epoc + 1));
|
||||
BlobRestoreStatus completedStatus(BlobRestorePhase::LOADED_MANIFEST);
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, completedStatus, BlobRestorePhase::LOADING_MANIFEST));
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_restore_missing_data) {
|
||||
throw e; // retryable errors
|
||||
}
|
||||
// terminate blob restore for non-retryable errors
|
||||
TraceEvent("ManifestLoadError", bmData->id).error(e).detail("Phase", phase);
|
||||
BlobRestoreStatus error(BlobRestorePhase::ERROR, e.code());
|
||||
wait(updateRestoreStatus(bmData->db, normalKeys, error, {}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
|
@ -5310,10 +5319,6 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
|
||||
if (g_network->isSimulated() && g_simulator->speedUpSimulation) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
bmData->initBStore();
|
||||
loop {
|
||||
wait(dumpManifest(bmData->db, bmData->bstore, bmData->epoch, bmData->manifestDumperSeqNo));
|
||||
|
@ -5322,8 +5327,8 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
|
|||
}
|
||||
}
|
||||
|
||||
// Simulation validation that multiple blob managers aren't started with the same epoch
|
||||
static std::map<int64_t, UID> managerEpochsSeen;
|
||||
// Simulation validation that multiple blob managers aren't started with the same epoch within same cluster
|
||||
static std::map<std::pair<std::string, int64_t>, UID> managerEpochsSeen;
|
||||
|
||||
ACTOR Future<Void> checkBlobManagerEpoch(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int64_t epoch, UID dbgid) {
|
||||
loop {
|
||||
|
@ -5338,15 +5343,18 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
|||
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
|
||||
int64_t epoch) {
|
||||
if (g_network->isSimulated()) {
|
||||
bool managerEpochAlreadySeen = managerEpochsSeen.count(epoch);
|
||||
std::string clusterId = dbInfo->get().clusterInterface.id().shortString();
|
||||
auto clusterEpoc = std::make_pair(clusterId, epoch);
|
||||
bool managerEpochAlreadySeen = managerEpochsSeen.count(clusterEpoc);
|
||||
if (managerEpochAlreadySeen) {
|
||||
TraceEvent(SevError, "DuplicateBlobManagersAtEpoch")
|
||||
.detail("ClusterId", clusterId)
|
||||
.detail("Epoch", epoch)
|
||||
.detail("BMID1", bmInterf.id())
|
||||
.detail("BMID2", managerEpochsSeen.at(epoch));
|
||||
.detail("BMID2", managerEpochsSeen.at(clusterEpoc));
|
||||
}
|
||||
ASSERT(!managerEpochAlreadySeen);
|
||||
managerEpochsSeen[epoch] = bmInterf.id();
|
||||
managerEpochsSeen[clusterEpoc] = bmInterf.id();
|
||||
}
|
||||
state Reference<BlobManagerData> self =
|
||||
makeReference<BlobManagerData>(bmInterf.id(),
|
||||
|
@ -5390,7 +5398,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
|||
self->addActor.send(backupManifest(self));
|
||||
}
|
||||
|
||||
if (BUGGIFY) {
|
||||
if (BUGGIFY && !self->isFullRestoreMode) {
|
||||
self->addActor.send(chaosRangeMover(self));
|
||||
}
|
||||
|
||||
|
|
|
@ -87,13 +87,6 @@ struct BlobManifestFile {
|
|||
std::sort(result.begin(), result.end());
|
||||
return result;
|
||||
}
|
||||
|
||||
// Find the last manifest file
|
||||
ACTOR static Future<std::string> last(Reference<BackupContainerFileSystem> reader) {
|
||||
std::vector<BlobManifestFile> files = wait(list(reader));
|
||||
ASSERT(!files.empty());
|
||||
return files.front().fileName;
|
||||
}
|
||||
};
|
||||
|
||||
// This class dumps blob manifest to external blob storage.
|
||||
|
@ -135,6 +128,7 @@ private:
|
|||
blobGranuleMappingKeys, // Map granule to workers. Track the active granules
|
||||
blobGranuleFileKeys, // Map a granule version to granule files. Track files for a granule
|
||||
blobGranuleHistoryKeys, // Map granule to its parents and parent bundaries. for time-travel read
|
||||
blobGranuleSplitKeys, // Granule split state to recover from a splitting granule
|
||||
blobRangeKeys // Key ranges managed by blob
|
||||
};
|
||||
for (auto range : ranges) {
|
||||
|
@ -233,11 +227,16 @@ public:
|
|||
ACTOR static Future<Void> execute(Reference<BlobManifestLoader> self) {
|
||||
try {
|
||||
Value data = wait(readFromFile(self));
|
||||
if (data.empty()) {
|
||||
throw restore_missing_data();
|
||||
}
|
||||
state Standalone<BlobManifest> manifest = decode(data);
|
||||
wait(writeSystemKeys(self, manifest.rows));
|
||||
BlobGranuleRestoreVersionVector _ = wait(listGranules(self));
|
||||
} catch (Error& e) {
|
||||
dprint("WARNING: unexpected manifest loader error {}\n", e.what()); // skip error handling so far
|
||||
dprint("WARNING: unexpected manifest loader error {}\n", e.what());
|
||||
TraceEvent("BlobManfiestError").error(e).log();
|
||||
throw;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -286,9 +285,8 @@ public:
|
|||
if (e.code() == error_code_restore_missing_data) {
|
||||
dprint("missing data for key range {} \n", granuleRange.toString());
|
||||
TraceEvent("BlobRestoreMissingData").detail("KeyRange", granuleRange.toString());
|
||||
} else {
|
||||
throw;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
return results;
|
||||
|
@ -319,7 +317,12 @@ private:
|
|||
// Read data from a manifest file
|
||||
ACTOR static Future<Value> readFromFile(Reference<BlobManifestLoader> self) {
|
||||
state Reference<BackupContainerFileSystem> container = self->blobConn_->getForRead(MANIFEST_FOLDER);
|
||||
std::string fileName = wait(BlobManifestFile::last(container));
|
||||
std::vector<BlobManifestFile> files = wait(BlobManifestFile::list(container));
|
||||
if (files.empty()) {
|
||||
dprint("No blob manifest files for restore\n");
|
||||
return Value();
|
||||
}
|
||||
std::string fileName = files.front().fileName;
|
||||
state Reference<IAsyncFile> reader = wait(container->readFile(fileName));
|
||||
state int64_t fileSize = wait(reader->size());
|
||||
state Arena arena;
|
||||
|
@ -376,7 +379,7 @@ private:
|
|||
// Find the newest granule for a key range. The newest granule has the max version and relevant files
|
||||
ACTOR static Future<Standalone<BlobGranuleRestoreVersion>> getGranule(Transaction* tr, KeyRangeRef range) {
|
||||
state Standalone<BlobGranuleRestoreVersion> granuleVersion;
|
||||
KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range);
|
||||
state KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range);
|
||||
// reverse lookup so that the first row is the newest version
|
||||
state RangeResult results =
|
||||
wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::True, Reverse::True));
|
||||
|
@ -529,13 +532,13 @@ ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider
|
|||
|
||||
// Return true if the given key range is restoring
|
||||
ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
|
||||
KeyRange range = wait(getRestoringRange(db, keys));
|
||||
return !range.empty();
|
||||
std::pair<KeyRange, BlobRestoreStatus> rangeStatus = wait(getRestoreRangeStatus(db, keys));
|
||||
return !rangeStatus.first.empty() && rangeStatus.second.phase != BlobRestorePhase::DONE;
|
||||
}
|
||||
|
||||
// Check the given key range and return subrange that is doing restore. Returns empty range if no restoring
|
||||
// for any portion of the given range.
|
||||
ACTOR Future<KeyRange> getRestoringRange(Database db, KeyRangeRef keys) {
|
||||
ACTOR Future<std::pair<KeyRange, BlobRestoreStatus>> getRestoreRangeStatus(Database db, KeyRangeRef keys) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
@ -552,9 +555,8 @@ ACTOR Future<KeyRange> getRestoringRange(Database db, KeyRangeRef keys) {
|
|||
KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
|
||||
if (keys.intersects(keyRange)) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
|
||||
if (status.phase < BlobRestorePhase::DONE) {
|
||||
return KeyRangeRef(std::max(keys.begin, keyRange.begin), std::min(keys.end, keyRange.end));
|
||||
}
|
||||
KeyRangeRef intersected(std::max(keys.begin, keyRange.begin), std::min(keys.end, keyRange.end));
|
||||
return std::make_pair(intersected, status);
|
||||
}
|
||||
}
|
||||
if (!ranges.more) {
|
||||
|
@ -566,7 +568,7 @@ ACTOR Future<KeyRange> getRestoringRange(Database db, KeyRangeRef keys) {
|
|||
begin = firstGreaterThan(ranges.end()[-1].key);
|
||||
}
|
||||
}
|
||||
return KeyRangeRef();
|
||||
return std::make_pair(KeyRangeRef(), BlobRestoreStatus(BlobRestorePhase::DONE));
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
@ -574,14 +576,29 @@ ACTOR Future<KeyRange> getRestoringRange(Database db, KeyRangeRef keys) {
|
|||
}
|
||||
|
||||
// Update restore status
|
||||
ACTOR Future<Void> updateRestoreStatus(Database db, KeyRangeRef range, BlobRestoreStatus status) {
|
||||
ACTOR Future<Void> updateRestoreStatus(Database db,
|
||||
KeyRangeRef range,
|
||||
BlobRestoreStatus status,
|
||||
Optional<BlobRestorePhase> expectedPhase) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Key key = blobRestoreCommandKeyFor(range);
|
||||
state Key key = blobRestoreCommandKeyFor(range);
|
||||
|
||||
// check if current phase is expected
|
||||
if (expectedPhase.present()) {
|
||||
Optional<Value> oldValue = wait(tr.get(key));
|
||||
if (oldValue.present()) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(oldValue.get());
|
||||
if (status.phase != expectedPhase.get()) {
|
||||
throw restore_error();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Value value = blobRestoreCommandValueFor(status);
|
||||
tr.set(key, value);
|
||||
wait(tr.commit());
|
||||
|
@ -593,23 +610,11 @@ ACTOR Future<Void> updateRestoreStatus(Database db, KeyRangeRef range, BlobResto
|
|||
}
|
||||
|
||||
// Get restore status
|
||||
ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRangeRef range) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Key key = blobRestoreCommandKeyFor(range);
|
||||
Optional<Value> value = wait(tr.get(key));
|
||||
Optional<BlobRestoreStatus> result;
|
||||
if (value.present()) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
|
||||
result = status;
|
||||
}
|
||||
return result;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRangeRef keys) {
|
||||
state Optional<BlobRestoreStatus> result;
|
||||
std::pair<KeyRange, BlobRestoreStatus> rangeStatus = wait(getRestoreRangeStatus(db, keys));
|
||||
if (!rangeStatus.first.empty()) {
|
||||
result = rangeStatus.second;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -67,18 +67,26 @@ public:
|
|||
// Start migration
|
||||
ACTOR static Future<Void> start(Reference<BlobMigrator> self) {
|
||||
wait(checkIfReadyForMigration(self));
|
||||
wait(lockDatabase(self->db_, self->interf_.id()));
|
||||
wait(prepare(self, normalKeys));
|
||||
wait(advanceVersion(self));
|
||||
wait(serverLoop(self));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> updateStatus(Reference<BlobMigrator> self, KeyRange keys, BlobRestoreStatus status) {
|
||||
wait(updateRestoreStatus(self->db_, keys, status, {}));
|
||||
return Void();
|
||||
}
|
||||
|
||||
private:
|
||||
// Check if blob manifest is loaded so that blob migration can start
|
||||
ACTOR static Future<Void> checkIfReadyForMigration(Reference<BlobMigrator> self) {
|
||||
loop {
|
||||
Optional<BlobRestoreStatus> status = wait(getRestoreStatus(self->db_, normalKeys));
|
||||
if (canStartMigration(status)) {
|
||||
ASSERT(status.present());
|
||||
BlobRestorePhase phase = status.get().phase;
|
||||
if (phase == BlobRestorePhase::LOADED_MANIFEST) {
|
||||
BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
|
||||
if (!granules.empty()) {
|
||||
self->blobGranules_ = granules;
|
||||
|
@ -89,27 +97,24 @@ private:
|
|||
.detail("Version", granule.version)
|
||||
.detail("SizeInBytes", granule.sizeInBytes);
|
||||
}
|
||||
|
||||
BlobRestoreStatus status(BlobRestorePhase::MIGRATE, 0);
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, status));
|
||||
wait(updateRestoreStatus(self->db_,
|
||||
normalKeys,
|
||||
BlobRestoreStatus(BlobRestorePhase::COPYING_DATA),
|
||||
BlobRestorePhase::LOADED_MANIFEST));
|
||||
return Void();
|
||||
}
|
||||
} else if (phase >= BlobRestorePhase::COPYING_DATA) {
|
||||
TraceEvent("BlobMigratorUnexpectedPhase", self->interf_.id()).detail("Phase", status.get().phase);
|
||||
throw restore_error();
|
||||
}
|
||||
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we should start migration. Migration can be started after manifest is fully loaded
|
||||
static bool canStartMigration(Optional<BlobRestoreStatus> status) {
|
||||
if (status.present()) {
|
||||
BlobRestoreStatus value = status.get();
|
||||
return value.phase == BlobRestorePhase::MANIFEST_DONE; // manifest is loaded successfully
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Prepare for data migration for given key range.
|
||||
ACTOR static Future<Void> prepare(Reference<BlobMigrator> self, KeyRangeRef keys) {
|
||||
wait(waitForDataMover(self));
|
||||
state int oldMode = wait(setDDMode(self->db_, 0));
|
||||
// Register as a storage server, so that DataDistributor could start data movement after
|
||||
std::pair<Version, Tag> verAndTag = wait(addStorageServer(self->db_, self->interf_.ssi));
|
||||
dprint("Started storage server interface {} {}\n", verAndTag.first, verAndTag.second.toString());
|
||||
|
@ -118,14 +123,38 @@ private:
|
|||
// It'll restart DataDistributor so that internal data structures like ShardTracker, ShardsAffectedByTeamFailure
|
||||
// could be re-initialized. Ideally it should be done within DataDistributor, then we don't need to
|
||||
// restart DataDistributor
|
||||
state int oldMode = wait(setDDMode(self->db_, 0));
|
||||
wait(unassignServerKeys(self, keys));
|
||||
wait(assignKeysToServer(self, keys, self->interf_.ssi.id()));
|
||||
wait(success(setDDMode(self->db_, oldMode)));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Assign given key range to specified storage server. Subsquent
|
||||
// Wait until all pending data moving is done before doing full restore.
|
||||
ACTOR static Future<Void> waitForDataMover(Reference<BlobMigrator> self) {
|
||||
state int retries = 0;
|
||||
loop {
|
||||
state Transaction tr(self->db_);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
if (dms.size() == 0) {
|
||||
return Void();
|
||||
} else {
|
||||
dprint("Wait pending data moving {}\n", dms.size());
|
||||
wait(delay(2));
|
||||
if (++retries > SERVER_KNOBS->BLOB_MIGRATOR_ERROR_RETRIES) {
|
||||
throw restore_error();
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Assign given key range to specified storage server.
|
||||
ACTOR static Future<Void> assignKeysToServer(Reference<BlobMigrator> self, KeyRangeRef keys, UID serverUID) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
|
@ -147,20 +176,31 @@ private:
|
|||
|
||||
// Unassign given key range from its current storage servers
|
||||
ACTOR static Future<Void> unassignServerKeys(Reference<BlobMigrator> self, KeyRangeRef keys) {
|
||||
state Transaction tr(self->db_);
|
||||
state int retries = 0;
|
||||
loop {
|
||||
state Transaction tr(self->db_);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
state RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
state RangeResult serverList =
|
||||
wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY, Snapshot::True));
|
||||
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
for (auto& server : serverList) {
|
||||
state UID id = decodeServerListValue(server.value).id();
|
||||
Optional<Value> tag = wait(tr.get(serverTagKeyFor(id)));
|
||||
if (!tag.present()) {
|
||||
dprint("Server {} no tag\n", id.shortString());
|
||||
continue;
|
||||
}
|
||||
if (id == self->interf_.id()) {
|
||||
continue;
|
||||
}
|
||||
RangeResult ranges = wait(krmGetRanges(&tr, serverKeysPrefixFor(id), keys));
|
||||
|
||||
bool owning = false;
|
||||
for (auto& r : ranges) {
|
||||
if (r.value == serverKeysTrue) {
|
||||
if (r.value != serverKeysFalse) {
|
||||
owning = true;
|
||||
break;
|
||||
}
|
||||
|
@ -175,6 +215,9 @@ private:
|
|||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
if (++retries > SERVER_KNOBS->BLOB_MIGRATOR_ERROR_RETRIES) {
|
||||
throw restore_error();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -184,9 +227,17 @@ private:
|
|||
loop {
|
||||
bool done = wait(checkProgress(self));
|
||||
if (done) {
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, BlobRestoreStatus(BlobRestorePhase::APPLY_MLOGS)));
|
||||
wait(updateRestoreStatus(self->db_,
|
||||
normalKeys,
|
||||
BlobRestoreStatus(BlobRestorePhase::APPLYING_MLOGS),
|
||||
BlobRestorePhase::COPYING_DATA));
|
||||
wait(unlockDatabase(self->db_, self->interf_.id()));
|
||||
wait(applyMutationLogs(self));
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, BlobRestoreStatus(BlobRestorePhase::DONE)));
|
||||
|
||||
wait(updateRestoreStatus(self->db_,
|
||||
normalKeys,
|
||||
BlobRestoreStatus(BlobRestorePhase::DONE),
|
||||
BlobRestorePhase::APPLYING_MLOGS));
|
||||
return Void();
|
||||
}
|
||||
wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
|
||||
|
@ -223,8 +274,8 @@ private:
|
|||
state bool done = incompleted == 0;
|
||||
dprint("Migration progress :{}%. done {}\n", progress, done);
|
||||
TraceEvent("BlobMigratorProgress", self->interf_.id()).detail("Progress", progress);
|
||||
BlobRestoreStatus status(BlobRestorePhase::MIGRATE, progress);
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, status));
|
||||
BlobRestoreStatus status(BlobRestorePhase::COPYING_DATA, progress);
|
||||
wait(updateRestoreStatus(self->db_, normalKeys, status, BlobRestorePhase::COPYING_DATA));
|
||||
return done;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
@ -247,6 +298,9 @@ private:
|
|||
dprint("Advance version from {} to {}\n", current, expected);
|
||||
TraceEvent("AdvanceVersion", self->interf_.id()).detail("From", current).detail("To", expected);
|
||||
wait(tr.commit());
|
||||
} else {
|
||||
dprint("Skip advancing version {}. current {}\n", expected, current);
|
||||
TraceEvent("SkipAdvanceVersion", self->interf_.id()).detail("From", current).detail("To", expected);
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
|
@ -287,23 +341,49 @@ private:
|
|||
// check last version in mutation logs
|
||||
Optional<std::string> proxy; // unused
|
||||
Optional<std::string> encryptionKeyFile; // unused
|
||||
Reference<IBackupContainer> bc = IBackupContainer::openContainer(mutationLogsUrl, proxy, encryptionKeyFile);
|
||||
state Reference<IBackupContainer> bc =
|
||||
IBackupContainer::openContainer(mutationLogsUrl, proxy, encryptionKeyFile);
|
||||
BackupDescription desc = wait(bc->describeBackup());
|
||||
if (!desc.contiguousLogEnd.present()) {
|
||||
TraceEvent(SevError, "BlobMigratorInvalidMutationLogs").detail("Url", mutationLogsUrl);
|
||||
TraceEvent("InvalidMutationLogs").detail("Url", mutationLogsUrl);
|
||||
throw restore_missing_data();
|
||||
}
|
||||
Version targetVersion = desc.contiguousLogEnd.get() - 1;
|
||||
TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Version", targetVersion);
|
||||
if (!desc.minLogBegin.present()) {
|
||||
TraceEvent("InvalidMutationLogs").detail("Url", mutationLogsUrl);
|
||||
throw restore_missing_data();
|
||||
}
|
||||
state Version minLogVersion = desc.minLogBegin.get();
|
||||
state Version maxLogVersion = desc.contiguousLogEnd.get() - 1;
|
||||
|
||||
// restore to target version
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges;
|
||||
Standalone<VectorRef<Version>> beginVersions;
|
||||
state Standalone<VectorRef<KeyRangeRef>> ranges;
|
||||
state Standalone<VectorRef<Version>> beginVersions;
|
||||
for (auto& granule : self->blobGranules_) {
|
||||
ranges.push_back(ranges.arena(), granule.keyRange);
|
||||
beginVersions.push_back(beginVersions.arena(), granule.version);
|
||||
if (granule.version < minLogVersion || granule.version > maxLogVersion) {
|
||||
TraceEvent("InvalidMutationLogs")
|
||||
.detail("Granule", granule.granuleID)
|
||||
.detail("GranuleVersion", granule.version)
|
||||
.detail("MinLogVersion", minLogVersion)
|
||||
.detail("MaxLogVersion", maxLogVersion);
|
||||
throw restore_missing_data();
|
||||
}
|
||||
// no need to apply mutation logs if granule is already on that version
|
||||
if (granule.version < maxLogVersion) {
|
||||
ranges.push_back(ranges.arena(), granule.keyRange);
|
||||
beginVersions.push_back(beginVersions.arena(), granule.version + 1);
|
||||
}
|
||||
}
|
||||
Optional<RestorableFileSet> restoreSet =
|
||||
wait(bc->getRestoreSet(maxLogVersion, self->db_, ranges, OnlyApplyMutationLogs::True, minLogVersion));
|
||||
if (!restoreSet.present()) {
|
||||
TraceEvent("InvalidMutationLogs")
|
||||
.detail("MinLogVersion", minLogVersion)
|
||||
.detail("MaxLogVersion", maxLogVersion);
|
||||
throw restore_missing_data();
|
||||
}
|
||||
std::string tagName = "blobrestore-" + self->interf_.id().shortString();
|
||||
TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Version", minLogVersion);
|
||||
|
||||
wait(submitRestore(self, KeyRef(tagName), KeyRef(mutationLogsUrl), ranges, beginVersions));
|
||||
return Void();
|
||||
}
|
||||
|
@ -327,11 +407,11 @@ private:
|
|||
beginVersions,
|
||||
WaitForComplete::True,
|
||||
invalidVersion,
|
||||
Verbose::False,
|
||||
Verbose::True,
|
||||
""_sr, // addPrefix
|
||||
""_sr, // removePrefix
|
||||
LockDB::False,
|
||||
UnlockDB::False,
|
||||
LockDB::True,
|
||||
UnlockDB::True,
|
||||
OnlyApplyMutationLogs::True));
|
||||
TraceEvent("ApplyMutationLogsComplete", self->interf_.id()).detail("Version", version);
|
||||
return Void();
|
||||
|
@ -396,6 +476,11 @@ private:
|
|||
metrics.bytes = sizeInBytes(self);
|
||||
GetStorageMetricsReply resp;
|
||||
resp.load = metrics;
|
||||
resp.available = StorageMetrics();
|
||||
resp.capacity = StorageMetrics();
|
||||
resp.bytesInputRate = 0;
|
||||
resp.versionLag = 0;
|
||||
resp.lastUpdate = now();
|
||||
req.reply.send(resp);
|
||||
}
|
||||
when(ReplyPromise<KeyValueStoreType> reply = waitNext(ssi.getKeyValueStoreType.getFuture())) {
|
||||
|
@ -436,11 +521,14 @@ private:
|
|||
req.reply.sendError(unsupported_operation());
|
||||
}
|
||||
when(GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture())) {
|
||||
/* dprint("Unsupported GetKeyValuesRequest {} - {} @ {}\n",
|
||||
dprint("Unsupported GetKeyValuesRequest {} - {} @ {}\n",
|
||||
req.begin.getKey().printable(),
|
||||
req.end.getKey().printable(),
|
||||
req.version); */
|
||||
req.reply.sendError(unsupported_operation());
|
||||
req.version);
|
||||
// A temp fix to send back broken promise error so that fetchKey can switch to another
|
||||
// storage server. We should remove the storage server interface after
|
||||
// restore is done
|
||||
req.reply.sendError(broken_promise());
|
||||
}
|
||||
when(GetValueRequest req = waitNext(ssi.getValue.getFuture())) {
|
||||
dprint("Unsupported GetValueRequest\n");
|
||||
|
@ -525,12 +613,13 @@ private:
|
|||
ACTOR Future<Void> blobMigrator(BlobMigratorInterface interf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
TraceEvent("StartBlobMigrator", interf.id()).detail("Interface", interf.id().toString());
|
||||
dprint("Starting blob migrator {}\n", interf.id().toString());
|
||||
state Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, interf);
|
||||
try {
|
||||
Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, interf);
|
||||
wait(BlobMigrator::start(self));
|
||||
} catch (Error& e) {
|
||||
dprint("Unexpected blob migrator error {}\n", e.what());
|
||||
TraceEvent("BlobMigratorError", interf.id()).error(e);
|
||||
wait(BlobMigrator::updateStatus(self, normalKeys, BlobRestoreStatus(BlobRestorePhase::ERROR, e.code())));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -2134,7 +2134,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
metadata->files = startState.existingFiles.get();
|
||||
snapshotEligible = true;
|
||||
}
|
||||
|
@ -2178,8 +2177,29 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
|||
}
|
||||
|
||||
// No need to start Change Feed in full restore mode
|
||||
if (bwData->isFullRestoreMode)
|
||||
if (bwData->isFullRestoreMode) {
|
||||
while (inFlightFiles.size() > 0) {
|
||||
if (inFlightFiles.front().future.isReady()) {
|
||||
BlobFileIndex completedFile = wait(inFlightFiles.front().future);
|
||||
if (inFlightFiles.front().snapshot) {
|
||||
if (metadata->files.deltaFiles.empty()) {
|
||||
ASSERT(completedFile.version == metadata->initialSnapshotVersion);
|
||||
} else {
|
||||
ASSERT(completedFile.version == metadata->files.deltaFiles.back().version);
|
||||
}
|
||||
metadata->files.snapshotFiles.push_back(completedFile);
|
||||
metadata->durableSnapshotVersion.set(completedFile.version);
|
||||
pendingSnapshots--;
|
||||
}
|
||||
inFlightFiles.pop_front();
|
||||
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
|
||||
} else {
|
||||
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
|
||||
}
|
||||
}
|
||||
metadata->readable.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
||||
checkMergeCandidate = granuleCheckMergeCandidate(bwData,
|
||||
metadata,
|
||||
|
@ -3626,12 +3646,10 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
state Reference<GranuleMetadata> metadata = m;
|
||||
// state Version granuleBeginVersion = req.beginVersion;
|
||||
// skip waiting for CF ready for recovery mode
|
||||
if (!bwData->isFullRestoreMode) {
|
||||
choose {
|
||||
when(wait(metadata->readable.getFuture())) {}
|
||||
when(wait(metadata->cancelled.getFuture())) {
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
choose {
|
||||
when(wait(metadata->readable.getFuture())) {}
|
||||
when(wait(metadata->cancelled.getFuture())) {
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4035,7 +4053,6 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
state Future<Optional<Value>> fLockValue = tr.get(lockKey);
|
||||
state Future<ForcedPurgeState> fForcedPurgeState = getForcePurgedState(&tr, req.keyRange);
|
||||
Future<Optional<GranuleHistory>> fHistory = getLatestGranuleHistory(&tr, req.keyRange);
|
||||
|
||||
Optional<GranuleHistory> history = wait(fHistory);
|
||||
info.history = history;
|
||||
|
||||
|
@ -4137,6 +4154,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
if (info.existingFiles.get().snapshotFiles.empty()) {
|
||||
ASSERT(info.existingFiles.get().deltaFiles.empty());
|
||||
info.previousDurableVersion = invalidVersion;
|
||||
info.doSnapshot = true;
|
||||
} else if (info.existingFiles.get().deltaFiles.empty()) {
|
||||
info.previousDurableVersion = info.existingFiles.get().snapshotFiles.back().version;
|
||||
} else {
|
||||
|
@ -4162,8 +4180,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
// If anything in previousGranules, need to do the handoff logic and set
|
||||
// ret.previousChangeFeedId, and the previous durable version will come from the previous
|
||||
// granules
|
||||
if (info.history.present() && info.history.get().value.parentVersions.size() > 0 &&
|
||||
!bwData->isFullRestoreMode) {
|
||||
if (info.history.present() && info.history.get().value.parentVersions.size() > 0) {
|
||||
CODE_PROBE(true, "Granule open found parent");
|
||||
if (info.history.get().value.parentVersions.size() == 1) { // split
|
||||
state KeyRangeRef parentRange(info.history.get().value.parentBoundaries[0],
|
||||
|
@ -4195,11 +4212,13 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
info.changeFeedStartVersion = granuleSplitState.second;
|
||||
} else if (granuleSplitState.first == BlobGranuleSplitState::Initialized) {
|
||||
CODE_PROBE(true, "Granule open found granule in initialized state");
|
||||
wait(updateGranuleSplitState(&tr,
|
||||
info.splitParentGranule.get().first,
|
||||
info.splitParentGranule.get().second,
|
||||
info.granuleID,
|
||||
BlobGranuleSplitState::Assigned));
|
||||
if (!bwData->isFullRestoreMode) {
|
||||
wait(updateGranuleSplitState(&tr,
|
||||
info.splitParentGranule.get().first,
|
||||
info.splitParentGranule.get().second,
|
||||
info.granuleID,
|
||||
BlobGranuleSplitState::Assigned));
|
||||
}
|
||||
// change feed was created as part of this transaction, changeFeedStartVersion
|
||||
// will be set later
|
||||
} else {
|
||||
|
|
|
@ -2226,7 +2226,6 @@ ACTOR Future<Void> monitorConsistencyScan(ClusterControllerData* self) {
|
|||
TraceEvent("CCMonitorConsistencyScanWaitingForRecovery", self->id).log();
|
||||
wait(self->db.serverInfo->onChange());
|
||||
}
|
||||
|
||||
TraceEvent("CCMonitorConsistencyScan", self->id).log();
|
||||
loop {
|
||||
if (self->db.serverInfo->get().consistencyScan.present() && !self->recruitConsistencyScan.get()) {
|
||||
|
@ -2375,11 +2374,12 @@ ACTOR Future<Void> watchBlobRestoreCommand(ClusterControllerData* self) {
|
|||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
Optional<Value> blobRestoreCommand = wait(tr->get(blobRestoreCommandKey));
|
||||
if (blobRestoreCommand.present()) {
|
||||
Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(blobRestoreCommand.get());
|
||||
TraceEvent("WatchBlobRestoreCommand").detail("Progress", status.progress).detail("Phase", status.phase);
|
||||
state Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(blobRestoreCommand.get());
|
||||
TraceEvent("WatchBlobRestore", self->id).detail("Phase", status.phase);
|
||||
if (status.phase == BlobRestorePhase::INIT) {
|
||||
self->db.blobRestoreEnabled.set(true);
|
||||
if (self->db.blobGranulesEnabled.get()) {
|
||||
wait(updateRestoreStatus(
|
||||
self->cx, normalKeys, BlobRestoreStatus(BlobRestorePhase::STARTING_MIGRATOR), {}));
|
||||
const auto& blobManager = self->db.serverInfo->get().blobManager;
|
||||
if (blobManager.present()) {
|
||||
BlobManagerSingleton(blobManager)
|
||||
|
@ -2389,8 +2389,12 @@ ACTOR Future<Void> watchBlobRestoreCommand(ClusterControllerData* self) {
|
|||
if (blobMigrator.present()) {
|
||||
BlobMigratorSingleton(blobMigrator).halt(*self, blobMigrator.get().locality.processId());
|
||||
}
|
||||
} else {
|
||||
TraceEvent("SkipBlobRestoreInitCommand", self->id).log();
|
||||
wait(updateRestoreStatus(self->cx, normalKeys, BlobRestoreStatus(BlobRestorePhase::ERROR), {}));
|
||||
}
|
||||
}
|
||||
self->db.blobRestoreEnabled.set(status.phase < BlobRestorePhase::DONE);
|
||||
}
|
||||
|
||||
state Future<Void> watch = tr->watch(blobRestoreCommandKey);
|
||||
|
@ -2483,8 +2487,8 @@ ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
|
|||
loop {
|
||||
choose {
|
||||
when(wait(wfClient)) {
|
||||
TraceEvent("CCBlobMigratorDied", self->id)
|
||||
.detail("MGID", self->db.serverInfo->get().blobMigrator.get().id());
|
||||
UID mgID = self->db.serverInfo->get().blobMigrator.get().id();
|
||||
TraceEvent("CCBlobMigratorDied", self->id).detail("MGID", mgID);
|
||||
self->db.clearInterf(ProcessClass::BlobMigratorClass);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -2466,22 +2466,28 @@ ACTOR static Future<JsonBuilderObject> blobRestoreStatusFetcher(Database db, std
|
|||
case BlobRestorePhase::INIT:
|
||||
statusObj["blob_full_restore_phase"] = "Initializing";
|
||||
break;
|
||||
case BlobRestorePhase::LOAD_MANIFEST:
|
||||
case BlobRestorePhase::STARTING_MIGRATOR:
|
||||
statusObj["blob_full_restore_phase"] = "Starting migrator";
|
||||
break;
|
||||
case BlobRestorePhase::LOADING_MANIFEST:
|
||||
statusObj["blob_full_restore_phase"] = "Loading manifest";
|
||||
break;
|
||||
case BlobRestorePhase::MANIFEST_DONE:
|
||||
statusObj["blob_full_restore_phase"] = "Manifest loaded";
|
||||
case BlobRestorePhase::LOADED_MANIFEST:
|
||||
statusObj["blob_full_restore_phase"] = "Manifest is loaded";
|
||||
break;
|
||||
case BlobRestorePhase::MIGRATE:
|
||||
case BlobRestorePhase::COPYING_DATA:
|
||||
statusObj["blob_full_restore_phase"] = "Copying data";
|
||||
statusObj["blob_full_restore_progress"] = status.get().progress;
|
||||
statusObj["blob_full_restore_progress"] = status.get().status;
|
||||
break;
|
||||
case BlobRestorePhase::APPLY_MLOGS:
|
||||
case BlobRestorePhase::APPLYING_MLOGS:
|
||||
statusObj["blob_full_restore_phase"] = "Applying mutation logs";
|
||||
statusObj["blob_full_restore_progress"] = status.get().progress;
|
||||
statusObj["blob_full_restore_progress"] = status.get().status;
|
||||
break;
|
||||
case BlobRestorePhase::DONE:
|
||||
statusObj["blob_full_restore_phase"] = "Completed";
|
||||
statusObj["blob_full_restore_phase"] = "Completed successfully";
|
||||
break;
|
||||
case BlobRestorePhase::ERROR:
|
||||
statusObj["blob_full_restore_phase"] = "Completed with fatal error";
|
||||
break;
|
||||
default:
|
||||
statusObj["blob_full_restore_phase"] = "Unexpected phase";
|
||||
|
|
|
@@ -163,8 +163,11 @@ ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProv
ACTOR Future<BlobGranuleRestoreVersionVector> listBlobGranules(Database db, Reference<BlobConnectionProvider> blobConn);
ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider> blobConn);
ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef range);
ACTOR Future<Void> updateRestoreStatus(Database db, KeyRangeRef range, BlobRestoreStatus status);
ACTOR Future<KeyRange> getRestoringRange(Database db, KeyRangeRef keys);
ACTOR Future<Void> updateRestoreStatus(Database db,
KeyRangeRef range,
BlobRestoreStatus status,
Optional<BlobRestorePhase> expectedPhase);
ACTOR Future<std::pair<KeyRange, BlobRestoreStatus>> getRestoreRangeStatus(Database db, KeyRangeRef keys);
ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRangeRef range);
#include "flow/unactorcompiler.h"
@ -6285,6 +6285,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranules(Tra
|
|||
|
||||
// Read keys from blob storage if they exist. Fail back to tryGetRange, which reads keys
|
||||
// from storage servers with locally attached disks
|
||||
|
||||
ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
|
||||
Transaction* tr,
|
||||
KeyRange keys,
|
||||
|
@ -6321,7 +6322,7 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
|
|||
.detail("Error", e.what());
|
||||
tr->reset();
|
||||
tr->setVersion(fetchVersion);
|
||||
throw;
|
||||
results.sendError(e);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -7229,8 +7230,15 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
state PromiseStream<RangeResult> results;
|
||||
state Future<Void> hold;
|
||||
if (isFullRestore) {
|
||||
KeyRange range = wait(getRestoringRange(data->cx, keys));
|
||||
hold = tryGetRangeFromBlob(results, &tr, range, fetchVersion, data->blobConn);
|
||||
std::pair<KeyRange, BlobRestoreStatus> rangeStatus = wait(getRestoreRangeStatus(data->cx, keys));
|
||||
// Read from blob only when it's copying data for full restore. Otherwise it may cause data corruptions
|
||||
// e.g we don't want to copy from blob any more when it's applying mutation logs(APPLYING_MLOGS)
|
||||
if (rangeStatus.second.phase == BlobRestorePhase::COPYING_DATA ||
|
||||
rangeStatus.second.phase == BlobRestorePhase::ERROR) {
|
||||
hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, fetchVersion, data->blobConn);
|
||||
} else {
|
||||
hold = tryGetRange(results, &tr, keys);
|
||||
}
|
||||
} else {
|
||||
hold = tryGetRange(results, &tr, keys);
|
||||
}
|
||||
|
@ -7295,7 +7303,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
} catch (Error& e) {
|
||||
if (e.code() != error_code_end_of_stream && e.code() != error_code_connection_failed &&
|
||||
e.code() != error_code_transaction_too_old && e.code() != error_code_future_version &&
|
||||
e.code() != error_code_process_behind && e.code() != error_code_server_overloaded) {
|
||||
e.code() != error_code_process_behind && e.code() != error_code_server_overloaded &&
|
||||
e.code() != error_code_blob_granule_request_failed &&
|
||||
e.code() != error_code_blob_granule_transaction_too_old) {
|
||||
throw;
|
||||
}
|
||||
lastError = e;
|
||||
|
|
|
@@ -0,0 +1,157 @@
/*
 * BlobRestoreWorkload.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "flow/Error.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// This worload provides building blocks to test blob restore. The following 2 functions are offered:
// 1) SetupBlob - blobbify key ranges so that we could backup fdb to a blob storage
// 2) PerformRestore - Start blob restore to the extra db instance and wait until it finishes
//
// A general flow to test blob restore:
// 1) start two db instances and blobbify normalKeys for the default db
// 2) submit mutation log only backup to the default db with IncrementalBackup
// 3) start cycle workload to write data to the default db
// 4) perform blob restore to the extra db
// 5) verify data in the extra db
//
// Please refer to BlobRestoreBasic.toml to see how to run a blob restore test with the help from IncrementalBackup
// and Cycle.
//
struct BlobRestoreWorkload : TestWorkload {
static constexpr auto NAME = "BlobRestoreWorkload";
BlobRestoreWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
ASSERT(g_simulator->extraDatabases.size() == 1); // extra db must be enabled
extraDb_ = Database::createSimulatedExtraDatabase(g_simulator->extraDatabases[0]);
setupBlob_ = getOption(options, "setupBlob"_sr, false);
performRestore_ = getOption(options, "performRestore"_sr, false);
}

Future<Void> setup(Database const& cx) override { return Void(); }

Future<Void> start(Database const& cx) override {
if (clientId != 0)
return Void();
return _start(cx, this);
}

ACTOR static Future<Void> _start(Database cx, BlobRestoreWorkload* self) {
state bool result = false;
if (self->setupBlob_) {
fmt::print("Blobbify normal range\n");
wait(store(result, cx->blobbifyRange(normalKeys)));
}

if (self->performRestore_) {
fmt::print("Perform blob restore\n");
wait(store(result, self->extraDb_->blobRestore(normalKeys)));

state std::vector<Future<Void>> futures;
futures.push_back(self->runBackupAgent(self));
futures.push_back(self->monitorProgress(cx, self));
wait(waitForAny(futures));
}
return Void();
}

// Start backup agent on the extra db
ACTOR Future<Void> runBackupAgent(BlobRestoreWorkload* self) {
state FileBackupAgent backupAgent;
state Future<Void> future = backupAgent.run(
self->extraDb_, 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE, CLIENT_KNOBS->SIM_BACKUP_TASKS_PER_AGENT);
wait(Future<Void>(Never()));
throw internal_error();
}

// Monitor restore progress and copy data back to original db after successful restore
ACTOR Future<Void> monitorProgress(Database cx, BlobRestoreWorkload* self) {
loop {
Optional<BlobRestoreStatus> status = wait(getRestoreStatus(self->extraDb_, normalKeys));
if (status.present()) {
state BlobRestoreStatus s = status.get();
if (s.phase == BlobRestorePhase::DONE) {
wait(copyToOriginalDb(cx, self));
return Void();
}
// TODO need to define more specific error handling
if (s.phase == BlobRestorePhase::ERROR) {
fmt::print("Unexpected restore error code = {}\n", s.status);
return Void();
}
}
wait(delay(1));
}
}

//
ACTOR static Future<Void> copyToOriginalDb(Database cx, BlobRestoreWorkload* self) {
state RangeResult data;

// Read data from restored db
state Transaction tr1(self->extraDb_->clone());
loop {
try {
RangeResult result = wait(tr1.getRange(normalKeys, CLIENT_KNOBS->TOO_MANY));
data = result;
break;
} catch (Error& e) {
wait(tr1.onError(e));
}
}

// Write back to original db for Cycle worker load to verify
state Transaction tr2(cx);
loop {
try {
tr2.clear(normalKeys);
for (auto kv : data) {
tr2.set(kv.key, kv.value);
}
wait(tr2.commit());
fmt::print("Copied {} rows to origin db\n", data.size());
return Void();
} catch (Error& e) {
wait(tr2.onError(e));
}
}
}

Future<bool> check(Database const& cx) override { return true; }

void getMetrics(std::vector<PerfMetric>& m) override {}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.emplace("Attrition"); }

private:
Database extraDb_;
bool setupBlob_;
bool performRestore_;
};

WorkloadFactory<BlobRestoreWorkload> BlobRestoreWorkloadFactory;
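The new workload above restores into the extra database, polls getRestoreStatus until the phase reaches DONE (or bails out on ERROR), and then copies the restored rows back so the Cycle workload's own checker can validate them; roughly, Cycle maintains keys that form a single linked ring and its check walks that ring. A simplified, stand-alone C++ sketch of that kind of ring check is shown below; it is illustrative only and is not FoundationDB's actual Cycle checker:

#include <cstdio>
#include <vector>

// Hypothetical simplified cycle-invariant check: next[i] is the node that key i
// points at; the data is valid if following the links from node 0 visits every
// node exactly once and returns to node 0.
bool ringIsIntact(const std::vector<int>& next) {
    std::vector<bool> seen(next.size(), false);
    int node = 0;
    for (size_t step = 0; step < next.size(); ++step) {
        if (node < 0 || node >= (int)next.size() || seen[node])
            return false;
        seen[node] = true;
        node = next[node];
    }
    return node == 0; // back at the start after visiting every node
}

int main() {
    std::printf("intact ring: %d\n", ringIsIntact({ 1, 2, 0 })); // 0 -> 1 -> 2 -> 0
    std::printf("broken ring: %d\n", ringIsIntact({ 1, 0, 2 })); // node 2 points to itself
    return 0;
}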
@@ -121,12 +121,12 @@ struct IncrementalBackupWorkload : TestWorkload {
.detail("ContiguousLogEndVersion",
desc.contiguousLogEnd.present() ? desc.contiguousLogEnd.get() : invalidVersion)
.detail("TargetVersion", v);
if (self->waitRetries != -1 && tries > self->waitRetries)
break;
if (!desc.contiguousLogEnd.present())
continue;
if (desc.contiguousLogEnd.get() >= v)
break;
if (self->waitRetries != -1 && tries > self->waitRetries)
break;
// Avoid spamming requests with a delay
wait(delay(5.0));
}
@@ -141,6 +141,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/BlobGranuleVerifySmallClean.toml)
# TODO: test occasionally times out due to too many change feed shard parts
add_fdb_test(TEST_FILES fast/BlobGranuleMoveVerifyCycle.toml IGNORE)
add_fdb_test(TEST_FILES fast/BlobRestoreBasic.toml)
add_fdb_test(TEST_FILES fast/CacheTest.toml)
add_fdb_test(TEST_FILES fast/CloggedSideband.toml)
add_fdb_test(TEST_FILES fast/CompressionUtilsUnit.toml)
@@ -0,0 +1,69 @@
[configuration]
testClass = "BlobRestore"
blobGranulesEnabled = true
extraDatabaseMode = 'Single'
allowDefaultTenant = false
disableTss = true
storageEngineExcludeTypes = [4, 5]

[[knobs]]
bg_consistency_check_enabled = 0
blob_manifest_backup = true
shard_encode_location_metadata = false
bw_throttling_enabled = false

[[test]]
testTitle = 'SetupBlob'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
clearAfterTest = false
waitForQuiescence = false

[[test.workload]]
testName = 'BlobRestoreWorkload'
setupBlob = true

[[test]]
testTitle = 'BackupMutationLogs'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
waitForQuiescence = false

[[test.workload]]
testName = 'IncrementalBackup'
tag = 'default'
submitOnly = true
waitForBackup = true
waitRetries = 100

[[test]]
testTitle = 'CycleTest'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
waitForQuiescence = false
clearAfterTest = false

[[test.workload]]
testName = 'Cycle'
nodeCount = 3000
transactionsPerSecond = 3000.0
testDuration = 10.0
expectedRate = 0

[[test.workload]]
testName = 'IncrementalBackup'
tag = 'default'
waitForBackup = true
stopBackup = true
waitRetries = 500

[[test]]
testTitle = 'BlobRestore'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
clearAfterTest = false
waitForQuiescence = false

[[test.workload]]
testName = 'BlobRestoreWorkload'
performRestore = true