Merge pull request #1198 from ajbeamon/ratekeeper-batch-priority-limits

Ratekeeper Batch Priority Limits
This commit is contained in:
Evan Tschannen 2019-03-01 15:03:49 -08:00 committed by GitHub
commit b209cd45e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 419 additions and 191 deletions

View File

@ -13,6 +13,7 @@ Improved replication mechanism, a new hierarchical replication technique that fu
* Get read version, read, and commit requests are counted and aggregated by server-side latency in configurable latency bands and output in JSON status. `(PR #1084) <https://github.com/apple/foundationdb/pull/1084>`_
* Added configuration option to choose log spilling implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Added configuration option to choose log system implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Batch priority transactions are now limited separately by ratekeeper and will be throttled at lower levels of cluster saturation. This makes it possible to run a more intense background load at saturation without significantly affecting normal priority transactions. It is still recommended not to run excessive loads at batch priority. `(PR #1198) <https://github.com/apple/foundationdb/pull/1198>`_
Performance
-----------

View File

@ -230,6 +230,25 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"qos":{
"worst_queue_bytes_log_server":460,
"batch_performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
"name":{
"$enum":[
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio"
]
},
"description":"The database is not being saturated by the workload."
},
"performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
@ -249,7 +268,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"description":"The database is not being saturated by the workload."
},
"batch_transactions_per_second_limit":0,
"transactions_per_second_limit":0,
"batch_released_transactions_per_second":0,
"released_transactions_per_second":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,

View File

@ -51,24 +51,26 @@ struct DataDistributorInterface {
struct GetRateInfoRequest {
UID requesterID;
int64_t totalReleasedTransactions;
int64_t batchReleasedTransactions;
ReplyPromise<struct GetRateInfoReply> reply;
GetRateInfoRequest() {}
GetRateInfoRequest( UID const& requesterID, int64_t totalReleasedTransactions ) : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions) {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions) : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, reply);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, reply);
}
};
struct GetRateInfoReply {
double transactionRate;
double batchTransactionRate;
double leaseDuration;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionRate, leaseDuration);
serializer(ar, transactionRate, batchTransactionRate, leaseDuration);
}
};

View File

