Improve commit-cost-estimation performance: avoid KeyRange copies, coalesce TransactionCommitCostEstimation counters into opsSum/costSum, and compute the median shard size with std::nth_element instead of a full copy-and-sort

This commit is contained in:
Xiaoxi Wang 2020-08-19 04:23:23 +00:00
parent 69914d4909
commit 599675cba8
6 changed files with 32 additions and 50 deletions

View File

@ -3305,7 +3305,7 @@ void Transaction::setupWatches() {
ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transaction* self,
CommitTransactionRef const * transaction) {
state ClientTrCommitCostEstimation trCommitCosts;
state KeyRange keyRange;
state KeyRangeRef keyRange;
state int i = 0;
for (; i < transaction->mutations.size(); ++i) {
@ -3317,7 +3317,7 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transac
}
else if (it->type == MutationRef::Type::ClearRange) {
trCommitCosts.opsCount++;
keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
keyRange = KeyRangeRef(it->param1, it->param2);
if (self->options.expensiveClearCostEstimation) {
StorageMetrics m = wait(self->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY));
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
@ -3329,15 +3329,14 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transac
&StorageServerInterface::getShardState, self->info));
if (locations.empty()) continue;
if(deterministicRandom()->random01() < 0.01)
TraceEvent("NAPIAvgShardSizex100").detail("SmoothTotal", (uint64_t)self->getDatabase()->smoothMidShardSize.smoothTotal());
uint64_t bytes = 0;
if (locations.size() == 1)
if (locations.size() == 1) {
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS;
else
}
else { // small clear on the boundary will hit two shards but be much smaller than the shard size
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 +
(locations.size() - 2) * (int64_t)self->getDatabase()->smoothMidShardSize.smoothTotal();
}
trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(bytes));
trCommitCosts.writeCosts += getWriteOperationCost(bytes);
@ -3388,7 +3387,6 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
store(req.commitCostEstimation, estimateCommitCosts(tr, &req.transaction)));
if (!req.commitCostEstimation.present()) req.tagSet.reset();
} else {
req.tagSet.reset();
wait(store(req.transaction.read_snapshot, readVersion));
}

View File

@ -512,10 +512,10 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
beginKey = keyServers.end()[-1].key;
break;
} catch (Error& e) {
ASSERT(!succeeded); //We shouldn't be retrying if we have already started modifying result in this loop
TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e);
wait( tr.onError(e) );
ASSERT(!succeeded); //We shouldn't be retrying if we have already started modifying result in this loop
}
}
@ -4975,6 +4975,14 @@ ACTOR Future<Void> cacheServerWatcher(Database* db) {
}
}
// Return the median shard size (in bytes) among the supplied shard metrics.
// Selection is done with std::nth_element, so this runs in O(n) rather than
// a full sort. NOTE: metricVec is a view — the partial reordering performed
// here is visible to the caller; callers must not rely on element order
// afterwards. Caller is responsible for ensuring metricVec is non-empty.
static int64_t getMedianShardSize(VectorRef<DDMetricsRef> metricVec) {
	const int mid = metricVec.size() / 2;
	auto byShardBytes = [](const DDMetricsRef& lhs, const DDMetricsRef& rhs) {
		return lhs.shardBytes < rhs.shardBytes;
	};
	// Place the mid-th smallest element (by shardBytes) at index mid.
	std::nth_element(metricVec.begin(), metricVec.begin() + mid, metricVec.end(), byShardBytes);
	return metricVec[mid].shardBytes;
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
@ -5014,14 +5022,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
auto& metricVec = result.get();
if(metricVec.empty()) rep.midShardSize = 0;
else {
std::vector<int64_t> shardSizes(metricVec.size());
for (int i = 0; i < shardSizes.size(); ++ i)
{
shardSizes[i] = metricVec[i].shardBytes;
// TraceEvent("DDMetricsReply").detail("Value", metricVec[i].shardBytes).detail("BeginKey", metricVec[i].beginKey);
}
std::nth_element(shardSizes.begin(), shardSizes.begin() + shardSizes.size()/2, shardSizes.end());
rep.midShardSize = shardSizes[shardSizes.size()/2];
rep.midShardSize = getMedianShardSize(metricVec.contents());
}
}
req.reply.send(rep);

View File

@ -1058,8 +1058,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (!(self->committed[self->transactionNum] == ConflictBatch::TransactionCommitted && (!self->locked || trs[self->transactionNum].isLockAware()))) {
continue;
}
ASSERT(trs[self->transactionNum].commitCostEstimation.present() == trs[self->transactionNum].tagSet.present());
state bool checkSample = trs[self->transactionNum].tagSet.present();
state bool checkSample = trs[self->transactionNum].commitCostEstimation.present();
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;

View File

@ -244,22 +244,13 @@ struct ProxyCommitData {
}
void updateSSTagCost(const UID& id, const TagSet& tagSet, MutationRef m, int cost){
if(ssTagCommitCost.count(id) == 0) {
ssTagCommitCost[id] = TransactionTagMap<TransactionCommitCostEstimation>();
}
auto [it, _] = ssTagCommitCost.try_emplace(id, TransactionTagMap<TransactionCommitCostEstimation>());
for(auto& tag: tagSet) {
auto& costItem = ssTagCommitCost[id][tag];
if(m.isAtomicOp()) {
costItem.numAtomicWrite ++;
costItem.costAtomicWrite += cost;
}
else if (m.type == MutationRef::Type::SetValue){
costItem.numWrite ++;
costItem.costWrite += cost;
}
else if (m.type == MutationRef::Type::ClearRange){
costItem.numClear ++;
costItem.costClearEst += cost;
auto& costItem = it->second[tag];
if(m.isAtomicOp() || m.type == MutationRef::Type::SetValue || m.type == MutationRef::Type::ClearRange) {
costItem.opsSum ++;
costItem.costSum += cost;
}
}
}

View File

@ -917,7 +917,7 @@ void tryAutoThrottleTag(RatekeeperData* self, StorageQueueInfo& ss, int64_t stor
// tryAutoThrottleTag(self, ss.busiestWriteTag.get(), ss.busiestWriteTagRate, ss.busiestWriteTagFractionalBusyness);
// } else
if (ss.busiestReadTag.present() &&
(SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
(storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
// read saturated
tryAutoThrottleTag(self, ss.busiestReadTag.get(), ss.busiestReadTagRate, ss.busiestReadTagFractionalBusyness);

View File

@ -76,28 +76,20 @@ struct ClientTagThrottleLimits {
};
struct TransactionCommitCostEstimation {
int numWrite = 0;
int numAtomicWrite = 0;
int numClear = 0;
uint64_t costWrite = 0;
uint64_t costAtomicWrite = 0;
uint64_t costClearEst = 0;
int opsSum = 0;
uint64_t costSum = 0;
uint64_t getCostSum() const { return costClearEst + costAtomicWrite + costWrite; }
int getOpsSum() const { return numWrite + numAtomicWrite + numClear; }
uint64_t getCostSum() const { return costSum; }
int getOpsSum() const { return opsSum; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, costWrite, costClearEst, costAtomicWrite, numWrite, numAtomicWrite, numClear);
serializer(ar, opsSum, costSum);
}
TransactionCommitCostEstimation& operator+=(const TransactionCommitCostEstimation& other) {
numWrite += other.numWrite;
numAtomicWrite += other.numAtomicWrite;
numClear += other.numClear;
costWrite += other.costWrite;
costAtomicWrite += other.numAtomicWrite;
costClearEst += other.costClearEst;
opsSum += other.opsSum;
costSum += other.costSum;
return *this;
}
};