2019-05-07 07:56:49 +08:00
|
|
|
/*
|
|
|
|
* RestoreApplier.actor.cpp
|
|
|
|
*
|
|
|
|
* This source file is part of the FoundationDB open source project
|
|
|
|
*
|
|
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
2019-05-10 11:55:44 +08:00
|
|
|
|
2019-05-13 12:53:09 +08:00
|
|
|
// This file defines the functions used by the RestoreApplier role.
|
|
|
|
// RestoreApplier role starts at restoreApplierCore actor
|
2019-05-10 11:55:44 +08:00
|
|
|
|
|
|
|
#include "fdbclient/NativeAPI.actor.h"
|
|
|
|
#include "fdbclient/SystemData.h"
|
|
|
|
#include "fdbclient/BackupAgent.actor.h"
|
|
|
|
#include "fdbclient/ManagementAPI.actor.h"
|
|
|
|
#include "fdbclient/MutationList.h"
|
|
|
|
#include "fdbclient/BackupContainer.h"
|
2020-02-29 08:00:47 +08:00
|
|
|
#include "fdbserver/Knobs.h"
|
2019-05-10 11:55:44 +08:00
|
|
|
#include "fdbserver/RestoreCommon.actor.h"
|
|
|
|
#include "fdbserver/RestoreUtil.h"
|
|
|
|
#include "fdbserver/RestoreRoleCommon.actor.h"
|
|
|
|
#include "fdbserver/RestoreApplier.actor.h"
|
|
|
|
|
2019-08-02 08:00:13 +08:00
|
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
2020-07-31 11:27:54 +08:00
|
|
|
#include "flow/network.h"
|
2019-05-10 11:55:44 +08:00
|
|
|
|
2020-02-20 07:29:14 +08:00
|
|
|
// Forward declarations of request handlers that restoreApplierCore() dispatches to; their
// definitions appear later in this file.
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
                                                          Reference<RestoreApplierData> self);
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
                                                 Reference<RestoreApplierData> self,
                                                 Database cx);
|
2019-05-10 11:55:44 +08:00
|
|
|
|
2019-07-25 07:59:05 +08:00
|
|
|
// Main loop of the RestoreApplier role.
// Creates this applier's RestoreApplierData, starts the background actors that report process
// metrics and version-batch progress, then dispatches every request received on applierInterf to
// its handler. The loop ends after a finishRestore request with `terminate` set has been handled,
// or when an error escapes request dispatch (the error is traced and outstanding actors cleared).
ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int nodeIndex, Database cx) {
	state Reference<RestoreApplierData> applierData =
	    Reference<RestoreApplierData>(new RestoreApplierData(applierInterf.id(), nodeIndex));
	state ActorCollection roleActors(false);
	// Becomes ready once a terminating finishRestore request has been processed.
	state Future<Void> exitRole = Never();

	roleActors.add(updateProcessMetrics(applierData));
	roleActors.add(traceProcessMetrics(applierData, "RestoreApplier"));
	roleActors.add(traceRoleVersionBatchProgress(applierData, "RestoreApplier"));

	loop {
		// Kind of request handled in this iteration; reported in traces and on error.
		state std::string requestName = "[Init]";
		try {
			// NOTE: clause order is kept as-is; it decides which ready future wins in choose.
			choose {
				when(RestoreSimpleRequest heartbeatReq = waitNext(applierInterf.heartbeat.getFuture())) {
					requestName = "heartbeat";
					roleActors.add(handleHeartbeat(heartbeatReq, applierInterf.id()));
				}
				when(RestoreSendVersionedMutationsRequest mutationsReq =
				         waitNext(applierInterf.sendMutationVector.getFuture())) {
					requestName = "sendMutationVector";
					roleActors.add(handleSendMutationVectorRequest(mutationsReq, applierData));
				}
				when(RestoreVersionBatchRequest applyReq = waitNext(applierInterf.applyToDB.getFuture())) {
					requestName = "applyToDB";
					// TODO: Check how FDB uses TaskPriority for ACTORS. We may need to add priority here to
					// avoid requests at later VB block requests at earlier VBs
					roleActors.add(handleApplyToDBRequest(applyReq, applierData, cx));
				}
				when(RestoreVersionBatchRequest initReq = waitNext(applierInterf.initVersionBatch.getFuture())) {
					requestName = "initVersionBatch";
					roleActors.add(handleInitVersionBatchRequest(initReq, applierData));
				}
				when(RestoreFinishRequest finishReq = waitNext(applierInterf.finishRestore.getFuture())) {
					requestName = "finishRestore";
					handleFinishRestoreRequest(finishReq, applierData);
					if (finishReq.terminate) {
						exitRole = Void();
					}
				}
				when(wait(roleActors.getResult())) {}
				when(wait(exitRole)) {
					TraceEvent("RestoreApplierCoreExitRole", applierData->id());
					break;
				}
			}
			TraceEvent("RestoreApplierCore", applierData->id()).detail("Request", requestName); // For debug only
		} catch (Error& e) {
			TraceEvent(SevWarn, "FastRestoreApplierError", applierData->id())
			    .detail("RequestType", requestName)
			    .error(e, true);
			roleActors.clear(false);
			break;
		}
	}

	return Void();
}
|
|
|
|
|
2020-07-31 07:43:35 +08:00
|
|
|
// The actor may be invoked multiple times and executed async.
|
2020-03-24 05:15:36 +08:00
|
|
|
// No race condition as long as we do not wait or yield when operate the shared
|
|
|
|
// data. Multiple such actors can run on different fileIDs.
|
|
|
|
// Different files may contain mutations of the same commit versions, but with
|
|
|
|
// different subsequence number.
|
|
|
|
// Only one actor can process mutations from the same file.
|
2020-02-20 07:29:14 +08:00
|
|
|
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
|
|
|
|
Reference<RestoreApplierData> self) {
|
2020-02-13 06:12:38 +08:00
|
|
|
state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
|
2020-06-28 06:16:38 +08:00
|
|
|
state bool printTrace = false;
|
2020-02-13 06:12:38 +08:00
|
|
|
|
2020-08-18 13:42:41 +08:00
|
|
|
ASSERT(batchData.isValid());
|
|
|
|
ASSERT(self->finishedBatch.get() < req.batchIndex);
|
2020-06-29 02:11:58 +08:00
|
|
|
// wait(delay(0.0, TaskPriority::RestoreApplierReceiveMutations)); // This hurts performance from 100MB/s to 60MB/s
|
|
|
|
// on circus
|
2020-06-28 06:16:38 +08:00
|
|
|
|
|
|
|
batchData->receiveMutationReqs += 1;
|
|
|
|
// Trace when the receive phase starts at a VB and when it finishes.
|
2020-06-30 01:18:18 +08:00
|
|
|
// This can help check if receiveMutations block applyMutation phase.
|
|
|
|
// If so, we need more sophisticated scheduler to ensure priority execution
|
2020-08-18 11:34:33 +08:00
|
|
|
printTrace = (batchData->receiveMutationReqs % SERVER_KNOBS->FASTRESTORE_NUM_TRACE_EVENTS == 0);
|
2020-06-28 06:16:38 +08:00
|
|
|
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
2020-02-13 06:12:38 +08:00
|
|
|
.detail("BatchIndex", req.batchIndex)
|
|
|
|
.detail("RestoreAsset", req.asset.toString())
|
2020-08-18 13:42:41 +08:00
|
|
|
.detail("RestoreAssetMesssageIndex", batchData->processedFileState[req.asset].get())
|
2020-02-27 06:12:56 +08:00
|
|
|
.detail("Request", req.toString())
|
2020-02-28 11:23:29 +08:00
|
|
|
.detail("CurrentMemory", getSystemStatistics().processMemory)
|
2020-06-28 06:16:38 +08:00
|
|
|
.detail("PreviousVersionBatchState", batchData->vbState.get())
|
|
|
|
.detail("ReceiveMutationRequests", batchData->receiveMutationReqs);
|
2020-02-27 06:12:56 +08:00
|
|
|
|
2020-02-27 06:40:01 +08:00
|
|
|
wait(isSchedulable(self, req.batchIndex, __FUNCTION__));
|
2020-02-13 06:12:38 +08:00
|
|
|
|
2020-08-18 13:42:41 +08:00
|
|
|
ASSERT(batchData.isValid());
|
|
|
|
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
|
|
|
|
// Note: Insert new items into processedFileState will not invalidate the reference.
|
|
|
|
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
|
2020-04-12 13:31:55 +08:00
|
|
|
wait(curMsgIndex.whenAtLeast(req.msgIndex - 1));
|
2020-02-28 11:23:29 +08:00
|
|
|
batchData->vbState = ApplierVersionBatchState::RECEIVE_MUTATIONS;
|
2020-02-13 06:12:38 +08:00
|
|
|
|
|
|
|
state bool isDuplicated = true;
|
2020-04-12 13:31:55 +08:00
|
|
|
if (curMsgIndex.get() == req.msgIndex - 1) {
|
2020-02-13 06:12:38 +08:00
|
|
|
isDuplicated = false;
|
|
|
|
|
2020-04-22 13:04:42 +08:00
|
|
|
for (int mIndex = 0; mIndex < req.versionedMutations.size(); mIndex++) {
|
|
|
|
const VersionedMutation& versionedMutation = req.versionedMutations[mIndex];
|
2020-06-25 13:10:54 +08:00
|
|
|
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
2020-02-13 06:12:38 +08:00
|
|
|
.detail("RestoreAsset", req.asset.toString())
|
2020-04-22 13:04:42 +08:00
|
|
|
.detail("Version", versionedMutation.version.toString())
|
2020-02-13 06:12:38 +08:00
|
|
|
.detail("Index", mIndex)
|
2020-04-22 13:04:42 +08:00
|
|
|
.detail("MutationReceived", versionedMutation.mutation.toString());
|
2020-08-01 05:59:45 +08:00
|
|
|
batchData->receivedBytes += versionedMutation.mutation.totalSize();
|
2020-04-22 13:04:42 +08:00
|
|
|
batchData->counters.receivedBytes += versionedMutation.mutation.totalSize();
|
|
|
|
batchData->counters.receivedWeightedBytes +=
|
|
|
|
versionedMutation.mutation.weightedTotalSize(); // atomicOp will be amplified
|
2020-02-13 06:12:38 +08:00
|
|
|
batchData->counters.receivedMutations += 1;
|
2020-04-22 13:04:42 +08:00
|
|
|
batchData->counters.receivedAtomicOps +=
|
|
|
|
isAtomicOp((MutationRef::Type)versionedMutation.mutation.type) ? 1 : 0;
|
2020-02-13 06:12:38 +08:00
|
|
|
// Sanity check
|
2020-04-22 13:04:42 +08:00
|
|
|
ASSERT_WE_THINK(req.asset.isInVersionRange(versionedMutation.version.version));
|
2020-06-22 13:18:07 +08:00
|
|
|
ASSERT_WE_THINK(req.asset.isInKeyRange(
|
|
|
|
versionedMutation.mutation)); // mutation is already applied removePrefix and addPrefix
|
2020-04-08 06:56:44 +08:00
|
|
|
|
2020-02-13 06:12:38 +08:00
|
|
|
// Note: Log and range mutations may be delivered out of order. Can we handle it?
|
2020-04-22 13:04:42 +08:00
|
|
|
batchData->addMutation(versionedMutation.mutation, versionedMutation.version);
|
2020-04-07 13:27:47 +08:00
|
|
|
|
2020-04-22 13:04:42 +08:00
|
|
|
ASSERT(versionedMutation.mutation.type != MutationRef::SetVersionstampedKey &&
|
|
|
|
versionedMutation.mutation.type != MutationRef::SetVersionstampedValue);
|
2020-02-13 06:12:38 +08:00
|
|
|
}
|
2020-04-12 13:31:55 +08:00
|
|
|
curMsgIndex.set(req.msgIndex);
|
2020-02-13 06:12:38 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
2020-06-28 06:16:38 +08:00
|
|
|
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
2020-02-14 03:26:56 +08:00
|
|
|
.detail("BatchIndex", req.batchIndex)
|
|
|
|
.detail("RestoreAsset", req.asset.toString())
|
2020-04-12 13:31:55 +08:00
|
|
|
.detail("ProcessedMessageIndex", curMsgIndex.get())
|
2020-02-14 03:26:56 +08:00
|
|
|
.detail("Request", req.toString());
|
2020-02-13 06:12:38 +08:00
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-02-13 13:45:29 +08:00
|
|
|
// Clear all ranges in input ranges
|
2020-05-05 23:46:18 +08:00
|
|
|
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, double delayTime,
|
2020-06-28 06:21:06 +08:00
|
|
|
Database cx, UID applierID, int batchIndex,
|
|
|
|
ApplierBatchData::Counters* cc) {
|
2020-02-13 13:45:29 +08:00
|
|
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
2020-05-08 06:06:59 +08:00
|
|
|
state int retries = 0;
|
2020-05-06 00:00:02 +08:00
|
|
|
state double numOps = 0;
|
2020-05-05 23:46:18 +08:00
|
|
|
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
|
2020-08-11 12:34:47 +08:00
|
|
|
TraceEvent(delayTime > 5 ? SevWarnAlways : SevDebug, "FastRestoreApplierClearRangeMutationsStart", applierID)
|
2020-05-05 23:46:18 +08:00
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("Ranges", ranges.size())
|
|
|
|
.detail("DelayTime", delayTime);
|
2020-07-31 03:10:32 +08:00
|
|
|
if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) {
|
|
|
|
TraceEvent("FastRestoreApplierClearRangeMutationsNotWriteDB", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("Ranges", ranges.size());
|
2020-07-31 11:27:54 +08:00
|
|
|
ASSERT(!g_network->isSimulated());
|
2020-07-31 03:10:32 +08:00
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-02-13 13:45:29 +08:00
|
|
|
loop {
|
|
|
|
try {
|
|
|
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
|
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
|
|
for (auto& range : ranges) {
|
2020-04-08 06:57:03 +08:00
|
|
|
debugFRMutation("FastRestoreApplierApplyClearRangeMutation", 0,
|
|
|
|
MutationRef(MutationRef::ClearRange, range.begin, range.end));
|
2020-02-13 13:45:29 +08:00
|
|
|
tr->clear(range);
|
2020-06-28 06:16:38 +08:00
|
|
|
cc->clearOps += 1;
|
2020-05-06 00:00:02 +08:00
|
|
|
++numOps;
|
2020-05-06 00:28:45 +08:00
|
|
|
if (numOps >= SERVER_KNOBS->FASTRESTORE_TXN_CLEAR_MAX) {
|
2020-06-25 13:10:54 +08:00
|
|
|
TraceEvent(SevWarn, "FastRestoreApplierClearRangeMutationsTooManyClearsInTxn")
|
|
|
|
.suppressFor(5.0)
|
2020-05-06 00:00:02 +08:00
|
|
|
.detail("Clears", numOps)
|
|
|
|
.detail("Ranges", ranges.size())
|
2020-05-06 03:36:32 +08:00
|
|
|
.detail("Range", range.toString());
|
2020-05-06 00:00:02 +08:00
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
wait(tr->commit());
|
2020-06-28 06:16:38 +08:00
|
|
|
cc->clearTxns += 1;
|
2020-02-13 13:45:29 +08:00
|
|
|
break;
|
|
|
|
} catch (Error& e) {
|
2020-05-08 06:06:59 +08:00
|
|
|
retries++;
|
|
|
|
if (retries > SERVER_KNOBS->FASTRESTORE_TXN_RETRY_MAX) {
|
2020-05-05 13:32:57 +08:00
|
|
|
TraceEvent(SevWarnAlways, "RestoreApplierApplyClearRangeMutationsStuck", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("ClearRanges", ranges.size())
|
|
|
|
.error(e);
|
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
wait(tr->onError(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-03-03 03:33:07 +08:00
|
|
|
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
|
2020-02-13 13:45:29 +08:00
|
|
|
ACTOR static Future<Void> getAndComputeStagingKeys(
|
2020-05-05 23:46:18 +08:00
|
|
|
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
|
2020-06-27 15:20:54 +08:00
|
|
|
UID applierID, int batchIndex, ApplierBatchData::Counters* cc) {
|
2020-02-13 13:45:29 +08:00
|
|
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
2020-06-04 12:17:27 +08:00
|
|
|
state std::vector<Future<Optional<Value>>> fValues(incompleteStagingKeys.size(), Never());
|
2020-02-29 08:00:47 +08:00
|
|
|
state int retries = 0;
|
2020-06-07 12:17:57 +08:00
|
|
|
state UID randomID = deterministicRandom()->randomUniqueID();
|
2020-03-17 09:22:24 +08:00
|
|
|
|
2020-06-28 01:13:34 +08:00
|
|
|
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
|
2020-06-04 06:28:59 +08:00
|
|
|
|
2020-07-31 02:17:05 +08:00
|
|
|
if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) { // Get dummy value to short-circut DB
|
2020-07-31 07:43:35 +08:00
|
|
|
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStartNotUseDB", applierID)
|
|
|
|
.detail("RandomUID", randomID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("GetKeys", incompleteStagingKeys.size())
|
|
|
|
.detail("DelayTime", delayTime);
|
2020-07-31 11:27:54 +08:00
|
|
|
ASSERT(!g_network->isSimulated());
|
2020-07-31 02:17:05 +08:00
|
|
|
int i = 0;
|
|
|
|
for (auto& key : incompleteStagingKeys) {
|
|
|
|
MutationRef m(MutationRef::SetValue, key.first, LiteralStringRef("0"));
|
|
|
|
key.second->second.add(m, LogMessageVersion(1));
|
|
|
|
key.second->second.precomputeResult("GetAndComputeStagingKeys", applierID, batchIndex);
|
|
|
|
i++;
|
|
|
|
}
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-07-31 07:43:35 +08:00
|
|
|
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
|
|
|
|
.detail("RandomUID", randomID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("GetKeys", incompleteStagingKeys.size())
|
|
|
|
.detail("DelayTime", delayTime);
|
|
|
|
|
2020-02-14 13:37:40 +08:00
|
|
|
loop {
|
2020-06-06 07:40:19 +08:00
|
|
|
try {
|
2020-06-09 11:11:47 +08:00
|
|
|
int i = 0;
|
2020-06-06 07:40:19 +08:00
|
|
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
|
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
|
|
for (auto& key : incompleteStagingKeys) {
|
2020-06-09 11:11:47 +08:00
|
|
|
fValues[i++] = tr->get(key.first);
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->fetchKeys += 1;
|
2020-06-04 12:17:27 +08:00
|
|
|
}
|
2020-06-08 11:35:07 +08:00
|
|
|
wait(waitForAll(fValues));
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->fetchTxns += 1;
|
2020-06-08 11:35:07 +08:00
|
|
|
break;
|
2020-06-06 07:40:19 +08:00
|
|
|
} catch (Error& e) {
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->fetchTxnRetries += 1;
|
2020-06-09 11:11:47 +08:00
|
|
|
if (retries++ > incompleteStagingKeys.size()) {
|
|
|
|
TraceEvent(SevWarnAlways, "GetAndComputeStagingKeys", applierID)
|
|
|
|
.suppressFor(1.0)
|
|
|
|
.detail("RandomUID", randomID)
|
2020-06-06 07:40:19 +08:00
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.error(e);
|
|
|
|
}
|
|
|
|
wait(tr->onError(e));
|
2020-06-04 06:28:59 +08:00
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
|
2020-03-03 03:33:07 +08:00
|
|
|
ASSERT(fValues.size() == incompleteStagingKeys.size());
|
2020-03-17 09:22:24 +08:00
|
|
|
int i = 0;
|
2020-03-03 03:33:07 +08:00
|
|
|
for (auto& key : incompleteStagingKeys) {
|
2020-06-09 11:11:47 +08:00
|
|
|
if (!fValues[i].get().present()) { // Key not exist in DB
|
2020-06-05 12:26:09 +08:00
|
|
|
// if condition: fValues[i].Valid() && fValues[i].isReady() && !fValues[i].isError() &&
|
2020-08-11 12:34:47 +08:00
|
|
|
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
|
2020-06-25 13:10:54 +08:00
|
|
|
.suppressFor(5.0)
|
2020-05-05 07:25:07 +08:00
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-02-14 15:06:44 +08:00
|
|
|
.detail("Key", key.first)
|
2020-06-04 06:28:59 +08:00
|
|
|
.detail("IsReady", fValues[i].isReady())
|
2020-02-14 15:06:44 +08:00
|
|
|
.detail("PendingMutations", key.second->second.pendingMutations.size())
|
2020-06-04 06:28:59 +08:00
|
|
|
.detail("StagingKeyType", getTypeString(key.second->second.type));
|
2020-02-14 15:06:44 +08:00
|
|
|
for (auto& vm : key.second->second.pendingMutations) {
|
2020-08-11 12:34:47 +08:00
|
|
|
TraceEvent(SevDebug, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
|
2020-03-17 09:22:24 +08:00
|
|
|
.detail("PendingMutationVersion", vm.first.toString())
|
|
|
|
.detail("PendingMutation", vm.second.toString());
|
2020-02-14 14:17:04 +08:00
|
|
|
}
|
2020-05-05 07:25:07 +08:00
|
|
|
key.second->second.precomputeResult("GetAndComputeStagingKeysNoBaseValueInDB", applierID, batchIndex);
|
2020-02-14 14:44:15 +08:00
|
|
|
} else {
|
|
|
|
// The key's version ideally should be the most recently committed version.
|
|
|
|
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
|
|
|
|
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
|
2020-02-22 03:47:51 +08:00
|
|
|
key.second->second.add(m, LogMessageVersion(1));
|
2020-05-05 07:25:07 +08:00
|
|
|
key.second->second.precomputeResult("GetAndComputeStagingKeys", applierID, batchIndex);
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
2020-05-08 06:06:59 +08:00
|
|
|
i++;
|
2020-02-14 13:37:40 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
|
2020-06-07 12:17:57 +08:00
|
|
|
.detail("RandomUID", randomID)
|
2020-05-05 07:25:07 +08:00
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-06-05 12:26:09 +08:00
|
|
|
.detail("GetKeys", incompleteStagingKeys.size())
|
|
|
|
.detail("DelayTime", delayTime);
|
2020-02-14 13:37:40 +08:00
|
|
|
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-02-13 13:45:29 +08:00
|
|
|
ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData> batchData, UID applierID,
|
|
|
|
int64_t batchIndex, Database cx) {
|
2020-02-14 03:26:56 +08:00
|
|
|
// Apply range mutations (i.e., clearRange) to database cx
|
2020-05-04 11:57:09 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResultStart", applierID)
|
2020-02-13 13:45:29 +08:00
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-02-14 07:39:50 +08:00
|
|
|
.detail("Step", "Applying clear range mutations to DB")
|
2020-02-20 07:22:52 +08:00
|
|
|
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
2020-02-13 13:45:29 +08:00
|
|
|
state std::vector<Future<Void>> fClearRanges;
|
2020-05-05 13:32:57 +08:00
|
|
|
Standalone<VectorRef<KeyRangeRef>> clearRanges;
|
2020-02-13 13:45:29 +08:00
|
|
|
double curTxnSize = 0;
|
2020-05-05 23:46:18 +08:00
|
|
|
double delayTime = 0;
|
2020-02-13 13:45:29 +08:00
|
|
|
for (auto& rangeMutation : batchData->stagingKeyRanges) {
|
|
|
|
KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2);
|
2020-04-08 06:57:03 +08:00
|
|
|
debugFRMutation("FastRestoreApplierPrecomputeMutationsResultClearRange", rangeMutation.version.version,
|
|
|
|
MutationRef(MutationRef::ClearRange, range.begin, range.end));
|
2020-05-05 13:32:57 +08:00
|
|
|
clearRanges.push_back_deep(clearRanges.arena(), range);
|
2020-02-13 13:45:29 +08:00
|
|
|
curTxnSize += range.expectedSize();
|
|
|
|
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
2020-06-28 06:16:38 +08:00
|
|
|
fClearRanges.push_back(
|
|
|
|
applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex, &batchData->counters));
|
2020-06-30 12:16:30 +08:00
|
|
|
delayTime += SERVER_KNOBS->FASTRESTORE_TXN_EXTRA_DELAY;
|
2020-05-05 13:32:57 +08:00
|
|
|
clearRanges = Standalone<VectorRef<KeyRangeRef>>();
|
2020-02-13 13:45:29 +08:00
|
|
|
curTxnSize = 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (curTxnSize > 0) {
|
2020-06-28 06:16:38 +08:00
|
|
|
fClearRanges.push_back(
|
|
|
|
applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex, &batchData->counters));
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
|
2020-02-14 03:26:56 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-02-14 07:39:50 +08:00
|
|
|
.detail("Step", "Applying clear range mutations to staging keys")
|
2020-05-05 13:32:57 +08:00
|
|
|
.detail("ClearRanges", batchData->stagingKeyRanges.size())
|
|
|
|
.detail("FutureClearRanges", fClearRanges.size());
|
2020-02-13 13:45:29 +08:00
|
|
|
for (auto& rangeMutation : batchData->stagingKeyRanges) {
|
2020-05-05 13:32:57 +08:00
|
|
|
ASSERT(rangeMutation.mutation.param1 <= rangeMutation.mutation.param2);
|
2020-02-13 13:45:29 +08:00
|
|
|
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
|
2020-04-01 07:43:53 +08:00
|
|
|
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param2);
|
2020-02-13 13:45:29 +08:00
|
|
|
while (lb != ub) {
|
2020-04-01 07:43:53 +08:00
|
|
|
if (lb->first >= rangeMutation.mutation.param2) {
|
2020-05-05 07:25:07 +08:00
|
|
|
TraceEvent(SevError, "FastRestoreApplerPhasePrecomputeMutationsResultIncorrectUpperBound")
|
2020-04-02 12:27:49 +08:00
|
|
|
.detail("Key", lb->first)
|
|
|
|
.detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
|
|
|
|
.detail("UsedUpperBound", ub->first);
|
2020-04-01 07:43:53 +08:00
|
|
|
}
|
2020-04-08 06:56:44 +08:00
|
|
|
// We make the beginKey = endKey for the ClearRange on purpose so that
|
|
|
|
// we can sanity check ClearRange mutation when we apply it to DB.
|
2020-04-01 08:45:19 +08:00
|
|
|
MutationRef clearKey(MutationRef::ClearRange, lb->first, lb->first);
|
|
|
|
lb->second.add(clearKey, rangeMutation.version);
|
2020-02-14 03:26:56 +08:00
|
|
|
lb++;
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
}
|
2020-05-05 13:32:57 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("Step", "Wait on applying clear range mutations to DB")
|
|
|
|
.detail("FutureClearRanges", fClearRanges.size());
|
2020-02-13 13:45:29 +08:00
|
|
|
|
|
|
|
wait(waitForAll(fClearRanges));
|
|
|
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-02-14 07:39:50 +08:00
|
|
|
.detail("Step", "Getting and computing staging keys")
|
2020-02-20 07:22:52 +08:00
|
|
|
.detail("StagingKeys", batchData->stagingKeys.size());
|
2020-02-13 13:45:29 +08:00
|
|
|
|
|
|
|
// Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value
|
2020-02-29 08:00:47 +08:00
|
|
|
std::vector<Future<Void>> fGetAndComputeKeys;
|
2020-04-09 03:21:53 +08:00
|
|
|
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
|
2020-02-13 13:45:29 +08:00
|
|
|
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
|
2020-02-29 08:00:47 +08:00
|
|
|
int numKeysInBatch = 0;
|
2020-08-18 11:34:33 +08:00
|
|
|
int numGetTxns = 0;
|
2020-06-27 15:20:54 +08:00
|
|
|
double delayTime = 0; // Start transactions at different time to avoid overwhelming FDB.
|
2020-02-13 13:45:29 +08:00
|
|
|
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
|
|
|
|
if (!stagingKeyIter->second.hasBaseValue()) {
|
2020-03-03 03:33:07 +08:00
|
|
|
incompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter);
|
2020-02-29 08:00:47 +08:00
|
|
|
numKeysInBatch++;
|
|
|
|
}
|
|
|
|
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
|
2020-06-28 01:13:34 +08:00
|
|
|
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
|
|
|
|
batchIndex, &batchData->counters));
|
2020-08-18 11:34:33 +08:00
|
|
|
numGetTxns++;
|
2020-06-30 12:16:30 +08:00
|
|
|
delayTime += SERVER_KNOBS->FASTRESTORE_TXN_EXTRA_DELAY;
|
2020-02-29 08:00:47 +08:00
|
|
|
numKeysInBatch = 0;
|
2020-04-09 03:21:53 +08:00
|
|
|
incompleteStagingKeys.clear();
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
}
|
2020-06-07 12:17:57 +08:00
|
|
|
if (numKeysInBatch > 0) {
|
2020-08-18 11:34:33 +08:00
|
|
|
numGetTxns++;
|
2020-06-27 15:20:54 +08:00
|
|
|
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
|
|
|
|
batchIndex, &batchData->counters));
|
2020-02-29 08:00:47 +08:00
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
|
|
|
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-02-14 07:39:50 +08:00
|
|
|
.detail("Step", "Compute the other staging keys")
|
2020-08-18 11:34:33 +08:00
|
|
|
.detail("StagingKeys", batchData->stagingKeys.size())
|
|
|
|
.detail("GetStagingKeyBatchTxns", numGetTxns);
|
2020-02-13 13:45:29 +08:00
|
|
|
// Pre-compute pendingMutations to other keys in stagingKeys that has base value
|
|
|
|
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
|
|
|
|
stagingKeyIter++) {
|
|
|
|
if (stagingKeyIter->second.hasBaseValue()) {
|
2020-05-05 07:25:07 +08:00
|
|
|
stagingKeyIter->second.precomputeResult("HasBaseValue", applierID, batchIndex);
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-02-14 01:24:53 +08:00
|
|
|
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysWaitOn", applierID);
|
2020-02-29 08:00:47 +08:00
|
|
|
wait(waitForAll(fGetAndComputeKeys));
|
2020-02-13 13:45:29 +08:00
|
|
|
|
|
|
|
// Sanity check all stagingKeys have been precomputed
|
|
|
|
ASSERT_WE_THINK(batchData->allKeysPrecomputed());
|
|
|
|
|
|
|
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResultDone", applierID).detail("BatchIndex", batchIndex);
|
|
|
|
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Apply mutations in batchData->stagingKeys [begin, end).
|
|
|
|
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
|
|
|
|
std::map<Key, StagingKey>::iterator end, Database cx,
|
2020-06-27 15:20:54 +08:00
|
|
|
FlowLock* applyStagingKeysBatchLock, UID applierID,
|
|
|
|
ApplierBatchData::Counters* cc) {
|
2020-07-31 00:22:06 +08:00
|
|
|
if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) {
|
|
|
|
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchSkipped", applierID).detail("Begin", begin->first);
|
2020-07-31 11:27:54 +08:00
|
|
|
ASSERT(!g_network->isSimulated());
|
2020-07-31 00:22:06 +08:00
|
|
|
return Void();
|
|
|
|
}
|
2020-06-28 01:13:34 +08:00
|
|
|
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock?
|
|
|
|
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
|
2020-02-13 13:45:29 +08:00
|
|
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
2020-02-14 13:37:40 +08:00
|
|
|
state int sets = 0;
|
|
|
|
state int clears = 0;
|
2020-04-06 06:00:36 +08:00
|
|
|
state Key endKey = begin->second.key;
|
2020-06-25 13:10:54 +08:00
|
|
|
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
|
2020-02-13 13:45:29 +08:00
|
|
|
loop {
|
|
|
|
try {
|
|
|
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
|
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
|
|
|
std::map<Key, StagingKey>::iterator iter = begin;
|
|
|
|
while (iter != end) {
|
|
|
|
if (iter->second.type == MutationRef::SetValue) {
|
|
|
|
tr->set(iter->second.key, iter->second.val);
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->appliedMutations += 1;
|
2020-04-08 06:57:03 +08:00
|
|
|
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
|
|
|
|
.detail("SetKey", iter->second.key);
|
2020-02-14 13:37:40 +08:00
|
|
|
sets++;
|
2020-02-13 13:45:29 +08:00
|
|
|
} else if (iter->second.type == MutationRef::ClearRange) {
|
2020-04-05 08:33:59 +08:00
|
|
|
if (iter->second.key != iter->second.val) {
|
|
|
|
TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchClearTooMuchData", applierID)
|
|
|
|
.detail("KeyBegin", iter->second.key)
|
|
|
|
.detail("KeyEnd", iter->second.val)
|
|
|
|
.detail("Version", iter->second.version.version)
|
|
|
|
.detail("SubVersion", iter->second.version.sub);
|
|
|
|
}
|
2020-04-07 03:24:24 +08:00
|
|
|
tr->clear(singleKeyRange(iter->second.key));
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->appliedMutations += 1;
|
2020-04-08 06:57:03 +08:00
|
|
|
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
|
|
|
|
.detail("ClearKey", iter->second.key);
|
2020-02-14 13:37:40 +08:00
|
|
|
clears++;
|
2020-02-13 13:45:29 +08:00
|
|
|
} else {
|
|
|
|
ASSERT(false);
|
|
|
|
}
|
2020-04-06 06:00:36 +08:00
|
|
|
endKey = iter != end ? iter->second.key : endKey;
|
2020-02-13 13:45:29 +08:00
|
|
|
iter++;
|
2020-02-14 13:37:40 +08:00
|
|
|
if (sets > 10000000 || clears > 10000000) {
|
2020-02-20 07:22:52 +08:00
|
|
|
TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID)
|
|
|
|
.detail("Begin", begin->first)
|
|
|
|
.detail("Sets", sets)
|
|
|
|
.detail("Clears", clears);
|
2020-02-14 13:37:40 +08:00
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
2020-06-25 13:10:54 +08:00
|
|
|
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID)
|
2020-02-20 07:22:52 +08:00
|
|
|
.detail("Begin", begin->first)
|
2020-04-08 06:57:03 +08:00
|
|
|
.detail("End", endKey)
|
2020-02-20 07:22:52 +08:00
|
|
|
.detail("Sets", sets)
|
|
|
|
.detail("Clears", clears);
|
2020-02-13 13:45:29 +08:00
|
|
|
wait(tr->commit());
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->appliedTxns += 1;
|
2020-02-13 13:45:29 +08:00
|
|
|
break;
|
|
|
|
} catch (Error& e) {
|
2020-06-27 15:20:54 +08:00
|
|
|
cc->appliedTxnRetries += 1;
|
2020-02-13 13:45:29 +08:00
|
|
|
wait(tr->onError(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Apply mutations in stagingKeys in batches in parallel
|
|
|
|
ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData, UID applierID, int64_t batchIndex,
|
|
|
|
Database cx) {
|
|
|
|
std::map<Key, StagingKey>::iterator begin = batchData->stagingKeys.begin();
|
|
|
|
std::map<Key, StagingKey>::iterator cur = begin;
|
2020-06-29 03:33:07 +08:00
|
|
|
state int txnBatches = 0;
|
2020-02-13 13:45:29 +08:00
|
|
|
double txnSize = 0;
|
|
|
|
std::vector<Future<Void>> fBatches;
|
2020-05-04 11:57:09 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysStart", applierID)
|
2020-02-20 07:22:52 +08:00
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("StagingKeys", batchData->stagingKeys.size());
|
2020-02-13 13:45:29 +08:00
|
|
|
while (cur != batchData->stagingKeys.end()) {
|
|
|
|
txnSize += cur->second.expectedMutationSize();
|
|
|
|
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
2020-06-27 15:20:54 +08:00
|
|
|
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
|
|
|
&batchData->counters));
|
2020-06-30 01:18:18 +08:00
|
|
|
batchData->counters.appliedBytes += txnSize;
|
2020-08-01 05:59:45 +08:00
|
|
|
batchData->appliedBytes += txnSize;
|
2020-02-13 13:45:29 +08:00
|
|
|
begin = cur;
|
2020-02-14 07:39:50 +08:00
|
|
|
txnSize = 0;
|
2020-06-29 03:33:07 +08:00
|
|
|
txnBatches++;
|
2020-02-13 13:45:29 +08:00
|
|
|
}
|
|
|
|
cur++;
|
|
|
|
}
|
2020-02-14 07:39:50 +08:00
|
|
|
if (begin != batchData->stagingKeys.end()) {
|
2020-06-27 15:20:54 +08:00
|
|
|
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
|
|
|
&batchData->counters));
|
2020-06-30 01:18:18 +08:00
|
|
|
batchData->counters.appliedBytes += txnSize;
|
2020-08-01 05:59:45 +08:00
|
|
|
batchData->appliedBytes += txnSize;
|
2020-06-29 03:33:07 +08:00
|
|
|
txnBatches++;
|
2020-02-14 07:39:50 +08:00
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
|
|
|
|
wait(waitForAll(fBatches));
|
|
|
|
|
2020-02-20 07:22:52 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
2020-06-29 03:33:07 +08:00
|
|
|
.detail("StagingKeys", batchData->stagingKeys.size())
|
|
|
|
.detail("TransactionBatches", txnBatches);
|
2020-02-13 13:45:29 +08:00
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-02-20 07:29:14 +08:00
|
|
|
// Write mutations to the destination DB
|
|
|
|
ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData,
|
|
|
|
Database cx) {
|
2020-05-02 07:31:51 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhaseApplyTxnStart", applierID).detail("BatchIndex", batchIndex);
|
2020-02-13 13:45:29 +08:00
|
|
|
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
|
|
|
|
|
|
|
|
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
|
2020-08-01 08:48:55 +08:00
|
|
|
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
|
|
|
|
.detail("BatchIndex", batchIndex)
|
|
|
|
.detail("AppliedBytes", batchData->appliedBytes)
|
|
|
|
.detail("ReceivedBytes", batchData->receivedBytes);
|
2020-02-13 13:45:29 +08:00
|
|
|
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2019-08-02 08:00:13 +08:00
|
|
|
// Handle a request to write version batch req.batchIndex to the destination DB.
// The handler first waits until self->finishedBatch reaches req.batchIndex - 1, so batches are applied in order.
// The first request for a batch starts writeMutationsToDB() and stores its future in batchData->dbApplier.
// A later request for the same batch waits on that same future instead of starting a second writer.
// A request that arrives after the batch has already finished does no work.
// In both of those cases the reply is sent with isDuplicated == true.
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
                                                 Database cx) {
	TraceEvent("FastRestoreApplierPhaseHandleApplyToDBStart", self->id())
	    .detail("BatchIndex", req.batchIndex)
	    .detail("FinishedBatch", self->finishedBatch.get());

	// Ensure batch (i-1) is applied before batch i
	// TODO: Add a counter to warn when too many requests are waiting on the actor
	wait(self->finishedBatch.whenAtLeast(req.batchIndex - 1));

	state bool isDuplicated = true;
	if (self->finishedBatch.get() == req.batchIndex - 1) {
		// Use find() rather than operator[]: operator[] would insert a null entry for a missing batch
		// before the assertion below fires.
		auto batchIt = self->batch.find(req.batchIndex);
		ASSERT(batchIt != self->batch.end());
		Reference<ApplierBatchData> batchData = batchIt->second;
		ASSERT(batchData.isValid());
		TraceEvent("FastRestoreApplierPhaseHandleApplyToDBRunning", self->id())
		    .detail("BatchIndex", req.batchIndex)
		    .detail("FinishedBatch", self->finishedBatch.get())
		    .detail("HasStarted", batchData->dbApplier.present())
		    .detail("WroteToDBDone", batchData->dbApplier.present() ? batchData->dbApplier.get().isReady() : 0)
		    .detail("PreviousVersionBatchState", batchData->vbState.get());

		if (!batchData->dbApplier.present()) {
			isDuplicated = false;
			// dbApplier is made present() before writeMutationsToDB() is invoked.
			// NOTE(review): presumably this guards against re-entry during the actor's synchronous prefix.
			batchData->dbApplier = Never();
			batchData->dbApplier = writeMutationsToDB(self->id(), req.batchIndex, batchData, cx);
			batchData->vbState = ApplierVersionBatchState::WRITE_TO_DB;
		}

		ASSERT(batchData->dbApplier.present());
		ASSERT(!batchData->dbApplier.get().isError()); // writeMutationsToDB actor cannot have error.
		// We cannot blindly retry because it is not idempotent

		wait(batchData->dbApplier.get());

		// Multiple actor invocations can wait on req.batchIndex-1;
		// Avoid setting finishedBatch when finishedBatch > req.batchIndex
		if (self->finishedBatch.get() == req.batchIndex - 1) {
			self->finishedBatch.set(req.batchIndex);
			// self->batch[req.batchIndex]->vbState = ApplierVersionBatchState::DONE;
			// Free memory for the version batch
			self->batch.erase(req.batchIndex);
			if (self->delayedActors > 0) {
				self->checkMemory.trigger();
			}
		}
	}

	req.reply.send(RestoreCommonReply(self->id(), isDuplicated));

	TraceEvent("FastRestoreApplierPhaseHandleApplyToDBDone", self->id())
	    .detail("BatchIndex", req.batchIndex)
	    .detail("FinishedBatch", self->finishedBatch.get())
	    .detail("IsDuplicated", isDuplicated);

	return Void();
}
|
|
|
|
|
2020-02-14 03:06:20 +08:00
|
|
|
// Copy from WriteDuringRead.actor.cpp with small modifications
|
2020-02-14 03:06:29 +08:00
|
|
|
// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue, and CompareAndClear
|
2020-02-13 13:45:29 +08:00
|
|
|
Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type) {
|
|
|
|
Arena arena;
|
2020-02-14 02:48:36 +08:00
|
|
|
if (type == MutationRef::AddValue)
|
2020-02-13 13:45:29 +08:00
|
|
|
return doLittleEndianAdd(existingValue, value, arena);
|
|
|
|
else if (type == MutationRef::AppendIfFits)
|
|
|
|
return doAppendIfFits(existingValue, value, arena);
|
2020-02-14 03:06:20 +08:00
|
|
|
else if (type == MutationRef::And || type == MutationRef::AndV2)
|
2020-02-13 13:45:29 +08:00
|
|
|
return doAndV2(existingValue, value, arena);
|
|
|
|
else if (type == MutationRef::Or)
|
|
|
|
return doOr(existingValue, value, arena);
|
|
|
|
else if (type == MutationRef::Xor)
|
|
|
|
return doXor(existingValue, value, arena);
|
|
|
|
else if (type == MutationRef::Max)
|
|
|
|
return doMax(existingValue, value, arena);
|
2020-02-14 03:06:20 +08:00
|
|
|
else if (type == MutationRef::Min || type == MutationRef::MinV2)
|
2020-02-13 13:45:29 +08:00
|
|
|
return doMinV2(existingValue, value, arena);
|
|
|
|
else if (type == MutationRef::ByteMin)
|
|
|
|
return doByteMin(existingValue, value, arena);
|
|
|
|
else if (type == MutationRef::ByteMax)
|
|
|
|
return doByteMax(existingValue, value, arena);
|
2020-02-14 01:24:53 +08:00
|
|
|
else {
|
2020-02-14 05:17:32 +08:00
|
|
|
TraceEvent(SevError, "ApplyAtomicOpUnhandledType")
|
|
|
|
.detail("TypeCode", (int)type)
|
2020-04-14 05:50:43 +08:00
|
|
|
.detail("TypeName", getTypeString(type));
|
2020-02-14 01:24:53 +08:00
|
|
|
ASSERT(false);
|
|
|
|
}
|
2020-02-13 13:45:29 +08:00
|
|
|
return Value();
|
2020-06-05 12:26:09 +08:00
|
|
|
}
|