FastRestore: Profile performance for getBatchReplies
A generic approach to profiling getBatchReplies performance and detecting stragglers.
commit 05ea79f584
parent ab2dd36bdc
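At its core, the change is a straggler check: record when each node's reply arrives, keep the latest end time per node, and emit a warning when the gap between the fastest and slowest node exceeds FASTRESTORE_STRAGGLER_THRESHOLD. A minimal standalone sketch of that computation (NodeID, ReplySample, and the sample data are illustrative stand-ins, not code from the commit):

#include <algorithm>
#include <cstdio>
#include <limits>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Illustrative stand-ins for FDB's UID and the (node, request, end-time) tuples.
using NodeID = std::string;
using ReplySample = std::tuple<NodeID, std::string, double>; // node, request name, reply end time

// Mirrors the commit's logic: true if the gap between the fastest and the
// slowest node exceeds stragglerThreshold seconds.
bool findStraggler(const std::vector<ReplySample>& replyDurations, double stragglerThreshold,
                   NodeID* fastest, NodeID* slowest) {
	// Latest end time per node; a node may answer several requests in one batch.
	std::map<NodeID, double> maxEndTime;
	for (const auto& r : replyDurations) {
		auto item = maxEndTime.emplace(std::get<0>(r), std::get<2>(r));
		item.first->second = std::max(item.first->second, std::get<2>(r));
	}
	double earliest = std::numeric_limits<double>::max();
	double latest = std::numeric_limits<double>::min();
	for (const auto& [node, endTime] : maxEndTime) {
		if (endTime < earliest) { earliest = endTime; *fastest = node; }
		if (endTime > latest) { latest = endTime; *slowest = node; }
	}
	return latest - earliest > stragglerThreshold;
}

int main() {
	std::vector<ReplySample> samples = { { "node1", "sendMutations", 10.0 },
		                                 { "node2", "sendMutations", 11.5 },
		                                 { "node3", "sendMutations", 95.0 } };
	NodeID fastest, slowest;
	if (findStraggler(samples, 60.0, &fastest, &slowest)) // 60s is the knob's default below
		std::printf("straggler: %s (fastest: %s)\n", slowest.c_str(), fastest.c_str());
	return 0;
}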
@@ -556,6 +556,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
 	init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
 	init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
 	init( FASTRESTORE_STRAGGLER_THRESHOLD, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_STRAGGLER_THRESHOLD = deterministicRandom()->random01() * 240 + 10; }
+	init( FASTRESTORE_TRACK_REQUEST_LATENCY, true ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_REQUEST_LATENCY = false; }

 	// clang-format on
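In simulation, randomize (plus BUGGIFY for the newer knobs) perturbs these values to widen test coverage: random01() returns a uniform double in [0, 1), so random01() * 240 + 10 draws the straggler threshold from [10, 250) seconds, and BUGGIFY can switch latency tracking off entirely. A small sketch of the same pattern, with a seeded RNG standing in for FDB's deterministicRandom():

#include <cstdio>
#include <random>

// Stand-in for deterministicRandom()->random01(): uniform double in [0, 1).
// The fixed seed mimics simulation's requirement that runs be reproducible.
static std::mt19937_64 rng(42);
static double random01() { return std::uniform_real_distribution<double>(0.0, 1.0)(rng); }

int main() {
	double FASTRESTORE_STRAGGLER_THRESHOLD = 60; // production default
	bool randomize = true, buggify = true;       // as in a BUGGIFY-enabled simulation run
	if (randomize && buggify)
		FASTRESTORE_STRAGGLER_THRESHOLD = random01() * 240 + 10; // uniform in [10, 250)
	std::printf("threshold = %.1f s\n", FASTRESTORE_STRAGGLER_THRESHOLD);
	return 0;
}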
@@ -498,6 +498,7 @@ public:
 	int64_t FASTRESTORE_APPLYING_PARALLELISM; // number of outstanding txns writing to dest. DB
 	int64_t FASTRESTORE_MONITOR_LEADER_DELAY;
 	int64_t FASTRESTORE_STRAGGLER_THRESHOLD;
+	bool FASTRESTORE_TRACK_REQUEST_LATENCY;

 	ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);
 };
@@ -293,7 +293,7 @@ Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel, std::
 ACTOR template <class Interface, class Request>
 Future<Void> sendBatchRequestsV2(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
                                  std::vector<std::pair<UID, Request>> requests,
-                                 TaskPriority taskID = TaskPriority::Low) {
+                                 TaskPriority taskID = TaskPriority::Low, bool trackRequestLatency = true) {
 	if (requests.empty()) {
 		return Void();
 	}
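The only signature change is the new trackRequestLatency parameter, defaulting to true so existing call sites keep profiling while hot paths can opt out; at runtime it is further gated by the FASTRESTORE_TRACK_REQUEST_LATENCY knob, as the next hunk shows. A toy mock of the call shape (MockInterface and sendMutations are invented stand-ins, not FDB types):

#include <cstdio>
#include <map>
#include <string>

struct MockStream { const char* name; };
struct MockInterface { MockStream sendMutations; };

// Same shape as sendBatchRequestsV2: a pointer-to-member selects which
// request stream of each interface the batch goes to.
template <class Interface>
void sendBatchMock(MockStream Interface::*channel, std::map<std::string, Interface> interfaces,
                   bool trackRequestLatency = true) {
	bool knob = true; // stands in for SERVER_KNOBS->FASTRESTORE_TRACK_REQUEST_LATENCY
	for (auto& [id, iface] : interfaces)
		std::printf("send via %s on %s, profiling=%d\n", (iface.*channel).name, id.c_str(),
		            int(trackRequestLatency && knob)); // both gates must be open
}

int main() {
	std::map<std::string, MockInterface> ifaces = { { "node1", { { "sendMutations" } } } };
	sendBatchMock(&MockInterface::sendMutations, ifaces);        // default: profiled
	sendBatchMock(&MockInterface::sendMutations, ifaces, false); // hot path opts out
	return 0;
}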
@@ -336,37 +336,141 @@ Future<Void> sendBatchRequestsV2(RequestStream<Request> Interface::*channel, std
 				}
 			}
 			ASSERT(oustandingReplies == 0);
-			// Calculate the latest end time for each interface
-			std::map<UID, double> maxEndTime;
-			UID bathcID = deterministicRandom()->randomUniqueID();
-			for (int i = 0; i < replyDurations.size(); ++i) {
-				double endTime = std::get<2>(replyDurations[i]);
-				TraceEvent(SevInfo, "ProfileSendRequestBatchLatency", bathcID)
-				    .detail("NodeID", std::get<0>(replyDurations[i]))
-				    .detail("Request", std::get<1>(replyDurations[i]).toString())
-				    .detail("Duration", endTime - start);
-				auto item = maxEndTime.emplace(std::get<0>(replyDurations[i]), endTime);
-				item.first->second = std::max(item.first->second, endTime);
-			}
-			// Check the time gap between the earliest and latest node
-			double earliest = std::numeric_limits<double>::max();
-			double latest = std::numeric_limits<double>::min();
-			UID earliestNode, latestNode;
-
-			for (auto& endTime : maxEndTime) {
-				if (earliest > endTime.second) {
-					earliest = endTime.second;
-					earliestNode = endTime.first;
-				}
-				if (latest < endTime.second) {
-					latest = endTime.second;
-					latestNode = endTime.first;
-				}
-			}
-			if (latest - earliest > SERVER_KNOBS->FASTRESTORE_STRAGGLER_THRESHOLD) {
-				TraceEvent(SevWarn, "ProfileSendRequestBatchLatencyFoundStraggler", bathcID)
-				    .detail("SlowestNode", latestNode)
-				    .detail("FatestNode", earliestNode);
-			}
+			if (trackRequestLatency && SERVER_KNOBS->FASTRESTORE_TRACK_REQUEST_LATENCY) {
+				// Calculate the latest end time for each interface
+				std::map<UID, double> maxEndTime;
+				UID bathcID = deterministicRandom()->randomUniqueID();
+				for (int i = 0; i < replyDurations.size(); ++i) {
+					double endTime = std::get<2>(replyDurations[i]);
+					TraceEvent(SevInfo, "ProfileSendRequestBatchLatency", bathcID)
+					    .detail("NodeID", std::get<0>(replyDurations[i]))
+					    .detail("Request", std::get<1>(replyDurations[i]).toString())
+					    .detail("Duration", endTime - start);
+					auto item = maxEndTime.emplace(std::get<0>(replyDurations[i]), endTime);
+					item.first->second = std::max(item.first->second, endTime);
+				}
+				// Check the time gap between the earliest and latest node
+				double earliest = std::numeric_limits<double>::max();
+				double latest = std::numeric_limits<double>::min();
+				UID earliestNode, latestNode;
+
+				for (auto& endTime : maxEndTime) {
+					if (earliest > endTime.second) {
+						earliest = endTime.second;
+						earliestNode = endTime.first;
+					}
+					if (latest < endTime.second) {
+						latest = endTime.second;
+						latestNode = endTime.first;
+					}
+				}
+				if (latest - earliest > SERVER_KNOBS->FASTRESTORE_STRAGGLER_THRESHOLD) {
+					TraceEvent(SevWarn, "ProfileSendRequestBatchLatencyFoundStraggler", bathcID)
+					    .detail("SlowestNode", latestNode)
+					    .detail("FatestNode", earliestNode);
+				}
+			}
 			break;
 		} catch (Error& e) {
 			if (e.code() == error_code_operation_cancelled) break;
 			fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
 			for (auto& request : requests) {
 				TraceEvent(SevWarn, "FastRestore")
 				    .detail("SendBatchRequests", requests.size())
 				    .detail("RequestID", request.first)
 				    .detail("Request", request.second.toString());
 			}
 		}
 	}

 	return Void();
 }

+// Similar to sendBatchRequests except that the caller expect to process the reply.
+ACTOR template <class Interface, class Request>
+Future<Void> getBatchRepliesV2(RequestStream<Request> Interface::*channel, std::map<UID, Interface> interfaces,
+                               std::vector<std::pair<UID, Request>> requests, std::vector<REPLY_TYPE(Request)>* replies,
+                               TaskPriority taskID = TaskPriority::Low, bool trackRequestLatency = true) {
+	if (requests.empty()) {
+		return Void();
+	}
+
+	state double start = now();
+	state int oustandingReplies = requests.size();
+	loop {
+		try {
+			state std::vector<Future<REPLY_TYPE(Request)>> cmdReplies;
+			state std::vector<std::tuple<UID, Request, double>> replyDurations; // double is end time of the request
+			for (auto& request : requests) {
+				RequestStream<Request> const* stream = &(interfaces[request.first].*channel);
+				cmdReplies.push_back(stream->getReply(request.second, taskID));
+				replyDurations.emplace_back(request.first, request.second, 0);
+			}
+
+			state std::vector<Future<REPLY_TYPE(Request)>> ongoingReplies;
+			state std::vector<int> ongoingRepliesIndex;
+			loop {
+				ongoingReplies.clear();
+				ongoingRepliesIndex.clear();
+				for (int i = 0; i < cmdReplies.size(); ++i) {
+					if (!cmdReplies[i].isReady()) { // still wait for reply
+						ongoingReplies.push_back(cmdReplies[i]);
+						ongoingRepliesIndex.push_back(i);
+					}
+				}
+				if (ongoingReplies.empty()) {
+					break;
+				} else {
+					wait(waitForAny(ongoingReplies));
+				}
+				// At least one reply is received; Calculate the reply duration
+				for (int j = 0; j < ongoingReplies.size(); ++j) {
+					if (ongoingReplies[j].isReady()) {
+						std::get<2>(replyDurations[ongoingRepliesIndex[j]]) = now();
+						--oustandingReplies;
+					}
+				}
+			}
+			ASSERT(oustandingReplies == 0);
+			if (trackRequestLatency && SERVER_KNOBS->FASTRESTORE_TRACK_REQUEST_LATENCY) {
+				// Calculate the latest end time for each interface
+				std::map<UID, double> maxEndTime;
+				UID bathcID = deterministicRandom()->randomUniqueID();
+				for (int i = 0; i < replyDurations.size(); ++i) {
+					double endTime = std::get<2>(replyDurations[i]);
+					TraceEvent(SevInfo, "ProfileSendRequestBatchLatency", bathcID)
+					    .detail("NodeID", std::get<0>(replyDurations[i]))
+					    .detail("Request", std::get<1>(replyDurations[i]).toString())
+					    .detail("Duration", endTime - start);
+					auto item = maxEndTime.emplace(std::get<0>(replyDurations[i]), endTime);
+					item.first->second = std::max(item.first->second, endTime);
+				}
+				// Check the time gap between the earliest and latest node
+				double earliest = std::numeric_limits<double>::max();
+				double latest = std::numeric_limits<double>::min();
+				UID earliestNode, latestNode;
+
+				for (auto& endTime : maxEndTime) {
+					if (earliest > endTime.second) {
+						earliest = endTime.second;
+						earliestNode = endTime.first;
+					}
+					if (latest < endTime.second) {
+						latest = endTime.second;
+						latestNode = endTime.first;
+					}
+				}
+				if (latest - earliest > SERVER_KNOBS->FASTRESTORE_STRAGGLER_THRESHOLD) {
+					TraceEvent(SevWarn, "ProfileSendRequestBatchLatencyFoundStraggler", bathcID)
+					    .detail("SlowestNode", latestNode)
+					    .detail("FatestNode", earliestNode);
+				}
+			}
+			// Update replies
+			if (replies != NULL) {
+				for(int i = 0; i < cmdReplies.size(); ++i) {
+					replies->emplace_back(cmdReplies[i].get());
+				}
+			}
+			break;
+		} catch (Error& e) {
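For reference, the bookkeeping pattern in getBatchRepliesV2 (fire every request up front, then repeatedly wait until at least one outstanding future is ready and timestamp it) can be sketched outside Flow with std::future; the 10 ms poll below only approximates Flow's waitForAny():

#include <chrono>
#include <cstdio>
#include <future>
#include <thread>
#include <vector>

int main() {
	using Clock = std::chrono::steady_clock;
	const auto start = Clock::now();

	// Fire all "requests" up front, as getBatchRepliesV2 does with getReply().
	std::vector<std::future<int>> cmdReplies;
	std::vector<double> replyEndTime(3, 0.0); // per-request end time in seconds, 0 = pending
	for (int i = 0; i < 3; ++i)
		cmdReplies.push_back(std::async(std::launch::async, [i] {
			std::this_thread::sleep_for(std::chrono::milliseconds(100 * (i + 1)));
			return i; // the "reply"
		}));

	// Timestamp each reply the moment its future becomes ready.
	int outstandingReplies = static_cast<int>(cmdReplies.size());
	while (outstandingReplies > 0) {
		for (std::size_t i = 0; i < cmdReplies.size(); ++i) {
			if (replyEndTime[i] == 0.0 &&
			    cmdReplies[i].wait_for(std::chrono::milliseconds(10)) == std::future_status::ready) {
				replyEndTime[i] = std::chrono::duration<double>(Clock::now() - start).count();
				--outstandingReplies;
			}
		}
	}
	for (std::size_t i = 0; i < replyEndTime.size(); ++i)
		std::printf("request %zu finished after %.3f s\n", i, replyEndTime[i]);
	return 0;
}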