Update status schema for correctness. Send the count of batch transactions started back to ratekeeper so that it can be logged with other ratekeeper metrics.
This commit is contained in:
parent
eb629d87a5
commit
3e6a6a6569
|
@ -230,6 +230,25 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
},
|
||||
"qos":{
|
||||
"worst_queue_bytes_log_server":460,
|
||||
"batch_performance_limited_by":{
|
||||
"reason_server_id":"7f8d623d0cb9966e",
|
||||
"reason_id":0,
|
||||
"name":{
|
||||
"$enum":[
|
||||
"workload",
|
||||
"storage_server_write_queue_size",
|
||||
"storage_server_write_bandwidth_mvcc",
|
||||
"storage_server_readable_behind",
|
||||
"log_server_mvcc_write_bandwidth",
|
||||
"log_server_write_queue",
|
||||
"storage_server_min_free_space",
|
||||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio"
|
||||
]
|
||||
},
|
||||
"description":"The database is not being saturated by the workload."
|
||||
},
|
||||
"performance_limited_by":{
|
||||
"reason_server_id":"7f8d623d0cb9966e",
|
||||
"reason_id":0,
|
||||
|
@ -249,7 +268,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
},
|
||||
"description":"The database is not being saturated by the workload."
|
||||
},
|
||||
"batch_transactions_per_second_limit":0,
|
||||
"transactions_per_second_limit":0,
|
||||
"batch_released_transactions_per_second":0,
|
||||
"released_transactions_per_second":0,
|
||||
"limiting_queue_bytes_storage_server":0,
|
||||
"worst_queue_bytes_storage_server":0,
|
||||
|
|
|
@ -51,14 +51,15 @@ struct DataDistributorInterface {
|
|||
struct GetRateInfoRequest {
|
||||
UID requesterID;
|
||||
int64_t totalReleasedTransactions;
|
||||
int64_t batchReleasedTransactions;
|
||||
ReplyPromise<struct GetRateInfoReply> reply;
|
||||
|
||||
GetRateInfoRequest() {}
|
||||
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions) : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions) {}
|
||||
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions) : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, requesterID, totalReleasedTransactions, reply);
|
||||
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ Future<Void> forwardValue(Promise<T> out, Future<T> in)
|
|||
|
||||
int getBytes(Promise<Version> const& r) { return 0; }
|
||||
|
||||
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate, double* outBatchTransactionRate) {
|
||||
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate, double* outBatchTransactionRate) {
|
||||
state Future<Void> nextRequestTimer = Never();
|
||||
state Future<Void> leaseTimeout = Never();
|
||||
state Future<GetRateInfoReply> reply = Never();
|
||||
|
@ -111,7 +111,7 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
|
|||
}
|
||||
when ( wait( nextRequestTimer ) ) {
|
||||
nextRequestTimer = Never();
|
||||
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount)));
|
||||
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount)));
|
||||
}
|
||||
when ( GetRateInfoReply rep = wait(reply) ) {
|
||||
reply = Never();
|
||||
|
@ -1127,6 +1127,7 @@ ACTOR static Future<Void> transactionStarter(
|
|||
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
|
||||
|
||||
state int64_t transactionCount = 0;
|
||||
state int64_t batchTransactionCount = 0;
|
||||
state TransactionRateInfo normalRateInfo(10);
|
||||
state TransactionRateInfo batchRateInfo(0);
|
||||
|
||||
|
@ -1134,7 +1135,7 @@ ACTOR static Future<Void> transactionStarter(
|
|||
state vector<MasterProxyInterface> otherProxies;
|
||||
|
||||
state PromiseStream<double> replyTimes;
|
||||
addActor.send(getRate(proxy.id(), db, &transactionCount, &normalRateInfo.rate, &batchRateInfo.rate));
|
||||
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate));
|
||||
addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
|
||||
|
||||
// Get a list of the other proxies that go together with us
|
||||
|
@ -1216,6 +1217,7 @@ ACTOR static Future<Void> transactionStarter(
|
|||
.detail("LastBatchLeftToStart", batchLeftToStart);*/
|
||||
|
||||
transactionCount += transactionsStarted[0] + transactionsStarted[1];
|
||||
batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
|
||||
|
||||
normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
|
||||
batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
|
||||
|
|
|
@ -137,11 +137,18 @@ struct RatekeeperLimits {
|
|||
{}
|
||||
};
|
||||
|
||||
struct TransactionCounts {
|
||||
int64_t total;
|
||||
int64_t batch;
|
||||
|
||||
TransactionCount() : total(0), batch(0) {}
|
||||
};
|
||||
|
||||
struct Ratekeeper {
|
||||
Map<UID, StorageQueueInfo> storageQueueInfo;
|
||||
Map<UID, TLogQueueInfo> tlogQueueInfo;
|
||||
std::map<UID, std::pair<int64_t, double> > proxy_transactionCountAndTime;
|
||||
Smoother smoothReleasedTransactions, smoothTotalDurableBytes;
|
||||
std::map<UID, std::pair<TransactionCounts, double> > proxy_transactionCountAndTime;
|
||||
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
|
||||
DatabaseConfiguration configuration;
|
||||
|
||||
Int64MetricHandle actualTpsMetric;
|
||||
|
@ -152,7 +159,7 @@ struct Ratekeeper {
|
|||
RatekeeperLimits normalLimits;
|
||||
RatekeeperLimits batchLimits;
|
||||
|
||||
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
|
||||
lastWarning(0),
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
|
||||
|
@ -521,6 +528,7 @@ void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
|
|||
.detail("Reason", limitReason)
|
||||
.detail("ReasonServerID", reasonID)
|
||||
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
|
||||
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
|
||||
.detail("TPSBasis", actualTps)
|
||||
.detail("StorageServers", sscount)
|
||||
.detail("Proxies", self->proxy_transactionCountAndTime.size())
|
||||
|
@ -614,10 +622,15 @@ ACTOR Future<Void> rateKeeper(
|
|||
|
||||
auto& p = self.proxy_transactionCountAndTime[ req.requesterID ];
|
||||
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
|
||||
if (p.first > 0)
|
||||
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.first );
|
||||
if (p.first.total > 0) {
|
||||
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.first.total );
|
||||
}
|
||||
if(p.first.batch > 0) {
|
||||
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.first.batch );
|
||||
}
|
||||
|
||||
p.first = req.totalReleasedTransactions;
|
||||
p.first.total = req.totalReleasedTransactions;
|
||||
p.first.batch = req.batchReleasedTransactions;
|
||||
p.second = now();
|
||||
|
||||
reply.transactionRate = self.normalLimits.tpsLimit / self.proxy_transactionCountAndTime.size();
|
||||
|
|
|
@ -1439,6 +1439,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
double tpsLimit = ratekeeper.getDouble("TPSLimit");
|
||||
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
|
||||
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
|
||||
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
|
||||
int ssCount = ratekeeper.getInt("StorageServers");
|
||||
int tlogCount = ratekeeper.getInt("TLogs");
|
||||
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
|
||||
|
@ -1461,6 +1462,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
(*qos)["transactions_per_second_limit"] = tpsLimit;
|
||||
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
|
||||
(*qos)["released_transactions_per_second"] = transPerSec;
|
||||
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
|
||||
|
||||
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
|
||||
if(!perfLimit.empty()) {
|
||||
|
|
Loading…
Reference in New Issue