FastRestore:Add sanity check and trace events
This commit is contained in:
parent
bffb4e380d
commit
e011f39829
|
@ -17,7 +17,7 @@
|
|||
KV ranges {(a-b, v0), (c-d, v1), (e-f, v2) ... (y-z, v10)}. With mutation log recorded all along, we can still use
|
||||
the simple backup-restore scheme described above on sub keyspaces separately. Assuming we did record the mutation log from
|
||||
v0 to vn, that allows us to restore
|
||||
|
||||
|
||||
* Keyspace a-b to any version between v0 and vn
|
||||
* Keyspace c-d to any version between v1 and vn
|
||||
* Keyspace y-z to any version between v10 and vn
|
||||
|
|
|
@ -67,7 +67,7 @@ The transaction system state before the recovery is the starting point for the c
|
|||
|
||||
## Phase 2: LOCKING_CSTATE
|
||||
|
||||
This phase locks the coordinated state (cstate) to make sure there is only one master who can change the cstate. Otherwise, we may end up with more than one master accepting commits after the recovery. To achieve that, the master needs to get currently alive tLogs’ interfaces and sends commands to tLogs to lock their states, preventing them from accepting any further writes.
|
||||
This phase locks the coordinated state (cstate) to make sure there is only one master who can change the cstate. Otherwise, we may end up with more than one master accepting commits after the recovery. To achieve that, the master needs to get currently alive tLogs’ interfaces and sends commands to tLogs to lock their states, preventing them from accepting any further writes.
|
||||
|
||||
Recall that `ServerDBInfo` contains the master's interface and is propagated by the CC to every process in a cluster. Each currently running tLog can use the master interface in its `ServerDBInfo` to send its own interface to the master.
|
||||
Master simply waits on receiving the `TLogRejoinRequest` streams: for each tLog’s interface received, the master compares the interface ID with the tLog ID read from cstate. Once the master collects enough old tLog interfaces, it will use the interfaces to lock those tLogs.
|
||||
|
|
|
@ -101,9 +101,8 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
// Note: Insert new items into processedFileState will not invalidate the reference.
|
||||
state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset];
|
||||
|
||||
TraceEvent("FastRestore")
|
||||
.detail("ApplierNode", self->id())
|
||||
.detail("VersionBatchIndex", req.batchIndex)
|
||||
TraceEvent("FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("ProcessedFileVersion", curFilePos.get())
|
||||
.detail("Request", req.toString());
|
||||
|
@ -148,6 +147,11 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
curFilePos.set(req.version);
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("ProcessedFileVersion", curFilePos.get())
|
||||
.detail("Request", req.toString());
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
return Void();
|
||||
}
|
||||
|
@ -271,24 +275,22 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state DBApplyProgress progress(applierID, batchData);
|
||||
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("FromVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.begin()->first)
|
||||
.detail("EndVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.rbegin()->first);
|
||||
|
||||
// Assume the process will not crash while it applies mutations to the DB. The reply message can be lost though
|
||||
if (batchData->kvOps.empty()) {
|
||||
TraceEvent("FastRestore_ApplierTxn")
|
||||
.detail("ApplierApplyToDBFinished", applierID)
|
||||
.detail("Reason", "EmptyVersionMutation");
|
||||
return Void();
|
||||
}
|
||||
ASSERT_WE_THINK(batchData->kvOps.size());
|
||||
TraceEvent("FastRestore")
|
||||
.detail("ApplierApplyToDB", applierID)
|
||||
.detail("FromVersion", batchData->kvOps.begin()->first)
|
||||
.detail("EndVersion", batchData->kvOps.rbegin()->first);
|
||||
|
||||
batchData->sanityCheckMutationOps();
|
||||
|
||||
if (progress.isDone()) {
|
||||
TraceEvent("FastRestore_ApplierTxn")
|
||||
.detail("ApplierApplyToDBFinished", applierID)
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Reason", "NoMutationAtVersions");
|
||||
return Void();
|
||||
}
|
||||
|
@ -422,8 +424,8 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
}
|
||||
}
|
||||
|
||||
TraceEvent("FastRestore_ApplierTxn")
|
||||
.detail("ApplierApplyToDBFinished", applierID)
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("CleanupCurTxnIds", progress.curTxnId);
|
||||
// clean up txn ids
|
||||
loop {
|
||||
|
@ -442,7 +444,7 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
|
|||
}
|
||||
// House cleaning
|
||||
batchData->kvOps.clear();
|
||||
TraceEvent("FastRestore_ApplierTxn").detail("ApplierApplyToDBFinished", applierID);
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -452,14 +454,14 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
// Ensure batch i is applied before batch (i+1)
|
||||
wait(self->finishedBatch.whenAtLeast(req.batchIndex-1));
|
||||
|
||||
TraceEvent("FastRestoreApplierPhaseHandleApplyToDB", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("FinishedBatch", self->finishedBatch.get())
|
||||
.detail("HasStarted", batchData->dbApplier.present());
|
||||
state bool isDuplicated = true;
|
||||
if (self->finishedBatch.get() == req.batchIndex-1) {
|
||||
Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
|
||||
ASSERT(batchData.isValid());
|
||||
TraceEvent("FastRestore")
|
||||
.detail("ApplierApplyToDB", self->id())
|
||||
.detail("VersionBatchIndex", req.batchIndex)
|
||||
.detail("DBApplierPresent", batchData->dbApplier.present());
|
||||
if (!batchData->dbApplier.present()) {
|
||||
isDuplicated = false;
|
||||
batchData->dbApplier = Never();
|
||||
|
|
|
@ -181,7 +181,14 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
|
||||
state bool isDuplicated = true;
|
||||
ASSERT(batchData.isValid());
|
||||
bool paramExist = batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end();
|
||||
bool isReady = paramExist ? batchData->processedFileParams[req.param].isReady() : false;
|
||||
|
||||
TraceEvent("FastRestoreLoaderPhaseLoadFile", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("ProcessLoadParam", req.param.toString())
|
||||
.detail("NotProcessed", !paramExist)
|
||||
.detail("Processed", isReady);
|
||||
if (batchData->processedFileParams.find(req.param) == batchData->processedFileParams.end()) {
|
||||
TraceEvent("FastRestoreLoadFile", self->id()).detail("BatchIndex", req.batchIndex).detail("ProcessLoadParam", req.param.toString());
|
||||
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
|
||||
|
@ -195,6 +202,9 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
wait(batchData->processedFileParams[req.param]); // wait on the processing of the req.param.
|
||||
|
||||
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
|
||||
TraceEvent("FastRestoreLoaderPhaseLoadFileDone", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("ProcessLoadParam", req.param.toString());
|
||||
// TODO: clear self->sampleMutations[req.param] memory to save memory on loader
|
||||
return Void();
|
||||
}
|
||||
|
@ -206,7 +216,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
|
|||
state Reference<LoaderBatchStatus> batchStatus = self->status[req.batchIndex];
|
||||
state bool isDuplicated = true;
|
||||
|
||||
TraceEvent("FastRestoreSendMutations", self->id())
|
||||
TraceEvent("FastRestoreLoaderPhaseSendMutations", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("UseRangeFile", req.useRangeFile)
|
||||
.detail("LoaderSendStatus", batchStatus->toString());
|
||||
|
@ -267,6 +277,10 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
|
|||
}
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreLoaderPhaseSendMutationsDone", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("UseRangeFile", req.useRangeFile)
|
||||
.detail("LoaderSendStatus", batchStatus->toString());
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
return Void();
|
||||
}
|
||||
|
@ -288,11 +302,19 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
state Version prevVersion = 0; // startVersion
|
||||
state std::vector<UID> applierIDs = getApplierIDs(*pRangeToApplier);
|
||||
|
||||
TraceEvent("FastRestore_SendMutationToApplier")
|
||||
TraceEvent("FastRestoreLoaderSendMutationToApplier")
|
||||
.detail("IsRangeFile", isRangeFile)
|
||||
.detail("EndVersion", asset.endVersion)
|
||||
.detail("RestoreAsset", asset.toString());
|
||||
|
||||
// There should be no mutation at asset.endVersion version because it is exclusive
|
||||
if (kvOps.find(asset.endVersion) != kvOps.end()) {
|
||||
TraceEvent(SevError, "FastRestoreLoaderSendMutationToApplier")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("RestoreAsset", asset.toString())
|
||||
.detail("IsRangeFile", isRangeFile)
|
||||
.detail("Data loss at version", asset.endVersion);
|
||||
}
|
||||
// Ensure there is a mutation request sent at endVersion, so that applier can advance its notifiedVersion
|
||||
if (kvOps.find(asset.endVersion) == kvOps.end()) {
|
||||
kvOps[asset.endVersion] = MutationsVec(); // Empty mutation vector will be handled by applier
|
||||
|
|
|
@ -293,7 +293,8 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<MasterBatchData> batchDat
|
|||
std::map<UID, RestoreLoaderInterface> loadersInterf, int batchIndex,
|
||||
Database cx, RestoreRequest request, VersionBatch versionBatch,
|
||||
bool isRangeFile) {
|
||||
TraceEvent("FastRestore")
|
||||
TraceEvent("FastRestoreMasterPhaseLoadFiles")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("FileTypeLoadedInVersionBatch", isRangeFile)
|
||||
.detail("BeginVersion", versionBatch.beginVersion)
|
||||
.detail("EndVersion", versionBatch.endVersion);
|
||||
|
@ -404,7 +405,9 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<MasterBatchData> ba
|
|||
Reference<MasterBatchStatus> batchStatus,
|
||||
std::map<UID, RestoreLoaderInterface> loadersInterf, int batchIndex,
|
||||
bool useRangeFile) {
|
||||
TraceEvent("FastRestore").detail("SendMutationsFromLoaders", batchIndex).detail("UseRangeFiles", useRangeFile);
|
||||
TraceEvent("FastRestoreMasterPhaseSendMutationsFromLoaders")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("UseRangeFiles", useRangeFile);
|
||||
|
||||
std::vector<std::pair<UID, RestoreSendMutationsToAppliersRequest>> requests;
|
||||
for (auto& loader : loadersInterf) {
|
||||
|
@ -502,7 +505,9 @@ void splitKeyRangeForAppliers(Reference<MasterBatchData> batchData,
|
|||
std::vector<Key> keyrangeSplitter;
|
||||
keyrangeSplitter.push_back(normalKeys.begin); // First slot
|
||||
double cumulativeSize = slotSize;
|
||||
TraceEvent("FastRestore").detail("VersionBatch", batchIndex).detail("SamplingSize", batchData->samplesSize);
|
||||
TraceEvent("FastRestoreMasterPhaseCalculateApplierKeyRanges")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("SamplingSize", batchData->samplesSize);
|
||||
while (cumulativeSize < batchData->samplesSize) {
|
||||
IndexedSet<Key, int64_t>::iterator lowerBound = batchData->samples.index(cumulativeSize);
|
||||
if (lowerBound == batchData->samples.end()) {
|
||||
|
@ -640,7 +645,10 @@ ACTOR static Future<Void> clearDB(Database cx) {
|
|||
|
||||
ACTOR static Future<Void> initializeVersionBatch(std::map<UID, RestoreApplierInterface> appliersInterf,
|
||||
std::map<UID, RestoreLoaderInterface> loadersInterf, int batchIndex) {
|
||||
TraceEvent("FastRestoreInitVersionBatch").detail("BatchIndex", batchIndex).detail("Appliers", appliersInterf.size()).detail("Loaders", loadersInterf.size());
|
||||
TraceEvent("FastRestoreMasterPhaseInitVersionBatch")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Appliers", appliersInterf.size())
|
||||
.detail("Loaders", loadersInterf.size());
|
||||
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requestsToAppliers;
|
||||
for (auto& applier : appliersInterf) {
|
||||
requestsToAppliers.emplace_back(applier.first, RestoreVersionBatchRequest(batchIndex));
|
||||
|
@ -663,6 +671,9 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchSta
|
|||
// Prepare the applyToDB requests
|
||||
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
|
||||
|
||||
TraceEvent("FastRestoreMasterPhaseApplyMutations")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Appliers", appliersInterf.size());
|
||||
for (auto& applier : appliersInterf) {
|
||||
ASSERT(batchStatus->applyStatus.find(applier.first) == batchStatus->applyStatus.end());
|
||||
requests.emplace_back(applier.first, RestoreVersionBatchRequest(batchIndex));
|
||||
|
@ -701,7 +712,7 @@ ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> se
|
|||
for (auto& loader : self->loadersInterf) {
|
||||
requests.emplace_back(loader.first, RestoreFinishRequest(terminate));
|
||||
}
|
||||
|
||||
|
||||
Future<Void> endLoaders = sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, requests);
|
||||
|
||||
requests.clear();
|
||||
|
|
|
@ -44,16 +44,18 @@ ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id) {
|
|||
|
||||
void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference<RestoreRoleData> self) {
|
||||
self->resetPerRestoreRequest();
|
||||
TraceEvent("FastRestore")
|
||||
TraceEvent("FastRestoreRolePhaseFinishRestoreRequest", self->id())
|
||||
.detail("FinishRestoreRequest", req.terminate)
|
||||
.detail("Role", getRoleStr(self->role))
|
||||
.detail("Node", self->id());
|
||||
.detail("Role", getRoleStr(self->role));
|
||||
|
||||
req.reply.send(RestoreCommonReply(self->id()));
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self) {
|
||||
TraceEvent("FastRestoreHandleInitVersionBatch", self->id()).detail("Role", getRoleStr(self->role)).detail("BatchIndex", req.batchIndex).detail("VersionBatchId", self->versionBatchId.get());
|
||||
TraceEvent("FastRestoreRolePhaseInitVersionBatch", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("Role", getRoleStr(self->role))
|
||||
.detail("VersionBatchId", self->versionBatchId.get());
|
||||
// batchId is continuous. (req.batchIndex-1) is the id of the just finished batch.
|
||||
wait(self->versionBatchId.whenAtLeast(req.batchIndex - 1));
|
||||
|
||||
|
|
|
@ -329,7 +329,7 @@ struct AtomicOpsWorkload : TestWorkload {
|
|||
memcpy(&intValue, kv.value.begin(), kv.value.size());
|
||||
opsVal[kv.key] = intValue;
|
||||
if (!inRecord) {
|
||||
TraceEvent(SevError, "MissingLogKey").detail("OpsKey", kv.key);
|
||||
TraceEvent(SevWarnAlways, "MissingLogKey").detail("OpsKey", kv.key);
|
||||
}
|
||||
if (inRecord && (self->actorCount == 1 && intValue != logVal[records[kv.key]])) {
|
||||
// When multiple actors exist, 1 opsKey can have multiple log keys
|
||||
|
|
Loading…
Reference in New Issue