FastRestore:Reuse getBatchReplies for sendBatchRequests

Remove old sendBatchRequests and getBatchReplies as well.
This commit is contained in:
Meng Xu 2020-02-21 16:15:01 -08:00
parent 4dd206b1b8
commit 8506bce493
1 changed files with 6 additions and 165 deletions

View File

@ -251,144 +251,9 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IA
} // namespace parallelFileRestore
// Send each request in requests via channel of the request's interface.
// Do not expect a meaningful reply
// Save replies to replies if replies != NULL
// The UID in a request is the UID of the interface to handle the request
ACTOR template <class Interface, class Request>
Future<Void> sendBatchRequestsToDelete(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
std::vector<std::pair<UID, Request>> requests,
TaskPriority taskID = TaskPriority::Low) {
if (requests.empty()) {
return Void();
}
loop {
try {
std::vector<Future<REPLY_TYPE(Request)>> cmdReplies;
for (auto& request : requests) {
RequestStream<Request> const* stream = &(interfaces[request.first].*channel);
cmdReplies.push_back(stream->getReply(request.second, taskID));
}
// Alex: Unless you want to do some action when it timeout multiple times, you should use timout. Otherwise,
// getReply will automatically keep retrying for you.
// Alex: you probably do NOT need the timeoutError.
std::vector<REPLY_TYPE(Request)> reps = wait(
timeoutError(getAll(cmdReplies), SERVER_KNOBS->FASTRESTORE_FAILURE_TIMEOUT));
break;
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) break;
fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
for (auto& request : requests) {
TraceEvent(SevWarn, "FastRestore")
.detail("SendBatchRequests", requests.size())
.detail("RequestID", request.first)
.detail("Request", request.second.toString());
}
}
}
return Void();
}
ACTOR template <class Interface, class Request>
Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
std::vector<std::pair<UID, Request>> requests, TaskPriority taskID = TaskPriority::Low,
bool trackRequestLatency = true) {
if (requests.empty()) {
return Void();
}
state double start = now();
state int oustandingReplies = requests.size();
loop {
try {
state std::vector<Future<REPLY_TYPE(Request)>> cmdReplies;
state std::vector<std::tuple<UID, Request, double>> replyDurations; // double is end time of the request
for (auto& request : requests) {
RequestStream<Request> const* stream = &(interfaces[request.first].*channel);
cmdReplies.push_back(stream->getReply(request.second, taskID));
replyDurations.emplace_back(request.first, request.second, 0);
}
state std::vector<Future<REPLY_TYPE(Request)>> ongoingReplies;
state std::vector<int> ongoingRepliesIndex;
loop {
ongoingReplies.clear();
ongoingRepliesIndex.clear();
for (int i = 0; i < cmdReplies.size(); ++i) {
if (!cmdReplies[i].isReady()) { // still wait for reply
ongoingReplies.push_back(cmdReplies[i]);
ongoingRepliesIndex.push_back(i);
}
}
if (ongoingReplies.empty()) {
break;
} else {
wait(waitForAny(ongoingReplies));
}
// At least one reply is received; Calculate the reply duration
for (int j = 0; j < ongoingReplies.size(); ++j) {
if (ongoingReplies[j].isReady()) {
std::get<2>(replyDurations[ongoingRepliesIndex[j]]) = now();
--oustandingReplies;
}
}
}
ASSERT(oustandingReplies == 0);
if (trackRequestLatency && SERVER_KNOBS->FASTRESTORE_TRACK_REQUEST_LATENCY) {
// Calculate the latest end time for each interface
std::map<UID, double> maxEndTime;
UID bathcID = deterministicRandom()->randomUniqueID();
for (int i = 0; i < replyDurations.size(); ++i) {
double endTime = std::get<2>(replyDurations[i]);
TraceEvent(SevInfo, "ProfileSendRequestBatchLatency", bathcID)
.detail("NodeID", std::get<0>(replyDurations[i]))
.detail("Request", std::get<1>(replyDurations[i]).toString())
.detail("Duration", endTime - start);
auto item = maxEndTime.emplace(std::get<0>(replyDurations[i]), endTime);
item.first->second = std::max(item.first->second, endTime);
}
// Check the time gap between the earliest and latest node
double earliest = std::numeric_limits<double>::max();
double latest = std::numeric_limits<double>::min();
UID earliestNode, latestNode;
for (auto& endTime : maxEndTime) {
if (earliest > endTime.second) {
earliest = endTime.second;
earliestNode = endTime.first;
}
if (latest < endTime.second) {
latest = endTime.second;
latestNode = endTime.first;
}
}
if (latest - earliest > SERVER_KNOBS->FASTRESTORE_STRAGGLER_THRESHOLD) {
TraceEvent(SevWarn, "ProfileSendRequestBatchLatencyFoundStraggler", bathcID)
.detail("SlowestNode", latestNode)
.detail("FatestNode", earliestNode);
}
}
break;
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) break;
fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
for (auto& request : requests) {
TraceEvent(SevWarn, "FastRestore")
.detail("SendBatchRequests", requests.size())
.detail("RequestID", request.first)
.detail("Request", request.second.toString());
}
}
}
return Void();
}
// Similar to sendBatchRequests except that the caller expect to process the reply.
ACTOR template <class Interface, class Request>
Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
std::vector<std::pair<UID, Request>> requests, std::vector<REPLY_TYPE(Request)>* replies,
TaskPriority taskID = TaskPriority::Low, bool trackRequestLatency = true) {
@ -489,36 +354,12 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
return Void();
}
// Similar to sendBatchRequests except that the caller expect to process the reply.
// This actor can be combined with sendBatchRequests(...)
// Similar to getBatchReplies except that the caller does not expect to process the reply info.
ACTOR template <class Interface, class Request>
Future<Void> getBatchRepliesToDelete(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
std::vector<std::pair<UID, Request>> requests, std::vector<REPLY_TYPE(Request)>* replies,
TaskPriority taskID = TaskPriority::Low) {
if (requests.empty()) {
return Void();
}
loop {
try {
std::vector<Future<REPLY_TYPE(Request)>> cmdReplies;
for (auto& request : requests) {
RequestStream<Request> const* stream = &(interfaces[request.first].*channel);
cmdReplies.push_back(stream->getReply(request.second, taskID));
}
// Alex: Unless you want to do some action when it timeout multiple times, you should use timout. Otherwise,
// getReply will automatically keep retrying for you.
std::vector<REPLY_TYPE(Request)> reps = wait(
timeoutError(getAll(cmdReplies), SERVER_KNOBS->FASTRESTORE_FAILURE_TIMEOUT));
*replies = reps;
break;
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) break;
fprintf(stdout, "getBatchReplies Error code:%d, error message:%s\n", e.code(), e.what());
}
}
Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
std::vector<std::pair<UID, Request>> requests, TaskPriority taskID = TaskPriority::Low,
bool trackRequestLatency = true) {
wait(getBatchReplies(channel, interfaces, requests, NULL, taskID, trackRequestLatency));
return Void();
}