From 1f38c2f2a431f4cc9e5a34c9ab7876e6372dc2af Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Sat, 1 Aug 2020 06:48:40 +0000 Subject: [PATCH] check clear sample on proxy --- fdbserver/MasterProxyServer.actor.cpp | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 5578f7995e..c899eb4265 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -1090,6 +1090,8 @@ ACTOR Future commitBatch( for (; transactionNum* trCost = &trs[transactionNum].commitCostEstimation; state int mutationNum = 0; state VectorRef* pMutations = &trs[transactionNum].transaction.mutations; for (; mutationNum < pMutations->size(); mutationNum++) { @@ -1112,8 +1114,8 @@ ACTOR Future commitBatch( if (isSingleKeyMutation((MutationRef::Type) m.type)) { // sample single key mutation based on byte // the expectation of sampling is every COMMIT_SAMPLE_BYTE sample once - if (trs[transactionNum].tagSet.present()) { - double totalSize = trs[transactionNum].commitCostEstimation.get().writtenBytes; + if (checkSample) { + double totalSize = trCost->get().writtenBytes; double mul = std::max(1.0, (double)mutationSize / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_BYTE)); ASSERT(totalSize > 0); double prob = mul * mutationSize / totalSize; @@ -1156,6 +1158,12 @@ ACTOR Future commitBatch( ranges.begin().value().populateTags(); toCommit.addTags(ranges.begin().value().tags); // check whether clear is sampled + if(checkSample && trCost->get().clearIdxBytes.count(mutationNum) > 0){ + for(const auto& ssInfo : ranges.begin().value().src_info) { + auto id = ssInfo->interf.id(); + self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxBytes[mutationNum]); + } + } } else { TEST(true); //A clear range extends past a shard boundary @@ -1163,6 +1171,13 @@ ACTOR Future commitBatch( for (auto r : ranges) { r.value().populateTags(); allSources.insert(r.value().tags.begin(), r.value().tags.end()); + // check whether clear is sampled + if(checkSample && trCost->get().clearIdxBytes.count(mutationNum) > 0){ + for(const auto& ssInfo : ranges.begin().value().src_info) { + auto id = ssInfo->interf.id(); + self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxBytes[mutationNum]); + } + } } DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", allSources).detail("Mutation", m); @@ -1366,14 +1381,6 @@ ACTOR Future commitBatch( if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) { ASSERT_WE_THINK(commitVersion != invalidVersion); trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter)); - // aggregate commit cost estimation if committed -// ASSERT(trs[t].commitCostEstimation.present() == trs[t].tagSet.present()); -// if (trs[t].tagSet.present()) { -// TransactionCommitCostEstimation& costEstimation = trs[t].commitCostEstimation.get(); -// for (auto& tag : trs[t].tagSet.get()) { -// self->ssTagCommitCost[tag] += costEstimation; -// } -// } } else if (committed[t] == ConflictBatch::TransactionTooOld) { trs[t].reply.sendError(transaction_too_old());