Merge pull request #3653 from etschannen/feature-proxy-busy-loadbalance
Changed proxy load balancing to balance on CPU usage
This commit is contained in:
commit
5c97461d18
|
@ -191,7 +191,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, BasicLoadBalancedReply::recentRequests, version, locked, metadataVersion, tagThrottleInfo);
|
serializer(ar, BasicLoadBalancedReply::processBusyTime, version, locked, metadataVersion, tagThrottleInfo);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -458,8 +458,8 @@ Future< REPLY_TYPE(Request) > loadBalance(
|
||||||
// Subclasses must initialize all members in their default constructors
|
// Subclasses must initialize all members in their default constructors
|
||||||
// Subclasses must serialize all members
|
// Subclasses must serialize all members
|
||||||
struct BasicLoadBalancedReply {
|
struct BasicLoadBalancedReply {
|
||||||
int recentRequests;
|
int processBusyTime;
|
||||||
BasicLoadBalancedReply() : recentRequests(0) {}
|
BasicLoadBalancedReply() : processBusyTime(0) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const BasicLoadBalancedReply *reply);
|
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const BasicLoadBalancedReply *reply);
|
||||||
|
@ -528,7 +528,7 @@ Future< REPLY_TYPE(Request) > basicLoadBalance(
|
||||||
if(result.present()) {
|
if(result.present()) {
|
||||||
Optional<BasicLoadBalancedReply> loadBalancedReply = getBasicLoadBalancedReply(&result.get());
|
Optional<BasicLoadBalancedReply> loadBalancedReply = getBasicLoadBalancedReply(&result.get());
|
||||||
if(loadBalancedReply.present()) {
|
if(loadBalancedReply.present()) {
|
||||||
alternatives->updateRecent( useAlt, loadBalancedReply.get().recentRequests );
|
alternatives->updateRecent( useAlt, loadBalancedReply.get().processBusyTime );
|
||||||
}
|
}
|
||||||
|
|
||||||
return result.get();
|
return result.get();
|
||||||
|
|
|
@ -62,10 +62,10 @@ struct AlternativeInfo {
|
||||||
T interf;
|
T interf;
|
||||||
double probability;
|
double probability;
|
||||||
double cumulativeProbability;
|
double cumulativeProbability;
|
||||||
int recentRequests;
|
int processBusyTime;
|
||||||
double lastUpdate;
|
double lastUpdate;
|
||||||
|
|
||||||
AlternativeInfo(T const& interf, double probability, double cumulativeProbability) : interf(interf), probability(probability), cumulativeProbability(cumulativeProbability), recentRequests(-1), lastUpdate(0) {}
|
AlternativeInfo(T const& interf, double probability, double cumulativeProbability) : interf(interf), probability(probability), cumulativeProbability(cumulativeProbability), processBusyTime(-1), lastUpdate(0) {}
|
||||||
|
|
||||||
bool operator < (double const& r) const {
|
bool operator < (double const& r) const {
|
||||||
return cumulativeProbability < r;
|
return cumulativeProbability < r;
|
||||||
|
@ -100,26 +100,28 @@ public:
|
||||||
return std::lower_bound( alternatives.begin(), alternatives.end(), deterministicRandom()->random01() ) - alternatives.begin();
|
return std::lower_bound( alternatives.begin(), alternatives.end(), deterministicRandom()->random01() ) - alternatives.begin();
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateRecent( int index, int recentRequests ) {
|
void updateRecent( int index, int processBusyTime ) {
|
||||||
alternatives[index].recentRequests = recentRequests;
|
alternatives[index].processBusyTime = processBusyTime;
|
||||||
alternatives[index].lastUpdate = now();
|
alternatives[index].lastUpdate = now();
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateProbabilities() {
|
void updateProbabilities() {
|
||||||
double totalRequests = 0;
|
double totalBusyTime = 0;
|
||||||
for(auto& it : alternatives) {
|
for(auto& it : alternatives) {
|
||||||
totalRequests += it.recentRequests;
|
totalBusyTime += it.processBusyTime;
|
||||||
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
|
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(totalRequests < 1000) {
|
|
||||||
|
//Do not update probabilities if the average proxy busyness is less than 5%
|
||||||
|
if(totalBusyTime < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_AMOUNT*alternatives.size()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
double totalProbability = 0;
|
double totalProbability = 0;
|
||||||
for(auto& it : alternatives) {
|
for(auto& it : alternatives) {
|
||||||
it.probability += (1.0/alternatives.size()-(it.recentRequests/totalRequests))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
|
it.probability += (1.0/alternatives.size()-(it.processBusyTime/totalBusyTime))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
|
||||||
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
|
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
|
||||||
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
|
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
|
||||||
totalProbability += it.probability;
|
totalProbability += it.probability;
|
||||||
|
|
|
@ -96,33 +96,8 @@ struct ProxyStats {
|
||||||
|
|
||||||
Future<Void> logger;
|
Future<Void> logger;
|
||||||
|
|
||||||
int recentRequests;
|
|
||||||
Deque<int> requestBuckets;
|
|
||||||
double lastBucketBegin;
|
|
||||||
double bucketInterval;
|
|
||||||
|
|
||||||
void updateRequestBuckets() {
|
|
||||||
while(now() - lastBucketBegin > bucketInterval) {
|
|
||||||
lastBucketBegin += bucketInterval;
|
|
||||||
recentRequests -= requestBuckets.front();
|
|
||||||
requestBuckets.pop_front();
|
|
||||||
requestBuckets.push_back(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void addRequest() {
|
|
||||||
updateRequestBuckets();
|
|
||||||
++recentRequests;
|
|
||||||
++requestBuckets.back();
|
|
||||||
}
|
|
||||||
|
|
||||||
int getRecentRequests() {
|
|
||||||
updateRequestBuckets();
|
|
||||||
return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
|
|
||||||
}
|
|
||||||
|
|
||||||
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
|
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
|
||||||
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
|
: cc("ProxyStats", id.toString()),
|
||||||
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
|
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
|
||||||
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
|
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
|
||||||
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
|
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
|
||||||
|
@ -148,9 +123,6 @@ struct ProxyStats {
|
||||||
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
|
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
|
||||||
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
|
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
|
||||||
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
|
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
|
||||||
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
|
|
||||||
requestBuckets.push_back(0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -310,7 +282,6 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
||||||
loop choose{
|
loop choose{
|
||||||
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
|
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
|
||||||
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
|
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
|
||||||
stats->addRequest();
|
|
||||||
if( stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ) {
|
if( stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ) {
|
||||||
++stats->txnRequestErrors;
|
++stats->txnRequestErrors;
|
||||||
//FIXME: send an error instead of giving an unreadable version when the client can support the error: req.reply.sendError(proxy_memory_limit_exceeded());
|
//FIXME: send an error instead of giving an unreadable version when the client can support the error: req.reply.sendError(proxy_memory_limit_exceeded());
|
||||||
|
@ -629,7 +600,6 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
|
||||||
choose{
|
choose{
|
||||||
when(CommitTransactionRequest req = waitNext(in)) {
|
when(CommitTransactionRequest req = waitNext(in)) {
|
||||||
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
|
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
|
||||||
commitData->stats.addRequest();
|
|
||||||
int bytes = getBytes(req);
|
int bytes = getBytes(req);
|
||||||
|
|
||||||
// Drop requests if memory is under severe pressure
|
// Drop requests if memory is under severe pressure
|
||||||
|
@ -1439,7 +1409,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
||||||
rep = v;
|
rep = v;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rep.recentRequests = commitData->stats.getRecentRequests();
|
rep.processBusyTime = 1e6 * (g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness);
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present()) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
|
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
|
||||||
|
@ -1710,7 +1680,6 @@ ACTOR static Future<Void> readRequestServer( MasterProxyInterface proxy, Promise
|
||||||
loop {
|
loop {
|
||||||
GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
|
GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
|
||||||
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
|
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
|
||||||
commitData->stats.addRequest();
|
|
||||||
if(req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && //Always do data distribution requests
|
if(req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && //Always do data distribution requests
|
||||||
commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() > SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
|
commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() > SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
|
||||||
++commitData->stats.keyServerLocationErrors;
|
++commitData->stats.keyServerLocationErrors;
|
||||||
|
|
|
@ -211,10 +211,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
|
||||||
init( FUTURE_VERSION_BACKOFF_GROWTH, 2.0 );
|
init( FUTURE_VERSION_BACKOFF_GROWTH, 2.0 );
|
||||||
init( LOAD_BALANCE_MAX_BAD_OPTIONS, 1 ); //should be the same as MAX_MACHINES_FALLING_BEHIND
|
init( LOAD_BALANCE_MAX_BAD_OPTIONS, 1 ); //should be the same as MAX_MACHINES_FALLING_BEHIND
|
||||||
init( LOAD_BALANCE_PENALTY_IS_BAD, true );
|
init( LOAD_BALANCE_PENALTY_IS_BAD, true );
|
||||||
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 2.0 );
|
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 10.0 ); //should be longer than the rate we log network metrics
|
||||||
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.05 );
|
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.10 );
|
||||||
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
|
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
|
||||||
init( BASIC_LOAD_BALANCE_BUCKETS, 40 );
|
init( BASIC_LOAD_BALANCE_MIN_AMOUNT, 50000 ); //Will not update probabilities if the average proxy busyness is less than 5%
|
||||||
|
|
||||||
// Health Monitor
|
// Health Monitor
|
||||||
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;
|
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;
|
||||||
|
|
|
@ -233,7 +233,7 @@ public:
|
||||||
double BASIC_LOAD_BALANCE_UPDATE_RATE;
|
double BASIC_LOAD_BALANCE_UPDATE_RATE;
|
||||||
double BASIC_LOAD_BALANCE_MAX_CHANGE;
|
double BASIC_LOAD_BALANCE_MAX_CHANGE;
|
||||||
double BASIC_LOAD_BALANCE_MAX_PROB;
|
double BASIC_LOAD_BALANCE_MAX_PROB;
|
||||||
int BASIC_LOAD_BALANCE_BUCKETS;
|
double BASIC_LOAD_BALANCE_MIN_AMOUNT;
|
||||||
|
|
||||||
// Health Monitor
|
// Health Monitor
|
||||||
int FAILURE_DETECTION_DELAY;
|
int FAILURE_DETECTION_DELAY;
|
||||||
|
|
|
@ -166,6 +166,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
|
||||||
n.detail(format("PriorityBusy%d", itr.first).c_str(), itr.second);
|
n.detail(format("PriorityBusy%d", itr.first).c_str(), itr.second);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool firstTracker = true;
|
||||||
for (auto &itr : g_network->networkInfo.metrics.starvationTrackers) {
|
for (auto &itr : g_network->networkInfo.metrics.starvationTrackers) {
|
||||||
if(itr.active) {
|
if(itr.active) {
|
||||||
itr.duration += now() - itr.windowedTimer;
|
itr.duration += now() - itr.windowedTimer;
|
||||||
|
@ -176,6 +177,11 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
|
||||||
n.detail(format("PriorityStarvedBelow%d", itr.priority).c_str(), std::min(currentStats.elapsed, itr.duration));
|
n.detail(format("PriorityStarvedBelow%d", itr.priority).c_str(), std::min(currentStats.elapsed, itr.duration));
|
||||||
n.detail(format("PriorityMaxStarvedBelow%d", itr.priority).c_str(), itr.maxDuration);
|
n.detail(format("PriorityMaxStarvedBelow%d", itr.priority).c_str(), itr.maxDuration);
|
||||||
|
|
||||||
|
if(firstTracker) {
|
||||||
|
g_network->networkInfo.metrics.lastRunLoopBusyness = std::min(currentStats.elapsed, itr.duration)/currentStats.elapsed;
|
||||||
|
firstTracker = false;
|
||||||
|
}
|
||||||
|
|
||||||
itr.duration = 0;
|
itr.duration = 0;
|
||||||
itr.maxDuration = 0;
|
itr.maxDuration = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -336,6 +336,7 @@ struct NetworkMetrics {
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
|
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
|
||||||
|
double lastRunLoopBusyness;
|
||||||
std::vector<struct PriorityStats> starvationTrackers;
|
std::vector<struct PriorityStats> starvationTrackers;
|
||||||
|
|
||||||
static const std::vector<int> starvationBins;
|
static const std::vector<int> starvationBins;
|
||||||
|
|
Loading…
Reference in New Issue