diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 7a753df210..d9f783e3de 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -293,7 +293,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { bool slowRateKeeper = randomize && BUGGIFY; init( SMOOTHING_AMOUNT, 1.0 ); if( slowRateKeeper ) SMOOTHING_AMOUNT = 5.0; init( SLOW_SMOOTHING_AMOUNT, 10.0 ); if( slowRateKeeper ) SLOW_SMOOTHING_AMOUNT = 50.0; - init( RATEKEEPER_LOGGING_INTERVAL, 5.0 ); init( METRIC_UPDATE_RATE, .1 ); if( slowRateKeeper ) METRIC_UPDATE_RATE = 0.5; bool smallStorageTarget = randomize && BUGGIFY; diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 31858b6ddb..6abdf0b55b 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -235,7 +235,6 @@ public: //Ratekeeper double SMOOTHING_AMOUNT; double SLOW_SMOOTHING_AMOUNT; - double RATEKEEPER_LOGGING_INTERVAL; double METRIC_UPDATE_RATE; double LAST_LIMITED_RATIO; diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 421a06a546..f844716133 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -88,27 +88,38 @@ ACTOR Future getDataInFlight( Database cx, Reference md ) { - int64_t bytesInput, bytesDurable; + double inputRate, durableRate; + double inputRoughness, durableRoughness; + int64_t inputBytes, durableBytes; - sscanf(extractAttribute(md.toString(), "BytesInput").c_str(), "%lld", &bytesInput); - sscanf(extractAttribute(md.toString(), "BytesDurable").c_str(), "%lld", &bytesDurable); + sscanf(extractAttribute(md.toString(), "bytesInput").c_str(), "%f %f %lld", &inputRate, &inputRoughness, &inputBytes); + sscanf(extractAttribute(md.toString(), "bytesDurable").c_str(), "%f %f %lld", &durableRate, &durableRoughness, &durableBytes); - return bytesInput - bytesDurable; + return inputBytes - durableBytes; } // This is not robust in the face of a TLog failure ACTOR Future getMaxTLogQueueSize( Database cx, Reference> dbInfo, WorkerInterface masterWorker ) { - TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName)) - .detail("Stage", "ContactingMaster"); + TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingLogs"); + + state std::vector> workers = wait(getWorkers(dbInfo)); + std::map workersMap; + for(auto worker : workers) { + workersMap[worker.first.address()] = worker.first; + } state std::vector>> messages; state std::vector tlogs = dbInfo->get().logSystemConfig.allPresentLogs(); for(int i = 0; i < tlogs.size(); i++) { - messages.push_back( timeoutError(masterWorker.eventLogRequest.getReply( - EventLogRequest( StringRef( "TLogQueueSize/" + tlogs[i].id().toString() ) ) ), 1.0 ) ); + auto itr = workersMap.find(tlogs[i].address()); + if(itr == workersMap.end()) { + TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for log server").detail("Tlog", tlogs[i].id()); + throw attribute_not_found(); + } + messages.push_back( timeoutError(itr->second.eventLogRequest.getReply( + EventLogRequest( StringRef(tlogs[i].id().toString() + "/TLogMetrics") ) ), 1.0 ) ); } Void _ = wait( waitForAll( messages ) ); @@ -121,7 +132,7 @@ ACTOR Future getMaxTLogQueueSize( Database cx, Reference> getStorageServers( Database cx, boo //Gets the maximum size of all the storage server queues ACTOR Future getMaxStorageServerQueueSize( Database cx, Reference> dbInfo, WorkerInterface masterWorker ) { - TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster"); + TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingStorageServers"); + + Future> serversFuture = getStorageServers(cx); + state Future>> workersFuture = getWorkers(dbInfo); + + state std::vector servers = wait(serversFuture); + state std::vector> workers = wait(workersFuture); + + std::map workersMap; + for(auto worker : workers) { + workersMap[worker.first.address()] = worker.first; + } - state vector servers = wait( getStorageServers( cx ) ); state std::vector>> messages; for(int i = 0; i < servers.size(); i++) { - messages.push_back( timeoutError(masterWorker.eventLogRequest.getReply( - EventLogRequest( StringRef( "StorageServerQueueSize/" + servers[i].id().toString() ) ) ), 1.0 ) ); + auto itr = workersMap.find(servers[i].address()); + if(itr == workersMap.end()) { + TraceEvent("QuietDatabaseFailure").detail("Reason", "Could not find worker for storage server").detail("SS", servers[i].id()); + throw attribute_not_found(); + } + messages.push_back( timeoutError(itr->second.eventLogRequest.getReply( + EventLogRequest( StringRef(servers[i].id().toString() + "/StorageMetrics") ) ), 1.0 ) ); } Void _ = wait( waitForAll(messages) ); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 15f56338e7..ad367c3e24 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -137,7 +137,6 @@ struct Ratekeeper { //SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function ACTOR Future trackStorageServerQueueInfo( Ratekeeper* self, StorageServerInterface ssi ) { - state double debug_lastTraceTime = 0; self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) ); state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); TraceEvent("RkTracking", ssi.id()); @@ -163,25 +162,7 @@ ACTOR Future trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available ); myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total ); } - if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){ - TraceEvent("RkServerQueueInfo", ssi.id()) - .detail("LocalTime", reply.get().localTime) - .detail("BytesDurable", reply.get().bytesDurable) - .detail("BytesInput", reply.get().bytesInput) - .detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal()) - .detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal()) - .detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate()) - .detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate()) - .detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal()) - .detail("Version", reply.get().v) - .trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str()); - debug_lastTraceTime = now(); - } } else { - //If the SS didn't respond, clear the queue info so that we know it might have failed - if(myQueueInfo->value.valid) - TraceEvent("RkServerQueueInfo", ssi.id()).trackLatest(("StorageServerQueueSize/" + ssi.id().toString()).c_str()); - myQueueInfo->value.valid = false; } @@ -195,7 +176,6 @@ ACTOR Future trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI } ACTOR Future trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) { - state double debug_lastTraceTime = 0; self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) ); state Map::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id()); TraceEvent("RkTracking", tli.id()); @@ -220,20 +200,7 @@ ACTOR Future trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) { myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); } - if (now() > debug_lastTraceTime + SERVER_KNOBS->RATEKEEPER_LOGGING_INTERVAL){ - TraceEvent("RkTLogQueueInfo", tli.id()).detail("LocalTime", reply.get().localTime).detail("BytesDurable", reply.get().bytesDurable).detail("BytesInput", reply.get().bytesInput) - .detail("BytesDurableSmooth", myQueueInfo->value.smoothDurableBytes.smoothTotal()).detail("BytesInputSmooth", myQueueInfo->value.smoothInputBytes.smoothTotal()) - .detail("BytesDurableRate", myQueueInfo->value.verySmoothDurableBytes.smoothRate()).detail("BytesInputRate", myQueueInfo->value.smoothInputBytes.smoothRate()) - .detail("FreeSpaceSmooth", myQueueInfo->value.smoothFreeSpace.smoothTotal()).detail("TotalSpaceSmooth", myQueueInfo->value.smoothTotalSpace.smoothTotal()) - .detail("Version", reply.get().v) - .trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str()); - debug_lastTraceTime = now(); - } } else { - //If the TLog didn't respond, clear the queue info so that we know it might have failed - if(myQueueInfo->value.valid) - TraceEvent("RkTLogQueueInfo", tli.id()).trackLatest(("TLogQueueSize/" + tli.id().toString()).c_str()); - myQueueInfo->value.valid = false; }