From b79feaddd36a13bf4c7ffa1a26267863cb1d7ba4 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Mon, 18 Jun 2018 17:22:40 -0700 Subject: [PATCH 1/3] added a hard memory limit to the TLog to prevent it from running out of memory. Because remote logs are not ratekeeper controlled this is their only protection --- fdbserver/Knobs.cpp | 1 + fdbserver/Knobs.h | 1 + fdbserver/TLogServer.actor.cpp | 26 +++++++++++++++++++++++++- fdbserver/storageserver.actor.cpp | 5 ++--- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 89aa792474..56155ef8fd 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -319,6 +319,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( TARGET_BYTES_PER_TLOG, 2000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3; init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3; init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0; + init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3; init( MAX_TRANSACTIONS_PER_BYTE, 1000 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 4d77ecc42d..94671b7747 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -257,6 +257,7 @@ public: int64_t TARGET_BYTES_PER_TLOG; double SPRING_BYTES_TLOG; int64_t TLOG_SPILL_THRESHOLD; + int64_t TLOG_HARD_LIMIT_BYTES; double MAX_TRANSACTIONS_PER_BYTE; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 0494b1f319..a5ad779aea 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1180,6 +1180,18 @@ ACTOR Future tLogCommit( return Void(); } + state double waitStartT = 0; + while( self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES ) { + if (now() - waitStartT >= 1) { + TraceEvent(SevWarn, "TLogUpdateLag", logData->logId) + .detail("Version", logData->version.get()) + .detail("PersistentDataVersion", logData->persistentDataVersion) + .detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion).suppressFor(1.0); + waitStartT = now(); + } + Void _ = wait( delayJittered(.005, TaskTLogCommit) ); + } + if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!) if(req.debugID.present()) g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before"); @@ -1448,7 +1460,7 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st while (!endVersion.present() || logData->version.get() < endVersion.get()) { loop { choose { - when(Void _ = wait( r ? r->getMore() : Never() ) ) { + when(Void _ = wait( r ? r->getMore(TaskTLogCommit) : Never() ) ) { if(poppedIsKnownCommitted) { logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped()); } @@ -1469,6 +1481,18 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st return Void(); } + state double waitStartT = 0; + while( self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES ) { + if (now() - waitStartT >= 1) { + TraceEvent(SevWarn, "TLogUpdateLag", logData->logId) + .detail("Version", logData->version.get()) + .detail("PersistentDataVersion", logData->persistentDataVersion) + .detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion).suppressFor(1.0); + waitStartT = now(); + } + Void _ = wait( delayJittered(.005, TaskTLogCommit) ); + } + Version ver = 0; std::vector messages; while (true) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index f1ca4c4e49..1279bbbb8e 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2323,9 +2323,8 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) // If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory // This is often referred to as the storage server e-brake (emergency brake) state double waitStartT = 0; - while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() ) - { - if (now() - waitStartT >= .1) { + while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() ) { + if (now() - waitStartT >= 1) { TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID) .detail("Version", data->version.get()) .detail("DurableVersion", data->durableVersion.get()).suppressFor(1.0); From 0bdd25df239b88f328c05b9c86041aa6b2744d49 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Mon, 18 Jun 2018 17:23:55 -0700 Subject: [PATCH 2/3] ratekeeper does not control on remote storage servers --- fdbserver/DataDistribution.actor.cpp | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 507abaf25a..6fe6b60cee 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -484,7 +484,7 @@ Future storageServerTracker( MoveKeysLock const& lock, UID const& masterId, std::map>* const& other_servers, - PromiseStream< std::pair> > const& changes, + Optional> >> const& changes, Promise const& errorOut, Version const& addedVersion); @@ -513,7 +513,7 @@ struct DDTeamCollection { PromiseStream removedServers; std::set recruitingIds; // The IDs of the SS which are being recruited std::set recruitingLocalities; - PromiseStream< std::pair> > serverChanges; + Optional> >> serverChanges; Future initialFailureReactionDelay; Future initializationDoneActor; Promise serverTrackerErrorOut; @@ -544,7 +544,7 @@ struct DDTeamCollection { DatabaseConfiguration configuration, std::vector> includedDCs, Optional>> otherTrackedDCs, - PromiseStream< std::pair> > const& serverChanges, + Optional> >> const& serverChanges, Future readyToStart, Reference> zeroHealthyTeams, bool primary, Reference> processingUnhealthy) :cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ), @@ -1578,7 +1578,7 @@ ACTOR Future storageServerTracker( MoveKeysLock lock, UID masterId, std::map>* other_servers, - PromiseStream< std::pair> > changes, + Optional> >> changes, Promise errorOut, Version addedVersion) { @@ -1593,7 +1593,9 @@ ACTOR Future storageServerTracker( state Future storeTracker = keyValueStoreTypeTracker( self, server ); state bool hasWrongStoreTypeOrDC = false; - changes.send( std::make_pair(server->id, server->lastKnownInterface) ); + if(changes.present()) { + changes.get().send( std::make_pair(server->id, server->lastKnownInterface) ); + } try { loop { @@ -1680,7 +1682,9 @@ ACTOR Future storageServerTracker( when( Void _ = wait( failureTracker ) ) { // The server is failed AND all data has been removed from it, so permanently remove it. TraceEvent("StatusMapChange", masterId).detail("ServerID", server->id).detail("Status", "Removing"); - changes.send( std::make_pair(server->id, Optional()) ); + if(changes.present()) { + changes.get().send( std::make_pair(server->id, Optional()) ); + } // Remove server from FF/serverList Void _ = wait( removeStorageServer( cx, server->id, lock ) ); @@ -1699,7 +1703,9 @@ ACTOR Future storageServerTracker( server->lastKnownInterface = newInterface.first; server->lastKnownClass = newInterface.second; interfaceChanged = server->onInterfaceChanged; - changes.send( std::make_pair(server->id, server->lastKnownInterface) ); + if(changes.present()) { + changes.get().send( std::make_pair(server->id, server->lastKnownInterface) ); + } // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality ); @@ -1918,7 +1924,7 @@ ACTOR Future dataDistributionTeamCollection( DatabaseConfiguration configuration, std::vector> includedDCs, Optional>> otherTrackedDCs, - PromiseStream< std::pair> > serverChanges, + Optional> >> serverChanges, Future readyToStart, Reference> zeroHealthyTeams, bool primary, @@ -2208,7 +2214,7 @@ ACTOR Future dataDistribution( actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) ); actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) ); if (configuration.usableRegions > 1) { - actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional>>(), Optional> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) ); } Void _ = wait( waitForAll( actors ) ); From eaca0fb2eab4b034782ec3a60b4027f6025f512d Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Mon, 18 Jun 2018 17:36:40 -0700 Subject: [PATCH 3/3] fixed incorrect priorities on the log router --- fdbserver/LogRouter.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 6f33c9f9f6..4a81554d1b 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -179,7 +179,7 @@ ACTOR Future pullAsyncData( LogRouterData *self ) { loop { loop { choose { - when(Void _ = wait( r ? r->getMore() : Never() ) ) { + when(Void _ = wait( r ? r->getMore(TaskTLogCommit) : Never() ) ) { break; } when( Void _ = wait( dbInfoChange ) ) { //FIXME: does this actually happen? @@ -336,7 +336,7 @@ ACTOR Future logRouterPop( LogRouterData* self, TLogPopRequest req ) { while(!self->messageBlocks.empty() && self->messageBlocks.front().first < minPopped) { self->messageBlocks.pop_front(); - Void _ = wait(yield(TaskUpdateStorage)); + Void _ = wait(yield(TaskTLogPop)); } if(self->logSystem->get() && self->allowPops) {