Track health metrics in Ratekeeper and send these metrics to proxies in GetRateInfoReply messages
This commit is contained in:
parent
d7930af2cb
commit
5822bd65bf
|
@ -333,6 +333,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( SMOOTHING_AMOUNT, 1.0 ); if( slowRateKeeper ) SMOOTHING_AMOUNT = 5.0;
|
||||
init( SLOW_SMOOTHING_AMOUNT, 10.0 ); if( slowRateKeeper ) SLOW_SMOOTHING_AMOUNT = 50.0;
|
||||
init( METRIC_UPDATE_RATE, .1 ); if( slowRateKeeper ) METRIC_UPDATE_RATE = 0.5;
|
||||
init( DETAILED_METRIC_UPDATE_RATE, 5.0 );
|
||||
|
||||
bool smallStorageTarget = randomize && BUGGIFY;
|
||||
init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
|
||||
|
|
|
@ -271,6 +271,7 @@ public:
|
|||
double SMOOTHING_AMOUNT;
|
||||
double SLOW_SMOOTHING_AMOUNT;
|
||||
double METRIC_UPDATE_RATE;
|
||||
double DETAILED_METRIC_UPDATE_RATE;
|
||||
double LAST_LIMITED_RATIO;
|
||||
|
||||
int64_t TARGET_BYTES_PER_STORAGE_SERVER;
|
||||
|
|
|
@ -55,23 +55,28 @@ struct GetRateInfoRequest {
|
|||
UID requesterID;
|
||||
int64_t totalReleasedTransactions;
|
||||
ReplyPromise<struct GetRateInfoReply> reply;
|
||||
bool detailed;
|
||||
|
||||
GetRateInfoRequest() {}
|
||||
GetRateInfoRequest( UID const& requesterID, int64_t totalReleasedTransactions ) : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions) {}
|
||||
GetRateInfoRequest( UID const& requesterID, int64_t totalReleasedTransactions, bool detailed )
|
||||
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), detailed(detailed) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, requesterID, totalReleasedTransactions, reply);
|
||||
serializer(ar, requesterID, totalReleasedTransactions, reply, detailed);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetRateInfoReply {
|
||||
double transactionRate;
|
||||
double leaseDuration;
|
||||
double detailedLeaseDuration;
|
||||
HealthMetrics healthMetrics;
|
||||
bool detailed;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, transactionRate, leaseDuration);
|
||||
serializer(ar, transactionRate, leaseDuration, detailedLeaseDuration, healthMetrics, detailed);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ ACTOR Future<Void> getRate(UID myID, MasterInterface master, int64_t* inTransact
|
|||
loop choose{
|
||||
when(wait(nextRequestTimer)) {
|
||||
nextRequestTimer = Never();
|
||||
reply = brokenPromiseToNever(master.getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount)));
|
||||
reply = brokenPromiseToNever(master.getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, false)));
|
||||
}
|
||||
when(GetRateInfoReply rep = wait(reply)) {
|
||||
reply = Never();
|
||||
|
|
|
@ -81,13 +81,13 @@ struct StorageQueueInfo {
|
|||
StorageQueuingMetricsReply lastReply;
|
||||
StorageQueuingMetricsReply prevReply;
|
||||
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
|
||||
Smoother smoothDurableVersion, smoothLatestVersion;
|
||||
Smoother smoothDurableVersion, smoothLatestVersion, smoothDesiredOldestVersion;
|
||||
Smoother smoothFreeSpace;
|
||||
Smoother smoothTotalSpace;
|
||||
limitReason_t limitReason;
|
||||
StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothDesiredOldestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited)
|
||||
{
|
||||
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
|
||||
|
@ -117,6 +117,7 @@ struct Ratekeeper {
|
|||
std::map<UID, std::pair<int64_t, double> > proxy_transactionCountAndTime;
|
||||
Smoother smoothReleasedTransactions, smoothTotalDurableBytes;
|
||||
double TPSLimit;
|
||||
HealthMetrics healthMetrics;
|
||||
DatabaseConfiguration configuration;
|
||||
|
||||
Int64MetricHandle tpsLimitMetric;
|
||||
|
@ -151,6 +152,8 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
|
|||
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
|
||||
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
|
||||
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
|
||||
myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion);
|
||||
myQueueInfo->value.smoothDesiredOldestVersion.reset(reply.get().desiredOldestVersion);
|
||||
} else {
|
||||
self->smoothTotalDurableBytes.addDelta( reply.get().bytesDurable - myQueueInfo->value.prevReply.bytesDurable );
|
||||
myQueueInfo->value.smoothDurableBytes.setTotal( reply.get().bytesDurable );
|
||||
|
@ -158,6 +161,8 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
|
|||
myQueueInfo->value.smoothInputBytes.setTotal( reply.get().bytesInput );
|
||||
myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
|
||||
myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
|
||||
myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion);
|
||||
myQueueInfo->value.smoothDesiredOldestVersion.setTotal(reply.get().desiredOldestVersion);
|
||||
}
|
||||
} else {
|
||||
if(myQueueInfo->value.valid) {
|
||||
|
@ -263,6 +268,7 @@ void updateRate( Ratekeeper* self ) {
|
|||
|
||||
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
|
||||
int64_t worstStorageQueueStorageServer = 0;
|
||||
int64_t worstStorageNDVStorageServer = 0;
|
||||
int64_t limitingStorageQueueStorageServer = 0;
|
||||
|
||||
std::multimap<double, StorageQueueInfo*> storageTPSLimitReverseIndex;
|
||||
|
@ -290,7 +296,13 @@ void updateRate( Ratekeeper* self ) {
|
|||
}
|
||||
|
||||
int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal();
|
||||
self->healthMetrics.storageQueue[ss.id] = storageQueue;
|
||||
worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue);
|
||||
|
||||
int64_t storageNDV = ss.smoothDesiredOldestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
|
||||
self->healthMetrics.storageNDV[ss.id] = storageNDV;
|
||||
worstStorageNDVStorageServer = std::max(worstStorageNDVStorageServer, storageNDV);
|
||||
|
||||
int64_t b = storageQueue - targetBytes;
|
||||
double targetRateRatio = std::min(( b + springBytes ) / (double)springBytes, 2.0);
|
||||
|
||||
|
@ -341,6 +353,8 @@ void updateRate( Ratekeeper* self ) {
|
|||
}
|
||||
}
|
||||
|
||||
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
|
||||
|
||||
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
|
||||
for(auto ss = storageTPSLimitReverseIndex.begin(); ss != storageTPSLimitReverseIndex.end() && ss->first < self->TPSLimit; ++ss) {
|
||||
if(ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
|
||||
|
@ -416,6 +430,7 @@ void updateRate( Ratekeeper* self ) {
|
|||
}
|
||||
|
||||
int64_t queue = tl.lastReply.bytesInput - tl.smoothDurableBytes.smoothTotal();
|
||||
self->healthMetrics.tLogQueue[tl.id] = queue;
|
||||
int64_t b = queue - targetBytes;
|
||||
worstStorageQueueTLog = std::max(worstStorageQueueTLog, queue);
|
||||
|
||||
|
@ -462,6 +477,8 @@ void updateRate( Ratekeeper* self ) {
|
|||
}
|
||||
}
|
||||
|
||||
self->healthMetrics.worstTLogQueue = worstStorageQueueTLog;
|
||||
|
||||
self->TPSLimit = std::max(self->TPSLimit, 0.0);
|
||||
|
||||
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
|
||||
|
@ -559,6 +576,21 @@ ACTOR Future<Void> rateKeeper(
|
|||
|
||||
reply.transactionRate = self.TPSLimit / self.proxy_transactionCountAndTime.size();
|
||||
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
|
||||
reply.detailedLeaseDuration = 5.0; // knob
|
||||
|
||||
reply.healthMetrics.update(self.healthMetrics, false, false);
|
||||
reply.healthMetrics.tpsLimit = self.TPSLimit;
|
||||
reply.detailed = req.detailed;
|
||||
if (req.detailed) {
|
||||
for (const auto &s : self.storageQueueInfo) {
|
||||
reply.healthMetrics.cpuUsage[s.key] = s.value.lastReply.cpuUsage;
|
||||
reply.healthMetrics.diskUsage[s.key] = s.value.lastReply.diskUsage;
|
||||
}
|
||||
reply.healthMetrics.storageQueue = self.healthMetrics.storageQueue;
|
||||
reply.healthMetrics.storageNDV = self.healthMetrics.storageNDV;
|
||||
reply.healthMetrics.tLogQueue = self.healthMetrics.tLogQueue;
|
||||
}
|
||||
|
||||
req.reply.send( reply );
|
||||
}
|
||||
when (wait(err.getFuture())) {}
|
||||
|
|
Loading…
Reference in New Issue