Merge pull request #2886 from xumengpanda/mengxu/fr-write-during-read-PR

Performant restore[25/xx]: Add new tests for parallel restore
Jingyu Zhou, 2020-04-02 12:03:03 -07:00 (committed by GitHub)
commit fe7c5d6bec
16 changed files with 299 additions and 31 deletions


@@ -48,7 +48,8 @@ static const char* typeString[] = { "SetValue",
     "ByteMax",
     "MinV2",
     "AndV2",
-    "CompareAndClear"};
+    "CompareAndClear",
+    "MAX_ATOMIC_OP" };
 struct MutationRef {
     static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
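
The new trailing MAX_ATOMIC_OP entry gives typeString a sentinel, which a later hunk uses to bounds-check name lookups (`type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "[Unset]"`). A minimal sketch of that pattern in plain C++ with illustrative names, not FDB code:

#include <cstdio>

// Keep a trailing MAX entry in both the enum and its name table so that
// lookups can be bounds-checked instead of indexing past the array.
enum OpType { SetValue, ClearRange, MAX_OP };
static const char* opNames[] = { "SetValue", "ClearRange", "MAX_OP" };

const char* opName(int type) {
    return (type >= 0 && type < MAX_OP) ? opNames[type] : "[Unset]";
}

int main() {
    std::printf("%s %s\n", opName(SetValue), opName(42)); // SetValue [Unset]
    return 0;
}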


@@ -78,16 +78,16 @@ public:
     Range range() { return Range(begin(),end()); }
-    Val& value() {
+    Val& value() {
         //ASSERT( it->key != allKeys.end );
-        return it->value;
+        return it->value;
     }
     void operator ++() { ++it; }
     void operator --() { it.decrementNonEnd(); }
     bool operator ==(Iterator const& r) const { return it == r.it; }
     bool operator !=(Iterator const& r) const { return it != r.it; }
     // operator* and -> return this
     Iterator& operator*() { return *this; }
     Iterator* operator->() { return this; }
@@ -131,10 +131,10 @@ public:
         --i;
         return i;
     }
-    Iterator lastItem() {
+    Iterator lastItem() {
         auto i = map.lastItem();
         i.decrementNonEnd();
-        return Iterator(i);
+        return Iterator(i);
     }
     int size() const { return map.size() - 1; } // We always have one range bounded by two entries
     Iterator randomRange() {


@@ -220,10 +220,12 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
             wait(waitForAll(fValues));
             break;
         } catch (Error& e) {
-            retries++;
-            TraceEvent(retries > 10 ? SevError : SevWarn, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
-                .detail("GetKeys", incompleteStagingKeys.size())
-                .error(e);
+            if (retries++ > 10) {
+                TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck")
+                    .detail("GetKeys", incompleteStagingKeys.size())
+                    .error(e);
+            }
             wait(tr->onError(e));
             fValues.clear();
         }
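
The reworked catch block stays quiet on transient failures and raises an error-level event only once the retry loop looks stuck (more than 10 attempts). A minimal sketch of that pattern in plain C++ — fetchStagingKeys and backoff are illustrative stand-ins for the transaction reads and the tr->onError() wait, not FDB APIs:

#include <iostream>
#include <stdexcept>

static int attempts = 0;

// Pretend transaction read: fails transiently on the first two attempts.
void fetchStagingKeys() {
    if (++attempts < 3) throw std::runtime_error("transient");
}

void backoff() { /* the real actor waits on tr->onError(e) here */ }

int main() {
    int retries = 0;
    for (;;) {
        try {
            fetchStagingKeys();
            break; // success
        } catch (const std::exception&) {
            // Only a loop that appears stuck is worth an error-level event.
            if (retries++ > 10) {
                std::cerr << "SevError: GetKeysStuck, retries=" << retries << "\n";
            }
            backoff(); // reset the transaction, then retry
        }
    }
    std::cout << "succeeded after " << attempts << " attempts\n";
    return 0;
}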
@@ -233,17 +235,17 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
     int i = 0;
     for (auto& key : incompleteStagingKeys) {
         if (!fValues[i].get().present()) {
-            TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
+            TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
                 .detail("Key", key.first)
                 .detail("Reason", "Not found in DB")
                 .detail("PendingMutations", key.second->second.pendingMutations.size())
                 .detail("StagingKeyType", (int)key.second->second.type);
             for (auto& vm : key.second->second.pendingMutations) {
-                TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
+                TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
                     .detail("PendingMutationVersion", vm.first.toString())
                     .detail("PendingMutation", vm.second.toString());
             }
-            key.second->second.precomputeResult();
+            key.second->second.precomputeResult("GetAndComputeStagingKeysNoBaseValueInDB");
             i++;
             continue;
         } else {
@@ -251,7 +253,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
             // But as long as it is > 1 and less than the start version of the version batch, it is the same result.
             MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
             key.second->second.add(m, LogMessageVersion(1));
-            key.second->second.precomputeResult();
+            key.second->second.precomputeResult("GetAndComputeStagingKeys");
             i++;
         }
     }
@@ -296,9 +298,16 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
         .detail("ClearRanges", batchData->stagingKeyRanges.size());
     for (auto& rangeMutation : batchData->stagingKeyRanges) {
         std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
-        std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2);
+        std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param2);
         while (lb != ub) {
-            lb->second.add(rangeMutation.mutation, rangeMutation.version);
+            if (lb->first >= rangeMutation.mutation.param2) {
+                TraceEvent(SevError, "FastRestoreApplerPhasePrecomputeMutationsResult_IncorrectUpperBound")
+                    .detail("Key", lb->first)
+                    .detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
+                    .detail("UsedUpperBound", ub->first);
+            }
+            MutationRef clearKey(MutationRef::ClearRange, lb->first, lb->first);
+            lb->second.add(clearKey, rangeMutation.version);
             lb++;
         }
     }
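
The key fix here: a clear range [param1, param2) is end-exclusive, so the end iterator must come from lower_bound(param2), not upper_bound(param2) — upper_bound would also sweep up a staging key exactly equal to param2. The new TraceEvent inside the loop guards against any such key. A minimal sketch over a plain std::map, not FDB code:

#include <cassert>
#include <map>
#include <string>

int main() {
    std::map<std::string, int> stagingKeys{ { "a", 1 }, { "b", 2 }, { "c", 3 } };
    // Clear range ["a", "c"): "c" is the exclusive end and must survive.
    auto lb = stagingKeys.lower_bound("a"); // first key >= "a"
    auto ub = stagingKeys.lower_bound("c"); // first key >= "c", excludes "c"
    // upper_bound("c") would return the iterator past "c", clearing it too.
    int cleared = 0;
    for (; lb != ub; ++lb) {
        assert(lb->first < "c"); // the invariant the new TraceEvent checks
        ++cleared;
    }
    assert(cleared == 2); // "a" and "b" cleared; "c" untouched
    return 0;
}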
@@ -338,7 +347,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
     for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
          stagingKeyIter++) {
         if (stagingKeyIter->second.hasBaseValue()) {
-            stagingKeyIter->second.precomputeResult();
+            stagingKeyIter->second.precomputeResult("HasBaseValue");
         }
     }


@@ -79,7 +79,20 @@ struct StagingKey {
     // newVersion can be smaller than version as different loaders can send
     // mutations out of order.
     if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
+        if (m.type == MutationRef::ClearRange) {
+            // We should only clear this key! Otherwise, it causes side effect to other keys
+            ASSERT(m.param1 == m.param2);
+        }
         if (version < newVersion) {
+            if (debugMutation("StagingKeyAdd", newVersion.version, m)) {
+                TraceEvent("StagingKeyAdd")
+                    .detail("Version", version.toString())
+                    .detail("NewVersion", newVersion.toString())
+                    .detail("MType", typeString[(int)type])
+                    .detail("Key", key)
+                    .detail("Val", val)
+                    .detail("NewMutation", m.toString());
+            }
             key = m.param1;
             val = m.param2;
             type = (MutationRef::Type)m.type;
@@ -91,6 +104,7 @@ struct StagingKey {
         pendingMutations.emplace(newVersion, m);
     } else {
         // Duplicated mutation ignored.
+        // TODO: Add SevError here
         TraceEvent("SameVersion")
             .detail("Version", version.toString())
             .detail("Mutation", m.toString())
@@ -102,16 +116,21 @@ struct StagingKey {
     // Precompute the final value of the key.
     // TODO: Look at the last LogMessageVersion, if it set or clear, we can ignore the rest of versions.
-    void precomputeResult() {
+    void precomputeResult(const char* context) {
+        // TODO: Change typeString[(int)type] to a safe function that validate type range
         TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
-            .detail("Key", key)
+            .detail("Context", context)
             .detail("Version", version.toString())
+            .detail("Key", key)
             .detail("Value", val)
+            .detail("MType", type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "[Unset]")
             .detail("LargestPendingVersion",
                     (pendingMutations.empty() ? "[none]" : pendingMutations.rbegin()->first.toString()));
         std::map<LogMessageVersion, Standalone<MutationRef>>::iterator lb = pendingMutations.lower_bound(version);
         if (lb == pendingMutations.end()) {
             return;
         }
+        ASSERT(!pendingMutations.empty());
         if (lb->first == version) {
             // Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered
             MutationRef m = lb->second;
@@ -130,7 +149,11 @@ struct StagingKey {
             MutationRef mutation = lb->second;
             if (type == MutationRef::CompareAndClear) { // Special atomicOp
                 Arena arena;
-                Optional<ValueRef> retVal = doCompareAndClear(val, mutation.param2, arena);
+                Optional<StringRef> inputVal;
+                if (hasBaseValue()) {
+                    inputVal = val;
+                }
+                Optional<ValueRef> retVal = doCompareAndClear(inputVal, mutation.param2, arena);
                 if (!retVal.present()) {
                     val = key;
                     type = MutationRef::ClearRange;
@@ -152,6 +175,7 @@ struct StagingKey {
                 .detail("MutationType", typeString[mutation.type])
                 .detail("Version", lb->first.toString());
             }
+            ASSERT(lb->first > version);
             version = lb->first;
         }
     }
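
The CompareAndClear path now passes an empty Optional when the key has no base value, so "key absent" is no longer conflated with whatever happened to sit in val. A sketch of the intended semantics — an assumption based on this hunk, not the real doCompareAndClear signature — using std::optional:

#include <cassert>
#include <optional>
#include <string>

// CompareAndClear: clear the key only if its current value equals the operand.
std::optional<std::string> compareAndClear(std::optional<std::string> existing,
                                           const std::string& operand) {
    if (existing.has_value() && *existing == operand) {
        return std::nullopt; // values match: the key becomes cleared
    }
    return existing; // no match, or key absent: unchanged
}

int main() {
    assert(!compareAndClear(std::string("x"), "x").has_value()); // match: cleared
    assert(compareAndClear(std::string("y"), "x") == "y");       // no match: kept
    assert(!compareAndClear(std::nullopt, "").has_value());      // absent stays absent
    return 0;
}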


@@ -21,6 +21,7 @@
 // This file implements the functions and actors used by the RestoreLoader role.
 // The RestoreLoader role starts with the restoreLoaderCore actor
+#include "flow/UnitTest.h"
 #include "fdbclient/BackupContainer.h"
 #include "fdbserver/RestoreLoader.actor.h"
 #include "fdbserver/RestoreRoleCommon.actor.h"
@@ -201,7 +202,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
             mutation.param2 = mutation.param2 < asset.range.end ? mutation.param2 : asset.range.end;
         }
-        TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
+        TraceEvent(SevFRMutationInfo, "FastRestoreDecodePartitionedLogFile")
             .detail("CommitVersion", msgVersion.toString())
             .detail("ParsedMutation", mutation.toString());
         it->second.push_back_deep(it->second.arena(), mutation);
@@ -586,8 +587,12 @@ void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mv
             curm.param2 = itlow->first;
         }
         ASSERT(curm.param1 <= curm.param2);
-        mvector.push_back_deep(mvector_arena, curm);
-        nodeIDs.push_back(nodeIDs_arena, itApplier->second);
+        // itup > m.param2: (itup-1) may be out of mutation m's range
+        // Ensure the added mutations have overlap with mutation m
+        if (m.param1 < curm.param2 && m.param2 > curm.param1) {
+            mvector.push_back_deep(mvector_arena, curm);
+            nodeIDs.push_back(nodeIDs_arena, itApplier->second);
+        }
     }
 }
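
The added if is the standard half-open interval overlap test: [m.param1, m.param2) and [curm.param1, curm.param2) intersect exactly when each range starts before the other ends. Without it, the sub-range derived from (itup - 1) could fall entirely outside mutation m, as the new comment notes. A minimal sketch, not FDB code:

#include <cassert>
#include <string>

// Two half-open key ranges [aBegin, aEnd) and [bBegin, bEnd) overlap
// exactly when each one starts before the other ends.
bool overlaps(const std::string& aBegin, const std::string& aEnd,
              const std::string& bBegin, const std::string& bEnd) {
    return aBegin < bEnd && bBegin < aEnd;
}

int main() {
    assert(overlaps("a", "c", "b", "d"));  // partial overlap
    assert(!overlaps("a", "b", "b", "c")); // ranges that only touch do not overlap
    assert(!overlaps("a", "b", "c", "d")); // disjoint
    return 0;
}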
@@ -716,7 +721,7 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
         cc->sampledLogBytes += mutation.totalSize();
-        TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
+        TraceEvent(SevFRMutationInfo, "FastRestoreDecodeLogFile")
             .detail("CommitVersion", commitVersion)
             .detail("ParsedMutation", mutation.toString());
@@ -805,7 +810,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
         // We cache all kv operations into kvOps, and apply all kv operations later in one place
         auto it = kvOps.insert(std::make_pair(msgVersion, MutationsVec()));
-        TraceEvent(SevFRMutationInfo, "FastRestore_VerboseDebug")
+        TraceEvent(SevFRMutationInfo, "FastRestoreDecodeRangeFile")
             .detail("CommitVersion", version)
             .detail("ParsedMutationKV", m.toString());
@@ -882,3 +887,99 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
     req.reply.send(RestoreCommonReply(self->id(), false));
     return Void();
 }
+
+// Test splitMutation
+TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
+    std::map<Key, UID> rangeToApplier;
+    MutationsVec mvector;
+    Standalone<VectorRef<UID>> nodeIDs;
+    // Prepare RangeToApplier
+    rangeToApplier.emplace(normalKeys.begin, deterministicRandom()->randomUniqueID());
+    int numAppliers = deterministicRandom()->randomInt(1, 50);
+    for (int i = 0; i < numAppliers; ++i) {
+        Key k = Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, 1000)));
+        UID node = deterministicRandom()->randomUniqueID();
+        rangeToApplier.emplace(k, node);
+        TraceEvent("RangeToApplier").detail("Key", k).detail("Node", node);
+    }
+    Key k1 = Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, 500)));
+    Key k2 = Key(deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(1, 1000)));
+    Key beginK = k1 < k2 ? k1 : k2;
+    Key endK = k1 < k2 ? k2 : k1;
+    Standalone<MutationRef> mutation(MutationRef(MutationRef::ClearRange, beginK.contents(), endK.contents()));
+
+    // Method 1: Use splitMutation
+    splitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
+    ASSERT(mvector.size() == nodeIDs.size());
+
+    // Method 2: Use intersection
+    KeyRangeMap<UID> krMap;
+    std::map<Key, UID>::iterator beginKey = rangeToApplier.begin();
+    std::map<Key, UID>::iterator endKey = std::next(beginKey, 1);
+    while (endKey != rangeToApplier.end()) {
+        TraceEvent("KeyRangeMap")
+            .detail("BeginKey", beginKey->first)
+            .detail("EndKey", endKey->first)
+            .detail("Node", beginKey->second);
+        krMap.insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second);
+        beginKey = endKey;
+        endKey++;
+    }
+    if (beginKey != rangeToApplier.end()) {
+        TraceEvent("KeyRangeMap")
+            .detail("BeginKey", beginKey->first)
+            .detail("EndKey", normalKeys.end)
+            .detail("Node", beginKey->second);
+        krMap.insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second);
+    }
+
+    int splitMutationIndex = 0;
+    auto r = krMap.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2));
+    bool correctResult = true;
+    for (auto i = r.begin(); i != r.end(); ++i) {
+        // intersectionRange result
+        // Calculate the overlap range
+        KeyRef rangeBegin = mutation.param1 > i->range().begin ? mutation.param1 : i->range().begin;
+        KeyRef rangeEnd = mutation.param2 < i->range().end ? mutation.param2 : i->range().end;
+        KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd));
+        UID nodeID = i->value();
+        // splitMuation result
+        if (splitMutationIndex >= mvector.size()) {
+            correctResult = false;
+            break;
+        }
+        MutationRef result2M = mvector[splitMutationIndex];
+        UID applierID = nodeIDs[splitMutationIndex];
+        KeyRange krange2(KeyRangeRef(result2M.param1, result2M.param2));
+        TraceEvent("Result")
+            .detail("KeyRange1", krange1.toString())
+            .detail("KeyRange2", krange2.toString())
+            .detail("ApplierID1", nodeID)
+            .detail("ApplierID2", applierID);
+        if (krange1 != krange2 || nodeID != applierID) {
+            correctResult = false;
+            TraceEvent(SevError, "IncorrectResult")
+                .detail("Mutation", mutation.toString())
+                .detail("KeyRange1", krange1.toString())
+                .detail("KeyRange2", krange2.toString())
+                .detail("ApplierID1", nodeID)
+                .detail("ApplierID2", applierID);
+        }
+        splitMutationIndex++;
+    }
+
+    if (splitMutationIndex != mvector.size()) {
+        correctResult = false;
+        TraceEvent(SevError, "SplitMuationTooMany")
+            .detail("SplitMutationIndex", splitMutationIndex)
+            .detail("Results", mvector.size());
+        for (; splitMutationIndex < mvector.size(); splitMutationIndex++) {
+            TraceEvent("SplitMuationTooMany")
+                .detail("SplitMutationIndex", splitMutationIndex)
+                .detail("Result", mvector[splitMutationIndex].toString());
+        }
+    }
+
+    return Void();
+}
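
The test is differential: splitMutation runs against a randomly generated rangeToApplier, and KeyRangeMap::intersectingRanges acts as an independent oracle that must yield the same sub-ranges and applier IDs. The splitting idea under test, sketched over a plain std::map with illustrative names, not FDB code:

#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Range = std::pair<std::string, std::string>;

// Split [begin, end) at every boundary key that falls strictly inside it,
// so each piece can be routed to the applier owning that boundary.
std::vector<Range> split(const std::map<std::string, int>& boundaries,
                         std::string begin, const std::string& end) {
    std::vector<Range> out;
    for (auto it = boundaries.upper_bound(begin);
         it != boundaries.end() && it->first < end; ++it) {
        out.push_back({ begin, it->first });
        begin = it->first;
    }
    out.push_back({ begin, end });
    return out;
}

int main() {
    std::map<std::string, int> boundaries{ { "b", 0 }, { "d", 1 }, { "f", 2 } };
    std::vector<Range> expected{ { "a", "b" }, { "b", "d" }, { "d", "e" } };
    assert(split(boundaries, "a", "e") == expected); // [a,e) -> [a,b)[b,d)[d,e)
    return 0;
}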


@@ -73,7 +73,7 @@ struct WriteDuringReadWorkload : TestWorkload {
         nodes = newNodes;
         TEST(adjacentKeys && (nodes + minNode) > CLIENT_KNOBS->KEY_SIZE_LIMIT); //WriteDuringReadWorkload testing large keys
         useExtraDB = g_simulator.extraDB != NULL;
         if(useExtraDB) {
             Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));


@@ -496,7 +496,7 @@ public:
         return substr( 0, size() - s.size() );
     }
-    std::string toString() const { return std::string( (const char*)data, length ); }
+    std::string toString() const { return std::string((const char*)data, length); }
     static bool isPrintable(char c) { return c > 32 && c < 127; }
     inline std::string printable() const;


@@ -209,12 +209,16 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES slow/WriteDuringReadAtomicRestore.txt)
   add_fdb_test(TEST_FILES slow/WriteDuringReadSwitchover.txt)
   add_fdb_test(TEST_FILES slow/ddbalance.txt)
-  add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOp.txt)
-  add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessCycle.txt)
-  add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessMultiCycles.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupCorrectnessAtomicOp.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupCorrectnessCycle.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupCorrectnessMultiCycles.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreNewBackupWriteDuringReadAtomicRestore.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessAtomicOp.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupApiCorrectnessAtomicRestore.txt)
+  add_fdb_test(TEST_FILES slow/ParallelRestoreTestSplitMutation.txt)
   # Note that status tests are not deterministic.
   add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
   add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)


@@ -27,7 +27,7 @@ restoreAfter=50.0
 clearAfterTest=false
 simBackupAgents=BackupToFile
 fastRestore=true
-usePartitionedLogs=false
+usePartitionedLogs=true
 ; Each testName=RunRestoreWorkerWorkload creates a restore worker
 ; We need at least 3 restore workers: master, loader, and applier


@@ -0,0 +1,43 @@
+testTitle=WriteDuringReadTest
+testName=WriteDuringRead
+maximumTotalData=1000000
+testDuration=240.0
+slowModeStart=60.0
+minNode=1
+useSystemKeys=false
+
+testName=AtomicRestore
+startAfter=10.0
+restoreAfter=50.0
+clearAfterTest=false
+simBackupAgents=BackupToFile
+fastRestore=true
+usePartitionedLogs=true
+
+testName=RandomClogging
+testDuration=60.0
+
+testName=Rollback
+meanDelay=60.0
+testDuration=60.0
+
+testName=Attrition
+machinesToKill=10
+machinesToLeave=3
+reboot=true
+testDuration=60.0
+
+testName=Attrition
+machinesToKill=10
+machinesToLeave=3
+reboot=true
+testDuration=60.0
+
+StderrSeverity=30
+
+; Each testName=RunRestoreWorkerWorkload creates a restore worker
+; We need at least 3 restore workers: master, loader, and applier
+testName=RunRestoreWorkerWorkload
+
+;timeout is in seconds
+timeout=360000


@@ -0,0 +1,37 @@
+testTitle=ApiCorrectnessTest
+testName=ApiCorrectness
+runSetup=true
+clearAfterTest=true
+numKeys=5000
+onlyLowerCase=true
+shortKeysRatio=0.5
+minShortKeyLength=1
+maxShortKeyLength=3
+minLongKeyLength=1
+maxLongKeyLength=128
+minValueLength=1
+maxValueLength=1000
+numGets=1000
+numGetRanges=100
+numGetRangeSelectors=100
+numGetKeys=100
+numClears=100
+numClearRanges=10
+maxTransactionBytes=500000
+randomTestDuration=60
+timeout=2100
+
+testName=AtomicRestore
+startAfter=10.0
+restoreAfter=50.0
+clearAfterTest=false
+simBackupAgents=BackupToFile
+fastRestore=true
+usePartitionedLogs=false
+
+; Each testName=RunRestoreWorkerWorkload creates a restore worker
+; We need at least 3 restore workers: master, loader, and applier
+testName=RunRestoreWorkerWorkload
+
+;timeout is in seconds
+timeout=360000


@@ -0,0 +1,43 @@
+testTitle=WriteDuringReadTest
+testName=WriteDuringRead
+maximumTotalData=1000000
+testDuration=240.0
+slowModeStart=60.0
+minNode=1
+useSystemKeys=false
+
+testName=AtomicRestore
+startAfter=10.0
+restoreAfter=50.0
+clearAfterTest=false
+simBackupAgents=BackupToFile
+fastRestore=true
+usePartitionedLogs=false
+
+testName=RandomClogging
+testDuration=60.0
+
+testName=Rollback
+meanDelay=60.0
+testDuration=60.0
+
+testName=Attrition
+machinesToKill=10
+machinesToLeave=3
+reboot=true
+testDuration=60.0
+
+testName=Attrition
+machinesToKill=10
+machinesToLeave=3
+reboot=true
+testDuration=60.0
+
+StderrSeverity=30
+
+; Each testName=RunRestoreWorkerWorkload creates a restore worker
+; We need at least 3 restore workers: master, loader, and applier
+testName=RunRestoreWorkerWorkload
+
+;timeout is in seconds
+timeout=360000


@@ -0,0 +1,6 @@
+testTitle=UnitTests
+testName=UnitTests
+startDelay=0
+useDB=false
+maxTestCases=0
+testsMatching=/FastRestore/RestoreLoader/splitMutation