FR:Add controller interface and send to roles

This commit is contained in:
Meng Xu 2020-08-03 15:06:15 -07:00
parent f071d81ad0
commit e9adec421d
5 changed files with 47 additions and 7 deletions

View File

@ -203,6 +203,31 @@ struct RestoreApplierInterface : RestoreRoleInterface {
std::string toString() { return nodeID.toString(); }
};
struct RestoreControllerInterface : RestoreRoleInterface {
constexpr static FileIdentifier file_identifier = 54253047;
RequestStream<RestoreSimpleRequest> samples;
bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); }
RestoreControllerInterface() {
role = RestoreRole::Controller;
nodeID = deterministicRandom()->randomUniqueID();
}
NetworkAddress address() const { return samples.getEndpoint().addresses.address; }
void initEndpoints() { samples.getEndpoint(TaskPriority::LoadBalancedEndpoint); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(RestoreRoleInterface*)this, samples);
}
std::string toString() { return nodeID.toString(); }
}
// RestoreAsset uniquely identifies the work unit done by restore roles;
// It is used to ensure exact-once processing on restore loader and applier;
// By combining all RestoreAssets across all verstion batches, restore should process all mutations in
@ -361,22 +386,25 @@ struct RestoreRecruitRoleReply : TimedRequest {
struct RestoreRecruitRoleRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 87022360;
RestoreControllerInterface ci;
RestoreRole role;
int nodeIndex; // Each role is a node
ReplyPromise<RestoreRecruitRoleReply> reply;
RestoreRecruitRoleRequest() : role(RestoreRole::Invalid) {}
explicit RestoreRecruitRoleRequest(RestoreRole role, int nodeIndex) : role(role), nodeIndex(nodeIndex) {}
explicit RestoreRecruitRoleRequest(RestoreControllerInterface ci, RestoreRole role, int nodeIndex)
: ci(ci), role(role), nodeIndex(nodeIndex) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, role, nodeIndex, reply);
serializer(ar, ci, role, nodeIndex, reply);
}
std::string printable() {
std::stringstream ss;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex
<< " RestoreController:" << ci.id();
return ss.str();
}

View File

@ -74,7 +74,10 @@ void splitKeyRangeForAppliers(Reference<ControllerBatchData> batchData,
std::map<UID, RestoreApplierInterface> appliersInterf, int batchIndex);
ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controllerWorker, Database cx) {
state Reference<RestoreControllerData> self = Reference<RestoreControllerData>(new RestoreControllerData());
ASSERT(controllerWorker.isValid());
ASSERT(controllerWorker->controllerInterf.present());
state Reference<RestoreControllerData> self =
Reference<RestoreControllerData>(new RestoreControllerData(controllerWorker->controllerInterf.id()));
state ActorCollectionNoErrors actors;
try {
@ -107,6 +110,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
.detail("NumLoaders", SERVER_KNOBS->FASTRESTORE_NUM_LOADERS)
.detail("NumAppliers", SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS);
ASSERT(controllerData->loadersInterf.empty() && controllerData->appliersInterf.empty());
ASSERT(controllerWorker->controllerInterf.present());
ASSERT(controllerData.isValid());
ASSERT(SERVER_KNOBS->FASTRESTORE_NUM_LOADERS > 0 && SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS > 0);
@ -129,7 +133,8 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
}
TraceEvent("FastRestoreController", controllerData->id()).detail("WorkerNode", workerInterf.first);
requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex));
requests.emplace_back(workerInterf.first,
RestoreRecruitRoleRequest(controllerWorker->controllerInterf.get(), role, nodeIndex));
nodeIndex++;
}

View File

@ -150,9 +150,9 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }
RestoreControllerData() {
RestoreControllerData(UID interfId) {
role = RestoreRole::Controller;
nodeID = UID();
nodeID = interfId;
runningVersionBatches.set(0);
}

View File

@ -88,6 +88,7 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
if (req.role == RestoreRole::Loader) {
ASSERT(!self->loaderInterf.present());
self->controllerInterf = req.ci;
self->loaderInterf = RestoreLoaderInterface();
self->loaderInterf.get().initEndpoints();
RestoreLoaderInterface& recruited = self->loaderInterf.get();
@ -106,6 +107,7 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
RestoreRecruitRoleReply(self->loaderInterf.get().id(), RestoreRole::Loader, self->loaderInterf.get()));
} else if (req.role == RestoreRole::Applier) {
ASSERT(!self->applierInterf.present());
self->controllerInterf = req.ci;
self->applierInterf = RestoreApplierInterface();
self->applierInterf.get().initEndpoints();
RestoreApplierInterface& recruited = self->applierInterf.get();
@ -202,6 +204,10 @@ ACTOR Future<Void> startRestoreWorkerLeader(Reference<RestoreWorkerData> self, R
// TODO: Needs to keep this monitor's future. May use actorCollection
state Future<Void> workersFailureMonitor = monitorWorkerLiveness(self);
RestoreControllerInterface controllerInterface;
DUMPTOKEN(controllerInterface.samples);
self->controllerInterf = controllerInterface;
wait(startRestoreController(self, cx) || workersFailureMonitor);
return Void();

View File

@ -49,6 +49,7 @@ struct RestoreWorkerData : NonCopyable, public ReferenceCounted<RestoreWorkerDa
std::map<UID, RestoreWorkerInterface> workerInterfaces; // UID is worker's node id, RestoreWorkerInterface is worker's communication workerInterface
// Restore Roles
Optional<RestoreLoaderInterface> controllerInterf;
Optional<RestoreLoaderInterface> loaderInterf;
Optional<RestoreApplierInterface> applierInterf;