FastRestore:Add assertion and trace events for diagnosis
This commit is contained in:
parent
4d90384c58
commit
e4bf6d570f
|
@ -113,6 +113,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
||||||
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
|
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
|
||||||
|
|
||||||
TraceEvent(SevInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
TraceEvent(SevInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||||
|
.suppressFor(1.0)
|
||||||
.detail("BatchIndex", req.batchIndex)
|
.detail("BatchIndex", req.batchIndex)
|
||||||
.detail("RestoreAsset", req.asset.toString())
|
.detail("RestoreAsset", req.asset.toString())
|
||||||
.detail("RestoreAssetMesssageIndex", curMsgIndex.get())
|
.detail("RestoreAssetMesssageIndex", curMsgIndex.get())
|
||||||
|
@ -157,6 +158,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
||||||
|
|
||||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||||
TraceEvent(SevInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
TraceEvent(SevInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
||||||
|
.suppressFor(1.0)
|
||||||
.detail("BatchIndex", req.batchIndex)
|
.detail("BatchIndex", req.batchIndex)
|
||||||
.detail("RestoreAsset", req.asset.toString())
|
.detail("RestoreAsset", req.asset.toString())
|
||||||
.detail("ProcessedMessageIndex", curMsgIndex.get())
|
.detail("ProcessedMessageIndex", curMsgIndex.get())
|
||||||
|
|
|
@ -283,6 +283,7 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
|
||||||
for (int i = 0; i < cmdReplies.size(); ++i) {
|
for (int i = 0; i < cmdReplies.size(); ++i) {
|
||||||
if (SERVER_KNOBS->FASTRESTORE_REQBATCH_LOG) {
|
if (SERVER_KNOBS->FASTRESTORE_REQBATCH_LOG) {
|
||||||
TraceEvent(SevInfo, "FastRestoreGetBatchReplies")
|
TraceEvent(SevInfo, "FastRestoreGetBatchReplies")
|
||||||
|
.suppressFor(1.0)
|
||||||
.detail("Requests", requests.size())
|
.detail("Requests", requests.size())
|
||||||
.detail("OutstandingReplies", oustandingReplies)
|
.detail("OutstandingReplies", oustandingReplies)
|
||||||
.detail("ReplyIndex", i)
|
.detail("ReplyIndex", i)
|
||||||
|
@ -296,6 +297,7 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
|
||||||
ongoingRepliesIndex.push_back(i);
|
ongoingRepliesIndex.push_back(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ASSERT(ongoingReplies.size() == oustandingReplies);
|
||||||
if (ongoingReplies.empty()) {
|
if (ongoingReplies.empty()) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -359,7 +361,7 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
|
||||||
// fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
|
// fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
|
||||||
TraceEvent(SevWarn, "FastRestoreSendBatchRequests").error(e);
|
TraceEvent(SevWarn, "FastRestoreSendBatchRequests").error(e);
|
||||||
for (auto& request : requests) {
|
for (auto& request : requests) {
|
||||||
TraceEvent(SevWarn, "FastRestoreLoader")
|
TraceEvent(SevWarn, "FastRestoreSendBatchRequests")
|
||||||
.detail("SendBatchRequests", requests.size())
|
.detail("SendBatchRequests", requests.size())
|
||||||
.detail("RequestID", request.first)
|
.detail("RequestID", request.first)
|
||||||
.detail("Request", request.second.toString());
|
.detail("Request", request.second.toString());
|
||||||
|
|
|
@ -317,7 +317,8 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
|
||||||
TraceEvent("FastRestoreMasterDispatchVersionBatches")
|
TraceEvent("FastRestoreMasterDispatchVersionBatches")
|
||||||
.detail("BatchIndex", batchIndex)
|
.detail("BatchIndex", batchIndex)
|
||||||
.detail("BatchSize", versionBatch->size)
|
.detail("BatchSize", versionBatch->size)
|
||||||
.detail("RunningVersionBatches", self->runningVersionBatches.get());
|
.detail("RunningVersionBatches", self->runningVersionBatches.get())
|
||||||
|
.detail("VersionBatches", versionBatches.size());
|
||||||
self->batch[batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
|
self->batch[batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
|
||||||
self->batchStatus[batchIndex] = Reference<MasterBatchStatus>(new MasterBatchStatus());
|
self->batchStatus[batchIndex] = Reference<MasterBatchStatus>(new MasterBatchStatus());
|
||||||
fBatches.push_back(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, *versionBatch));
|
fBatches.push_back(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, *versionBatch));
|
||||||
|
|
Loading…
Reference in New Issue