Merge pull request #3376 from xumengpanda/mengxu/fr-endpoint-prio-PR

FastRestore: Add priorities for loader and applier endpoints
This commit is contained in:
Jingyu Zhou 2020-06-17 18:10:53 -07:00 committed by GitHub
commit 12db9296fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 8 deletions

View File

@ -146,12 +146,13 @@ struct RestoreLoaderInterface : RestoreRoleInterface {
NetworkAddress address() const { return heartbeat.getEndpoint().addresses.address; }
void initEndpoints() {
// Endpoint in a later restore phase has higher priority
heartbeat.getEndpoint(TaskPriority::LoadBalancedEndpoint);
updateRestoreSysInfo.getEndpoint(TaskPriority::LoadBalancedEndpoint);
loadFile.getEndpoint(TaskPriority::LoadBalancedEndpoint);
sendMutations.getEndpoint(TaskPriority::LoadBalancedEndpoint);
initVersionBatch.getEndpoint(TaskPriority::LoadBalancedEndpoint);
finishVersionBatch.getEndpoint(TaskPriority::LoadBalancedEndpoint);
loadFile.getEndpoint(TaskPriority::RestoreLoaderLoadFiles);
sendMutations.getEndpoint(TaskPriority::RestoreLoaderSendMutations);
finishVersionBatch.getEndpoint(TaskPriority::RestoreLoaderFinishVersionBatch);
collectRestoreRoleInterfaces.getEndpoint(TaskPriority::LoadBalancedEndpoint);
finishRestore.getEndpoint(TaskPriority::LoadBalancedEndpoint);
}
@ -184,9 +185,10 @@ struct RestoreApplierInterface : RestoreRoleInterface {
NetworkAddress address() const { return heartbeat.getEndpoint().addresses.address; }
void initEndpoints() {
// Endpoint in a later restore phase has higher priority
heartbeat.getEndpoint(TaskPriority::LoadBalancedEndpoint);
sendMutationVector.getEndpoint(TaskPriority::LoadBalancedEndpoint);
applyToDB.getEndpoint(TaskPriority::LoadBalancedEndpoint);
sendMutationVector.getEndpoint(TaskPriority::RestoreApplierReceiveMutations);
applyToDB.getEndpoint(TaskPriority::RestoreApplierWriteDB);
initVersionBatch.getEndpoint(TaskPriority::LoadBalancedEndpoint);
collectRestoreRoleInterfaces.getEndpoint(TaskPriority::LoadBalancedEndpoint);
finishRestore.getEndpoint(TaskPriority::LoadBalancedEndpoint);

View File

@ -91,9 +91,13 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
self->loaderInterf = RestoreLoaderInterface();
self->loaderInterf.get().initEndpoints();
RestoreLoaderInterface& recruited = self->loaderInterf.get();
DUMPTOKEN(recruited.heartbeat);
DUMPTOKEN(recruited.updateRestoreSysInfo);
DUMPTOKEN(recruited.initVersionBatch);
DUMPTOKEN(recruited.loadFile);
DUMPTOKEN(recruited.sendMutations);
DUMPTOKEN(recruited.initVersionBatch);
DUMPTOKEN(recruited.finishVersionBatch);
DUMPTOKEN(recruited.collectRestoreRoleInterfaces);
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx));
@ -105,6 +109,7 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
self->applierInterf = RestoreApplierInterface();
self->applierInterf.get().initEndpoints();
RestoreApplierInterface& recruited = self->applierInterf.get();
DUMPTOKEN(recruited.heartbeat);
DUMPTOKEN(recruited.sendMutationVector);
DUMPTOKEN(recruited.applyToDB);
DUMPTOKEN(recruited.initVersionBatch);

View File

@ -93,9 +93,10 @@ enum class TaskPriority {
CompactCache = 2900,
TLogSpilledPeekReply = 2800,
FetchKeys = 2500,
RestoreApplierWriteDB = 2400,
RestoreApplierReceiveMutations = 2310,
RestoreLoaderSendMutations = 2300,
RestoreApplierWriteDB = 2310,
RestoreApplierReceiveMutations = 2300,
RestoreLoaderFinishVersionBatch = 2220,
RestoreLoaderSendMutations = 2210,
RestoreLoaderLoadFiles = 2200,
Low = 2000,