change midShardSize type and other details

Xiaoxi Wang 2020-08-12 17:49:12 +00:00
parent 0cceda9908
commit f3ecf14601
8 changed files with 24 additions and 27 deletions


@@ -230,7 +230,7 @@ void ClientKnobs::initialize(bool randomize) {
// transaction tags
init( MAX_TAGS_PER_TRANSACTION, 5 );
init( MAX_TRANSACTION_TAG_LENGTH, 16 );
init( COMMIT_SAMPLE_COST, 10 ); if( randomize && BUGGIFY ) COMMIT_SAMPLE_COST = 1.0;
init( COMMIT_SAMPLE_COST, 100 ); if( randomize && BUGGIFY ) COMMIT_SAMPLE_COST = 10;
init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
init( INCOMPLETE_SHARD_PLUS, 4096 );
init( READ_TAG_SAMPLE_RATE, 0.01 ); if( randomize && BUGGIFY ) READ_TAG_SAMPLE_RATE = 1.0; // Communicated to clients from cluster
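
This hunk only retunes the knobs; the proxy-side code that consumes COMMIT_SAMPLE_COST is not part of the diff. One common way such a sampling knob is used, sketched below purely as an assumption and not as the proxy's actual code, is to keep a commit-cost sample with probability cost / COMMIT_SAMPLE_COST and record at least the threshold when it is kept, so that on average one sample survives per COMMIT_SAMPLE_COST units of cost:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <random>

// Illustrative sketch only; NOT the proxy's actual sampling code.
// With commitSampleCost = 100, a transaction of cost 5 is kept ~5% of the time
// and recorded as 100, so the expected recorded cost still equals 5.
int64_t maybeSampleCost(int64_t cost, int64_t commitSampleCost, std::mt19937_64& rng) {
    std::uniform_real_distribution<double> u(0.0, 1.0);
    double keepProbability = std::min(1.0, double(cost) / double(commitSampleCost));
    if (u(rng) < keepProbability)
        return std::max(cost, commitSampleCost); // kept: record at least the threshold
    return 0;                                    // dropped
}

int main() {
    std::mt19937_64 rng(42);
    int64_t recorded = 0;
    for (int i = 0; i < 10000; ++i) recorded += maybeSampleCost(5, 100, rng);
    printf("average recorded cost: %.2f (expected ~5)\n", recorded / 10000.0);
    return 0;
}
```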


@@ -191,7 +191,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
Version version;
bool locked;
Optional<Value> metadataVersion;
double midShardSize;
int64_t midShardSize = 0;
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
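
The field becomes an integer byte count and picks up an in-class default of 0 (the old double had no initializer in this hunk), giving readers an unambiguous "not set" value, which the client-side guard added later in this diff relies on. The commit message does not spell out why the type changed; one general argument for int64_t over double for byte counts, shown here only as a worked illustration, is that doubles stop being exact above 2^53:

```cpp
#include <cstdint>
#include <cstdio>

// Doubles can only represent integers exactly up to 2^53; byte counts can exceed that.
int main() {
    int64_t bytes = (1LL << 53) + 1;              // 9007199254740993
    double asDouble = static_cast<double>(bytes); // rounds to 9007199254740992
    printf("%lld -> %.0f\n", (long long)bytes, asDouble);
    return 0;
}
```
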
@@ -428,7 +428,7 @@ struct GetDDMetricsRequest {
ReplyPromise<struct GetDDMetricsReply> reply;
GetDDMetricsRequest() {}
explicit GetDDMetricsRequest(KeyRange const& keys, const int shardLimit, bool midOnly = false) : keys(keys), shardLimit(shardLimit) {}
explicit GetDDMetricsRequest(KeyRange const& keys, const int shardLimit) : keys(keys), shardLimit(shardLimit) {}
template<class Ar>
void serialize(Ar& ar) {


@@ -2003,7 +2003,8 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, Spa
cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(span.context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
cx->smoothMidShardSize.setTotal(v.midShardSize);
if(v.midShardSize > 0)
cx->smoothMidShardSize.setTotal(v.midShardSize);
if (v.version >= version)
return v.version;
// SOMEDAY: Do the wait on the server side, possibly use less expensive source of committed version (causal consistency is not needed for this purpose)
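
smoothMidShardSize is effectively the client's exponentially smoothed estimate of a typical shard size, and setTotal() folds each reply's value into it. Without the new > 0 guard, a reply whose midShardSize was never populated (it now defaults to 0) would drag that estimate toward zero. A toy smoother, written here only to illustrate the effect and not FDB's actual Smoother class:

```cpp
#include <cstdio>

// Minimal stand-in for an exponential smoother; the folding factor is arbitrary.
struct ToySmoother {
    double estimate;
    explicit ToySmoother(double init) : estimate(init) {}
    void setTotal(double total, double alpha = 0.1) {
        estimate += alpha * (total - estimate); // move part of the way toward the new total
    }
    double smoothTotal() const { return estimate; }
};

int main() {
    ToySmoother s(200e6);              // suppose the current estimate is ~200 MB
    s.setTotal(0);                     // a reply that never set midShardSize
    printf("%.0f\n", s.smoothTotal()); // 180000000: the estimate is dragged toward zero
    return 0;
}
```
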
@@ -3315,7 +3316,7 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transac
} else if (it->type == MutationRef::Type::ClearRange) {
trCommitCosts.opsCount++;
if(it->clearSingleKey) {
trCommitCosts.clearIdxCosts.emplace(i, getOperationCost(it->expectedSize())); // NOTE: whether we need a weight here?
trCommitCosts.clearIdxCosts.emplace(i, getOperationCost(it->expectedSize()));
continue;
}
keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
@@ -3330,13 +3331,14 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transac
if (locations.empty()) continue;
if(deterministicRandom()->random01() < 0.01)
TraceEvent("NAPIAvgShardSizex100").detail("SmoothTotal", self->getDatabase()->smoothMidShardSize.smoothTotal());
TraceEvent("NAPIAvgShardSizex100").detail("SmoothTotal", (uint64_t)self->getDatabase()->smoothMidShardSize.smoothTotal());
uint64_t bytes = 0;
if (locations.size() == 1)
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS;
else
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 + (locations.size() - 2) * self->getDatabase()->smoothMidShardSize.smoothTotal();
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 +
(locations.size() - 2) * (int64_t)self->getDatabase()->smoothMidShardSize.smoothTotal();
trCommitCosts.clearIdxCosts.emplace(i, getOperationCost(bytes));
trCommitCosts.writeCosts += getOperationCost(bytes);
}
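
The byte estimate for a multi-shard clear charges each of the two boundary shards INCOMPLETE_SHARD_PLUS (presumably because the clear only partially covers them) and every fully covered shard in between the smoothed mid-shard size, then converts bytes to cost units with getOperationCost(). A worked example with the knob defaults from this commit and an arbitrary 50 MB mid-shard estimate:

```cpp
#include <cstdint>
#include <cstdio>

// Worked example of the clear-range size estimate above. The 50 MB mid-shard figure
// is an arbitrary example value, not something reported in this diff.
int main() {
    const int64_t INCOMPLETE_SHARD_PLUS = 4096;         // knob default from this commit
    const int64_t OPERATION_COST_BYTE_FACTOR = 16384;   // knob default from this commit
    int64_t midShardSize = 50LL * 1024 * 1024;          // assumed smoothed estimate
    int64_t locations = 5;                               // the clear range spans 5 shards

    int64_t bytes = (locations == 1)
                        ? INCOMPLETE_SHARD_PLUS
                        : INCOMPLETE_SHARD_PLUS * 2 + (locations - 2) * midShardSize;
    int64_t cost = bytes / OPERATION_COST_BYTE_FACTOR + 1;  // getOperationCost()
    printf("estimated bytes=%lld cost=%lld\n", (long long)bytes, (long long)cost);
    return 0;
}
```
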
@@ -3378,7 +3380,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
commit_unknown_result()});
}
if(req.tagSet.present() && tr->options.priority < TransactionPriority::IMMEDIATE){
if(req.tagSet.present() && tr->options.priority == TransactionPriority::DEFAULT){
wait(store(req.transaction.read_snapshot, readVersion) &&
store(req.commitCostEstimation, estimateCommitCosts(tr, &req.transaction)));
if (!req.commitCostEstimation.present()) req.tagSet.reset();
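
The old predicate priority < IMMEDIATE admitted both BATCH and DEFAULT transactions to commit-cost estimation; the new priority == DEFAULT excludes BATCH. The snippet below just evaluates both predicates for each priority (the enum values are assumed for illustration; only the ordering matters here):

```cpp
#include <cstdio>
#include <initializer_list>

// Mirrors the ordering BATCH < DEFAULT < IMMEDIATE; exact values are assumed here.
enum class TransactionPriority { BATCH = 0, DEFAULT = 1, IMMEDIATE = 2 };

int main() {
    for (auto p : { TransactionPriority::BATCH, TransactionPriority::DEFAULT,
                    TransactionPriority::IMMEDIATE }) {
        bool oldRule = p < TransactionPriority::IMMEDIATE; // old: BATCH and DEFAULT estimated
        bool newRule = p == TransactionPriority::DEFAULT;  // new: only DEFAULT estimated
        printf("priority %d: old=%d new=%d\n", (int)p, (int)oldRule, (int)newRule);
    }
    return 0;
}
```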


@@ -358,7 +358,7 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
inline uint64_t getOperationCost(uint64_t bytes) {
return bytes / CLIENT_KNOBS->OPERATION_COST_BYTE_FACTOR + 1;
return bytes / std::max(1, CLIENT_KNOBS->OPERATION_COST_BYTE_FACTOR) + 1;
}
#include "flow/unactorcompiler.h"
#endif
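
The std::max(1, ...) guard makes the helper safe even if OPERATION_COST_BYTE_FACTOR is ever zero or negative (for example through a bad knob override), and the trailing +1 already guarantees a floor cost of one unit per operation. A few sample values with the 16384 default:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

// Sketch of the cost function above, with the defensive std::max guard.
uint64_t getOperationCost(uint64_t bytes, int factor = 16384) {
    return bytes / std::max(1, factor) + 1;
}

int main() {
    printf("%llu %llu %llu\n",
           (unsigned long long)getOperationCost(0),            // 1: floor cost
           (unsigned long long)getOperationCost(16384),        // 2
           (unsigned long long)getOperationCost(100 * 1024));  // 7
    return 0;
}
```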


@@ -5001,9 +5001,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
break;
}
when ( state GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture()) ) {
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(
errorOr(timeoutError(getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)),
SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT)));
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
if ( result.isError() ) {
req.reply.sendError(result.getError());
} else {

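On the data distributor side, the DD_SHARD_METRICS_TIMEOUT wrapper is dropped: a slow getShardMetricsList reply no longer fails the request with timed_out, and if the reply promise is dropped entirely the request simply stays pending rather than surfacing broken_promise to the requester. For readers less familiar with these flow helpers, their semantics are paraphrased below (flow-style sketch, not compiled here):

```cpp
// timeoutError(f, t)      - f's result, or throws timed_out() if f is not ready within t seconds
// errorOr(f)              - wraps f's outcome in ErrorOr<T> so the caller can branch on isError()
// brokenPromiseToNever(f) - if f fails with broken_promise (the reply promise was dropped),
//                           yields a future that never becomes ready instead of that error
ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
    getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
```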

@@ -70,7 +70,7 @@ struct HaltDataDistributorRequest {
struct GetDataDistributorMetricsReply {
constexpr static FileIdentifier file_identifier = 1284337;
Standalone<VectorRef<DDMetricsRef>> storageMetricsList;
Optional<double> midShardSize;
Optional<int64_t> midShardSize;
GetDataDistributorMetricsReply() {}
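
The distributor reports the mid-shard estimate as an Optional, so a reply that did not compute one (for example when the request did not ask for it) stays distinguishable from a genuine 0. The stand-in below uses std::optional to show the assumed consumer pattern of keeping the previous estimate when the field is absent; the proxy's real handling is outside the visible hunks:

```cpp
#include <cstdint>
#include <cstdio>
#include <optional>

// Stand-in for the reply type above (FDB's Optional<T> behaves like std::optional here).
struct GetDataDistributorMetricsReply {
    std::optional<int64_t> midShardSize;   // only filled when the request asked for it
};

int main() {
    GetDataDistributorMetricsReply reply;  // e.g. a reply without a mid-shard estimate
    int64_t current = 200000;              // the consumer's running estimate
    if (reply.midShardSize.has_value())    // assumed pattern: keep the old value
        current = *reply.midShardSize;     // when the field is absent
    printf("%lld\n", (long long)current);
    return 0;
}
```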


@@ -405,7 +405,7 @@ struct ResolutionRequestBuilder {
}
};
ACTOR Future<Void> monitorDDMetricsChanges(double* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) {
state Future<Void> nextRequestTimer = Never();
state Future<GetDataDistributorMetricsReply> nextReply = Never();
state KeyRange keys(normalKeys);
@@ -427,10 +427,9 @@ ACTOR Future<Void> monitorDDMetricsChanges(double* midShardSize, Reference<Async
when(wait(nextRequestTimer)) {
nextRequestTimer = Never();
if(db->get().distributor.present()) {
nextReply = timeoutError(db->get().distributor.get().dataDistributorMetrics.getReply(
GetDataDistributorMetricsRequest(keys, CLIENT_KNOBS->TOO_MANY, true)),
SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT);
}
nextReply = db->get().distributor.get().dataDistributorMetrics.getReply(
GetDataDistributorMetricsRequest(keys, CLIENT_KNOBS->TOO_MANY, true));
} else nextReply = Never();
}
when(GetDataDistributorMetricsReply reply = wait(nextReply)) {
nextReply = Never();
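
monitorDDMetricsChanges uses a common flow polling idiom: two state futures, nextRequestTimer and nextReply, act as the arms of a choose loop. When the timer fires it is disarmed and one request is issued, or, with the new else branch, nextReply is explicitly parked on Never() while no distributor is registered; when the reply arrives, nextReply is parked again and the timer is presumably re-armed in code outside this hunk. A compressed sketch of the idiom (flow-style, requires the actor compiler, details outside the hunk are assumed):

```cpp
loop choose {
    when(wait(nextRequestTimer)) {
        nextRequestTimer = Never();     // disarm until the reply (or lack of one) is handled
        if (db->get().distributor.present())
            nextReply = db->get().distributor.get().dataDistributorMetrics.getReply(
                GetDataDistributorMetricsRequest(keys, CLIENT_KNOBS->TOO_MANY, true));
        else
            nextReply = Never();        // no distributor yet: park this arm
    }
    when(GetDataDistributorMetricsReply reply = wait(nextReply)) {
        nextReply = Never();
        // (outside the visible hunk, assumed) update *midShardSize and re-arm nextRequestTimer
    }
}
```
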
@@ -1600,7 +1599,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, Pro
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests,
ProxyStats* stats, Version minKnownCommittedVersion,
PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags, double midShardSize = 0) {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags, int64_t midShardSize = 0) {
GetReadVersionReply _reply = wait(replyFuture);
GetReadVersionReply reply = _reply;
Version replyVersion = reply.version;
@@ -1682,7 +1681,7 @@ ACTOR static Future<Void> transactionStarter(
state PromiseStream<double> replyTimes;
state Span span;
state double midShardSize = SERVER_KNOBS->MIN_SHARD_BYTES;
state int64_t midShardSize = SERVER_KNOBS->MIN_SHARD_BYTES;
// FIXME: a weird RYWIterator reference problem occurs here, so this is left disabled for now
// addActor.send(monitorDDMetricsChanges(&midShardSize, db));
@@ -1970,12 +1969,10 @@ ACTOR Future<Void> ddMetricsRequestServer(MasterProxyInterface proxy, Reference<
{
loop {
choose {
when(wait(db->onChange())) { }
when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture()))
{
// TraceEvent("DDMetricsRequestServer").detail("HasDistributor", db->get().distributor.present());
if(!db->get().distributor.present()) {
req.reply.sendError(dd_cancelled());
req.reply.sendError(operation_failed());
continue;
}
ErrorOr<GetDataDistributorMetricsReply> reply =


@@ -866,9 +866,8 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
if(maxRate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
it->value.busiestWriteTag = busiestTag;
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps);
ASSERT(it->value.totalWriteCosts > 0 && it->value.totalWriteOps > 0);
maxBusyness = double(maxCost.getOpsSum() + maxCost.getCostSum()) /
(it->value.totalWriteOps + it->value.totalWriteCosts);
ASSERT(it->value.totalWriteCosts > 0);
maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts;
it->value.busiestWriteTagFractionalBusyness = maxBusyness;
it->value.busiestWriteTagRate = maxRate;
}
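
The busiest-write-tag busyness becomes a pure cost fraction rather than a blend of op count and cost, which is also why the totalWriteOps part of the assertion is dropped. With made-up numbers, the two formulas can differ noticeably:

```cpp
#include <cstdio>

// Worked example with arbitrary figures: the busiest write tag issued 50 of 100 ops
// in the interval, but only 60 of 1000 cost units.
int main() {
    double busiestOps = 50, busiestCost = 60;
    double totalOps = 100, totalCost = 1000;
    double oldBusyness = (busiestOps + busiestCost) / (totalOps + totalCost); // 110/1100 = 0.10
    double newBusyness = busiestCost / totalCost;                             // 60/1000  = 0.06
    printf("old=%.2f new=%.2f\n", oldBusyness, newBusyness);
    return 0;
}
```
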
@@ -987,7 +986,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
if(limits->priority == TransactionPriority::DEFAULT) {
tryAutoThrottleTag(self, ss, storageQueue, storageDurabilityLag);
tryAutoThrottleTag(self, ss, storageQueue, storageDurabilityLag);
}
double inputRate = ss.smoothInputBytes.smoothRate();