Merge pull request #1176 from jzhou77/ratekeeper

Make Ratekeeper a separate role
This commit is contained in:
Evan Tschannen 2019-03-12 15:58:59 -07:00 committed by GitHub
commit a7e45cff91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 486 additions and 289 deletions

View File

@ -339,6 +339,7 @@ cluster.messages log_servers_error Time
cluster.messages transaction_start_timeout Unable to start transaction after __ seconds.
cluster.messages unreachable_master_worker Unable to locate the master worker.
cluster.messages unreachable_dataDistributor_worker Unable to locate the data distributor worker.
cluster.messages unreachable_ratekeeper_worker Unable to locate the ratekeeper worker.
cluster.messages unreachable_processes The cluster has some unreachable processes.
cluster.messages unreadable_configuration Unable to read database configuration.
cluster.messages layer_status_incomplete Some or all of the layers subdocument could not be read.

View File

@ -16,6 +16,8 @@ Features
* Batch priority transactions are now limited separately by ratekeeper and will be throttled at lower levels of cluster saturation. This makes it possible to run a more intense background load at saturation without significantly affecting normal priority transactions. It is still recommended not to run excessive loads at batch priority. `(PR #1198) <https://github.com/apple/foundationdb/pull/1198>`_
* Restore now requires the destination cluster to be specified explicitly to avoid confusion. `(PR #1240) <https://github.com/apple/foundationdb/pull/1240>`_
* Restore target version can now be specified by timestamp if the original cluster is available. `(PR #1240) <https://github.com/apple/foundationdb/pull/1240>`_
* Separate data distribution out from master as a new role. `(PR #1062) <https://github.com/apple/foundationdb/pull/1062>`_
* Separate rate keeper out from data distribution as a new role. `(PR #1176) <https://github.com/apple/foundationdb/pull/1176>`_
Performance
-----------

View File

@ -737,6 +737,7 @@ struct HealthMetrics {
int64_t worstStorageDurabilityLag;
int64_t worstTLogQueue;
double tpsLimit;
bool batchLimited;
std::map<UID, StorageStats> storageStats;
std::map<UID, int64_t> tLogQueue;
@ -745,6 +746,7 @@ struct HealthMetrics {
, worstStorageDurabilityLag(0)
, worstTLogQueue(0)
, tpsLimit(0.0)
, batchLimited(false)
{}
void update(const HealthMetrics& hm, bool detailedInput, bool detailedOutput)
@ -753,6 +755,7 @@ struct HealthMetrics {
worstStorageDurabilityLag = hm.worstStorageDurabilityLag;
worstTLogQueue = hm.worstTLogQueue;
tpsLimit = hm.tpsLimit;
batchLimited = hm.batchLimited;
if (!detailedOutput) {
storageStats.clear();
@ -769,13 +772,14 @@ struct HealthMetrics {
worstStorageDurabilityLag == r.worstStorageDurabilityLag &&
worstTLogQueue == r.worstTLogQueue &&
storageStats == r.storageStats &&
tLogQueue == r.tLogQueue
tLogQueue == r.tLogQueue &&
batchLimited == r.batchLimited
);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, worstStorageQueue, worstStorageDurabilityLag, worstTLogQueue, tpsLimit, storageStats, tLogQueue);
serializer(ar, worstStorageQueue, worstStorageDurabilityLag, worstTLogQueue, tpsLimit, batchLimited, storageStats, tLogQueue);
}
};

View File

@ -324,6 +324,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"$enum":[
"unreachable_master_worker",
"unreachable_dataDistributor_worker",
"unreachable_ratekeeper_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",

View File

@ -164,26 +164,35 @@ ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) cons
}
case ProcessClass::DataDistributor:
switch( _class ) {
case ProcessClass::DataDistributorClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
case ProcessClass::ResolutionClass:
return ProcessClass::OkayFit;
case ProcessClass::TransactionClass:
return ProcessClass::OkayFit;
case ProcessClass::ProxyClass:
return ProcessClass::OkayFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::CoordinatorClass:
return ProcessClass::NeverAssign;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
case ProcessClass::DataDistributorClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::CoordinatorClass:
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
case ProcessClass::RateKeeper:
switch( _class ) {
case ProcessClass::RateKeeperClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::CoordinatorClass:
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
default:
return ProcessClass::NeverAssign;

View File

@ -26,9 +26,9 @@
struct ProcessClass {
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, DataDistributorClass, CoordinatorClass, InvalidClass = -1 };
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, DataDistributorClass, CoordinatorClass, RateKeeperClass, InvalidClass = -1 };
enum Fitness { BestFit, GoodFit, UnsetFit, OkayFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController, DataDistributor, NoRole };
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController, DataDistributor, RateKeeper, NoRole };
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
int16_t _class;
int16_t _source;
@ -50,6 +50,7 @@ public:
else if (s=="cluster_controller") _class = ClusterControllerClass;
else if (s=="data_distributor") _class = DataDistributorClass;
else if (s=="coordinator") _class = CoordinatorClass;
else if (s=="ratekeeper") _class = RateKeeperClass;
else _class = InvalidClass;
}
@ -67,6 +68,7 @@ public:
else if (classStr=="cluster_controller") _class = ClusterControllerClass;
else if (classStr=="data_distributor") _class = DataDistributorClass;
else if (classStr=="coordinator") _class = CoordinatorClass;
else if (classStr=="ratekeeper") _class = RateKeeperClass;
else _class = InvalidClass;
if (sourceStr=="command_line") _source = CommandLineSource;
@ -99,6 +101,7 @@ public:
case ClusterControllerClass: return "cluster_controller";
case DataDistributorClass: return "data_distributor";
case CoordinatorClass: return "coordinator";
case RateKeeperClass: return "ratekeeper";
default: return "invalid";
}
}

View File

@ -99,6 +99,7 @@ public:
case ProcessClass::LogRouterClass: return false;
case ProcessClass::ClusterControllerClass: return false;
case ProcessClass::DataDistributorClass: return false;
case ProcessClass::RateKeeperClass: return false;
default: return false;
}
}

View File

