Remove RkServerQueueInfo and RkTLogQueueInfo trace events, since this information is more or less already logged on the storage servers and tlogs. Update the quiet database check and magnesium to use the information from the logs and storage servers.
This commit is contained in:
parent 3b952efb4e
commit bb1297c686
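In short: the quiet database check stops asking the master worker for ratekeeper's relayed queue events and instead asks the worker hosting each tlog or storage server for that role's own tracked metrics event. Both request forms appear in the hunks below:

	// Old: fetch ratekeeper's tracked event via the master worker.
	EventLogRequest( StringRef( "TLogQueueSize/" + tlogs[i].id().toString() ) )
	// New: fetch the tlog's own metrics event from the worker hosting it.
	EventLogRequest( StringRef( tlogs[i].id().toString() + "/TLogMetrics" ) )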
@@ -293,7 +293,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	bool slowRateKeeper = randomize && BUGGIFY;
 	init( SMOOTHING_AMOUNT, 1.0 ); if( slowRateKeeper ) SMOOTHING_AMOUNT = 5.0;
 	init( SLOW_SMOOTHING_AMOUNT, 10.0 ); if( slowRateKeeper ) SLOW_SMOOTHING_AMOUNT = 50.0;
-	init( RATEKEEPER_LOGGING_INTERVAL, 5.0 );
 	init( METRIC_UPDATE_RATE, .1 ); if( slowRateKeeper ) METRIC_UPDATE_RATE = 0.5;
 
 	bool smallStorageTarget = randomize && BUGGIFY;
@@ -235,7 +235,6 @@ public:
 	//Ratekeeper
 	double SMOOTHING_AMOUNT;
 	double SLOW_SMOOTHING_AMOUNT;
-	double RATEKEEPER_LOGGING_INTERVAL;
 	double METRIC_UPDATE_RATE;
 	double LAST_LIMITED_RATIO;
 
@@ -88,27 +88,38 @@ ACTOR Future<int64_t> getDataInFlight( Database cx, Reference<AsyncVar<ServerDBI
 	return dataInFlight;
 }
 
-//Computes the queue size for storage servers and tlogs using the BytesInput and BytesDurable attributes
-//For now, we must ignore invalid keys on storage servers because of a bug that can cause them to be orphaned
+//Computes the queue size for storage servers and tlogs using the bytesInput and bytesDurable attributes
 int64_t getQueueSize( Standalone<StringRef> md ) {
-	int64_t bytesInput, bytesDurable;
+	double inputRate, durableRate;
+	double inputRoughness, durableRoughness;
+	int64_t inputBytes, durableBytes;
 
-	sscanf(extractAttribute(md.toString(), "BytesInput").c_str(), "%lld", &bytesInput);
-	sscanf(extractAttribute(md.toString(), "BytesDurable").c_str(), "%lld", &bytesDurable);
+	sscanf(extractAttribute(md.toString(), "bytesInput").c_str(), "%lf %lf %lld", &inputRate, &inputRoughness, &inputBytes);
+	sscanf(extractAttribute(md.toString(), "bytesDurable").c_str(), "%lf %lf %lld", &durableRate, &durableRoughness, &durableBytes);
 
-	return bytesInput - bytesDurable;
+	return inputBytes - durableBytes;
 }
 
 // This is not robust in the face of a TLog failure
 ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
-	TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName))
-		.detail("Stage", "ContactingMaster");
+	TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingLogs");
 
+	state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(getWorkers(dbInfo));
+	std::map<NetworkAddress, WorkerInterface> workersMap;
+	for(auto worker : workers) {
+		workersMap[worker.first.address()] = worker.first;
+	}
+
 	state std::vector<Future<Standalone<StringRef>>> messages;
 	state std::vector<TLogInterface> tlogs = dbInfo->get().logSystemConfig.allPresentLogs();
 	for(int i = 0; i < tlogs.size(); i++) {
-		messages.push_back( timeoutError(masterWorker.eventLogRequest.getReply(
-			EventLogRequest( StringRef( "TLogQueueSize/" + tlogs[i].id().toString() ) ) ), 1.0 ) );
+		auto itr = workersMap.find(tlogs[i].address());
+		if(itr == workersMap.end()) {
+			TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for log server").detail("Tlog", tlogs[i].id());
+			throw attribute_not_found();
+		}
+		messages.push_back( timeoutError(itr->second.eventLogRequest.getReply(
+			EventLogRequest( StringRef(tlogs[i].id().toString() + "/TLogMetrics") ) ), 1.0 ) );
 	}
 	Void _ = wait( waitForAll( messages ) );
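The new getQueueSize parses a three-field attribute instead of a bare integer: judging from the sscanf pattern above, each counter in the TLogMetrics/StorageMetrics events is serialized as "<rate> <roughness> <total>", and only the third field, the running byte total, matters for the queue size. A minimal standalone sketch of that parse (totalFromCounter is a hypothetical stand-in for the extractAttribute plumbing):

	#include <cstdio>
	#include <cstdint>
	#include <string>

	static int64_t totalFromCounter(const std::string& counterValue) {
		double rate = 0, roughness = 0;
		long long total = 0;
		// Rate and roughness are doubles, so scanf needs %lf; the total is %lld.
		if (sscanf(counterValue.c_str(), "%lf %lf %lld", &rate, &roughness, &total) != 3)
			return -1; // malformed attribute
		return (int64_t)total;
	}

	int main() {
		// e.g. a server that has ingested 9000 bytes and made 7500 durable:
		int64_t inputBytes   = totalFromCounter("1520.5 0.8 9000");
		int64_t durableBytes = totalFromCounter("1400.2 0.6 7500");
		printf("queue size = %lld bytes\n", (long long)(inputBytes - durableBytes)); // 1500
		return 0;
	}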
@@ -121,7 +132,7 @@ ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<Serve
 		try {
 			maxQueueSize = std::max( maxQueueSize, getQueueSize( messages[i].get() ) );
 		} catch( Error &e ) {
-			TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract MaxTLogQueue").detail("Tlog", tlogs[i].id());
+			TraceEvent("QuietDatabaseFailure").detail("Reason", "Failed to extract MaxTLogQueue").detail("Tlog", tlogs[i].id());
 			throw;
 		}
 	}
@@ -158,13 +169,28 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, boo
 
 //Gets the maximum size of all the storage server queues
 ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
-	TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+	TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingStorageServers");
 
+	Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
+	state Future<std::vector<std::pair<WorkerInterface, ProcessClass>>> workersFuture = getWorkers(dbInfo);
+
+	state std::vector<StorageServerInterface> servers = wait(serversFuture);
+	state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(workersFuture);
+
+	std::map<NetworkAddress, WorkerInterface> workersMap;
+	for(auto worker : workers) {
+		workersMap[worker.first.address()] = worker.first;
+	}
+
-	state vector<StorageServerInterface> servers = wait( getStorageServers( cx ) );
 	state std::vector<Future<Standalone<StringRef>>> messages;
 	for(int i = 0; i < servers.size(); i++) {
-		messages.push_back( timeoutError(masterWorker.eventLogRequest.getReply(
-			EventLogRequest( StringRef( "StorageServerQueueSize/" + servers[i].id().toString() ) ) ), 1.0 ) );
+		auto itr = workersMap.find(servers[i].address());
+		if(itr == workersMap.end()) {
+			TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for storage server").detail("SS", servers[i].id());
+			throw attribute_not_found();
+		}
+		messages.push_back( timeoutError(itr->second.eventLogRequest.getReply(
+			EventLogRequest( StringRef(servers[i].id().toString() + "/StorageMetrics") ) ), 1.0 ) );
 	}
 
 	Void _ = wait( waitForAll(messages) );
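Both getters now share the same lookup pattern: index the cluster's workers by network address, then resolve each role to its hosting worker, and fail the check (so it retries) if a role's address has no worker behind it. A minimal sketch of the pattern; Worker and Role here are illustrative stand-ins, not FDB types:

	#include <cstdio>
	#include <map>
	#include <string>
	#include <vector>

	struct Worker { std::string address; };
	struct Role   { std::string address; std::string id; };

	int main() {
		std::vector<Worker> workers = { {"10.0.0.1:4500"}, {"10.0.0.2:4500"} };
		std::vector<Role>   roles   = { {"10.0.0.2:4500", "ss-1"}, {"10.0.0.3:4500", "ss-2"} };

		// Equivalent of workersMap in the hunks above.
		std::map<std::string, Worker> byAddress;
		for (const auto& w : workers) byAddress[w.address] = w;

		for (const auto& r : roles) {
			auto itr = byAddress.find(r.address);
			if (itr == byAddress.end()) {
				// Mirrors the QuietDatabaseFailure trace + attribute_not_found throw.
				printf("%s: no worker at %s, fail and retry the check\n", r.id.c_str(), r.address.c_str());
				continue;
			}
			printf("%s: request %s/StorageMetrics from %s\n", r.id.c_str(), r.id.c_str(), itr->second.address.c_str());
		}
		return 0;
	}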
@@ -137,7 +137,6 @@ struct Ratekeeper {
 
 //SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
 ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerInterface ssi ) {
-	state double debug_lastTraceTime = 0;
 	self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) );
 	state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
 	TraceEvent("RkTracking", ssi.id());
@@ -163,25 +162,7 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
 				myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
 				myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
 			}
-			if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){
-				TraceEvent("RkServerQueueInfo", ssi.id())
-					.detail("LocalTime", reply.get().localTime)
-					.detail("BytesDurable", reply.get().bytesDurable)
-					.detail("BytesInput", reply.get().bytesInput)
-					.detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal())
-					.detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal())
-					.detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate())
-					.detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate())
-					.detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal())
-					.detail("Version", reply.get().v)
-					.trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str());
-				debug_lastTraceTime = now();
-			}
 		} else {
-			//If the SS didn't respond, clear the queue info so that we know it might have failed
-			if(myQueueInfo->value.valid)
-				TraceEvent("RkServerQueueInfo", ssi.id()).trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str());
-
 			myQueueInfo->value.valid = false;
 		}
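The detail removed here is not lost: per the commit message, each storage server already logs the same queue data itself. A hedged sketch of what that publishing side presumably looks like; the event shape is inferred from the attribute and event names used above, and is not part of this diff:

	// Sketch: a storage server publishing its latest metrics. trackLatest()
	// registers the event under a stable name, so an EventLogRequest for
	// "<uid>/StorageMetrics" returns the most recent sample. Counters would be
	// written printf-style as "%f %f %lld" (rate, roughness, total), which is
	// why the reader in getQueueSize scans with "%lf %lf %lld".
	TraceEvent("StorageMetrics", ssi.id())
		.detail("bytesInput", format("%f %f %lld", inputRate, inputRoughness, inputBytes))
		.detail("bytesDurable", format("%f %f %lld", durableRate, durableRoughness, durableBytes))
		.trackLatest((ssi.id().toString() + "/StorageMetrics").c_str());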
@@ -195,7 +176,6 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
 }
 
 ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
-	state double debug_lastTraceTime = 0;
 	self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) );
 	state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
 	TraceEvent("RkTracking", tli.id());
@@ -220,20 +200,7 @@ ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
 				myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
 				myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
 			}
-			if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){
-				TraceEvent("RkTLogQueueInfo", tli.id()).detail("LocalTime", reply.get().localTime).detail("BytesDurable", reply.get().bytesDurable).detail("BytesInput", reply.get().bytesInput)
-					.detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal()).detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal())
-					.detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate()).detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate())
-					.detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal())
-					.detail("Version", reply.get().v)
-					.trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str());
-				debug_lastTraceTime = now();
-			}
 		} else {
-			//If the TLog didn't respond, clear the queue info so that we know it might have failed
-			if(myQueueInfo->value.valid)
-				TraceEvent("RkTLogQueueInfo", tli.id()).trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str());
-
 			myQueueInfo->value.valid = false;
 		}
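Taken together, the two getters feed the quiet database check. A sketch (flow/actor style) of how a consumer might poll them; the byte thresholds and the function name are illustrative, and only getDataInFlight, getMaxTLogQueueSize, and getMaxStorageServerQueueSize come from this diff:

	ACTOR Future<Void> waitForQuietDatabaseSketch( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo,
	                                               WorkerInterface masterWorker ) {
		loop {
			try {
				state int64_t dataInFlight = wait( getDataInFlight( cx, dbInfo ) );
				state int64_t tLogQueue = wait( getMaxTLogQueueSize( cx, dbInfo, masterWorker ) );
				int64_t storageQueue = wait( getMaxStorageServerQueueSize( cx, dbInfo, masterWorker ) );
				// Illustrative thresholds; the real check's limits are knobs elsewhere.
				if( dataInFlight == 0 && tLogQueue < 1e6 && storageQueue < 1e6 )
					return Void();
			} catch( Error& e ) {
				// attribute_not_found etc.: a role moved or a worker vanished; retry.
			}
			Void _ = wait( delay( 1.0 ) );
		}
	}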