Keep track of the last time a process became available, so that remoteStartTime can be given a better starting value
This commit is contained in:
parent
45c8f2dfcb
commit
accba4fa1d
|
@ -46,22 +46,24 @@ struct WorkerInfo : NonCopyable {
|
||||||
ReplyPromise<RegisterWorkerReply> reply;
|
ReplyPromise<RegisterWorkerReply> reply;
|
||||||
Generation gen;
|
Generation gen;
|
||||||
int reboots;
|
int reboots;
|
||||||
|
double lastAvailableTime;
|
||||||
WorkerInterface interf;
|
WorkerInterface interf;
|
||||||
ProcessClass initialClass;
|
ProcessClass initialClass;
|
||||||
ProcessClass processClass;
|
ProcessClass processClass;
|
||||||
ClusterControllerPriorityInfo priorityInfo;
|
ClusterControllerPriorityInfo priorityInfo;
|
||||||
|
|
||||||
WorkerInfo() : gen(-1), reboots(0), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
|
WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
|
||||||
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo ) :
|
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo ) :
|
||||||
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo) {}
|
watcher(watcher), reply(reply), gen(gen), reboots(0), lastAvailableTime(now()), interf(interf), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo) {}
|
||||||
|
|
||||||
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
|
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
|
||||||
reboots(r.reboots), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), priorityInfo(r.priorityInfo) {}
|
reboots(r.reboots), lastAvailableTime(r.lastAvailableTime), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), priorityInfo(r.priorityInfo) {}
|
||||||
void operator=( WorkerInfo&& r ) noexcept(true) {
|
void operator=( WorkerInfo&& r ) noexcept(true) {
|
||||||
watcher = std::move(r.watcher);
|
watcher = std::move(r.watcher);
|
||||||
reply = std::move(r.reply);
|
reply = std::move(r.reply);
|
||||||
gen = r.gen;
|
gen = r.gen;
|
||||||
reboots = r.reboots;
|
reboots = r.reboots;
|
||||||
|
lastAvailableTime = r.lastAvailableTime;
|
||||||
interf = std::move(r.interf);
|
interf = std::move(r.interf);
|
||||||
initialClass = r.initialClass;
|
initialClass = r.initialClass;
|
||||||
processClass = r.processClass;
|
processClass = r.processClass;
|
||||||
|
@ -518,7 +520,14 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!remoteStartTime.present()) {
|
if(!remoteStartTime.present()) {
|
||||||
remoteStartTime = now();
|
double maxAvailableTime = 0;
|
||||||
|
for(auto& it : result.remoteTLogs) {
|
||||||
|
maxAvailableTime = std::max(maxAvailableTime, id_worker[it.locality.processId()].lastAvailableTime);
|
||||||
|
}
|
||||||
|
for(auto& it : result.logRouters) {
|
||||||
|
maxAvailableTime = std::max(maxAvailableTime, id_worker[it.locality.processId()].lastAvailableTime);
|
||||||
|
}
|
||||||
|
remoteStartTime = maxAvailableTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
|
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
|
||||||
|
@ -1299,6 +1308,7 @@ ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Stan
|
||||||
auto watcher = cluster->id_worker.find(processID);
|
auto watcher = cluster->id_worker.find(processID);
|
||||||
ASSERT(watcher != cluster->id_worker.end());
|
ASSERT(watcher != cluster->id_worker.end());
|
||||||
|
|
||||||
|
watcher->second.lastAvailableTime = now();
|
||||||
watcher->second.reboots++;
|
watcher->second.reboots++;
|
||||||
Void _ = wait( delay( g_network->isSimulated() ? SERVER_KNOBS->SIM_SHUTDOWN_TIMEOUT : SERVER_KNOBS->SHUTDOWN_TIMEOUT ) );
|
Void _ = wait( delay( g_network->isSimulated() ? SERVER_KNOBS->SIM_SHUTDOWN_TIMEOUT : SERVER_KNOBS->SHUTDOWN_TIMEOUT ) );
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue