From 4ac92d223bcf8e7911ed539a6fc33ea69cdb8410 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 16 Jan 2020 16:19:51 -0800 Subject: [PATCH] Cleanup batch buffer for each restore request --- fdbclient/RestoreWorkerInterface.actor.h | 27 +++++++++++++++-- fdbserver/RestoreApplier.actor.cpp | 6 ++-- fdbserver/RestoreApplier.actor.h | 5 ++++ fdbserver/RestoreLoader.actor.cpp | 12 +++++--- fdbserver/RestoreLoader.actor.h | 4 +++ fdbserver/RestoreMaster.actor.cpp | 37 ++++++++++++++++-------- fdbserver/RestoreMaster.actor.h | 8 +++++ fdbserver/RestoreRoleCommon.actor.cpp | 9 ++---- fdbserver/RestoreRoleCommon.actor.h | 4 ++- 9 files changed, 85 insertions(+), 27 deletions(-) diff --git a/fdbclient/RestoreWorkerInterface.actor.h b/fdbclient/RestoreWorkerInterface.actor.h index c92464b836..4e3bbad43d 100644 --- a/fdbclient/RestoreWorkerInterface.actor.h +++ b/fdbclient/RestoreWorkerInterface.actor.h @@ -51,6 +51,7 @@ struct RestoreSendMutationsToAppliersRequest; struct RestoreSendVersionedMutationsRequest; struct RestoreSysInfo; struct RestoreApplierInterface; +struct RestoreFinishRequest; // RestoreSysInfo includes information each (type of) restore roles should know. // At this moment, it only include appliers. We keep the name for future extension. @@ -129,7 +130,7 @@ struct RestoreLoaderInterface : RestoreRoleInterface { RequestStream sendMutations; RequestStream initVersionBatch; RequestStream collectRestoreRoleInterfaces; - RequestStream finishRestore; + RequestStream finishRestore; bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); } bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); } @@ -166,7 +167,7 @@ struct RestoreApplierInterface : RestoreRoleInterface { RequestStream applyToDB; RequestStream initVersionBatch; RequestStream collectRestoreRoleInterfaces; - RequestStream finishRestore; + RequestStream finishRestore; bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); } bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); } @@ -485,6 +486,28 @@ struct RestoreVersionBatchRequest : TimedRequest { } }; +struct RestoreFinishRequest : TimedRequest { + constexpr static FileIdentifier file_identifier = 13018413; + + bool terminate; + + ReplyPromise reply; + + RestoreFinishRequest() = default; + explicit RestoreFinishRequest(bool terminate) : terminate(terminate) {} + + template + void serialize(Ar& ar) { + serializer(ar, terminate, reply); + } + + std::string toString() { + std::stringstream ss; + ss << "RestoreFinishRequest terminate:" << terminate; + return ss.str(); + } +}; + struct RestoreRequest { constexpr static FileIdentifier file_identifier = 49589770; diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 1d425c3592..eca4535da8 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -67,10 +67,12 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int requestTypeStr = "initVersionBatch"; wait(handleInitVersionBatchRequest(req, self)); } - when(RestoreVersionBatchRequest req = waitNext(applierInterf.finishRestore.getFuture())) { + when(RestoreFinishRequest req = waitNext(applierInterf.finishRestore.getFuture())) { requestTypeStr = "finishRestore"; handleFinishRestoreRequest(req, self); - exitRole = Void(); + if (req.terminate) { + exitRole = Void(); + } } when(wait(exitRole)) { TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole").detail("NodeID", self->id()); diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 35cce8bac4..e6371313f1 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -118,6 +118,11 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted(new ApplierBatchData()); } + void resetPerRestoreRequest() { + batch.clear(); + finishedBatch = NotifiedVersion(0); + } + std::string describeNode() { std::stringstream ss; ss << "NodeID:" << nodeID.toString() << " nodeIndex:" << nodeIndex; diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index fa9f8d90a2..a9940c460e 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -88,10 +88,12 @@ ACTOR Future restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no requestTypeStr = "initVersionBatch"; wait(handleInitVersionBatchRequest(req, self)); } - when(RestoreVersionBatchRequest req = waitNext(loaderInterf.finishRestore.getFuture())) { + when(RestoreFinishRequest req = waitNext(loaderInterf.finishRestore.getFuture())) { requestTypeStr = "finishRestore"; handleFinishRestoreRequest(req, self); - exitRole = Void(); + if (req.terminate) { + exitRole = Void(); + } } when(wait(exitRole)) { TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole").detail("NodeID", self->id()); @@ -180,12 +182,12 @@ ACTOR Future handleLoadFileRequest(RestoreLoadFileRequest req, ReferenceprocessedFileParams.find(req.param) == batchData->processedFileParams.end()) { - TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString()); + TraceEvent("FastRestoreLoadFile", self->id()).detail("BatchIndex", req.batchIndex).detail("ProcessLoadParam", req.param.toString()); ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end()); batchData->processedFileParams[req.param] = Never(); // Ensure second exec. wait on _processLoadingParam() batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc); } else { - TraceEvent("FastRestore").detail("Loader", self->id()).detail("WaitOnProcessLoadParam", req.param.toString()); + TraceEvent("FastRestoreLoadFile", self->id()).detail("BatchIndex", req.batchIndex).detail("WaitOnProcessLoadParam", req.param.toString()); } ASSERT(batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end()); wait(batchData->processedFileParams[req.param]); // wait on the processing of the req.param. @@ -200,6 +202,8 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ state Reference batchData = self->batch[req.batchIndex]; state std::map::iterator item = batchData->kvOpsPerLP.begin(); + TraceEvent("FastRestoreSendMutations", self->id()).detail("BatchIndex", req.batchIndex).detail("UseRangeFile", req.useRangeFile); + batchData->rangeToApplier = req.rangeToApplier; for (; item != batchData->kvOpsPerLP.end(); item++) { if (item->first.isRangeFile == req.useRangeFile) { diff --git a/fdbserver/RestoreLoader.actor.h b/fdbserver/RestoreLoader.actor.h index 967dd541d4..3521e4ab37 100644 --- a/fdbserver/RestoreLoader.actor.h +++ b/fdbserver/RestoreLoader.actor.h @@ -93,6 +93,10 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted(new LoaderBatchData()); } + void resetPerRestoreRequest() { + batch.clear(); + } + void initBackupContainer(Key url) { if (bcUrl == url && bc.isValid()) { return; diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index ec3d917a0e..528f3ecacd 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -55,7 +55,8 @@ ACTOR static Future initializeVersionBatch(std::map loadersInterf, int batchIndex); ACTOR static Future notifyApplierToApplyMutations(std::map appliersInterf, int batchIndex); -ACTOR static Future notifyRestoreCompleted(Reference self, Database cx); +ACTOR static Future notifyRestoreCompleted(Reference self, bool terminate); +ACTOR static Future signalRestoreCompleted(Reference self, Database cx); void splitKeyRangeForAppliers(Reference batchData, std::map appliersInterf, int batchIndex); @@ -199,7 +200,10 @@ ACTOR Future startProcessRestoreRequests(Reference self for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) { RestoreRequest& request = restoreRequests[restoreIndex]; TraceEvent("FastRestore").detail("RestoreRequestInfo", request.toString()); + // TODO: Initialize MasterData and all loaders and appliers' data for each restore request! + self->resetPerRestoreRequest(); wait(success(processRestoreRequest(self, cx, request))); + wait(notifyRestoreCompleted(self, false)); } } catch (Error& e) { if (restoreIndex < restoreRequests.size()) { @@ -213,7 +217,7 @@ ACTOR Future startProcessRestoreRequests(Reference self } // Step: Notify all restore requests have been handled by cleaning up the restore keys - wait(notifyRestoreCompleted(self, cx)); + wait(signalRestoreCompleted(self, cx)); try { wait(unlockDatabase(cx, randomUID)); @@ -559,25 +563,34 @@ ACTOR static Future notifyApplierToApplyMutations(std::map notifyRestoreCompleted(Reference self, Database cx) { - state Reference tr(new ReadYourWritesTransaction(cx)); - - std::vector> requests; +// Ask all loaders and appliers to perform housecleaning at the end of a restore request +// Terminate those roles if terminate = true +ACTOR static Future notifyRestoreCompleted(Reference self, bool terminate=false) { + std::vector> requests; for (auto& loader : self->loadersInterf) { - requests.push_back(std::make_pair(loader.first, RestoreVersionBatchRequest(self->batchIndex))); + requests.push_back(std::make_pair(loader.first, RestoreFinishRequest(terminate))); } // A loader exits immediately after it receives the request. Master may not receive acks. Future endLoaders = sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, requests); requests.clear(); for (auto& applier : self->appliersInterf) { - requests.push_back(std::make_pair(applier.first, RestoreVersionBatchRequest(self->batchIndex))); + requests.push_back(std::make_pair(applier.first, RestoreFinishRequest(terminate))); } Future endApplier = sendBatchRequests(&RestoreApplierInterface::finishRestore, self->appliersInterf, requests); + TraceEvent("FastRestore").detail("RestoreMaster", "RestoreRequestCompleted"); + + return Void(); +} + +// Register the restoreRequestDoneKey to signal the end of restore +ACTOR static Future signalRestoreCompleted(Reference self, Database cx) { + state Reference tr(new ReadYourWritesTransaction(cx)); + + wait(notifyRestoreCompleted(self, true)); + wait(delay(5.0)); // Give some time for loaders and appliers to exit // Notify tester that the restore has finished @@ -596,7 +609,7 @@ ACTOR static Future notifyRestoreCompleted(Reference se } } - TraceEvent("FastRestore").detail("RestoreMaster", "RestoreCompleted"); + TraceEvent("FastRestore").detail("RestoreMaster", "AllRestoreCompleted"); return Void(); -} +} \ No newline at end of file diff --git a/fdbserver/RestoreMaster.actor.h b/fdbserver/RestoreMaster.actor.h index e2cd38baa4..ae7d658a30 100644 --- a/fdbserver/RestoreMaster.actor.h +++ b/fdbserver/RestoreMaster.actor.h @@ -137,6 +137,14 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted handleHeartbeat(RestoreSimpleRequest req, UID id) { return Void(); } -void handleFinishRestoreRequest(const RestoreVersionBatchRequest& req, Reference self) { - if (self->versionBatchStart) { - self->versionBatchStart = false; - } - +void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference self) { + self->resetPerRestoreRequest(); TraceEvent("FastRestore") - .detail("FinishRestoreRequest", req.batchIndex) + .detail("FinishRestoreRequest", req.terminate) .detail("Role", getRoleStr(self->role)) .detail("Node", self->id()); diff --git a/fdbserver/RestoreRoleCommon.actor.h b/fdbserver/RestoreRoleCommon.actor.h index 25fa0c2732..7f397b164a 100644 --- a/fdbserver/RestoreRoleCommon.actor.h +++ b/fdbserver/RestoreRoleCommon.actor.h @@ -56,7 +56,7 @@ using VersionedMutationsMap = std::map; ACTOR Future handleHeartbeat(RestoreSimpleRequest req, UID id); ACTOR Future handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference self); -void handleFinishRestoreRequest(const RestoreVersionBatchRequest& req, Reference self); +void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference self); // Helper class for reading restore data from a buffer and throwing the right errors. // This struct is mostly copied from StringRefReader. We add a sanity check in this struct. @@ -129,6 +129,8 @@ public: virtual void resetPerVersionBatch(int batchIndex) = 0; + virtual void resetPerRestoreRequest() = 0; + void clearInterfaces() { loadersInterf.clear(); appliersInterf.clear();