@ -56,7 +56,7 @@ set(FDBSERVER_SRCS
QuietDatabase.actor.cpp
QuietDatabase.h
Ratekeeper.actor.cpp
Ratekeeper.h
RatekeeperInterface.h
RecoveryState.h
Restore.actor.cpp
RestoreInterface.h

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/Status.h"
#include "fdbserver/LatencyBandConfig.h"
@ -110,17 +111,28 @@ public:
{
}
void setDistributor(const DataDistributorInterface& distributorInterf) {
void setDistributor(const DataDistributorInterface& interf) {
ServerDBInfo newInfo = serverInfo->get();
newInfo.id = g_random->randomUniqueID();
newInfo.distributor = distributorInterf;
newInfo.distributor = interf;
serverInfo->set( newInfo );
}
void clearDistributor() {
void setRatekeeper(const RatekeeperInterface& interf) {
ServerDBInfo newInfo = serverInfo->get();
newInfo.id = g_random->randomUniqueID();
newInfo.distributor = Optional<DataDistributorInterface>();
newInfo.ratekeeper = interf;
serverInfo->set( newInfo );
}
// Publishes a new ServerDBInfo in which the interface belonging to role class
// `t` has been reset to an empty Optional. Only DataDistributorClass and
// RateKeeperClass are recognized; for any other class the info is republished
// with nothing changed except its freshly generated id.
void clearInterf(ProcessClass::ClassType t) {
	ServerDBInfo updated = serverInfo->get();
	// A new random id is assigned on every republish, even if no interface is cleared.
	updated.id = g_random->randomUniqueID();
	switch (t) {
	case ProcessClass::DataDistributorClass:
		updated.distributor = Optional<DataDistributorInterface>();
		break;
	case ProcessClass::RateKeeperClass:
		updated.ratekeeper = Optional<RatekeeperInterface>();
		break;
	default:
		// No interface is tracked here for the remaining classes.
		break;
	}
	serverInfo->set( updated );
}
};
@ -524,6 +536,9 @@ public:
if (db.serverInfo->get().distributor.present()) {
(*id_used)[db.serverInfo->get().distributor.get().locality.processId()]++;
}
if (db.serverInfo->get().ratekeeper.present()) {
(*id_used)[db.serverInfo->get().ratekeeper.get().locality.processId()]++;
}
}
RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) {
@ -921,6 +936,9 @@ public:
if (db.serverInfo->get().distributor.present()) {
id_used[db.serverInfo->get().distributor.get().locality.processId()]++;
}
if (db.serverInfo->get().ratekeeper.present()) {
id_used[db.serverInfo->get().ratekeeper.get().locality.processId()]++;
}
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true);
if ( oldMasterFit < mworker.fitness )
@ -1106,6 +1124,9 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
if (cluster->db.serverInfo->get().distributor.present()) {
id_used[cluster->db.serverInfo->get().distributor.get().locality.processId()]++;
}
if (cluster->db.serverInfo->get().ratekeeper.present()) {
id_used[cluster->db.serverInfo->get().ratekeeper.get().locality.processId()]++;
}
state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
if( ( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.first.locality.processId() == cluster->clusterControllerProcessId )
&& now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
@ -1141,6 +1162,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
++dbInfo.masterLifetime;
dbInfo.clusterInterface = db->serverInfo->get().clusterInterface;
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->serverInfo->set( dbInfo );
@ -1752,7 +1774,12 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if ( req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() ) {
const DataDistributorInterface& di = req.distributorInterf.get();
TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id());
self->db.setDistributor( di );
self->db.setDistributor(di);
}
if ( req.ratekeeperInterf.present() && !self->db.serverInfo->get().ratekeeper.present() ) {
const RatekeeperInterface& rki = req.ratekeeperInterf.get();
TraceEvent("ClusterController_RegisterRatekeeper", self->id).detail("RKID", rki.id());
self->db.setRatekeeper(rki);
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo );
@ -2308,8 +2335,6 @@ ACTOR Future<Void> handleForcedRecoveries( ClusterControllerData *self, ClusterC
}
ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerData *self ) {
state Optional<Key> dcId = self->clusterControllerDcId;
state InitializeDataDistributorRequest req;
while ( !self->clusterControllerProcessId.present() || !self->masterProcessId.present() ) {
wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
}
@ -2321,18 +2346,18 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
}
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
req.reqId = g_random->randomUniqueID();
TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.first.address());
state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
state InitializeDataDistributorRequest req(g_random->randomUniqueID());
TraceEvent("ClusterController_DataDistributorRecruit", self->id).detail("Addr", data_distributor.worker.first.address());
ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
if (distributor.present()) {
TraceEvent("ClusterController_DataDistributorRecruited", req.reqId).detail("Addr", data_distributor.worker.first.address());
TraceEvent("ClusterController_DataDistributorRecruited", self->id).detail("Addr", data_distributor.worker.first.address());
return distributor.get();
}
}
catch (Error& e) {
TraceEvent("ClusterController_DataDistributorRecruitError", req.reqId).error(e);
TraceEvent("ClusterController_DataDistributorRecruitError", self->id).error(e);
if ( e.code() != error_code_no_more_servers ) {
throw;
}
@ -2341,7 +2366,7 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
}
}
ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterControllerFullInterface *clusterInterface ) {
ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
state Future<Void> initialDelay = delay(SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY);
// wait for a while to see if existing data distributor will join.
@ -2361,10 +2386,66 @@ ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterCo
wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
TraceEvent("ClusterController", self->id)
.detail("DataDistributorDied", self->db.serverInfo->get().distributor.get().id());
self->db.clearDistributor();
self->db.clearInterf(ProcessClass::DataDistributorClass);
} else {
DataDistributorInterface distributorInterf = wait( startDataDistributor(self) );
self->db.setDistributor( distributorInterf );
self->db.setDistributor(distributorInterf);
}
}
}
// Recruits a ratekeeper on a worker chosen by the cluster controller and
// returns the interface that worker replies with.
//
// The actor retries forever: if the chosen worker does not produce a reply, or
// if recruitment throws no_more_servers, it waits ATTEMPT_RECRUITMENT_DELAY and
// tries again. Any other error is traced and rethrown to the caller.
ACTOR Future<RatekeeperInterface> startRatekeeper(ClusterControllerData *self) {
loop {
try {
// Do not recruit until recovery has reached at least ACCEPTING_COMMITS.
while ( self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS ) {
wait( self->db.serverInfo->onChange() );
}
// id_used is handed to the worker selection below together with the
// cluster controller's own datacenter id.
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
state WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used);
// Each recruitment attempt carries a freshly generated random UID.
state InitializeRatekeeperRequest req(g_random->randomUniqueID());
TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", rkWorker.worker.first.address());
ErrorOr<RatekeeperInterface> interf = wait( rkWorker.worker.first.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
if (interf.present()) {
TraceEvent("ClusterController_RatekeeperRecruited", self->id).detail("Addr", rkWorker.worker.first.address());
return interf.get();
}
// No interface came back: fall through to the retry delay below.
}
catch (Error& e) {
TraceEvent("ClusterController_RatekeeperRecruitError", self->id).error(e);
// Only no_more_servers is retried; everything else propagates.
if ( e.code() != error_code_no_more_servers ) {
throw;
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
// Keeps a ratekeeper interface published in the cluster controller's
// ServerDBInfo. Never returns normally.
//
// Phase 1: wait up to WAIT_FOR_RATEKEEPER_JOIN_DELAY for a ratekeeper
// interface to appear in serverInfo.
// Phase 2: loop forever. While an interface is present, wait for its failure
// and then clear it; while none is present, recruit one via startRatekeeper()
// and publish it.
ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
state Future<Void> initialDelay = delay(SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY);
// wait for a while to see if an existing ratekeeper will join.
loop choose {
// Grace period expired without an interface showing up: go on to phase 2.
when ( wait(initialDelay) ) { break; }
when ( wait(self->db.serverInfo->onChange()) ) { // Rejoins via worker registration
// serverInfo changes for many reasons; only stop waiting once a
// ratekeeper interface is actually present.
if ( self->db.serverInfo->get().ratekeeper.present() ) {
TraceEvent("ClusterController_GotRateKeeper", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
break;
}
}
}
loop {
if ( self->db.serverInfo->get().ratekeeper.present() ) {
// Resumes once waitFailureClient completes for the current ratekeeper,
// which is then treated as dead.
wait( waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ) );
TraceEvent("ClusterController_RateKeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
// Clearing the interface makes the next iteration take the recruit branch.
self->db.clearInterf(ProcessClass::RateKeeperClass);
} else {
RatekeeperInterface rkInterf = wait( startRatekeeper(self) );
self->db.setRatekeeper(rkInterf);
}
}
}
@ -2385,8 +2466,9 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
self.addActor.send( updatedChangingDatacenters(&self) );
self.addActor.send( updatedChangedDatacenters(&self) );
self.addActor.send( updateDatacenterVersionDifference(&self) );
self.addActor.send( waitDDRejoinOrStartDD(&self, &interf) );
self.addActor.send( handleForcedRecoveries(&self, interf) );
self.addActor.send( monitorDataDistributor(&self) );
self.addActor.send( monitorRatekeeper(&self) );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
loop choose {

View File

@ -168,15 +168,16 @@ struct RegisterWorkerRequest {
ClusterControllerPriorityInfo priorityInfo;
Generation generation;
Optional<DataDistributorInterface> distributorInterf;
Optional<RatekeeperInterface> ratekeeperInterf;
ReplyPromise<RegisterWorkerReply> reply;
RegisterWorkerRequest() : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional<DataDistributorInterface> ddInterf) :
wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, Generation generation, Optional<DataDistributorInterface> ddInterf, Optional<RatekeeperInterface> rkInterf) :
wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, reply);
serializer(ar, wi, initialClass, processClass, priorityInfo, generation, distributorInterf, ratekeeperInterf, reply);
}
};

View File

@ -29,7 +29,6 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Ratekeeper.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbrpc/Replication.h"
#include "flow/UnitTest.h"
@ -570,7 +569,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
PromiseStream<UID> removedServers;
std::set<UID> recruitingIds; // The IDs of the SS which are being recruited
std::set<NetworkAddress> recruitingLocalities;
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> serverChanges;
Future<Void> initialFailureReactionDelay;
Future<Void> initializationDoneActor;
Promise<Void> serverTrackerErrorOut;
@ -629,13 +627,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
DatabaseConfiguration configuration, std::vector<Optional<Key>> includedDCs,
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
Optional<PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>> const& serverChanges,
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
Reference<AsyncVar<bool>> processingUnhealthy)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder(Void()),
badTeamRemover(Void()), redundantTeamRemover(Void()), configuration(configuration),
serverChanges(serverChanges), readyToStart(readyToStart),
readyToStart(readyToStart),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskDataDistribution)),
initialFailureReactionDelay(
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution)),
@ -2839,10 +2836,6 @@ ACTOR Future<Void> storageServerTracker(
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
state bool hasWrongStoreTypeOrDC = false;
if(self->serverChanges.present()) {
self->serverChanges.get().send( std::make_pair(server->id, server->lastKnownInterface) );
}
try {
loop {
status.isUndesired = false;
@ -2933,9 +2926,6 @@ ACTOR Future<Void> storageServerTracker(
when( wait( failureTracker ) ) {
// The server is failed AND all data has been removed from it, so permanently remove it.
TraceEvent("StatusMapChange", self->distributorId).detail("ServerID", server->id).detail("Status", "Removing");
if(self->serverChanges.present()) {
self->serverChanges.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
}
if(server->updated.canBeSet()) {
server->updated.send(Void());
@ -3040,9 +3030,6 @@ ACTOR Future<Void> storageServerTracker(
}
interfaceChanged = server->onInterfaceChanged;
if(self->serverChanges.present()) {
self->serverChanges.get().send( std::make_pair(server->id, server->lastKnownInterface) );
}
// We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location
status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
@ -3460,13 +3447,39 @@ ACTOR Future<Void> pollMoveKeysLock( Database cx, MoveKeysLock lock ) {
}
}
ACTOR Future<Void> dataDistribution(
Reference<AsyncVar<struct ServerDBInfo>> db,
UID myId,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
double* lastLimited)
// Per-role state for a data distributor instance. Non-copyable and
// reference counted; shared between the data distributor actors through a
// Reference<DataDistributorData>.
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
// Shared, observable ServerDBInfo this distributor works from.
Reference<AsyncVar<struct ServerDBInfo>> dbInfo;
// Identifier of this data distributor instance.
UID ddId;
// Stream through which callers submit futures (background actors) for this role.
PromiseStream<Future<Void>> addActor;
// Stores the shared db info handle and the distributor id; addActor starts empty.
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id) : dbInfo(db), ddId(id) {}
};
// Periodically asks the proxies for health metrics and, whenever the reply
// reports batchLimited, stores the current time into *lastLimited.
// Never returns normally.
//
// db          - observable ServerDBInfo; supplies the proxy list and locality.
// lastLimited - output; overwritten with now() each time a reply has
//               batchLimited set.
// NOTE(review): *lastLimited is written for as long as this actor runs, so the
// pointee presumably has to outlive the actor - confirm at the call site.
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
loop {
wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
// Rebuilt on every iteration from the current db info.
state Reference<ProxyInfo> proxies(new ProxyInfo(db->get().client.proxies, db->get().myLocality));
choose {
// db info changed while waiting: drop this request and start the next round.
when (wait(db->onChange())) {}
// With no proxies, Never() keeps this branch pending so only onChange() can fire.
when (GetHealthMetricsReply reply = wait(proxies->size() ?
loadBalance(proxies, &MasterProxyInterface::getHealthMetrics, GetHealthMetricsRequest(false))
: Never())) {
if (reply.healthMetrics.batchLimited) {
*lastLimited = now();
}
}
}
}
}
ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
{
state Database cx = openDBOnServer(db, TaskDataDistributionLaunch, true, true);
state double lastLimited = 0;
self->addActor.send( monitorBatchLimitedTime(self->dbInfo, &lastLimited) );
state Database cx = openDBOnServer(self->dbInfo, TaskDataDistributionLaunch, true, true);
cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
//cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) );
@ -3481,10 +3494,10 @@ ACTOR Future<Void> dataDistribution(
loop {
try {
loop {
TraceEvent("DDInitTakingMoveKeysLock", myId);
MoveKeysLock lock_ = wait( takeMoveKeysLock( cx, myId ) );
TraceEvent("DDInitTakingMoveKeysLock", self->ddId);
MoveKeysLock lock_ = wait( takeMoveKeysLock( cx, self->ddId ) );
lock = lock_;
TraceEvent("DDInitTookMoveKeysLock", myId);
TraceEvent("DDInitTookMoveKeysLock", self->ddId);
DatabaseConfiguration configuration_ = wait( getDatabaseConfiguration(cx) );
configuration = configuration_;
@ -3498,7 +3511,7 @@ ACTOR Future<Void> dataDistribution(
remoteDcIds.push_back( regions[1].dcId );
}
TraceEvent("DDInitGotConfiguration", myId).detail("Conf", configuration.toString());
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", configuration.toString());
state Transaction tr(cx);
loop {
@ -3528,24 +3541,24 @@ ACTOR Future<Void> dataDistribution(
}
}
TraceEvent("DDInitUpdatedReplicaKeys", myId);
Reference<InitialDataDistribution> initData_ = wait( getInitialDataDistribution(cx, myId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>() ) );
TraceEvent("DDInitUpdatedReplicaKeys", self->ddId);
Reference<InitialDataDistribution> initData_ = wait( getInitialDataDistribution(cx, self->ddId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>() ) );
initData = initData_;
if(initData->shards.size() > 1) {
TraceEvent("DDInitGotInitialDD", myId)
TraceEvent("DDInitGotInitialDD", self->ddId)
.detail("B", printable(initData->shards.end()[-2].key))
.detail("E", printable(initData->shards.end()[-1].key))
.detail("Src", describe(initData->shards.end()[-2].primarySrc))
.detail("Dest", describe(initData->shards.end()[-2].primaryDest))
.trackLatest("InitialDD");
} else {
TraceEvent("DDInitGotInitialDD", myId).detail("B","").detail("E", "").detail("Src", "[no items]").detail("Dest", "[no items]").trackLatest("InitialDD");
TraceEvent("DDInitGotInitialDD", self->ddId).detail("B","").detail("E", "").detail("Src", "[no items]").detail("Dest", "[no items]").trackLatest("InitialDD");
}
if (initData->mode) break; // mode may be set true by system operator using fdbcli
TraceEvent("DataDistributionDisabled", myId);
TraceEvent("DataDistributionDisabled", self->ddId);
TraceEvent("MovingData", myId)
TraceEvent("MovingData", self->ddId)
.detail( "InFlight", 0 )
.detail( "InQueue", 0 )
.detail( "AverageShardSize", -1 )
@ -3554,8 +3567,8 @@ ACTOR Future<Void> dataDistribution(
.detail( "HighestPriority", 0 )
.trackLatest( "MovingData" );
TraceEvent("TotalDataInFlight", myId).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
TraceEvent("TotalDataInFlight", myId).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote");
TraceEvent("TotalDataInFlight", self->ddId).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
TraceEvent("TotalDataInFlight", self->ddId).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote");
wait( waitForDataDistributionEnabled(cx) );
TraceEvent("DataDistributionEnabled");
@ -3573,12 +3586,12 @@ ACTOR Future<Void> dataDistribution(
state Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
state int shard = 0;
for(; shard<initData->shards.size() - 1; shard++) {
for (; shard < initData->shards.size() - 1; shard++) {
KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard+1].key);
shardsAffectedByTeamFailure->defineShard(keys);
std::vector<ShardsAffectedByTeamFailure::Team> teams;
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true));
if(configuration.usableRegions > 1) {
if (configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
}
if(g_network->isSimulated()) {
@ -3587,11 +3600,11 @@ ACTOR Future<Void> dataDistribution(
}
shardsAffectedByTeamFailure->moveShard(keys, teams);
if(initData->shards[shard].hasDest) {
if (initData->shards[shard].hasDest) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
bool unhealthy = initData->shards[shard].primarySrc.size() != configuration.storageTeamSize;
if(!unhealthy && configuration.usableRegions > 1) {
if (!unhealthy && configuration.usableRegions > 1) {
unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize;
}
output.send( RelocateShard( keys, unhealthy ? PRIORITY_TEAM_UNHEALTHY : PRIORITY_RECOVER_MOVE ) );
@ -3620,20 +3633,20 @@ ACTOR Future<Void> dataDistribution(
}
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, myId ), "DDTracker", myId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, myId, storageTeamSize, lastLimited ), "DDQueue", myId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
vector<DDTeamCollection*> teamCollectionsPtrs;
Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, myId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
Reference<DDTeamCollection> remoteTeamCollection( new DDTeamCollection(cx, myId, lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered(db), zeroHealthyTeams[1], false, processingUnhealthy) );
Reference<DDTeamCollection> remoteTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], false, processingUnhealthy) );
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], db ), "DDTeamCollectionSecondary", myId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], self->dbInfo ), "DDTeamCollectionSecondary", self->ddId, &normalDDQueueErrors() ) );
}
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], db ), "DDTeamCollectionPrimary", myId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( primaryTeamCollection, initData, tcis[0], self->dbInfo ), "DDTeamCollectionPrimary", self->ddId, &normalDDQueueErrors() ) );
actors.push_back(yieldPromiseStream(output.getFuture(), input));
wait( waitForAll( actors ) );
@ -3651,15 +3664,6 @@ ACTOR Future<Void> dataDistribution(
}
}
// State bundle for the data distributor role. It is reference counted and
// non-copyable, and is built from the ServerDBInfo handle and the ID supplied
// by the caller.
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
	// Handle to the ServerDBInfo variable given to the constructor.
	Reference<AsyncVar<struct ServerDBInfo>> dbInfo;
	// ID given to the constructor.
	UID ddId;
	// Stream of (server ID, optional storage server interface) pairs.
	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
	// Futures sent here are consumed through addActor.getFuture() by an actorCollection.
	PromiseStream<Future<Void>> addActor;

	DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id)
	  : dbInfo(db),
	    ddId(id)
	{}
};
static std::set<int> const& normalDataDistributorErrors() {
static std::set<int> s;
if (s.empty()) {
@ -3672,31 +3676,14 @@ static std::set<int> const& normalDataDistributorErrors() {
return s;
}
static std::set<int> const& normalRateKeeperErrors() {
static std::set<int> s;
if (s.empty()) {
s.insert( error_code_worker_removed );
s.insert( error_code_broken_promise );
s.insert( error_code_actor_cancelled );
s.insert( error_code_please_reboot );
}
return s;
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state UID lastClusterControllerID(0,0);
state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
TraceEvent("DataDistributor_Starting", di.id());
self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
try {
TraceEvent("DataDistributor_Running", di.id());
state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
state double lastLimited = 0;
state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), ddStorageServerChanges, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
self->addActor.send( waitFailureServer(di.waitFailure.getFuture()) );
state Future<Void> distributor = reportErrorsExcept( dataDistribution(self), "DataDistribution", di.id(), &normalDataDistributorErrors() );
wait( distributor || collection );
}
@ -3732,7 +3719,6 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
conf,
{},
{},
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>(),
Future<Void>(Void()),
Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ),
true,
@ -3765,7 +3751,7 @@ DDTeamCollection* testMachineTeamCollection(int teamSize, IRepPolicyRef policy,
DDTeamCollection* collection =
new DDTeamCollection(database, UID(0, 0), MoveKeysLock(), PromiseStream<RelocateShard>(),
Reference<ShardsAffectedByTeamFailure>(new ShardsAffectedByTeamFailure()), conf, {}, {},
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>(), Future<Void>(Void()),
Future<Void>(Void()),
Reference<AsyncVar<bool>>(new AsyncVar<bool>(true)), true,
Reference<AsyncVar<bool>>(new AsyncVar<bool>(false)));

View File

@ -253,5 +253,6 @@ int64_t getMaxShardSize( double dbSizeEstimate );
class DDTeamCollection;
ACTOR Future<Void> teamRemover(DDTeamCollection* self);
ACTOR Future<Void> teamRemoverPeriodic(DDTeamCollection* self);
ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(Transaction* tr);
#endif

View File

@ -21,21 +21,19 @@
#ifndef FDBSERVER_DATADISTRIBUTORINTERFACE_H
#define FDBSERVER_DATADISTRIBUTORINTERFACE_H
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
struct DataDistributorInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct GetRateInfoRequest> getRateInfo;
struct LocalityData locality;
DataDistributorInterface() {}
explicit DataDistributorInterface(const struct LocalityData& l) : locality(l) {}
void initEndpoints() {}
UID id() const { return getRateInfo.getEndpoint().token; }
NetworkAddress address() const { return getRateInfo.getEndpoint().getPrimaryAddress(); }
UID id() const { return waitFailure.getEndpoint().token; }
NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); }
bool operator== (const DataDistributorInterface& r) const {
return id() == r.id();
}
@ -45,36 +43,7 @@ struct DataDistributorInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, getRateInfo, locality);
}
};
// Request carried on a getRateInfo request stream. The reply is delivered
// through `reply` as a GetRateInfoReply.
struct GetRateInfoRequest {
// ID of the requester (the first constructor argument).
UID requesterID;
// Transaction counters reported by the requester.
int64_t totalReleasedTransactions;
int64_t batchReleasedTransactions;
// Flag set by the requester; presumably asks for detailed health metrics in the reply (confirm against the handler).
bool detailed;
ReplyPromise<struct GetRateInfoReply> reply;
// NOTE(review): the default constructor does not initialise the scalar members.
GetRateInfoRequest() {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {}
// Serialises every member, in declaration order.
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply);
}
};
struct GetRateInfoReply {
double transactionRate;
double batchTransactionRate;
double leaseDuration;
HealthMetrics healthMetrics;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics);
serializer(ar, waitFailure, locality);
}
};

