From 3c86643822f23164198828456218a7d1fdc8da8d Mon Sep 17 00:00:00 2001
From: Jingyu Zhou
Date: Thu, 14 Feb 2019 16:24:46 -0800
Subject: [PATCH 01/14] Separate Ratekeeper from data distribution.

Add a new role for ratekeeper. Remove StorageServerChanges from data
distribution. Ratekeeper monitors storage servers, which borrows the idea
from DataDistribution.
---
 fdbrpc/Locality.cpp                     |  23 +++++
 fdbrpc/Locality.h                       |   7 +-
 fdbrpc/simulator.h                      |   1 +
 fdbserver/CMakeLists.txt                |   1 +
 fdbserver/ClusterController.actor.cpp   |  92 ++++++++++++++++++--
 fdbserver/ClusterRecruitmentInterface.h |   7 +-
 fdbserver/DataDistribution.actor.cpp    |  77 +++++------------
 fdbserver/DataDistribution.actor.h      |   1 +
 fdbserver/DataDistributorInterface.h    |  36 +-------
 fdbserver/Knobs.cpp                     |   2 +
 fdbserver/Knobs.h                       |   2 +
 fdbserver/MasterProxyServer.actor.cpp   |  17 ++--
 fdbserver/Ratekeeper.actor.cpp          | 110 ++++++++++++++++++------
 fdbserver/Ratekeeper.h                  |  35 --------
 fdbserver/RatekeeperInterface.h         |  93 ++++++++++++++++++++
 fdbserver/ServerDBInfo.h                |   2 +
 fdbserver/WorkerInterface.actor.h       |  16 +++-
 fdbserver/fdbserver.vcxproj             |   2 +-
 fdbserver/fdbserver.vcxproj.filters     |   3 +-
 fdbserver/masterserver.actor.cpp        |   1 -
 fdbserver/worker.actor.cpp              |  26 +++++-
 flow/network.h                          |   1 +
 22 files changed, 377 insertions(+), 178 deletions(-)
 delete mode 100644 fdbserver/Ratekeeper.h
 create mode 100644 fdbserver/RatekeeperInterface.h

diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp
index d1f1957d2a..ff9135d77e 100644
--- a/fdbrpc/Locality.cpp
+++ b/fdbrpc/Locality.cpp
@@ -185,6 +185,29 @@ ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) cons
 		default:
 			return ProcessClass::WorstFit;
 		}
+	case ProcessClass::RateKeeper:
+		switch( _class ) {
+		case ProcessClass::RateKeeperClass:
+			return ProcessClass::BestFit;
+		case ProcessClass::StatelessClass:
+			return ProcessClass::GoodFit;
+		case ProcessClass::MasterClass:
+			return ProcessClass::OkayFit;
+		case ProcessClass::ResolutionClass:
+			return ProcessClass::OkayFit;
+		case ProcessClass::TransactionClass:
+			return ProcessClass::OkayFit;
+		case ProcessClass::ProxyClass:
+			return ProcessClass::OkayFit;
+		case ProcessClass::UnsetClass:
+			return ProcessClass::UnsetFit;
+		case ProcessClass::CoordinatorClass:
+			return ProcessClass::NeverAssign;
+		case ProcessClass::TesterClass:
+			return ProcessClass::NeverAssign;
+		default:
+			return ProcessClass::WorstFit;
+		}
 	default:
 		return ProcessClass::NeverAssign;
 	}
diff --git a/fdbrpc/Locality.h b/fdbrpc/Locality.h
index bae2fd69e8..1415ad9eff 100644
--- a/fdbrpc/Locality.h
+++ b/fdbrpc/Locality.h
@@ -26,9 +26,9 @@ struct ProcessClass {
 	// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
- enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, DataDistributorClass, CoordinatorClass, InvalidClass = -1 }; + enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, DataDistributorClass, CoordinatorClass, RateKeeperClass, InvalidClass = -1 }; enum Fitness { BestFit, GoodFit, UnsetFit, OkayFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask - enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController, DataDistributor, NoRole }; + enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController, DataDistributor, RateKeeper, NoRole }; enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 }; int16_t _class; int16_t _source; @@ -50,6 +50,7 @@ public: else if (s=="cluster_controller") _class = ClusterControllerClass; else if (s=="data_distributor") _class = DataDistributorClass; else if (s=="coordinator") _class = CoordinatorClass; + else if (s=="ratekeeper") _class = RateKeeperClass; else _class = InvalidClass; } @@ -67,6 +68,7 @@ public: else if (classStr=="cluster_controller") _class = ClusterControllerClass; else if (classStr=="data_distributor") _class = DataDistributorClass; else if (classStr=="coordinator") _class = CoordinatorClass; + else if (classStr=="ratekeeper") _class = RateKeeperClass; else _class = InvalidClass; if (sourceStr=="command_line") _source = CommandLineSource; @@ -99,6 +101,7 @@ public: case ClusterControllerClass: return "cluster_controller"; case DataDistributorClass: return "data_distributor"; case CoordinatorClass: return "coordinator"; + case RateKeeperClass: return "ratekeeper"; default: return "invalid"; } } diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 2bfd34e98f..2987c80655 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -99,6 +99,7 @@ public: case ProcessClass::LogRouterClass: return false; case ProcessClass::ClusterControllerClass: return false; case ProcessClass::DataDistributorClass: return false; + case ProcessClass::RateKeeperClass: return false; default: return false; } } diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 9e630d4c7c..58853a2fee 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -57,6 +57,7 @@ set(FDBSERVER_SRCS QuietDatabase.h Ratekeeper.actor.cpp Ratekeeper.h + RatekeeperInterface.h RecoveryState.h Restore.actor.cpp RestoreInterface.h diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index aadebd93c2..d95b9c1385 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -30,6 +30,7 @@ #include "fdbserver/LogSystemConfig.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/ClusterRecruitmentInterface.h" +#include "fdbserver/RatekeeperInterface.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/Status.h" #include "fdbserver/LatencyBandConfig.h" @@ -110,17 +111,28 @@ public: { } - void setDistributor(const DataDistributorInterface& distributorInterf) { + void setDistributor(const DataDistributorInterface& interf) { ServerDBInfo newInfo = serverInfo->get(); newInfo.id = g_random->randomUniqueID(); - newInfo.distributor = distributorInterf; + newInfo.distributor = interf; serverInfo->set( newInfo ); } - 
void clearDistributor() { + void setRatekeeper(const RatekeeperInterface& interf) { ServerDBInfo newInfo = serverInfo->get(); newInfo.id = g_random->randomUniqueID(); - newInfo.distributor = Optional(); + newInfo.ratekeeper = interf; + serverInfo->set( newInfo ); + } + + void clearInterf(ProcessClass::ClassType t) { + ServerDBInfo newInfo = serverInfo->get(); + newInfo.id = g_random->randomUniqueID(); + if (t == ProcessClass::DataDistributorClass) { + newInfo.distributor = Optional(); + } else if (t == ProcessClass::RateKeeperClass) { + newInfo.ratekeeper = Optional(); + } serverInfo->set( newInfo ); } }; @@ -524,6 +536,9 @@ public: if (db.serverInfo->get().distributor.present()) { (*id_used)[db.serverInfo->get().distributor.get().locality.processId()]++; } + if (db.serverInfo->get().ratekeeper.present()) { + (*id_used)[db.serverInfo->get().ratekeeper.get().locality.processId()]++; + } } RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) { @@ -1752,7 +1767,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { if ( req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() ) { const DataDistributorInterface& di = req.distributorInterf.get(); TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id()); - self->db.setDistributor( di ); + self->db.setDistributor(di); } if( info == self->id_worker.end() ) { self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo ); @@ -2341,7 +2356,7 @@ ACTOR Future startDataDistributor( ClusterControllerDa } } -ACTOR Future waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterControllerFullInterface *clusterInterface ) { +ACTOR Future waitDDRejoinOrStartDD(ClusterControllerData *self) { state Future initialDelay = delay(SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY); // wait for a while to see if existing data distributor will join. 
@@ -2361,10 +2376,68 @@ ACTOR Future waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterCo wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) ); TraceEvent("ClusterController", self->id) .detail("DataDistributorDied", self->db.serverInfo->get().distributor.get().id()); - self->db.clearDistributor(); + self->db.clearInterf(ProcessClass::DataDistributorClass); } else { DataDistributorInterface distributorInterf = wait( startDataDistributor(self) ); - self->db.setDistributor( distributorInterf ); + self->db.setDistributor(distributorInterf); + } + } +} + +ACTOR Future startRatekeeper(ClusterControllerData *self) { + loop { + try { + while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) { + wait( self->db.serverInfo->onChange() ); + } + + std::map>, int> id_used = self->getUsedIds(); + Optional dcId = self->clusterControllerDcId; + state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used); + state InitializeRatekeeperRequest req; + req.reqId = g_random->randomUniqueID(); + TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address()); + + ErrorOr interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) ); + if (interf.present()) { + TraceEvent("ClusterController_RatekeeperRecruited", req.reqId).detail("Addr", rkWorker.worker.first.address()); + return interf.get(); + } + } + catch (Error& e) { + TraceEvent("ClusterController_RatekeeperRecruitError", req.reqId).error(e); + if ( e.code() != error_code_no_more_servers ) { + throw; + } + } + wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) ); + } +} + +ACTOR Future waitRKRejoinOrStartRK(ClusterControllerData *self) { + state Future initialDelay = delay(SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY); + + // wait for a while to see if an existing ratekeeper will join. 
+ loop choose { + when ( wait(initialDelay) ) { break; } + when ( wait(self->db.serverInfo->onChange()) ) { // Rejoins via worker registration + if ( self->db.serverInfo->get().ratekeeper.present() ) { + TraceEvent("ClusterController_GotRateKeeper", self->id) + .detail("RKID", self->db.serverInfo->get().ratekeeper.get().id()); + break; + } + } + } + + loop { + if ( self->db.serverInfo->get().ratekeeper.present() ) { + wait( waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ) ); + TraceEvent("ClusterController", self->id) + .detail("RatekeeperDied", self->db.serverInfo->get().ratekeeper.get().id()); + self->db.clearInterf(ProcessClass::RateKeeperClass); + } else { + RatekeeperInterface rkInterf = wait( startRatekeeper(self) ); + self->db.setRatekeeper(rkInterf); } } } @@ -2385,8 +2458,9 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, self.addActor.send( updatedChangingDatacenters(&self) ); self.addActor.send( updatedChangedDatacenters(&self) ); self.addActor.send( updateDatacenterVersionDifference(&self) ); - self.addActor.send( waitDDRejoinOrStartDD(&self, &interf) ); self.addActor.send( handleForcedRecoveries(&self, interf) ); + self.addActor.send( waitDDRejoinOrStartDD(&self) ); + self.addActor.send( waitRKRejoinOrStartRK(&self) ); //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str()); loop choose { diff --git a/fdbserver/ClusterRecruitmentInterface.h b/fdbserver/ClusterRecruitmentInterface.h index aefbe167c8..db49a9d075 100644 --- a/fdbserver/ClusterRecruitmentInterface.h +++ b/fdbserver/ClusterRecruitmentInterface.h @@ -168,15 +168,16 @@ struct RegisterWorkerRequest { ClusterControllerPriorityInfo priorityInfo; Generation generation; Optional distributorInterf; + Optional ratekeeperInterf; ReplyPromise reply; RegisterWorkerRequest() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {} - RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional ddInterf) : - wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf) {} + RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional ddInterf, Optional rkInterf) : + wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf) {} template void serialize( Ar& ar ) { - serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, reply); + serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, ratekeeperInterf, reply); } }; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 48f5732fca..fe52e5e42a 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -29,7 +29,6 @@ #include "fdbserver/WaitFailure.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/IKeyValueStore.h" -#include "fdbserver/Ratekeeper.h" #include "fdbclient/ManagementAPI.actor.h" #include "fdbrpc/Replication.h" #include "flow/UnitTest.h" @@ -570,7 +569,6 @@ struct DDTeamCollection : ReferenceCounted { PromiseStream removedServers; 
std::set recruitingIds; // The IDs of the SS which are being recruited std::set recruitingLocalities; - Optional> >> serverChanges; Future initialFailureReactionDelay; Future initializationDoneActor; Promise serverTrackerErrorOut; @@ -629,13 +627,12 @@ struct DDTeamCollection : ReferenceCounted { Reference const& shardsAffectedByTeamFailure, DatabaseConfiguration configuration, std::vector> includedDCs, Optional>> otherTrackedDCs, - Optional>>> const& serverChanges, Future readyToStart, Reference> zeroHealthyTeams, bool primary, Reference> processingUnhealthy) : cx(cx), distributorId(distributorId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder(Void()), badTeamRemover(Void()), redundantTeamRemover(Void()), configuration(configuration), - serverChanges(serverChanges), readyToStart(readyToStart), + readyToStart(readyToStart), checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskDataDistribution)), initialFailureReactionDelay( delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution)), @@ -2839,10 +2836,6 @@ ACTOR Future storageServerTracker( state Future storeTracker = keyValueStoreTypeTracker( self, server ); state bool hasWrongStoreTypeOrDC = false; - if(self->serverChanges.present()) { - self->serverChanges.get().send( std::make_pair(server->id, server->lastKnownInterface) ); - } - try { loop { status.isUndesired = false; @@ -2933,9 +2926,6 @@ ACTOR Future storageServerTracker( when( wait( failureTracker ) ) { // The server is failed AND all data has been removed from it, so permanently remove it. TraceEvent("StatusMapChange", self->distributorId).detail("ServerID", server->id).detail("Status", "Removing"); - if(self->serverChanges.present()) { - self->serverChanges.get().send( std::make_pair(server->id, Optional()) ); - } if(server->updated.canBeSet()) { server->updated.send(Void()); @@ -3040,9 +3030,6 @@ ACTOR Future storageServerTracker( } interfaceChanged = server->onInterfaceChanged; - if(self->serverChanges.present()) { - self->serverChanges.get().send( std::make_pair(server->id, server->lastKnownInterface) ); - } // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality ); @@ -3460,13 +3447,11 @@ ACTOR Future pollMoveKeysLock( Database cx, MoveKeysLock lock ) { } } -ACTOR Future dataDistribution( - Reference> db, - UID myId, - PromiseStream< std::pair> > serverChanges, - double* lastLimited) +ACTOR Future dataDistribution(Reference self, + double* lastLimited) { - state Database cx = openDBOnServer(db, TaskDataDistributionLaunch, true, true); + state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true); + state DatabaseConfiguration configuration = self->configuration->get(); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; //cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); @@ -3532,20 +3517,20 @@ ACTOR Future dataDistribution( Reference initData_ = wait( getInitialDataDistribution(cx, myId, lock, configuration.usableRegions > 1 ? 
remoteDcIds : std::vector>() ) ); initData = initData_; if(initData->shards.size() > 1) { - TraceEvent("DDInitGotInitialDD", myId) + TraceEvent("DDInitGotInitialDD", self->ddId) .detail("B", printable(initData->shards.end()[-2].key)) .detail("E", printable(initData->shards.end()[-1].key)) .detail("Src", describe(initData->shards.end()[-2].primarySrc)) .detail("Dest", describe(initData->shards.end()[-2].primaryDest)) .trackLatest("InitialDD"); } else { - TraceEvent("DDInitGotInitialDD", myId).detail("B","").detail("E", "").detail("Src", "[no items]").detail("Dest", "[no items]").trackLatest("InitialDD"); + TraceEvent("DDInitGotInitialDD", self->ddId).detail("B","").detail("E", "").detail("Src", "[no items]").detail("Dest", "[no items]").trackLatest("InitialDD"); } if (initData->mode) break; // mode may be set true by system operator using fdbcli - TraceEvent("DataDistributionDisabled", myId); + TraceEvent("DataDistributionDisabled", self->ddId); - TraceEvent("MovingData", myId) + TraceEvent("MovingData", self->ddId) .detail( "InFlight", 0 ) .detail( "InQueue", 0 ) .detail( "AverageShardSize", -1 ) @@ -3554,8 +3539,8 @@ ACTOR Future dataDistribution( .detail( "HighestPriority", 0 ) .trackLatest( "MovingData" ); - TraceEvent("TotalDataInFlight", myId).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight"); - TraceEvent("TotalDataInFlight", myId).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote"); + TraceEvent("TotalDataInFlight", self->ddId).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight"); + TraceEvent("TotalDataInFlight", self->ddId).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote"); wait( waitForDataDistributionEnabled(cx) ); TraceEvent("DataDistributionEnabled"); @@ -3573,12 +3558,12 @@ ACTOR Future dataDistribution( state Reference shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure ); state int shard = 0; - for(; shardshards.size() - 1; shard++) { + for (; shard < initData->shards.size() - 1; shard++) { KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard+1].key); shardsAffectedByTeamFailure->defineShard(keys); std::vector teams; teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true)); - if(configuration.usableRegions > 1) { + if (configuration.usableRegions > 1) { teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false)); } if(g_network->isSimulated()) { @@ -3587,11 +3572,11 @@ ACTOR Future dataDistribution( } shardsAffectedByTeamFailure->moveShard(keys, teams); - if(initData->shards[shard].hasDest) { + if (initData->shards[shard].hasDest) { // This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in // DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement. 
bool unhealthy = initData->shards[shard].primarySrc.size() != configuration.storageTeamSize; - if(!unhealthy && configuration.usableRegions > 1) { + if (!unhealthy && configuration.usableRegions > 1) { unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize; } output.send( RelocateShard( keys, unhealthy ? PRIORITY_TEAM_UNHEALTHY : PRIORITY_RECOVER_MOVE ) ); @@ -3620,20 +3605,20 @@ ACTOR Future dataDistribution( } actors.push_back( pollMoveKeysLock(cx, lock) ); - actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, myId ), "DDTracker", myId, &normalDDQueueErrors() ) ); - actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, myId, storageTeamSize, lastLimited ), "DDQueue", myId, &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) ); vector teamCollectionsPtrs; - Reference primaryTeamCollection( new DDTeamCollection(cx, myId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ); + Reference primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, self->primaryDcId, configuration.usableRegions > 1 ? 
self->remoteDcIds : std::vector>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ); teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr()); if (configuration.usableRegions > 1) { - Reference remoteTeamCollection( new DDTeamCollection(cx, myId, lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional>>(), serverChanges, readyToStart.getFuture() && remoteRecovered(db), zeroHealthyTeams[1], false, processingUnhealthy) ); + Reference remoteTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, self->remoteDcIds, Optional>>(), readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], false, processingUnhealthy) ); teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr()); remoteTeamCollection->teamCollections = teamCollectionsPtrs; - actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], db ), "DDTeamCollectionSecondary", myId, &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], self->dbInfo ), "DDTeamCollectionSecondary", self->ddId, &normalDDQueueErrors() ) ); } primaryTeamCollection->teamCollections = teamCollectionsPtrs; - actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], db ), "DDTeamCollectionPrimary", myId, &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], self->dbInfo ), "DDTeamCollectionPrimary", self->ddId, &normalDDQueueErrors() ) ); actors.push_back(yieldPromiseStream(output.getFuture(), input)); wait( waitForAll( actors ) ); @@ -3654,7 +3639,6 @@ ACTOR Future dataDistribution( struct DataDistributorData : NonCopyable, ReferenceCounted { Reference> dbInfo; UID ddId; - PromiseStream< std::pair> > ddStorageServerChanges; PromiseStream> addActor; DataDistributorData(Reference> const& db, UID id) : dbInfo(db), ddId(id) {} @@ -3672,19 +3656,7 @@ static std::set const& normalDataDistributorErrors() { return s; } -static std::set const& normalRateKeeperErrors() { - static std::set s; - if (s.empty()) { - s.insert( error_code_worker_removed ); - s.insert( error_code_broken_promise ); - s.insert( error_code_actor_cancelled ); - s.insert( error_code_please_reboot ); - } - return s; -} - ACTOR Future dataDistributor(DataDistributorInterface di, Reference> db ) { - state UID lastClusterControllerID(0,0); state Reference self( new DataDistributorData(db, di.id()) ); state Future collection = actorCollection( self->addActor.getFuture() ); @@ -3693,10 +3665,8 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference> > ddStorageServerChanges; state double lastLimited = 0; - state Future distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), ddStorageServerChanges, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); - self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) ); + state Future distributor = reportErrorsExcept( dataDistribution( self->dbInfo, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); wait( distributor || collection ); } @@ -3732,7 +3702,6 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro conf, {}, 
{}, - PromiseStream>>(), Future(Void()), Reference>( new AsyncVar(true) ), true, @@ -3765,7 +3734,7 @@ DDTeamCollection* testMachineTeamCollection(int teamSize, IRepPolicyRef policy, DDTeamCollection* collection = new DDTeamCollection(database, UID(0, 0), MoveKeysLock(), PromiseStream(), Reference(new ShardsAffectedByTeamFailure()), conf, {}, {}, - PromiseStream>>(), Future(Void()), + Future(Void()), Reference>(new AsyncVar(true)), true, Reference>(new AsyncVar(false))); diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index b838217192..1baff7f58e 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -253,5 +253,6 @@ int64_t getMaxShardSize( double dbSizeEstimate ); class DDTeamCollection; ACTOR Future teamRemover(DDTeamCollection* self); ACTOR Future teamRemoverPeriodic(DDTeamCollection* self); +ACTOR Future>> getServerListAndProcessClasses(Transaction* tr); #endif diff --git a/fdbserver/DataDistributorInterface.h b/fdbserver/DataDistributorInterface.h index 2150eb08e5..d437fc69ae 100644 --- a/fdbserver/DataDistributorInterface.h +++ b/fdbserver/DataDistributorInterface.h @@ -27,15 +27,14 @@ struct DataDistributorInterface { RequestStream> waitFailure; - RequestStream getRateInfo; struct LocalityData locality; DataDistributorInterface() {} explicit DataDistributorInterface(const struct LocalityData& l) : locality(l) {} void initEndpoints() {} - UID id() const { return getRateInfo.getEndpoint().token; } - NetworkAddress address() const { return getRateInfo.getEndpoint().getPrimaryAddress(); } + UID id() const { return waitFailure.getEndpoint().token; } + NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); } bool operator== (const DataDistributorInterface& r) const { return id() == r.id(); } @@ -45,36 +44,7 @@ struct DataDistributorInterface { template void serialize(Archive& ar) { - serializer(ar, waitFailure, getRateInfo, locality); - } -}; - -struct GetRateInfoRequest { - UID requesterID; - int64_t totalReleasedTransactions; - int64_t batchReleasedTransactions; - bool detailed; - ReplyPromise reply; - - GetRateInfoRequest() {} - GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed) - : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {} - - template - void serialize(Ar& ar) { - serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply); - } -}; - -struct GetRateInfoReply { - double transactionRate; - double batchTransactionRate; - double leaseDuration; - HealthMetrics healthMetrics; - - template - void serialize(Ar& ar) { - serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics); + serializer(ar, waitFailure, locality); } }; diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 8b37bb55fb..815d933211 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -307,11 +307,13 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY, 5.0 ); init( ATTEMPT_RECRUITMENT_DELAY, 0.035 ); init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 ); + init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001; init( 
VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0; init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND ); init( FORCE_RECOVERY_CHECK_DELAY, 5.0 ); + init( RATEKEEPER_FAILURE_TIME, 1.0 ); init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0; init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index f3698b3561..b2b77861db 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -248,12 +248,14 @@ public: double WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY; double ATTEMPT_RECRUITMENT_DELAY; double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY; + double WAIT_FOR_RATEKEEPER_JOIN_DELAY; double WORKER_FAILURE_TIME; double CHECK_OUTSTANDING_INTERVAL; double INCOMPATIBLE_PEERS_LOGGING_INTERVAL; double VERSION_LAG_METRIC_INTERVAL; int64_t MAX_VERSION_DIFFERENCE; double FORCE_RECOVERY_CHECK_DELAY; + double RATEKEEPER_FAILURE_TIME; // Knobs used to select the best policy (via monte carlo) int POLICY_RATING_TESTS; // number of tests per policy (in order to compare) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 8755a69069..2b3299572e 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -97,18 +97,15 @@ ACTOR Future getRate(UID myID, Reference> db, int64 state int64_t lastTC = 0; - if (db->get().distributor.present()) { - nextRequestTimer = Void(); - } - + if (db->get().ratekeeper.present()) nextRequestTimer = Void(); loop choose { when ( wait( db->onChange() ) ) { - if ( db->get().distributor.present() ) { - TraceEvent("Proxy_DataDistributorChanged", myID) - .detail("DDID", db->get().distributor.get().id()); - nextRequestTimer = Void(); // trigger GetRate request + if ( db->get().ratekeeper.present() ) { + TraceEvent("Proxy_RatekeeperChanged", myID) + .detail("RKID", db->get().ratekeeper.get().id()); + nextRequestTimer = Void(); // trigger GetRate request } else { - TraceEvent("Proxy_DataDistributorDied", myID); + TraceEvent("Proxy_RatekeeperDied", myID); nextRequestTimer = Never(); reply = Never(); } @@ -116,7 +113,7 @@ ACTOR Future getRate(UID myID, Reference> db, int64 when ( wait( nextRequestTimer ) ) { nextRequestTimer = Never(); bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE; - reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed))); + reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed))); expectingDetailedReply = detailed; } when ( GetRateInfoReply rep = wait(reply) ) { diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 1baf876d51..83a8778411 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -19,13 +19,14 @@ */ #include "flow/IndexedSet.h" -#include "fdbserver/Ratekeeper.h" #include "fdbrpc/FailureMonitor.h" -#include "fdbserver/Knobs.h" #include "fdbrpc/Smoother.h" -#include "fdbserver/ServerDBInfo.h" #include "fdbrpc/simulator.h" #include "fdbclient/ReadYourWrites.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/DataDistribution.h" +#include "fdbserver/ServerDBInfo.h" +#include "fdbserver/WaitFailure.h" #include "flow/actorcompiler.h" // This must be the last #include. 
enum limitReason_t { @@ -146,7 +147,7 @@ struct TransactionCounts { TransactionCounts() : total(0), batch(0), time(0) {} }; -struct Ratekeeper { +struct RatekeeperData { Map storageQueueInfo; Map tlogQueueInfo; @@ -154,6 +155,7 @@ struct Ratekeeper { Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes; HealthMetrics healthMetrics; DatabaseConfiguration configuration; + PromiseStream> addActor; Int64MetricHandle actualTpsMetric; @@ -163,7 +165,7 @@ struct Ratekeeper { RatekeeperLimits normalLimits; RatekeeperLimits batchLimits; - Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), + RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE), @@ -172,7 +174,7 @@ struct Ratekeeper { }; //SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function -ACTOR Future trackStorageServerQueueInfo( Ratekeeper* self, StorageServerInterface ssi ) { +ACTOR Future trackStorageServerQueueInfo( RatekeeperData* self, StorageServerInterface ssi ) { self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) ); state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); TraceEvent("RkTracking", ssi.id()); @@ -217,7 +219,7 @@ ACTOR Future trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI } } -ACTOR Future trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) { +ACTOR Future trackTLogQueueInfo( RatekeeperData* self, TLogInterface tli ) { self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) ); state Map::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id()); TraceEvent("RkTracking", tli.id()); @@ -270,7 +272,7 @@ ACTOR Future splitError( Future in, Promise errOut ) { } ACTOR Future trackEachStorageServer( - Ratekeeper* self, + RatekeeperData* self, FutureStream< std::pair> > serverChanges ) { state Map> actors; @@ -289,7 +291,59 @@ ACTOR Future trackEachStorageServer( } } -void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) { +ACTOR Future monitorServerListChange( + Reference> dbInfo, + PromiseStream< std::pair> > serverChanges) { + state Database db = openDBOnServer(dbInfo, TaskRateKeeper, true, true); + state Future checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY); + state Future>> serverListAndProcessClasses = Never(); + state std::map oldServers; + state Transaction tr(db); + + loop { + try { + choose { + when ( wait( checkSignal ) ) { + checkSignal = Never(); + serverListAndProcessClasses = getServerListAndProcessClasses(&tr); + } + when ( vector> results = wait( serverListAndProcessClasses ) ) { + serverListAndProcessClasses = Never(); + + std::map newServers; + for( int i = 0; i < results.size(); i++ ) { + UID serverId = results[i].first.id(); + StorageServerInterface const& ssi = results[i].first; + newServers[serverId] = ssi; + + if ( oldServers.count( serverId ) ) { + if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) { + 
serverChanges.send( std::make_pair(serverId, Optional(ssi)) ); + } + oldServers.erase(serverId); + } else { + serverChanges.send( std::make_pair(serverId, Optional(ssi)) ); + } + } + + for (auto it : oldServers) { + serverChanges.send( std::make_pair(it.first, Optional()) ); + } + + oldServers.swap(newServers); + tr = Transaction(db); + checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY); + } + } + } catch(Error& e) { + wait( tr.onError(e) ); + serverListAndProcessClasses = Never(); + checkSignal = Void(); + } + } +} + +void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { //double controlFactor = ; // dt / eFoldingTime double actualTps = self->smoothReleasedTransactions.smoothRate(); @@ -566,7 +620,7 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) { } } -ACTOR Future configurationMonitor( Ratekeeper* self, Reference> dbInfo ) { +ACTOR Future configurationMonitor(Reference> dbInfo, DatabaseConfiguration* conf) { state Database cx = openDBOnServer(dbInfo, TaskDefaultEndpoint, true, true); loop { state ReadYourWritesTransaction tr(cx); @@ -578,7 +632,7 @@ ACTOR Future configurationMonitor( Ratekeeper* self, Reference results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) ); ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY ); - self->configuration.fromKeyValues( (VectorRef) results ); + conf->fromKeyValues( (VectorRef) results ); state Future watchFuture = tr.watch(moveKeysLockOwnerKey); wait( tr.commit() ); @@ -591,21 +645,26 @@ ACTOR Future configurationMonitor( Ratekeeper* self, Reference rateKeeper( - Reference> dbInfo, - PromiseStream< std::pair> > serverChanges, - FutureStream< struct GetRateInfoRequest > getRateInfo, - double* lastLimited) -{ - state Ratekeeper self; - state Future track = trackEachStorageServer( &self, serverChanges.getFuture() ); +ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference> dbInfo) { + state RatekeeperData self; state Future timeout = Void(); state std::vector> actors; state std::vector> tlogTrackers; state std::vector tlogInterfs; state Promise err; - state Future configMonitor = configurationMonitor(&self, dbInfo); - self.lastLimited = lastLimited; + state Future collection = actorCollection( self.addActor.getFuture() ); + + // TODOs: + double lastLimited; + self.lastLimited = &lastLimited; + + TraceEvent("Ratekeeper_Starting", rkInterf.id()); + self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) ); + self.addActor.send( configurationMonitor(dbInfo, &self.configuration) ); + + PromiseStream< std::pair> > serverChanges; + self.addActor.send( monitorServerListChange(dbInfo, serverChanges) ); + self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) ); TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) .detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)); @@ -619,7 +678,6 @@ ACTOR Future rateKeeper( loop{ choose { - when (wait( track )) { break; } when (wait( timeout )) { updateRate(&self, self.normalLimits); updateRate(&self, self.batchLimits); @@ -638,7 +696,7 @@ ACTOR Future rateKeeper( } timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE); } - when (GetRateInfoRequest req = waitNext(getRateInfo)) { + when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) { GetRateInfoReply reply; auto& p 
= self.proxy_transactionCounts[ req.requesterID ]; @@ -672,8 +730,10 @@ ACTOR Future rateKeeper( tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) ); } } - when(wait(configMonitor)) {} + when ( wait(collection) ) { + ASSERT(false); + throw internal_error(); + } } } - return Void(); } diff --git a/fdbserver/Ratekeeper.h b/fdbserver/Ratekeeper.h deleted file mode 100644 index 282e99f766..0000000000 --- a/fdbserver/Ratekeeper.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Ratekeeper.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef FDBSERVER_RATEKEEPER_H -#define FDBSERVER_RATEKEEPER_H -#pragma once - -#include "fdbserver/MasterInterface.h" -#include "fdbserver/TLogInterface.h" -#include "fdbclient/DatabaseConfiguration.h" - -Future rateKeeper( - Reference> const& dbInfo, - PromiseStream< std::pair> > const& serverChanges, // actually an input, but we don't want broken_promise - FutureStream< struct GetRateInfoRequest > const& getRateInfo, - double* const& lastLimited); - -#endif diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h new file mode 100644 index 0000000000..cb5049a595 --- /dev/null +++ b/fdbserver/RatekeeperInterface.h @@ -0,0 +1,93 @@ +/* + * DataDistributorInterface.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FDBSERVER_RATEKEEPERINTERFACE_H +#define FDBSERVER_RATEKEEPERINTERFACE_H + +#include "fdbclient/StorageServerInterface.h" +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/fdbrpc.h" +#include "fdbrpc/Locality.h" + +struct RatekeeperInterface { + RequestStream> waitFailure; + RequestStream getRateInfo; + RequestStream changeStorage; + struct LocalityData locality; + + RatekeeperInterface() {} + explicit RatekeeperInterface(const struct LocalityData& l) : locality(l) {} + + void initEndpoints() {} + UID id() const { return getRateInfo.getEndpoint().token; } + NetworkAddress address() const { return getRateInfo.getEndpoint().address; } + bool operator== (const RatekeeperInterface& r) const { + return id() == r.id(); + } + bool operator!= (const RatekeeperInterface& r) const { + return !(*this == r); + } + + template + void serialize(Archive& ar) { + serializer(ar, waitFailure, getRateInfo, changeStorage, locality); + } +}; + +struct GetRateInfoRequest { + UID requesterID; + int64_t totalReleasedTransactions; + int64_t batchReleasedTransactions; + bool detailed; + ReplyPromise reply; + + GetRateInfoRequest() {} + GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed) + : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {} + + template + void serialize(Ar& ar) { + serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply); + } +}; + +struct GetRateInfoReply { + double transactionRate; + double batchTransactionRate; + double leaseDuration; + HealthMetrics healthMetrics; + + template + void serialize(Ar& ar) { + serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics); + } +}; + +struct StorageChangeRequest { + UID ssID; + Optional ssInterf; + + template + void serialize(Ar& ar) { + serializer(ar, ssID, ssInterf); + } +}; + +#endif //FDBSERVER_RATEKEEPERINTERFACE_H diff --git a/fdbserver/ServerDBInfo.h b/fdbserver/ServerDBInfo.h index abb7be412c..c5a76f831b 100644 --- a/fdbserver/ServerDBInfo.h +++ b/fdbserver/ServerDBInfo.h @@ -26,6 +26,7 @@ #include "fdbserver/DataDistributorInterface.h" #include "fdbserver/MasterInterface.h" #include "fdbserver/LogSystemConfig.h" +#include "fdbserver/RatekeeperInterface.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/LatencyBandConfig.h" @@ -39,6 +40,7 @@ struct ServerDBInfo { ClientDBInfo client; // After a successful recovery, eventually proxies that communicate with it Optional distributor; // The best guess of current data distributor. MasterInterface master; // The best guess as to the most recent master, which might still be recovering + Optional ratekeeper; vector resolvers; DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice; unsuccessful recoveries may increment it once. Depending on where the current master is in its recovery process, this might not have been written by the current master. 
RecoveryState recoveryState; diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index c3f6b1bb49..494ff3fcd3 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -28,6 +28,7 @@ #include "fdbserver/DataDistributorInterface.h" #include "fdbserver/MasterInterface.h" #include "fdbserver/TLogInterface.h" +#include "fdbserver/RatekeeperInterface.h" #include "fdbserver/ResolverInterface.h" #include "fdbclient/StorageServerInterface.h" #include "fdbserver/TesterInterface.actor.h" @@ -46,6 +47,7 @@ struct WorkerInterface { RequestStream< struct RecruitMasterRequest > master; RequestStream< struct InitializeMasterProxyRequest > masterProxy; RequestStream< struct InitializeDataDistributorRequest > dataDistributor; + RequestStream< struct InitializeRatekeeperRequest > ratekeeper; RequestStream< struct InitializeResolverRequest > resolver; RequestStream< struct InitializeStorageRequest > storage; RequestStream< struct InitializeLogRouterRequest > logRouter; @@ -68,7 +70,7 @@ struct WorkerInterface { template void serialize(Ar& ar) { - serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest); + serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest); } }; @@ -151,6 +153,16 @@ struct InitializeDataDistributorRequest { } }; +struct InitializeRatekeeperRequest { + UID reqId; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reqId, reply); + } +}; + struct InitializeResolverRequest { uint64_t recoveryCount; int proxyCount; @@ -300,6 +312,7 @@ struct Role { static const Role TESTER; static const Role LOG_ROUTER; static const Role DATA_DISTRIBUTOR; + static const Role RATE_KEEPER; std::string roleName; std::string abbreviation; @@ -361,6 +374,7 @@ ACTOR Future resolver(ResolverInterface proxy, InitializeResolverRequest i ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, Reference> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference> db); +ACTOR Future rateKeeper(RatekeeperInterface const& rki, Reference> const& db); void registerThreadForProfiling(); void updateCpuProfiler(ProfilerRequest req); diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index 36e73c121a..483e0e5aec 100644 --- a/fdbserver/fdbserver.vcxproj +++ b/fdbserver/fdbserver.vcxproj @@ -191,7 +191,7 @@ false - + diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 9c27ac6fad..3395d97ab0 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -310,6 +310,7 @@ + @@ -343,7 +344,7 @@ - + diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 6460f49403..d242ec446c 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -31,7 +31,6 @@ #include #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" -#include "fdbserver/Ratekeeper.h" #include "fdbserver/ClusterRecruitmentInterface.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/CoordinatedState.h" diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 
c4b0dd0def..88d8ec85d0 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -349,14 +349,15 @@ ACTOR Future registrationClient( WorkerInterface interf, Reference> asyncPriorityInfo, ProcessClass initialClass, - Reference>> ddInterf) { + Reference>> ddInterf, + Reference>> rkInterf) { // Keeps the cluster controller (as it may be re-elected) informed that this worker exists // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register) // The registration request piggybacks optional distributor interface if it exists. state Generation requestGeneration = 0; state ProcessClass processClass = initialClass; loop { - RegisterWorkerRequest request(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++, ddInterf->get()); + RegisterWorkerRequest request(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++, ddInterf->get(), rkInterf->get()); Future registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply(request) ) : Never(); choose { when ( RegisterWorkerReply reply = wait( registrationReply )) { @@ -365,6 +366,7 @@ ACTOR Future registrationClient( } when ( wait( ccInterface->onChange() )) { } when ( wait( ddInterf->onChange() ) ) {} + when ( wait( rkInterf->onChange() ) ) {} } } } @@ -610,6 +612,7 @@ ACTOR Future workerServer( Reference connFile, Refe Reference> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix, Promise recoveredDiskFiles) { state PromiseStream< ErrorInfo > errors; state Reference>> ddInterf( new AsyncVar>() ); + state Reference>> rkInterf( new AsyncVar>() ); state Future handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last state ActorCollection errorForwarders(false); state Future loggingTrigger = Void(); @@ -756,7 +759,7 @@ ACTOR Future workerServer( Reference connFile, Refe wait(waitForAll(recoveries)); recoveredDiskFiles.send(Void()); - errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf ) ); + errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf, rkInterf ) ); TraceEvent("RecoveriesComplete", interf.id()); @@ -837,6 +840,22 @@ ACTOR Future workerServer( Reference connFile, Refe TraceEvent("DataDistributorReceived", req.reqId).detail("DataDistributorId", recruited.id()); req.reply.send(recruited); } + when ( InitializeRatekeeperRequest req = waitNext(interf.ratekeeper.getFuture()) ) { + RatekeeperInterface recruited(locality); + recruited.initEndpoints(); + + if (rkInterf->get().present()) { + recruited = rkInterf->get().get(); + TEST(true); // Recruited while already a ratekeeper. + } else { + startRole(Role::RATE_KEEPER, recruited.id(), interf.id()); + Future ratekeeper = rateKeeper( recruited, dbInfo ); + errorForwarders.add( forwardError( errors, Role::RATE_KEEPER, recruited.id(), setWhenDoneOrError( ratekeeper, rkInterf, Optional() ) ) ); + rkInterf->set(Optional(recruited)); + } + TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id()); + req.reply.send(recruited); + } when( InitializeTLogRequest req = waitNext(interf.tLog.getFuture()) ) { // For now, there's a one-to-one mapping of spill type to TLogVersion. 
// With future work, a particular version of the TLog can support multiple @@ -1244,3 +1263,4 @@ const Role Role::CLUSTER_CONTROLLER("ClusterController", "CC"); const Role Role::TESTER("Tester", "TS"); const Role Role::LOG_ROUTER("LogRouter", "LR"); const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD"); +const Role Role::RATE_KEEPER("RateKeeper", "RK"); diff --git a/flow/network.h b/flow/network.h index 3b569da349..0437c5febe 100644 --- a/flow/network.h +++ b/flow/network.h @@ -67,6 +67,7 @@ enum { TaskUnknownEndpoint = 4000, TaskMoveKeys = 3550, TaskDataDistributionLaunch = 3530, + TaskRateKeeper = 3510, TaskDataDistribution = 3500, TaskDiskWrite = 3010, TaskUpdateStorage = 3000, From e6ac3f7fe8bd635c2954a8e3c8d814e69c3a36b4 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 15 Feb 2019 17:29:52 -0800 Subject: [PATCH 02/14] Minor fix on ratekeeper work registration. --- fdbserver/ClusterController.actor.cpp | 17 +++++++++----- fdbserver/DataDistribution.actor.cpp | 33 +++++++++++++++++++++++++++ fdbserver/Ratekeeper.actor.cpp | 4 ++-- fdbserver/RatekeeperInterface.h | 13 +---------- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index d95b9c1385..be5bfc53e4 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -3,7 +3,7 @@ * * This source file is part of the FoundationDB open source project * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1769,6 +1769,11 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id()); self->db.setDistributor(di); } + if ( req.ratekeeperInterf.present() && !self->db.serverInfo->get().ratekeeper.present() ) { + const RatekeeperInterface& rki = req.ratekeeperInterf.get(); + TraceEvent("ClusterController_RegisterRatekeeper", self->id).detail("RKID", rki.id()); + self->db.setRatekeeper(rki); + } if( info == self->id_worker.end() ) { self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo ); checkOutstandingRequests( self ); @@ -2398,7 +2403,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { req.reqId = g_random->randomUniqueID(); TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address()); - ErrorOr interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) ); + ErrorOr interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) ); if (interf.present()) { TraceEvent("ClusterController_RatekeeperRecruited", req.reqId).detail("Addr", rkWorker.worker.first.address()); return interf.get(); @@ -2414,7 +2419,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { } } -ACTOR Future waitRKRejoinOrStartRK(ClusterControllerData *self) { +ACTOR Future monitorRatekeeper(ClusterControllerData *self) { state Future initialDelay = delay(SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY); // wait for a while to see if an existing ratekeeper will join. 
@@ -2432,8 +2437,8 @@ ACTOR Future waitRKRejoinOrStartRK(ClusterControllerData *self) { loop { if ( self->db.serverInfo->get().ratekeeper.present() ) { wait( waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ) ); - TraceEvent("ClusterController", self->id) - .detail("RatekeeperDied", self->db.serverInfo->get().ratekeeper.get().id()); + TraceEvent("ClusterController_RateKeeperDied", self->id) + .detail("RKID", self->db.serverInfo->get().ratekeeper.get().id()); self->db.clearInterf(ProcessClass::RateKeeperClass); } else { RatekeeperInterface rkInterf = wait( startRatekeeper(self) ); @@ -2460,7 +2465,7 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, self.addActor.send( updateDatacenterVersionDifference(&self) ); self.addActor.send( handleForcedRecoveries(&self, interf) ); self.addActor.send( waitDDRejoinOrStartDD(&self) ); - self.addActor.send( waitRKRejoinOrStartRK(&self) ); + self.addActor.send( monitorRatekeeper(&self) ); //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str()); loop choose { diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index fe52e5e42a..e1b80d2314 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3454,6 +3454,39 @@ ACTOR Future dataDistribution(Reference self, state DatabaseConfiguration configuration = self->configuration->get(); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; +<<<<<<< HEAD +======= + state Transaction tr(cx); + loop { + try { + tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); + tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); + + Standalone replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY)); + + for(auto& kv : replicaKeys) { + auto dcId = decodeDatacenterReplicasKey(kv.key); + auto replicas = decodeDatacenterReplicasValue(kv.value); + if ((self->primaryDcId.size() && self->primaryDcId[0] == dcId) || + (self->remoteDcIds.size() && self->remoteDcIds[0] == dcId && configuration.usableRegions > 1)) { + if(replicas > configuration.storageTeamSize) { + tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize)); + } + } else { + tr.clear(kv.key); + } + } + + wait(tr.commit()); + break; + } + catch(Error &e) { + wait(tr.onError(e)); + } + } + + +>>>>>>> Minor fix on ratekeeper work registration. //cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); //ASSERT( cx->locationCacheSize == SERVER_KNOBS->DD_LOCATION_CACHE_SIZE ); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 83a8778411..f872a55566 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -3,7 +3,7 @@ * * This source file is part of the FoundationDB open source project * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -655,7 +655,7 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference collection = actorCollection( self.addActor.getFuture() ); // TODOs: - double lastLimited; + double lastLimited = 0; self.lastLimited = &lastLimited; TraceEvent("Ratekeeper_Starting", rkInterf.id()); diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index cb5049a595..539aeb8d7f 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -29,7 +29,6 @@ struct RatekeeperInterface { RequestStream> waitFailure; RequestStream getRateInfo; - RequestStream changeStorage; struct LocalityData locality; RatekeeperInterface() {} @@ -47,7 +46,7 @@ struct RatekeeperInterface { template void serialize(Archive& ar) { - serializer(ar, waitFailure, getRateInfo, changeStorage, locality); + serializer(ar, waitFailure, getRateInfo, locality); } }; @@ -80,14 +79,4 @@ struct GetRateInfoReply { } }; -struct StorageChangeRequest { - UID ssID; - Optional ssInterf; - - template - void serialize(Ar& ar) { - serializer(ar, ssID, ssInterf); - } -}; - #endif //FDBSERVER_RATEKEEPERINTERFACE_H From 36a51a7b57d7a6f5f92249edb26c30fa2468f794 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 18 Feb 2019 14:57:21 -0800 Subject: [PATCH 03/14] Fix a segfault bug due to uncopied ratekeeper interface --- fdbserver/CMakeLists.txt | 1 - fdbserver/ClusterController.actor.cpp | 11 +++++++-- fdbserver/DataDistribution.actor.cpp | 32 +++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 58853a2fee..31af61de23 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -56,7 +56,6 @@ set(FDBSERVER_SRCS QuietDatabase.actor.cpp QuietDatabase.h Ratekeeper.actor.cpp - Ratekeeper.h RatekeeperInterface.h RecoveryState.h Restore.actor.cpp diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index be5bfc53e4..fef69bc7e6 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -936,6 +936,9 @@ public: if (db.serverInfo->get().distributor.present()) { id_used[db.serverInfo->get().distributor.get().locality.processId()]++; } + if (db.serverInfo->get().ratekeeper.present()) { + id_used[db.serverInfo->get().ratekeeper.get().locality.processId()]++; + } WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true); if ( oldMasterFit < mworker.fitness ) @@ -1121,6 +1124,9 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster if (cluster->db.serverInfo->get().distributor.present()) { id_used[cluster->db.serverInfo->get().distributor.get().locality.processId()]++; } + if (cluster->db.serverInfo->get().ratekeeper.present()) { + id_used[cluster->db.serverInfo->get().ratekeeper.get().locality.processId()]++; + } state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used); if( ( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.first.locality.processId() == cluster->clusterControllerProcessId ) && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) { @@ -1156,6 +1162,7 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster ++dbInfo.masterLifetime; dbInfo.clusterInterface = 
db->serverInfo->get().clusterInterface; dbInfo.distributor = db->serverInfo->get().distributor; + dbInfo.ratekeeper = db->serverInfo->get().ratekeeper; TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id); db->serverInfo->set( dbInfo ); @@ -2361,7 +2368,7 @@ ACTOR Future startDataDistributor( ClusterControllerDa } } -ACTOR Future waitDDRejoinOrStartDD(ClusterControllerData *self) { +ACTOR Future monitorDataDistributor(ClusterControllerData *self) { state Future initialDelay = delay(SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY); // wait for a while to see if existing data distributor will join. @@ -2464,7 +2471,7 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, self.addActor.send( updatedChangedDatacenters(&self) ); self.addActor.send( updateDatacenterVersionDifference(&self) ); self.addActor.send( handleForcedRecoveries(&self, interf) ); - self.addActor.send( waitDDRejoinOrStartDD(&self) ); + self.addActor.send( monitorDataDistributor(&self) ); self.addActor.send( monitorRatekeeper(&self) ); //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str()); diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index e1b80d2314..a46d5d8f01 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3447,6 +3447,38 @@ ACTOR Future pollMoveKeysLock( Database cx, MoveKeysLock lock ) { } } +<<<<<<< HEAD +======= +struct DataDistributorData : NonCopyable, ReferenceCounted { + Reference> dbInfo; + Reference> configuration; + std::vector> primaryDcId; + std::vector> remoteDcIds; + UID ddId; + PromiseStream> addActor; + + DataDistributorData(Reference> const& db, Reference> const& dbConfig, UID id) + : dbInfo(db), configuration(dbConfig), ddId(id) {} + + void refreshDcIds() { + primaryDcId.clear(); + remoteDcIds.clear(); + + const std::vector& regions = configuration->get().regions; + TraceEvent ev("DataDistributor", ddId); + if ( regions.size() > 0 ) { + primaryDcId.push_back( regions[0].dcId ); + ev.detail("PrimaryDcID", regions[0].dcId.toHexString()); + } + if ( regions.size() > 1 ) { + remoteDcIds.push_back( regions[1].dcId ); + ev.detail("SecondaryDcID", regions[1].dcId.toHexString()); + } + } +}; + +// TODO: remove lastLimited -- obtain this information of ratekeeper from proxy +>>>>>>> Fix a segfault bug due to uncopied ratekeeper interface ACTOR Future dataDistribution(Reference self, double* lastLimited) { From b2ee41ba33570f1db9b120ec3785f923a6fd68f1 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 19 Feb 2019 14:04:45 -0800 Subject: [PATCH 04/14] Remove lastLimited from data distribution Fix a serialization bug in ServerDBInfo, which causes test failures. 
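
For reference, the serialization bug is just the new ratekeeper field being left out of ServerDBInfo's flat serializer, so the interface was silently dropped whenever the cluster controller broadcast ServerDBInfo and other roles never saw it (presumably the source of the test failures). The ServerDBInfo.h hunk below amounts to the following pattern; this is a heavily trimmed sketch (ServerDBInfoSketch is not the real struct, and most fields are elided), not the actual definition:

	struct ServerDBInfoSketch {
		Optional<DataDistributorInterface> distributor;
		Optional<RatekeeperInterface> ratekeeper;   // new field introduced by this series
		// ... the real struct also carries id, clusterInterface, client, master, resolvers, etc. ...

		template <class Ar>
		void serialize(Ar& ar) {
			// Every member must appear in the serializer call; before this fix
			// `ratekeeper` was missing from the list, so it never travelled with
			// the broadcast ServerDBInfo.
			serializer(ar, distributor, ratekeeper);
		}
	};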
--- fdbserver/DataDistribution.actor.cpp | 24 ++++++++++++++++++++++- fdbserver/DataDistribution.actor.h | 3 +-- fdbserver/DataDistributionQueue.actor.cpp | 6 +++--- fdbserver/Ratekeeper.actor.cpp | 9 ++++++--- fdbserver/ServerDBInfo.h | 2 +- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index a46d5d8f01..667e606b35 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3478,9 +3478,13 @@ struct DataDistributorData : NonCopyable, ReferenceCounted }; // TODO: remove lastLimited -- obtain this information of ratekeeper from proxy +<<<<<<< HEAD >>>>>>> Fix a segfault bug due to uncopied ratekeeper interface ACTOR Future dataDistribution(Reference self, double* lastLimited) +======= +ACTOR Future dataDistribution(Reference self) +>>>>>>> Remove lastLimited from data distribution { state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true); state DatabaseConfiguration configuration = self->configuration->get(); @@ -3671,7 +3675,7 @@ ACTOR Future dataDistribution(Reference self, actors.push_back( pollMoveKeysLock(cx, lock) ); actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) ); - actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize ), "DDQueue", self->ddId, &normalDDQueueErrors() ) ); vector teamCollectionsPtrs; Reference primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, self->primaryDcId, configuration.usableRegions > 1 ? 
self->remoteDcIds : std::vector>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ); @@ -3730,10 +3734,28 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference distributor = reportErrorsExcept( dataDistribution( self->dbInfo, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); wait( distributor || collection ); +======= + state Future distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() ); + + loop choose { + when ( wait( self->configuration->onChange() ) ) { + TraceEvent("DataDistributor_Restart", di.id()) + .detail("Configuration", self->configuration->get().toString()); + self->refreshDcIds(); + distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() ); + } + when ( wait( collection ) ) { + ASSERT(false); + throw internal_error(); + } + when ( wait( distributor ) ) {} + } +>>>>>>> Remove lastLimited from data distribution } catch ( Error &err ) { if ( normalDataDistributorErrors().count(err.code()) == 0 ) { diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index 1baff7f58e..109c57878b 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -230,8 +230,7 @@ Future dataDistributionQueue( MoveKeysLock const& lock, PromiseStream> const& getAverageShardBytes, UID const& distributorId, - int const& teamSize, - double* const& lastLimited); + int const& teamSize); //Holds the permitted size and IO Bounds for a shard struct ShardSizeBounds { diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index f282ef12bc..f0b8d28d41 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1201,10 +1201,10 @@ ACTOR Future dataDistributionQueue( MoveKeysLock lock, PromiseStream> getAverageShardBytes, UID distributorId, - int teamSize, - double* lastLimited) + int teamSize) { - state DDQueueData self( distributorId, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, lastLimited ); + state double lastLimited = 0; + state DDQueueData self( distributorId, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, &lastLimited ); state std::set serversToLaunchFrom; state KeyRange keysToLaunchFrom; state RelocateData launchData; diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index f872a55566..81daee2629 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -648,7 +648,6 @@ ACTOR Future configurationMonitor(Reference> dbInfo ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference> dbInfo) { state RatekeeperData self; state Future timeout = Void(); - state std::vector> actors; state std::vector> tlogTrackers; state std::vector tlogInterfs; state Promise err; @@ -676,8 +675,8 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference rateKeeper(RatekeeperInterface rkInterf, Reference void serialize( Ar& ar ) { - serializer(ar, id, clusterInterface, client, distributor, master, resolvers, recoveryCount, recoveryState, masterLifetime, logSystemConfig, priorCommittedLogServers, latencyBandConfig); + serializer(ar, id, clusterInterface, client, distributor, master, ratekeeper, resolvers, recoveryCount, recoveryState, masterLifetime, logSystemConfig, 
priorCommittedLogServers, latencyBandConfig); } }; From d52ff738c0dba2b4003cd1bb69bd89d62dfa921e Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 19 Feb 2019 21:05:24 -0800 Subject: [PATCH 05/14] Fix merge conflicts during rebase. --- fdbserver/DataDistribution.actor.cpp | 9 +++++++-- fdbserver/RatekeeperInterface.h | 2 +- fdbserver/WorkerInterface.actor.h | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 667e606b35..7a18ee8a32 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3535,9 +3535,10 @@ ACTOR Future dataDistribution(Reference self) loop { try { loop { - TraceEvent("DDInitTakingMoveKeysLock", myId); - MoveKeysLock lock_ = wait( takeMoveKeysLock( cx, myId ) ); + TraceEvent("DDInitTakingMoveKeysLock", self->ddId); + MoveKeysLock lock_ = wait( takeMoveKeysLock( cx, self->ddId ) ); lock = lock_; +<<<<<<< HEAD TraceEvent("DDInitTookMoveKeysLock", myId); DatabaseConfiguration configuration_ = wait( getDatabaseConfiguration(cx) ); @@ -3584,6 +3585,10 @@ ACTOR Future dataDistribution(Reference self) TraceEvent("DDInitUpdatedReplicaKeys", myId); Reference initData_ = wait( getInitialDataDistribution(cx, myId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector>() ) ); +======= + TraceEvent("DDInitTookMoveKeysLock", self->ddId); + Reference initData_ = wait( getInitialDataDistribution(cx, self->ddId, lock, configuration.usableRegions > 1 ? self->remoteDcIds : std::vector>() ) ); +>>>>>>> Fix merge conflicts during rebase. initData = initData_; if(initData->shards.size() > 1) { TraceEvent("DDInitGotInitialDD", self->ddId) diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index 539aeb8d7f..36a47e167a 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -36,7 +36,7 @@ struct RatekeeperInterface { void initEndpoints() {} UID id() const { return getRateInfo.getEndpoint().token; } - NetworkAddress address() const { return getRateInfo.getEndpoint().address; } + NetworkAddress address() const { return getRateInfo.getEndpoint().getPrimaryAddress(); } bool operator== (const RatekeeperInterface& r) const { return id() == r.id(); } diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 494ff3fcd3..370d780127 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -374,7 +374,7 @@ ACTOR Future resolver(ResolverInterface proxy, InitializeResolverRequest i ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, Reference> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference> db); -ACTOR Future rateKeeper(RatekeeperInterface const& rki, Reference> const& db); +ACTOR Future rateKeeper(RatekeeperInterface rki, Reference> db); void registerThreadForProfiling(); void updateCpuProfiler(ProfilerRequest req); From 517966fce219e484d90512108d09fdfeada848eb Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 20 Feb 2019 15:47:55 -0800 Subject: [PATCH 06/14] Remove lastLimited from rate keeper Refactor code to make IDE happy. 
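
The cluster-controller refactor below also changes how the recruitment requests are built: InitializeDataDistributorRequest and InitializeRatekeeperRequest gain an explicit UID constructor, and the recruitment id is tracked in a separate `state` UID that the error handler logs (only `state` variables are guaranteed to survive a wait()). Roughly, as a condensed sketch of the startRatekeeper() shape (tryRecruitRatekeeperSketch and the trace event name are illustrative only, not the actual code):

	ACTOR Future<Optional<RatekeeperInterface>> tryRecruitRatekeeperSketch(WorkerInterface worker) {
		// Only `state` variables survive a wait(), so the request id lives in one.
		state UID reqId = g_random->randomUniqueID();
		try {
			state InitializeRatekeeperRequest req(reqId);
			ErrorOr<RatekeeperInterface> interf =
			    wait( worker.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
			if (interf.present()) {
				return interf.get();
			}
		} catch (Error& e) {
			// reqId is still valid here, even though the wait above failed.
			TraceEvent("RatekeeperRecruitSketchError", reqId).error(e);
			throw;
		}
		return Optional<RatekeeperInterface>();
	}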
--- fdbserver/ClusterController.actor.cpp | 14 ++++++++------ fdbserver/Ratekeeper.actor.cpp | 5 ----- fdbserver/WorkerInterface.actor.h | 4 ++++ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index fef69bc7e6..c4b0213eeb 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -2336,11 +2336,11 @@ ACTOR Future handleForcedRecoveries( ClusterControllerData *self, ClusterC ACTOR Future startDataDistributor( ClusterControllerData *self ) { state Optional dcId = self->clusterControllerDcId; - state InitializeDataDistributorRequest req; while ( !self->clusterControllerProcessId.present() || !self->masterProcessId.present() ) { wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) ); } + state UID reqId; loop { try { while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) { @@ -2349,7 +2349,8 @@ ACTOR Future startDataDistributor( ClusterControllerDa std::map>, int> id_used = self->getUsedIds(); state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used); - req.reqId = g_random->randomUniqueID(); + reqId = g_random->randomUniqueID(); + state InitializeDataDistributorRequest req(reqId); TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.first.address()); ErrorOr distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) ); @@ -2359,7 +2360,7 @@ ACTOR Future startDataDistributor( ClusterControllerDa } } catch (Error& e) { - TraceEvent("ClusterController_DataDistributorRecruitError", req.reqId).error(e); + TraceEvent("ClusterController_DataDistributorRecruitError", reqId).error(e); if ( e.code() != error_code_no_more_servers ) { throw; } @@ -2397,6 +2398,7 @@ ACTOR Future monitorDataDistributor(ClusterControllerData *self) { } ACTOR Future startRatekeeper(ClusterControllerData *self) { + state UID reqId; loop { try { while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) { @@ -2406,8 +2408,8 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { std::map>, int> id_used = self->getUsedIds(); Optional dcId = self->clusterControllerDcId; state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used); - state InitializeRatekeeperRequest req; - req.reqId = g_random->randomUniqueID(); + reqId = g_random->randomUniqueID(); + state InitializeRatekeeperRequest req(reqId); TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address()); ErrorOr interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) ); @@ -2417,7 +2419,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { } } catch (Error& e) { - TraceEvent("ClusterController_RatekeeperRecruitError", req.reqId).error(e); + TraceEvent("ClusterController_RatekeeperRecruitError", reqId).error(e); if ( e.code() != error_code_no_more_servers ) { throw; } diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 81daee2629..ec6150a87e 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -160,7 +160,6 @@ struct RatekeeperData { Int64MetricHandle 
actualTpsMetric; double lastWarning; - double* lastLimited; RatekeeperLimits normalLimits; RatekeeperLimits batchLimits; @@ -653,10 +652,6 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference err; state Future collection = actorCollection( self.addActor.getFuture() ); - // TODOs: - double lastLimited = 0; - self.lastLimited = &lastLimited; - TraceEvent("Ratekeeper_Starting", rkInterf.id()); self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) ); self.addActor.send( configurationMonitor(dbInfo, &self.configuration) ); diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 370d780127..f79a96f623 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -147,6 +147,8 @@ struct InitializeDataDistributorRequest { UID reqId; ReplyPromise reply; + InitializeDataDistributorRequest() {} + explicit InitializeDataDistributorRequest(UID uid) : reqId(uid) {} template void serialize( Ar& ar ) { serializer(ar, reqId, reply); @@ -157,6 +159,8 @@ struct InitializeRatekeeperRequest { UID reqId; ReplyPromise reply; + InitializeRatekeeperRequest() {} + explicit InitializeRatekeeperRequest(UID uid) : reqId(uid) {} template void serialize(Ar& ar) { serializer(ar, reqId, reply); From 5dcde9efe0a6cb6ce22bd3b90de4507a3e887715 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 21 Feb 2019 15:30:39 -0800 Subject: [PATCH 07/14] Fix locality per review comment and a mac compile error --- fdbrpc/Locality.cpp | 66 ++++++++++++++++++--------------------------- 1 file changed, 26 insertions(+), 40 deletions(-) diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index ff9135d77e..caf3fa1d70 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -164,49 +164,35 @@ ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) cons } case ProcessClass::DataDistributor: switch( _class ) { - case ProcessClass::DataDistributorClass: - return ProcessClass::BestFit; - case ProcessClass::StatelessClass: - return ProcessClass::GoodFit; - case ProcessClass::MasterClass: - return ProcessClass::OkayFit; - case ProcessClass::ResolutionClass: - return ProcessClass::OkayFit; - case ProcessClass::TransactionClass: - return ProcessClass::OkayFit; - case ProcessClass::ProxyClass: - return ProcessClass::OkayFit; - case ProcessClass::UnsetClass: - return ProcessClass::UnsetFit; - case ProcessClass::CoordinatorClass: - return ProcessClass::NeverAssign; - case ProcessClass::TesterClass: - return ProcessClass::NeverAssign; - default: - return ProcessClass::WorstFit; + case ProcessClass::DataDistributorClass: + return ProcessClass::BestFit; + case ProcessClass::StatelessClass: + return ProcessClass::GoodFit; + case ProcessClass::MasterClass: + return ProcessClass::OkayFit; + case ProcessClass::UnsetClass: + return ProcessClass::UnsetFit; + case ProcessClass::CoordinatorClass: + case ProcessClass::TesterClass: + return ProcessClass::NeverAssign; + default: + return ProcessClass::WorstFit; } case ProcessClass::RateKeeper: switch( _class ) { - case ProcessClass::RateKeeperClass: - return ProcessClass::BestFit; - case ProcessClass::StatelessClass: - return ProcessClass::GoodFit; - case ProcessClass::MasterClass: - return ProcessClass::OkayFit; - case ProcessClass::ResolutionClass: - return ProcessClass::OkayFit; - case ProcessClass::TransactionClass: - return ProcessClass::OkayFit; - case ProcessClass::ProxyClass: - return ProcessClass::OkayFit; - case ProcessClass::UnsetClass: - return ProcessClass::UnsetFit; - case 
ProcessClass::CoordinatorClass: - return ProcessClass::NeverAssign; - case ProcessClass::TesterClass: - return ProcessClass::NeverAssign; - default: - return ProcessClass::WorstFit; + case ProcessClass::RateKeeperClass: + return ProcessClass::BestFit; + case ProcessClass::StatelessClass: + return ProcessClass::GoodFit; + case ProcessClass::MasterClass: + return ProcessClass::OkayFit; + case ProcessClass::UnsetClass: + return ProcessClass::UnsetFit; + case ProcessClass::CoordinatorClass: + case ProcessClass::TesterClass: + return ProcessClass::NeverAssign; + default: + return ProcessClass::WorstFit; } default: return ProcessClass::NeverAssign; From 734099826125f3d6081126a6a4d5f4a9bcfb20ea Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 22 Feb 2019 15:04:38 -0800 Subject: [PATCH 08/14] Fix status message for ratekeeper --- documentation/sphinx/source/mr-status.rst | 1 + fdbclient/Schemas.cpp | 1 + fdbserver/Status.actor.cpp | 21 +++++++++++++++++---- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/documentation/sphinx/source/mr-status.rst b/documentation/sphinx/source/mr-status.rst index 61d2103b16..441624f4ad 100644 --- a/documentation/sphinx/source/mr-status.rst +++ b/documentation/sphinx/source/mr-status.rst @@ -339,6 +339,7 @@ cluster.messages log_servers_error Time cluster.messages transaction_start_timeout Unable to start transaction after __ seconds. cluster.messages unreachable_master_worker Unable to locate the master worker. cluster.messages unreachable_dataDistributor_worker Unable to locate the data distributor worker. +cluster.messages unreachable_ratekeeper_worker Unable to locate the ratekeeper worker. cluster.messages unreachable_processes The cluster has some unreachable processes. cluster.messages unreadable_configuration Unable to read database configuration. cluster.messages layer_status_incomplete Some or all of the layers subdocument could not be read. 
diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 769d7e8d25..34b8f1826d 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -324,6 +324,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "$enum":[ "unreachable_master_worker", "unreachable_dataDistributor_worker", + "unreachable_ratekeeper_worker", "unreadable_configuration", "full_replication_timeout", "client_issues", diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 0b48d7fcf2..374d0728e3 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -1387,7 +1387,7 @@ JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transP return perfLimit; } -ACTOR static Future workloadStatusFetcher(Reference> db, vector> workers, std::pair mWorker, std::pair ddWorker, +ACTOR static Future workloadStatusFetcher(Reference> db, vector> workers, std::pair mWorker, std::pair rkWorker, JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set *incomplete_reasons, Future>>> storageServerFuture) { state JsonBuilderObject statusObj; @@ -1439,8 +1439,8 @@ ACTOR static Future workloadStatusFetcher(Reference clusterGetStatus( state std::set status_incomplete_reasons; state std::pair mWorker; state std::pair ddWorker; // DataDistributor worker + state std::pair rkWorker; // RateKeeper worker try { // Get the master Worker interface @@ -1837,6 +1838,18 @@ ACTOR Future clusterGetStatus( ddWorker = _ddWorker.get(); } + // Get the RateKeeper worker interface + Optional> _rkWorker; + if (db->get().ratekeeper.present()) { + _rkWorker = getWorker( workers, db->get().ratekeeper.get().address() ); + } + + if (!db->get().ratekeeper.present() || !_rkWorker.present()) { + messages.push_back(JsonString::makeMessage("unreachable_ratekeeper_worker", "Unable to locate the ratekeeper worker.")); + } else { + rkWorker = _rkWorker.get(); + } + // Get latest events for various event types from ALL workers // WorkerEvents is a map of worker's NetworkAddress to its event string // The pair represents worker responses and a set of worker NetworkAddress strings which did not respond @@ -1940,7 +1953,7 @@ ACTOR Future clusterGetStatus( state int minReplicasRemaining = -1; std::vector> futures2; futures2.push_back(dataStatusFetcher(ddWorker, &minReplicasRemaining)); - futures2.push_back(workloadStatusFetcher(db, workers, mWorker, ddWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture)); + futures2.push_back(workloadStatusFetcher(db, workers, mWorker, rkWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture)); futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons)); futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons)); From 835cc278c33819739520d2ce35cd7c84ce2d9db2 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 22 Feb 2019 16:36:07 -0800 Subject: [PATCH 09/14] Fix rebase conflicts. 
--- fdbserver/DataDistribution.actor.cpp | 115 +++------------------------ 1 file changed, 9 insertions(+), 106 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 7a18ee8a32..d664e07249 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3447,82 +3447,19 @@ ACTOR Future pollMoveKeysLock( Database cx, MoveKeysLock lock ) { } } -<<<<<<< HEAD -======= struct DataDistributorData : NonCopyable, ReferenceCounted { Reference> dbInfo; - Reference> configuration; - std::vector> primaryDcId; - std::vector> remoteDcIds; UID ddId; PromiseStream> addActor; - DataDistributorData(Reference> const& db, Reference> const& dbConfig, UID id) - : dbInfo(db), configuration(dbConfig), ddId(id) {} - - void refreshDcIds() { - primaryDcId.clear(); - remoteDcIds.clear(); - - const std::vector& regions = configuration->get().regions; - TraceEvent ev("DataDistributor", ddId); - if ( regions.size() > 0 ) { - primaryDcId.push_back( regions[0].dcId ); - ev.detail("PrimaryDcID", regions[0].dcId.toHexString()); - } - if ( regions.size() > 1 ) { - remoteDcIds.push_back( regions[1].dcId ); - ev.detail("SecondaryDcID", regions[1].dcId.toHexString()); - } - } + DataDistributorData(Reference> const& db, UID id) : dbInfo(db), ddId(id) {} }; -// TODO: remove lastLimited -- obtain this information of ratekeeper from proxy -<<<<<<< HEAD ->>>>>>> Fix a segfault bug due to uncopied ratekeeper interface -ACTOR Future dataDistribution(Reference self, - double* lastLimited) -======= ACTOR Future dataDistribution(Reference self) ->>>>>>> Remove lastLimited from data distribution { state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true); - state DatabaseConfiguration configuration = self->configuration->get(); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; -<<<<<<< HEAD -======= - state Transaction tr(cx); - loop { - try { - tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); - tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); - - Standalone replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY)); - - for(auto& kv : replicaKeys) { - auto dcId = decodeDatacenterReplicasKey(kv.key); - auto replicas = decodeDatacenterReplicasValue(kv.value); - if ((self->primaryDcId.size() && self->primaryDcId[0] == dcId) || - (self->remoteDcIds.size() && self->remoteDcIds[0] == dcId && configuration.usableRegions > 1)) { - if(replicas > configuration.storageTeamSize) { - tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize)); - } - } else { - tr.clear(kv.key); - } - } - - wait(tr.commit()); - break; - } - catch(Error &e) { - wait(tr.onError(e)); - } - } - - ->>>>>>> Minor fix on ratekeeper work registration. 
//cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); //ASSERT( cx->locationCacheSize == SERVER_KNOBS->DD_LOCATION_CACHE_SIZE ); @@ -3538,8 +3475,7 @@ ACTOR Future dataDistribution(Reference self) TraceEvent("DDInitTakingMoveKeysLock", self->ddId); MoveKeysLock lock_ = wait( takeMoveKeysLock( cx, self->ddId ) ); lock = lock_; -<<<<<<< HEAD - TraceEvent("DDInitTookMoveKeysLock", myId); + TraceEvent("DDInitTookMoveKeysLock", self->ddId); DatabaseConfiguration configuration_ = wait( getDatabaseConfiguration(cx) ); configuration = configuration_; @@ -3553,7 +3489,7 @@ ACTOR Future dataDistribution(Reference self) remoteDcIds.push_back( regions[1].dcId ); } - TraceEvent("DDInitGotConfiguration", myId).detail("Conf", configuration.toString()); + TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", configuration.toString()); state Transaction tr(cx); loop { @@ -3583,12 +3519,8 @@ ACTOR Future dataDistribution(Reference self) } } - TraceEvent("DDInitUpdatedReplicaKeys", myId); - Reference initData_ = wait( getInitialDataDistribution(cx, myId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector>() ) ); -======= - TraceEvent("DDInitTookMoveKeysLock", self->ddId); - Reference initData_ = wait( getInitialDataDistribution(cx, self->ddId, lock, configuration.usableRegions > 1 ? self->remoteDcIds : std::vector>() ) ); ->>>>>>> Fix merge conflicts during rebase. + TraceEvent("DDInitUpdatedReplicaKeys", self->ddId); + Reference initData_ = wait( getInitialDataDistribution(cx, self->ddId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector>() ) ); initData = initData_; if(initData->shards.size() > 1) { TraceEvent("DDInitGotInitialDD", self->ddId) @@ -3683,10 +3615,10 @@ ACTOR Future dataDistribution(Reference self) actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize ), "DDQueue", self->ddId, &normalDDQueueErrors() ) ); vector teamCollectionsPtrs; - Reference primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, self->primaryDcId, configuration.usableRegions > 1 ? self->remoteDcIds : std::vector>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ); + Reference primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? 
remoteDcIds : std::vector>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ); teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr()); if (configuration.usableRegions > 1) { - Reference remoteTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, self->remoteDcIds, Optional>>(), readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], false, processingUnhealthy) ); + Reference remoteTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional>>(), readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], false, processingUnhealthy) ); teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr()); remoteTeamCollection->teamCollections = teamCollectionsPtrs; actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], self->dbInfo ), "DDTeamCollectionSecondary", self->ddId, &normalDDQueueErrors() ) ); @@ -3710,14 +3642,6 @@ ACTOR Future dataDistribution(Reference self) } } -struct DataDistributorData : NonCopyable, ReferenceCounted { - Reference> dbInfo; - UID ddId; - PromiseStream> addActor; - - DataDistributorData(Reference> const& db, UID id) : dbInfo(db), ddId(id) {} -}; - static std::set const& normalDataDistributorErrors() { static std::set s; if (s.empty()) { @@ -3734,33 +3658,12 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference self( new DataDistributorData(db, di.id()) ); state Future collection = actorCollection( self->addActor.getFuture() ); - TraceEvent("DataDistributor_Starting", di.id()); - self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) ); - try { TraceEvent("DataDistributor_Running", di.id()); -<<<<<<< HEAD - state double lastLimited = 0; - state Future distributor = reportErrorsExcept( dataDistribution( self->dbInfo, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() ); - - wait( distributor || collection ); -======= + self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) ); state Future distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() ); - loop choose { - when ( wait( self->configuration->onChange() ) ) { - TraceEvent("DataDistributor_Restart", di.id()) - .detail("Configuration", self->configuration->get().toString()); - self->refreshDcIds(); - distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() ); - } - when ( wait( collection ) ) { - ASSERT(false); - throw internal_error(); - } - when ( wait( distributor ) ) {} - } ->>>>>>> Remove lastLimited from data distribution + wait( distributor || collection ); } catch ( Error &err ) { if ( normalDataDistributorErrors().count(err.code()) == 0 ) { From dc129207a92098126260c627c57704d02b1d6a3a Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 27 Feb 2019 11:51:48 -0800 Subject: [PATCH 10/14] Minor fix after rebase. 
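
For context, the proxy now finds the ratekeeper through the broadcast ServerDBInfo; the getRate() actor trimmed in the MasterProxyServer.actor.cpp hunk below follows this shape. This is only a simplified sketch (pollRatekeeperSketch is illustrative; the real actor also batches transaction counts and handles the GetRateInfoReply separately):

	ACTOR Future<Void> pollRatekeeperSketch(Reference<AsyncVar<ServerDBInfo>> db) {
		state Future<Void> nextRequestTimer = Never();
		if (db->get().ratekeeper.present()) nextRequestTimer = Void();
		loop choose {
			when (wait(db->onChange())) {
				if (db->get().ratekeeper.present()) {
					nextRequestTimer = Void();   // a ratekeeper is known: ask it for rates right away
				} else {
					nextRequestTimer = Never();  // no ratekeeper registered: stop asking until one appears
				}
			}
			when (wait(nextRequestTimer)) {
				// The real getRate() sends GetRateInfoRequest on
				// db->get().ratekeeper.get().getRateInfo here, applies the reply,
				// and then re-arms the timer.
				nextRequestTimer = delay(SERVER_KNOBS->METRIC_UPDATE_RATE);
			}
		}
	}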
--- fdbserver/DataDistributorInterface.h | 1 - fdbserver/MasterProxyServer.actor.cpp | 12 ------------ fdbserver/Ratekeeper.actor.cpp | 7 +------ fdbserver/RatekeeperInterface.h | 1 - 4 files changed, 1 insertion(+), 20 deletions(-) diff --git a/fdbserver/DataDistributorInterface.h b/fdbserver/DataDistributorInterface.h index d437fc69ae..4c2f68f83d 100644 --- a/fdbserver/DataDistributorInterface.h +++ b/fdbserver/DataDistributorInterface.h @@ -21,7 +21,6 @@ #ifndef FDBSERVER_DATADISTRIBUTORINTERFACE_H #define FDBSERVER_DATADISTRIBUTORINTERFACE_H -#include "fdbclient/FDBTypes.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 2b3299572e..4cbe878eb6 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -76,17 +76,6 @@ struct ProxyStats { } }; -ACTOR template -Future forwardValue(Promise out, Future in) -{ - // Like forwardPromise, but throws on error - T t = wait(in); - out.send(t); - return Void(); -} - -int getBytes(Promise const& r) { return 0; } - ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate, double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) { state Future nextRequestTimer = Never(); @@ -94,7 +83,6 @@ ACTOR Future getRate(UID myID, Reference> db, int64 state Future reply = Never(); state double lastDetailedReply = 0.0; // request detailed metrics immediately state bool expectingDetailedReply = false; - state int64_t lastTC = 0; if (db->get().ratekeeper.present()) nextRequestTimer = Void(); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index ec6150a87e..00ea50e850 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -24,7 +24,7 @@ #include "fdbrpc/simulator.h" #include "fdbclient/ReadYourWrites.h" #include "fdbserver/Knobs.h" -#include "fdbserver/DataDistribution.h" +#include "fdbserver/DataDistribution.actor.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/WaitFailure.h" #include "flow/actorcompiler.h" // This must be the last #include. 
@@ -676,11 +676,6 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit) { - *self.lastLimited = now(); - } - - double tooOld = now() - 1.0; for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) { if (p->second.time < tooOld) diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index 36a47e167a..c50447d544 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -21,7 +21,6 @@ #ifndef FDBSERVER_RATEKEEPERINTERFACE_H #define FDBSERVER_RATEKEEPERINTERFACE_H -#include "fdbclient/StorageServerInterface.h" #include "fdbclient/FDBTypes.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" From f43277e8192ee71c62a0492f55e6002d96b6106b Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 6 Mar 2019 10:46:17 -0800 Subject: [PATCH 11/14] Format Ratekeeper.actor.cpp code --- fdbserver/Ratekeeper.actor.cpp | 82 +++++++++++++++++----------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 00ea50e850..a0ff3ffd5e 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -310,12 +310,12 @@ ACTOR Future monitorServerListChange( serverListAndProcessClasses = Never(); std::map newServers; - for( int i = 0; i < results.size(); i++ ) { - UID serverId = results[i].first.id(); - StorageServerInterface const& ssi = results[i].first; + for (int i = 0; i < results.size(); i++) { + const StorageServerInterface& ssi = results[i].first; + const UID serverId = ssi.id(); newServers[serverId] = ssi; - if ( oldServers.count( serverId ) ) { + if (oldServers.count(serverId)) { if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) { serverChanges.send( std::make_pair(serverId, Optional(ssi)) ); } @@ -325,7 +325,7 @@ ACTOR Future monitorServerListChange( } } - for (auto it : oldServers) { + for (const auto& it : oldServers) { serverChanges.send( std::make_pair(it.first, Optional()) ); } @@ -342,7 +342,7 @@ ACTOR Future monitorServerListChange( } } -void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { +void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { //double controlFactor = ; // dt / eFoldingTime double actualTps = self->smoothReleasedTransactions.smoothRate(); @@ -350,7 +350,7 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { // SOMEDAY: Remove the max( 1.0, ... 
) since the below calculations _should_ be able to recover back up from this value actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT ); - limits.tpsLimit = std::numeric_limits::infinity(); + limits->tpsLimit = std::numeric_limits::infinity(); UID reasonID = UID(); limitReason_t limitReason = limitReason_t::unlimited; @@ -376,9 +376,9 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace); - int64_t springBytes = std::max(1, std::min(limits.storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2)); - int64_t targetBytes = std::max(1, std::min(limits.storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace)); - if (targetBytes != limits.storageTargetBytes) { + int64_t springBytes = std::max(1, std::min(limits->storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2)); + int64_t targetBytes = std::max(1, std::min(limits->storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace)); + if (targetBytes != limits->storageTargetBytes) { if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) { ssLimitReason = limitReason_t::storage_server_min_free_space; } else { @@ -442,9 +442,9 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss)); - if(limitTps < limits.tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) { + if (limitTps < limits->tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) { reasonID = ss.id; - limits.tpsLimit = limitTps; + limits->tpsLimit = limitTps; limitReason = ssLimitReason; } @@ -455,19 +455,19 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer; std::set>> ignoredMachines; - for(auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits.tpsLimit; ++ss) { - if(ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { + for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) { + if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { ignoredMachines.insert(ss->second->locality.zoneId()); continue; } - if(ignoredMachines.count(ss->second->locality.zoneId()) > 0) { + if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) { continue; } limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal(); - limits.tpsLimit = ss->first; - limitReason = ssReasons[storageTpsLimitReverseIndex.begin()->second->id]; + limits->tpsLimit = ss->first; reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process + limitReason = ssReasons[reasonID]; break; } @@ -479,27 +479,27 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { { Version minSSVer = std::numeric_limits::max(); Version minLimitingSSVer = 
std::numeric_limits::max(); - for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) { - auto& ss = i->value; + for (const auto& it : self->storageQueueInfo) { + auto& ss = it.value; if (!ss.valid) continue; minSSVer = std::min(minSSVer, ss.lastReply.version); // Machines that ratekeeper isn't controlling can fall arbitrarily far behind - if(ignoredMachines.count(i->value.locality.zoneId()) == 0) { + if (ignoredMachines.count(it.value.locality.zoneId()) == 0) { minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.version); } } Version maxTLVer = std::numeric_limits::min(); - for(auto i = self->tlogQueueInfo.begin(); i != self->tlogQueueInfo.end(); ++i) { - auto& tl = i->value; + for(const auto& it : self->tlogQueueInfo) { + auto& tl = it.value; if (!tl.valid) continue; maxTLVer = std::max(maxTLVer, tl.lastReply.v); } // writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed - writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits.maxVersionDifference/2) / (limits.maxVersionDifference/4); + writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference/2) / (limits->maxVersionDifference/4); worstVersionLag = std::max((Version)0, maxTLVer - minSSVer); limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer); } @@ -507,8 +507,8 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { int64_t worstFreeSpaceTLog = std::numeric_limits::max(); int64_t worstStorageQueueTLog = 0; int tlcount = 0; - for(auto i = self->tlogQueueInfo.begin(); i != self->tlogQueueInfo.end(); ++i) { - auto& tl = i->value; + for (auto& it : self->tlogQueueInfo) { + auto& tl = it.value; if (!tl.valid) continue; ++tlcount; @@ -518,9 +518,9 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace); - int64_t springBytes = std::max(1, std::min(limits.logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2)); - int64_t targetBytes = std::max(1, std::min(limits.logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace)); - if (targetBytes != limits.logTargetBytes) { + int64_t springBytes = std::max(1, std::min(limits->logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2)); + int64_t targetBytes = std::max(1, std::min(limits->logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace)); + if (targetBytes != limits->logTargetBytes) { if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) { tlogLimitReason = limitReason_t::log_server_min_free_space; } else { @@ -540,7 +540,7 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { } reasonID = tl.id; limitReason = limitReason_t::log_server_min_free_space; - limits.tpsLimit = 0.0; + limits->tpsLimit = 0.0; } double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 ); @@ -558,8 +558,8 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { if (targetRateRatio < .75) //< FIXME: KNOB for 2.0 x = std::max(x, 0.95); double lim = actualTps * x; - if (lim < limits.tpsLimit){ - limits.tpsLimit = lim; + if (lim < limits->tpsLimit){ + limits->tpsLimit = lim; reasonID = tl.id; limitReason = tlogLimitReason; } @@ -568,8 +568,8 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { // Don't let any tlogs use up its target bytes faster than its MVCC window! 
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate; double lim = actualTps * x; - if (lim < limits.tpsLimit){ - limits.tpsLimit = lim; + if (lim < limits->tpsLimit){ + limits->tpsLimit = lim; reasonID = tl.id; limitReason = limitReason_t::log_server_mvcc_write_bandwidth; } @@ -578,10 +578,10 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { self->healthMetrics.worstTLogQueue = worstStorageQueueTLog; - limits.tpsLimit = std::max(limits.tpsLimit, 0.0); + limits->tpsLimit = std::max(limits->tpsLimit, 0.0); if(g_network->isSimulated() && g_simulator.speedUpSimulation) { - limits.tpsLimit = std::max(limits.tpsLimit, 100.0); + limits->tpsLimit = std::max(limits->tpsLimit, 100.0); } int64_t totalDiskUsageBytes = 0; @@ -592,13 +592,13 @@ void updateRate( RatekeeperData* self, RatekeeperLimits &limits ) { if (s.value.valid) totalDiskUsageBytes += s.value.lastReply.storageBytes.used; - limits.tpsLimitMetric = std::min(limits.tpsLimit, 1e6); - limits.reasonMetric = limitReason; + limits->tpsLimitMetric = std::min(limits->tpsLimit, 1e6); + limits->reasonMetric = limitReason; if (g_random->random01() < 0.1) { - std::string name = "RkUpdate" + limits.context; + std::string name = "RkUpdate" + limits->context; TraceEvent(name.c_str()) - .detail("TPSLimit", limits.tpsLimit) + .detail("TPSLimit", limits->tpsLimit) .detail("Reason", limitReason) .detail("ReasonServerID", reasonID) .detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate()) @@ -673,8 +673,8 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference Date: Thu, 7 Mar 2019 10:15:28 -0800 Subject: [PATCH 12/14] Data distributor pulls batch limited info from proxy Add a flag in HealthMetrics to indicate that batch priority is rate limited. Data distributor pulls this flag from proxy to know roughly when rate limiting happens. DD uses this information to determine when to do the rebalance in the background, i.e., moving data from heavily loaded servers to lighter ones. If the cluster is currently rate limited for batch commits, then the rebalance will use longer time intervals, otherwise use shorter intervals. See BgDDMountainChopper() and BgDDValleyFiller() in DataDistributionQueue.actor.cpp. 
--- fdbclient/FDBTypes.h | 8 +++++-- fdbserver/DataDistribution.actor.cpp | 28 ++++++++++++++++++++--- fdbserver/DataDistribution.actor.h | 3 ++- fdbserver/DataDistributionQueue.actor.cpp | 6 ++--- fdbserver/Ratekeeper.actor.cpp | 3 +++ 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 5b7de5e818..766d39831c 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -737,6 +737,7 @@ struct HealthMetrics { int64_t worstStorageDurabilityLag; int64_t worstTLogQueue; double tpsLimit; + bool batchLimited; std::map storageStats; std::map tLogQueue; @@ -745,6 +746,7 @@ struct HealthMetrics { , worstStorageDurabilityLag(0) , worstTLogQueue(0) , tpsLimit(0.0) + , batchLimited(false) {} void update(const HealthMetrics& hm, bool detailedInput, bool detailedOutput) @@ -753,6 +755,7 @@ struct HealthMetrics { worstStorageDurabilityLag = hm.worstStorageDurabilityLag; worstTLogQueue = hm.worstTLogQueue; tpsLimit = hm.tpsLimit; + batchLimited = hm.batchLimited; if (!detailedOutput) { storageStats.clear(); @@ -769,13 +772,14 @@ struct HealthMetrics { worstStorageDurabilityLag == r.worstStorageDurabilityLag && worstTLogQueue == r.worstTLogQueue && storageStats == r.storageStats && - tLogQueue == r.tLogQueue + tLogQueue == r.tLogQueue && + batchLimited == r.batchLimited ); } template void serialize(Ar& ar) { - serializer(ar, worstStorageQueue, worstStorageDurabilityLag, worstTLogQueue, tpsLimit, storageStats, tLogQueue); + serializer(ar, worstStorageQueue, worstStorageDurabilityLag, worstTLogQueue, tpsLimit, batchLimited, storageStats, tLogQueue); } }; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index d664e07249..e6af98ce49 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3455,7 +3455,7 @@ struct DataDistributorData : NonCopyable, ReferenceCounted DataDistributorData(Reference> const& db, UID id) : dbInfo(db), ddId(id) {} }; -ACTOR Future dataDistribution(Reference self) +ACTOR Future dataDistribution(Reference self, double* lastLimited) { state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; @@ -3612,7 +3612,7 @@ ACTOR Future dataDistribution(Reference self) actors.push_back( pollMoveKeysLock(cx, lock) ); actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) ); - actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize ), "DDQueue", self->ddId, &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) ); vector teamCollectionsPtrs; Reference primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? 
remoteDcIds : std::vector>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) ); @@ -3654,14 +3654,36 @@ static std::set const& normalDataDistributorErrors() { return s; } +ACTOR Future monitorBatchLimitedTime(Reference> db, double* lastLimited) { + loop { + wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) ); + while (db->get().client.proxies.size() == 0) { + wait(db->onChange()); + } + + state int idx = g_random->randomInt(0, db->get().client.proxies.size()); + choose { + when (wait(db->onChange())) {} + when (ErrorOr reply = wait( + db->get().client.proxies[idx].getHealthMetrics.getReplyUnlessFailedFor(GetHealthMetricsRequest(false), 1.0, 0))) { + if (reply.present() && reply.get().healthMetrics.batchLimited) { + *lastLimited = now(); + } + } + } + } +} + ACTOR Future dataDistributor(DataDistributorInterface di, Reference> db ) { state Reference self( new DataDistributorData(db, di.id()) ); state Future collection = actorCollection( self->addActor.getFuture() ); + state double lastLimited = 0; try { TraceEvent("DataDistributor_Running", di.id()); self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) ); - state Future distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() ); + self->addActor.send( monitorBatchLimitedTime(db, &lastLimited) ); + state Future distributor = reportErrorsExcept( dataDistribution(self, &lastLimited), "DataDistribution", di.id(), &normalDataDistributorErrors() ); wait( distributor || collection ); } diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index 109c57878b..1baff7f58e 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -230,7 +230,8 @@ Future dataDistributionQueue( MoveKeysLock const& lock, PromiseStream> const& getAverageShardBytes, UID const& distributorId, - int const& teamSize); + int const& teamSize, + double* const& lastLimited); //Holds the permitted size and IO Bounds for a shard struct ShardSizeBounds { diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index f0b8d28d41..f282ef12bc 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1201,10 +1201,10 @@ ACTOR Future dataDistributionQueue( MoveKeysLock lock, PromiseStream> getAverageShardBytes, UID distributorId, - int teamSize) + int teamSize, + double* lastLimited) { - state double lastLimited = 0; - state DDQueueData self( distributorId, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, &lastLimited ); + state DDQueueData self( distributorId, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, lastLimited ); state std::set serversToLaunchFrom; state KeyRange keysToLaunchFrom; state RelocateData launchData; diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index a0ff3ffd5e..46dcfe25f0 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -671,11 +671,13 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; double tooOld = now() - 1.0; for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) { if (p->second.time < tooOld) @@ -707,6 +709,7 @@ ACTOR Future rateKeeper(RatekeeperInterface rkInterf, Reference Date: Mon, 11 Mar 2019 
Date: Mon, 11 Mar 2019 15:08:33 -0700
Subject: [PATCH 13/14] Update release notes.

---
 documentation/sphinx/source/release-notes.rst | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst
index d0f1b945c5..1ffbfefe85 100644
--- a/documentation/sphinx/source/release-notes.rst
+++ b/documentation/sphinx/source/release-notes.rst
@@ -16,6 +16,8 @@ Features
 * Batch priority transactions are now limited separately by ratekeeper and will be throttled at lower levels of cluster saturation. This makes it possible to run a more intense background load at saturation without significantly affecting normal priority transactions. It is still recommended not to run excessive loads at batch priority. `(PR #1198) `_
 * Restore now requires the destination cluster to be specified explicitly to avoid confusion. `(PR #1240) `_
 * Restore target version can now be specified by timestamp if the original cluster is available. `(PR #1240) `_
+* Data distribution now runs as a separate role, split out from the master. `(PR #1062) `_
+* Ratekeeper now runs as a separate role, split out from data distribution. `(PR #1176) `_
 
 Performance
 -----------

From 2b0139670e413d634425d59b76913c832ae6d1e4 Mon Sep 17 00:00:00 2001
From: Jingyu Zhou
Date: Tue, 12 Mar 2019 11:34:16 -0700
Subject: [PATCH 14/14] Fix review comment for PR 1176

---
 fdbserver/ClusterController.actor.cpp | 26 ++++++--------
 fdbserver/DataDistribution.actor.cpp  | 50 +++++++++++++-------------
 fdbserver/Ratekeeper.actor.cpp        | 52 +++++++++++----------------
 fdbserver/RatekeeperInterface.h       |  2 +-
 fdbserver/worker.actor.cpp            |  4 +++
 5 files changed, 60 insertions(+), 74 deletions(-)

diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp
index c4b0213eeb..55316b5ba7 100644
--- a/fdbserver/ClusterController.actor.cpp
+++ b/fdbserver/ClusterController.actor.cpp
@@ -2335,12 +2335,10 @@ ACTOR Future<Void> handleForcedRecoveries( ClusterControllerData *self, ClusterC
 }
 
 ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerData *self ) {
-	state Optional<Key> dcId = self->clusterControllerDcId;
 	while ( !self->clusterControllerProcessId.present() || !self->masterProcessId.present() ) {
 		wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
 	}
 
-	state UID reqId;
 	loop {
 		try {
 			while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) {
@@ -2348,19 +2346,18 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
 			}
 
 			std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
-			state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
-			reqId = g_random->randomUniqueID();
-			state InitializeDataDistributorRequest req(reqId);
-			TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.first.address());
+			state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
+			state InitializeDataDistributorRequest req(g_random->randomUniqueID());
+			TraceEvent("ClusterController_DataDistributorRecruit", self->id).detail("Addr", data_distributor.worker.first.address());
 
 			ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
 			if (distributor.present()) {
-				TraceEvent("ClusterController_DataDistributorRecruited", req.reqId).detail("Addr", data_distributor.worker.first.address());
+				TraceEvent("ClusterController_DataDistributorRecruited", self->id).detail("Addr", data_distributor.worker.first.address());
 				return distributor.get();
 			}
 		} catch (Error& e) {
-			TraceEvent("ClusterController_DataDistributorRecruitError", reqId).error(e);
+			TraceEvent("ClusterController_DataDistributorRecruitError", self->id).error(e);
 			if ( e.code() != error_code_no_more_servers ) {
 				throw;
 			}
@@ -2398,7 +2395,6 @@ ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
 }
 
 ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
-	state UID reqId;
 	loop {
 		try {
 			while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) {
@@ -2406,20 +2402,18 @@ ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
 			}
 
 			std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
-			Optional<Key> dcId = self->clusterControllerDcId;
-			state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used);
-			reqId = g_random->randomUniqueID();
-			state InitializeRatekeeperRequest req(reqId);
-			TraceEvent("ClusterController_RecruitRatekeeper", req.reqId).detail("Addr", rkWorker.worker.first.address());
+			state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used);
+			state InitializeRatekeeperRequest req(g_random->randomUniqueID());
+			TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", rkWorker.worker.first.address());
 
 			ErrorOr<RatekeeperInterface> interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
 			if (interf.present()) {
-				TraceEvent("ClusterController_RatekeeperRecruited", req.reqId).detail("Addr", rkWorker.worker.first.address());
+				TraceEvent("ClusterController_RatekeeperRecruited", self->id).detail("Addr", rkWorker.worker.first.address());
 				return interf.get();
 			}
 		} catch (Error& e) {
-			TraceEvent("ClusterController_RatekeeperRecruitError", reqId).error(e);
+			TraceEvent("ClusterController_RatekeeperRecruitError", self->id).error(e);
 			if ( e.code() != error_code_no_more_servers ) {
 				throw;
 			}
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index e6af98ce49..f57e5c6b13 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -3455,8 +3455,30 @@ struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData>
 	DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id) : dbInfo(db), ddId(id) {}
 };
 
-ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, double* lastLimited)
+ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
+	loop {
+		wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
+
+		state Reference<ProxyInfo> proxies(new ProxyInfo(db->get().client.proxies, db->get().myLocality));
+
+		choose {
+			when (wait(db->onChange())) {}
+			when (GetHealthMetricsReply reply = wait(proxies->size() ?
+				loadBalance(proxies, &MasterProxyInterface::getHealthMetrics, GetHealthMetricsRequest(false))
+				: Never())) {
+				if (reply.healthMetrics.batchLimited) {
+					*lastLimited = now();
+				}
+			}
+		}
+	}
+}
+
+ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
 {
+	state double lastLimited = 0;
+	self->addActor.send( monitorBatchLimitedTime(self->dbInfo, &lastLimited) );
+
 	state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true);
 	cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
@@ -3612,7 +3634,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, double*
 			actors.push_back( pollMoveKeysLock(cx, lock) );
 			actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
-			actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
+			actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
 
 			vector<DDTeamCollection*> teamCollectionsPtrs;
 			Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
@@ -3654,36 +3676,14 @@ static std::set<int> const& normalDataDistributorErrors() {
 	return s;
 }
 
-ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
-	loop {
-		wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
-		while (db->get().client.proxies.size() == 0) {
-			wait(db->onChange());
-		}
-
-		state int idx = g_random->randomInt(0, db->get().client.proxies.size());
-		choose {
-			when (wait(db->onChange())) {}
-			when (ErrorOr<GetHealthMetricsReply> reply = wait(
-					db->get().client.proxies[idx].getHealthMetrics.getReplyUnlessFailedFor(GetHealthMetricsRequest(false), 1.0, 0))) {
-				if (reply.present() && reply.get().healthMetrics.batchLimited) {
-					*lastLimited = now();
-				}
-			}
-		}
-	}
-}
-
 ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
 	state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
 	state Future<Void> collection = actorCollection( self->addActor.getFuture() );
-	state double lastLimited = 0;
 
 	try {
 		TraceEvent("DataDistributor_Running", di.id());
 		self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
-		self->addActor.send( monitorBatchLimitedTime(db, &lastLimited) );
-		state Future<Void> distributor = reportErrorsExcept( dataDistribution(self, &lastLimited), "DataDistribution", di.id(), &normalDataDistributorErrors() );
+		state Future<Void> distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() );
 
 		wait( distributor || collection );
 	}
diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp
index 46dcfe25f0..a89271f0c0 100644
--- a/fdbserver/Ratekeeper.actor.cpp
+++ b/fdbserver/Ratekeeper.actor.cpp
@@ -294,50 +294,38 @@ ACTOR Future<Void> monitorServerListChange(
 		Reference<AsyncVar<ServerDBInfo>> dbInfo,
 		PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
 	state Database db = openDBOnServer(dbInfo, TaskRateKeeper, true, true);
-	state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
-	state Future<vector<std::pair<StorageServerInterface, ProcessClass>>> serverListAndProcessClasses = Never();
 	state std::map<UID, StorageServerInterface> oldServers;
 	state Transaction tr(db);
 
 	loop {
 		try {
-			choose {
-				when ( wait( checkSignal ) ) {
-					checkSignal = Never();
-					serverListAndProcessClasses = getServerListAndProcessClasses(&tr);
-				}
-				when ( vector<std::pair<StorageServerInterface, ProcessClass>> results = wait( serverListAndProcessClasses ) ) {
-					serverListAndProcessClasses = Never();
+			vector<std::pair<StorageServerInterface, ProcessClass>> results = wait(getServerListAndProcessClasses(&tr));
 
-				std::map<UID, StorageServerInterface> newServers;
-				for (int i = 0; i < results.size(); i++) {
-					const StorageServerInterface& ssi = results[i].first;
-					const UID serverId = ssi.id();
-					newServers[serverId] = ssi;
+			std::map<UID, StorageServerInterface> newServers;
+			for (int i = 0; i < results.size(); i++) {
+				const StorageServerInterface& ssi = results[i].first;
+				const UID serverId = ssi.id();
+				newServers[serverId] = ssi;
 
-					if (oldServers.count(serverId)) {
-						if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
-							serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
-						}
-						oldServers.erase(serverId);
-					} else {
-						serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
-					}
+				if (oldServers.count(serverId)) {
+					if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
+						serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
 					}
-
-				for (const auto& it : oldServers) {
-					serverChanges.send( std::make_pair(it.first, Optional<StorageServerInterface>()) );
-				}
-
-				oldServers.swap(newServers);
-				tr = Transaction(db);
-				checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
+					oldServers.erase(serverId);
+				} else {
+					serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
				}
			}
+
+			for (const auto& it : oldServers) {
+				serverChanges.send( std::make_pair(it.first, Optional<StorageServerInterface>()) );
+			}
+
+			oldServers.swap(newServers);
+			tr = Transaction(db);
+			wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
 		} catch(Error& e) {
 			wait( tr.onError(e) );
-			serverListAndProcessClasses = Never();
-			checkSignal = Void();
 		}
 	}
 }
diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h
index c50447d544..cd8ffeb126 100644
--- a/fdbserver/RatekeeperInterface.h
+++ b/fdbserver/RatekeeperInterface.h
@@ -1,5 +1,5 @@
 /*
- * DataDistributorInterface.h
+ * RatekeeperInterface.h
  *
  * This source file is part of the FoundationDB open source project
  *
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 88d8ec85d0..2479adbff5 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -832,6 +832,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 					TEST(true); // Recruited while already a data distributor.
 				} else {
 					startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() );
+					DUMPTOKEN( recruited.waitFailure );
 
 					Future<Void> dataDistributorProcess = dataDistributor( recruited, dbInfo );
 					errorForwarders.add( forwardError( errors, Role::DATA_DISTRIBUTOR, recruited.id(), setWhenDoneOrError( dataDistributorProcess, ddInterf, Optional<DataDistributorInterface>() ) ) );
@@ -849,6 +850,9 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 					TEST(true); // Recruited while already a ratekeeper.
 				} else {
 					startRole(Role::RATE_KEEPER, recruited.id(), interf.id());
+					DUMPTOKEN( recruited.waitFailure );
+					DUMPTOKEN( recruited.getRateInfo );
+
 					Future<Void> ratekeeper = rateKeeper( recruited, dbInfo );
 					errorForwarders.add( forwardError( errors, Role::RATE_KEEPER, recruited.id(), setWhenDoneOrError( ratekeeper, rkInterf, Optional<RatekeeperInterface>() ) ) );
 					rkInterf->set(Optional<RatekeeperInterface>(recruited));
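
For readers following the Ratekeeper changes above, here is a minimal, standalone sketch of the server-list diffing idea that monitorServerListChange implements: fetch the current storage-server list, compare it against the previously seen map, and emit one change event per added, re-addressed, or removed server. This sketch is illustrative only and is not taken from the patch; plain std::map, std::string, and std::optional stand in for the FDB types (UID, StorageServerInterface, PromiseStream), and the names diffServerLists and onChange are invented here.

// Standalone illustration (C++17) of the old-vs-new server map comparison.
#include <functional>
#include <iostream>
#include <map>
#include <optional>
#include <string>

using ServerId  = std::string;   // stands in for UID
using Endpoint  = std::string;   // stands in for the getValue endpoint of StorageServerInterface
using ServerMap = std::map<ServerId, Endpoint>;

// Reports each new or re-addressed server with its endpoint, and each vanished
// server with an empty optional, mirroring the serverChanges.send(...) calls.
void diffServerLists(ServerMap& old, const ServerMap& fresh,
                     const std::function<void(const ServerId&, std::optional<Endpoint>)>& onChange) {
	ServerMap remaining = old;
	for (const auto& [id, endpoint] : fresh) {
		auto it = remaining.find(id);
		if (it == remaining.end()) {
			onChange(id, endpoint);          // newly added server
		} else {
			if (it->second != endpoint)
				onChange(id, endpoint);      // same server id, interface endpoint changed
			remaining.erase(it);
		}
	}
	for (const auto& [id, unused] : remaining) {
		(void)unused;
		onChange(id, std::nullopt);          // server no longer present in the list
	}
	old = fresh;                             // becomes "oldServers" for the next poll
}

int main() {
	ServerMap old   = { {"ss1", "10.0.0.1:4500"}, {"ss2", "10.0.0.2:4500"} };
	ServerMap fresh = { {"ss2", "10.0.0.2:4501"}, {"ss3", "10.0.0.3:4500"} };
	diffServerLists(old, fresh, [](const ServerId& id, std::optional<Endpoint> ep) {
		std::cout << id << " -> " << (ep ? *ep : std::string("removed")) << "\n";
	});
	return 0;
}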