diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index 8f3a741ba9..6561fb0352 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -646,9 +646,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
     init( FASTRESTORE_SCHED_UPDATE_DELAY, 0.1 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_UPDATE_DELAY = deterministicRandom()->random01() * 2;}
     init( FASTRESTORE_SCHED_TARGET_CPU_PERCENT, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_TARGET_CPU_PERCENT = deterministicRandom()->random01() * 100 + 50;} // simulate cpu usage can be larger than 100
     init( FASTRESTORE_SCHED_MAX_CPU_PERCENT, 90 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_MAX_CPU_PERCENT = FASTRESTORE_SCHED_TARGET_CPU_PERCENT + deterministicRandom()->random01() * 100;}
-    init( FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 30;}
-    init( FASTRESTORE_SCHED_INFLIGHT_SEND_REQS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SEND_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10;}
-    init( FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10;}
+    init( FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 30 + 1;}
+    init( FASTRESTORE_SCHED_INFLIGHT_SEND_REQS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SEND_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10 + 1;}
+    init( FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10 + 1;}
     init( FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
     init( FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
     init( FASTRESTORE_NUM_TRACE_EVENTS, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_TRACE_EVENTS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 500 + 1;}
diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp
index e9001c3e35..65a11d9e02 100644
--- a/fdbserver/RestoreLoader.actor.cpp
+++ b/fdbserver/RestoreLoader.actor.cpp
@@ -70,6 +70,7 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
     try {
         state int curVBInflightReqs = 0;
         state int sendLoadParams = 0;
+        state int lastLoadReqs = 0;
         loop {
             TraceEvent(SevDebug, "FastRestoreLoaderDispatchRequests", self->id())
                 .detail("SendingQueue", self->sendingQueue.size())
@@ -79,6 +80,8 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
                 .detail("InflightSendingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS)
                 .detail("InflightLoadingReqs", self->inflightLoadingReqs)
                 .detail("InflightLoadingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS)
+                .detail("LastLoadFileRequests", lastLoadReqs)
+                .detail("LoadFileRequestsBatchThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE)
                 .detail("LastDispatchSendLoadParamReqsForCurrentVB", curVBInflightReqs)
                 .detail("LastDispatchSendLoadParamReqsForFutureVB", sendLoadParams)
                 .detail("CpuUsage", self->cpuUsage)
@@ -168,15 +171,24 @@ ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
             }

             // Dispatch loading backup file requests
-            int loadReqs = 0;
+            lastLoadReqs = 0;
             while (!self->loadingQueue.empty()) {
-                if (loadReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE) {
+                if (lastLoadReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE) {
                     break;
                 }
-                loadReqs++;
                 const RestoreLoadFileRequest& req = self->loadingQueue.top();
-                self->addActor.send(handleLoadFileRequest(req, self));
-                self->loadingQueue.pop();
+                if (req.batchIndex <= self->finishedBatch.get()) {
+                    TraceEvent(SevError, "FastRestoreLoaderDispatchRestoreLoadFileRequestTooOld")
+                        .detail("FinishedBatchIndex", self->finishedBatch.get())
+                        .detail("RequestBatchIndex", req.batchIndex);
+                    req.reply.send(RestoreLoadFileReply(req.param, true));
+                    self->loadingQueue.pop();
+                    ASSERT(false); // Check if this ever happens easily
+                } else {
+                    self->addActor.send(handleLoadFileRequest(req, self));
+                    self->loadingQueue.pop();
+                    lastLoadReqs++;
+                }
             }

             if (self->cpuUsage >= SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
@@ -595,11 +607,22 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R

 ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
                                               Reference<RestoreLoaderData> self) {
-    state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
-    state Reference<LoaderBatchStatus> batchStatus = self->status[req.batchIndex];
+    state Reference<LoaderBatchData> batchData;
+    state Reference<LoaderBatchStatus> batchStatus;
     state bool isDuplicated = true;

+    if (req.batchIndex <= self->finishedBatch.get()) {
+        TraceEvent(SevWarn, "FastRestoreLoaderRestoreSendMutationsToAppliersRequestTooOld")
+            .detail("FinishedBatchIndex", self->finishedBatch.get())
+            .detail("RequestBatchIndex", req.batchIndex);
+        req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
+        return Void();
+    }
+
+    batchData = self->batch[req.batchIndex];
+    batchStatus = self->status[req.batchIndex];
     ASSERT(batchData.isValid() && batchStatus.isValid());
+    // Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
     ASSERT(req.batchIndex > self->finishedBatch.get());
     TraceEvent("FastRestoreLoaderPhaseSendMutations", self->id())
         .detail("BatchIndex", req.batchIndex)
@@ -607,8 +630,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
         .detail("LoaderSendStatus", batchStatus->toString());
     // The VB must finish loading phase before it can send mutations; update finishedLoadingVB for scheduler
     self->finishedLoadingVB = std::max(self->finishedLoadingVB, req.batchIndex);
-    // Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
-    ASSERT(self->finishedBatch.get() < req.batchIndex);

     // Ensure each file is sent exactly once by using batchStatus->sendAllLogs and batchStatus->sendAllRanges
     if (!req.useRangeFile) {
@@ -1220,19 +1241,21 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
     if (self->finishedBatch.get() == req.batchIndex - 1) {
         // Sanity check: All requests before and in this batchIndex must have been processed; otherwise,
         // those requests may cause segmentation fault after applier remove the batch data
-        // TODO: Pop old requests
-        if (!self->loadingQueue.empty() && self->loadingQueue.top().batchIndex <= req.batchIndex) {
+        while (!self->loadingQueue.empty() && self->loadingQueue.top().batchIndex <= req.batchIndex) {
             // Still has pending requests from earlier batchIndex and current batchIndex, which should not happen
-            TraceEvent(SevError, "FastRestoreLoaderHasPendingLoadFileRequests")
+            TraceEvent(SevWarn, "FastRestoreLoaderHasPendingLoadFileRequests")
                 .detail("PendingRequest", self->loadingQueue.top().toString());
+            self->loadingQueue.pop();
         }
-        if (!self->sendingQueue.empty() && self->sendingQueue.top().batchIndex <= req.batchIndex) {
-            TraceEvent(SevError, "FastRestoreLoaderHasPendingSendRequests")
+        while (!self->sendingQueue.empty() && self->sendingQueue.top().batchIndex <= req.batchIndex) {
+            TraceEvent(SevWarn, "FastRestoreLoaderHasPendingSendRequests")
                 .detail("PendingRequest", self->sendingQueue.top().toString());
+            self->sendingQueue.pop();
         }
-        if (!self->sendLoadParamQueue.empty() && self->sendLoadParamQueue.top().batchIndex <= req.batchIndex) {
-            TraceEvent(SevError, "FastRestoreLoaderHasPendingSendLoadParamRequests")
+        while (!self->sendLoadParamQueue.empty() && self->sendLoadParamQueue.top().batchIndex <= req.batchIndex) {
+            TraceEvent(SevWarn, "FastRestoreLoaderHasPendingSendLoadParamRequests")
                 .detail("PendingRequest", self->sendLoadParamQueue.top().toString());
+            self->sendLoadParamQueue.pop();
         }

         self->finishedBatch.set(req.batchIndex);
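The recurring pattern in these hunks is a staleness guard against `self->finishedBatch`: any request whose `batchIndex` is at or below the last finished version batch is acknowledged and dropped (or drained from its queue) instead of being dispatched against per-batch state that has already been destroyed. The snippet below is a minimal standalone sketch of that guard in plain C++, not Flow/actor code; the names `Request`, `drainStale`, and the `main()` driver are illustrative only and do not exist in FoundationDB.

```cpp
// Sketch of the "drop requests from already-finished batches" guard.
// Assumption: stale entries are simply popped here; the patch additionally
// replies to them (req.reply.send(...)) before popping.
#include <cstdio>
#include <initializer_list>
#include <queue>

struct Request {
    int batchIndex;
    bool operator<(const Request& other) const {
        // Invert the comparison so std::priority_queue::top() yields the
        // smallest batchIndex, i.e. oldest batches are examined first.
        return batchIndex > other.batchIndex;
    }
};

// Pop every queued request that belongs to an already-finished batch and
// return how many live requests remain to be dispatched.
int drainStale(std::priority_queue<Request>& queue, int finishedBatch) {
    while (!queue.empty() && queue.top().batchIndex <= finishedBatch) {
        std::printf("dropping stale request for batch %d (finished=%d)\n",
                    queue.top().batchIndex, finishedBatch);
        queue.pop();
    }
    return static_cast<int>(queue.size());
}

int main() {
    std::priority_queue<Request> loadingQueue;
    for (int b : {1, 2, 2, 3, 4}) loadingQueue.push(Request{b});

    int finishedBatch = 2; // plays the role of self->finishedBatch.get()
    int live = drainStale(loadingQueue, finishedBatch);
    std::printf("%d request(s) left for batches > %d\n", live, finishedBatch);
    return 0;
}
```

The `while` loops in `handleFinishVersionBatchRequest` follow the same shape: where the old code only logged the first offending entry at `SevError`, the patched code drains every stale entry at `SevWarn` so the queues cannot retain requests for batches whose data has been freed.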