Merge pull request #205 from cie/cleanup-spammy-traceevents
Cleanup spammy traceevents
commit 30464e943c

@@ -278,22 +278,6 @@ public:
 			fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
 		}
 		else {
-			if (it.second.interf.locality.dataHallId().present())
-				TraceEvent(SevWarn,"GWFTADNotAvailable", functionId)
-					.detail("Fitness", fitness)
-					.detailext("Zone", it.second.interf.locality.zoneId())
-					.detailext("DataHall", it.second.interf.locality.dataHallId())
-					.detail("Address", it.second.interf.address())
-					.detail("workerAvailable", workerAvailable(it.second, checkStable))
-					.detail("isExcludedServer", conf.isExcludedServer(it.second.interf.address()))
-					.detail("checkStable", checkStable)
-					.detail("reboots", it.second.reboots)
-					.detail("isAvailable", IFailureMonitor::failureMonitor().getState(it.second.interf.storage.getEndpoint()).isAvailable())
-					.detail("Locality", it.second.interf.locality.toString())
-					.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
-					.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
-					.detail("DesiredLogs", conf.getDesiredLogs())
-					.detail("InterfaceId", id);
 			unavailableLocals.push_back(it.second.interf.locality);
 		}
 	}

@@ -941,7 +925,6 @@ ACTOR Future<Void> clusterGetServerInfo(
 
 	removeIssue( db->workersWithIssues, reply.getEndpoint().address, issues, issueID );
 
-	TraceEvent("SendingServerInfo").detail("Dest", reply.getEndpoint().address );
 	reply.send( db->serverInfo->get() );
 	return Void();
 }

@@ -293,7 +293,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	bool slowRateKeeper = randomize && BUGGIFY;
 	init( SMOOTHING_AMOUNT, 1.0 ); if( slowRateKeeper ) SMOOTHING_AMOUNT = 5.0;
 	init( SLOW_SMOOTHING_AMOUNT, 10.0 ); if( slowRateKeeper ) SLOW_SMOOTHING_AMOUNT = 50.0;
-	init( RATEKEEPER_LOGGING_INTERVAL, 5.0 );
 	init( METRIC_UPDATE_RATE, .1 ); if( slowRateKeeper ) METRIC_UPDATE_RATE = 0.5;
 
 	bool smallStorageTarget = randomize && BUGGIFY;
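
The knobs above follow the init-then-maybe-override pattern visible in the hunk: each knob gets a default, and simulation can randomly flip a "slow ratekeeper" mode that overrides several knobs at once. A minimal sketch of that pattern, with a hypothetical RatekeeperKnobs class and a stand-in for BUGGIFY (the values are the ones from the hunk above):

#include <cstdio>
#include <cstdlib>

// Hypothetical RatekeeperKnobs (not FDB's Knobs class): defaults first,
// then a randomized "slow ratekeeper" mode may override them to stress
// unusual configurations in simulation.
struct RatekeeperKnobs {
    double SMOOTHING_AMOUNT;
    double SLOW_SMOOTHING_AMOUNT;
    double METRIC_UPDATE_RATE;

    explicit RatekeeperKnobs(bool randomize) {
        bool slowRateKeeper = randomize && (std::rand() % 4 == 0);  // stand-in for BUGGIFY
        SMOOTHING_AMOUNT      = slowRateKeeper ?  5.0 :  1.0;
        SLOW_SMOOTHING_AMOUNT = slowRateKeeper ? 50.0 : 10.0;
        METRIC_UPDATE_RATE    = slowRateKeeper ?  0.5 :  0.1;
    }
};

int main() {
    RatekeeperKnobs knobs(true);
    std::printf("SMOOTHING_AMOUNT = %g\n", knobs.SMOOTHING_AMOUNT);
    return 0;
}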

@@ -235,7 +235,6 @@ public:
 	//Ratekeeper
 	double SMOOTHING_AMOUNT;
 	double SLOW_SMOOTHING_AMOUNT;
-	double RATEKEEPER_LOGGING_INTERVAL;
 	double METRIC_UPDATE_RATE;
 	double LAST_LIMITED_RATIO;
 

@@ -88,27 +88,38 @@ ACTOR Future<int64_t> getDataInFlight( Database cx, Reference<AsyncVar<ServerDBI
 	return dataInFlight;
 }
 
-//Computes the queue size for storage servers and tlogs using the BytesInput and BytesDurable attributes
-//For now, we must ignore invalid keys on storage servers because of a bug that can cause them to be orphaned
+//Computes the queue size for storage servers and tlogs using the bytesInput and bytesDurable attributes
 int64_t getQueueSize( Standalone<StringRef> md ) {
-	int64_t bytesInput, bytesDurable;
+	double inputRate, durableRate;
+	double inputRoughness, durableRoughness;
+	int64_t inputBytes, durableBytes;
 
-	sscanf(extractAttribute(md.toString(), "BytesInput").c_str(), "%lld", &bytesInput);
-	sscanf(extractAttribute(md.toString(), "BytesDurable").c_str(), "%lld", &bytesDurable);
+	sscanf(extractAttribute(md.toString(), "bytesInput").c_str(), "%f %f %lld", &inputRate, &inputRoughness, &inputBytes);
+	sscanf(extractAttribute(md.toString(), "bytesDurable").c_str(), "%f %f %lld", &durableRate, &durableRoughness, &durableBytes);
 
-	return bytesInput - bytesDurable;
+	return inputBytes - durableBytes;
 }
 
+// This is not robust in the face of a TLog failure
 ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
-	TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName))
-		.detail("Stage", "ContactingMaster");
+	TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingLogs");
 
+	state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(getWorkers(dbInfo));
+	std::map<NetworkAddress, WorkerInterface> workersMap;
+	for(auto worker : workers) {
+		workersMap[worker.first.address()] = worker.first;
+	}
 
 	state std::vector<Future<Standalone<StringRef>>> messages;
 	state std::vector<TLogInterface> tlogs = dbInfo->get().logSystemConfig.allPresentLogs();
 	for(int i = 0; i < tlogs.size(); i++) {
-		messages.push_back( timeoutError(masterWorker.eventLogRequest.getReply(
-			EventLogRequest( StringRef( "TLogQueueSize/" + tlogs[i].id().toString() ) ) ), 1.0 ) );
+		auto itr = workersMap.find(tlogs[i].address());
+		if(itr == workersMap.end()) {
+			TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for log server").detail("Tlog", tlogs[i].id());
+			throw attribute_not_found();
+		}
+		messages.push_back( timeoutError(itr->second.eventLogRequest.getReply(
+			EventLogRequest( StringRef(tlogs[i].id().toString() + "/TLogMetrics") ) ), 1.0 ) );
 	}
 	Void _ = wait( waitForAll( messages ) );
 
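
The rewritten getQueueSize parses the richer bytesInput and bytesDurable attributes, each of which carries three fields (a rate, a roughness, and a total byte count); the queue size is the difference of the two byte totals. Below is a standalone sketch of that parsing with no FDB types. Note that sscanf's conversion for a double is %lf, whereas the hunk above passes double pointers to %f:

#include <cstdio>

// Parse one "<rate> <roughness> <totalBytes>" attribute value and return
// the byte counter; the queue size is input bytes minus durable bytes.
long long parseBytesField(const char* attr) {
    double rate = 0, roughness = 0;
    long long bytes = 0;
    // %lf is the sscanf conversion for double (%f would read a float).
    if (sscanf(attr, "%lf %lf %lld", &rate, &roughness, &bytes) != 3)
        return -1;  // malformed attribute
    return bytes;
}

int main() {
    long long input   = parseBytesField("120.5 0.9 500000");  // example bytesInput value
    long long durable = parseBytesField("118.2 0.9 420000");  // example bytesDurable value
    std::printf("queue size: %lld\n", input - durable);       // prints: queue size: 80000
    return 0;
}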

@@ -121,7 +132,7 @@ ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<Serve
 		try {
 			maxQueueSize = std::max( maxQueueSize, getQueueSize( messages[i].get() ) );
 		} catch( Error &e ) {
-			TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract MaxTLogQueue").detail("Tlog", tlogs[i].id());
+			TraceEvent("QuietDatabaseFailure").detail("Reason", "Failed to extract MaxTLogQueue").detail("Tlog", tlogs[i].id());
 			throw;
 		}
 	}

@@ -158,13 +169,28 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, boo
 
 //Gets the maximum size of all the storage server queues
 ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
-	TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+	TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingStorageServers");
 
-	state vector<StorageServerInterface> servers = wait( getStorageServers( cx ) );
+	Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
+	state Future<std::vector<std::pair<WorkerInterface, ProcessClass>>> workersFuture = getWorkers(dbInfo);
+
+	state std::vector<StorageServerInterface> servers = wait(serversFuture);
+	state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(workersFuture);
+
+	std::map<NetworkAddress, WorkerInterface> workersMap;
+	for(auto worker : workers) {
+		workersMap[worker.first.address()] = worker.first;
+	}
+
 	state std::vector<Future<Standalone<StringRef>>> messages;
 	for(int i = 0; i < servers.size(); i++) {
-		messages.push_back( timeoutError(masterWorker.eventLogRequest.getReply(
-			EventLogRequest( StringRef( "StorageServerQueueSize/" + servers[i].id().toString() ) ) ), 1.0 ) );
+		auto itr = workersMap.find(servers[i].address());
+		if(itr == workersMap.end()) {
+			TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for storage server").detail("SS", servers[i].id());
+			throw attribute_not_found();
+		}
+		messages.push_back( timeoutError(itr->second.eventLogRequest.getReply(
+			EventLogRequest( StringRef(servers[i].id().toString() + "/StorageMetrics") ) ), 1.0 ) );
 	}
 
 	Void _ = wait( waitForAll(messages) );
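
Both QuietDatabase hunks introduce the same lookup: instead of routing every EventLogRequest through masterWorker, the worker list is indexed by network address, each TLog or storage server is matched to the worker actually hosting it, and the function fails fast when no match exists. A self-contained sketch of that pattern, with hypothetical Worker/Server stand-ins rather than FDB's interfaces:

#include <cstdio>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for WorkerInterface / StorageServerInterface.
struct Worker { std::string address; std::string id; };
struct Server { std::string address; std::string id; };

// Resolve the worker hosting each server; throw when one is missing,
// mirroring the QuietDatabaseFailure + attribute_not_found() path above.
std::vector<Worker> resolveWorkers(const std::vector<Worker>& workers,
                                   const std::vector<Server>& servers) {
    std::map<std::string, Worker> byAddress;
    for (const auto& w : workers)
        byAddress[w.address] = w;  // one pass builds the address index

    std::vector<Worker> resolved;
    for (const auto& s : servers) {
        auto itr = byAddress.find(s.address);
        if (itr == byAddress.end())
            throw std::runtime_error("no worker for server " + s.id);
        resolved.push_back(itr->second);
    }
    return resolved;
}

int main() {
    std::vector<Worker> workers{{"10.0.0.1:4500", "w1"}};
    std::vector<Server> servers{{"10.0.0.1:4500", "ss1"}};
    std::vector<Worker> resolved = resolveWorkers(workers, servers);
    std::printf("server ss1 is hosted by worker %s\n", resolved[0].id.c_str());
    return 0;
}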

@@ -137,7 +137,6 @@ struct Ratekeeper {
 
 //SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
 ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerInterface ssi ) {
-	state double debug_lastTraceTime = 0;
 	self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) );
 	state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
 	TraceEvent("RkTracking", ssi.id());

@@ -163,25 +162,7 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
 				myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
 				myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
 			}
-			if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){
-				TraceEvent("RkServerQueueInfo", ssi.id())
-					.detail("LocalTime", reply.get().localTime)
-					.detail("BytesDurable", reply.get().bytesDurable)
-					.detail("BytesInput", reply.get().bytesInput)
-					.detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal())
-					.detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal())
-					.detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate())
-					.detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate())
-					.detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal())
-					.detail("Version", reply.get().v)
-					.trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str());
-				debug_lastTraceTime = now();
-			}
 		} else {
 			//If the SS didn't respond, clear the queue info so that we know it might have failed
-			if(myQueueInfo->value.valid)
-				TraceEvent("RkServerQueueInfo", ssi.id()).trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str());
-
 			myQueueInfo->value.valid = false;
 		}
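
The else branch keeps its semantics: when a server misses a poll, its cached queue stats are marked invalid rather than erased, so a stale entry stays distinguishable from a live one. A small sketch of that validity-flag idea, using a hypothetical QueueInfo type rather than FDB's StorageQueueInfo:

#include <cstdio>

// Hypothetical QueueInfo (not FDB code): a missed poll marks the cached
// stats stale instead of deleting them, so readers can tell "stopped
// reporting" apart from fresh data.
struct QueueInfo {
    long long bytesInput;
    long long bytesDurable;
    bool valid;

    QueueInfo() : bytesInput(0), bytesDurable(0), valid(false) {}
};

void onPollResult(QueueInfo& q, bool gotReply, long long in, long long durable) {
    if (gotReply) {
        q.bytesInput = in;
        q.bytesDurable = durable;
        q.valid = true;
    } else {
        q.valid = false;  // mirrors: myQueueInfo->value.valid = false;
    }
}

int main() {
    QueueInfo q;
    onPollResult(q, true, 500000, 420000);  // server responded
    onPollResult(q, false, 0, 0);           // server missed a poll
    std::printf("valid = %d\n", q.valid);   // prints: valid = 0
    return 0;
}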

@@ -195,7 +176,6 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
 }
 
 ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
-	state double debug_lastTraceTime = 0;
 	self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) );
 	state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
 	TraceEvent("RkTracking", tli.id());

@@ -220,20 +200,7 @@ ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
 				myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
 				myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
 			}
-			if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){
-				TraceEvent("RkTLogQueueInfo", tli.id()).detail("LocalTime", reply.get().localTime).detail("BytesDurable", reply.get().bytesDurable).detail("BytesInput", reply.get().bytesInput)
-					.detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal()).detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal())
-					.detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate()).detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate())
-					.detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal())
-					.detail("Version", reply.get().v)
-					.trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str());
-				debug_lastTraceTime = now();
-			}
 		} else {
 			//If the TLog didn't respond, clear the queue info so that we know it might have failed
-			if(myQueueInfo->value.valid)
-				TraceEvent("RkTLogQueueInfo", tli.id()).trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str());
-
 			myQueueInfo->value.valid = false;
 		}
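
All four Ratekeeper hunks delete the same device: a debug_lastTraceTime stamp gated by RATEKEEPER_LOGGING_INTERVAL so the queue-info events fired at most once per interval. A generic sketch of that now-removed gate, using a hypothetical IntervalLogger rather than FDB's TraceEvent API:

#include <cstdio>
#include <ctime>

// Hypothetical IntervalLogger (not FDB code): emit at most one message per
// interval and silently drop the rest, the same shape as the removed check.
struct IntervalLogger {
    double intervalSeconds;
    double lastEmit;

    explicit IntervalLogger(double interval) : intervalSeconds(interval), lastEmit(0) {}

    static double now() { return static_cast<double>(time(nullptr)); }  // stand-in clock

    void log(const char* msg) {
        if (now() > lastEmit + intervalSeconds) {  // gate: skip if within the interval
            std::printf("%s\n", msg);
            lastEmit = now();
        }
    }
};

int main() {
    IntervalLogger logger(5.0);  // the removed knob's default was 5.0 seconds
    for (int i = 0; i < 3; i++)
        logger.log("RkTLogQueueInfo sample");  // only the first call prints
    return 0;
}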