workaround for log giving remote log and others

logSystemConfig.allLocalLogs() sometimes returns remote TLog interface
and a workaround is implemented here. Other minor cleanup.
This commit is contained in:
sramamoorthy 2019-04-08 13:55:50 -07:00 committed by Alex Miller
parent 8838ba3d3b
commit 00ccee8a6c
2 changed files with 37 additions and 20 deletions

View File

@ -867,7 +867,6 @@ ACTOR Future<Void> commitBatch(
if (m.param1 == execSnap) {
te1.trackLatest(tokenStr.c_str());
}
int i = 0;
std::string allTagString;
for (auto& tag : allSources) {
allTagString += tag.toString() + ",";

View File

@ -23,6 +23,12 @@ void getVersionAndnumTags(TraceEventFields md, Version& version, int& numTags) {
sscanf(md.getValue("NumTags").c_str(), "%d:%d", &numTags);
}
void getNumTagServerInfo(TraceEventFields md, int& numTagServers)
{
numTagServers = 0;
sscanf(md.getValue("NumTagServers").c_str(), "%lld", &numTagServers);
}
void getTagAndDurableVersion(TraceEventFields md, Version version, Tag& tag, Version& durableVersion) {
Version verifyVersion;
durableVersion = -1;
@ -59,7 +65,7 @@ void filterEmptyMessages(std::vector<Future<TraceEventFields>>& messages) {
std::string emptyStr;
auto it = messages.begin();
while (it != messages.end()) {
if (it->get().toString() == emptyStr) {
if (!it->isReady() || it->get().toString() == emptyStr) {
it = messages.erase(it);
} else {
++it;
@ -287,7 +293,7 @@ public: // workload functions
// disable pop of the TLog
tr.reset();
try {
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=test");
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=a36b2ca0e8dab0452ac3e12b6b926f4b");
tr.execute(execDisableTLogPop, payLoadRef);
wait(tr.commit());
break;
@ -298,6 +304,7 @@ public: // workload functions
// wait for 40 seconds and verify that the enabled pop happened
// automatically
wait(delay(40.0));
self->snapUID = UID::fromString("a36b2ca0e8dab0452ac3e12b6b926f4b");
} else if (self->testID == 5) {
// description: disable TLog pop and enable TLog pop with
// different UIDs should mis-match and print an error
@ -305,7 +312,7 @@ public: // workload functions
// disable pop of the TLog
tr.reset();
try {
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=tmatch");
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=956349f5f368d37a802f1f37d7f4b9c1");
tr.execute(execDisableTLogPop, payLoadRef);
wait(tr.commit());
break;
@ -317,7 +324,7 @@ public: // workload functions
// enable pop of the TLog
tr.reset();
try {
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=didnotmatch");
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=5810898ca2f3143a246886c79d1bea92");
tr.execute(execEnableTLogPop, payLoadRef);
wait(tr.commit());
break;
@ -325,6 +332,7 @@ public: // workload functions
wait(tr.onError(e));
}
}
self->snapUID = UID::fromString("5810898ca2f3143a246886c79d1bea92");
} else if (self->testID == 6) {
// snapshot create without disabling pop of the TLog
loop {
@ -340,13 +348,14 @@ public: // workload functions
wait(tr.onError(e));
}
}
self->snapUID = UID::fromString("d78b08d47f341158e9a54d4baaf4a4dd");
} else if (self->testID == 7) {
// disable popping of TLog and snapshot create with mis-matching
loop {
// disable pop of the TLog
tr.reset();
try {
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=tmatch");
StringRef payLoadRef = LiteralStringRef("empty-binary:uid=f49d27ddf7a28b6549d930743e0ebdbe");
tr.execute(execDisableTLogPop, payLoadRef);
wait(tr.commit());
break;
@ -367,6 +376,7 @@ public: // workload functions
wait(tr.onError(e));
}
}
self->snapUID = UID::fromString("ba61e9612a561d60bd83ad83e1b63568");
} else if (self->testID == 8) {
// create a snapshot with a non whitelisted binary path and operation
// should fail
@ -401,7 +411,6 @@ public: // workload functions
state StringRef eventTokenRef(event);
state vector<WorkerInterface> tLogWorkers;
state std::vector<Future<TraceEventFields>> tLogMessages;
state std::vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
state std::map<NetworkAddress, WorkerInterface> address_workers;
@ -422,45 +431,54 @@ public: // workload functions
}
state int i = 0;
state int foundTagServers = 0;
for (; i < tLogWorkers.size(); i++) {
tLogMessages.push_back(
timeoutError(tLogWorkers[i].eventLogRequest.getReply(EventLogRequest(eventTokenRef)), 1.0));
state int retryCnt = 0;
state bool retry = false;
loop {
retry = false;
try {
TraceEvent(SevDebug, "WaitingForTlogMessages");
wait(waitForAll(tLogMessages));
break;
} catch (Error& e) {
bool isFail = false;
if (e.code() != error_code_timed_out) {
TraceEvent(SevError, "VerifyTLogTrackLatest")
isFail = true;
break;
} else {
++retryCnt;
}
if (isFail || retryCnt >= self->maxRetryCntToRetrieveMessage ) {
TraceEvent(SevError, "UnableToRetrieveTLogMessages")
.detail("Token", eventTokenRef.toString())
.detail("Reason", "Failed to get tLogMessages")
.detail("Code", e.what());
return false;
} else {
retry = true;
++retryCnt;
}
}
if (retryCnt >= self->maxRetryCntToRetrieveMessage ) {
TraceEvent(SevError, "UnableToRetrieveTLogMessages");
return false;
}
}
printMessages(tLogMessages);
filterEmptyMessages(tLogMessages);
if (tLogMessages.size() != 1) {
TraceEvent(SevError, "VerifyTLogTrackLatestMessageNotFound")
if (tLogMessages.size() < 1) {
TraceEvent("VerifyTLogTrackLatestMessageNotFound")
.detail("Address", tLogWorkers[i].address())
.detail("Token", eventTokenRef.toString());
return false;
} else {
++foundTagServers;
}
tLogMessages.clear();
}
// FIXME: logSystemConfig.allLocalLogs returns remote tlogServers also in few cases and hence the test fails.
// Verify that foundTagServers matches the number of TLogServers in the local region
if (foundTagServers < 1) {
TraceEvent(SevError, "VerifyTLogTrackLatestMessageNotReachAllTLogservers")
.detail("Token", eventTokenRef.toString())
.detail("FoundaTagServers", foundTagServers);
return false;
}
TraceEvent("VerifyTLogTrackLatestDone");
return true;
}