Cleanup batch buffer for each restore request

This commit is contained in:
Meng Xu 2020-01-16 16:19:51 -08:00
parent e933716109
commit 4ac92d223b
9 changed files with 85 additions and 27 deletions

View File

@ -51,6 +51,7 @@ struct RestoreSendMutationsToAppliersRequest;
struct RestoreSendVersionedMutationsRequest;
struct RestoreSysInfo;
struct RestoreApplierInterface;
struct RestoreFinishRequest;
// RestoreSysInfo includes information each (type of) restore roles should know.
// At this moment, it only include appliers. We keep the name for future extension.
@ -129,7 +130,7 @@ struct RestoreLoaderInterface : RestoreRoleInterface {
RequestStream<RestoreSendMutationsToAppliersRequest> sendMutations;
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
RequestStream<RestoreSimpleRequest> collectRestoreRoleInterfaces;
RequestStream<RestoreVersionBatchRequest> finishRestore;
RequestStream<RestoreFinishRequest> finishRestore;
bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); }
@ -166,7 +167,7 @@ struct RestoreApplierInterface : RestoreRoleInterface {
RequestStream<RestoreVersionBatchRequest> applyToDB;
RequestStream<RestoreVersionBatchRequest> initVersionBatch;
RequestStream<RestoreSimpleRequest> collectRestoreRoleInterfaces;
RequestStream<RestoreVersionBatchRequest> finishRestore;
RequestStream<RestoreFinishRequest> finishRestore;
bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); }
@ -485,6 +486,28 @@ struct RestoreVersionBatchRequest : TimedRequest {
}
};
struct RestoreFinishRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 13018413;
bool terminate;
ReplyPromise<RestoreCommonReply> reply;
RestoreFinishRequest() = default;
explicit RestoreFinishRequest(bool terminate) : terminate(terminate) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, terminate, reply);
}
std::string toString() {
std::stringstream ss;
ss << "RestoreFinishRequest terminate:" << terminate;
return ss.str();
}
};
struct RestoreRequest {
constexpr static FileIdentifier file_identifier = 49589770;

View File

@ -67,10 +67,12 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
requestTypeStr = "initVersionBatch";
wait(handleInitVersionBatchRequest(req, self));
}
when(RestoreVersionBatchRequest req = waitNext(applierInterf.finishRestore.getFuture())) {
when(RestoreFinishRequest req = waitNext(applierInterf.finishRestore.getFuture())) {
requestTypeStr = "finishRestore";
handleFinishRestoreRequest(req, self);
exitRole = Void();
if (req.terminate) {
exitRole = Void();
}
}
when(wait(exitRole)) {
TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole").detail("NodeID", self->id());

View File

@ -118,6 +118,11 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
batch[batchIndex] = Reference<ApplierBatchData>(new ApplierBatchData());
}
void resetPerRestoreRequest() {
batch.clear();
finishedBatch = NotifiedVersion(0);
}
std::string describeNode() {
std::stringstream ss;
ss << "NodeID:" << nodeID.toString() << " nodeIndex:" << nodeIndex;

View File

@ -88,10 +88,12 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
requestTypeStr = "initVersionBatch";
wait(handleInitVersionBatchRequest(req, self));
}
when(RestoreVersionBatchRequest req = waitNext(loaderInterf.finishRestore.getFuture())) {
when(RestoreFinishRequest req = waitNext(loaderInterf.finishRestore.getFuture())) {
requestTypeStr = "finishRestore";
handleFinishRestoreRequest(req, self);
exitRole = Void();
if (req.terminate) {
exitRole = Void();
}
}
when(wait(exitRole)) {
TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole").detail("NodeID", self->id());
@ -180,12 +182,12 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ASSERT(batchData.isValid());
if (batchData->processedFileParams.find(req.param) == batchData->processedFileParams.end()) {
TraceEvent("FastRestore").detail("Loader", self->id()).detail("ProcessLoadParam", req.param.toString());
TraceEvent("FastRestoreLoadFile", self->id()).detail("BatchIndex", req.batchIndex).detail("ProcessLoadParam", req.param.toString());
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] = Never(); // Ensure second exec. wait on _processLoadingParam()
batchData->processedFileParams[req.param] = _processLoadingParam(req.param, batchData, self->id(), self->bc);
} else {
TraceEvent("FastRestore").detail("Loader", self->id()).detail("WaitOnProcessLoadParam", req.param.toString());
TraceEvent("FastRestoreLoadFile", self->id()).detail("BatchIndex", req.batchIndex).detail("WaitOnProcessLoadParam", req.param.toString());
}
ASSERT(batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end());
wait(batchData->processedFileParams[req.param]); // wait on the processing of the req.param.
@ -200,6 +202,8 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state std::map<LoadingParam, VersionedMutationsMap>::iterator item = batchData->kvOpsPerLP.begin();
TraceEvent("FastRestoreSendMutations", self->id()).detail("BatchIndex", req.batchIndex).detail("UseRangeFile", req.useRangeFile);
batchData->rangeToApplier = req.rangeToApplier;
for (; item != batchData->kvOpsPerLP.end(); item++) {
if (item->first.isRangeFile == req.useRangeFile) {

View File

@ -93,6 +93,10 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
batch[batchIndex] = Reference<LoaderBatchData>(new LoaderBatchData());
}
void resetPerRestoreRequest() {
batch.clear();
}
void initBackupContainer(Key url) {
if (bcUrl == url && bc.isValid()) {
return;

View File

@ -55,7 +55,8 @@ ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInt
std::map<UID, RestoreLoaderInterface> loadersInterf, int batchIndex);
ACTOR static Future<Void> notifyApplierToApplyMutations(std::map<UID, RestoreApplierInterface> appliersInterf,
int batchIndex);
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, Database cx);
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, bool terminate);
ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreMasterData> self, Database cx);
void splitKeyRangeForAppliers(Reference<MasterBatchData> batchData,
std::map<UID, RestoreApplierInterface> appliersInterf, int batchIndex);
@ -199,7 +200,10 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) {
RestoreRequest& request = restoreRequests[restoreIndex];
TraceEvent("FastRestore").detail("RestoreRequestInfo", request.toString());
// TODO: Initialize MasterData and all loaders and appliers' data for each restore request!
self->resetPerRestoreRequest();
wait(success(processRestoreRequest(self, cx, request)));
wait(notifyRestoreCompleted(self, false));
}
} catch (Error& e) {
if (restoreIndex < restoreRequests.size()) {
@ -213,7 +217,7 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
}
// Step: Notify all restore requests have been handled by cleaning up the restore keys
wait(notifyRestoreCompleted(self, cx));
wait(signalRestoreCompleted(self, cx));
try {
wait(unlockDatabase(cx, randomUID));
@ -559,25 +563,34 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(std::map<UID, RestoreApp
return Void();
}
// Ask all loaders and appliers to perform housecleaning at the end of restore and
// Register the restoreRequestDoneKey to signal the end of restore
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, Database cx) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
// Ask all loaders and appliers to perform housecleaning at the end of a restore request
// Terminate those roles if terminate = true
ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> self, bool terminate=false) {
std::vector<std::pair<UID, RestoreFinishRequest>> requests;
for (auto& loader : self->loadersInterf) {
requests.push_back(std::make_pair(loader.first, RestoreVersionBatchRequest(self->batchIndex)));
requests.push_back(std::make_pair(loader.first, RestoreFinishRequest(terminate)));
}
// A loader exits immediately after it receives the request. Master may not receive acks.
Future<Void> endLoaders = sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, requests);
requests.clear();
for (auto& applier : self->appliersInterf) {
requests.push_back(std::make_pair(applier.first, RestoreVersionBatchRequest(self->batchIndex)));
requests.push_back(std::make_pair(applier.first, RestoreFinishRequest(terminate)));
}
Future<Void> endApplier =
sendBatchRequests(&RestoreApplierInterface::finishRestore, self->appliersInterf, requests);
TraceEvent("FastRestore").detail("RestoreMaster", "RestoreRequestCompleted");
return Void();
}
// Register the restoreRequestDoneKey to signal the end of restore
ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreMasterData> self, Database cx) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
wait(notifyRestoreCompleted(self, true));
wait(delay(5.0)); // Give some time for loaders and appliers to exit
// Notify tester that the restore has finished
@ -596,7 +609,7 @@ ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> se
}
}
TraceEvent("FastRestore").detail("RestoreMaster", "RestoreCompleted");
TraceEvent("FastRestore").detail("RestoreMaster", "AllRestoreCompleted");
return Void();
}
}

