additional tracing for quietDatabase

This commit is contained in:
Markus Pilman 2021-06-15 16:00:28 -06:00
parent c6f47c1f15
commit b2271f2176
1 changed file with 95 additions and 30 deletions

View File

@ -284,6 +284,52 @@ ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
return result;
}
// Returns the largest queue size found in `messages` (0 if there are none, or if no reported size
// is positive) and traces the result together with the id of the server credited with it.
//
// messages: StorageMetrics event-log replies; each is read with get(), so callers are expected to
//           pass futures that have already completed.
// servers:  used only to label messages[i] with servers[i].id().
// NOTE(review): this assumes messages[i] was produced by servers[i]. If a caller skips some servers
// while building `messages`, the reported ids will be shifted -- confirm against the caller.
//
// If reading a message or extracting its queue size throws an Error, a QuietDatabaseFailure event
// is traced, the retrievable messages are dumped, and the original error is rethrown.
int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& messages,
                            const std::vector<StorageServerInterface>& servers) {
	int64_t maxQueueSize = 0;
	UID maxQueueServer;
	for (size_t i = 0; i < messages.size(); i++) {
		try {
			const int64_t queueSize = getQueueSize(messages[i].get());
			if (queueSize > maxQueueSize) {
				maxQueueSize = queueSize;
				maxQueueServer = servers[i].id();
			}
		} catch (Error&) {
			TraceEvent("QuietDatabaseFailure")
			    .detail("Reason", "Failed to extract MaxStorageServerQueue")
			    .detail("SS", servers[i].id());
			for (const auto& m : messages) {
				// Only dump replies that actually hold a value. Calling get() on a future that is
				// still pending or holds an error is not expected to succeed, and a failure here
				// would escape this handler and hide the error we are about to rethrow.
				if (m.isReady() && !m.isError()) {
					TraceEvent("Messages").detail("Info", m.get().toString());
				}
			}
			throw;
		}
	}
	TraceEvent("QuietDatabaseGotMaxStorageServerQueueSize")
	    .detail("Stage", "MaxComputed")
	    .detail("Max", maxQueueSize)
	    .detail("MaxQueueServer", format("%016" PRIx64, maxQueueServer.first()));
	return maxQueueSize;
}
// Asks worker `wi`, through its eventLogRequest endpoint, for the event named
// "<storage>/StorageMetrics". Returns the reply if it arrives before a delay(1.0) timer fires;
// otherwise traces a QuietDatabaseFailure event naming the storage server and throws timed_out().
ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInterface wi) {
	// The request is issued first and the timer created second; both are kept as actor state so
	// the choose below can wait on them.
	state Future<TraceEventFields> metricsReply =
	    wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics")));
	state Future<Void> deadline = delay(1.0);
	choose {
		when(TraceEventFields metrics = wait(metricsReply)) {
			return metrics;
		}
		when(wait(deadline)) {
			TraceEvent("QuietDatabaseFailure")
			    .detail("Reason", "Could not fetch StorageMetrics")
			    .detail("Storage", format("%016" PRIx64, storage.first()));
			throw timed_out();
		}
	}
}
// Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
@ -311,9 +357,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncV
// Ignore TSS in add delay mode since it can purposefully freeze forever
if (!servers[i].isTss() || !g_network->isSimulated() ||
g_simulator.tssMode != ISimulator::TSSMode::EnabledAddDelay) {
messages.push_back(timeoutError(itr->second.eventLogRequest.getReply(EventLogRequest(
StringRef(servers[i].id().toString() + "/StorageMetrics"))),
1.0));
messages.push_back(getStorageMetricsTimeout(servers[i].id(), itr->second));
}
}
@ -321,23 +365,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncV
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ComputingMax").detail("MessageCount", messages.size());
state int64_t maxQueueSize = 0;
state int i = 0;
for (; i < messages.size(); i++) {
try {
maxQueueSize = std::max(maxQueueSize, getQueueSize(messages[i].get()));
} catch (Error& e) {
TraceEvent("QuietDatabaseFailure")
.detail("Reason", "Failed to extract MaxStorageServerQueue")
.detail("SS", servers[i].id());
for (auto& m : messages) {
TraceEvent("Messages").detail("Info", m.get().toString());
}
throw;
}
}
return maxQueueSize;
return extractMaxQueueSize(messages, servers);
}
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of
@ -590,6 +618,13 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
int64_t maxPoppedVersionLag = 30e6) {
state Future<Void> reconfig =
reconfigureAfter(cx, 100 + (deterministicRandom()->random01() * 100), dbInfo, "QuietDatabase");
state Future<int64_t> dataInFlight;
state Future<std::pair<int64_t, int64_t>> tLogQueueInfo;
state Future<int64_t> dataDistributionQueueSize;
state Future<bool> teamCollectionValid;
state Future<int64_t> storageQueueSize;
state Future<bool> dataDistributionActive;
state Future<bool> storageServersRecruiting;
auto traceMessage = "QuietDatabase" + phase + "Begin";
TraceEvent(traceMessage.c_str());
@ -613,15 +648,13 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
TraceEvent("QuietDatabaseGotDataDistributor", distributorUID)
.detail("Locality", distributorWorker.locality.toString());
state Future<int64_t> dataInFlight = getDataInFlight(cx, distributorWorker);
state Future<std::pair<int64_t, int64_t>> tLogQueueInfo = getTLogQueueInfo(cx, dbInfo);
state Future<int64_t> dataDistributionQueueSize =
getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0);
state Future<bool> teamCollectionValid = getTeamCollectionValid(cx, distributorWorker);
state Future<int64_t> storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
state Future<bool> dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
state Future<bool> storageServersRecruiting =
getStorageServersRecruiting(cx, distributorWorker, distributorUID);
dataInFlight = getDataInFlight(cx, distributorWorker);
tLogQueueInfo = getTLogQueueInfo(cx, dbInfo);
dataDistributionQueueSize = getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0);
teamCollectionValid = getTeamCollectionValid(cx, distributorWorker);
storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo);
dataDistributionActive = getDataDistributionActive(cx, distributorWorker);
storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID);
wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) &&
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
@ -661,6 +694,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
}
}
} catch (Error& e) {
TraceEvent(("QuietDatabase" + phase + "Error").c_str()).error(e, true);
if (e.code() != error_code_actor_cancelled && e.code() != error_code_attribute_not_found &&
e.code() != error_code_timed_out)
TraceEvent(("QuietDatabase" + phase + "Error").c_str()).error(e);
@ -670,7 +704,38 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
if (e.code() != error_code_attribute_not_found && e.code() != error_code_timed_out)
throw;
TraceEvent(("QuietDatabase" + phase + "Retry").c_str()).error(e);
auto evtType = "QuietDatabase" + phase + "Retry";
TraceEvent evt(evtType.c_str());
evt.error(e);
int notReadyCount = 0;
if (dataInFlight.isReady() && dataInFlight.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "dataInFlight");
}
if (tLogQueueInfo.isReady() && tLogQueueInfo.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "tLogQueueInfo");
}
if (dataDistributionQueueSize.isReady() && dataDistributionQueueSize.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "dataDistributionQueueSize");
}
if (teamCollectionValid.isReady() && teamCollectionValid.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "teamCollectionValid");
}
if (storageQueueSize.isReady() && storageQueueSize.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "storageQueueSize");
}
if (dataDistributionActive.isReady() && dataDistributionActive.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "dataDistributionActive");
}
if (storageServersRecruiting.isReady() && storageServersRecruiting.isError()) {
auto key = "NotReady" + std::to_string(notReadyCount++);
evt.detail(key.c_str(), "storageServersRecruiting");
}
wait(delay(1.0));
numSuccesses = 0;
}