From 599675cba8eeba908a249c93549238388ef85c73 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Wed, 19 Aug 2020 04:23:23 +0000 Subject: [PATCH] modify some details to get better performance --- fdbclient/NativeAPI.actor.cpp | 14 ++++++-------- fdbserver/DataDistribution.actor.cpp | 19 ++++++++++--------- fdbserver/MasterProxyServer.actor.cpp | 4 ++-- fdbserver/ProxyCommitData.actor.h | 21 ++++++--------------- fdbserver/Ratekeeper.actor.cpp | 2 +- fdbserver/RatekeeperInterface.h | 22 +++++++--------------- 6 files changed, 32 insertions(+), 50 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 511d18ab06..d72ef8a6ba 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -3305,7 +3305,7 @@ void Transaction::setupWatches() { ACTOR Future> estimateCommitCosts(Transaction* self, CommitTransactionRef const * transaction) { state ClientTrCommitCostEstimation trCommitCosts; - state KeyRange keyRange; + state KeyRangeRef keyRange; state int i = 0; for (; i < transaction->mutations.size(); ++i) { @@ -3317,7 +3317,7 @@ ACTOR Future> estimateCommitCosts(Transac } else if (it->type == MutationRef::Type::ClearRange) { trCommitCosts.opsCount++; - keyRange = KeyRange(KeyRangeRef(it->param1, it->param2)); + keyRange = KeyRangeRef(it->param1, it->param2); if (self->options.expensiveClearCostEstimation) { StorageMetrics m = wait(self->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY)); trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes)); @@ -3329,15 +3329,14 @@ ACTOR Future> estimateCommitCosts(Transac &StorageServerInterface::getShardState, self->info)); if (locations.empty()) continue; - if(deterministicRandom()->random01() < 0.01) - TraceEvent("NAPIAvgShardSizex100").detail("SmoothTotal", (uint64_t)self->getDatabase()->smoothMidShardSize.smoothTotal()); - uint64_t bytes = 0; - if (locations.size() == 1) + if (locations.size() == 1) { bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS; - else + } + else { // small clear on the boundary will hit two shards but be much smaller than the shard size bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 + (locations.size() - 2) * (int64_t)self->getDatabase()->smoothMidShardSize.smoothTotal(); + } trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(bytes)); trCommitCosts.writeCosts += getWriteOperationCost(bytes); @@ -3388,7 +3387,6 @@ ACTOR static Future tryCommit( Database cx, Reference store(req.commitCostEstimation, estimateCommitCosts(tr, &req.transaction))); if (!req.commitCostEstimation.present()) req.tagSet.reset(); } else { - req.tagSet.reset(); wait(store(req.transaction.read_snapshot, readVersion)); } diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index bc3341c665..bfdf3264af 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -512,10 +512,10 @@ ACTOR Future> getInitialDataDistribution( Dat beginKey = keyServers.end()[-1].key; break; } catch (Error& e) { - ASSERT(!succeeded); //We shouldn't be retrying if we have already started modifying result in this loop TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e); wait( tr.onError(e) ); + ASSERT(!succeeded); //We shouldn't be retrying if we have already started modifying result in this loop } } @@ -4975,6 +4975,14 @@ ACTOR Future cacheServerWatcher(Database* db) { } } +static int64_t getMedianShardSize(VectorRef metricVec) { + std::nth_element(metricVec.begin(), metricVec.begin() + metricVec.size() / 2, + metricVec.end(), [](const DDMetricsRef& d1, const DDMetricsRef& d2) { + return d1.shardBytes < d2.shardBytes; + }); + return metricVec[metricVec.size() / 2].shardBytes; +} + ACTOR Future dataDistributor(DataDistributorInterface di, Reference> db ) { state Reference self( new DataDistributorData(db, di.id()) ); state Future collection = actorCollection( self->addActor.getFuture() ); @@ -5014,14 +5022,7 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference shardSizes(metricVec.size()); - for (int i = 0; i < shardSizes.size(); ++ i) - { - shardSizes[i] = metricVec[i].shardBytes; - // TraceEvent("DDMetricsReply").detail("Value", metricVec[i].shardBytes).detail("BeginKey", metricVec[i].beginKey); - } - std::nth_element(shardSizes.begin(), shardSizes.begin() + shardSizes.size()/2, shardSizes.end()); - rep.midShardSize = shardSizes[shardSizes.size()/2]; + rep.midShardSize = getMedianShardSize(metricVec.contents()); } } req.reply.send(rep); diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index ee4adb6bcf..d489fa6af1 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -1058,8 +1058,8 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if (!(self->committed[self->transactionNum] == ConflictBatch::TransactionCommitted && (!self->locked || trs[self->transactionNum].isLockAware()))) { continue; } - ASSERT(trs[self->transactionNum].commitCostEstimation.present() == trs[self->transactionNum].tagSet.present()); - state bool checkSample = trs[self->transactionNum].tagSet.present(); + + state bool checkSample = trs[self->transactionNum].commitCostEstimation.present(); state Optional* trCost = &trs[self->transactionNum].commitCostEstimation; state int mutationNum = 0; state VectorRef* pMutations = &trs[self->transactionNum].transaction.mutations; diff --git a/fdbserver/ProxyCommitData.actor.h b/fdbserver/ProxyCommitData.actor.h index 968ca8bc82..d2df77dcb9 100644 --- a/fdbserver/ProxyCommitData.actor.h +++ b/fdbserver/ProxyCommitData.actor.h @@ -244,22 +244,13 @@ struct ProxyCommitData { } void updateSSTagCost(const UID& id, const TagSet& tagSet, MutationRef m, int cost){ - if(ssTagCommitCost.count(id) == 0) { - ssTagCommitCost[id] = TransactionTagMap(); - } + auto [it, _] = ssTagCommitCost.try_emplace(id, TransactionTagMap()); + for(auto& tag: tagSet) { - auto& costItem = ssTagCommitCost[id][tag]; - if(m.isAtomicOp()) { - costItem.numAtomicWrite ++; - costItem.costAtomicWrite += cost; - } - else if (m.type == MutationRef::Type::SetValue){ - costItem.numWrite ++; - costItem.costWrite += cost; - } - else if (m.type == MutationRef::Type::ClearRange){ - costItem.numClear ++; - costItem.costClearEst += cost; + auto& costItem = it->second[tag]; + if(m.isAtomicOp() || m.type == MutationRef::Type::SetValue || m.type == MutationRef::Type::ClearRange) { + costItem.opsSum ++; + costItem.costSum += cost; } } } diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index baabdeaf9f..b84ea3bf84 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -917,7 +917,7 @@ void tryAutoThrottleTag(RatekeeperData* self, StorageQueueInfo& ss, int64_t stor // tryAutoThrottleTag(self, ss.busiestWriteTag.get(), ss.busiestWriteTagRate, ss.busiestWriteTagFractionalBusyness); // } else if (ss.busiestReadTag.present() && - (SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || + (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) { // read saturated tryAutoThrottleTag(self, ss.busiestReadTag.get(), ss.busiestReadTagRate, ss.busiestReadTagFractionalBusyness); diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index 3e8e014085..76d235f6c7 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -76,28 +76,20 @@ struct ClientTagThrottleLimits { }; struct TransactionCommitCostEstimation { - int numWrite = 0; - int numAtomicWrite = 0; - int numClear = 0; - uint64_t costWrite = 0; - uint64_t costAtomicWrite = 0; - uint64_t costClearEst = 0; + int opsSum = 0; + uint64_t costSum = 0; - uint64_t getCostSum() const { return costClearEst + costAtomicWrite + costWrite; } - int getOpsSum() const { return numWrite + numAtomicWrite + numClear; } + uint64_t getCostSum() const { return costSum; } + int getOpsSum() const { return opsSum; } template void serialize(Ar& ar) { - serializer(ar, costWrite, costClearEst, costAtomicWrite, numWrite, numAtomicWrite, numClear); + serializer(ar, opsSum, costSum); } TransactionCommitCostEstimation& operator+=(const TransactionCommitCostEstimation& other) { - numWrite += other.numWrite; - numAtomicWrite += other.numAtomicWrite; - numClear += other.numClear; - costWrite += other.costWrite; - costAtomicWrite += other.numAtomicWrite; - costClearEst += other.costClearEst; + opsSum += other.opsSum; + costSum += other.costSum; return *this; } };