FastRestore:Use NotifiedVersion to deduplicate requests

Add a NotifiedVersion into an applier data which represents
the smallest version the applier is at.

When a loader sends mutation vector to appliers, it sends
the request that contains prevVersion and commitVersion.

This commits also put actor into an actorCollector for
loop-choose-when situation.
This commit is contained in:
Meng Xu 2019-05-22 13:30:33 -07:00
parent f235bb7e0d
commit fac63a83c4
16 changed files with 295 additions and 617 deletions

View File

@ -26,7 +26,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
struct RestoreLoaderInterface;
struct RestoreApplierInterface;
struct RestoreMasterInterface;

View File

@ -71,7 +71,7 @@ set(FDBSERVER_SRCS
RestoreLoader.actor.h
RestoreLoader.actor.cpp
Restore.actor.cpp
RestoreWorkerInterface.h
RestoreWorkerInterface.actor.h
Resolver.actor.cpp
ResolverInterface.h
ServerDBInfo.h

View File

@ -41,7 +41,7 @@
#include "flow/ActorCollection.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreCommon.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreLoader.actor.h"
@ -643,4 +643,5 @@ ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> ccf, LocalityD
Database cx = Database::createDatabase(ccf->getFilename(), Database::API_VERSION_LATEST,locality);
wait(_restoreWorker(cx, locality));
return Void();
}
}

View File