View File

@ -137,6 +137,14 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
.detail("VersionBatchIndex", batchIndex);
}
// Reset master data at the beginning of each restore request
void resetPerRestoreRequest() {
TraceEvent("FastRestoreMasterReset").detail("OldVersionBatches", versionBatches.size());
versionBatches.clear();
batchIndex = 1;
batch.clear();
}
std::string describeNode() {
std::stringstream ss;
ss << "Master versionBatch:" << batchIndex;

View File

@ -42,13 +42,10 @@ ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id) {
return Void();
}
void handleFinishRestoreRequest(const RestoreVersionBatchRequest& req, Reference<RestoreRoleData> self) {
if (self->versionBatchStart) {
self->versionBatchStart = false;
}
void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference<RestoreRoleData> self) {
self->resetPerRestoreRequest();
TraceEvent("FastRestore")
.detail("FinishRestoreRequest", req.batchIndex)
.detail("FinishRestoreRequest", req.terminate)
.detail("Role", getRoleStr(self->role))
.detail("Node", self->id());

View File

@ -56,7 +56,7 @@ using VersionedMutationsMap = std::map<Version, MutationsVec>;
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
void handleFinishRestoreRequest(const RestoreVersionBatchRequest& req, Reference<RestoreRoleData> self);
void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference<RestoreRoleData> self);
// Helper class for reading restore data from a buffer and throwing the right errors.
// This struct is mostly copied from StringRefReader. We add a sanity check in this struct.
@ -129,6 +129,8 @@ public:
virtual void resetPerVersionBatch(int batchIndex) = 0;
virtual void resetPerRestoreRequest() = 0;
void clearInterfaces() {
loadersInterf.clear();
appliersInterf.clear();