FastRestoreLoader:Fix:sched may stuck due to race condition
This commit is contained in:
parent
015397be74
commit
98167a6d20
|
@ -644,7 +644,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( FASTRESTORE_USE_LOG_FILE, true ); // Perf test only: set it to false will cause simulation failure
|
||||
init( FASTRESTORE_SAMPLE_MSG_BYTES, 1048576 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLE_MSG_BYTES = deterministicRandom()->random01() * 2048;}
|
||||
init( FASTRESTORE_SCHED_UPDATE_DELAY, 0.5 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_UPDATE_DELAY = deterministicRandom()->random01() * 2;}
|
||||
init( FASTRESTORE_SCHED_TARGET_CPU_PERCENT, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_TARGET_CPU_PERCENT = deterministicRandom()->random01() * 100;}
|
||||
init( FASTRESTORE_SCHED_TARGET_CPU_PERCENT, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_TARGET_CPU_PERCENT = deterministicRandom()->random01() * 100 + 50;} // simulate cpu usage can be larger than 100
|
||||
init( FASTRESTORE_SCHED_MAX_CPU_PERCENT, 90 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_MAX_CPU_PERCENT = FASTRESTORE_SCHED_TARGET_CPU_PERCENT + deterministicRandom()->random01() * 100;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS, 20 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS = deterministicRandom()->random01() * 30;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_SEND_REQS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SEND_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 5 + 1;}
|
||||
|
|
|
@ -67,6 +67,16 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
|
|||
ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
|
||||
try {
|
||||
loop {
|
||||
TraceEvent(SevDebug, "FastRestoreLoaderDispatchRequests", self->id())
|
||||
.detail("SendingQueue", self->sendingQueue.size())
|
||||
.detail("LoadingQueue", self->loadingQueue.size())
|
||||
.detail("InflightSendingReqs", self->inflightSendingReqs)
|
||||
.detail("InflightSendingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS)
|
||||
.detail("InflightLoadingReqs", self->inflightLoadingReqs)
|
||||
.detail("InflightLoadingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS)
|
||||
.detail("CpuUsage", self->cpuUsage)
|
||||
.detail("TargetCpuUsage", SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT)
|
||||
.detail("MaxCpuUsage", SERVER_KNOBS->FASTRESTORE_SCHED_MAX_CPU_PERCENT);
|
||||
while (!self->sendingQueue.empty()) {
|
||||
const RestoreSendMutationsToAppliersRequest& req = self->sendingQueue.top();
|
||||
// Dispatch the request if it is the next version batch to process or if cpu usage is low
|
||||
|
@ -105,16 +115,19 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
|
|||
self->addActor.send(handleLoadFileRequest(req, self));
|
||||
self->loadingQueue.pop();
|
||||
}
|
||||
|
||||
if (self->cpuUsage >= SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
|
||||
wait(delay(0.1));
|
||||
updateProcessStats(self);
|
||||
}
|
||||
updateProcessStats(self);
|
||||
|
||||
if (self->loadingQueue.empty() && self->sendingQueue.empty()) {
|
||||
TraceEvent(SevDebug, "FastRestoreLoaderDispatchRequestsWaitOnRequests", self->id())
|
||||
.detail("HasPendingRequests", self->hasPendingRequests->get());
|
||||
self->hasPendingRequests->set(false);
|
||||
wait(self->hasPendingRequests->onChange()); // CAREFUL: may stuck here
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
TraceEvent(SevError, "FastRestoreLoaderDispatchRequests").error(e, true);
|
||||
|
@ -140,7 +153,6 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
|
|||
|
||||
loop {
|
||||
state std::string requestTypeStr = "[Init]";
|
||||
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
|
||||
|
||||
try {
|
||||
choose {
|
||||
|
@ -154,6 +166,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
|
|||
}
|
||||
when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) {
|
||||
requestTypeStr = "loadFile";
|
||||
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
|
||||
self->initBackupContainer(req.param.url);
|
||||
self->loadingQueue.push(req);
|
||||
if (!hasQueuedRequests) {
|
||||
|
@ -162,6 +175,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
|
|||
}
|
||||
when(RestoreSendMutationsToAppliersRequest req = waitNext(loaderInterf.sendMutations.getFuture())) {
|
||||
requestTypeStr = "sendMutations";
|
||||
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
|
||||
self->sendingQueue.push(req);
|
||||
if (!hasQueuedRequests) {
|
||||
self->hasPendingRequests->set(true);
|
||||
|
|
Loading…
Reference in New Issue