Merge pull request #2665 from xumengpanda/mengxu/fast-restore-applier-multi-applying-PR

Performant restore [17/xx]: Improve performance of restore appliers
Jingyu Zhou 2020-02-20 15:33:21 -08:00 committed by GitHub
commit 92c38f49bb
8 changed files with 507 additions and 307 deletions

View File

@ -552,7 +552,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
// clang-format on

View File

@ -495,6 +495,7 @@ public:
int64_t FASTRESTORE_ROLE_LOGGING_DELAY;
int64_t FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL; // How quickly to update process metrics for restore
int64_t FASTRESTORE_ATOMICOP_WEIGHT; // workload amplification factor for atomic ops
int64_t FASTRESTORE_APPLYING_PARALLELISM; // number of outstanding txns writing to dest. DB
int64_t FASTRESTORE_MONITOR_LEADER_DELAY;
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);

View File

@ -108,7 +108,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Note: Inserting new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset];
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedFileVersion", curFilePos.get())
@ -120,20 +120,17 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
if (curFilePos.get() == req.prevVersion) {
isDuplicated = false;
Version commitVersion = req.version;
uint16_t numVersionStampedKV = 0;
MutationsVec mutations(req.mutations);
// Sanity check: mutations in a range file are in [beginVersion, endVersion);
// mutations in a log file are in [beginVersion, endVersion], both endpoints inclusive.
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
// Loader sends the endVersion to ensure all useful versions are sent
ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) ||
(!req.isRangeFile && commitVersion <= req.asset.endVersion));
ASSERT_WE_THINK(commitVersion <= req.asset.endVersion);
if (batchData->kvOps.find(commitVersion) == batchData->kvOps.end()) {
batchData->kvOps.insert(std::make_pair(commitVersion, MutationsVec()));
}
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
TraceEvent(SevFRMutationInfo, "FastRestore")
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("ApplierNode", self->id())
.detail("RestoreAsset", req.asset.toString())
.detail("Version", commitVersion)
@ -152,313 +149,276 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end);
}
}
batchData->kvOps[commitVersion].push_back_deep(batchData->kvOps[commitVersion].arena(), mutation);
// TODO: What if log file's mutations are delivered out-of-order (behind) the range file's mutations?!
// Note: Log and range mutations may be delivered out of order. Can we handle it?
if (mutation.type == MutationRef::SetVersionstampedKey ||
mutation.type == MutationRef::SetVersionstampedValue) {
batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV);
numVersionStampedKV++;
} else {
batchData->addMutation(mutation, commitVersion);
}
}
curFilePos.set(req.version);
}
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedFileVersion", curFilePos.get())
.detail("Request", req.toString());
return Void();
}
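The ordering guarantee in this handler comes from the per-RestoreAsset NotifiedVersion: a request is applied only once curFilePos has caught up to req.prevVersion and is then advanced to req.version, so retransmitted requests are recognized as duplicates. A minimal sketch of that handshake, with hypothetical names:
// Illustrative sketch only (not part of this change): in-order, exactly-once handling keyed on a NotifiedVersion.
ACTOR static Future<Void> handleOrderedRequest(NotifiedVersion* processed, Version prevVersion, Version version) {
    wait(processed->whenAtLeast(prevVersion)); // block until all earlier requests for this asset are applied
    if (processed->get() == prevVersion) {
        // First delivery of this request: apply its mutations here, then advance the watermark.
        processed->set(version);
    } // else: a retransmitted request; reply without applying again
    return Void();
}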
// Progress and checkpoint for applying (atomic) mutations in transactions to DB
struct DBApplyProgress {
// Mutation state in the current uncommitted transaction
VersionedMutationsMap::iterator curItInCurTxn;
int curIndexInCurTxn;
// Save the starting point of the current txn to handle a (commit_unknown_result) error on txn commit
// startItInUncommittedTxn is the starting iterator in the most recent uncommitted (and failed) txn
// startIndexInUncommittedTxn is the start index in the most recent uncommitted (and failed) txn.
// Note: Txns have different numbers of mutations
VersionedMutationsMap::iterator startItInUncommittedTxn;
int startIndexInUncommittedTxn;
// State to decide whether a txn succeeded when a txn error (commit_unknown_result) happens;
// curTxnId: The id of the current uncommitted txn, which monotonically increases for each successful transaction
// uncommittedTxnId: The id of the most recently succeeded txn, used to recover the failed txn id on retry
// lastTxnHasError: Did the last txn have an error? TODO: Only need to handle the txn_commit_unknown error
Version curTxnId;
Version uncommittedTxnId;
bool lastTxnHasError;
// Decide when to commit a transaction. We buffer enough mutations in a txn before committing it
bool startNextVersion; // The next txn will include mutations in next version
int numAtomicOps; // Status counter
double txnBytes; // Decide when to commit a txn
double txnMutations; // Status counter
Reference<ApplierBatchData> batchData;
UID applierId;
DBApplyProgress() = default;
explicit DBApplyProgress(UID applierId, Reference<ApplierBatchData> batchData)
: applierId(applierId), batchData(batchData), curIndexInCurTxn(0), startIndexInUncommittedTxn(0), curTxnId(0),
uncommittedTxnId(0), lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), txnBytes(0),
txnMutations(0) {
curItInCurTxn = batchData->kvOps.begin();
while (curItInCurTxn != batchData->kvOps.end() && curItInCurTxn->second.empty()) {
curItInCurTxn++;
}
startItInUncommittedTxn = curItInCurTxn;
}
// Have all mutations been committed?
bool isDone() { return curItInCurTxn == batchData->kvOps.end(); }
// Set cursor for next mutation
void nextMutation() {
curIndexInCurTxn++;
while (curItInCurTxn != batchData->kvOps.end() && curIndexInCurTxn >= curItInCurTxn->second.size()) {
curIndexInCurTxn = 0;
curItInCurTxn++;
startNextVersion = true;
}
}
// Set up for the next transaction; this should be done after nextMutation()
void nextTxn() {
txnBytes = 0;
txnMutations = 0;
numAtomicOps = 0;
lastTxnHasError = false;
startNextVersion = false;
curTxnId++;
startIndexInUncommittedTxn = curIndexInCurTxn;
startItInUncommittedTxn = curItInCurTxn;
uncommittedTxnId = curTxnId;
}
// Roll back to the starting point of the uncommitted-and-failed transaction to
// re-execute the uncommitted txn
void rollback() {
TraceEvent(SevWarn, "FastRestoreApplyTxnError")
.detail("TxnStatusFailed", curTxnId)
.detail("ApplierApplyToDB", applierId)
.detail("UncommittedTxnId", uncommittedTxnId)
.detail("CurIteratorVersion", curItInCurTxn->first)
.detail("StartIteratorVersionInUncommittedTxn", startItInUncommittedTxn->first)
.detail("CurrentIndexInFailedTxn", curIndexInCurTxn)
.detail("StartIndexInUncommittedTxn", startIndexInUncommittedTxn)
.detail("NumIncludedAtomicOps", numAtomicOps);
curItInCurTxn = startItInUncommittedTxn;
curIndexInCurTxn = startIndexInUncommittedTxn;
curTxnId = uncommittedTxnId;
numAtomicOps = 0;
txnBytes = 0;
txnMutations = 0;
startNextVersion = false;
lastTxnHasError = false;
}
bool shouldCommit() {
return (!lastTxnHasError && (startNextVersion || txnBytes >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES ||
curItInCurTxn == batchData->kvOps.end()));
}
bool hasError() { return lastTxnHasError; }
void setTxnError(Error& e) {
TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError")
.detail("TxnStatus", "?")
.detail("ApplierApplyToDB", applierId)
.detail("TxnId", curTxnId)
.detail("StartIndexInCurrentTxn", curIndexInCurTxn)
.detail("Version", curItInCurTxn->first)
.error(e, true);
lastTxnHasError = true;
}
MutationRef getCurrentMutation() {
ASSERT_WE_THINK(curIndexInCurTxn < curItInCurTxn->second.size());
return curItInCurTxn->second[curIndexInCurTxn];
}
};
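The curTxnId / uncommittedTxnId bookkeeping above exists to survive commit_unknown_result: each applier transaction also writes a marker key (restoreApplierKeyFor), and when a commit outcome is unknown, reading the marker back decides whether to roll back and retry or to move on. A minimal sketch of that idempotency pattern, with hypothetical names:
// Illustrative sketch only (not part of this change): resolve commit_unknown_result with a marker key.
ACTOR static Future<bool> commitWithMarker(Reference<ReadYourWritesTransaction> tr, Key markerKey) {
    try {
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        tr->setOption(FDBTransactionOptions::LOCK_AWARE);
        tr->set(markerKey, LiteralStringRef("1")); // committed atomically with the payload mutations
        wait(tr->commit());
        return true;
    } catch (Error& e) {
        if (e.code() != error_code_commit_unknown_result) {
            throw;
        }
        tr->reset();
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        tr->setOption(FDBTransactionOptions::LOCK_AWARE);
        Optional<Value> marker = wait(tr->get(markerKey));
        return marker.present(); // present => the earlier commit actually succeeded
    }
}
The replacement staging-key path below commits precomputed sets and clears, which are idempotent, so it simply retries via tr->onError without any marker keys.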
ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData, Database cx) {
// state variables must be defined at the start of actor to be initialized in the actor constructor
state std::string typeStr = "";
// Clear all ranges in input ranges
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, Database cx) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state DBApplyProgress progress(applierID, batchData);
TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID)
.detail("BatchIndex", batchIndex)
.detail("FromVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.begin()->first)
.detail("EndVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.rbegin()->first);
// Assume the process will not crash while it applies mutations to the DB. The reply message can be lost, though
if (batchData->kvOps.empty()) {
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
.detail("BatchIndex", batchIndex)
.detail("Reason", "NoMutationAtVersions");
return Void();
}
batchData->sanityCheckMutationOps();
if (progress.isDone()) {
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
.detail("BatchIndex", batchIndex)
.detail("Reason", "NoMutationAtVersions");
return Void();
}
// Sanity check the restoreApplierKeys, which should be empty at this point
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Key begin = restoreApplierKeyFor(applierID, batchIndex, 0);
Key end = restoreApplierKeyFor(applierID, batchIndex, std::numeric_limits<int64_t>::max());
Standalone<RangeResultRef> txnIds = wait(tr->getRange(KeyRangeRef(begin, end), CLIENT_KNOBS->TOO_MANY));
if (txnIds.size() > 0) {
TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean").detail("TxnIds", txnIds.size());
for (auto& kv : txnIds) {
UID id;
int64_t index;
Version txnId;
std::tie(id, index, txnId) = decodeRestoreApplierKey(kv.key);
TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean")
.detail("Applier", id)
.detail("BatchIndex", index)
.detail("ResidueTxnID", txnId);
}
for (auto& range : ranges) {
tr->clear(range);
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
loop { // Transaction retry loop
try {
// Check if the transaction succeeds
if (progress.hasError()) {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> txnSucceeded =
wait(tr->get(restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId)));
if (!txnSucceeded.present()) {
progress.rollback();
continue;
} else {
TraceEvent(SevWarn, "FastRestoreApplyTxnError")
.detail("TxnStatusSucceeded", progress.curTxnId)
.detail("ApplierApplyToDB", applierID)
.detail("CurIteratorVersion", progress.curItInCurTxn->first)
.detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size())
.detail("CurrentIndexInSucceedTxn", progress.curIndexInCurTxn)
.detail("NumIncludedAtomicOps", progress.numAtomicOps);
// Txn succeeded; execute the same logic as when a txn succeeds
}
} else { // !lastTxnHasError: accumulate mutations in a txn
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
TraceEvent(SevFRMutationInfo, "FastRestore_ApplierTxn")
.detail("ApplierApplyToDB", applierID)
.detail("TxnId", progress.curTxnId)
.detail("CurrentIndexInCurrentTxn", progress.curIndexInCurTxn)
.detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size())
.detail("Version", progress.curItInCurTxn->first);
// restoreApplierKeyFor(self->id(), curTxnId) tells whether the txn succeeded when an unknown error occurs
tr->set(restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId), restoreApplierTxnValue);
while (1) { // Loop: Accumulate mutations in a transaction
MutationRef m = progress.getCurrentMutation();
if (m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP) {
typeStr = typeString[m.type];
} else {
TraceEvent(SevError, "FastRestore").detail("InvalidMutationType", m.type);
}
TraceEvent(SevFRMutationInfo, "FastRestore")
.detail("ApplierApplyToDB", applierID)
.detail("Version", progress.curItInCurTxn->first)
.detail("Index", progress.curIndexInCurTxn)
.detail("Mutation", m.toString())
.detail("MutationSize", m.totalSize())
.detail("TxnSize", progress.txnBytes);
if (m.type == MutationRef::SetValue) {
tr->set(m.param1, m.param2);
} else if (m.type == MutationRef::ClearRange) {
KeyRangeRef mutationRange(m.param1, m.param2);
tr->clear(mutationRange);
} else if (isAtomicOp((MutationRef::Type)m.type)) {
tr->atomicOp(m.param1, m.param2, m.type);
progress.numAtomicOps++;
} else {
TraceEvent(SevError, "FastRestore")
.detail("UnhandledMutationType", m.type)
.detail("TypeName", typeStr);
}
progress.txnBytes += m.totalSize(); // Changed expectedSize to totalSize
progress.txnMutations += 1;
progress.nextMutation(); // Prepare for the next mutation
// Commit every FASTRESTORE_TXN_BATCH_MAX_BYTES bytes; a commit does not cross a version boundary
if (progress.shouldCommit()) {
break; // Got enough mutations in the txn
}
}
} // !lastTxnHasError
// Commit the txn and prepare the starting point for next txn
if (progress.shouldCommit()) {
wait(tr->commit());
// Update status counter appliedWeightedBytes, appliedMutations, atomicOps
batchData->counters.appliedWeightedBytes += progress.txnBytes;
batchData->counters.appliedMutations += progress.txnMutations;
batchData->counters.appliedAtomicOps += progress.numAtomicOps;
batchData->counters.appliedTxns += 1;
}
if (progress.isDone()) { // Are all mutations processed?
break;
}
progress.nextTxn();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError")
.detail("TxnStatus", "?")
.detail("ApplierApplyToDB", applierID)
.detail("TxnId", progress.curTxnId)
.detail("CurrentIndexInCurrentTxn", progress.curIndexInCurTxn)
.detail("Version", progress.curItInCurTxn->first)
.error(e, true);
progress.lastTxnHasError = true;
wait(tr->onError(e));
}
}
TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID)
.detail("BatchIndex", batchIndex)
.detail("CleanupCurTxnIds", progress.curTxnId);
// clean up txn ids
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// Clear txnIds in [0, progress.curTxnId). We add 100 to curTxnId just to be safe.
tr->clear(KeyRangeRef(restoreApplierKeyFor(applierID, batchIndex, 0),
restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId + 100)));
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
// House cleaning
batchData->kvOps.clear();
return Void();
}
// Get the keys in imcompleteStagingKeys and precompute the staging keys, which are stored in batchData->stagingKeys
ACTOR static Future<Void> getAndComputeStagingKeys(
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::vector<Future<Optional<Value>>> fValues;
state int i = 0;
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : imcompleteStagingKeys) {
fValues.push_back(tr->get(key.first));
}
wait(waitForAll(fValues));
break;
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", imcompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
wait(tr->onError(e));
fValues.clear();
}
}
ASSERT(fValues.size() == imcompleteStagingKeys.size());
int i = 0;
for (auto& key : imcompleteStagingKeys) {
if (!fValues[i].get().present()) {
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("Key", key.first)
.detail("Reason", "Not found in DB")
.detail("PendingMutations", key.second->second.pendingMutations.size())
.detail("StagingKeyType", (int)key.second->second.type);
for (auto& vm : key.second->second.pendingMutations) {
for (auto& m : vm.second) {
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("PendingMutationVersion", vm.first)
.detail("PendingMutation", m.toString());
}
}
key.second->second.precomputeResult();
i++;
continue;
} else {
// The key's version ideally should be the most recently committed version.
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
key.second->second.add(m, (Version)1);
key.second->second.precomputeResult();
i++;
}
}
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
return Void();
}
ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData> batchData, UID applierID,
int64_t batchIndex, Database cx) {
// Apply range mutations (i.e., clearRange) to database cx
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Applying clear range mutations to DB")
.detail("ClearRanges", batchData->stagingKeyRanges.size());
state std::vector<Future<Void>> fClearRanges;
std::vector<Standalone<VectorRef<KeyRangeRef>>> clearBuf;
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
Standalone<VectorRef<KeyRangeRef>> clearRanges = clearBuf.back();
double curTxnSize = 0;
for (auto& rangeMutation : batchData->stagingKeyRanges) {
KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2);
clearRanges.push_back(clearRanges.arena(), range);
curTxnSize += range.expectedSize();
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx));
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
clearRanges = clearBuf.back();
curTxnSize = 0;
}
}
if (curTxnSize > 0) {
fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx));
}
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Applying clear range mutations to staging keys")
.detail("ClearRanges", batchData->stagingKeyRanges.size());
for (auto& rangeMutation : batchData->stagingKeyRanges) {
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2);
while (lb != ub) {
lb->second.add(rangeMutation.mutation, rangeMutation.version);
lb++;
}
}
wait(waitForAll(fClearRanges));
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Getting and computing staging keys")
.detail("StagingKeys", batchData->stagingKeys.size());
// Get the keys in stagingKeys that do not have a base value by reading database cx, and precompute each key's value
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys;
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
if (!stagingKeyIter->second.hasBaseValue()) {
imcompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter);
}
}
Future<Void> fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID);
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Compute the other staging keys")
.detail("StagingKeys", batchData->stagingKeys.size());
// Pre-compute pendingMutations for the other keys in stagingKeys that have a base value
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
stagingKeyIter++) {
if (stagingKeyIter->second.hasBaseValue()) {
stagingKeyIter->second.precomputeResult();
}
}
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysWaitOn", applierID);
wait(fGetAndComputeKeys);
// Sanity check all stagingKeys have been precomputed
ASSERT_WE_THINK(batchData->allKeysPrecomputed());
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResultDone", applierID).detail("BatchIndex", batchIndex);
return Void();
}
// Apply mutations in batchData->stagingKeys [begin, end).
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
std::map<Key, StagingKey>::iterator end, Database cx,
FlowLock* applyStagingKeysBatchLock, UID applierID) {
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB));
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int sets = 0;
state int clears = 0;
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
std::map<Key, StagingKey>::iterator iter = begin;
while (iter != end) {
if (iter->second.type == MutationRef::SetValue) {
tr->set(iter->second.key, iter->second.val);
sets++;
} else if (iter->second.type == MutationRef::ClearRange) {
tr->clear(KeyRangeRef(iter->second.key, iter->second.val));
clears++;
} else {
ASSERT(false);
}
iter++;
if (sets > 10000000 || clears > 10000000) {
TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID)
.detail("Begin", begin->first)
.detail("Sets", sets)
.detail("Clears", clears);
}
}
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID)
.detail("Begin", begin->first)
.detail("Sets", sets)
.detail("Clears", clears);
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
return Void();
}
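Each applyStagingKeysBatch takes a permit from applyStagingKeysBatchLock before opening its transaction, so the new FASTRESTORE_APPLYING_PARALLELISM knob directly caps the number of transactions an applier has in flight against the destination cluster. The throttling idiom, reduced to a sketch with hypothetical names:
// Illustrative sketch only (not part of this change): bound concurrent writer transactions with a FlowLock.
// e.g. FlowLock writersLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM);
ACTOR static Future<Void> boundedWrite(FlowLock* writersLock, Database cx, Key k, Value v) {
    wait(writersLock->take(TaskPriority::RestoreApplierWriteDB)); // blocks once all permits are in use
    state FlowLock::Releaser releaser(*writersLock);              // permit is returned when this actor finishes
    state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
    loop {
        try {
            tr->set(k, v);
            wait(tr->commit());
            return Void();
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
}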
// Apply mutations in stagingKeys in batches in parallel
ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData, UID applierID, int64_t batchIndex,
Database cx) {
std::map<Key, StagingKey>::iterator begin = batchData->stagingKeys.begin();
std::map<Key, StagingKey>::iterator cur = begin;
double txnSize = 0;
std::vector<Future<Void>> fBatches;
TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID)
.detail("BatchIndex", batchIndex)
.detail("StagingKeys", batchData->stagingKeys.size());
while (cur != batchData->stagingKeys.end()) {
txnSize += cur->second.expectedMutationSize();
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
begin = cur;
txnSize = 0;
}
cur++;
}
if (begin != batchData->stagingKeys.end()) {
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
}
wait(waitForAll(fBatches));
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID)
.detail("BatchIndex", batchIndex)
.detail("StagingKeys", batchData->stagingKeys.size());
return Void();
}
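Both the clear-range batching in precomputeMutationsResult and the loop above use the same recipe: accumulate items until the estimated transaction size crosses FASTRESTORE_TXN_BATCH_MAX_BYTES, hand that slice to its own commit actor, and finally wait on all the futures. Stripped of restore specifics (commitRange below is a hypothetical helper actor over [begin, cur)):
// Illustrative sketch only (not part of this change): split sorted work into size-bounded parallel commits.
std::vector<Future<Void>> commitInSizeBoundedBatches(std::map<Key, Value>& kvs, Database cx) {
    std::vector<Future<Void>> batches;
    auto begin = kvs.begin();
    auto cur = begin;
    double bytes = 0;
    while (cur != kvs.end()) {
        bytes += cur->first.size() + cur->second.size();
        if (bytes > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
            batches.push_back(commitRange(begin, cur, cx)); // commits keys in [begin, cur)
            begin = cur;
            bytes = 0;
        }
        cur++;
    }
    if (begin != kvs.end()) {
        batches.push_back(commitRange(begin, cur, cx)); // the final, possibly small, batch
    }
    return batches; // caller does wait(waitForAll(batches))
}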
// Write mutations to the destination DB
ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData,
Database cx) {
TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID).detail("BatchIndex", batchIndex);
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex);
return Void();
@ -480,7 +440,7 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
if (!batchData->dbApplier.present()) {
isDuplicated = false;
batchData->dbApplier = Never();
batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx);
batchData->dbApplier = writeMutationsToDB(self->id(), req.batchIndex, batchData, cx);
}
ASSERT(batchData->dbApplier.present());
@ -496,4 +456,35 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
return Void();
}
}
// Copy from WriteDuringRead.actor.cpp with small modifications
// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue, and CompareAndClear
Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type) {
Arena arena;
if (type == MutationRef::AddValue)
return doLittleEndianAdd(existingValue, value, arena);
else if (type == MutationRef::AppendIfFits)
return doAppendIfFits(existingValue, value, arena);
else if (type == MutationRef::And || type == MutationRef::AndV2)
return doAndV2(existingValue, value, arena);
else if (type == MutationRef::Or)
return doOr(existingValue, value, arena);
else if (type == MutationRef::Xor)
return doXor(existingValue, value, arena);
else if (type == MutationRef::Max)
return doMax(existingValue, value, arena);
else if (type == MutationRef::Min || type == MutationRef::MinV2)
return doMinV2(existingValue, value, arena);
else if (type == MutationRef::ByteMin)
return doByteMin(existingValue, value, arena);
else if (type == MutationRef::ByteMax)
return doByteMax(existingValue, value, arena);
else {
TraceEvent(SevError, "ApplyAtomicOpUnhandledType")
.detail("TypeCode", (int)type)
.detail("TypeName", typeString[type]);
ASSERT(false);
}
return Value();
}
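applyAtomicOp lets the applier fold atomic operations into a concrete value ahead of time, so only plain sets and clears reach the destination cluster. For example, an AddValue over 4-byte little-endian integers (the byte strings below are made-up sample inputs):
// Illustrative sketch only (not part of this change): precompute an AddValue result offline.
Optional<StringRef> base = LiteralStringRef("\x05\x00\x00\x00");  // little-endian 5
Value delta = LiteralStringRef("\x03\x00\x00\x00");               // little-endian 3
Value result = applyAtomicOp(base, delta, MutationRef::AddValue); // "\x08\x00\x00\x00", i.e. 8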

View File

@ -29,6 +29,7 @@
#include <sstream>
#include "flow/Stats.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
@ -40,11 +41,174 @@
#include "flow/actorcompiler.h" // has to be last include
Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type);
// Key whose mutations are buffered on the applier.
// key, value, type and version define the parsed mutation at version.
// pendingMutations holds all versioned mutations still to be applied.
// Mutations in pendingMutations whose version is below the version in StagingKey can be ignored in the applying phase.
struct StagingKey {
Key key; // TODO: Maybe not needed?
Value val;
MutationRef::Type type; // set or clear
Version version; // largest version of set or clear for the key
std::map<Version, MutationsVec> pendingMutations; // mutations not set or clear type
explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {}
// Add mutation m at newVersion to stagingKey
// Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
void add(const MutationRef& m, Version newVersion) {
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
if (version < newVersion) {
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
key = m.param1;
val = m.param2;
type = (MutationRef::Type)m.type;
version = newVersion;
} else {
if (pendingMutations.find(newVersion) == pendingMutations.end()) {
pendingMutations.emplace(newVersion, MutationsVec());
}
// TODO: Do we really need deep copy?
MutationsVec& mutations = pendingMutations[newVersion];
mutations.push_back_deep(mutations.arena(), m);
}
} else if (version == newVersion) { // Sanity check
TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion")
.detail("Version", newVersion)
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type]);
if (m.type == MutationRef::SetValue) {
if (type == MutationRef::SetValue) {
if (m.param2 != val) {
TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled")
.detail("Version", newVersion)
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val)
.detail("Investigate",
"Why would backup have two sets with different value at same version");
} // else {} Backup has duplicate set at the same version
} else {
TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionOverride")
.detail("Version", newVersion)
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val);
type = (MutationRef::Type)m.type;
val = m.param2;
}
} else if (m.type == MutationRef::ClearRange) {
TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionSkipped")
.detail("Version", newVersion)
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type])
.detail("ExitingKeyValue", val);
}
} // else input mutation is old and can be ignored
}
// Precompute the final value of the key.
void precomputeResult() {
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
.detail("Key", key)
.detail("Version", version)
.detail("LargestPendingVersion", (pendingMutations.empty() ? -1 : pendingMutations.rbegin()->first));
std::map<Version, MutationsVec>::iterator lb = pendingMutations.lower_bound(version);
if (lb == pendingMutations.end()) {
return;
}
if (lb->first == version) {
// Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered
for (int i = 0; i < lb->second.size(); i++) {
MutationRef m = lb->second[i];
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
if (std::tie(type, key, val) != std::tie(m.type, m.param1, m.param2)) {
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnhandledSituation")
.detail("BufferedType", typeString[type])
.detail("PendingType", typeString[m.type])
.detail("BufferedVal", val.toString())
.detail("PendingVal", m.param2.toString());
}
}
}
}
while (lb != pendingMutations.end()) {
if (lb->first == version) {
lb++;
continue;
}
for (auto& mutation : lb->second) {
if (type == MutationRef::CompareAndClear) { // Special atomicOp
Arena arena;
Optional<ValueRef> retVal = doCompareAndClear(val, mutation.param2, arena);
if (!retVal.present()) {
val = key;
type = MutationRef::ClearRange;
} // else no-op
} else if (isAtomicOp((MutationRef::Type)mutation.type)) {
Optional<StringRef> inputVal;
if (hasBaseValue()) {
inputVal = val;
}
val = applyAtomicOp(inputVal, mutation.param2, (MutationRef::Type)mutation.type);
type = MutationRef::SetValue; // Precomputed result should be set to DB.
} else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) {
type = MutationRef::SetValue; // Precomputed result should be set to DB.
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet")
.detail("Type", typeString[mutation.type])
.detail("Version", lb->first);
} else {
TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation")
.detail("Type", typeString[mutation.type])
.detail("Version", lb->first);
}
}
version = lb->first;
lb++;
}
}
// Does the key have at least one set or clear mutation that provides the base value?
bool hasBaseValue() {
if (version > 0) {
ASSERT(type == MutationRef::SetValue || type == MutationRef::ClearRange);
}
return version > 0;
}
// Have all pendingMutations been pre-applied to val?
bool hasPrecomputed() {
ASSERT(pendingMutations.empty() || pendingMutations.rbegin()->first >= pendingMutations.begin()->first);
return pendingMutations.empty() || version >= pendingMutations.rbegin()->first;
}
int expectedMutationSize() { return key.size() + val.size(); }
};
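Concretely, StagingKey keeps the newest set or clear as the base value and parks later atomic ops in pendingMutations; precomputeResult then folds those ops into a single SetValue the applier can write directly. A minimal sketch of that flow (keys, values, and versions are made up):
// Illustrative sketch only (not part of this change): buffer and precompute one key.
StagingKey sk;
sk.add(MutationRef(MutationRef::SetValue, LiteralStringRef("k"), LiteralStringRef("\x05\x00\x00\x00")), 10);
sk.add(MutationRef(MutationRef::AddValue, LiteralStringRef("k"), LiteralStringRef("\x03\x00\x00\x00")), 20);
ASSERT(sk.hasBaseValue());      // the version-10 set supplies the base value
sk.precomputeResult();          // folds the version-20 AddValue into the base
ASSERT(sk.type == MutationRef::SetValue && sk.hasPrecomputed());
// Keys without any set/clear get a base value from getAndComputeStagingKeys(), which reads the current
// DB value and adds it as a SetValue at version 1 before precomputing.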
// The range mutation received on applier.
// Range mutations should be applied both to the destination DB and to the StagingKeys
struct StagingKeyRange {
Standalone<MutationRef> mutation;
Version version;
explicit StagingKeyRange(MutationRef m, Version newVersion) : mutation(m), version(newVersion) {}
bool operator<(const StagingKeyRange& rhs) const {
return std::tie(version, mutation.type, mutation.param1, mutation.param2) <
std::tie(rhs.version, rhs.mutation.type, rhs.mutation.param1, rhs.mutation.param2);
}
};
struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
// processedFileState: key: RestoreAsset; value: largest version of mutation received on the applier
std::map<RestoreAsset, NotifiedVersion> processedFileState;
Optional<Future<Void>> dbApplier;
VersionedMutationsMap kvOps; // Mutations at each version
std::map<Key, StagingKey> stagingKeys;
std::set<StagingKeyRange> stagingKeyRanges;
FlowLock applyStagingKeysBatchLock;
Future<Void> pollMetrics;
@ -66,7 +230,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
void addref() { return ReferenceCounted<ApplierBatchData>::addref(); }
void delref() { return ReferenceCounted<ApplierBatchData>::delref(); }
explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) {
explicit ApplierBatchData(UID nodeID, int batchIndex)
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM) {
pollMetrics =
traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY,
&counters.cc, nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
@ -74,6 +239,50 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
}
~ApplierBatchData() = default;
void addMutation(MutationRef m, Version ver) {
if (!isRangeMutation(m)) {
auto item = stagingKeys.emplace(m.param1, StagingKey());
item.first->second.add(m, ver);
} else {
stagingKeyRanges.insert(StagingKeyRange(m, ver));
}
}
void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) {
if (m.type == MutationRef::SetVersionstampedKey) {
// Assume transactionNumber = 0 does not affect result
TraceEvent(SevDebug, "FastRestoreApplierAddMutation")
.detail("MutationType", typeString[m.type])
.detail("FakedTransactionNumber", numVersionStampedKV);
transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV);
addMutation(m, ver);
} else if (m.type == MutationRef::SetVersionstampedValue) {
// Assume transactionNumber = 0 does not affect result
TraceEvent(SevDebug, "FastRestoreApplierAddMutation")
.detail("MutationType", typeString[m.type])
.detail("FakedTransactionNumber", numVersionStampedKV);
transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV);
addMutation(m, ver);
} else {
ASSERT(false);
}
}
// Return true if all staging keys have been precomputed
bool allKeysPrecomputed() {
for (auto& stagingKey : stagingKeys) {
if (!stagingKey.second.hasPrecomputed()) {
TraceEvent("FastRestoreApplierAllKeysPrecomputedFalse")
.detail("Key", stagingKey.first)
.detail("BufferedVersion", stagingKey.second.version)
.detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first);
return false;
}
}
TraceEvent("FastRestoreApplierAllKeysPrecomputed");
return true;
}
void reset() {
kvOps.clear();
dbApplier = Optional<Future<Void>>();

View File

@ -292,8 +292,6 @@ std::string RestoreConfigFR::toString() {
return ss.str();
}
//typedef RestoreConfigFR::RestoreFile RestoreFileFR;
// parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied
// The implementation of parallelFileRestore is copied from FileBackupAgent.actor.cpp
// parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied

View File

@ -264,7 +264,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
wait(batchStatus->sendAllRanges.get());
} else if (req.useRangeFile) {
} else {
TraceEvent(SevDebug, "FastRestoreSendMutationsSkipDuplicateRangeRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);

View File

@ -746,7 +746,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
int batchIndex, NotifiedVersion* finishedBatch) {
wait(finishedBatch->whenAtLeast(batchIndex - 1));
TraceEvent("FastRestoreMasterPhaseApplyToDBStart")
TraceEvent("FastRestoreMasterPhaseApplyToDB")
.detail("BatchIndex", batchIndex)
.detail("FinishedBatch", finishedBatch->get());
@ -754,7 +754,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
// Prepare the applyToDB requests
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
TraceEvent("FastRestoreMasterPhaseApplyMutations")
TraceEvent("FastRestoreMasterPhaseApplyToDB")
.detail("BatchIndex", batchIndex)
.detail("Appliers", appliersInterf.size());
for (auto& applier : appliersInterf) {
@ -770,7 +770,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
batchData->applyToDB = getBatchReplies(&RestoreApplierInterface::applyToDB, appliersInterf, requests,
&replies, TaskPriority::RestoreApplierWriteDB);
} else {
TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations")
TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB")
.detail("BatchIndex", batchIndex)
.detail("Attention", "Actor should not be invoked twice for the same batch index");
}
@ -783,7 +783,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
if (batchStatus->applyStatus[reply.id] == RestoreApplyStatus::Applying) {
batchStatus->applyStatus[reply.id] = RestoreApplyStatus::Applied;
if (reply.isDuplicated) {
TraceEvent(SevWarn, "FastRestoreNotifyApplierToApplierMutations")
TraceEvent(SevWarn, "FastRestoreMasterPhaseApplyToDB")
.detail("Applier", reply.id)
.detail("DuplicateRequestReturnEarlier", "Apply db request should have been processed");
}
@ -791,7 +791,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
}
for (auto& applier : appliersInterf) {
if (batchStatus->applyStatus[applier.first] != RestoreApplyStatus::Applied) {
TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations")
TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB")
.detail("Applier", applier.first)
.detail("ApplyStatus", batchStatus->applyStatus[applier.first]);
}

View File

@ -288,7 +288,7 @@ struct RandomSelectorWorkload : TestWorkload {
myValue = format("%d", deterministicRandom()->randomInt( 0, 10000000 ) );
//TraceEvent("RYOWor").detail("Key",myKeyA).detail("Value", myValue);
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::Or);
loop {
try {
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());
@ -311,7 +311,7 @@ struct RandomSelectorWorkload : TestWorkload {
myValue = format("%d", deterministicRandom()->randomInt( 0, 10000000 ) );
//TraceEvent("RYOWxor").detail("Key",myKeyA).detail("Value", myValue);
trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::Xor);
loop {
try {
tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());