Merge pull request #8109 from sfc-gh-huliu/bmr
Add blob manifest for full restore
This commit is contained in:
commit
b19f1b5e3b
|
@ -962,6 +962,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );
|
||||
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT, 1.5 );
|
||||
init( BLOB_MANAGER_CONCURRENT_MERGE_CHECKS, 64 ); if( randomize && BUGGIFY ) BLOB_MANAGER_CONCURRENT_MERGE_CHECKS = 1 << deterministicRandom()->randomInt(0, 7);
|
||||
init( BLOB_MANIFEST_BACKUP, false );
|
||||
init( BLOB_FULL_RESTORE_MODE, false );
|
||||
|
||||
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
|
||||
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );
|
||||
|
|
|
@ -276,4 +276,15 @@ struct BlobGranuleHistoryValue {
|
|||
}
|
||||
};
|
||||
|
||||
// A manifest to assist full fdb restore from blob granule files
|
||||
struct BlobManifest {
|
||||
constexpr static FileIdentifier file_identifier = 298872;
|
||||
VectorRef<KeyValueRef> rows;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, rows);
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -943,6 +943,8 @@ public:
|
|||
int BLOB_MANAGER_CONCURRENT_MERGE_CHECKS;
|
||||
double BGCC_TIMEOUT;
|
||||
double BGCC_MIN_INTERVAL;
|
||||
bool BLOB_MANIFEST_BACKUP;
|
||||
bool BLOB_FULL_RESTORE_MODE;
|
||||
|
||||
// Blob metadata
|
||||
int64_t BLOB_METADATA_CACHE_TTL;
|
||||
|
|
|
@ -155,7 +155,7 @@ void GranuleFiles::getFiles(Version beginVersion,
|
|||
int64_t& deltaBytesCounter,
|
||||
bool summarize) const {
|
||||
BlobFileIndex dummyIndex; // for searching
|
||||
|
||||
ASSERT(!snapshotFiles.empty());
|
||||
// if beginVersion == 0 or we can collapse, find the latest snapshot <= readVersion
|
||||
auto snapshotF = snapshotFiles.end();
|
||||
if (beginVersion == 0 || canCollapse) {
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include <vector>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "fdbclient/ServerKnobs.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fmt/format.h"
|
||||
#include "fdbclient/BackupContainerFileSystem.h"
|
||||
|
@ -1944,6 +1945,7 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
|
|||
for (auto it = splitPoints.boundaries.begin(); it != splitPoints.boundaries.end(); it++) {
|
||||
bmData->mergeBoundaries[it->first] = it->second;
|
||||
}
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
|
@ -3455,6 +3457,10 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
|
|||
// Once we acknowledge the existing blob workers, we can go ahead and recruit new ones
|
||||
bmData->startRecruiting.trigger();
|
||||
|
||||
bmData->initBStore();
|
||||
if (isFullRestoreMode())
|
||||
wait(loadManifest(bmData->db, bmData->bstore));
|
||||
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
|
||||
|
||||
// set up force purge keys if not done already
|
||||
|
@ -5042,6 +5048,28 @@ ACTOR Future<int64_t> bgccCheckGranule(Reference<BlobManagerData> bmData, KeyRan
|
|||
return bytesRead;
|
||||
}
|
||||
|
||||
// Check if there is any pending split. It's a precheck for manifest backup
|
||||
ACTOR Future<bool> hasPendingSplit(Reference<BlobManagerData> self) {
|
||||
state Transaction tr(self->db);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
RangeResult result = wait(tr.getRange(blobGranuleSplitKeys, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
for (auto& row : result) {
|
||||
std::pair<BlobGranuleSplitState, Version> gss = decodeBlobGranuleSplitValue(row.value);
|
||||
if (gss.first != BlobGranuleSplitState::Done) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: could eventually make this more thorough by storing some state in the DB or something
|
||||
// FIXME: simpler solution could be to shuffle ranges
|
||||
ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
|
||||
|
@ -5053,6 +5081,8 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
|
|||
if (BM_DEBUG) {
|
||||
fmt::print("BGCC starting\n");
|
||||
}
|
||||
if (isFullRestoreMode())
|
||||
wait(printRestoreSummary(bmData->db, bmData->bstore));
|
||||
|
||||
loop {
|
||||
if (g_network->isSimulated() && g_simulator->speedUpSimulation) {
|
||||
|
@ -5062,6 +5092,14 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Only dump blob manifest when there is no pending split to ensure data consistency
|
||||
if (SERVER_KNOBS->BLOB_MANIFEST_BACKUP && !isFullRestoreMode()) {
|
||||
bool pendingSplit = wait(hasPendingSplit(bmData));
|
||||
if (!pendingSplit) {
|
||||
wait(dumpManifest(bmData->db, bmData->bstore));
|
||||
}
|
||||
}
|
||||
|
||||
if (bmData->workersById.size() >= 1) {
|
||||
int tries = 10;
|
||||
state KeyRange range;
|
||||
|
|
|
@ -0,0 +1,374 @@
|
|||
/*
|
||||
* BlobManifest.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/BlobConnectionProvider.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/BackupContainerFileSystem.h"
|
||||
#include "fdbclient/BlobGranuleReader.actor.h"
|
||||
#include "fdbserver/BlobGranuleServerCommon.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
#include "fmt/core.h"
|
||||
|
||||
//
|
||||
// This module offers routines to dump or load blob manifest file, which is used for full restore from granules
|
||||
//
|
||||
|
||||
static std::string MANIFEST_FILENAME = "manifest"; // Default manifest file name on external blob storage
|
||||
|
||||
#define ENABLE_DEBUG_PRINT true
|
||||
template <typename... T>
|
||||
inline void dprint(fmt::format_string<T...> fmt, T&&... args) {
|
||||
if (ENABLE_DEBUG_PRINT)
|
||||
fmt::print(fmt, std::forward<T>(args)...);
|
||||
}
|
||||
|
||||
// This class dumps blob manifest to external blob storage.
|
||||
class BlobManifestDumper : public ReferenceCounted<BlobManifestDumper> {
|
||||
public:
|
||||
BlobManifestDumper(Database& db, Reference<BlobConnectionProvider> blobConn) : db_(db), blobConn_(blobConn) {}
|
||||
virtual ~BlobManifestDumper() {}
|
||||
|
||||
// Execute the dumper
|
||||
ACTOR static Future<Void> execute(Reference<BlobManifestDumper> self) {
|
||||
try {
|
||||
state Standalone<BlobManifest> manifest;
|
||||
Standalone<VectorRef<KeyValueRef>> rows = wait(getSystemKeys(self));
|
||||
manifest.rows = rows;
|
||||
Value data = encode(manifest);
|
||||
wait(writeToFile(self, data));
|
||||
} catch (Error& e) {
|
||||
dprint("WARNING: unexpected blob manifest dumper error {}\n", e.what()); // skip error handling for now
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
private:
|
||||
// Return system keys that to be backed up
|
||||
ACTOR static Future<Standalone<VectorRef<KeyValueRef>>> getSystemKeys(Reference<BlobManifestDumper> self) {
|
||||
state Standalone<VectorRef<KeyValueRef>> rows;
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
state std::vector<KeyRangeRef> ranges = {
|
||||
blobGranuleMappingKeys, // Map granule to workers. Track the active granules
|
||||
blobGranuleFileKeys, // Map a granule version to granule files. Track files for a granule
|
||||
blobGranuleHistoryKeys, // Map granule to its parents and parent bundaries. for time-travel read
|
||||
blobRangeKeys // Key ranges managed by blob
|
||||
};
|
||||
for (auto range : ranges) {
|
||||
// todo use getRangeStream for better performance
|
||||
RangeResult result = wait(tr.getRange(range, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
for (auto& row : result) {
|
||||
rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value));
|
||||
}
|
||||
}
|
||||
return rows;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write data to blob manifest file
|
||||
ACTOR static Future<Void> writeToFile(Reference<BlobManifestDumper> self, Value data) {
|
||||
state Reference<BackupContainerFileSystem> writer;
|
||||
state std::string fileName;
|
||||
|
||||
std::tie(writer, fileName) = self->blobConn_->createForWrite(MANIFEST_FILENAME);
|
||||
state Reference<IBackupFile> file = wait(writer->writeFile(fileName));
|
||||
wait(file->append(data.begin(), data.size()));
|
||||
wait(file->finish());
|
||||
dprint("Write blob manifest file with {} bytes\n", data.size());
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Encode manifest as binary data
|
||||
static Value encode(BlobManifest& manifest) {
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
|
||||
wr << manifest;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
Database db_;
|
||||
Reference<BlobConnectionProvider> blobConn_;
|
||||
};
|
||||
|
||||
// Defines granule info that interests full restore
|
||||
struct BlobGranuleVersion {
|
||||
// Two constructors required by VectorRef
|
||||
BlobGranuleVersion() {}
|
||||
BlobGranuleVersion(Arena& a, const BlobGranuleVersion& copyFrom)
|
||||
: granuleID(copyFrom.granuleID), keyRange(a, copyFrom.keyRange), version(copyFrom.version),
|
||||
sizeInBytes(copyFrom.sizeInBytes) {}
|
||||
|
||||
UID granuleID;
|
||||
KeyRangeRef keyRange;
|
||||
Version version;
|
||||
int64_t sizeInBytes;
|
||||
};
|
||||
|
||||
// Defines a vector for BlobGranuleVersion
|
||||
typedef Standalone<VectorRef<BlobGranuleVersion>> BlobGranuleVersionVector;
|
||||
|
||||
// Defines filename, version, size for each granule file that interests full restore
|
||||
struct GranuleFileVersion {
|
||||
Version version;
|
||||
uint8_t fileType;
|
||||
std::string filename;
|
||||
int64_t sizeInBytes;
|
||||
};
|
||||
|
||||
// This class is to load blob manifest into system key space, which is part of for bare metal restore
|
||||
class BlobManifestLoader : public ReferenceCounted<BlobManifestLoader> {
|
||||
public:
|
||||
BlobManifestLoader(Database& db, Reference<BlobConnectionProvider> blobConn) : db_(db), blobConn_(blobConn) {}
|
||||
virtual ~BlobManifestLoader() {}
|
||||
|
||||
// Execute the loader
|
||||
ACTOR static Future<Void> execute(Reference<BlobManifestLoader> self) {
|
||||
try {
|
||||
Value data = wait(readFromFile(self));
|
||||
Standalone<BlobManifest> manifest = decode(data);
|
||||
wait(writeSystemKeys(self, manifest.rows));
|
||||
BlobGranuleVersionVector _ = wait(listGranules(self));
|
||||
} catch (Error& e) {
|
||||
dprint("WARNING: unexpected manifest loader error {}\n", e.what()); // skip error handling so far
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Print out a summary for blob granules
|
||||
ACTOR static Future<Void> print(Reference<BlobManifestLoader> self) {
|
||||
state BlobGranuleVersionVector granules = wait(listGranules(self));
|
||||
for (auto granule : granules) {
|
||||
wait(checkGranuleFiles(self, granule));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
private:
|
||||
// Read data from a manifest file
|
||||
ACTOR static Future<Value> readFromFile(Reference<BlobManifestLoader> self) {
|
||||
state Reference<BackupContainerFileSystem> readBstore = self->blobConn_->getForRead(MANIFEST_FILENAME);
|
||||
state Reference<IAsyncFile> reader = wait(readBstore->readFile(MANIFEST_FILENAME));
|
||||
state int64_t fileSize = wait(reader->size());
|
||||
state Arena arena;
|
||||
state uint8_t* data = new (arena) uint8_t[fileSize];
|
||||
int readSize = wait(reader->read(data, fileSize, 0));
|
||||
dprint("Blob manifest restoring {} bytes\n", readSize);
|
||||
StringRef ref = StringRef(data, readSize);
|
||||
return Value(ref, arena);
|
||||
}
|
||||
|
||||
// Decode blob manifest from binary data
|
||||
static Standalone<BlobManifest> decode(Value data) {
|
||||
Standalone<BlobManifest> manifest;
|
||||
BinaryReader binaryReader(data, IncludeVersion());
|
||||
binaryReader >> manifest;
|
||||
return manifest;
|
||||
}
|
||||
|
||||
// Write system keys to database
|
||||
ACTOR static Future<Void> writeSystemKeys(Reference<BlobManifestLoader> self, VectorRef<KeyValueRef> rows) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
for (auto& row : rows) {
|
||||
tr.set(row.key, row.value);
|
||||
}
|
||||
wait(tr.commit());
|
||||
dprint("Blob manifest loaded {} rows\n", rows.size());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate active granules and return their version/sizes
|
||||
ACTOR static Future<BlobGranuleVersionVector> listGranules(Reference<BlobManifestLoader> self) {
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
state BlobGranuleVersionVector results;
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
try {
|
||||
std::vector<KeyRangeRef> granules;
|
||||
state int i = 0;
|
||||
auto limit = GetRangeLimits::BYTE_LIMIT_UNLIMITED;
|
||||
state RangeResult blobRanges = wait(tr.getRange(blobGranuleMappingKeys, limit));
|
||||
for (i = 0; i < blobRanges.size() - 1; i++) {
|
||||
Key startKey = blobRanges[i].key.removePrefix(blobGranuleMappingKeys.begin);
|
||||
Key endKey = blobRanges[i + 1].key.removePrefix(blobGranuleMappingKeys.begin);
|
||||
state KeyRange granuleRange = KeyRangeRef(startKey, endKey);
|
||||
try {
|
||||
Standalone<BlobGranuleVersion> granule = wait(getGranule(&tr, granuleRange));
|
||||
results.push_back_deep(results.arena(), granule);
|
||||
} catch (Error& e) {
|
||||
dprint("missing data for key range {} \n", granuleRange.toString());
|
||||
}
|
||||
}
|
||||
return results;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Find the newest granule for a key range. The newest granule has the max version and relevant files
|
||||
ACTOR static Future<Standalone<BlobGranuleVersion>> getGranule(Transaction* tr, KeyRangeRef range) {
|
||||
state Standalone<BlobGranuleVersion> granuleVersion;
|
||||
KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range);
|
||||
// reverse lookup so that the first row is the newest version
|
||||
state RangeResult results =
|
||||
wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::False, Reverse::True));
|
||||
|
||||
for (KeyValueRef row : results) {
|
||||
state KeyRange keyRange;
|
||||
state Version version;
|
||||
std::tie(keyRange, version) = decodeBlobGranuleHistoryKey(row.key);
|
||||
Standalone<BlobGranuleHistoryValue> historyValue = decodeBlobGranuleHistoryValue(row.value);
|
||||
state UID granuleID = historyValue.granuleID;
|
||||
|
||||
std::vector<GranuleFileVersion> files = wait(listGranuleFiles(tr, granuleID));
|
||||
if (files.empty()) {
|
||||
dprint("Granule {} doesn't have files for version {}\n", granuleID.toString(), version);
|
||||
continue; // check previous version
|
||||
}
|
||||
|
||||
granuleVersion.keyRange = KeyRangeRef(granuleVersion.arena(), keyRange);
|
||||
granuleVersion.granuleID = granuleID;
|
||||
granuleVersion.version = files.back().version;
|
||||
granuleVersion.sizeInBytes = granuleSizeInBytes(files);
|
||||
|
||||
dprint("Granule {}: \n", granuleVersion.granuleID.toString());
|
||||
dprint(" {} {} {}\n", keyRange.toString(), granuleVersion.version, granuleVersion.sizeInBytes);
|
||||
for (auto& file : files) {
|
||||
dprint(" File {}: {} bytes\n", file.filename, file.sizeInBytes);
|
||||
}
|
||||
return granuleVersion;
|
||||
}
|
||||
throw restore_missing_data(); // todo a better error code
|
||||
}
|
||||
|
||||
// Return sum of last snapshot file size and delta files afterwards
|
||||
static int64_t granuleSizeInBytes(std::vector<GranuleFileVersion> files) {
|
||||
int64_t totalSize = 0;
|
||||
for (auto it = files.rbegin(); it < files.rend(); ++it) {
|
||||
totalSize += it->sizeInBytes;
|
||||
if (it->fileType == BG_FILE_TYPE_SNAPSHOT)
|
||||
break;
|
||||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
// List all files for given granule
|
||||
ACTOR static Future<std::vector<GranuleFileVersion>> listGranuleFiles(Transaction* tr, UID granuleID) {
|
||||
state KeyRange fileKeyRange = blobGranuleFileKeyRangeFor(granuleID);
|
||||
RangeResult results = wait(tr->getRange(fileKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
|
||||
|
||||
std::vector<GranuleFileVersion> files;
|
||||
for (auto& row : results) {
|
||||
UID gid;
|
||||
Version version;
|
||||
uint8_t fileType;
|
||||
Standalone<StringRef> filename;
|
||||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
|
||||
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(row.value);
|
||||
GranuleFileVersion vs = { version, fileType, filename.toString(), length };
|
||||
files.push_back(vs);
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
||||
// Read data from granules and print out summary
|
||||
ACTOR static Future<Void> checkGranuleFiles(Reference<BlobManifestLoader> self, BlobGranuleVersion granule) {
|
||||
state KeyRangeRef range = granule.keyRange;
|
||||
state Version readVersion = granule.version;
|
||||
state Transaction tr(self->db_);
|
||||
loop {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks =
|
||||
wait(tr.readBlobGranules(range, 0, readVersion));
|
||||
state int count = 0;
|
||||
for (const BlobGranuleChunkRef& chunk : chunks) {
|
||||
RangeResult rows = wait(readBlobGranule(chunk, range, 0, readVersion, self->blobConn_));
|
||||
count += rows.size();
|
||||
}
|
||||
|
||||
dprint("Restorable blob granule {} @ {}\n", granule.granuleID.toString(), readVersion);
|
||||
dprint(" Range: {}\n", range.toString());
|
||||
dprint(" Keys : {}\n", count);
|
||||
dprint(" Size : {} bytes\n", granule.sizeInBytes);
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Database db_;
|
||||
Reference<BlobConnectionProvider> blobConn_;
|
||||
};
|
||||
|
||||
// API to dump a manifest copy to external storage
|
||||
ACTOR Future<Void> dumpManifest(Database db, Reference<BlobConnectionProvider> blobConn) {
|
||||
Reference<BlobManifestDumper> dumper = makeReference<BlobManifestDumper>(db, blobConn);
|
||||
wait(BlobManifestDumper::execute(dumper));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// API to load manifest from external blob storage
|
||||
ACTOR Future<Void> loadManifest(Database db, Reference<BlobConnectionProvider> blobConn) {
|
||||
Reference<BlobManifestLoader> loader = makeReference<BlobManifestLoader>(db, blobConn);
|
||||
wait(BlobManifestLoader::execute(loader));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// API to print summary for restorable granules
|
||||
ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProvider> blobConn) {
|
||||
Reference<BlobManifestLoader> loader = makeReference<BlobManifestLoader>(db, blobConn);
|
||||
wait(BlobManifestLoader::print(loader));
|
||||
return Void();
|
||||
}
|
|
@ -1973,6 +1973,10 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
|||
metadata->historyVersion = startState.history.present() ? startState.history.get().version : startVersion;
|
||||
}
|
||||
|
||||
// No need to start Change Feed in full restore mode
|
||||
if (isFullRestoreMode())
|
||||
return Void();
|
||||
|
||||
checkMergeCandidate = granuleCheckMergeCandidate(bwData,
|
||||
metadata,
|
||||
startState.granuleID,
|
||||
|
@ -3397,10 +3401,12 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
}
|
||||
state Reference<GranuleMetadata> metadata = m;
|
||||
state Version granuleBeginVersion = req.beginVersion;
|
||||
|
||||
choose {
|
||||
when(wait(metadata->readable.getFuture())) {}
|
||||
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
|
||||
// skip waiting for CF ready for recovery mode
|
||||
if (!isFullRestoreMode()) {
|
||||
choose {
|
||||
when(wait(metadata->readable.getFuture())) {}
|
||||
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
|
||||
}
|
||||
}
|
||||
|
||||
// in case both readable and cancelled are ready, check cancelled
|
||||
|
@ -3453,6 +3459,10 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
CODE_PROBE(true, "Granule Active Read");
|
||||
// this is an active granule query
|
||||
loop {
|
||||
// skip check since CF doesn't start for bare metal recovery mode
|
||||
if (isFullRestoreMode()) {
|
||||
break;
|
||||
}
|
||||
if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) {
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
|
@ -3493,12 +3503,14 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
// if feed was popped by another worker and BW only got empty versions, it wouldn't itself see that it
|
||||
// got popped, but we can still reject the in theory this should never happen with other protections but
|
||||
// it's a useful and inexpensive sanity check
|
||||
Version emptyVersion = metadata->activeCFData.get()->popVersion - 1;
|
||||
if (req.readVersion > metadata->durableDeltaVersion.get() &&
|
||||
emptyVersion > metadata->bufferedDeltaVersion) {
|
||||
CODE_PROBE(true, "feed popped for read but granule updater didn't notice yet");
|
||||
// FIXME: could try to cancel the actor here somehow, but it should find out eventually
|
||||
throw wrong_shard_server();
|
||||
if (!isFullRestoreMode()) {
|
||||
Version emptyVersion = metadata->activeCFData.get()->popVersion - 1;
|
||||
if (req.readVersion > metadata->durableDeltaVersion.get() &&
|
||||
emptyVersion > metadata->bufferedDeltaVersion) {
|
||||
CODE_PROBE(true, "feed popped for read but granule updater didn't notice yet");
|
||||
// FIXME: could try to cancel the actor here somehow, but it should find out eventually
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
}
|
||||
rangeGranulePair.push_back(std::pair(metadata->keyRange, metadata->files));
|
||||
}
|
||||
|
@ -3795,7 +3807,6 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
std::tuple<int64_t, int64_t, UID> prevOwner = decodeBlobGranuleLockValue(prevLockValue.get());
|
||||
|
||||
info.granuleID = std::get<2>(prevOwner);
|
||||
|
||||
state bool doLockCheck = true;
|
||||
// if it's the first snapshot of a new granule, history won't be present
|
||||
if (info.history.present()) {
|
||||
|
@ -3859,9 +3870,28 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
// if this granule is not derived from a split or merge, use new granule id
|
||||
info.granuleID = newGranuleID;
|
||||
}
|
||||
createChangeFeed = true;
|
||||
info.doSnapshot = true;
|
||||
info.previousDurableVersion = invalidVersion;
|
||||
|
||||
// for recovery mode - don't create change feed, don't create snapshot
|
||||
if (isFullRestoreMode()) {
|
||||
createChangeFeed = false;
|
||||
info.doSnapshot = false;
|
||||
GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, info.granuleID));
|
||||
info.existingFiles = granuleFiles;
|
||||
|
||||
if (info.existingFiles.get().snapshotFiles.empty()) {
|
||||
ASSERT(info.existingFiles.get().deltaFiles.empty());
|
||||
info.previousDurableVersion = invalidVersion;
|
||||
} else if (info.existingFiles.get().deltaFiles.empty()) {
|
||||
info.previousDurableVersion = info.existingFiles.get().snapshotFiles.back().version;
|
||||
} else {
|
||||
info.previousDurableVersion = info.existingFiles.get().deltaFiles.back().version;
|
||||
}
|
||||
info.changeFeedStartVersion = info.previousDurableVersion;
|
||||
} else {
|
||||
createChangeFeed = true;
|
||||
info.doSnapshot = true;
|
||||
info.previousDurableVersion = invalidVersion;
|
||||
}
|
||||
}
|
||||
|
||||
if (createChangeFeed) {
|
||||
|
@ -3876,7 +3906,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
|
|||
// If anything in previousGranules, need to do the handoff logic and set
|
||||
// ret.previousChangeFeedId, and the previous durable version will come from the previous
|
||||
// granules
|
||||
if (info.history.present() && info.history.get().value.parentVersions.size() > 0) {
|
||||
if (info.history.present() && info.history.get().value.parentVersions.size() > 0 && !isFullRestoreMode()) {
|
||||
CODE_PROBE(true, "Granule open found parent");
|
||||
if (info.history.get().value.parentVersions.size() == 1) { // split
|
||||
state KeyRangeRef parentRange(info.history.get().value.parentBoundaries[0],
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
#include "fdbclient/Tenant.h"
|
||||
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
@ -145,6 +146,13 @@ private:
|
|||
Future<Void> collection;
|
||||
};
|
||||
|
||||
ACTOR Future<Void> dumpManifest(Database db, Reference<BlobConnectionProvider> blobConn);
|
||||
ACTOR Future<Void> loadManifest(Database db, Reference<BlobConnectionProvider> blobConn);
|
||||
ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProvider> blobConn);
|
||||
inline bool isFullRestoreMode() {
|
||||
return SERVER_KNOBS->BLOB_FULL_RESTORE_MODE;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue