Merge pull request #3152 from ajbeamon/tag-throttling-status-improvements
Add and fix tag throttling status fields
This commit is contained in:
commit
3ee4912312
|
@ -121,6 +121,13 @@
|
||||||
},
|
},
|
||||||
"commit_latency_bands":{
|
"commit_latency_bands":{
|
||||||
"$map_key=upperBoundOfBand": 1
|
"$map_key=upperBoundOfBand": 1
|
||||||
|
},
|
||||||
|
"busiest_read_tag":{
|
||||||
|
"tag": "",
|
||||||
|
"fractional_cost": 0.0,
|
||||||
|
"estimated_cost":{
|
||||||
|
"hz":0.0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
@ -267,8 +274,14 @@
|
||||||
"transactions_per_second_limit":0,
|
"transactions_per_second_limit":0,
|
||||||
"batch_released_transactions_per_second":0,
|
"batch_released_transactions_per_second":0,
|
||||||
"released_transactions_per_second":0,
|
"released_transactions_per_second":0,
|
||||||
"batch_tags_throttled":0,
|
"throttled_tags":{
|
||||||
"tags_throttled":0,
|
"auto":{
|
||||||
|
"count":0
|
||||||
|
},
|
||||||
|
"manual":{
|
||||||
|
"count":0
|
||||||
|
}
|
||||||
|
},
|
||||||
"limiting_queue_bytes_storage_server":0,
|
"limiting_queue_bytes_storage_server":0,
|
||||||
"worst_queue_bytes_storage_server":0,
|
"worst_queue_bytes_storage_server":0,
|
||||||
"limiting_version_lag_storage_server":0,
|
"limiting_version_lag_storage_server":0,
|
||||||
|
|
|
@ -683,6 +683,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
throttleExpirer = recurring([this](){ expireThrottles(); }, CLIENT_KNOBS->TAG_THROTTLE_EXPIRATION_INTERVAL);
|
throttleExpirer = recurring([this](){ expireThrottles(); }, CLIENT_KNOBS->TAG_THROTTLE_EXPIRATION_INTERVAL);
|
||||||
|
|
||||||
|
if(BUGGIFY) {
|
||||||
|
DatabaseContext::debugUseTags = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
|
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
|
||||||
|
@ -2248,7 +2252,7 @@ Future<Standalone<RangeResultRef>> getRange( Database const& cx, Future<Version>
|
||||||
return getRange(cx, Reference<TransactionLogInfo>(), fVersion, begin, end, limits, Promise<std::pair<Key, Key>>(), true, reverse, info, tags);
|
return getRange(cx, Reference<TransactionLogInfo>(), fVersion, begin, end, limits, Promise<std::pair<Key, Key>>(), true, reverse, info, tags);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DatabaseContext::debugUseTags = true;
|
bool DatabaseContext::debugUseTags = false;
|
||||||
const std::vector<std::string> DatabaseContext::debugTransactionTagChoices = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t" };
|
const std::vector<std::string> DatabaseContext::debugTransactionTagChoices = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t" };
|
||||||
|
|
||||||
void debugAddTags(Transaction *tr) {
|
void debugAddTags(Transaction *tr) {
|
||||||
|
|
|
@ -144,6 +144,13 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
||||||
},
|
},
|
||||||
"commit_latency_bands":{
|
"commit_latency_bands":{
|
||||||
"$map": 1
|
"$map": 1
|
||||||
|
},
|
||||||
|
"busiest_read_tag":{
|
||||||
|
"tag": "",
|
||||||
|
"fractional_cost": 0.0,
|
||||||
|
"estimated_cost":{
|
||||||
|
"hz": 0.0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
@ -295,8 +302,14 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
||||||
"transactions_per_second_limit":0,
|
"transactions_per_second_limit":0,
|
||||||
"batch_released_transactions_per_second":0,
|
"batch_released_transactions_per_second":0,
|
||||||
"released_transactions_per_second":0,
|
"released_transactions_per_second":0,
|
||||||
"batch_tags_throttled":0,
|
"throttled_tags":{
|
||||||
"tags_throttled":0,
|
"auto":{
|
||||||
|
"count":0
|
||||||
|
},
|
||||||
|
"manual":{
|
||||||
|
"count":0
|
||||||
|
}
|
||||||
|
},
|
||||||
"limiting_queue_bytes_storage_server":0,
|
"limiting_queue_bytes_storage_server":0,
|
||||||
"worst_queue_bytes_storage_server":0,
|
"worst_queue_bytes_storage_server":0,
|
||||||
"limiting_version_lag_storage_server":0,
|
"limiting_version_lag_storage_server":0,
|
||||||
|
|
|
@ -439,7 +439,12 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t manualThrottleCount() const {
|
int64_t manualThrottleCount() const {
|
||||||
return autoThrottledTags.size();
|
int64_t count = 0;
|
||||||
|
for(auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) {
|
||||||
|
count += itr->second.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
|
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
|
||||||
|
|
|
@ -467,6 +467,29 @@ struct RolesInfo {
|
||||||
obj["data_lag"] = getLagObject(versionLag);
|
obj["data_lag"] = getLagObject(versionLag);
|
||||||
obj["durability_lag"] = getLagObject(version - durableVersion);
|
obj["durability_lag"] = getLagObject(version - durableVersion);
|
||||||
|
|
||||||
|
TraceEventFields const& busiestReadTag = metrics.at("BusiestReadTag");
|
||||||
|
if(busiestReadTag.size()) {
|
||||||
|
int64_t tagCost = busiestReadTag.getInt64("TagCost");
|
||||||
|
|
||||||
|
if(tagCost > 0) {
|
||||||
|
JsonBuilderObject busiestReadTagObj;
|
||||||
|
|
||||||
|
int64_t totalSampledCost = busiestReadTag.getInt64("TotalSampledCost");
|
||||||
|
ASSERT(totalSampledCost > 0);
|
||||||
|
|
||||||
|
busiestReadTagObj["tag"] = busiestReadTag.getValue("Tag");
|
||||||
|
busiestReadTagObj["fractional_cost"] = (double)tagCost / totalSampledCost;
|
||||||
|
|
||||||
|
double elapsed = busiestReadTag.getDouble("Elapsed");
|
||||||
|
if(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
|
||||||
|
JsonBuilderObject estimatedCostObj;
|
||||||
|
estimatedCostObj["hz"] = tagCost / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
|
||||||
|
busiestReadTagObj["estimated_cost"] = estimatedCostObj;
|
||||||
|
}
|
||||||
|
|
||||||
|
obj["busiest_read_tag"] = busiestReadTagObj;
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
if(e.code() != error_code_attribute_not_found)
|
if(e.code() != error_code_attribute_not_found)
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -1526,7 +1549,7 @@ static Future<vector<std::pair<iface, EventMap>>> getServerMetrics(vector<iface>
|
||||||
ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
|
ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
|
||||||
vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
|
vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
|
||||||
vector<std::pair<StorageServerInterface, EventMap>> results = wait(
|
vector<std::pair<StorageServerInterface, EventMap>> results = wait(
|
||||||
getServerMetrics(servers, address_workers, std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics" }));
|
getServerMetrics(servers, address_workers, std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics", "BusiestReadTag" }));
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
@ -1689,8 +1712,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
||||||
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
|
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
|
||||||
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
|
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
|
||||||
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
|
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
|
||||||
int throttledTags = ratekeeper.getInt("TagsThrottled");
|
int autoThrottledTags = ratekeeper.getInt("TagsAutoThrottled");
|
||||||
int batchThrottledTags = batchRatekeeper.getInt("TagsThrottled");
|
int manualThrottledTags = ratekeeper.getInt("TagsManuallyThrottled");
|
||||||
int ssCount = ratekeeper.getInt("StorageServers");
|
int ssCount = ratekeeper.getInt("StorageServers");
|
||||||
int tlogCount = ratekeeper.getInt("TLogs");
|
int tlogCount = ratekeeper.getInt("TLogs");
|
||||||
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
|
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
|
||||||
|
@ -1721,8 +1744,17 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
||||||
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
|
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
|
||||||
(*qos)["released_transactions_per_second"] = transPerSec;
|
(*qos)["released_transactions_per_second"] = transPerSec;
|
||||||
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
|
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
|
||||||
(*qos)["tags_throttled"] = throttledTags;
|
|
||||||
(*qos)["batch_tags_throttled"] = batchThrottledTags;
|
JsonBuilderObject throttledTagsObj;
|
||||||
|
JsonBuilderObject autoThrottledTagsObj;
|
||||||
|
autoThrottledTagsObj["count"] = autoThrottledTags;
|
||||||
|
throttledTagsObj["auto"] = autoThrottledTagsObj;
|
||||||
|
|
||||||
|
JsonBuilderObject manualThrottledTagsObj;
|
||||||
|
manualThrottledTagsObj["count"] = manualThrottledTags;
|
||||||
|
throttledTagsObj["manual"] = manualThrottledTagsObj;
|
||||||
|
|
||||||
|
(*qos)["throttled_tags"] = throttledTagsObj;
|
||||||
|
|
||||||
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
|
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
|
||||||
if(!perfLimit.empty()) {
|
if(!perfLimit.empty()) {
|
||||||
|
|
|
@ -453,12 +453,11 @@ public:
|
||||||
struct TransactionTagCounter {
|
struct TransactionTagCounter {
|
||||||
struct TagInfo {
|
struct TagInfo {
|
||||||
TransactionTag tag;
|
TransactionTag tag;
|
||||||
int64_t count;
|
double rate;
|
||||||
double fractionalBusyness;
|
double fractionalBusyness;
|
||||||
double elapsed;
|
|
||||||
|
|
||||||
TagInfo(TransactionTag const& tag, int64_t count, int64_t totalCount, double elapsed)
|
TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness)
|
||||||
: tag(tag), count(count), fractionalBusyness((double)count/totalCount), elapsed(elapsed) {}
|
: tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
TransactionTagMap<int64_t> intervalCounts;
|
TransactionTagMap<int64_t> intervalCounts;
|
||||||
|
@ -492,20 +491,21 @@ public:
|
||||||
|
|
||||||
void startNewInterval(UID id) {
|
void startNewInterval(UID id) {
|
||||||
double elapsed = now() - intervalStart;
|
double elapsed = now() - intervalStart;
|
||||||
if(intervalStart > 0 && busiestTagCount >= SERVER_KNOBS->MIN_TAG_PAGES_READ_RATE * elapsed) {
|
previousBusiestTag.reset();
|
||||||
previousBusiestTag = TagInfo(busiestTag, busiestTagCount, intervalTotalSampledCount, elapsed);
|
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
|
||||||
}
|
double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
|
||||||
else {
|
if(rate > SERVER_KNOBS->MIN_TAG_PAGES_READ_RATE) {
|
||||||
previousBusiestTag.reset();
|
previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: report in status
|
TraceEvent("BusiestReadTag", id)
|
||||||
TraceEvent("BusiestReadTag", id)
|
.detail("Elapsed", elapsed)
|
||||||
.detail("Elapsed", elapsed)
|
.detail("Tag", printable(busiestTag))
|
||||||
.detail("Tag", printable(busiestTag))
|
.detail("TagCost", busiestTagCount)
|
||||||
.detail("TagCount", busiestTagCount)
|
.detail("TotalSampledCost", intervalTotalSampledCount)
|
||||||
.detail("TotalSampledCount", intervalTotalSampledCount)
|
.detail("Reported", previousBusiestTag.present())
|
||||||
.detail("Reported", previousBusiestTag.present());
|
.trackLatest(id.toString() + "/BusiestReadTag");
|
||||||
|
}
|
||||||
|
|
||||||
intervalCounts.clear();
|
intervalCounts.clear();
|
||||||
intervalTotalSampledCount = 0;
|
intervalTotalSampledCount = 0;
|
||||||
|
@ -1655,7 +1655,7 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const&
|
||||||
Optional<StorageServer::TransactionTagCounter::TagInfo> busiestTag = self->transactionTagCounter.getBusiestTag();
|
Optional<StorageServer::TransactionTagCounter::TagInfo> busiestTag = self->transactionTagCounter.getBusiestTag();
|
||||||
reply.busiestTag = busiestTag.map<TransactionTag>([](StorageServer::TransactionTagCounter::TagInfo tagInfo) { return tagInfo.tag; });
|
reply.busiestTag = busiestTag.map<TransactionTag>([](StorageServer::TransactionTagCounter::TagInfo tagInfo) { return tagInfo.tag; });
|
||||||
reply.busiestTagFractionalBusyness = busiestTag.present() ? busiestTag.get().fractionalBusyness : 0.0;
|
reply.busiestTagFractionalBusyness = busiestTag.present() ? busiestTag.get().fractionalBusyness : 0.0;
|
||||||
reply.busiestTagRate = busiestTag.present() ? busiestTag.get().count / busiestTag.get().elapsed : 0.0;
|
reply.busiestTagRate = busiestTag.present() ? busiestTag.get().rate : 0.0;
|
||||||
|
|
||||||
req.reply.send( reply );
|
req.reply.send( reply );
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue