ask DD for shard info

This commit is contained in:
Xiaoxi Wang 2020-07-25 04:08:12 +00:00
parent 6ef3d04fd7
commit 48a0fb5154
4 changed files with 38 additions and 11 deletions

View File

@@ -5009,7 +5009,15 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
req.reply.sendError(result.getError());
} else {
GetDataDistributorMetricsReply rep;
rep.storageMetricsList = result.get();
if(!req.meanOnly) {
rep.storageMetricsList = result.get();
}
else {
auto& metricVec = result.get();
double mean = 0.0;
for (auto it = metricVec.cbegin(); it != metricVec.cend(); ++it) mean += it->shardBytes;
rep.meanShardSize = mean / metricVec.size();
}
req.reply.send(rep);
}
}

View File

@@ -70,12 +70,13 @@ struct HaltDataDistributorRequest {
// Reply to GetDataDistributorMetricsRequest. Carries either the full per-shard
// metrics list or, for mean-only requests, just the mean shard size.
struct GetDataDistributorMetricsReply {
	constexpr static FileIdentifier file_identifier = 1284337;
	// Per-shard metrics; presumably left empty when the request asked for the
	// mean only — TODO confirm against the dataDistributor reply path.
	Standalone<VectorRef<DDMetricsRef>> storageMetricsList;
	// Mean shard size in bytes; -1.0 marks "not computed".
	double meanShardSize = -1.0;
	GetDataDistributorMetricsReply() {}
	template <class Ar>
	void serialize(Ar& ar) {
		// Serialize each field exactly once. The stale pre-change line
		// serializer(ar,storageMetricsList); was removed — keeping both calls
		// would serialize storageMetricsList twice and corrupt the wire format.
		serializer(ar, storageMetricsList, meanShardSize);
	}
};
@@ -84,13 +85,15 @@ struct GetDataDistributorMetricsRequest {
KeyRange keys;
int shardLimit;
ReplyPromise<struct GetDataDistributorMetricsReply> reply;
// When true, the reply carries only meanShardSize, not the full metrics list.
bool meanOnly = false;
GetDataDistributorMetricsRequest() {}
// NOTE(review): the stale two-argument explicit constructor was removed.
// Keeping it alongside this one — whose third parameter defaults to false —
// would make every two-argument construction ambiguous and fail to compile.
explicit GetDataDistributorMetricsRequest(KeyRange const& keys, const int shardLimit, bool avgOnly = false)
  : keys(keys), shardLimit(shardLimit), meanOnly(avgOnly) {}
template<class Ar>
void serialize(Ar& ar) {
	// Serialize each field exactly once; the stale pre-change call that
	// omitted meanOnly was dropped to avoid double serialization.
	serializer(ar, keys, shardLimit, reply, meanOnly);
}
};

View File

@@ -262,12 +262,6 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
TransactionTagMap<uint64_t> tagCounts;
for(auto itr : *throttledTags) {
for(auto priorityThrottles : itr.second) {
tagCounts[priorityThrottles.first] = (*transactionTagCounter)[priorityThrottles.first];
}
}
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(
GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter,
*transactionTagCommitCostEst, detailed)));

View File

@@ -538,6 +538,7 @@ struct RatekeeperData {
std::map<UID, ProxyInfo> proxyInfo;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
Smoother smoothMeanShardSize;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
PromiseStream<Future<Void>> addActor;
@@ -564,6 +565,7 @@ struct RatekeeperData {
: id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothMeanShardSize(SERVER_KNOBS->SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()),
throttledTagChangeId(0),
normalLimits(TransactionPriority::DEFAULT, "", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
@@ -840,6 +842,24 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
}
}
// Periodically polls the data distributor for the mean shard size and feeds
// each sample into self->smoothMeanShardSize. Best-effort: any round where DD
// is absent or the request fails is simply skipped and retried next round.
ACTOR Future<Void> monitorDDMetricsChanges(RatekeeperData *self, Reference<AsyncVar<ServerDBInfo>> db) {
// Tracks whether we have received any successful reply yet (see below).
state bool isFirstRep = true;
loop {
// Space out the polls; also acts as the retry backoff after a skipped round.
wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY));
// No data distributor registered right now — try again next round.
if(!db->get().distributor.present()) continue;
// Ask DD for metrics over the whole normal keyspace; the trailing `true`
// sets meanOnly, so the reply carries just meanShardSize.
ErrorOr<GetDataDistributorMetricsReply> reply = wait(errorOr(
db->get().distributor.get().dataDistributorMetrics.getReply( GetDataDistributorMetricsRequest(normalKeys, std::numeric_limits<int>::max(), true) ) ) );
if(reply.isError()) continue; // best-effort: ignore the error and retry
if(isFirstRep) {
// First sample: seed the smoother at the observed value rather than
// smoothing up from its initial state.
self->smoothMeanShardSize.reset(reply.get().meanShardSize);
isFirstRep = false;
}
else self->smoothMeanShardSize.setTotal(reply.get().meanShardSize);
};
}
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled
@@ -994,8 +1014,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
break;
}
// Calculate limited durability lag
int64_t limitingDurabilityLag = 0;
std::set<Optional<Standalone<StringRef>>> ignoredDurabilityLagMachines;
for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) {
if (ignoredDurabilityLagMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
@@ -1244,6 +1264,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
self.addActor.send( traceRole(Role::RATEKEEPER, rkInterf.id()) );
self.addActor.send(monitorThrottlingChanges(&self));
self.addActor.send(monitorDDMetricsChanges(&self, dbInfo));
self.addActor.send(self.expiredTagThrottleCleanup);
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()).detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));