FastRestore: Remove deprecated RestoreCommand

This commit is contained in:
Meng Xu 2019-04-09 11:49:54 -07:00
parent 18fb2ea99d
commit 6a86492e6e
2 changed files with 42 additions and 68 deletions

View File

@ -2604,6 +2604,8 @@ ACTOR Future<Void> _restoreWorker(Database cx_input, LocalityData locality) {
//we are not the leader, so put our interface in the agent list
if(leaderInterf.present()) {
// Initialize the node's UID
rd->localNodeStatus.nodeID = interf.id();
// Step: Find other worker's interfaces
// NOTE: This must be after wait(configureRolesHandler()) because we must ensure all workers have registered their interfaces into DB before we can read the interface.

View File

@ -132,7 +132,7 @@ struct RestoreInterface {
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
// ToDelete
RequestStream< struct RestoreCommand > cmd; // Restore commands from master to loader and applier
// RequestStream< struct RestoreCommand > cmd; // Restore commands from master to loader and applier
// RequestStream< struct RestoreRequest > request; // Restore requests used by loader and applier
bool operator == (RestoreInterface const& r) const { return id() == r.id(); }
@ -141,90 +141,62 @@ struct RestoreInterface {
void initNodeID() { nodeID = setRole.getEndpoint().token; }
UID id() const { return nodeID; } //cmd.getEndpoint().token;
NetworkAddress address() const { return cmd.getEndpoint().addresses.address; }
NetworkAddress address() const { return setRole.getEndpoint().addresses.address; }
void initEndpoints() {
cmd.getEndpoint( TaskClusterController ); // Q: Why do we need this?
setRole.getEndpoint( TaskClusterController );// Q: Why do we need this?
sampleRangeFile.getEndpoint( TaskClusterController );
sampleLogFile.getEndpoint( TaskClusterController );
sendSampleMutation.getEndpoint( TaskClusterController );
calculateApplierKeyRange.getEndpoint( TaskClusterController );
getApplierKeyRangeRequest.getEndpoint( TaskClusterController );
setApplierKeyRangeRequest.getEndpoint( TaskClusterController );
loadRangeFile.getEndpoint( TaskClusterController );
loadLogFile.getEndpoint( TaskClusterController );
sendMutation.getEndpoint( TaskClusterController );
applyToDB.getEndpoint( TaskClusterController );
initVersionBatch.getEndpoint( TaskClusterController );
}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, setRole, sampleRangeFile, sampleLogFile, sendSampleMutation,
calculateApplierKeyRange, getApplierKeyRangeRequest, setApplierKeyRangeRequest,
loadRangeFile, loadLogFile, sendMutation, applyToDB);
loadRangeFile, loadLogFile, sendMutation, applyToDB, initVersionBatch);
}
};
struct RestoreCommand {
RestoreCommandEnum cmd; // 0: set role, -1: end of the command stream
CMDUID cmdID; // monotonically increase index for commands.
UID id; // Node id that will receive the command
int nodeIndex; // The index of the node in the global node status
UID masterApplier;
RestoreRole role; // role of the command;
KeyRange keyRange;
uint64_t commitVersion;
MutationRef mutation; //TODO: change to a vector
KeyRef applierKeyRangeLB;
UID applierID;
int keyRangeIndex;
struct LoadingParam {
Key url;
Version version;
std::string filename;
int64_t offset;
int64_t length;
int64_t blockSize;
KeyRange restoreRange;
Key addPrefix;
Key removePrefix;
Key mutationLogPrefix;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, url, version, filename, offset, length, blockSize, restoreRange, addPrefix, removePrefix, mutationLogPrefix);
//ar & url & version & filename & offset & length & blockSize & restoreRange & addPrefix & removePrefix & mutationLogPrefix;
}
std::string toString() {
std::stringstream str;
str << "url:" << url.toString() << "version:" << version
<< " filename:" << filename << " offset:" << offset << " length:" << length << " blockSize:" << blockSize
<< " restoreRange:" << restoreRange.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
return str.str();
}
};
LoadingParam loadingParam;
ReplyPromise< struct RestoreCommandReply > reply;
RestoreCommand() : id(UID()), role(RestoreRole::Invalid) {}
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id): cmd(cmd), cmdID(cmdID), id(id) {};
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, RestoreRole role) : cmd(cmd), cmdID(cmdID), id(id), role(role) {}
// Set_Role
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, RestoreRole role, int nodeIndex, UID masterApplier) : cmd(cmd), cmdID(cmdID), id(id), role(role), nodeIndex(nodeIndex), masterApplier(masterApplier) {} // Temporary when we use masterApplier to apply mutations
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, KeyRange keyRange): cmd(cmd), cmdID(cmdID), id(id), keyRange(keyRange) {};
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, LoadingParam loadingParam): cmd(cmd), cmdID(cmdID), id(id), loadingParam(loadingParam) {};
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, int keyRangeIndex): cmd(cmd), cmdID(cmdID), id(id), keyRangeIndex(keyRangeIndex) {};
// For loader send mutation to applier
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, uint64_t commitVersion, struct MutationRef mutation): cmd(cmd), cmdID(cmdID), id(id), commitVersion(commitVersion), mutation(mutation) {};
// Notify loader about applier key ranges
explicit RestoreCommand(RestoreCommandEnum cmd, CMDUID cmdID, UID id, KeyRef applierKeyRangeLB, UID applierID): cmd(cmd), cmdID(cmdID), id(id), applierKeyRangeLB(applierKeyRangeLB), applierID(applierID) {};
struct LoadingParam {
Key url;
Version version;
std::string filename;
int64_t offset;
int64_t length;
int64_t blockSize;
KeyRange restoreRange;
Key addPrefix;
Key removePrefix;
Key mutationLogPrefix;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar , cmd , cmdID , nodeIndex, id , masterApplier , role , keyRange , commitVersion , mutation , applierKeyRangeLB , applierID , keyRangeIndex , loadingParam , reply);
//ar & cmd & cmdIndex & id & masterApplier & role & keyRange & commitVersion & mutation & applierKeyRangeLB & applierID & keyRangeIndex & loadingParam & reply;
serializer(ar, url, version, filename, offset, length, blockSize, restoreRange, addPrefix, removePrefix, mutationLogPrefix);
//ar & url & version & filename & offset & length & blockSize & restoreRange & addPrefix & removePrefix & mutationLogPrefix;
}
std::string toString() {
std::stringstream str;
str << "url:" << url.toString() << "version:" << version
<< " filename:" << filename << " offset:" << offset << " length:" << length << " blockSize:" << blockSize
<< " restoreRange:" << restoreRange.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
return str.str();
}
};
typedef RestoreCommand::LoadingParam LoadingParam;
struct RestoreSetRoleRequest : TimedRequest {
CMDUID cmdID;