blobrestore to a previous point in time

Hui Liu 2022-12-03 08:45:23 -08:00
parent ca9464ae23
commit c85b984c3a
11 changed files with 189 additions and 46 deletions

View File

@@ -33,8 +33,18 @@ ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringR
return false;
}
Optional<Version> version;
if (tokens.size() > 1) {
Version v;
if (sscanf(tokens[1].toString().c_str(), "%" PRId64, &v) != 1) {
printUsage(tokens[0]);
return false;
}
version = v;
}
state bool success = false;
wait(store(success, localDb->blobRestore(normalKeys)));
wait(store(success, localDb->blobRestore(normalKeys, version)));
if (success) {
fmt::print(
"Started blob restore for the full cluster. Please use 'status details' command to check progress.\n");
@@ -44,5 +54,5 @@ ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringR
return success;
}
CommandFactory blobRestoreFactory("blobrestore", CommandHelp("blobrestore", "", ""));
CommandFactory blobRestoreFactory("blobrestore", CommandHelp("blobrestore [version]", "", ""));
} // namespace fdb_cli
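The hunk above makes the restore version optional on the command line. A usage sketch (illustrative; the version number is made up, and omitting it targets the newest recoverable version, per the migrator change further down):

    fdb> blobrestore 123456789
    Started blob restore for the full cluster. Please use 'status details' command to check progress.
    fdb> blobrestore
    Started blob restore for the full cluster. Please use 'status details' command to check progress.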

View File

@@ -10918,7 +10918,7 @@ Future<Standalone<VectorRef<KeyRangeRef>>> DatabaseContext::listBlobbifiedRanges
return listBlobbifiedRangesActor(Reference<DatabaseContext>::addRef(this), range, rangeLimit, tenantName);
}
ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange range) {
ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange range, Optional<Version> version) {
state Database db(cx);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
loop {
@@ -10937,6 +10937,11 @@ ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange rang
BlobRestoreStatus status(BlobRestorePhase::INIT);
Value newValue = blobRestoreCommandValueFor(status);
tr->set(key, newValue);
BlobRestoreArg arg(version);
Value argValue = blobRestoreArgValueFor(arg);
tr->set(blobRestoreArgKeyFor(range), argValue);
wait(tr->commit());
return true;
} catch (Error& e) {
@@ -10945,8 +10950,8 @@ ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange rang
}
}
Future<bool> DatabaseContext::blobRestore(KeyRange range) {
return blobRestoreActor(Reference<DatabaseContext>::addRef(this), range);
Future<bool> DatabaseContext::blobRestore(KeyRange range, Optional<Version> version) {
return blobRestoreActor(Reference<DatabaseContext>::addRef(this), range, version);
}
int64_t getMaxKeySize(KeyRef const& key) {

View File

@@ -1708,6 +1708,35 @@ Standalone<BlobRestoreStatus> decodeBlobRestoreStatus(ValueRef const& value) {
return status;
}
const KeyRangeRef blobRestoreArgKeys("\xff\x02/blobRestoreArgs/"_sr, "\xff\x02/blobRestoreArgs0"_sr);
const Value blobRestoreArgKeyFor(const KeyRangeRef range) {
BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
wr.serializeBytes(blobRestoreArgKeys.begin);
wr << range;
return wr.toValue();
}
const KeyRange decodeBlobRestoreArgKeyFor(const KeyRef key) {
KeyRange range;
BinaryReader reader(key.removePrefix(blobRestoreArgKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule()));
reader >> range;
return range;
}
const Value blobRestoreArgValueFor(BlobRestoreArg args) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << args;
return wr.toValue();
}
Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value) {
Standalone<BlobRestoreArg> args;
BinaryReader reader(value, IncludeVersion());
reader >> args;
return args;
}
const KeyRangeRef storageQuotaKeys("\xff/storageQuota/"_sr, "\xff/storageQuota0"_sr);
const KeyRef storageQuotaPrefix = storageQuotaKeys.begin;

View File

@@ -339,4 +339,17 @@ struct BlobRestoreStatus {
}
};
struct BlobRestoreArg {
constexpr static FileIdentifier file_identifier = 947689;
Optional<Version> version;
BlobRestoreArg() {}
BlobRestoreArg(Optional<Version> v) : version(v){};
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version);
}
};
#endif
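A minimal round-trip sketch showing how the new BlobRestoreArg helpers compose (illustrative only, not code from this commit; assumes the fdbclient headers and a made-up target version of 100):

    // Encode a full-cluster restore request targeting version 100.
    BlobRestoreArg arg(Optional<Version>(100));
    Value argValue = blobRestoreArgValueFor(arg);
    Value argKey = blobRestoreArgKeyFor(normalKeys); // "\xff\x02/blobRestoreArgs/" prefix + serialized range
    // Decode both pieces back.
    KeyRange range = decodeBlobRestoreArgKeyFor(argKey);              // == normalKeys
    Standalone<BlobRestoreArg> decoded = decodeBlobRestoreArg(argValue);
    ASSERT(decoded.version.present() && decoded.version.get() == 100);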

View File

@@ -412,7 +412,7 @@ public:
Future<Version> verifyBlobRange(const KeyRange& range,
Optional<Version> version,
Optional<TenantName> tenantName = {});
Future<bool> blobRestore(const KeyRange range);
Future<bool> blobRestore(const KeyRange range, Optional<Version> version);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,

View File

@@ -726,6 +726,11 @@ const Value blobRestoreCommandKeyFor(const KeyRangeRef range);
const KeyRange decodeBlobRestoreCommandKeyFor(const KeyRef key);
const Value blobRestoreCommandValueFor(BlobRestoreStatus status);
Standalone<BlobRestoreStatus> decodeBlobRestoreStatus(ValueRef const& value);
extern const KeyRangeRef blobRestoreArgKeys;
const Value blobRestoreArgKeyFor(const KeyRangeRef range);
const KeyRange decodeBlobRestoreArgKeyFor(const KeyRef key);
const Value blobRestoreArgValueFor(BlobRestoreArg args);
Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value);
// Storage quota per tenant
// "\xff/storageQuota/[[tenantGroupName]]" := "[[quota]]"

View File

@@ -626,3 +626,59 @@ ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRange
}
return result;
}
// Get restore argument
ACTOR Future<Optional<BlobRestoreArg>> getRestoreArg(Database db, KeyRangeRef keys) {
state Transaction tr(db);
state Optional<BlobRestoreArg> result;
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
limits.minRows = 0;
state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreArgKeys.begin);
state KeySelectorRef end = firstGreaterOrEqual(blobRestoreArgKeys.end);
loop {
RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True));
for (auto& r : ranges) {
KeyRange keyRange = decodeBlobRestoreArgKeyFor(r.key);
if (keys.intersects(keyRange)) {
Standalone<BlobRestoreArg> arg = decodeBlobRestoreArg(r.value);
result = arg;
return result;
}
}
if (!ranges.more) {
break;
}
if (ranges.readThrough.present()) {
begin = firstGreaterOrEqual(ranges.readThrough.get());
} else {
begin = firstGreaterThan(ranges.back().key);
}
}
return result;
} catch (Error& e) {
wait(tr.onError(e));
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Get restore target version. Return defaultVersion if no restore argument is available for the range
ACTOR Future<Version> getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion) {
Optional<BlobRestoreArg> arg = wait(getRestoreArg(db, range));
Version expected = defaultVersion;
if (arg.present()) {
if (arg.get().version.present()) {
return arg.get().version.get();
}
}
return expected;
}
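Fallback semantics of getRestoreTargetVersion, sketched for clarity (illustrative; db and maxLogVersion stand in for the migrator's variables, and wait() assumes an ACTOR context):

    // - no BlobRestoreArg persisted for the range             -> returns defaultVersion
    // - arg present but no version requested (empty Optional) -> returns defaultVersion
    // - arg present with a requested version V                -> returns V
    Version target = wait(getRestoreTargetVersion(db, normalKeys, maxLogVersion));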

View File

@@ -18,6 +18,7 @@
* limitations under the License.
*/
#include <algorithm>
#include <cmath>
#include <string>
#include "fdbclient/ClientBooleanParams.h"
#include "fdbserver/RestoreUtil.h"
@@ -315,51 +316,60 @@ private:
}
}
// Find mutation logs url
static std::string mlogsUrl(Reference<BlobMigrator> self) {
std::string url = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
// A special case for local directory.
// See FileBackupAgent.actor.cpp. if the container string describes a local directory then "/backup-<timestamp>"
// will be added to it. so we need to append this directory name to the url
if (url.find("file://") == 0) {
std::string path = url.substr(7);
path.erase(path.find_last_not_of("\\/") + 1); // Remove trailing slashes
std::vector<std::string> dirs = platform::listDirectories(path);
if (dirs.empty()) {
TraceEvent(SevError, "BlobMigratorMissingMutationLogs").detail("Url", url);
throw restore_missing_data();
// Check if we need to apply mutation logs. If all granules have data up to targetVersion, we don't need to apply
// mutation logs
static bool needApplyLogs(Reference<BlobMigrator> self, Version targetVersion) {
for (auto& granule : self->blobGranules_) {
if (granule.version < targetVersion) {
// at least one granule doesn't have data up to target version, so we'll need to apply mutation logs
return true;
}
// Pick the newest backup folder
std::sort(dirs.begin(), dirs.end());
std::string name = dirs.back();
url.erase(url.find_last_not_of("\\/") + 1); // Remove trailing slashes
return url + "/" + name;
}
return url;
return false;
}
// Apply mutation logs to blob granules so that all blob granules reach a consistent version
ACTOR static Future<Void> applyMutationLogs(Reference<BlobMigrator> self) {
state std::string mutationLogsUrl = mlogsUrl(self);
TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Url", mutationLogsUrl);
// check last version in mutation logs
Optional<std::string> proxy; // unused
Optional<std::string> encryptionKeyFile; // unused
state Reference<IBackupContainer> bc =
IBackupContainer::openContainer(mutationLogsUrl, proxy, encryptionKeyFile);
state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
state std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, {}));
if (containers.size() == 0) {
TraceEvent("MissingMutationLogs", self->interf_.id()).detail("Url", baseUrl);
throw restore_missing_data();
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(containers.front(), {}, {});
BackupDescription desc = wait(bc->describeBackup());
if (!desc.contiguousLogEnd.present()) {
TraceEvent("InvalidMutationLogs").detail("Url", mutationLogsUrl);
TraceEvent("InvalidMutationLogs").detail("Url", baseUrl);
throw restore_missing_data();
}
if (!desc.minLogBegin.present()) {
TraceEvent("InvalidMutationLogs").detail("Url", mutationLogsUrl);
TraceEvent("InvalidMutationLogs").detail("Url", baseUrl);
throw restore_missing_data();
}
state Version minLogVersion = desc.minLogBegin.get();
state Version maxLogVersion = desc.contiguousLogEnd.get() - 1;
state Version targetVersion = wait(getRestoreTargetVersion(self->db_, normalKeys, maxLogVersion));
if (targetVersion < maxLogVersion) {
if (!needApplyLogs(self, targetVersion)) {
TraceEvent("SkipMutationLogs").detail("TargetVersion", targetVersion);
dprint("Skip mutation logs as all granules are at version {}\n", targetVersion);
return Void();
}
}
if (targetVersion < minLogVersion) {
TraceEvent("MissingMutationLogs")
.detail("MinLogVersion", minLogVersion)
.detail("TargetVersion", maxLogVersion);
throw restore_missing_data();
}
if (targetVersion > maxLogVersion) {
TraceEvent("SkipTargetVersion")
.detail("MaxLogVersion", maxLogVersion)
.detail("TargetVersion", targetVersion);
}
// restore to target version
state Standalone<VectorRef<KeyRangeRef>> ranges;
@@ -370,11 +380,12 @@ private:
.detail("Granule", granule.granuleID)
.detail("GranuleVersion", granule.version)
.detail("MinLogVersion", minLogVersion)
.detail("MaxLogVersion", maxLogVersion);
.detail("MaxLogVersion", maxLogVersion)
.detail("TargetVersion", targetVersion);
throw restore_missing_data();
}
// no need to apply mutation logs if the granule already has data at the target version
if (granule.version < maxLogVersion) {
if (granule.version < targetVersion) {
ranges.push_back(ranges.arena(), granule.keyRange);
// Blob granule ends at granule.version (inclusive), so we need to apply mutation logs
// after granule.version (exclusive).
@@ -391,9 +402,11 @@ private:
throw restore_missing_data();
}
std::string tagName = "blobrestore-" + self->interf_.id().shortString();
TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Version", minLogVersion);
TraceEvent("ApplyMutationLogs", self->interf_.id())
.detail("MinVer", minLogVersion)
.detail("MaxVer", maxLogVersion);
wait(submitRestore(self, KeyRef(tagName), KeyRef(mutationLogsUrl), ranges, beginVersions));
wait(submitRestore(self, KeyRef(tagName), KeyRef(containers.front()), ranges, beginVersions, targetVersion));
return Void();
}
@@ -402,7 +415,8 @@ private:
Key tagName,
Key mutationLogsUrl,
Standalone<VectorRef<KeyRangeRef>> ranges,
Standalone<VectorRef<Version>> beginVersions) {
Standalone<VectorRef<Version>> beginVersions,
Version endVersion) {
state Optional<std::string> proxy; // unused
state Optional<Database> origDb; // unused
@@ -415,7 +429,7 @@ private:
ranges,
beginVersions,
WaitForComplete::True,
invalidVersion,
endVersion,
Verbose::True,
""_sr, // addPrefix
""_sr, // removePrefix
@@ -423,6 +437,7 @@ private:
UnlockDB::True,
OnlyApplyMutationLogs::True));
TraceEvent("ApplyMutationLogsComplete", self->interf_.id()).detail("Version", version);
dprint("Restore to version {} done. Target version {} \n", version, endVersion);
return Void();
}
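A worked example of the version selection in applyMutationLogs above, with made-up version numbers:

    // Suppose the backup container reports minLogVersion = 1000 and maxLogVersion = 5000.
    //   blobrestore 3000 -> targetVersion = 3000; granules below 3000 get mutation logs
    //                       applied, and submitRestore stops at endVersion = 3000.
    //   blobrestore 500  -> targetVersion < minLogVersion: restore_missing_data is thrown,
    //                       because the requested point in time predates the log window.
    //   blobrestore 9000 -> targetVersion > maxLogVersion: "SkipTargetVersion" is traced,
    //                       since the logs only reach 5000.
    //   no version given -> targetVersion defaults to maxLogVersion = 5000.
    // If targetVersion is below maxLogVersion and every granule already holds data at or
    // beyond it, needApplyLogs() returns false and the log-apply step is skipped entirely.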

View File

@@ -169,6 +169,8 @@ ACTOR Future<Void> updateRestoreStatus(Database db,
Optional<BlobRestorePhase> expectedPhase);
ACTOR Future<std::pair<KeyRange, BlobRestoreStatus>> getRestoreRangeStatus(Database db, KeyRangeRef keys);
ACTOR Future<Optional<BlobRestoreStatus>> getRestoreStatus(Database db, KeyRangeRef range);
ACTOR Future<Optional<BlobRestoreArg>> getRestoreArg(Database db, KeyRangeRef range);
ACTOR Future<Version> getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion);
#include "flow/unactorcompiler.h"
#endif

View File

@@ -6339,7 +6339,10 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranules(Tra
loop {
try {
Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(tr->readBlobGranules(keys, 0, readVersion));
TraceEvent(SevDebug, "ReadBlobGranules").detail("Keys", keys).detail("Chunks", chunks.size());
TraceEvent(SevDebug, "ReadBlobGranules")
.detail("Keys", keys)
.detail("Chunks", chunks.size())
.detail("FetchVersion", fetchVersion);
return chunks;
} catch (Error& e) {
if (retryCount >= maxRetryCount) {
@@ -6372,7 +6375,10 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
for (i = 0; i < chunks.size(); ++i) {
state KeyRangeRef chunkRange = chunks[i].keyRange;
state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
TraceEvent(SevDebug, "ReadBlobData").detail("Rows", rows.size()).detail("ChunkRange", chunkRange);
TraceEvent(SevDebug, "ReadBlobData")
.detail("Rows", rows.size())
.detail("ChunkRange", chunkRange)
.detail("FetchVersion", fetchVersion);
if (rows.size() == 0) {
rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
}
@@ -7298,12 +7304,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state PromiseStream<RangeResult> results;
state Future<Void> hold;
if (isFullRestore) {
std::pair<KeyRange, BlobRestoreStatus> rangeStatus = wait(getRestoreRangeStatus(data->cx, keys));
state std::pair<KeyRange, BlobRestoreStatus> rangeStatus = wait(getRestoreRangeStatus(data->cx, keys));
// Read from blob only when it's copying data for a full restore. Otherwise it may cause data corruption,
// e.g. we don't want to copy from blob any more when it's applying mutation logs (APPLYING_MLOGS)
if (rangeStatus.second.phase == BlobRestorePhase::COPYING_DATA ||
rangeStatus.second.phase == BlobRestorePhase::ERROR) {
hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, fetchVersion, data->blobConn);
Version targetVersion = wait(getRestoreTargetVersion(data->cx, keys, fetchVersion));
hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, targetVersion, data->blobConn);
} else {
hold = tryGetRange(results, &tr, keys);
}
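An illustrative trace of the storage-server path above during a full restore (the version numbers are made up):

    // Shard fetch while the range is in COPYING_DATA:
    //   fetchVersion  = 7000  (the storage server's normal fetch version)
    //   targetVersion = getRestoreTargetVersion(cx, keys, fetchVersion) = 3000
    //   -> tryGetRangeFromBlob reads granule data at 3000 rather than 7000, so the copied
    //      rows match the requested point in time; the delta up to the target is applied
    //      later from mutation logs (APPLYING_MLOGS), not here.
    // With no persisted argument, targetVersion falls back to fetchVersion, preserving the
    // previous behavior.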

View File

@@ -24,6 +24,7 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/workloads/workloads.actor.h"
@@ -72,7 +73,7 @@ struct BlobRestoreWorkload : TestWorkload {
if (self->performRestore_) {
fmt::print("Perform blob restore\n");
wait(store(result, self->extraDb_->blobRestore(normalKeys)));
wait(store(result, self->extraDb_->blobRestore(normalKeys, {})));
state std::vector<Future<Void>> futures;
futures.push_back(self->runBackupAgent(self));