FastRestore:Revise trace events to be descriptive
Revert changes that send mutations to appliers out of order
This commit is contained in:
parent
38193a3866
commit
f073049865
|
@ -89,8 +89,8 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "FastRestore")
|
||||
.detail("RestoreLoaderError", e.what())
|
||||
TraceEvent(SevWarn, "FastRestoreApplierError", self->id())
|
||||
.detail("Error", e.what())
|
||||
.detail("RequestType", requestTypeStr);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -296,8 +296,9 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
|
|||
if (ongoingReplies.empty()) {
|
||||
break;
|
||||
} else {
|
||||
wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL,
|
||||
(int)ongoingReplies.size())));
|
||||
wait(waitForAny(ongoingReplies));
|
||||
// wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL,
|
||||
// (int)ongoingReplies.size())));
|
||||
}
|
||||
// At least one reply is received; Calculate the reply duration
|
||||
for (int j = 0; j < ongoingReplies.size(); ++j) {
|
||||
|
@ -354,6 +355,9 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
|
|||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) break;
|
||||
fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
|
||||
TraceEvent(SevWarn, "FastRestoreSendBatchRequests")
|
||||
.detail("ErrorCode", e.code())
|
||||
.detail("ErrorInfo", e.what());
|
||||
for (auto& request : requests) {
|
||||
TraceEvent(SevWarn, "FastRestore")
|
||||
.detail("SendBatchRequests", requests.size())
|
||||
|
|
|
@ -540,8 +540,8 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
.detail("Version", commitVersion.toString())
|
||||
.detail("Mutation", kvm.toString());
|
||||
}
|
||||
applierVersionedMutationsBuffer[applierID].push_back(applierVersionedMutationsBuffer[applierID].arena(),
|
||||
VersionedMutation(kvm, commitVersion));
|
||||
applierVersionedMutationsBuffer[applierID].push_back_deep(
|
||||
applierVersionedMutationsBuffer[applierID].arena(), VersionedMutation(kvm, commitVersion));
|
||||
msgSize += kvm.expectedSize();
|
||||
}
|
||||
|
||||
|
@ -558,8 +558,10 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
.detail("MessageIndex", msgIndex)
|
||||
.detail("RestoreAsset", asset.toString())
|
||||
.detail("Requests", requests.size());
|
||||
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces,
|
||||
requests, TaskPriority::RestoreLoaderSendMutations));
|
||||
// fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces,
|
||||
// requests, TaskPriority::RestoreLoaderSendMutations));
|
||||
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
|
||||
TaskPriority::RestoreLoaderSendMutations));
|
||||
msgIndex++;
|
||||
msgSize = 0;
|
||||
for (auto& applierID : applierIDs) {
|
||||
|
@ -582,12 +584,18 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
|
|||
.detail("MessageIndex", msgIndex)
|
||||
.detail("RestoreAsset", asset.toString())
|
||||
.detail("Requests", requests.size());
|
||||
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
|
||||
TaskPriority::RestoreLoaderSendMutations));
|
||||
// fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces,
|
||||
// requests,
|
||||
// TaskPriority::RestoreLoaderSendMutations));
|
||||
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
|
||||
TaskPriority::RestoreLoaderSendMutations));
|
||||
}
|
||||
wait(waitForAll(fSends));
|
||||
// wait(waitForAll(fSends));
|
||||
|
||||
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
|
||||
TraceEvent("FastRestoreLoaderSendMutationToAppliers")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("RestoreAsset", asset.toString())
|
||||
.detail("Mutations", kvCount);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,8 @@ struct VersionedMutation {
|
|||
VersionedMutation() = default;
|
||||
explicit VersionedMutation(MutationRef mutation, LogMessageVersion version)
|
||||
: mutation(mutation), version(version) {}
|
||||
explicit VersionedMutation(Arena& to, const VersionedMutation& from)
|
||||
: mutation(to, from.mutation), version(from.version) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
|
Loading…
Reference in New Issue