FastRestore: loaderCore and applierCore for requests

Use loaderCore and applierCore to handle all requests
to loader and applier.
This commit is contained in:
Meng Xu 2019-04-06 22:30:19 -07:00
parent ed0d3c8b57
commit 18fb2ea99d
2 changed files with 944 additions and 1360 deletions

File diff suppressed because it is too large Load Diff

View File

@ -49,6 +49,9 @@ struct RestoreSendMutationRequest;
struct RestoreLoadFileRequest;
struct RestoreGetApplierKeyRangeRequest;
struct RestoreSetApplierKeyRangeRequest;
struct GetKeyRangeNumberReply;
struct RestoreVersionBatchRequest;
struct RestoreCalculateApplierKeyRangeRequest;
// RestoreCommandEnum is also used as the phase ID for CMDUID
enum class RestoreCommandEnum {Init = 0,
@ -117,7 +120,7 @@ struct RestoreInterface {
RequestStream<RestoreLoadFileRequest> sampleLogFile;
RequestStream<RestoreSendMutationRequest> sendSampleMutation;
RequestStream<RestoreSimpleRequest> calculateApplierKeyRange;
RequestStream<RestoreCalculateApplierKeyRangeRequest> calculateApplierKeyRange;
RequestStream<RestoreGetApplierKeyRangeRequest> getApplierKeyRangeRequest;
RequestStream<RestoreSetApplierKeyRangeRequest> setApplierKeyRangeRequest;
@ -126,6 +129,8 @@ struct RestoreInterface {
RequestStream<RestoreSendMutationRequest> sendMutation;
RequestStream<RestoreSimpleRequest> applyToDB;
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
// ToDelete
RequestStream< struct RestoreCommand > cmd; // Restore commands from master to loader and applier
// RequestStream< struct RestoreRequest > request; // Restore requests used by loader and applier
@ -224,15 +229,18 @@ typedef RestoreCommand::LoadingParam LoadingParam;
struct RestoreSetRoleRequest : TimedRequest {
CMDUID cmdID;
RestoreRole role;
int nodeIndex;
UID masterApplierID;
ReplyPromise<RestoreCommonReply> reply;
RestoreSetRoleRequest() : cmdID(CMDUID()), role(RestoreRole::Invalid) {}
explicit RestoreSetRoleRequest(CMDUID cmdID, RestoreRole role) : cmdID(cmdID), role(role) {}
explicit RestoreSetRoleRequest(CMDUID cmdID, RestoreRole role, int nodeIndex, UID masterApplierID) :
cmdID(cmdID), role(role), nodeIndex(nodeIndex), masterApplierID(masterApplierID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, role, reply);
serializer(ar, cmdID, role, nodeIndex, masterApplierID, reply);
}
};
@ -257,16 +265,16 @@ struct RestoreLoadFileRequest : TimedRequest {
struct RestoreSendMutationRequest : TimedRequest {
CMDUID cmdID;
uint64_t commitVersion;
MutationRef kvm;
MutationRef mutation;
ReplyPromise<RestoreCommonReply> reply;
RestoreSendMutationRequest() : cmdID(CMDUID()), commitVersion(0), kvm(MutationRef()) {}
explicit RestoreSendMutationRequest(CMDUID cmdID, uint64_t commitVersion, MutationRef kvm) : cmdID(cmdID), commitVersion(commitVersion), kvm(kvm) {}
RestoreSendMutationRequest() : cmdID(CMDUID()), commitVersion(0), mutation(MutationRef()) {}
explicit RestoreSendMutationRequest(CMDUID cmdID, uint64_t commitVersion, MutationRef mutation) : cmdID(cmdID), commitVersion(commitVersion), mutation(mutation) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, commitVersion, kvm, reply);
serializer(ar, cmdID, commitVersion, mutation, reply);
}
};
@ -285,18 +293,48 @@ struct RestoreSimpleRequest : TimedRequest {
}
};
struct RestoreGetApplierKeyRangeRequest : TimedRequest {
struct RestoreCalculateApplierKeyRangeRequest : TimedRequest {
CMDUID cmdID;
UID applierID; // The applier ID whose key range will be replied
int numAppliers;
ReplyPromise<GetKeyRangeReply> reply;
ReplyPromise<GetKeyRangeNumberReply> reply;
RestoreGetApplierKeyRangeRequest() : cmdID(CMDUID()), applierID(UID()) {}
explicit RestoreGetApplierKeyRangeRequest(CMDUID cmdID, UID applierID) : cmdID(cmdID), applierID(applierID) {}
RestoreCalculateApplierKeyRangeRequest() : cmdID(CMDUID()), numAppliers(0) {}
explicit RestoreCalculateApplierKeyRangeRequest(CMDUID cmdID, int numAppliers) : cmdID(cmdID), numAppliers(numAppliers) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, applierID, reply);
serializer(ar, cmdID, numAppliers, reply);
}
};
struct RestoreVersionBatchRequest : TimedRequest {
CMDUID cmdID;
int batchID;
ReplyPromise<RestoreCommonReply> reply;
RestoreVersionBatchRequest() : cmdID(CMDUID()), batchID(0) {}
explicit RestoreVersionBatchRequest(CMDUID cmdID, int batchID) : cmdID(cmdID), batchID(batchID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, batchID, reply);
}
};
struct RestoreGetApplierKeyRangeRequest : TimedRequest {
CMDUID cmdID;
int applierIndex; // The applier ID whose key range will be replied // TODO: Maybe change to use applier's UID
ReplyPromise<GetKeyRangeReply> reply;
RestoreGetApplierKeyRangeRequest() : cmdID(CMDUID()), applierIndex(0) {}
explicit RestoreGetApplierKeyRangeRequest(CMDUID cmdID, int applierIndex) : cmdID(cmdID), applierIndex(applierIndex) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, cmdID, applierIndex, reply);
}
};
@ -346,6 +384,10 @@ struct GetKeyRangeReply : RestoreCommonReply {
GetKeyRangeReply() : index(0), lowerBound(KeyRef()), upperBound(KeyRef()) {}
explicit GetKeyRangeReply(int index, KeyRef lowerBound, KeyRef upperBound) : index(index), lowerBound(lowerBound), upperBound(upperBound) {}
explicit GetKeyRangeReply(UID id, CMDUID cmdID, int index, KeyRef lowerBound, KeyRef upperBound) :
RestoreCommonReply(id, cmdID), index(index), lowerBound(lowerBound), upperBound(upperBound) {}
explicit GetKeyRangeReply(UID id, CMDUID cmdID) :
RestoreCommonReply(id, cmdID) {}
std::string toString() const {
std::stringstream ss;
@ -362,6 +404,28 @@ struct GetKeyRangeReply : RestoreCommonReply {
};
struct GetKeyRangeNumberReply : RestoreCommonReply {
int keyRangeNum;
GetKeyRangeNumberReply() : keyRangeNum(0) {}
explicit GetKeyRangeNumberReply(int keyRangeNum) : keyRangeNum(keyRangeNum) {}
explicit GetKeyRangeNumberReply(UID id, CMDUID cmdID) : RestoreCommonReply(id, cmdID) {}
std::string toString() const {
std::stringstream ss;
ss << "ServerNodeID:" << id.toString() << " CMDID:" << cmdID.toString()
<< " keyRangeNum:" << std::to_string(keyRangeNum);
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(RestoreCommonReply *) this, keyRangeNum);
}
};
// ToDelete
struct RestoreCommandReply {
UID id; // placeholder, which reply the worker's node id back to master