Merge pull request #3696 from sfc-gh-xwang/tag-report
report busiest write tag of each storage server
This commit is contained in:
commit
62dd1f7234
|
@ -191,6 +191,13 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"estimated_cost":{
|
||||
"hz": 0.0
|
||||
}
|
||||
},
|
||||
"busiest_write_tag":{
|
||||
"tag": "",
|
||||
"fractional_cost": 0.0,
|
||||
"estimated_cost":{
|
||||
"hz": 0.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
|
@ -346,15 +353,11 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"auto" : {
|
||||
"busy_read" : 0,
|
||||
"busy_write" : 0,
|
||||
"count" : 0
|
||||
"count" : 0,
|
||||
"recommended_only": 0
|
||||
},
|
||||
"manual" : {
|
||||
"count" : 0
|
||||
},
|
||||
"recommend" : {
|
||||
"busy_read" : 0,
|
||||
"busy_write" : 0,
|
||||
"count" : 0
|
||||
}
|
||||
},
|
||||
"limiting_queue_bytes_storage_server":0,
|
||||
|
|
|
@ -887,14 +887,13 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData* self) {
|
|||
}
|
||||
|
||||
TraceEvent("BusiestWriteTag", it->key)
|
||||
.detail("Elapsed", elapsed)
|
||||
.detail("Tag", printable(busiestTag))
|
||||
.detail("TagOps", maxCost.getOpsSum())
|
||||
.detail("TagCosts", maxCost.getCostSum())
|
||||
.detail("TagRate", maxRate)
|
||||
.detail("TagBusyness", maxBusyness)
|
||||
.detail("Reported", it->value.busiestWriteTag.present())
|
||||
.trackLatest(it->key.toString() + "/BusiestWriteTag");
|
||||
.detail("Elapsed", elapsed)
|
||||
.detail("Tag", printable(busiestTag))
|
||||
.detail("TagOps", maxCost.getOpsSum())
|
||||
.detail("TagCost", maxCost.getCostSum())
|
||||
.detail("TotalCost", it->value.totalWriteCosts)
|
||||
.detail("Reported", it->value.busiestWriteTag.present())
|
||||
.trackLatest(it->key.toString() + "/BusiestWriteTag");
|
||||
|
||||
// reset statistics
|
||||
it->value.tagCostEst.clear();
|
||||
|
|
|
@ -511,6 +511,30 @@ struct RolesInfo {
|
|||
obj["busiest_read_tag"] = busiestReadTagObj;
|
||||
}
|
||||
}
|
||||
|
||||
TraceEventFields const& busiestWriteTag = metrics.at("BusiestWriteTag");
|
||||
if(busiestWriteTag.size()) {
|
||||
int64_t tagCost = busiestWriteTag.getInt64("TagCost");
|
||||
|
||||
if(tagCost > 0) {
|
||||
JsonBuilderObject busiestWriteTagObj;
|
||||
|
||||
int64_t totalCost = busiestWriteTag.getInt64("TotalCost");
|
||||
ASSERT(totalCost > 0);
|
||||
|
||||
busiestWriteTagObj["tag"] = busiestWriteTag.getValue("Tag");
|
||||
busiestWriteTagObj["fractional_cost"] = (double)tagCost / totalCost;
|
||||
|
||||
double elapsed = busiestWriteTag.getDouble("Elapsed");
|
||||
if(elapsed > 0) {
|
||||
JsonBuilderObject estimatedCostObj;
|
||||
estimatedCostObj["hz"] = tagCost / elapsed;
|
||||
busiestWriteTagObj["estimated_cost"] = estimatedCostObj;
|
||||
}
|
||||
|
||||
obj["busiest_write_tag"] = busiestWriteTagObj;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if(e.code() != error_code_attribute_not_found)
|
||||
throw e;
|
||||
|
@ -1605,10 +1629,34 @@ static Future<vector<std::pair<iface, EventMap>>> getServerMetrics(vector<iface>
|
|||
return results;
|
||||
}
|
||||
|
||||
ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
|
||||
vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
|
||||
vector<std::pair<StorageServerInterface, EventMap>> results = wait(
|
||||
getServerMetrics(servers, address_workers, std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" }));
|
||||
ACTOR template <class iface>
|
||||
static Future<vector<TraceEventFields>> getServerBusiestWriteTags(vector<iface> servers, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, WorkerDetails rkWorker) {
|
||||
state vector<Future<Optional<TraceEventFields>>> futures;
|
||||
for (const auto& s : servers) {
|
||||
futures.push_back(latestEventOnWorker(rkWorker.interf, s.id().toString() + "/BusiestWriteTag"));
|
||||
}
|
||||
wait(waitForAll(futures));
|
||||
|
||||
vector<TraceEventFields> result(servers.size());
|
||||
for(int i = 0; i < servers.size(); ++ i) {
|
||||
if(futures[i].get().present()) {
|
||||
result[i] = futures[i].get().get();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, WorkerDetails rkWorker) {
|
||||
state vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
|
||||
state vector<std::pair<StorageServerInterface, EventMap>> results;
|
||||
state vector<TraceEventFields> busiestWriteTags;
|
||||
wait(store(results, getServerMetrics(servers, address_workers,std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics","ReadLatencyBands", "BusiestReadTag" }))
|
||||
&& store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker)));
|
||||
|
||||
ASSERT(busiestWriteTags.size() == results.size());
|
||||
for(int i = 0; i < busiestWriteTags.size(); ++ i) {
|
||||
results[i].second.emplace("BusiestWriteTag", busiestWriteTags[i]);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
@ -1823,28 +1871,18 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
|
||||
|
||||
JsonBuilderObject throttledTagsObj;
|
||||
JsonBuilderObject autoThrottledTagsObj, recommendThrottleTagsObj;
|
||||
JsonBuilderObject autoThrottledTagsObj;
|
||||
autoThrottledTagsObj["count"] = autoThrottledTags;
|
||||
autoThrottledTagsObj["busy_read"] = autoThrottledTagsBusyRead;
|
||||
autoThrottledTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
|
||||
if(autoThrottlingEnabled) {
|
||||
autoThrottledTagsObj["count"] = autoThrottledTags;
|
||||
autoThrottledTagsObj["busy_read"] = autoThrottledTagsBusyRead;
|
||||
autoThrottledTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
|
||||
|
||||
recommendThrottleTagsObj["count"] = 0;
|
||||
recommendThrottleTagsObj["busy_read"] = 0;
|
||||
recommendThrottleTagsObj["busy_write"] = 0;
|
||||
autoThrottledTagsObj["recommended_only"] = 0;
|
||||
}
|
||||
else {
|
||||
recommendThrottleTagsObj["count"] = autoThrottledTags;
|
||||
recommendThrottleTagsObj["busy_read"] = autoThrottledTagsBusyRead;
|
||||
recommendThrottleTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
|
||||
|
||||
autoThrottledTagsObj["count"] = 0;
|
||||
autoThrottledTagsObj["busy_read"] = 0;
|
||||
autoThrottledTagsObj["busy_write"] = 0;
|
||||
autoThrottledTagsObj["recommended_only"] = 1;
|
||||
}
|
||||
|
||||
throttledTagsObj["auto"] = autoThrottledTagsObj;
|
||||
throttledTagsObj["recommend"] = recommendThrottleTagsObj;
|
||||
|
||||
JsonBuilderObject manualThrottledTagsObj;
|
||||
manualThrottledTagsObj["count"] = manualThrottledTags;
|
||||
|
@ -2464,7 +2502,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
|
|||
address_workers[worker.interf.address()] = worker.interf;
|
||||
}
|
||||
|
||||
state Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));
|
||||
state Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers, rkWorker));
|
||||
state Future<ErrorOr<vector<std::pair<TLogInterface, EventMap>>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers));
|
||||
state Future<ErrorOr<vector<std::pair<MasterProxyInterface, EventMap>>>> proxyFuture = errorOr(getProxiesAndMetrics(db, address_workers));
|
||||
state Future<ErrorOr<vector<std::pair<GrvProxyInterface, EventMap>>>> grvProxyFuture = errorOr(getGrvProxiesAndMetrics(db, address_workers));
|
||||
|
|
|
@ -26,13 +26,17 @@
|
|||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct TagThrottleApiWorkload : TestWorkload {
|
||||
bool autoThrottleEnabled;
|
||||
double testDuration;
|
||||
|
||||
constexpr static const char* NAME = "TagThrottleApi";
|
||||
|
||||
TagThrottleApiWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
testDuration = getOption( options, LiteralStringRef("testDuration"), 10.0 );
|
||||
autoThrottleEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
|
||||
}
|
||||
|
||||
virtual std::string description() { return "TagThrottleApi"; }
|
||||
virtual std::string description() { return TagThrottleApiWorkload::NAME; }
|
||||
|
||||
virtual Future<Void> setup(Database const& cx) {
|
||||
DatabaseContext::debugUseTags = true;
|
||||
|
@ -76,7 +80,7 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
tagSet.addTag(tag);
|
||||
|
||||
try {
|
||||
wait(ThrottleApi::throttleTags(cx, tagSet, rate, duration, TagThrottleType::MANUAL, priority));
|
||||
wait(ThrottleApi::throttleTags(cx, tagSet, rate, duration, TagThrottleType::MANUAL, priority, Optional<double>(), TagThrottledReason::MANUAL));
|
||||
}
|
||||
catch(Error &e) {
|
||||
state Error err = e;
|
||||
|
@ -88,7 +92,9 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
throw err;
|
||||
}
|
||||
|
||||
manuallyThrottledTags->insert_or_assign(std::make_pair(tag, priority), TagThrottleInfo(tag, TagThrottleType::MANUAL, priority, rate, now() + duration, duration));
|
||||
manuallyThrottledTags->insert_or_assign(
|
||||
std::make_pair(tag, priority), TagThrottleInfo(tag, TagThrottleType::MANUAL, priority, rate,
|
||||
now() + duration, duration, TagThrottledReason::MANUAL));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -127,13 +133,16 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getTags(Database cx, std::map<std::pair<TransactionTag, TransactionPriority>, TagThrottleInfo> const* manuallyThrottledTags) {
|
||||
ACTOR Future<Void> getTags(TagThrottleApiWorkload* self, Database cx, std::map<std::pair<TransactionTag, TransactionPriority>, TagThrottleInfo> const* manuallyThrottledTags) {
|
||||
std::vector<TagThrottleInfo> tags = wait(ThrottleApi::getThrottledTags(cx, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
int manualThrottledTags = 0;
|
||||
int activeAutoThrottledTags = 0;
|
||||
for(auto &tag : tags) {
|
||||
if(tag.throttleType == TagThrottleType::MANUAL) {
|
||||
if(tag.throttleType == TagThrottleType::AUTO) {
|
||||
ASSERT(self->autoThrottleEnabled);
|
||||
}
|
||||
else if(tag.throttleType == TagThrottleType::MANUAL) {
|
||||
ASSERT(manuallyThrottledTags->find(std::make_pair(tag.tag, tag.priority)) != manuallyThrottledTags->end());
|
||||
++manualThrottledTags;
|
||||
}
|
||||
|
@ -158,6 +167,15 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getRecommendedTags(TagThrottleApiWorkload* self, Database cx) {
|
||||
std::vector<TagThrottleInfo> tags = wait(ThrottleApi::getRecommendedTags(cx, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
for(auto& tag : tags) {
|
||||
ASSERT(tag.throttleType == TagThrottleType::AUTO);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> unthrottleTagGroup(Database cx, std::map<std::pair<TransactionTag, TransactionPriority>, TagThrottleInfo> *manuallyThrottledTags) {
|
||||
state Optional<TagThrottleType> throttleType = TagThrottleApiWorkload::randomTagThrottleType();
|
||||
state Optional<TransactionPriority> priority = deterministicRandom()->coinflip() ? Optional<TransactionPriority>() : deterministicRandom()->randomChoice(allTransactionPriorities);
|
||||
|
@ -190,15 +208,17 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> enableAutoThrottling(Database cx) {
|
||||
ACTOR Future<Void> enableAutoThrottling(TagThrottleApiWorkload* self, Database cx) {
|
||||
if(deterministicRandom()->coinflip()) {
|
||||
wait(ThrottleApi::enableAuto(cx, true));
|
||||
self->autoThrottleEnabled = true;
|
||||
if(deterministicRandom()->coinflip()) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAll(cx, TagThrottleType::AUTO, Optional<TransactionPriority>()));
|
||||
}
|
||||
}
|
||||
else {
|
||||
wait(ThrottleApi::enableAuto(cx, false));
|
||||
self->autoThrottleEnabled = false;
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -210,7 +230,7 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
double delayTime = deterministicRandom()->random01() * 5;
|
||||
wait(delay(delayTime));
|
||||
|
||||
state int action = deterministicRandom()->randomInt(0, 5);
|
||||
state int action = deterministicRandom()->randomInt(0, 6);
|
||||
|
||||
if(action == 0) {
|
||||
wait(self->throttleTag(cx, &manuallyThrottledTags));
|
||||
|
@ -219,16 +239,19 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
wait(self->unthrottleTag(cx, &manuallyThrottledTags));
|
||||
}
|
||||
else if(action == 2) {
|
||||
wait(self->getTags(cx, &manuallyThrottledTags));
|
||||
wait(self->getTags(self, cx, &manuallyThrottledTags));
|
||||
}
|
||||
else if(action == 3) {
|
||||
wait(self->unthrottleTagGroup(cx, &manuallyThrottledTags));
|
||||
}
|
||||
else if(action == 4) {
|
||||
wait(self->enableAutoThrottling(cx));
|
||||
wait(self->enableAutoThrottling(self, cx));
|
||||
}
|
||||
else if(action == 5) {
|
||||
wait(self->getRecommendedTags(self, cx));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
WorkloadFactory<TagThrottleApiWorkload> TagThrottleApiWorkloadFactory("TagThrottleApi");
|
||||
WorkloadFactory<TagThrottleApiWorkload> TagThrottleApiWorkloadFactory(TagThrottleApiWorkload::NAME);
|
||||
|
|
Loading…
Reference in New Issue