Merge branch 'mengxu/fast-restore-fix-valgrind-PR' into mengxu/fast-restore-sampling-PR

Meng Xu 2019-12-03 12:58:11 -08:00
commit 3310f67e9e
8 changed files with 57 additions and 50 deletions

View File

@@ -216,6 +216,8 @@ struct LoadingParam {
Key removePrefix;
Key mutationLogPrefix;
LoadingParam() = default;
// TODO: Compare all fields for loadingParam
bool operator==(const LoadingParam& r) const { return isRangeFile == r.isRangeFile && filename == r.filename; }
bool operator!=(const LoadingParam& r) const { return isRangeFile != r.isRangeFile || filename != r.filename; }
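
The TODO above asks for a comparison over all fields of LoadingParam. For illustration only, a fuller operator== might look like the sketch below; the extra field names (offset, length, blockSize) are assumed from other hunks in this diff, not confirmed by the full struct definition:

// Illustrative sketch, not part of this commit; fields beyond isRangeFile, filename,
// removePrefix and mutationLogPrefix are assumed from other hunks in this diff.
bool operator==(const LoadingParam& r) const {
    return isRangeFile == r.isRangeFile && filename == r.filename && offset == r.offset &&
           length == r.length && blockSize == r.blockSize && removePrefix == r.removePrefix &&
           mutationLogPrefix == r.mutationLogPrefix;
}
bool operator!=(const LoadingParam& r) const { return !(*this == r); }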

View File

@@ -97,6 +97,9 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVec
// Assume: self->processedFileState[req.fileIndex] will not be erased while the actor is active.
// Note: Inserting new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curFilePos = self->processedFileState[req.fileIndex];
// Applier will cache the mutations at each version. Once it has received all mutations, the applier will apply them to the DB
state Version commitVersion = req.version;
state int mIndex = 0;
TraceEvent("FastRestore")
.detail("ApplierNode", self->id())
@@ -107,13 +110,11 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVec
wait(curFilePos.whenAtLeast(req.prevVersion));
if (curFilePos.get() == req.prevVersion) {
// Applier will cache the mutations at each version. Once it has received all mutations, the applier will apply them to the DB
state Version commitVersion = req.version;
VectorRef<MutationRef> mutations(req.mutations);
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
}
state int mIndex = 0;
for (mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
TraceEvent(SevDebug, "FastRestore")
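
The whenAtLeast/get pattern above serializes requests that may arrive out of order or be retransmitted. A hedged flow-style sketch of the idea, with hypothetical names (handleInOrder, progress) that are not part of this commit:

// Hypothetical sketch of the sequencing pattern; not code from this commit.
ACTOR Future<Void> handleInOrder(NotifiedVersion* progress, Version prevVersion, Version version) {
    wait(progress->whenAtLeast(prevVersion)); // block until every earlier request has been applied
    if (progress->get() == prevVersion) {
        // ... apply this request's payload here ...
        progress->set(version);               // unblock the request whose prevVersion == version
    }                                         // else: a duplicate already handled; drop it
    return Void();
}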
@@ -161,7 +162,7 @@ struct DBApplyProgress {
Reference<RestoreApplierData> self;
DBApplyProgress() = default;
DBApplyProgress(Reference<RestoreApplierData> self)
explicit DBApplyProgress(Reference<RestoreApplierData> self)
: self(self), curIndexInCurTxn(0), startIndexInUncommittedTxn(0), curTxnId(0), uncommittedTxnId(0),
lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), transactionSize(0) {
curItInCurTxn = self->kvOps.begin();
@@ -245,7 +246,10 @@ struct DBApplyProgress {
};
ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
state std::string typeStr = "";
// state variables must be defined at the start of the actor; otherwise they will not be initialized when the actor is created
state std::string typeStr = "";
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state DBApplyProgress progress(self);
// Assume the process will not crash while it applies mutations to the DB. The reply message can be lost, though
if (self->kvOps.empty()) {
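
The comment about state variables reflects how the flow actor compiler works: every state variable is hoisted into the actor's generated state object, so declaring and initializing them up front, before the first wait(), avoids the uninitialized reads that valgrind flags. A hedged flow-style sketch of the pattern, with a hypothetical actor name:

// Hypothetical illustration of the "declare state up front" pattern; not code from this commit.
ACTOR Future<Void> exampleTxnActor(Database cx) {
    state int retries = 0;                    // initialized before any wait(); survives across waits
    state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
    loop {
        try {
            wait(tr->commit());
            return Void();
        } catch (Error& e) {
            retries++;
            wait(tr->onError(e));
        }
    }
}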
@@ -262,8 +266,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
self->sanityCheckMutationOps();
state DBApplyProgress progress(self);
if (progress.isDone()) {
TraceEvent("FastRestore_ApplierTxn")
.detail("ApplierApplyToDBFinished", self->id())
@@ -271,7 +273,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
return Void();
}
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// Sanity check the restoreApplierKeys, which should be empty at this point
loop {
try {
@@ -399,8 +400,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
TraceEvent("FastRestore_ApplierTxn")
.detail("ApplierApplyToDBFinished", self->id())
.detail("CleanupCurTxnIds", progress.curTxnId);
// House cleaning
self->kvOps.clear();
// clean up txn ids
loop {
try {
@@ -416,6 +415,8 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
wait(tr->onError(e));
}
}
// House cleaning
self->kvOps.clear();
TraceEvent("FastRestore_ApplierTxn").detail("ApplierApplyToDBFinished", self->id());
return Void();

View File

@@ -148,9 +148,9 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<RestoreLoa
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state int64_t j;
state int64_t readOffset;
state int64_t readLen;
int64_t j;
int64_t readOffset;
int64_t readLen;
for (j = param.offset; j < param.length; j += param.blockSize) {
readOffset = j;
readLen = std::min<int64_t>(param.blockSize, param.length - j);
@@ -197,13 +197,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
if (self->rangeToApplier.empty()) {
self->rangeToApplier = req.rangeToApplier;
} else {
ASSERT(self->rangeToApplier == req.rangeToApplier);
}
self->rangeToApplier = req.rangeToApplier;
// Send mutations from log files first to ensure log mutations at a version are applied before the range kvs at the same version
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = self->kvOpsPerLP.begin();
for (; item != self->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {
@@ -224,9 +219,13 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
state VersionedMutationsMap& kvOps = *pkvOps;
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
state Version prevVersion = startVersion;
TraceEvent("FastRestore")
.detail("SendMutationToApplier", self->id())
TraceEvent("FastRestore_SendMutationToApplier")
.detail("Loader", self->id())
.detail("IsRangeFile", isRangeFile)
.detail("StartVersion", startVersion)
.detail("EndVersion", endVersion)
@@ -243,14 +242,10 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
state std::map<UID, double> applierMutationsSize;
state Standalone<VectorRef<MutationRef>> mvector;
state Standalone<VectorRef<UID>> nodeIDs;
// Initialize the above two maps
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::vector<std::pair<UID, RestoreSendMutationVectorVersionedRequest>> requests;
state Version prevVersion = startVersion;
splitMutationIndex = 0;
kvCount = 0;
state VersionedMutationsMap::iterator kvOp;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
applierMutationsBuffer.clear();
@@ -260,7 +255,7 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
applierMutationsSize[applierID] = 0.0;
}
state Version commitVersion = kvOp->first;
state int mIndex;
state int mIndex = 0;
state MutationRef kvm;
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
kvm = kvOp->second[mIndex];
@@ -285,7 +280,7 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
kvCount++;
}
} else { // mutation operates on a particular key
std::map<Standalone<KeyRef>, UID>::iterator itlow = self->rangeToApplier.upper_bound(kvm.param1);
std::map<Key, UID>::iterator itlow = self->rangeToApplier.upper_bound(kvm.param1);
--itlow; // make sure itlow->first <= m.param1
ASSERT(itlow->first <= kvm.param1);
MutationRef mutation = kvm;
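
The upper_bound-then-decrement lookup above finds the applier whose inclusive lower bound covers kvm.param1. A standalone C++ sketch of the same idiom with a hypothetical rangeToOwner map; it assumes the map always contains a bound that is <= every lookup key (here the empty string), since the decrement would otherwise step before begin():

#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for rangeToApplier: key = inclusive lower bound of a range,
// value = id of the owner responsible for keys from that bound up to the next bound.
int ownerOf(const std::map<std::string, int>& rangeToOwner, const std::string& key) {
    auto itlow = rangeToOwner.upper_bound(key); // first bound strictly greater than key
    --itlow;                                    // step back: now itlow->first <= key
    assert(itlow->first <= key);
    return itlow->second;
}

int main() {
    std::map<std::string, int> rangeToOwner = { { "", 0 }, { "m", 1 }, { "t", 2 } };
    assert(ownerOf(rangeToOwner, "apple") == 0);
    assert(ownerOf(rangeToOwner, "m") == 1);    // an exact bound belongs to its own owner
    assert(ownerOf(rangeToOwner, "zebra") == 2);
    return 0;
}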
@@ -507,12 +502,12 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// The version of the key-value set is rangeFile.version; all key-value pairs in the same range file share that version
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
state Standalone<VectorRef<KeyValueRef>> blockData =
Standalone<VectorRef<KeyValueRef>> blockData =
wait(parallelFileRestore::decodeRangeFileBlock(inFile, readOffset, readLen));
TraceEvent("FastRestore").detail("DecodedRangeFile", fileName).detail("DataSize", blockData.contents().size());
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
// If fileRange doesn't intersect restore range then we're done.
if (!fileRange.intersects(restoreRange)) {
@@ -537,9 +532,9 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
}
// Now data only contains the kv mutations within restoreRange
state VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
state int start = 0;
state int end = data.size();
VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
int start = 0;
int end = data.size();
// Convert KV in data into mutations in kvOps
for (int i = start; i < end; ++i) {
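
The comments above describe trimming a decoded, key-sorted block down to the restore range before converting it to mutations. A hedged standalone sketch of that trimming step, using a hypothetical KV type and function name rather than the FDB types:

#include <algorithm>
#include <string>
#include <vector>

struct KV { std::string key, value; };

// Keep only the pairs whose key falls in [rangeBegin, rangeEnd); block must be sorted by key.
std::vector<KV> trimToRange(const std::vector<KV>& block,
                            const std::string& rangeBegin, const std::string& rangeEnd) {
    auto less = [](const KV& kv, const std::string& k) { return kv.key < k; };
    auto first = std::lower_bound(block.begin(), block.end(), rangeBegin, less);
    auto last = std::lower_bound(block.begin(), block.end(), rangeEnd, less);
    return std::vector<KV>(first, last);
}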
@@ -577,7 +572,7 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
std::string fileName, int64_t readOffset, int64_t readLen,
KeyRange restoreRange, Key addPrefix, Key removePrefix,
Key mutationLogPrefix) {
state Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
Reference<IAsyncFile> inFile = wait(bc->readFile(fileName));
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data =
wait(parallelFileRestore::decodeLogFileBlock(inFile, readOffset, readLen));
@@ -591,9 +586,9 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
wait(pProcessedFileOffset->whenAtLeast(readOffset));
if (pProcessedFileOffset->get() == readOffset) {
state int start = 0;
state int end = data.size();
state int numConcatenated = 0;
int start = 0;
int end = data.size();
int numConcatenated = 0;
for (int i = start; i < end; ++i) {
// Key k = data[i].key.withPrefix(mutationLogPrefix);
// ValueRef v = data[i].value;

View File

@@ -47,8 +47,8 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
std::map<LoadingParam, VersionedMutationsMap> kvOpsPerLP; // Buffered kvOps for each loading param
// rangeToApplier is in both master and loader. The loader uses it to determine which applier a mutation should be sent to
// KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Standalone<KeyRef>, UID> rangeToApplier;
// Key is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Key, UID> rangeToApplier;
// Sampled mutations to be sent back to restore master
std::map<LoadingParam, Standalone<VectorRef<MutationRef>>> sampleMutations;

View File

@@ -304,8 +304,9 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<RestoreMasterData> self,
state std::vector<RestoreLoadFileReply> replies;
// Wait on the batch of load files or log files
wait(getBatchReplies(&RestoreLoaderInterface::loadFile, self->loadersInterf, requests, &replies));
TraceEvent("FastRestore").detail("SamplingReplies", replies.size());
TraceEvent("FastRestore").detail("VersionBatch", self->batchIndex).detail("SamplingReplies", replies.size());
for (auto& reply : replies) {
TraceEvent("FastRestore").detail("VersionBatch", self->batchIndex).detail("SamplingReplies", reply.toString());
for (int i = 0; i < reply.samples.size(); ++i) {
MutationRef mutation = reply.samples[i];
self->samples.addMetric(mutation.param1, mutation.totalSize());
@@ -376,7 +377,7 @@ void splitKeyRangeForAppliers(Reference<RestoreMasterData> self) {
} else if (keyrangeSplitter.size() > numAppliers) {
TraceEvent(SevError, "FastRestore").detail("TooManySlotsThanAppliers", keyrangeSplitter.size()).detail("NumAppliers", numAppliers);
}
std::sort(keyrangeSplitter.begin(), keyrangeSplitter.end());
//std::sort(keyrangeSplitter.begin(), keyrangeSplitter.end());
int i = 0;
self->rangeToApplier.clear();
for (auto& applier : self->appliersInterf) {
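
splitKeyRangeForAppliers assigns each applier the inclusive lower bound of a key range, ideally so that the sampled data size is balanced across appliers. A hedged sketch of one way to pick such bounds from (key, sampledBytes) pairs; the helper name and logic are hypothetical, not the actual FDB algorithm:

#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Pick up to numWorkers inclusive lower bounds from key-sorted samples so each slot
// covers roughly the same sampled byte count. Hypothetical helper, for illustration only.
std::vector<std::string> pickSplitPoints(const std::map<std::string, int64_t>& samples,
                                         int numWorkers) {
    int64_t total = 0;
    for (auto& kv : samples) total += kv.second;

    std::vector<std::string> bounds;
    bounds.push_back("");                          // first worker starts at the beginning of the key space
    int64_t perWorker = std::max<int64_t>(total / numWorkers, 1);
    int64_t accumulated = 0;
    for (auto& kv : samples) {
        if (accumulated >= perWorker && (int)bounds.size() < numWorkers) {
            bounds.push_back(kv.first);            // this sampled key begins the next worker's range
            accumulated = 0;
        }
        accumulated += kv.second;
    }
    return bounds;                                 // may return fewer bounds than workers if samples are sparse
}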

View File

@@ -219,6 +219,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
ACTOR static Future<Void> checkDB(Database cx, std::string when,
BackupAndParallelRestoreCorrectnessWorkload* self) {
wait(delay(1.0)); // Simply avoid compiler warning
return Void();
// state Key keyPrefix = LiteralStringRef("");
@@ -496,6 +497,12 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state FileBackupAgent backupAgent;
state Future<Void> extraBackup;
state bool extraTasks = false;
state ReadYourWritesTransaction tr1(cx);
state ReadYourWritesTransaction tr2(cx);
state UID randomID = nondeterministicRandom()->randomUniqueID();
state int restoreIndex = 0;
state bool restoreDone = false;
TraceEvent("BARW_Arguments")
.detail("BackupTag", printable(self->backupTag))
.detail("PerformRestore", self->performRestore)
@@ -504,7 +511,6 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("AbortAndRestartAfter", self->abortAndRestartAfter)
.detail("DifferentialAfter", self->stopDifferentialAfter);
state UID randomID = nondeterministicRandom()->randomUniqueID();
if (self->allowPauses && BUGGIFY) {
state Future<Void> cp = changePaused(cx, &backupAgent);
}
@@ -614,12 +620,13 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state std::vector<Future<Version>> restores;
state std::vector<Standalone<StringRef>> restoreTags;
state int restoreIndex;
// state int restoreIndex = 0;
// Restore each range by calling backupAgent.restore()
printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size());
state Transaction tr1(cx);
// state Transaction tr1(cx);
loop {
tr1.reset();
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr1.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
@@ -671,8 +678,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// We should wait on all restores before proceeding
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
state bool restoreDone = false;
state ReadYourWritesTransaction tr2(cx);
restoreDone = false;
// state ReadYourWritesTransaction tr2(cx);
state Future<Void> watchForRestoreRequestDone;
loop {
try {

View File

@@ -3,9 +3,9 @@ testTitle=BackupAndParallelRestoreWithAtomicOp
nodeCount=30000
; Make ops space only 1 key per group
; nodeCount=100
transactionsPerSecond=2500.0
; transactionsPerSecond=2500.0
; transactionsPerSecond=500.0
; transactionsPerSecond=100.0
transactionsPerSecond=100.0
; nodeCount=4
; transactionsPerSecond=250.0
testDuration=30.0

View File

@@ -2,7 +2,8 @@ testTitle=BackupAndRestore
testName=Cycle
; nodeCount=30000
nodeCount=1000
transactionsPerSecond=500.0
; transactionsPerSecond=500.0
transactionsPerSecond=50.0
; transactionsPerSecond=2500.0
testDuration=30.0
expectedRate=0