@ -346,11 +346,15 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
bool smallStorageTarget = randomize && BUGGIFY;
init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3;
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 500e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 50e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
bool smallTlogTarget = randomize && BUGGIFY;
init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3;
init( TARGET_BYTES_PER_TLOG_BATCH, 1000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG_BATCH = 1000e3;
init( SPRING_BYTES_TLOG_BATCH, 200e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG_BATCH = 100e3;
init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
init( TLOG_RECOVER_MEMORY_LIMIT, TARGET_BYTES_PER_TLOG + SPRING_BYTES_TLOG );
@ -361,6 +365,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MIN_FREE_SPACE_RATIO, 0.05 );
init( MAX_TL_SS_VERSION_DIFFERENCE, 1e99 ); // if( randomize && BUGGIFY ) MAX_TL_SS_VERSION_DIFFERENCE = std::max(1.0, 0.25 * VERSIONS_PER_SECOND); // spring starts at half this value //FIXME: this knob causes ratekeeper to clamp on idle cluster in simulation that have a large number of logs
init( MAX_TL_SS_VERSION_DIFFERENCE_BATCH, 1e99 );
init( MAX_MACHINES_FALLING_BEHIND, 1 );
//Storage Metrics

View File

@ -285,10 +285,14 @@ public:
double LAST_LIMITED_RATIO;
int64_t TARGET_BYTES_PER_STORAGE_SERVER;
double SPRING_BYTES_STORAGE_SERVER;
int64_t SPRING_BYTES_STORAGE_SERVER;
int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
int64_t TARGET_BYTES_PER_TLOG;
double SPRING_BYTES_TLOG;
int64_t SPRING_BYTES_TLOG;
int64_t TARGET_BYTES_PER_TLOG_BATCH;
int64_t SPRING_BYTES_TLOG_BATCH;
int64_t TLOG_SPILL_THRESHOLD;
int64_t TLOG_HARD_LIMIT_BYTES;
int64_t TLOG_RECOVER_MEMORY_LIMIT;
@ -299,6 +303,7 @@ public:
double MIN_FREE_SPACE_RATIO;
double MAX_TL_SS_VERSION_DIFFERENCE; // spring starts at half this value
double MAX_TL_SS_VERSION_DIFFERENCE_BATCH;
int MAX_MACHINES_FALLING_BEHIND;
//Storage Metrics

View File

@ -87,13 +87,16 @@ Future<Void> forwardValue(Promise<T> out, Future<T> in)
int getBytes(Promise<Version> const& r) { return 0; }
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate, double* outBatchTransactionRate) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
state int64_t lastTC = 0;
if (db->get().distributor.present()) nextRequestTimer = Void();
if (db->get().distributor.present()) {
nextRequestTimer = Void();
}
loop choose {
when ( wait( db->onChange() ) ) {
if ( db->get().distributor.present() ) {
@ -108,19 +111,21 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount)));
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount)));
}
when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never();
*outTransactionRate = rep.transactionRate;
// TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
*outBatchTransactionRate = rep.batchTransactionRate;
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
}
when ( wait(leaseTimeout ) ) {
when ( wait( leaseTimeout ) ) {
*outTransactionRate = 0;
// TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("Lease", "Expired");
*outBatchTransactionRate = 0;
//TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("BatchRate", 0).detail("Lease", "Expired");
leaseTimeout = Never();
}
}
@ -1078,6 +1083,27 @@ ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
}
}
// Per-priority transaction admission state used by the proxy's transaction
// starter. One instance exists per priority class (normal and batch); `rate`
// is the transactions-per-second target handed out by ratekeeper via getRate().
struct TransactionRateInfo {
double rate;
double budget;
double limit;
TransactionRateInfo(double rate) : rate(rate), budget(0), limit(0) {}
// Recompute the number of transactions allowed to start this interval:
// rate * elapsed, capped by START_TRANSACTION_MAX_TRANSACTIONS_TO_START,
// plus whatever budget (possibly negative) carried over from last interval.
void reset(double elapsed) {
this->limit = std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START) + budget;
}
// Whether numToStart more transactions may start, given numAlreadyStarted in
// this interval. The second disjunct probabilistically admits a request batch
// that straddles the limit, so multi-transaction requests are not starved.
bool canStart(int64_t numToStart, int64_t numAlreadyStarted) {
return numToStart + numAlreadyStarted < limit || numToStart * g_random->random01() + numAlreadyStarted < limit - std::max(0.0, budget);
}
// Carry unused (or overdrawn) capacity forward as next interval's budget,
// clamped to +/- START_TRANSACTION_MAX_BUDGET_SIZE.
void updateBudget(int64_t numStarted) {
budget = std::max(std::min<double>(limit - numStarted, SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE), -SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE);
}
};
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
GetReadVersionReply reply = wait(replyFuture);
double end = timer();
@ -1101,13 +1127,15 @@ ACTOR static Future<Void> transactionStarter(
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
state int64_t transactionCount = 0;
state double transactionBudget = 0;
state double transactionRate = 10;
state int64_t batchTransactionCount = 0;
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send( getRate(proxy.id(), db, &transactionCount, &transactionRate) );
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate));
addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
// Get a list of the other proxies that go together with us
@ -1130,7 +1158,9 @@ ACTOR static Future<Void> transactionStarter(
lastGRVTime = t;
if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminate multiplication with infinite transaction rate
double nTransactionsToStart = std::min(transactionRate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START) + transactionBudget;
normalRateInfo.reset(elapsed);
batchRateInfo.reset(elapsed);
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
@ -1141,13 +1171,17 @@ ACTOR static Future<Void> transactionStarter(
Optional<UID> debugID;
double leftToStart = 0;
double batchLeftToStart = 0;
while (!transactionQueue.empty()) {
auto& req = transactionQueue.top().first;
int tc = req.transactionCount;
leftToStart = nTransactionsToStart - transactionsStarted[0] - transactionsStarted[1];
bool startNext = tc < leftToStart || req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE || tc * g_random->random01() < leftToStart - std::max(0.0, transactionBudget);
if (!startNext) break;
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
break;
}
else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
break;
}
if (req.debugID.present()) {
if (!debugID.present()) debugID = g_nondeterministic_random->randomUniqueID();
@ -1178,10 +1212,15 @@ ACTOR static Future<Void> transactionStarter(
.detail("NumSystemTransactionsStarted", systemTransactionsStarted[0] + systemTransactionsStarted[1])
.detail("NumNonSystemTransactionsStarted", transactionsStarted[0] + transactionsStarted[1] - systemTransactionsStarted[0] - systemTransactionsStarted[1])
.detail("TransactionBudget", transactionBudget)
.detail("LastLeftToStart", leftToStart);*/
.detail("BatchTransactionBudget", batchTransactionBudget)
.detail("LastLeftToStart", leftToStart)
.detail("LastBatchLeftToStart", batchLeftToStart);*/
transactionCount += transactionsStarted[0] + transactionsStarted[1];
transactionBudget = std::max(std::min(nTransactionsToStart - transactionsStarted[0] - transactionsStarted[1], SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE), -SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE);
batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");

View File

@ -85,11 +85,10 @@ struct StorageQueueInfo {
Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
limitReason_t limitReason;
StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited)
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT)
{
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
@ -112,25 +111,61 @@ struct TLogQueueInfo {
}
};
// Tunable queue targets and resulting rate limit for one priority class.
// Ratekeeper holds two instances (normal and "Batch"); the batch instance uses
// smaller storage/log queue targets so batch-priority work is throttled at
// lower levels of cluster saturation than normal-priority work.
struct RatekeeperLimits {
	double tpsLimit;                 // transactions/sec cap computed by updateRate(); starts unlimited
	Int64MetricHandle tpsLimitMetric;
	Int64MetricHandle reasonMetric;
	int64_t storageTargetBytes;      // target storage-server queue size
	int64_t storageSpringBytes;      // soft "spring" headroom below the storage target
	int64_t logTargetBytes;          // target tlog queue size
	int64_t logSpringBytes;          // soft "spring" headroom below the log target
	// Must be double, not int64_t: it is initialized from the double knob
	// MAX_TL_SS_VERSION_DIFFERENCE(_BATCH), which defaults to 1e99 — a value
	// not representable as int64_t, so the narrowing conversion would be
	// undefined behavior. It is only ever used in floating-point arithmetic.
	double maxVersionDifference;
	std::string context;             // metric/trace-event name suffix ("" or "Batch")
	// NOTE(review): StringRef here views a temporary std::string; this relies on
	// Int64MetricHandle copying the name during construction — confirm it does.
	RatekeeperLimits(std::string context, int64_t storageTargetBytes, int64_t storageSpringBytes, int64_t logTargetBytes, int64_t logSpringBytes, double maxVersionDifference) :
	tpsLimit(std::numeric_limits<double>::infinity()),
	tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
	reasonMetric(StringRef("Ratekeeper.Reason" + context)),
	storageTargetBytes(storageTargetBytes),
	storageSpringBytes(storageSpringBytes),
	logTargetBytes(logTargetBytes),
	logSpringBytes(logSpringBytes),
	maxVersionDifference(maxVersionDifference),
	context(context)
	{}
};
// Per-proxy released-transaction counters tracked by ratekeeper. `total` is
// the cumulative count of all released transactions reported by the proxy,
// `batch` is the batch-priority subset, and `time` is when the proxy last
// reported (stale entries are pruned by the rateKeeper loop).
struct TransactionCounts {
int64_t total;
int64_t batch;
double time;
TransactionCounts() : total(0), batch(0), time(0) {}
};
struct Ratekeeper {
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, std::pair<int64_t, double> > proxy_transactionCountAndTime;
Smoother smoothReleasedTransactions, smoothTotalDurableBytes;
double TPSLimit;
std::map<UID, TransactionCounts> proxy_transactionCounts;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
DatabaseConfiguration configuration;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle actualTpsMetric;
Int64MetricHandle reasonMetric;
double lastWarning;
double* lastLimited;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), TPSLimit(std::numeric_limits<double>::infinity()),
tpsLimitMetric(LiteralStringRef("Ratekeeper.TPSLimit")),
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
reasonMetric(LiteralStringRef("Ratekeeper.Reason")),
lastWarning(0)
lastWarning(0),
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH)
{}
};
@ -248,15 +283,15 @@ ACTOR Future<Void> trackEachStorageServer(
}
}
void updateRate( Ratekeeper* self ) {
void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
//double controlFactor = ; // dt / eFoldingTime
double actualTPS = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTPS;
double actualTps = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTps;
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
actualTPS = std::max( std::max( 1.0, actualTPS ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
self->TPSLimit = std::numeric_limits<double>::infinity();
limits.tpsLimit = std::numeric_limits<double>::infinity();
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
@ -266,7 +301,8 @@ void updateRate( Ratekeeper* self ) {
int64_t worstStorageQueueStorageServer = 0;
int64_t limitingStorageQueueStorageServer = 0;
std::multimap<double, StorageQueueInfo*> storageTPSLimitReverseIndex;
std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
std::map<UID, limitReason_t> ssReasons;
// Look at each storage server's write queue, compute and store the desired rate ratio
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
@ -274,19 +310,19 @@ void updateRate( Ratekeeper* self ) {
if (!ss.valid) continue;
++sscount;
ss.limitReason = limitReason_t::unlimited;
limitReason_t ssLimitReason = limitReason_t::unlimited;
int64_t minFreeSpace = std::max(SERVER_KNOBS->MIN_FREE_SPACE, (int64_t)(SERVER_KNOBS->MIN_FREE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal()));
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.storageTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
ss.limitReason = limitReason_t::storage_server_min_free_space;
ssLimitReason = limitReason_t::storage_server_min_free_space;
} else {
ss.limitReason = limitReason_t::storage_server_min_free_space_ratio;
ssLimitReason = limitReason_t::storage_server_min_free_space_ratio;
}
}
@ -296,10 +332,11 @@ void updateRate( Ratekeeper* self ) {
double targetRateRatio = std::min(( b + springBytes ) / (double)springBytes, 2.0);
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
/*if( g_random->random01() < 0.1 ) {
TraceEvent("RateKeeperUpdateRate", ss.id)
std::string name = "RateKeeperUpdateRate" + limits.context;
TraceEvent(name, ss.id)
.detail("MinFreeSpace", minFreeSpace)
.detail("SpringBytes", springBytes)
.detail("TargetBytes", targetBytes)
@ -309,7 +346,7 @@ void updateRate( Ratekeeper* self ) {
.detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal())
.detail("TargetRateRatio", targetRateRatio)
.detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate())
.detail("ActualTPS", actualTPS)
.detail("ActualTPS", actualTps)
.detail("InputRate", inputRate)
.detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate())
.detail("B", b);
@ -317,33 +354,35 @@ void updateRate( Ratekeeper* self ) {
// Don't let any storage server use up its target bytes faster than its MVCC window!
double maxBytesPerSecond = (targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0);
double limitTPS = std::min(actualTPS * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ss.limitReason == limitReason_t::unlimited)
ss.limitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
double limitTps = std::min(actualTps * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ssLimitReason == limitReason_t::unlimited)
ssLimitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
if (targetRateRatio > 0 && inputRate > 0) {
ASSERT(inputRate != 0);
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double x = smoothedRate / (inputRate * targetRateRatio);
double lim = actualTPS * x;
if (lim < limitTPS) {
limitTPS = lim;
if (ss.limitReason == limitReason_t::unlimited || ss.limitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
ss.limitReason = limitReason_t::storage_server_write_queue_size;
double lim = actualTps * x;
if (lim < limitTps) {
limitTps = lim;
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
ssLimitReason = limitReason_t::storage_server_write_queue_size;
}
}
storageTPSLimitReverseIndex.insert(std::make_pair(limitTPS, &ss));
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
if(limitTPS < self->TPSLimit && (ss.limitReason == limitReason_t::storage_server_min_free_space || ss.limitReason == limitReason_t::storage_server_min_free_space_ratio)) {
if(limitTps < limits.tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
reasonID = ss.id;
self->TPSLimit = limitTPS;
limitReason = ss.limitReason;
limits.tpsLimit = limitTps;
limitReason = ssLimitReason;
}
ssReasons[ss.id] = ssLimitReason;
}
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
for(auto ss = storageTPSLimitReverseIndex.begin(); ss != storageTPSLimitReverseIndex.end() && ss->first < self->TPSLimit; ++ss) {
for(auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits.tpsLimit; ++ss) {
if(ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
ignoredMachines.insert(ss->second->locality.zoneId());
continue;
@ -353,9 +392,9 @@ void updateRate( Ratekeeper* self ) {
}
limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal();
self->TPSLimit = ss->first;
limitReason = storageTPSLimitReverseIndex.begin()->second->limitReason;
reasonID = storageTPSLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
limits.tpsLimit = ss->first;
limitReason = ssReasons[storageTpsLimitReverseIndex.begin()->second->id];
reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
break;
}
@ -387,7 +426,7 @@ void updateRate( Ratekeeper* self ) {
}
// writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed; 2 = half TL durable speed
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE/2) / (SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE/4);
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits.maxVersionDifference/2) / (limits.maxVersionDifference/4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer);
}
@ -406,9 +445,9 @@ void updateRate( Ratekeeper* self ) {
worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->SPRING_BYTES_TLOG, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->TARGET_BYTES_PER_TLOG, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != SERVER_KNOBS->TARGET_BYTES_PER_TLOG) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.logTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
tlogLimitReason = limitReason_t::log_server_min_free_space;
} else {
@ -427,7 +466,7 @@ void updateRate( Ratekeeper* self ) {
}
reasonID = tl.id;
limitReason = limitReason_t::log_server_min_free_space;
self->TPSLimit = 0.0;
limits.tpsLimit = 0.0;
}
double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 );
@ -440,13 +479,13 @@ void updateRate( Ratekeeper* self ) {
double inputRate = tl.smoothInputBytes.smoothRate();
if (targetRateRatio > 0) {
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double x = smoothedRate / (inputRate * targetRateRatio);
if (targetRateRatio < .75) //< FIXME: KNOB for 2.0
x = std::max(x, 0.95);
double lim = actualTPS * x;
if (lim < self->TPSLimit){
self->TPSLimit = lim;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
reasonID = tl.id;
limitReason = tlogLimitReason;
}
@ -454,19 +493,19 @@ void updateRate( Ratekeeper* self ) {
if (inputRate > 0) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate;
double lim = actualTPS * x;
if (lim < self->TPSLimit){
self->TPSLimit = lim;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
reasonID = tl.id;
limitReason = limitReason_t::log_server_mvcc_write_bandwidth;
}
}
}
self->TPSLimit = std::max(self->TPSLimit, 0.0);
limits.tpsLimit = std::max(limits.tpsLimit, 0.0);
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
self->TPSLimit = std::max(self->TPSLimit, 100.0);
limits.tpsLimit = std::max(limits.tpsLimit, 100.0);
}
int64_t totalDiskUsageBytes = 0;
@ -477,22 +516,20 @@ void updateRate( Ratekeeper* self ) {
if (s.value.valid)
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
self->tpsLimitMetric = std::min(self->TPSLimit, 1e6);
self->reasonMetric = limitReason;
limits.tpsLimitMetric = std::min(limits.tpsLimit, 1e6);
limits.reasonMetric = limitReason;
if( self->smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self->TPSLimit ) {
(*self->lastLimited) = now();
}
if (g_random->random01() < 0.1){
TraceEvent("RkUpdate")
.detail("TPSLimit", self->TPSLimit)
if (g_random->random01() < 0.1) {
std::string name = "RkUpdate" + limits.context;
TraceEvent(name.c_str())
.detail("TPSLimit", limits.tpsLimit)
.detail("Reason", limitReason)
.detail("ReasonServerID", reasonID)
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTPS)
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTps)
.detail("StorageServers", sscount)
.detail("Proxies", self->proxy_transactionCountAndTime.size())
.detail("Proxies", self->proxy_transactionCounts.size())
.detail("TLogs", tlcount)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
@ -502,7 +539,7 @@ void updateRate( Ratekeeper* self ) {
.detail("TotalDiskUsageBytes", totalDiskUsageBytes)
.detail("WorstStorageServerVersionLag", worstVersionLag)
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.trackLatest("RkUpdate");
.trackLatest(name.c_str());
}
}
@ -561,11 +598,18 @@ ACTOR Future<Void> rateKeeper(
choose {
when (wait( track )) { break; }
when (wait( timeout )) {
updateRate( &self );
updateRate(&self, self.normalLimits);
updateRate(&self, self.batchLimits);
if(self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit) {
*self.lastLimited = now();
}
double tooOld = now() - 1.0;
for(auto p=self.proxy_transactionCountAndTime.begin(); p!=self.proxy_transactionCountAndTime.end(); ) {
if (p->second.second < tooOld)
p = self.proxy_transactionCountAndTime.erase(p);
for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) {
if (p->second.time < tooOld)
p = self.proxy_transactionCounts.erase(p);
else
++p;
}
@ -574,15 +618,21 @@ ACTOR Future<Void> rateKeeper(
when (GetRateInfoRequest req = waitNext(getRateInfo)) {
GetRateInfoReply reply;
auto& p = self.proxy_transactionCountAndTime[ req.requesterID ];
auto& p = self.proxy_transactionCounts[ req.requesterID ];
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
if (p.first > 0)
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.first );
if (p.total > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.total );
}
if(p.batch > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batch );
}
p.first = req.totalReleasedTransactions;
p.second = now();
p.total = req.totalReleasedTransactions;
p.batch = req.batchReleasedTransactions;
p.time = now();
reply.transactionRate = self.TPSLimit / self.proxy_transactionCountAndTime.size();
reply.transactionRate = self.normalLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
req.reply.send( reply );
}

View File

@ -209,37 +209,6 @@ protected:
int64_t counter;
};
// Parses s as a double. Unless `permissive` is set, the entire string must be
// consumed by the conversion. Throws attribute_not_found() on failure.
static double parseDouble(std::string const& s, bool permissive = false) {
	double value = 0;
	int charsConsumed = 0;
	if (sscanf(s.c_str(), "%lf%n", &value, &charsConsumed) != 1)
		throw attribute_not_found();
	if (!permissive && charsConsumed != s.size())
		throw attribute_not_found();
	return value;
}
// Parses s as an int. Unless `permissive` is set, the entire string must be
// consumed by the conversion. Throws attribute_not_found() if no integer can
// be parsed, or attribute_too_large() if the value does not fit in an int.
static int parseInt(std::string const& s, bool permissive = false) {
	long long int wide = 0;
	int charsConsumed = 0;
	if (sscanf(s.c_str(), "%lld%n", &wide, &charsConsumed) != 1)
		throw attribute_not_found();
	if (!permissive && charsConsumed != s.size())
		throw attribute_not_found();
	if (wide < std::numeric_limits<int>::min() || wide > std::numeric_limits<int>::max())
		throw attribute_too_large();
	return (int)wide; // range-checked above, so the downcast is safe
}
// Parses s as a 64-bit signed integer. Unless `permissive` is set, the entire
// string must be consumed; throws attribute_not_found() on any failure.
static int64_t parseInt64(std::string const& s, bool permissive = false) {
	long long int parsed = 0;
	int charsUsed = 0;
	bool matched = sscanf(s.c_str(), "%lld%n", &parsed, &charsUsed) == 1;
	if (!matched || (!permissive && (size_t)charsUsed != s.size()))
		throw attribute_not_found();
	return parsed;
}
static JsonBuilderObject getLocalityInfo(const LocalityData& locality) {
JsonBuilderObject localityObj;
@ -345,8 +314,8 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
statusObj["memory"] = memoryObj;
JsonBuilderObject cpuObj;
double cpu_seconds = parseDouble(event.getValue("CPUSeconds"));
double elapsed = parseDouble(event.getValue("Elapsed"));
double cpu_seconds = event.getDouble("CPUSeconds");
double elapsed = event.getDouble("Elapsed");
if (elapsed > 0){
cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpu_seconds / elapsed, 1.0));
}
@ -356,7 +325,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
networkObj["megabits_sent"] = JsonBuilderObject().setKeyRawNumber("hz", event.getValue("MbpsSent"));
networkObj["megabits_received"] = JsonBuilderObject().setKeyRawNumber("hz", event.getValue("MbpsReceived"));
metric = parseDouble(event.getValue("RetransSegs"));
metric = event.getDouble("RetransSegs");
JsonBuilderObject retransSegsObj;
if (elapsed > 0){
retransSegsObj["hz"] = metric / elapsed;
@ -460,13 +429,13 @@ struct RolesInfo {
obj["mutation_bytes"] = StatusCounter(storageMetrics.getValue("MutationBytes")).getStatus();
obj["mutations"] = StatusCounter(storageMetrics.getValue("Mutations")).getStatus();
Version version = parseInt64(storageMetrics.getValue("Version"));
Version durableVersion = parseInt64(storageMetrics.getValue("DurableVersion"));
Version version = storageMetrics.getInt64("Version");
Version durableVersion = storageMetrics.getInt64("DurableVersion");
obj["data_version"] = version;
obj["durable_version"] = durableVersion;
int64_t versionLag = parseInt64(storageMetrics.getValue("VersionLag"));
int64_t versionLag = storageMetrics.getInt64("VersionLag");
if(maxTLogVersion > 0) {
// It's possible that the storage server hasn't talked to the logs recently, in which case it may not be aware of how far behind it is.
// To account for that, we also compute the version difference between each storage server and the tlog with the largest version.
@ -522,7 +491,7 @@ struct RolesInfo {
obj.setKeyRawNumber("queue_disk_total_bytes", tlogMetrics.getValue("QueueDiskBytesTotal"));
obj["input_bytes"] = StatusCounter(tlogMetrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(tlogMetrics.getValue("BytesDurable")).getStatus();
metricVersion = parseInt64(tlogMetrics.getValue("Version"));
metricVersion = tlogMetrics.getInt64("Version");
obj["data_version"] = metricVersion;
} catch (Error& e) {
if(e.code() != error_code_attribute_not_found)
@ -622,7 +591,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
if(memInfo->second.valid()) {
if(processMetrics.size() > 0) {
memInfo->second.memoryUsage += parseDouble(processMetrics.getValue("Memory"));
memInfo->second.memoryUsage += processMetrics.getDouble("Memory");
++memInfo->second.numProcesses;
}
else
@ -697,11 +666,11 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj.setKeyRawNumber("uptime_seconds",event.getValue("UptimeSeconds"));
// rates are calculated over the last elapsed seconds
double elapsed = parseDouble(event.getValue("Elapsed"));;
double cpu_seconds = parseDouble(event.getValue("CPUSeconds"));
double diskIdleSeconds = parseDouble(event.getValue("DiskIdleSeconds"));
double diskReads = parseDouble(event.getValue("DiskReads"));
double diskWrites = parseDouble(event.getValue("DiskWrites"));
double elapsed = event.getDouble("Elapsed");
double cpu_seconds = event.getDouble("CPUSeconds");
double diskIdleSeconds = event.getDouble("DiskIdleSeconds");
double diskReads = event.getDouble("DiskReads");
double diskWrites = event.getDouble("DiskWrites");
JsonBuilderObject diskObj;
if (elapsed > 0){
@ -779,7 +748,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
// if this process address is in the machine metrics
if (mMetrics.count(address) && mMetrics[address].size()){
double availableMemory;
availableMemory = parseDouble(mMetrics[address].getValue("AvailableMemory"));
availableMemory = mMetrics[address].getDouble("AvailableMemory");
auto machineMemInfo = machineMemoryUsage[workerItr->first.locality.machineId()];
if (machineMemInfo.valid()) {
@ -883,7 +852,7 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(std::pair<Work
try {
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( md.getValue("StatusCode") );
state int mStatusCode = md.getInt("StatusCode");
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -1162,9 +1131,9 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
// If we have a MovingData message, parse it.
if (md.size())
{
int64_t partitionsInQueue = parseInt64(md.getValue("InQueue"));
int64_t partitionsInFlight = parseInt64(md.getValue("InFlight"));
int64_t averagePartitionSize = parseInt64(md.getValue("AverageShardSize"));
int64_t partitionsInQueue = md.getInt64("InQueue");
int64_t partitionsInFlight = md.getInt64("InFlight");
int64_t averagePartitionSize = md.getInt64("AverageShardSize");
if( averagePartitionSize >= 0 ) {
JsonBuilderObject moving_data;
@ -1192,8 +1161,8 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
continue;
}
bool primary = parseInt(inFlight.getValue("Primary"));
int highestPriority = parseInt(inFlight.getValue("HighestPriority"));
bool primary = inFlight.getInt("Primary");
int highestPriority = inFlight.getInt("HighestPriority");
JsonBuilderObject team_tracker;
team_tracker["primary"] = primary;
@ -1388,6 +1357,30 @@ static int getExtraTLogEligibleMachines(const vector<std::pair<WorkerInterface,
return extraTlogEligibleMachines;
}
// Builds the qos "performance_limited_by" message from a ratekeeper trace event.
// When released TPS is below 80% of the limit, the workload itself is reported as
// the limiter. Returns an empty object when the cluster is saturated but the
// limiting reason code is outside the known enum range (caller omits the field).
JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transPerSec, double tpsLimit) {
	int reasonCode = ratekeeper.getInt("Reason");
	JsonBuilderObject result;

	bool saturated = transPerSec > tpsLimit * 0.8;
	if (!saturated) {
		result = JsonString::makeMessage("workload", "The database is not being saturated by the workload.");
	}
	else if (reasonCode >= 0 && reasonCode < limitReasonEnd) {
		// Saturated with a known reason; report it, including the offending server when one was identified.
		result = JsonString::makeMessage(limitReasonName[reasonCode], limitReasonDesc[reasonCode]);
		std::string reasonServerId = ratekeeper.getValue("ReasonServerID");
		if (!reasonServerId.empty()) {
			result["reason_server_id"] = reasonServerId;
		}
	}

	if (!result.empty()) {
		result["reason_id"] = reasonCode;
	}

	return result;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> ddWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
@ -1440,50 +1433,46 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Transactions
try {
TraceEventFields md = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(md.getValue("TPSLimit"));
double transPerSec = parseDouble(md.getValue("ReleasedTPS"));
int ssCount = parseInt(md.getValue("StorageServers"));
int tlogCount = parseInt(md.getValue("TLogs"));
int64_t worstFreeSpaceStorageServer = parseInt64(md.getValue("WorstFreeSpaceStorageServer"));
int64_t worstFreeSpaceTLog = parseInt64(md.getValue("WorstFreeSpaceTLog"));
(*data_overlay).setKeyRawNumber("total_disk_used_bytes",md.getValue("TotalDiskUsageBytes"));
state TraceEventFields ratekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
double tpsLimit = ratekeeper.getDouble("TPSLimit");
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
int ssCount = ratekeeper.getInt("StorageServers");
int tlogCount = ratekeeper.getInt("TLogs");
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
int64_t worstFreeSpaceTLog = ratekeeper.getInt64("WorstFreeSpaceTLog");
(*data_overlay).setKeyRawNumber("total_disk_used_bytes",ratekeeper.getValue("TotalDiskUsageBytes"));
if(ssCount > 0) {
(*data_overlay)["least_operating_space_bytes_storage_server"] = std::max(worstFreeSpaceStorageServer, (int64_t)0);
(*qos).setKeyRawNumber("worst_queue_bytes_storage_server",md.getValue("WorstStorageServerQueue"));
(*qos).setKeyRawNumber("limiting_queue_bytes_storage_server",md.getValue("LimitingStorageServerQueue"));
(*qos).setKeyRawNumber("worst_version_lag_storage_server",md.getValue("WorstStorageServerVersionLag"));
(*qos).setKeyRawNumber("limiting_version_lag_storage_server",md.getValue("LimitingStorageServerVersionLag"));
(*qos).setKeyRawNumber("worst_queue_bytes_storage_server", ratekeeper.getValue("WorstStorageServerQueue"));
(*qos).setKeyRawNumber("limiting_queue_bytes_storage_server", ratekeeper.getValue("LimitingStorageServerQueue"));
(*qos).setKeyRawNumber("worst_version_lag_storage_server", ratekeeper.getValue("WorstStorageServerVersionLag"));
(*qos).setKeyRawNumber("limiting_version_lag_storage_server", ratekeeper.getValue("LimitingStorageServerVersionLag"));
}
if(tlogCount > 0) {
(*data_overlay)["least_operating_space_bytes_log_server"] = std::max(worstFreeSpaceTLog, (int64_t)0);
(*qos).setKeyRawNumber("worst_queue_bytes_log_server",md.getValue("WorstTLogQueue"));
(*qos).setKeyRawNumber("worst_queue_bytes_log_server", ratekeeper.getValue("WorstTLogQueue"));
}
(*qos)["transactions_per_second_limit"] = tpsLimit;
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
(*qos)["released_transactions_per_second"] = transPerSec;
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
int reason = parseInt(md.getValue("Reason"));
JsonBuilderObject perfLimit;
if (transPerSec > tpsLimit * 0.8) {
// If reason is known, set qos.performance_limited_by, otherwise omit
if (reason >= 0 && reason < limitReasonEnd) {
perfLimit = JsonString::makeMessage(limitReasonName[reason], limitReasonDesc[reason]);
std::string reason_server_id = md.getValue("ReasonServerID");
if (!reason_server_id.empty())
perfLimit["reason_server_id"] = reason_server_id;
}
}
else {
perfLimit = JsonString::makeMessage("workload", "The database is not being saturated by the workload.");
}
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
if(!perfLimit.empty()) {
perfLimit["reason_id"] = reason;
(*qos)["performance_limited_by"] = perfLimit;
}
JsonBuilderObject batchPerfLimit = getPerfLimit(batchRatekeeper, transPerSec, batchTpsLimit);
if(!batchPerfLimit.empty()) {
(*qos)["batch_performance_limited_by"] = batchPerfLimit;
}
} catch (Error &e){
if (e.code() == error_code_actor_cancelled)
throw;

View File

@ -587,7 +587,7 @@ public:
}
double getPenalty() {
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
}
};

View File

@ -150,8 +150,10 @@ int getOption( VectorRef<KeyValueRef> options, Key key, int defaultValue) {
if( sscanf(options[i].value.toString().c_str(), "%d", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;
@ -164,8 +166,10 @@ uint64_t getOption( VectorRef<KeyValueRef> options, Key key, uint64_t defaultVal
if( sscanf(options[i].value.toString().c_str(), "%lld", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;
@ -178,8 +182,10 @@ int64_t getOption( VectorRef<KeyValueRef> options, Key key, int64_t defaultValue
if( sscanf(options[i].value.toString().c_str(), "%lld", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;

View File

@ -96,6 +96,9 @@ struct ReadWriteWorkload : KVWorkload {
bool useRYW;
bool rampTransactionType;
bool rampUpConcurrency;
bool batchPriority;
Standalone<StringRef> descriptionString;
Int64MetricHandle totalReadsMetric;
Int64MetricHandle totalRetriesMetric;
@ -174,6 +177,8 @@ struct ReadWriteWorkload : KVWorkload {
rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
doSetup = getOption(options, LiteralStringRef("setup"), true);
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
if (rampUpConcurrency) ASSERT( rampSweepCount == 2 ); // Implementation is hard coded to ramp up and down
@ -213,7 +218,7 @@ struct ReadWriteWorkload : KVWorkload {
}
}
virtual std::string description() { return "ReadWrite"; }
virtual std::string description() { return descriptionString.toString(); }
virtual Future<Void> setup( Database const& cx ) { return _setup( cx, this ); }
virtual Future<Void> start( Database const& cx ) { return _start( cx, this ); }
@ -304,6 +309,13 @@ struct ReadWriteWorkload : KVWorkload {
return KeyValueRef( keyForIndex( n, false ), randomValue() );
}
// Applies workload-wide options to a freshly created transaction before use.
// Currently this only lowers the transaction to batch priority when the
// workload's "batchPriority" option is set; normal-priority runs are untouched.
// Templated so it works for both Transaction and ReadYourWritesTransaction.
template <class Trans>
void setupTransaction(Trans *tr) {
	if(batchPriority) {
		tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
	}
}
ACTOR static Future<Void> tracePeriodically( ReadWriteWorkload *self ) {
state double start = now();
state double elapsed = 0.0;
@ -313,10 +325,10 @@ struct ReadWriteWorkload : KVWorkload {
elapsed += self->periodicLoggingInterval;
wait( delayUntil(start + elapsed) );
TraceEvent("RW_RowReadLatency").detail("Mean", self->readLatencies.mean()).detail("Median", self->readLatencies.median()).detail("Percentile5", self->readLatencies.percentile(.05)).detail("Percentile95", self->readLatencies.percentile(.95)).detail("Count", self->readLatencyCount).detail("Elapsed", elapsed);
TraceEvent("RW_GRVLatency").detail("Mean", self->GRVLatencies.mean()).detail("Median", self->GRVLatencies.median()).detail("Percentile5", self->GRVLatencies.percentile(.05)).detail("Percentile95", self->GRVLatencies.percentile(.95));
TraceEvent("RW_CommitLatency").detail("Mean", self->commitLatencies.mean()).detail("Median", self->commitLatencies.median()).detail("Percentile5", self->commitLatencies.percentile(.05)).detail("Percentile95", self->commitLatencies.percentile(.95));
TraceEvent("RW_TotalLatency").detail("Mean", self->latencies.mean()).detail("Median", self->latencies.median()).detail("Percentile5", self->latencies.percentile(.05)).detail("Percentile95", self->latencies.percentile(.95));
TraceEvent((self->description() + "_RowReadLatency").c_str()).detail("Mean", self->readLatencies.mean()).detail("Median", self->readLatencies.median()).detail("Percentile5", self->readLatencies.percentile(.05)).detail("Percentile95", self->readLatencies.percentile(.95)).detail("Count", self->readLatencyCount).detail("Elapsed", elapsed);
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->GRVLatencies.mean()).detail("Median", self->GRVLatencies.median()).detail("Percentile5", self->GRVLatencies.percentile(.05)).detail("Percentile95", self->GRVLatencies.percentile(.95));
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->commitLatencies.mean()).detail("Median", self->commitLatencies.median()).detail("Percentile5", self->commitLatencies.percentile(.05)).detail("Percentile95", self->commitLatencies.percentile(.95));
TraceEvent((self->description() + "_TotalLatency").c_str()).detail("Mean", self->latencies.mean()).detail("Median", self->latencies.median()).detail("Percentile5", self->latencies.percentile(.05)).detail("Percentile95", self->latencies.percentile(.95));
int64_t ops = (self->aTransactions.getValue() * (self->readsPerTransactionA+self->writesPerTransactionA)) +
(self->bTransactions.getValue() * (self->readsPerTransactionB+self->writesPerTransactionB));
@ -456,7 +468,9 @@ struct ReadWriteWorkload : KVWorkload {
state double startTime = now();
loop {
state Transaction tr(cx);
try {
self->setupTransaction(&tr);
wait( self->readOp( &tr, keys, self, false ) );
wait( tr.warmRange( cx, allKeys ) );
break;
@ -564,6 +578,7 @@ struct ReadWriteWorkload : KVWorkload {
extra_ranges.push_back(singleKeyRange( g_random->randomUniqueID().toString() ));
state Trans tr(cx);
if(tstart - self->clientBegin > self->debugTime && tstart - self->clientBegin <= self->debugTime + self->debugInterval) {
debugID = g_random->randomUniqueID();
tr.debugTransaction(debugID);
@ -578,6 +593,8 @@ struct ReadWriteWorkload : KVWorkload {
loop{
try {
self->setupTransaction(&tr);
GRVStartTime = now();
self->transactionFailureMetric->startLatency = -1;

View File

@ -357,7 +357,7 @@ struct WriteDuringReadWorkload : TestWorkload {
}
}
ACTOR Future<Void> commitAndUpdateMemory( ReadYourWritesTransaction *tr, WriteDuringReadWorkload* self, bool *cancelled, bool readYourWritesDisabled, bool snapshotRYWDisabled, bool readAheadDisabled, bool* doingCommit, double* startTime, Key timebombStr ) {
ACTOR Future<Void> commitAndUpdateMemory( ReadYourWritesTransaction *tr, WriteDuringReadWorkload* self, bool *cancelled, bool readYourWritesDisabled, bool snapshotRYWDisabled, bool readAheadDisabled, bool useBatchPriority, bool* doingCommit, double* startTime, Key timebombStr ) {
state UID randomID = g_nondeterministic_random->randomUniqueID();
//TraceEvent("WDRCommit", randomID);
try {
@ -407,6 +407,8 @@ struct WriteDuringReadWorkload : TestWorkload {
tr->setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE);
if(readAheadDisabled)
tr->setOption(FDBTransactionOptions::READ_AHEAD_DISABLE);
if(useBatchPriority)
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
if(self->useSystemKeys)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->addWriteConflictRange( self->conflictRange );
@ -574,6 +576,7 @@ struct WriteDuringReadWorkload : TestWorkload {
state bool readYourWritesDisabled = g_random->random01() < 0.5;
state bool readAheadDisabled = g_random->random01() < 0.5;
state bool snapshotRYWDisabled = g_random->random01() < 0.5;
state bool useBatchPriority = g_random->random01() < 0.5;
state int64_t timebomb = g_random->random01() < 0.01 ? g_random->randomInt64(1, 6000) : 0;
state std::vector<Future<Void>> operations;
state ActorCollection commits(false);
@ -614,6 +617,8 @@ struct WriteDuringReadWorkload : TestWorkload {
tr.setOption( FDBTransactionOptions::SNAPSHOT_RYW_DISABLE );
if( readAheadDisabled )
tr.setOption( FDBTransactionOptions::READ_AHEAD_DISABLE );
if( useBatchPriority )
tr.setOption( FDBTransactionOptions::PRIORITY_BATCH );
if( self->useSystemKeys )
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::TIMEOUT, timebombStr );
@ -647,7 +652,7 @@ struct WriteDuringReadWorkload : TestWorkload {
g_random->random01() > 0.5, readYourWritesDisabled, snapshotRYWDisabled, self, &doingCommit, &memLimit ) );
} else if( operationType == 3 && !disableCommit ) {
if( !self->rarelyCommit || g_random->random01() < 1.0 / self->numOps ) {
Future<Void> commit = self->commitAndUpdateMemory( &tr, self, &cancelled, readYourWritesDisabled, snapshotRYWDisabled, readAheadDisabled, &doingCommit, &startTime, timebombStr );
Future<Void> commit = self->commitAndUpdateMemory( &tr, self, &cancelled, readYourWritesDisabled, snapshotRYWDisabled, readAheadDisabled, useBatchPriority, &doingCommit, &startTime, timebombStr );
operations.push_back( commit );
commits.add( commit );
}

View File

@ -55,7 +55,7 @@ using namespace boost::asio::ip;
//
// xyzdev
// vvvv
const uint64_t currentProtocolVersion = 0x0FDB00B061030001LL;
const uint64_t currentProtocolVersion = 0x0FDB00B061040001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;

View File

@ -1120,10 +1120,95 @@ std::string TraceEventFields::getValue(std::string key) const {
return value;
}
else {
TraceEvent ev(SevWarn, "TraceEventFieldNotFound");
if(tryGetValue("Type", value)) {
ev.detail("Event", value);
}
ev.detail("FieldName", key);
throw attribute_not_found();
}
}
namespace {
// Parses s as a double into outValue. Unless `permissive`, the whole string must
// be numeric; throws attribute_not_found() on failure (outValue is untouched).
void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) {
	double parsed = 0;
	int charsUsed = 0;
	bool matched = sscanf(s.c_str(), "%lf%n", &parsed, &charsUsed) == 1;
	if (!matched || (!permissive && (size_t)charsUsed != s.size())) {
		throw attribute_not_found();
	}
	outValue = parsed;
}
// Parses s as an int into outValue. Throws attribute_too_large() when the value
// is outside int range, and attribute_not_found() when the text is not numeric
// (or, unless `permissive`, has trailing characters). outValue is only written on success.
void parseNumericValue(std::string const& s, int &outValue, bool permissive = false) {
	long long int parsed = 0;
	int charsUsed = 0;
	bool matched = sscanf(s.c_str(), "%lld%n", &parsed, &charsUsed) == 1;
	if (!matched || (!permissive && (size_t)charsUsed != s.size())) {
		throw attribute_not_found();
	}
	if (parsed < std::numeric_limits<int>::min() || parsed > std::numeric_limits<int>::max()) {
		throw attribute_too_large();
	}
	outValue = (int)parsed; // Range checked above; downcast is safe
}
// Parses s as a 64-bit signed integer into outValue. Unless `permissive`, the
// whole string must be consumed; throws attribute_not_found() on failure.
void parseNumericValue(std::string const& s, int64_t &outValue, bool permissive = false) {
	long long int parsed = 0;
	int charsUsed = 0;
	bool matched = sscanf(s.c_str(), "%lld%n", &parsed, &charsUsed) == 1;
	if (!matched || (!permissive && (size_t)charsUsed != s.size())) {
		throw attribute_not_found();
	}
	outValue = parsed;
}
// Looks up `key` in `fields` and parses it as T via the parseNumericValue
// overloads above (T is int, int64_t, or double). On parse failure, logs a
// SevWarn "ErrorParsingNumericTraceEventField" event carrying the field name,
// the raw value, and (when available) the originating event type, then
// rethrows the parse error so callers see the same exception they would have
// gotten from parseNumericValue directly.
template<class T>
T getNumericValue(TraceEventFields const& fields, std::string key, bool permissive) {
	std::string field = fields.getValue(key);
	try {
		T value;
		parseNumericValue(field, value, permissive);
		return value;
	}
	catch(Error &e) {
		std::string type;
		// Attach as much context as we can to the warning before rethrowing.
		TraceEvent ev(SevWarn, "ErrorParsingNumericTraceEventField");
		ev.error(e);
		if(fields.tryGetValue("Type", type)) {
			ev.detail("Event", type);
		}
		ev.detail("FieldName", key);
		ev.detail("FieldValue", field);
		throw;
	}
}
} // namespace
// Returns the named field parsed as an int. Throws attribute_not_found if the
// field is missing or not numeric, attribute_too_large if it overflows int.
int TraceEventFields::getInt(std::string key, bool permissive) const {
	return getNumericValue<int>(*this, key, permissive);
}
// Returns the named field parsed as an int64_t. Throws attribute_not_found if
// the field is missing or not numeric.
int64_t TraceEventFields::getInt64(std::string key, bool permissive) const {
	return getNumericValue<int64_t>(*this, key, permissive);
}
// Returns the named field parsed as a double. Throws attribute_not_found if
// the field is missing or not numeric.
double TraceEventFields::getDouble(std::string key, bool permissive) const {
	return getNumericValue<double>(*this, key, permissive);
}
std::string TraceEventFields::toString() const {
std::string str;
bool first = true;

View File

@ -71,6 +71,9 @@ public:
const Field &operator[] (int index) const;
bool tryGetValue(std::string key, std::string &outValue) const;
std::string getValue(std::string key) const;
int getInt(std::string key, bool permissive=false) const;
int64_t getInt64(std::string key, bool permissive=false) const;
double getDouble(std::string key, bool permissive=false) const;
std::string toString() const;
void validateFormat() const;