FastRestore: Fix various bugs discovered by enhanced simulation
1. A sendMutation request can be dispatched after the version batch has finished and its data has been deleted; 2. The request scheduler on the loader may get into an infinite loop because the FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE knob can be set to 0 in simulation.
This commit is contained in:
parent
f1bd2a18ed
commit
7a29a3157f
|
@ -646,9 +646,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( FASTRESTORE_SCHED_UPDATE_DELAY, 0.1 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_UPDATE_DELAY = deterministicRandom()->random01() * 2;}
|
||||
init( FASTRESTORE_SCHED_TARGET_CPU_PERCENT, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_TARGET_CPU_PERCENT = deterministicRandom()->random01() * 100 + 50;} // simulate cpu usage can be larger than 100
|
||||
init( FASTRESTORE_SCHED_MAX_CPU_PERCENT, 90 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_MAX_CPU_PERCENT = FASTRESTORE_SCHED_TARGET_CPU_PERCENT + deterministicRandom()->random01() * 100;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 30;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_SEND_REQS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SEND_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10;}
|
||||
init( FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 30 + 1;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_SEND_REQS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SEND_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10 + 1;}
|
||||
init( FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10 + 1;}
|
||||
init( FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
|
||||
init( FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
|
||||
init( FASTRESTORE_NUM_TRACE_EVENTS, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_TRACE_EVENTS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 500 + 1;}
|
||||
|
|
|
@ -70,6 +70,7 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
|
|||
try {
|
||||
state int curVBInflightReqs = 0;
|
||||
state int sendLoadParams = 0;
|
||||
state int lastLoadReqs = 0;
|
||||
loop {
|
||||
TraceEvent(SevDebug, "FastRestoreLoaderDispatchRequests", self->id())
|
||||
.detail("SendingQueue", self->sendingQueue.size())
|
||||
|
@ -79,6 +80,8 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
|
|||
.detail("InflightSendingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS)
|
||||
.detail("InflightLoadingReqs", self->inflightLoadingReqs)
|
||||
.detail("InflightLoadingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS)
|
||||
.detail("LastLoadFileRequests", lastLoadReqs)
|
||||
.detail("LoadFileRequestsBatchThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE)
|
||||
.detail("LastDispatchSendLoadParamReqsForCurrentVB", curVBInflightReqs)
|
||||
.detail("LastDispatchSendLoadParamReqsForFutureVB", sendLoadParams)
|
||||
.detail("CpuUsage", self->cpuUsage)
|
||||
|
@ -168,15 +171,24 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
|
|||
}
|
||||
|
||||
// Dispatch loading backup file requests
|
||||
int loadReqs = 0;
|
||||
lastLoadReqs = 0;
|
||||
while (!self->loadingQueue.empty()) {
|
||||
if (loadReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE) {
|
||||
if (lastLoadReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE) {
|
||||
break;
|
||||
}
|
||||
loadReqs++;
|
||||
const RestoreLoadFileRequest& req = self->loadingQueue.top();
|
||||
self->addActor.send(handleLoadFileRequest(req, self));
|
||||
self->loadingQueue.pop();
|
||||
if (req.batchIndex <= self->finishedBatch.get()) {
|
||||
TraceEvent(SevError, "FastRestoreLoaderDispatchRestoreLoadFileRequestTooOld")
|
||||
.detail("FinishedBatchIndex", self->finishedBatch.get())
|
||||
.detail("RequestBatchIndex", req.batchIndex);
|
||||
req.reply.send(RestoreLoadFileReply(req.param, true));
|
||||
self->loadingQueue.pop();
|
||||
ASSERT(false); // Check if this ever happens easily
|
||||
} else {
|
||||
self->addActor.send(handleLoadFileRequest(req, self));
|
||||
self->loadingQueue.pop();
|
||||
lastLoadReqs++;
|
||||
}
|
||||
}
|
||||
|
||||
if (self->cpuUsage >= SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
|
||||
|
@ -595,11 +607,22 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
// Do not need to block on low memory usage because this actor should not increase memory usage.
|
||||
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
|
||||
Reference<RestoreLoaderData> self) {
|
||||
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
|
||||
state Reference<LoaderBatchStatus> batchStatus = self->status[req.batchIndex];
|
||||
state Reference<LoaderBatchData> batchData;
|
||||
state Reference<LoaderBatchStatus> batchStatus;
|
||||
state bool isDuplicated = true;
|
||||
|
||||
if (req.batchIndex <= self->finishedBatch.get()) {
|
||||
TraceEvent(SevWarn, "FastRestoreLoaderRestoreSendMutationsToAppliersRequestTooOld")
|
||||
.detail("FinishedBatchIndex", self->finishedBatch.get())
|
||||
.detail("RequestBatchIndex", req.batchIndex);
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
return Void();
|
||||
}
|
||||
|
||||
batchData = self->batch[req.batchIndex];
|
||||
batchStatus = self->status[req.batchIndex];
|
||||
ASSERT(batchData.isValid() && batchStatus.isValid());
|
||||
// Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
|
||||
ASSERT(req.batchIndex > self->finishedBatch.get());
|
||||
TraceEvent("FastRestoreLoaderPhaseSendMutations", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
|
@ -607,8 +630,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
|
|||
.detail("LoaderSendStatus", batchStatus->toString());
|
||||
// The VB must finish loading phase before it can send mutations; update finishedLoadingVB for scheduler
|
||||
self->finishedLoadingVB = std::max(self->finishedLoadingVB, req.batchIndex);
|
||||
// Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
|
||||
ASSERT(self->finishedBatch.get() < req.batchIndex);
|
||||
|
||||
// Ensure each file is sent exactly once by using batchStatus->sendAllLogs and batchStatus->sendAllRanges
|
||||
if (!req.useRangeFile) {
|
||||
|
@ -1220,19 +1241,21 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
|
|||
if (self->finishedBatch.get() == req.batchIndex - 1) {
|
||||
// Sanity check: All requests before and in this batchIndex must have been processed; otherwise,
|
||||
// those requests may cause segmentation fault after applier remove the batch data
|
||||
// TODO: Pop old requests
|
||||
if (!self->loadingQueue.empty() && self->loadingQueue.top().batchIndex <= req.batchIndex) {
|
||||
while (!self->loadingQueue.empty() && self->loadingQueue.top().batchIndex <= req.batchIndex) {
|
||||
// Still has pending requests from earlier batchIndex and current batchIndex, which should not happen
|
||||
TraceEvent(SevError, "FastRestoreLoaderHasPendingLoadFileRequests")
|
||||
TraceEvent(SevWarn, "FastRestoreLoaderHasPendingLoadFileRequests")
|
||||
.detail("PendingRequest", self->loadingQueue.top().toString());
|
||||
self->loadingQueue.pop();
|
||||
}
|
||||
if (!self->sendingQueue.empty() && self->sendingQueue.top().batchIndex <= req.batchIndex) {
|
||||
TraceEvent(SevError, "FastRestoreLoaderHasPendingSendRequests")
|
||||
while (!self->sendingQueue.empty() && self->sendingQueue.top().batchIndex <= req.batchIndex) {
|
||||
TraceEvent(SevWarn, "FastRestoreLoaderHasPendingSendRequests")
|
||||
.detail("PendingRequest", self->sendingQueue.top().toString());
|
||||
self->sendingQueue.pop();
|
||||
}
|
||||
if (!self->sendLoadParamQueue.empty() && self->sendLoadParamQueue.top().batchIndex <= req.batchIndex) {
|
||||
TraceEvent(SevError, "FastRestoreLoaderHasPendingSendLoadParamRequests")
|
||||
while (!self->sendLoadParamQueue.empty() && self->sendLoadParamQueue.top().batchIndex <= req.batchIndex) {
|
||||
TraceEvent(SevWarn, "FastRestoreLoaderHasPendingSendLoadParamRequests")
|
||||
.detail("PendingRequest", self->sendLoadParamQueue.top().toString());
|
||||
self->sendLoadParamQueue.pop();
|
||||
}
|
||||
|
||||
self->finishedBatch.set(req.batchIndex);
|
||||
|
|
Loading…
Reference in New Issue