minor cleanups to SnapTest

This commit is contained in:
sramamoorthy 2019-04-09 10:11:15 -07:00 committed by Alex Miller
parent 00ccee8a6c
commit 858604b51d
1 changed files with 53 additions and 116 deletions

View File

@ -23,12 +23,6 @@ void getVersionAndnumTags(TraceEventFields md, Version& version, int& numTags) {
sscanf(md.getValue("NumTags").c_str(), "%d:%d", &numTags);
}
void getNumTagServerInfo(TraceEventFields md, int& numTagServers)
{
numTagServers = 0;
sscanf(md.getValue("NumTagServers").c_str(), "%lld", &numTagServers);
}
void getTagAndDurableVersion(TraceEventFields md, Version version, Tag& tag, Version& durableVersion) {
Version verifyVersion;
durableVersion = -1;
@ -434,30 +428,17 @@ public: // workload functions
state int foundTagServers = 0;
for (; i < tLogWorkers.size(); i++) {
tLogMessages.push_back(
timeoutError(tLogWorkers[i].eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 1.0));
timeoutError(tLogWorkers[i].eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 3.0));
state int retryCnt = 0;
loop {
try {
TraceEvent(SevDebug, "WaitingForTlogMessages");
wait(waitForAll(tLogMessages));
break;
} catch (Error& e) {
bool isFail = false;
if (e.code() != error_code_timed_out) {
isFail = true;
break;
} else {
++retryCnt;
}
if (isFail || retryCnt >= self->maxRetryCntToRetrieveMessage ) {
TraceEvent(SevError, "UnableToRetrieveTLogMessages")
.detail("Token", eventTokenRef.toString())
.detail("Reason", "Failed to get tLogMessages")
.detail("Code", e.what());
return false;
}
}
try {
TraceEvent(SevDebug, "WaitingForTlogMessages");
wait(waitForAll(tLogMessages));
} catch (Error& e) {
TraceEvent(SevError, "UnableToRetrieveTLogMessages")
.detail("Token", eventTokenRef.toString())
.detail("Reason", "FailedToGetTLogMessages")
.detail("Code", e.what());
return false;
}
printMessages(tLogMessages);
filterEmptyMessages(tLogMessages);
@ -496,58 +477,34 @@ public: // workload functions
state int numDurableVersionChecks = 0;
state std::map<Tag, bool> visitedStorageTags;
state int retryCnt = 0;
loop {
proxyMessages.clear();
storageMessages.clear();
coordMessages.clear();
for (int i = 0; i < workers.size(); i++) {
std::string eventToken = "ExecTrace/Coordinators/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
coordMessages.push_back(
timeoutError(workers[i].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 3.0));
}
state bool retry = false;
for (int i = 0; i < workers.size(); i++) {
std::string eventToken = "ExecTrace/Proxy/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
proxyMessages.push_back(
timeoutError(workers[i].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 3.0));
}
for (int i = 0; i < workers.size(); i++) {
std::string eventToken = "ExecTrace/Coordinators/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
coordMessages.push_back(
timeoutError(workers[i].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 1.0));
}
for (int i = 0; i < storageWorkers.size(); i++) {
std::string eventToken = "ExecTrace/storage/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
storageMessages.push_back(timeoutError(
storageWorkers[i].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 3.0));
}
for (int i = 0; i < workers.size(); i++) {
std::string eventToken = "ExecTrace/Proxy/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
proxyMessages.push_back(
timeoutError(workers[i].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 1.0));
}
for (int i = 0; i < storageWorkers.size(); i++) {
std::string eventToken = "ExecTrace/storage/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
storageMessages.push_back(timeoutError(
storageWorkers[i].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 1.0));
}
try {
wait(waitForAll(proxyMessages));
wait(waitForAll(storageMessages));
wait(waitForAll(coordMessages));
} catch (Error& e) {
if (e.code() != error_code_timed_out) {
TraceEvent(SevError, "VerifyExecTraceVersionFailure")
.detail("Reason", "FailedToGetProxyOrStorageMessages")
.detail("Code", e.what());
return false;
} else {
retry = true;
++retryCnt;
}
}
if (retry == false) {
break;
}
if (retry && retryCnt >= 4) {
TraceEvent(SevError, "UnableToRetrieveProxyStorageCoordMessages");
return false;
}
try {
wait(waitForAll(proxyMessages));
wait(waitForAll(storageMessages));
wait(waitForAll(coordMessages));
} catch (Error& e) {
TraceEvent(SevError, "UnableToRetrieveProxyStorageCoordMessages");
return false;
}
// filter out empty messages
@ -600,46 +557,26 @@ public: // workload functions
getTagAndDurableVersion(storageMessages[j].get(), execVersion, tag, durableVersion);
TraceEvent("SearchingTLogMessages").detail("Tag", tag.toString());
retryCnt = 0;
loop {
retry = false;
tLogMessages.clear();
for (int m = 0; (tag != invalidTag) && m < tLogWorkers.size(); m++) {
visitedStorageTags[tag] = true;
std::string eventToken = "ExecTrace/TLog/" + tag.toString() + "/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
tLogMessages.push_back(timeoutError(
tLogWorkers[m].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 1.0));
}
try {
TraceEvent("WaitingForTlogMessages");
if (tag != invalidTag) {
wait(waitForAll(tLogMessages));
}
} catch (Error& e) {
if (e.code() != error_code_timed_out) {
TraceEvent(SevError, "VerifyExecTraceVersionFailure")
.detail("Reason", "FailedToGetTLogMessages")
.detail("Code", e.what());
return false;
} else {
retry = true;
++retryCnt;
}
}
if (retry == false) {
break;
}
if (retry && retryCnt > self->maxRetryCntToRetrieveMessage) {
TraceEvent(SevError, "UnableToRetrieveTLogMessagesAfterRetries");
return false;
}
tLogMessages.clear();
for (int m = 0; (tag != invalidTag) && m < tLogWorkers.size(); m++) {
visitedStorageTags[tag] = true;
std::string eventToken = "ExecTrace/TLog/" + tag.toString() + "/" + self->snapUID.toString();
StringRef eventTokenRef(eventToken);
tLogMessages.push_back(timeoutError(
tLogWorkers[m].interf.eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 3.0));
}
try {
TraceEvent("WaitingForTlogMessages");
if (tag != invalidTag) {
wait(waitForAll(tLogMessages));
}
} catch (Error& e) {
TraceEvent(SevError, "VerifyExecTraceVersionFailure")
.detail("Reason", "FailedToGetTLogMessages")
.detail("Code", e.what());
return false;
}
filterEmptyMessages(tLogMessages);
state int k = 0;
numDurableVersionChecks = 0;
for (; (tag != invalidTag) && k < tLogMessages.size(); k++) {