@ -40,11 +40,13 @@ ACTOR Future<Void> handleSetApplierKeyRangeRequest(RestoreSetApplierKeyRangeRequ
ACTOR Future<Void> handleCalculateApplierKeyRangeRequest(RestoreCalculateApplierKeyRangeRequest req, Reference<RestoreApplierData> self);
ACTOR Future<Void> handleSendSampleMutationVectorRequest(RestoreSendMutationVectorRequest req, Reference<RestoreApplierData> self);
ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorRequest req, Reference<RestoreApplierData> self);
ACTOR Future<Void> handleSendMutationVectorVersionedRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self);
ACTOR Future<Void> handleApplyToDBRequest(RestoreSimpleRequest req, Reference<RestoreApplierData> self, Database cx);
ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, RestoreApplierInterface applierInterf, Database cx) {
state ActorCollection actors(false);
state Future<Void> exitRole = Never();
state double lastLoopTopTime;
loop {
double loopTopTime = now();
@ -60,28 +62,29 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
choose {
when ( RestoreSimpleRequest req = waitNext(applierInterf.heartbeat.getFuture()) ) {
requestTypeStr = "heartbeat";
wait(handleHeartbeat(req, applierInterf.id()));
actors.add(handleHeartbeat(req, applierInterf.id()));
}
when ( RestoreGetApplierKeyRangeRequest req = waitNext(applierInterf.getApplierKeyRangeRequest.getFuture()) ) {
requestTypeStr = "getApplierKeyRangeRequest";
wait(handleGetApplierKeyRangeRequest(req, self));
actors.add(handleGetApplierKeyRangeRequest(req, self));
}
when ( RestoreSetApplierKeyRangeRequest req = waitNext(applierInterf.setApplierKeyRangeRequest.getFuture()) ) {
requestTypeStr = "setApplierKeyRangeRequest";
wait(handleSetApplierKeyRangeRequest(req, self));
actors.add(handleSetApplierKeyRangeRequest(req, self));
}
when ( RestoreCalculateApplierKeyRangeRequest req = waitNext(applierInterf.calculateApplierKeyRange.getFuture()) ) {
requestTypeStr = "calculateApplierKeyRange";
wait(handleCalculateApplierKeyRangeRequest(req, self));
actors.add(handleCalculateApplierKeyRangeRequest(req, self));
}
when ( RestoreSendMutationVectorRequest req = waitNext(applierInterf.sendSampleMutationVector.getFuture()) ) {
requestTypeStr = "sendSampleMutationVector";
actors.add( handleSendSampleMutationVectorRequest(req, self));
}
when ( RestoreSendMutationVectorRequest req = waitNext(applierInterf.sendMutationVector.getFuture()) ) {
when ( RestoreSendMutationVectorVersionedRequest req = waitNext(applierInterf.sendMutationVector.getFuture()) ) {
requestTypeStr = "sendMutationVector";
actors.add( handleSendMutationVectorRequest(req, self) );
//actors.add( handleSendMutationVectorRequest(req, self) );
actors.add( handleSendMutationVectorVersionedRequest(req, self) );
}
when ( RestoreSimpleRequest req = waitNext(applierInterf.applyToDB.getFuture()) ) {
requestTypeStr = "applyToDB";
@ -89,17 +92,19 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
}
when ( RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture()) ) {
requestTypeStr = "initVersionBatch";
wait(handleInitVersionBatchRequest(req, self));
actors.add(handleInitVersionBatchRequest(req, self));
}
when ( RestoreSimpleRequest req = waitNext(applierInterf.finishRestore.getFuture()) ) {
requestTypeStr = "finishRestore";
wait( handlerFinishRestoreRequest(req, self, cx) );
break;
exitRole = handlerFinishRestoreRequest(req, self, cx);
}
when ( RestoreSimpleRequest req = waitNext(applierInterf.collectRestoreRoleInterfaces.getFuture()) ) {
// NOTE: This must be after wait(configureRolesHandler()) because we must ensure all workers have registered their workerInterfaces into DB before we can read the workerInterface.
// TODO: Wait until all workers have registered their workerInterface.
wait( handleCollectRestoreRoleInterfaceRequest(req, self, cx) );
actors.add( handleCollectRestoreRoleInterfaceRequest(req, self, cx) );
}
when ( wait(exitRole) ) {
break;
}
}
} catch (Error &e) {
@ -112,7 +117,6 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
}
}
}
return Void();
}
@ -270,6 +274,60 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorRequ
return Void();
}
// ATTENTION: If a loader sends mutations of range and log files at the same time,
// Race condition may happen in this actor?
// MX: Maybe we won't have race condition even in the above situation because all actors run on 1 thread
// as long as we do not wait or yield when operate the shared data, it should be fine.
ACTOR Future<Void> handleSendMutationVectorVersionedRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self) {
state int numMutations = 0;
if ( debug_verbose ) {
// NOTE: Print out the current version and received req is helpful in debugging
printf("[VERBOSE_DEBUG] handleSendMutationVectorVersionedRequest Node:%s at rangeVersion:%ld logVersion:%ld receive mutation number:%d, req:%s\n",
self->describeNode().c_str(), self->rangeVersion.get(), self->logVersion.get(), req.mutations.size(), req.toString().c_str());
}
if ( req.isRangeFile ) {
wait( self->rangeVersion.whenAtLeast(req.prevVersion) );
} else {
wait( self->logVersion.whenAtLeast(req.prevVersion) );
}
// ASSUME: Log file is processed before range file. We do NOT mix range and log file.
//ASSERT_WE_THINK( self->rangeVersion.get() > 0 && req.isRangeFile );
if ( (req.isRangeFile && self->rangeVersion.get() == req.prevVersion) ||
(!req.isRangeFile && self->logVersion.get() == req.prevVersion) ) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!)
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
state Version commitVersion = req.version;
VectorRef<MutationRef> mutations(req.mutations);
printf("[DEBUG] Node:%s receive %d mutations at version:%ld\n", self->describeNode().c_str(), mutations.size(), commitVersion);
if ( self->kvOps.find(commitVersion) == self->kvOps.end() ) {
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
}
state int mIndex = 0;
for (mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
self->kvOps[commitVersion].push_back_deep(self->kvOps[commitVersion].arena(), mutation);
numMutations++;
//if ( numMutations % 100000 == 1 ) { // Should be different value in simulation and in real mode
printf("[INFO][Applier] Node:%s Receives %d mutations. cur_mutation:%s\n",
self->describeNode().c_str(), numMutations, mutation.toString().c_str());
//}
}
// Notify the same actor and unblock the request at the next version
if ( req.isRangeFile ) {
self->rangeVersion.set(req.version);
} else {
self->logVersion.set(req.version);
}
}
req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
return Void();
}
ACTOR Future<Void> handleSendSampleMutationVectorRequest(RestoreSendMutationVectorRequest req, Reference<RestoreApplierData> self) {
state int numMutations = 0;
self->numSampledMutations = 0;

View File

@ -36,13 +36,16 @@
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // has to be last include
extern double transactionBatchSizeThreshold;
struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreApplierData> {
NotifiedVersion rangeVersion; // All requests of mutations in range file below this version has been processed
NotifiedVersion logVersion; // All requests of mutations in log file below this version has been processed
// range2Applier is in master and loader node. Loader node uses this to determine which applier a mutation should be sent
std::map<Standalone<KeyRef>, UID> range2Applier; // KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Standalone<KeyRef>, int> keyOpsCount; // The number of operations per key which is used to determine the key-range boundary for appliers
@ -64,10 +67,13 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
nodeID = applierInterfID;
nodeIndex = assignedIndex;
// Q: Why do we need to initMetric?
//version.initMetric(LiteralStringRef("RestoreApplier.Version"), cc.id);
role = RestoreRole::Applier;
}
~RestoreApplierData() {}
~RestoreApplierData() = default;
std::string describeNode() {
std::stringstream ss;

View File

@ -41,6 +41,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
std::string fileName, int64_t readOffset_input, int64_t readLen_input,
KeyRange restoreRange, Key addPrefix, Key removePrefix);
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self);
ACTOR Future<Void> registerMutationsToApplierV2(Reference<RestoreLoaderData> self, bool isRangeFile, Version prevVersion, Version endVersion);
void parseSerializedMutation(Reference<RestoreLoaderData> self, bool isSampling);
bool isRangeMutation(MutationRef m);
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) ;
@ -48,6 +49,7 @@ void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mve
ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreLoaderInterface loaderInterf, Database cx) {
state ActorCollection actors(false);
state Future<Void> exitRole = Never();
state double lastLoopTopTime;
loop {
@ -64,7 +66,7 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
choose {
when ( RestoreSimpleRequest req = waitNext(loaderInterf.heartbeat.getFuture()) ) {
requestTypeStr = "heartbeat";
wait(handleHeartbeat(req, loaderInterf.id()));
actors.add(handleHeartbeat(req, loaderInterf.id()));
}
when ( RestoreLoadFileRequest req = waitNext(loaderInterf.sampleRangeFile.getFuture()) ) {
requestTypeStr = "sampleRangeFile";
@ -78,7 +80,7 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
}
when ( RestoreSetApplierKeyRangeVectorRequest req = waitNext(loaderInterf.setApplierKeyRangeVectorRequest.getFuture()) ) {
requestTypeStr = "setApplierKeyRangeVectorRequest";
wait(handleSetApplierKeyRangeVectorRequest(req, self));
actors.add(handleSetApplierKeyRangeVectorRequest(req, self));
}
when ( RestoreLoadFileRequest req = waitNext(loaderInterf.loadRangeFile.getFuture()) ) {
requestTypeStr = "loadRangeFile";
@ -93,22 +95,23 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
when ( RestoreVersionBatchRequest req = waitNext(loaderInterf.initVersionBatch.getFuture()) ) {
requestTypeStr = "initVersionBatch";
wait( handleInitVersionBatchRequest(req, self) );
actors.add( handleInitVersionBatchRequest(req, self) );
}
when ( RestoreSimpleRequest req = waitNext(loaderInterf.finishRestore.getFuture()) ) {
requestTypeStr = "finishRestore";
wait( handlerFinishRestoreRequest(req, self, cx) );
break;
exitRole = handlerFinishRestoreRequest(req, self, cx);
}
// TODO: To modify the following when conditions
when ( RestoreSimpleRequest req = waitNext(loaderInterf.collectRestoreRoleInterfaces.getFuture()) ) {
// Step: Find other worker's workerInterfaces
// NOTE: This must be after wait(configureRolesHandler()) because we must ensure all workers have registered their workerInterfaces into DB before we can read the workerInterface.
// TODO: Wait until all workers have registered their workerInterface.
wait( handleCollectRestoreRoleInterfaceRequest(req, self, cx) );
actors.add( handleCollectRestoreRoleInterfaceRequest(req, self, cx) );
}
when ( wait(exitRole) ) {
break;
}
}
} catch (Error &e) {
fprintf(stdout, "[ERROR] Restore Loader handle received request:%s error. error code:%d, error message:%s\n",
requestTypeStr.c_str(), e.code(), e.what());
@ -119,7 +122,6 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
}
}
}
return Void();
}
@ -228,10 +230,10 @@ ACTOR Future<Void> handleLoadRangeFileRequest(RestoreLoadFileRequest req, Refere
if ( isSampling ) {
wait( registerMutationsToMasterApplier(self) );
} else {
wait( registerMutationsToApplier(self) ); // Send the parsed mutation to applier who will apply the mutation to DB
wait( registerMutationsToApplierV2(self, true, req.param.prevVersion, req.param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
}
wait ( delay(1.0) );
// wait ( delay(1.0) );
if ( !isSampling ) {
self->processedFiles[param.filename] = 1;
@ -333,17 +335,23 @@ ACTOR Future<Void> handleLoadLogFileRequest(RestoreLoadFileRequest req, Referenc
if ( isSampling ) {
wait( registerMutationsToMasterApplier(self) );
} else {
wait( registerMutationsToApplier(self) ); // Send the parsed mutation to applier who will apply the mutation to DB
wait( registerMutationsToApplierV2(self, false, req.param.prevVersion, req.param.endVersion) ); // Send the parsed mutation to applier who will apply the mutation to DB
}
req.reply.send(RestoreCommonReply(self->id(), req.cmdID)); // master node is waiting
// TODO: NOTE: If we parse log file, the DB status will be incorrect.
if ( !isSampling ) {
self->processedFiles[param.filename] = 1;
}
self->processedCmd[req.cmdID] = 1;
self->clearInProgressFlag(cmdType);
printf("[INFO][Loader] Node:%s CMDUID:%s clear inProgressFlag :%lx for Assign_Log_Range_File.\n",
self->describeNode().c_str(), req.cmdID.toString().c_str(), self->inProgressFlag);
//Send ack to master that loader has finished loading the data
printf("[INFO][Loader] Node:%s CMDUID:%s send ack.\n",
self->describeNode().c_str(), self->cmdID.toString().c_str());
req.reply.send(RestoreCommonReply(self->id(), req.cmdID)); // master node is waiting
return Void();
}
@ -444,6 +452,7 @@ ACTOR Future<Void> registerMutationsToMasterApplier(Reference<RestoreLoaderData>
// TODO: ATTENTION: Different loaders may generate the same CMDUID, which may let applier miss some mutations
/*
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self) {
printf("[INFO][Loader] Node:%s self->masterApplierInterf:%s, registerMutationsToApplier\n",
self->describeNode().c_str(), self->masterApplierInterf.toString().c_str());
@ -602,6 +611,176 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
return Void();
}
*/
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self) {
return Void();
}
ACTOR Future<Void> registerMutationsToApplierV2(Reference<RestoreLoaderData> self, bool isRangeFile, Version startVersion, Version endVersion) {
printf("[INFO][Loader] Node:%s self->masterApplierInterf:%s, registerMutationsToApplier\n",
self->describeNode().c_str(), self->masterApplierInterf.toString().c_str());
state int packMutationNum = 0;
state int packMutationThreshold = 10;
state int kvCount = 0;
state std::vector<Future<RestoreCommonReply>> cmdReplies;
state int splitMutationIndex = 0;
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
if ( self->kvOps.find(endVersion) == self->kvOps.end() ) {
self->kvOps[endVersion] = VectorRef<MutationRef>();
}
self->printAppliersKeyRange();
//state double mutationVectorThreshold = 1;//1024 * 10; // Bytes.
state std::map<UID, Standalone<VectorRef<MutationRef>>> applierMutationsBuffer; // The mutation vector to be sent to each applier
state std::map<UID, double> applierMutationsSize; // buffered mutation vector size for each applier
state Standalone<VectorRef<MutationRef>> mvector;
state Standalone<VectorRef<UID>> nodeIDs;
// Initialize the above two maps
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
state std::map<UID, RestoreSendMutationVectorVersionedRequest> requestsToAppliers;
state Version prevVersion = startVersion;
loop {
try {
packMutationNum = 0;
splitMutationIndex = 0;
kvCount = 0;
state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator kvOp;
// MX: NEED TO A WAY TO GENERATE NON_DUPLICATE CMDUID across loaders
self->cmdID.setPhase(RestoreCommandEnum::Loader_Send_Mutations_To_Applier); //MX: THIS MAY BE WRONG! CMDID may duplicate across loaders
for ( kvOp = self->kvOps.begin(); kvOp != self->kvOps.end(); kvOp++) {
// In case try-catch has error and loop back
applierMutationsBuffer.clear();
applierMutationsSize.clear();
for (auto &applierID : applierIDs) {
applierMutationsBuffer[applierID] = Standalone<VectorRef<MutationRef>>(VectorRef<MutationRef>());
applierMutationsSize[applierID] = 0.0;
}
state Version commitVersion = kvOp->first;
state int mIndex;
state MutationRef kvm;
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
kvm = kvOp->second[mIndex];
if ( debug_verbose ) {
printf("[VERBOSE_DEBUG] mutation to sent to applier, mutation:%s\n", kvm.toString().c_str());
}
// Send the mutation to applier
if ( isRangeMutation(kvm) ) { // MX: Use false to skip the range mutation handling
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
mvector.pop_front(mvector.size());
nodeIDs.pop_front(nodeIDs.size());
//state std::map<Standalone<MutationRef>, UID> m2appliers;
// '' Bug may be here! The splitMutation() may be wrong!
splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
// m2appliers = splitMutationv2(self, kvm);
// // convert m2appliers to mvector and nodeIDs
// for (auto& m2applier : m2appliers) {
// mvector.push_back(m2applier.first);
// nodeIDs.push_back(m2applier.second);
// }
printf("SPLITMUTATION: mvector.size:%d\n", mvector.size());
ASSERT(mvector.size() == nodeIDs.size());
for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++ ) {
MutationRef mutation = mvector[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
printf("SPLITTED MUTATION: %d: mutation:%s applierID:%s\n", splitMutationIndex, mutation.toString().c_str(), applierID.toString().c_str());
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation); // Q: Maybe push_back_deep()?
applierMutationsSize[applierID] += mutation.expectedSize();
kvCount++;
}
// for (auto &applierID : applierIDs) {
// if ( applierMutationsSize[applierID] >= mutationVectorThreshold ) {
// state int tmpNumMutations = applierMutationsBuffer[applierID].size();
// self->cmdID.nextCmd();
// cmdReplies.push_back(self->appliersInterf[applierID].sendMutationVector.getReply(
// RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
// applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
// applierMutationsSize[applierID] = 0;
// printf("[INFO][Loader] Waits for applier:%s to receive %ld range mutations\n", applierID.toString().c_str(), tmpNumMutations);
// std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
// cmdReplies.clear();
// }
// }
} else { // mutation operates on a particular key
std::map<Standalone<KeyRef>, UID>::iterator itlow = self->range2Applier.lower_bound(kvm.param1); // lower_bound returns the iterator that is >= m.param1
// make sure itlow->first <= m.param1
if ( itlow == self->range2Applier.end() || itlow->first > kvm.param1 ) {
if ( itlow == self->range2Applier.begin() ) {
printf("KV-Applier: SHOULD NOT HAPPEN. kvm.param1:%s\n", kvm.param1.toString().c_str());
}
--itlow;
}
ASSERT( itlow->first <= kvm.param1 );
MutationRef mutation = kvm;
UID applierID = itlow->second;
printf("KV--Applier: K:%s ApplierID:%s\n", kvm.param1.toString().c_str(), applierID.toString().c_str());
kvCount++;
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation); // Q: Maybe push_back_deep()?
applierMutationsSize[applierID] += mutation.expectedSize();
// if ( applierMutationsSize[applierID] >= mutationVectorThreshold ) {
// self->cmdID.nextCmd();
// cmdReplies.push_back(self->appliersInterf[applierID].sendMutationVector.getReply(
// RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
// printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", applierMutationsBuffer[applierID].size());
// applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
// applierMutationsSize[applierID] = 0;
// std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
// cmdReplies.clear();
// }
}
} // Mutations at the same version
// In case the mutation vector is not larger than mutationVectorThreshold
// We must send out the leftover mutations any way; otherwise, the mutations at different versions will be mixed together
printf("[DEBUG][Loader] sendMutationVector send mutations at Version:%ld to appliers, applierIDs.size:%d\n", commitVersion, applierIDs.size());
for (auto &applierID : applierIDs) {
printf("[DEBUG][Loader] sendMutationVector size:%d for applierID:%s\n", applierMutationsBuffer[applierID].size(), applierID.toString().c_str());
self->cmdID.nextCmd(); // no-use
requestsToAppliers[applierID] = RestoreSendMutationVectorVersionedRequest(self->cmdID, prevVersion, commitVersion, isRangeFile, applierMutationsBuffer[applierID]);
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0;
//std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) ); // Q: We need to wait for each reply, otherwise, correctness has error. Why?
//cmdReplies.clear();
}
wait( getBatchReplies(&RestoreApplierInterface::sendMutationVector, self->appliersInterf, requestsToAppliers) );
requestsToAppliers.clear();
ASSERT( prevVersion < commitVersion );
prevVersion = commitVersion;
} // all versions of mutations
// if (!cmdReplies.empty()) {
// printf("[INFO][Loader] Last Waits for applier to receive %ld range mutations\n", cmdReplies.size());
// std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
// //std::vector<RestoreCommonReply> reps = wait( getAll(cmdReplies) );
// cmdReplies.clear();
// }
printf("[Summary][Loader] Node:%s Last CMDUID:%s produces %d mutation operations\n",
self->describeNode().c_str(), self->cmdID.toString().c_str(), kvCount);
self->kvOps.clear();
break;
} catch (Error &e) {
fprintf(stdout, "[ERROR] registerMutationsToApplier Node:%s, Commands before cmdID:%s error. error code:%d, error message:%s\n", self->describeNode().c_str(),
self->cmdID.toString().c_str(), e.code(), e.what());
}
};
return Void();
}
// std::map<Standalone<MutationRef>, UID> splitMutationv2(Reference<RestoreLoaderData> self, MutationRef m) {
// std::map<Standalone<MutationRef>, UID> m2appliers;

View File

@ -38,7 +38,7 @@
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreCommon.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbclient/BackupContainer.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -333,6 +333,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
state int curFileIndex = 0; // The smallest index of the files that has not been FULLY loaded
state long curOffset = 0;
state bool allLoadReqsSent = false;
state Version prevVersion = 0; // Start version for range or log file is 0
loop {
try {
if ( allLoadReqsSent ) {
@ -360,6 +361,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
}
LoadingParam param;
//self->files[curFileIndex].cursor = 0; // This is a hacky way to make sure cursor is correct in current version when we load 1 file at a time
// MX: May Need to specify endVersion as well because the
param.url = request.url;
param.version = self->files[curFileIndex].version;
param.filename = self->files[curFileIndex].fileName;
@ -372,6 +374,7 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
param.addPrefix = addPrefix;
param.removePrefix = removePrefix;
param.mutationLogPrefix = mutationLogPrefix;
if ( !(param.length > 0 && param.offset >= 0 && param.offset < self->files[curFileIndex].fileSize) ) {
printf("[ERROR] param: length:%ld offset:%ld fileSize:%ld for %ldth filename:%s\n",
param.length, param.offset, self->files[curFileIndex].fileSize, curFileIndex,
@ -386,9 +389,11 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
if (self->files[curFileIndex].isRange) {
cmdType = RestoreCommandEnum::Assign_Loader_Range_File;
self->cmdID.setPhase(RestoreCommandEnum::Assign_Loader_Range_File);
} else {
cmdType = RestoreCommandEnum::Assign_Loader_Log_File;
self->cmdID.setPhase(RestoreCommandEnum::Assign_Loader_Log_File);
}
if ( (phaseType == RestoreCommandEnum::Assign_Loader_Log_File && self->files[curFileIndex].isRange)
@ -398,6 +403,9 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
curOffset = 0;
} else { // load the type of file in the phaseType
self->cmdID.nextCmd();
param.prevVersion = prevVersion;
prevVersion = self->files[curFileIndex].isRange ? self->files[curFileIndex].version : self->files[curFileIndex].endVersion;
param.endVersion = prevVersion;
printf("[CMD] Loading fileIndex:%ld fileInfo:%s loadingParam:%s on node %s\n",
curFileIndex, self->files[curFileIndex].toString().c_str(),
param.toString().c_str(), loaderID.toString().c_str()); // VERY USEFUL INFO
@ -853,7 +861,7 @@ ACTOR Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequests(Datab
wait(tr.onError(e));
}
}
return restoreRequests;
}
@ -973,33 +981,6 @@ ACTOR static Future<Void> _clearDB(Reference<ReadYourWritesTransaction> tr) {
return Void();
}
// Send each request in requests via channel of the request's interface
// The UID in a request is the UID of the interface to handle the request
ACTOR template <class Interface, class Request>
//Future< REPLY_TYPE(Request) >
Future<Void> getBatchReplies(
RequestStream<Request> Interface::* channel,
std::map<UID, Interface> interfaces,
std::map<UID, Request> requests) {
loop{
try {
std::vector<Future<REPLY_TYPE(Request)>> cmdReplies;
for(auto& request : requests) {
RequestStream<Request> const* stream = & (interfaces[request.first].*channel);
cmdReplies.push_back( stream->getReply(request.second) );
}
std::vector<REPLY_TYPE(Request)> reps = wait( timeoutError(getAll(cmdReplies), FastRestore_Failure_Timeout) );
break;
} catch (Error &e) {
fprintf(stdout, "Error code:%d, error message:%s\n", e.code(), e.what());
}
}
return Void();
}
ACTOR Future<Void> initializeVersionBatch(Reference<RestoreMasterData> self) {
self->cmdID.initPhase(RestoreCommandEnum::Reset_VersionBatch);

View File

@ -36,7 +36,7 @@
#include "fdbrpc/Locality.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -37,7 +37,7 @@ std::string getRoleStr(RestoreRole role) {
// CMDUID implementation
void CMDUID::initPhase(RestoreCommandEnum newPhase) {
printf("CMDID, current phase:%d, new phase:%d\n", phase, newPhase);
//printf("CMDID, current phase:%d, new phase:%d\n", phase, newPhase);
phase = (uint16_t) newPhase;
cmdID = 0;
}

View File

@ -1,552 +0,0 @@
/*
* RestoreWorkerInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file declare and define the interface for RestoreWorker and restore roles
// which are RestoreMaster, RestoreLoader, and RestoreApplier
#ifndef FDBSERVER_RESTORE_WORKER_INTERFACE_H
#define FDBSERVER_RESTORE_WORKER_INTERFACE_H
#pragma once
#include <sstream>
#include "flow/Stats.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/RestoreUtil.h"
//#include "fdbserver/RestoreRoleCommon.actor.h"
#include "flow/actorcompiler.h" // has to be last include
#define DUMPTOKEN( name ) TraceEvent("DumpToken", recruited.id()).detail("Name", #name).detail("Token", name.getEndpoint().token)
class RestoreConfig;
// Timeout threshold in seconds for restore commands
extern int FastRestore_Failure_Timeout;
struct RestoreCommonReply;
struct GetKeyRangeReply;
struct GetKeyRangeReply;
struct RestoreRecruitRoleRequest;
struct RestoreLoadFileRequest;
struct RestoreGetApplierKeyRangeRequest;
struct RestoreSetApplierKeyRangeRequest;
struct GetKeyRangeNumberReply;
struct RestoreVersionBatchRequest;
struct RestoreCalculateApplierKeyRangeRequest;
struct RestoreSendMutationVectorRequest;
struct RestoreSetApplierKeyRangeVectorRequest;
struct RestoreWorkerInterface {
UID interfID;
RequestStream<RestoreSimpleRequest> heartbeat;
RequestStream<RestoreRecruitRoleRequest> recruitRole;
RequestStream<RestoreSimpleRequest> terminateWorker;
bool operator == (RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator != (RestoreWorkerInterface const& r) const { return id() != r.id(); }
UID id() const { return interfID; } //cmd.getEndpoint().token;
NetworkAddress address() const { return recruitRole.getEndpoint().addresses.address; }
void initEndpoints() {
heartbeat.getEndpoint( TaskClusterController );
recruitRole.getEndpoint( TaskClusterController );// Q: Why do we need this?
terminateWorker.getEndpoint( TaskClusterController );
interfID = g_random->randomUniqueID();
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, interfID, heartbeat, recruitRole, terminateWorker);
}
};
struct RestoreRoleInterface {
public:
RestoreRole role;
RestoreRoleInterface() {
role = RestoreRole::Invalid;
}
};
struct RestoreLoaderInterface : RestoreRoleInterface {
public:
UID nodeID;
RequestStream<RestoreSimpleRequest> heartbeat;
RequestStream<RestoreLoadFileRequest> sampleRangeFile;
RequestStream<RestoreLoadFileRequest> sampleLogFile;
RequestStream<RestoreSetApplierKeyRangeVectorRequest> setApplierKeyRangeVectorRequest;
RequestStream<RestoreLoadFileRequest> loadRangeFile;
RequestStream<RestoreLoadFileRequest> loadLogFile;
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
RequestStream<RestoreSimpleRequest> collectRestoreRoleInterfaces; // TODO: Change to collectRestoreRoleInterfaces
RequestStream<RestoreSimpleRequest> finishRestore;
bool operator == (RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator != (RestoreWorkerInterface const& r) const { return id() != r.id(); }
RestoreLoaderInterface () {
nodeID = g_random->randomUniqueID();
}
UID id() const { return nodeID; }
NetworkAddress address() const { return heartbeat.getEndpoint().addresses.address; }
void initEndpoints() {
heartbeat.getEndpoint( TaskClusterController );
sampleRangeFile.getEndpoint( TaskClusterController );
sampleLogFile.getEndpoint( TaskClusterController );
setApplierKeyRangeVectorRequest.getEndpoint( TaskClusterController );
loadRangeFile.getEndpoint( TaskClusterController );
loadLogFile.getEndpoint( TaskClusterController );
initVersionBatch.getEndpoint( TaskClusterController );
collectRestoreRoleInterfaces.getEndpoint( TaskClusterController );
finishRestore.getEndpoint( TaskClusterController );
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, nodeID, heartbeat, sampleRangeFile, sampleLogFile,
setApplierKeyRangeVectorRequest, loadRangeFile, loadLogFile,
initVersionBatch, collectRestoreRoleInterfaces, finishRestore);
}
};
struct RestoreApplierInterface : RestoreRoleInterface {
public:
UID nodeID;
RequestStream<RestoreSimpleRequest> heartbeat;
RequestStream<RestoreCalculateApplierKeyRangeRequest> calculateApplierKeyRange;
RequestStream<RestoreGetApplierKeyRangeRequest> getApplierKeyRangeRequest;
RequestStream<RestoreSetApplierKeyRangeRequest> setApplierKeyRangeRequest;
RequestStream<RestoreSendMutationVectorRequest> sendSampleMutationVector;
RequestStream<RestoreSendMutationVectorRequest> sendMutationVector;
RequestStream<RestoreSimpleRequest> applyToDB;
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
RequestStream<RestoreSimpleRequest> collectRestoreRoleInterfaces;
RequestStream<RestoreSimpleRequest> finishRestore;
bool operator == (RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator != (RestoreWorkerInterface const& r) const { return id() != r.id(); }
RestoreApplierInterface() {
nodeID = g_random->randomUniqueID();
}
UID id() const { return nodeID; }
NetworkAddress address() const { return heartbeat.getEndpoint().addresses.address; }
void initEndpoints() {
heartbeat.getEndpoint( TaskClusterController );
calculateApplierKeyRange.getEndpoint( TaskClusterController );
getApplierKeyRangeRequest.getEndpoint( TaskClusterController );
setApplierKeyRangeRequest.getEndpoint( TaskClusterController );
sendSampleMutationVector.getEndpoint( TaskClusterController );
sendMutationVector.getEndpoint( TaskClusterController );
applyToDB.getEndpoint( TaskClusterController );
initVersionBatch.getEndpoint( TaskClusterController );
collectRestoreRoleInterfaces.getEndpoint( TaskClusterController );
finishRestore.getEndpoint( TaskClusterController );
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, nodeID, heartbeat, calculateApplierKeyRange,
getApplierKeyRangeRequest, setApplierKeyRangeRequest,
sendSampleMutationVector, sendMutationVector,
applyToDB, initVersionBatch, collectRestoreRoleInterfaces, finishRestore);
}
std::string toString() {
return nodeID.toString();
}
};
struct LoadingParam {
Key url;
Version version;
std::string filename;
int64_t offset;
int64_t length;
int64_t blockSize;
KeyRange restoreRange;
Key addPrefix;
Key removePrefix;
Key mutationLogPrefix;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, url, version, filename, offset, length, blockSize, restoreRange, addPrefix, removePrefix, mutationLogPrefix);
//ar & url & version & filename & offset & length & blockSize & restoreRange & addPrefix & removePrefix & mutationLogPrefix;
}
std::string toString() {
std::stringstream str;
str << "url:" << url.toString() << "version:" << version
<< " filename:" << filename << " offset:" << offset << " length:" << length << " blockSize:" << blockSize
<< " restoreRange:" << restoreRange.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
return str.str();
}
};
struct RestoreRecruitRoleRequest : TimedRequest {
CMDUID cmdID;
RestoreRole role;
int nodeIndex; // Each role is a node
ReplyPromise<RestoreCommonReply> reply;
RestoreRecruitRoleRequest() : cmdID(CMDUID()), role(RestoreRole::Invalid) {}
explicit RestoreRecruitRoleRequest(CMDUID cmdID, RestoreRole role, int nodeIndex) :
cmdID(cmdID), role(role), nodeIndex(nodeIndex){}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, role, nodeIndex, reply);
}
std::string printable() {
std::stringstream ss;
ss << "CMDID:" << cmdID.toString() << " Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex;
return ss.str();
}
};
// Sample_Range_File and Assign_Loader_Range_File, Assign_Loader_Log_File
struct RestoreLoadFileRequest : TimedRequest {
CMDUID cmdID;
LoadingParam param;
ReplyPromise<RestoreCommonReply> reply;
RestoreLoadFileRequest() : cmdID(CMDUID()) {}
explicit RestoreLoadFileRequest(CMDUID cmdID, LoadingParam param) : cmdID(cmdID), param(param) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, param, reply);
}
};
struct RestoreSendMutationVectorRequest : TimedRequest {
CMDUID cmdID;
uint64_t commitVersion;
Standalone<VectorRef<MutationRef>> mutations;
ReplyPromise<RestoreCommonReply> reply;
RestoreSendMutationVectorRequest() : cmdID(CMDUID()), commitVersion(0), mutations(VectorRef<MutationRef>()) {}
explicit RestoreSendMutationVectorRequest(CMDUID cmdID, uint64_t commitVersion, VectorRef<MutationRef> mutations) : cmdID(cmdID), commitVersion(commitVersion), mutations(mutations) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, commitVersion, mutations, reply);
}
};
struct RestoreCalculateApplierKeyRangeRequest : TimedRequest {
CMDUID cmdID;
int numAppliers;
ReplyPromise<GetKeyRangeNumberReply> reply;
RestoreCalculateApplierKeyRangeRequest() : cmdID(CMDUID()), numAppliers(0) {}
explicit RestoreCalculateApplierKeyRangeRequest(CMDUID cmdID, int numAppliers) : cmdID(cmdID), numAppliers(numAppliers) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, numAppliers, reply);
}
};
struct RestoreVersionBatchRequest : TimedRequest {
CMDUID cmdID;
int batchID;
ReplyPromise<RestoreCommonReply> reply;
RestoreVersionBatchRequest() : cmdID(CMDUID()), batchID(0) {}
explicit RestoreVersionBatchRequest(CMDUID cmdID, int batchID) : cmdID(cmdID), batchID(batchID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, batchID, reply);
}
};
struct RestoreGetApplierKeyRangeRequest : TimedRequest {
CMDUID cmdID;
int applierIndex; // The applier ID whose key range will be replied // TODO: Maybe change to use applier's UID
ReplyPromise<GetKeyRangeReply> reply;
RestoreGetApplierKeyRangeRequest() : cmdID(CMDUID()), applierIndex(0) {}
explicit RestoreGetApplierKeyRangeRequest(CMDUID cmdID, int applierIndex) : cmdID(cmdID), applierIndex(applierIndex) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, applierIndex, reply);
}
};
// Notify the server node about the key range the applier node (nodeID) is responsible for
struct RestoreSetApplierKeyRangeRequest : TimedRequest {
CMDUID cmdID;
UID applierID;
KeyRange range; // the key range that will be assigned to the node
ReplyPromise<RestoreCommonReply> reply;
RestoreSetApplierKeyRangeRequest() : cmdID(CMDUID()), applierID(UID()), range(KeyRange()) {}
explicit RestoreSetApplierKeyRangeRequest(CMDUID cmdID, UID applierID, KeyRange range) : cmdID(cmdID), applierID(applierID), range(range) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, applierID, range, reply);
}
};
struct RestoreSetApplierKeyRangeVectorRequest : TimedRequest {
CMDUID cmdID;
VectorRef<UID> applierIDs;
VectorRef<KeyRange> ranges; // the key range that will be assigned to the node
ReplyPromise<RestoreCommonReply> reply;
RestoreSetApplierKeyRangeVectorRequest() : cmdID(CMDUID()), applierIDs(VectorRef<UID>()), ranges(VectorRef<KeyRange>()) {}
explicit RestoreSetApplierKeyRangeVectorRequest(CMDUID cmdID, VectorRef<UID> applierIDs, VectorRef<KeyRange> ranges) : cmdID(cmdID), applierIDs(applierIDs), ranges(ranges) { ASSERT(applierIDs.size() == ranges.size()); }
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, applierIDs, ranges, reply);
}
};
struct GetKeyRangeReply : RestoreCommonReply {
int index;
Standalone<KeyRef> lowerBound; // inclusive
Standalone<KeyRef> upperBound; // exclusive
GetKeyRangeReply() : index(0), lowerBound(KeyRef()), upperBound(KeyRef()) {}
explicit GetKeyRangeReply(int index, KeyRef lowerBound, KeyRef upperBound) : index(index), lowerBound(lowerBound), upperBound(upperBound) {}
explicit GetKeyRangeReply(UID id, CMDUID cmdID, int index, KeyRef lowerBound, KeyRef upperBound) :
RestoreCommonReply(id, cmdID), index(index), lowerBound(lowerBound), upperBound(upperBound) {}
// explicit GetKeyRangeReply(UID id, CMDUID cmdID) :
// RestoreCommonReply(id, cmdID) {}
std::string toString() const {
std::stringstream ss;
ss << "ServerNodeID:" << id.toString() << " CMDID:" << cmdID.toString()
<< " index:" << std::to_string(index) << " lowerBound:" << lowerBound.toHexString()
<< " upperBound:" << upperBound.toHexString();
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(RestoreCommonReply *) this, index, lowerBound, upperBound);
}
};
struct GetKeyRangeNumberReply : RestoreCommonReply {
int keyRangeNum;
GetKeyRangeNumberReply() : keyRangeNum(0) {}
explicit GetKeyRangeNumberReply(int keyRangeNum) : keyRangeNum(keyRangeNum) {}
explicit GetKeyRangeNumberReply(UID id, CMDUID cmdID) : RestoreCommonReply(id, cmdID) {}
std::string toString() const {
std::stringstream ss;
ss << "ServerNodeID:" << id.toString() << " CMDID:" << cmdID.toString()
<< " keyRangeNum:" << std::to_string(keyRangeNum);
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(RestoreCommonReply *) this, keyRangeNum);
}
};
struct RestoreRequest {
//Database cx;
int index;
Key tagName;
Key url;
bool waitForComplete;
Version targetVersion;
bool verbose;
KeyRange range;
Key addPrefix;
Key removePrefix;
bool lockDB;
UID randomUid;
int testData;
std::vector<int> restoreRequests;
//Key restoreTag;
ReplyPromise< struct RestoreReply > reply;
RestoreRequest() : testData(0) {}
explicit RestoreRequest(int testData) : testData(testData) {}
explicit RestoreRequest(int testData, std::vector<int> &restoreRequests) : testData(testData), restoreRequests(restoreRequests) {}
explicit RestoreRequest(const int index, const Key &tagName, const Key &url, bool waitForComplete, Version targetVersion, bool verbose,
const KeyRange &range, const Key &addPrefix, const Key &removePrefix, bool lockDB,
const UID &randomUid) : index(index), tagName(tagName), url(url), waitForComplete(waitForComplete),
targetVersion(targetVersion), verbose(verbose), range(range),
addPrefix(addPrefix), removePrefix(removePrefix), lockDB(lockDB),
randomUid(randomUid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, index , tagName , url , waitForComplete , targetVersion , verbose , range , addPrefix , removePrefix , lockDB , randomUid ,
testData , restoreRequests , reply);
}
std::string toString() const {
std::stringstream ss;
ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString() << " url:" << url.contents().toString()
<< " waitForComplete:" << std::to_string(waitForComplete) << " targetVersion:" << std::to_string(targetVersion)
<< " verbose:" << std::to_string(verbose) << " range:" << range.toString() << " addPrefix:" << addPrefix.contents().toString()
<< " removePrefix:" << removePrefix.contents().toString() << " lockDB:" << std::to_string(lockDB) << " randomUid:" << randomUid.toString();
return ss.str();
}
};
struct RestoreReply {
int replyData;
RestoreReply() : replyData(0) {}
explicit RestoreReply(int replyData) : replyData(replyData) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, replyData);
}
};
std::string getRoleStr(RestoreRole role);
struct RestoreNodeStatus {
// ConfigureKeyRange is to determine how to split the key range and apply the splitted key ranges to appliers
// NotifyKeyRange is to notify the Loaders and Appliers about the key range each applier is responsible for
// Loading is to notify all Loaders to load the backup data and send the mutation to appliers
// Applying is to notify appliers to apply the aggregated mutations to DB
// Done is to notify the test workload (or user) that we have finished restore
enum class MasterState {Invalid = -1, Ready, ConfigureRoles, Sampling, ConfigureKeyRange, NotifyKeyRange, Loading, Applying, Done};
enum class LoaderState {Invalid = -1, Ready, Sampling, LoadRange, LoadLog, Done};
enum class ApplierState {Invalid = -1, Ready, Aggregating, ApplyToDB, Done};
UID nodeID;
int nodeIndex; // The continuous number to indicate which worker it is. It is an alias for nodeID
RestoreRole role;
MasterState masterState;
LoaderState loaderState;
ApplierState applierState;
double lastStart; // The most recent start time. now() - lastStart = execution time
double totalExecTime; // The total execution time.
double lastSuspend; // The most recent time when the process stops exeuction
double processedDataSize; // The size of all data processed so far
RestoreNodeStatus() : nodeID(UID()), role(RestoreRole::Invalid),
masterState(MasterState::Invalid), loaderState(LoaderState::Invalid), applierState(ApplierState::Invalid),
lastStart(0), totalExecTime(0), lastSuspend(0) {}
std::string toString() {
std::stringstream str;
str << "nodeID:" << nodeID.toString() << " role:" << getRoleStr(role)
<< " masterState:" << (int) masterState << " loaderState:" << (int) loaderState << " applierState:" << (int) applierState
<< " lastStart:" << lastStart << " totalExecTime:" << totalExecTime << " lastSuspend:" << lastSuspend;
return str.str();
}
void init(RestoreRole newRole) {
role = newRole;
if ( newRole == RestoreRole::Loader ) {
loaderState = LoaderState::Ready;
} else if ( newRole == RestoreRole::Applier) {
applierState = ApplierState::Ready;
} else if ( newRole == RestoreRole::Master) {
masterState = MasterState::Ready;
}
lastStart = 0;
totalExecTime = 0;
lastSuspend = 0;
}
};
////--- Interface functions
Future<Void> _restoreWorker(Database const& cx, LocalityData const& locality);
Future<Void> restoreWorker(Reference<ClusterConnectionFile> const& ccf, LocalityData const& locality);
#include "flow/unactorcompiler.h"
#endif

View File

@ -33,7 +33,7 @@
#include "fdbclient/FailureMonitorClient.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/MoveKeys.actor.h"

View File

@ -218,7 +218,9 @@
<ActorCompiler Include="RestoreCommon.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="RestoreWorkerInterface.h" />
<ActorCompiler Include="RestoreWorkerInterface.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="ServerDBInfo.h" />
<ClInclude Include="SimulatedCluster.h" />
<ClInclude Include="sqlite\btree.h" />

View File

@ -23,7 +23,7 @@
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -163,17 +163,20 @@ struct CycleWorkload : TestWorkload {
}
if (data[i].key != key(i)) {
TraceEvent(SevError, "TestFailure").detail("Reason", "Key changed").detail("KeyPrefix", keyPrefix.printable());
logTestData(data);
return false;
}
double d = testKeyToDouble(data[i].value, keyPrefix);
i = (int)d;
if ( i != d || i<0 || i>=nodeCount) {
TraceEvent(SevError, "TestFailure").detail("Reason", "Invalid value").detail("KeyPrefix", keyPrefix.printable());
logTestData(data);
return false;
}
}
if (i != 0) {
TraceEvent(SevError, "TestFailure").detail("Reason", "Cycle got longer").detail("KeyPrefix", keyPrefix.printable());
logTestData(data);
return false;
}
return true;

View File

@ -23,7 +23,7 @@
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbserver/RestoreWorkerInterface.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.