View File

@ -307,11 +307,13 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY, 5.0 );
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 );
init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 );
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001;
init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0;
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
init( FORCE_RECOVERY_CHECK_DELAY, 5.0 );
init( RATEKEEPER_FAILURE_TIME, 1.0 );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );

View File

@ -248,12 +248,14 @@ public:
double WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY;
double ATTEMPT_RECRUITMENT_DELAY;
double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY;
double WAIT_FOR_RATEKEEPER_JOIN_DELAY;
double WORKER_FAILURE_TIME;
double CHECK_OUTSTANDING_INTERVAL;
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
double VERSION_LAG_METRIC_INTERVAL;
int64_t MAX_VERSION_DIFFERENCE;
double FORCE_RECOVERY_CHECK_DELAY;
double RATEKEEPER_FAILURE_TIME;
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

View File

@ -76,17 +76,6 @@ struct ProxyStats {
}
};
// Waits for `in` and sends the value it produces to `out`.
// Like forwardPromise, but throws on error
ACTOR template <class T>
Future<Void> forwardValue(Promise<T> out, Future<T> in)
{
	T value = wait(in);
	out.send(value);
	return Void();
}
// Returns 0 for any Promise<Version>; the argument is not inspected.
int getBytes(Promise<Version> const& r) {
	return 0;
}
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
state Future<Void> nextRequestTimer = Never();
@ -94,21 +83,17 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
state Future<GetRateInfoReply> reply = Never();
state double lastDetailedReply = 0.0; // request detailed metrics immediately
state bool expectingDetailedReply = false;
state int64_t lastTC = 0;
if (db->get().distributor.present()) {
nextRequestTimer = Void();
}
if (db->get().ratekeeper.present()) nextRequestTimer = Void();
loop choose {
when ( wait( db->onChange() ) ) {
if ( db->get().distributor.present() ) {
TraceEvent("Proxy_DataDistributorChanged", myID)
.detail("DDID", db->get().distributor.get().id());
nextRequestTimer = Void(); // trigger GetRate request
if ( db->get().ratekeeper.present() ) {
TraceEvent("Proxy_RatekeeperChanged", myID)
.detail("RKID", db->get().ratekeeper.get().id());
nextRequestTimer = Void(); // trigger GetRate request
} else {
TraceEvent("Proxy_DataDistributorDied", myID);
TraceEvent("Proxy_RatekeeperDied", myID);
nextRequestTimer = Never();
reply = Never();
}
@ -116,7 +101,7 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
expectingDetailedReply = detailed;
}
when ( GetRateInfoReply rep = wait(reply) ) {

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,13 +19,14 @@
*/
#include "flow/IndexedSet.h"
#include "fdbserver/Ratekeeper.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/Smoother.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "flow/actorcompiler.h" // This must be the last #include.
enum limitReason_t {
@ -146,7 +147,7 @@ struct TransactionCounts {
TransactionCounts() : total(0), batch(0), time(0) {}
};
struct Ratekeeper {
struct RatekeeperData {
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
@ -154,16 +155,16 @@ struct Ratekeeper {
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
PromiseStream<Future<Void>> addActor;
Int64MetricHandle actualTpsMetric;
double lastWarning;
double* lastLimited;
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
lastWarning(0),
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
@ -172,7 +173,7 @@ struct Ratekeeper {
};
//SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerInterface ssi ) {
ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageServerInterface ssi ) {
self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) );
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
TraceEvent("RkTracking", ssi.id());
@ -217,7 +218,7 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
}
}
ACTOR Future<Void> trackTLogQueueInfo( Ratekeeper* self, TLogInterface tli ) {
ACTOR Future<Void> trackTLogQueueInfo( RatekeeperData* self, TLogInterface tli ) {
self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) );
state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
TraceEvent("RkTracking", tli.id());
@ -270,7 +271,7 @@ ACTOR Future<Void> splitError( Future<Void> in, Promise<Void> errOut ) {
}
ACTOR Future<Void> trackEachStorageServer(
Ratekeeper* self,
RatekeeperData* self,
FutureStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges )
{
state Map<UID, Future<Void>> actors;
@ -289,7 +290,47 @@ ACTOR Future<Void> trackEachStorageServer(
}
}
void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
// Repeatedly reads the storage server list and publishes the differences from
// the previous read on `serverChanges`:
//   - (id, interface) for a server that was not in the previous list, or whose
//     getValue endpoint differs from the one previously seen;
//   - (id, empty Optional) for a server that was in the previous list but is
//     missing from the new one.
// After each successful read a fresh transaction is created and the actor
// waits SERVER_KNOBS->SERVER_LIST_DELAY before reading again. On error it
// waits on tr.onError(e) and then retries.
ACTOR Future<Void> monitorServerListChange(
		Reference<AsyncVar<ServerDBInfo>> dbInfo,
		PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
	state Database db = openDBOnServer(dbInfo, TaskRateKeeper, true, true);
	state std::map<UID, StorageServerInterface> oldServers;
	state Transaction tr(db);

	loop {
		try {
			vector<std::pair<StorageServerInterface, ProcessClass>> serverList = wait(getServerListAndProcessClasses(&tr));

			std::map<UID, StorageServerInterface> currentServers;
			for (const auto& entry : serverList) {
				const StorageServerInterface& ssi = entry.first;
				const UID serverId = ssi.id();
				currentServers[serverId] = ssi;

				auto known = oldServers.find(serverId);
				if (known == oldServers.end()) {
					// Not seen in the previous list: announce it.
					serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
				} else {
					// Seen before: announce it only if its getValue endpoint changed.
					if (ssi.getValue.getEndpoint() != known->second.getValue.getEndpoint()) {
						serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
					}
					// Whatever remains in oldServers after this loop has disappeared.
					oldServers.erase(known);
				}
			}

			for (const auto& gone : oldServers) {
				serverChanges.send( std::make_pair(gone.first, Optional<StorageServerInterface>()) );
			}

			oldServers.swap(currentServers);
			tr = Transaction(db);
			wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
		} catch(Error& e) {
			wait( tr.onError(e) );
		}
	}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
@ -297,7 +338,7 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
limits.tpsLimit = std::numeric_limits<double>::infinity();
limits->tpsLimit = std::numeric_limits<double>::infinity();
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
@ -323,9 +364,9 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.storageTargetBytes) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits->storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits->storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits->storageTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
ssLimitReason = limitReason_t::storage_server_min_free_space;
} else {
@ -389,9 +430,9 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
if(limitTps < limits.tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
if (limitTps < limits->tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
reasonID = ss.id;
limits.tpsLimit = limitTps;
limits->tpsLimit = limitTps;
limitReason = ssLimitReason;
}
@ -402,19 +443,19 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
for(auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits.tpsLimit; ++ss) {
if(ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
ignoredMachines.insert(ss->second->locality.zoneId());
continue;
}
if(ignoredMachines.count(ss->second->locality.zoneId()) > 0) {
if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) {
continue;
}
limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal();
limits.tpsLimit = ss->first;
limitReason = ssReasons[storageTpsLimitReverseIndex.begin()->second->id];
limits->tpsLimit = ss->first;
reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
limitReason = ssReasons[reasonID];
break;
}
@ -426,27 +467,27 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
{
Version minSSVer = std::numeric_limits<Version>::max();
Version minLimitingSSVer = std::numeric_limits<Version>::max();
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
auto& ss = i->value;
for (const auto& it : self->storageQueueInfo) {
auto& ss = it.value;
if (!ss.valid) continue;
minSSVer = std::min(minSSVer, ss.lastReply.version);
// Machines that ratekeeper isn't controlling can fall arbitrarily far behind
if(ignoredMachines.count(i->value.locality.zoneId()) == 0) {
if (ignoredMachines.count(it.value.locality.zoneId()) == 0) {
minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.version);
}
}
Version maxTLVer = std::numeric_limits<Version>::min();
for(auto i = self->tlogQueueInfo.begin(); i != self->tlogQueueInfo.end(); ++i) {
auto& tl = i->value;
for(const auto& it : self->tlogQueueInfo) {
auto& tl = it.value;
if (!tl.valid) continue;
maxTLVer = std::max(maxTLVer, tl.lastReply.v);
}
// writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed ; 2 = half TL durable speed
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits.maxVersionDifference/2) / (limits.maxVersionDifference/4);
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference/2) / (limits->maxVersionDifference/4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer);
}
@ -454,8 +495,8 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
int64_t worstFreeSpaceTLog = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueTLog = 0;
int tlcount = 0;
for(auto i = self->tlogQueueInfo.begin(); i != self->tlogQueueInfo.end(); ++i) {
auto& tl = i->value;
for (auto& it : self->tlogQueueInfo) {
auto& tl = it.value;
if (!tl.valid) continue;
++tlcount;
@ -465,9 +506,9 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.logTargetBytes) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits->logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits->logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits->logTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
tlogLimitReason = limitReason_t::log_server_min_free_space;
} else {
@ -487,7 +528,7 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
}
reasonID = tl.id;
limitReason = limitReason_t::log_server_min_free_space;
limits.tpsLimit = 0.0;
limits->tpsLimit = 0.0;
}
double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 );
@ -505,8 +546,8 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
if (targetRateRatio < .75) //< FIXME: KNOB for 2.0
x = std::max(x, 0.95);
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
if (lim < limits->tpsLimit){
limits->tpsLimit = lim;
reasonID = tl.id;
limitReason = tlogLimitReason;
}
@ -515,8 +556,8 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
if (lim < limits->tpsLimit){
limits->tpsLimit = lim;
reasonID = tl.id;
limitReason = limitReason_t::log_server_mvcc_write_bandwidth;
}
@ -525,10 +566,10 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
self->healthMetrics.worstTLogQueue = worstStorageQueueTLog;
limits.tpsLimit = std::max(limits.tpsLimit, 0.0);
limits->tpsLimit = std::max(limits->tpsLimit, 0.0);
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
limits.tpsLimit = std::max(limits.tpsLimit, 100.0);
limits->tpsLimit = std::max(limits->tpsLimit, 100.0);
}
int64_t totalDiskUsageBytes = 0;
@ -539,13 +580,13 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
if (s.value.valid)
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
limits.tpsLimitMetric = std::min(limits.tpsLimit, 1e6);
limits.reasonMetric = limitReason;
limits->tpsLimitMetric = std::min(limits->tpsLimit, 1e6);
limits->reasonMetric = limitReason;
if (g_random->random01() < 0.1) {
std::string name = "RkUpdate" + limits.context;
std::string name = "RkUpdate" + limits->context;
TraceEvent(name.c_str())
.detail("TPSLimit", limits.tpsLimit)
.detail("TPSLimit", limits->tpsLimit)
.detail("Reason", limitReason)
.detail("ReasonServerID", reasonID)
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
@ -566,7 +607,7 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
}
}
ACTOR Future<Void> configurationMonitor( Ratekeeper* self, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo, DatabaseConfiguration* conf) {
state Database cx = openDBOnServer(dbInfo, TaskDefaultEndpoint, true, true);
loop {
state ReadYourWritesTransaction tr(cx);
@ -578,7 +619,7 @@ ACTOR Future<Void> configurationMonitor( Ratekeeper* self, Reference<AsyncVar<Se
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
self->configuration.fromKeyValues( (VectorRef<KeyValueRef>) results );
conf->fromKeyValues( (VectorRef<KeyValueRef>) results );
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey);
wait( tr.commit() );
@ -591,21 +632,21 @@ ACTOR Future<Void> configurationMonitor( Ratekeeper* self, Reference<AsyncVar<Se
}
}
ACTOR Future<Void> rateKeeper(
Reference<AsyncVar<ServerDBInfo>> dbInfo,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
FutureStream< struct GetRateInfoRequest > getRateInfo,
double* lastLimited)
{
state Ratekeeper self;
state Future<Void> track = trackEachStorageServer( &self, serverChanges.getFuture() );
ACTOR Future<Void> rateKeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state RatekeeperData self;
state Future<Void> timeout = Void();
state std::vector<Future<Void>> actors;
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;
state Promise<Void> err;
state Future<Void> configMonitor = configurationMonitor(&self, dbInfo);
self.lastLimited = lastLimited;
state Future<Void> collection = actorCollection( self.addActor.getFuture() );
TraceEvent("Ratekeeper_Starting", rkInterf.id());
self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) );
self.addActor.send( configurationMonitor(dbInfo, &self.configuration) );
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
self.addActor.send( monitorServerListChange(dbInfo, serverChanges) );
self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
@ -617,18 +658,14 @@ ACTOR Future<Void> rateKeeper(
for( int i = 0; i < tlogInterfs.size(); i++ )
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
loop{
choose {
when (wait( track )) { break; }
try {
state bool lastLimited = false;
loop choose {
when (wait( timeout )) {
updateRate(&self, self.normalLimits);
updateRate(&self, self.batchLimits);
if(self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit) {
*self.lastLimited = now();
}
updateRate(&self, &self.normalLimits);
updateRate(&self, &self.batchLimits);
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
double tooOld = now() - 1.0;
for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) {
if (p->second.time < tooOld)
@ -638,7 +675,7 @@ ACTOR Future<Void> rateKeeper(
}
timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE);
}
when (GetRateInfoRequest req = waitNext(getRateInfo)) {
when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
GetRateInfoReply reply;
auto& p = self.proxy_transactionCounts[ req.requesterID ];
@ -660,6 +697,7 @@ ACTOR Future<Void> rateKeeper(
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
reply.healthMetrics.batchLimited = lastLimited;
req.reply.send( reply );
}
@ -672,8 +710,14 @@ ACTOR Future<Void> rateKeeper(
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
}
}
when(wait(configMonitor)) {}
when ( wait(collection) ) {
ASSERT(false);
throw internal_error();
}
}
}
catch (Error& err) {
TraceEvent("Ratekeeper_Died", rkInterf.id()).error(err, true);
}
return Void();
}

View File

@ -1,35 +0,0 @@
/*
* Ratekeeper.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_RATEKEEPER_H
#define FDBSERVER_RATEKEEPER_H
#pragma once
#include "fdbserver/MasterInterface.h"
#include "fdbserver/TLogInterface.h"
#include "fdbclient/DatabaseConfiguration.h"
// Declaration of the rateKeeper actor.
//   dbInfo        - handle to the ServerDBInfo variable
//   serverChanges - stream of (server ID, optional storage server interface) pairs
//   getRateInfo   - stream of incoming GetRateInfoRequest messages
//   lastLimited   - pointer to a double owned by the caller
Future<Void> rateKeeper(
Reference<AsyncVar<struct ServerDBInfo>> const& dbInfo,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges, // actually an input, but we don't want broken_promise
FutureStream< struct GetRateInfoRequest > const& getRateInfo,
double* const& lastLimited);
#endif

View File

@ -0,0 +1,81 @@
/*
* RatekeeperInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_RATEKEEPERINTERFACE_H
#define FDBSERVER_RATEKEEPERINTERFACE_H
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
// Interface of the ratekeeper role: two request streams plus the locality the
// interface was constructed with. Members are serialised in declaration order.
struct RatekeeperInterface {
	RequestStream<ReplyPromise<Void>> waitFailure;
	RequestStream<struct GetRateInfoRequest> getRateInfo;
	struct LocalityData locality;

	RatekeeperInterface() {}
	explicit RatekeeperInterface(const struct LocalityData& l) : locality(l) {}

	void initEndpoints() {}

	// Both the ID and the address are taken from the getRateInfo endpoint.
	UID id() const {
		return getRateInfo.getEndpoint().token;
	}
	NetworkAddress address() const {
		return getRateInfo.getEndpoint().getPrimaryAddress();
	}

	// Two interfaces are equal exactly when their id() values are equal.
	bool operator== (const RatekeeperInterface& other) const {
		return id() == other.id();
	}
	bool operator!= (const RatekeeperInterface& other) const {
		return !(*this == other);
	}

	template <class Archive>
	void serialize(Archive& ar) {
		serializer(ar, waitFailure, getRateInfo, locality);
	}
};
// Request sent on RatekeeperInterface::getRateInfo. The answer is delivered
// through `reply` as a GetRateInfoReply.
struct GetRateInfoRequest {
	// ID of the requester (the first constructor argument).
	UID requesterID;
	// Transaction counters reported by the requester. They default to zero so a
	// default-constructed request never carries indeterminate values.
	int64_t totalReleasedTransactions = 0;
	int64_t batchReleasedTransactions = 0;
	// Flag set by the requester; the ratekeeper passes it to HealthMetrics::update when building the reply.
	bool detailed = false;
	ReplyPromise<struct GetRateInfoReply> reply;

	GetRateInfoRequest() {}
	GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed)
		: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {}

	// Serialises every member, in declaration order.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply);
	}
};
// Reply to a GetRateInfoRequest.
struct GetRateInfoReply {
	// The numeric members default to zero so that a reply which is
	// default-constructed and only partially filled in never exposes
	// indeterminate values.
	double transactionRate = 0;
	double batchTransactionRate = 0;
	double leaseDuration = 0;
	HealthMetrics healthMetrics;

	// Serialises every member, in declaration order.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics);
	}
};
#endif //FDBSERVER_RATEKEEPERINTERFACE_H

View File

@ -26,6 +26,7 @@
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LatencyBandConfig.h"
@ -39,6 +40,7 @@ struct ServerDBInfo {
ClientDBInfo client; // After a successful recovery, eventually proxies that communicate with it
Optional<DataDistributorInterface> distributor; // The best guess of current data distributor.
MasterInterface master; // The best guess as to the most recent master, which might still be recovering
Optional<RatekeeperInterface> ratekeeper;
vector<ResolverInterface> resolvers;
DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice; unsuccessful recoveries may increment it once. Depending on where the current master is in its recovery process, this might not have been written by the current master.
RecoveryState recoveryState;
@ -55,7 +57,7 @@ struct ServerDBInfo {
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, id, clusterInterface, client, distributor, master, resolvers, recoveryCount, recoveryState, masterLifetime, logSystemConfig, priorCommittedLogServers, latencyBandConfig);
serializer(ar, id, clusterInterface, client, distributor, master, ratekeeper, resolvers, recoveryCount, recoveryState, masterLifetime, logSystemConfig, priorCommittedLogServers, latencyBandConfig);
}
};

View File

@ -1387,7 +1387,7 @@ JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transP
return perfLimit;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> ddWorker,
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> rkWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
state JsonBuilderObject statusObj;
@ -1439,8 +1439,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Transactions
try {
state TraceEventFields ratekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
state TraceEventFields ratekeeper = wait( timeoutError(rkWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(rkWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
double tpsLimit = ratekeeper.getDouble("TPSLimit");
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
@ -1816,6 +1816,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::set<std::string> status_incomplete_reasons;
state std::pair<WorkerInterface, ProcessClass> mWorker;
state std::pair<WorkerInterface, ProcessClass> ddWorker; // DataDistributor worker
state std::pair<WorkerInterface, ProcessClass> rkWorker; // RateKeeper worker
try {
// Get the master Worker interface
@ -1837,6 +1838,18 @@ ACTOR Future<StatusReply> clusterGetStatus(
ddWorker = _ddWorker.get();
}
// Get the RateKeeper worker interface
Optional<std::pair<WorkerInterface, ProcessClass>> _rkWorker;
if (db->get().ratekeeper.present()) {
_rkWorker = getWorker( workers, db->get().ratekeeper.get().address() );
}
if (!db->get().ratekeeper.present() || !_rkWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_ratekeeper_worker", "Unable to locate the ratekeeper worker."));
} else {
rkWorker = _rkWorker.get();
}
// Get latest events for various event types from ALL workers
// WorkerEvents is a map of worker's NetworkAddress to its event string
// The pair represents worker responses and a set of worker NetworkAddress strings which did not respond
@ -1940,7 +1953,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state int minReplicasRemaining = -1;
std::vector<Future<JsonBuilderObject>> futures2;
futures2.push_back(dataStatusFetcher(ddWorker, &minReplicasRemaining));
futures2.push_back(workloadStatusFetcher(db, workers, mWorker, ddWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
futures2.push_back(workloadStatusFetcher(db, workers, mWorker, rkWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons));

View File

@ -28,6 +28,7 @@
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/ResolverInterface.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/TesterInterface.actor.h"
@ -46,6 +47,7 @@ struct WorkerInterface {
RequestStream< struct RecruitMasterRequest > master;
RequestStream< struct InitializeMasterProxyRequest > masterProxy;
RequestStream< struct InitializeDataDistributorRequest > dataDistributor;
RequestStream< struct InitializeRatekeeperRequest > ratekeeper;
RequestStream< struct InitializeResolverRequest > resolver;
RequestStream< struct InitializeStorageRequest > storage;
RequestStream< struct InitializeLogRouterRequest > logRouter;
@ -68,7 +70,7 @@ struct WorkerInterface {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest);
serializer(ar, clientInterface, locality, tLog, master, masterProxy, dataDistributor, ratekeeper, resolver, storage, logRouter, debugPing, coordinationPing, waitFailure, setMetricsRate, eventLogRequest, traceBatchDumpRequest, testerInterface, diskStoreRequest);
}
};
@ -145,12 +147,26 @@ struct InitializeDataDistributorRequest {
UID reqId;
ReplyPromise<DataDistributorInterface> reply;
InitializeDataDistributorRequest() {}
explicit InitializeDataDistributorRequest(UID uid) : reqId(uid) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, reqId, reply);
}
};
// Recruitment request delivered on WorkerInterface::ratekeeper; the worker answers
// through `reply` with a RatekeeperInterface.
struct InitializeRatekeeperRequest {
	UID reqId; // request identifier supplied by the sender
	ReplyPromise<RatekeeperInterface> reply;

	InitializeRatekeeperRequest() {}
	// Explicit so that a bare UID is never silently converted into a request.
	explicit InitializeRatekeeperRequest(UID uid) : reqId(uid) {}

	// Serializes the identifier followed by the reply promise.
	template <class Archive>
	void serialize(Archive& archive) {
		serializer(archive, reqId, reply);
	}
};
struct InitializeResolverRequest {
uint64_t recoveryCount;
int proxyCount;
@ -300,6 +316,7 @@ struct Role {
static const Role TESTER;
static const Role LOG_ROUTER;
static const Role DATA_DISTRIBUTOR;
static const Role RATE_KEEPER;
std::string roleName;
std::string abbreviation;
@ -361,6 +378,7 @@ ACTOR Future<Void> resolver(ResolverInterface proxy, InitializeResolverRequest i
ACTOR Future<Void> logRouter(TLogInterface interf, InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> rateKeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo>> db);
void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req);

View File

@ -191,7 +191,7 @@
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="QuietDatabase.h" />
<ClInclude Include="Ratekeeper.h" />
<ClInclude Include="RatekeeperInterface.h" />
<ClInclude Include="RecoveryState.h" />
<ClInclude Include="ResolverInterface.h" />
<ClInclude Include="RestoreInterface.h" />

View File

@ -310,6 +310,7 @@
<ItemGroup>
<ClInclude Include="ConflictSet.h" />
<ClInclude Include="DataDistribution.actor.h" />
<ClInclude Include="DataDistributorInterface.h" />
<ClInclude Include="MoveKeys.actor.h" />
<ClInclude Include="pubsub.h" />
<ClInclude Include="Knobs.h" />
@ -343,7 +344,7 @@
</ClInclude>
<ClInclude Include="LeaderElection.h" />
<ClInclude Include="StorageMetrics.h" />
<ClInclude Include="Ratekeeper.h" />
<ClInclude Include="RatekeeperInterface.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="IDiskQueue.h" />
<ClInclude Include="CoroFlow.h" />

View File

@ -31,7 +31,6 @@
#include <iterator>
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/Ratekeeper.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/CoordinatedState.h"

View File

@ -349,14 +349,15 @@ ACTOR Future<Void> registrationClient(
WorkerInterface interf,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass,
Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf) {
Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply (requiring us to re-register)
// The registration request piggybacks optional distributor interface if it exists.
state Generation requestGeneration = 0;
state ProcessClass processClass = initialClass;
loop {
RegisterWorkerRequest request(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++, ddInterf->get());
RegisterWorkerRequest request(interf, initialClass, processClass, asyncPriorityInfo->get(), requestGeneration++, ddInterf->get(), rkInterf->get());
Future<RegisterWorkerReply> registrationReply = ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().registerWorker.getReply(request) ) : Never();
choose {
when ( RegisterWorkerReply reply = wait( registrationReply )) {
@ -365,6 +366,7 @@ ACTOR Future<Void> registrationClient(
}
when ( wait( ccInterface->onChange() )) { }
when ( wait( ddInterf->onChange() ) ) {}
when ( wait( rkInterf->onChange() ) ) {}
}
}
}
@ -610,6 +612,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, ProcessClass initialClass, std::string folder, int64_t memoryLimit, std::string metricsConnFile, std::string metricsPrefix, Promise<Void> recoveredDiskFiles) {
state PromiseStream< ErrorInfo > errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf( new AsyncVar<Optional<DataDistributorInterface>>() );
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf( new AsyncVar<Optional<RatekeeperInterface>>() );
state Future<Void> handleErrors = workerHandleErrors( errors.getFuture() ); // Needs to be stopped last
state ActorCollection errorForwarders(false);
state Future<Void> loggingTrigger = Void();
@ -756,7 +759,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
wait(waitForAll(recoveries));
recoveredDiskFiles.send(Void());
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf ) );
errorForwarders.add( registrationClient( ccInterface, interf, asyncPriorityInfo, initialClass, ddInterf, rkInterf ) );
TraceEvent("RecoveriesComplete", interf.id());
@ -829,6 +832,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
TEST(true); // Recruited while already a data distributor.
} else {
startRole( Role::DATA_DISTRIBUTOR, recruited.id(), interf.id() );
DUMPTOKEN( recruited.waitFailure );
Future<Void> dataDistributorProcess = dataDistributor( recruited, dbInfo );
errorForwarders.add( forwardError( errors, Role::DATA_DISTRIBUTOR, recruited.id(), setWhenDoneOrError( dataDistributorProcess, ddInterf, Optional<DataDistributorInterface>() ) ) );
@ -837,6 +841,25 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
TraceEvent("DataDistributorReceived", req.reqId).detail("DataDistributorId", recruited.id());
req.reply.send(recruited);
}
when ( InitializeRatekeeperRequest req = waitNext(interf.ratekeeper.getFuture()) ) {
RatekeeperInterface recruited(locality);
recruited.initEndpoints();
if (rkInterf->get().present()) {
recruited = rkInterf->get().get();
TEST(true); // Recruited while already a ratekeeper.
} else {
startRole(Role::RATE_KEEPER, recruited.id(), interf.id());
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.getRateInfo );
Future<Void> ratekeeper = rateKeeper( recruited, dbInfo );
errorForwarders.add( forwardError( errors, Role::RATE_KEEPER, recruited.id(), setWhenDoneOrError( ratekeeper, rkInterf, Optional<RatekeeperInterface>() ) ) );
rkInterf->set(Optional<RatekeeperInterface>(recruited));
}
TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
req.reply.send(recruited);
}
when( InitializeTLogRequest req = waitNext(interf.tLog.getFuture()) ) {
// For now, there's a one-to-one mapping of spill type to TLogVersion.
// With future work, a particular version of the TLog can support multiple
@ -1244,3 +1267,4 @@ const Role Role::CLUSTER_CONTROLLER("ClusterController", "CC");
const Role Role::TESTER("Tester", "TS");
const Role Role::LOG_ROUTER("LogRouter", "LR");
const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD");
const Role Role::RATE_KEEPER("RateKeeper", "RK");

View File

@ -67,6 +67,7 @@ enum {
TaskUnknownEndpoint = 4000,
TaskMoveKeys = 3550,
TaskDataDistributionLaunch = 3530,
TaskRateKeeper = 3510,
TaskDataDistribution = 3500,
TaskDiskWrite = 3010,
TaskUpdateStorage = 3000,