check clear sample on proxy

This commit is contained in:
Xiaoxi Wang 2020-08-01 06:48:40 +00:00
parent 4f7dab4951
commit 1f38c2f2a4
1 changed files with 17 additions and 10 deletions

View File

@ -1090,6 +1090,8 @@ ACTOR Future<Void> commitBatch(
for (; transactionNum<trs.size(); transactionNum++) {
if (committed[transactionNum] == ConflictBatch::TransactionCommitted && (!locked || trs[transactionNum].isLockAware())) {
ASSERT(trs[transactionNum].commitCostEstimation.present() == trs[transactionNum].tagSet.present());
state bool checkSample = trs[transactionNum].tagSet.present();
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[transactionNum].commitCostEstimation;
state int mutationNum = 0;
state VectorRef<MutationRef>* pMutations = &trs[transactionNum].transaction.mutations;
for (; mutationNum < pMutations->size(); mutationNum++) {
@ -1112,8 +1114,8 @@ ACTOR Future<Void> commitBatch(
if (isSingleKeyMutation((MutationRef::Type) m.type)) {
// sample single key mutation based on byte
// the expectation of sampling is every COMMIT_SAMPLE_BYTE sample once
if (trs[transactionNum].tagSet.present()) {
double totalSize = trs[transactionNum].commitCostEstimation.get().writtenBytes;
if (checkSample) {
double totalSize = trCost->get().writtenBytes;
double mul = std::max(1.0, (double)mutationSize / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_BYTE));
ASSERT(totalSize > 0);
double prob = mul * mutationSize / totalSize;
@ -1156,6 +1158,12 @@ ACTOR Future<Void> commitBatch(
ranges.begin().value().populateTags();
toCommit.addTags(ranges.begin().value().tags);
// check whether clear is sampled
if(checkSample && trCost->get().clearIdxBytes.count(mutationNum) > 0){
for(const auto& ssInfo : ranges.begin().value().src_info) {
auto id = ssInfo->interf.id();
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxBytes[mutationNum]);
}
}
}
else {
TEST(true); //A clear range extends past a shard boundary
@ -1163,6 +1171,13 @@ ACTOR Future<Void> commitBatch(
for (auto r : ranges) {
r.value().populateTags();
allSources.insert(r.value().tags.begin(), r.value().tags.end());
// check whether clear is sampled
if(checkSample && trCost->get().clearIdxBytes.count(mutationNum) > 0){
for(const auto& ssInfo : ranges.begin().value().src_info) {
auto id = ssInfo->interf.id();
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxBytes[mutationNum]);
}
}
}
DEBUG_MUTATION("ProxyCommit", commitVersion, m).detail("Dbgid", self->dbgid).detail("To", allSources).detail("Mutation", m);
@ -1366,14 +1381,6 @@ ACTOR Future<Void> commitBatch(
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
ASSERT_WE_THINK(commitVersion != invalidVersion);
trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter));
// aggregate commit cost estimation if committed
// ASSERT(trs[t].commitCostEstimation.present() == trs[t].tagSet.present());
// if (trs[t].tagSet.present()) {
// TransactionCommitCostEstimation& costEstimation = trs[t].commitCostEstimation.get();
// for (auto& tag : trs[t].tagSet.get()) {
// self->ssTagCommitCost[tag] += costEstimation;
// }
// }
}
else if (committed[t] == ConflictBatch::TransactionTooOld) {
trs[t].reply.sendError(transaction_too_old());