better serialize; TransactionOption::clear patch

This commit is contained in:
Xiaoxi Wang 2020-07-15 22:03:35 +00:00
parent eb44ae0e86
commit 9d0d189cc8
4 changed files with 28 additions and 34 deletions

View File

@ -3059,6 +3059,7 @@ void TransactionOptions::clear() {
tags = TagSet{};
readTags = TagSet{};
priority = TransactionPriority::DEFAULT;
expensiveClearCostEstimation = false;
}
TransactionOptions::TransactionOptions() {
@ -3274,18 +3275,14 @@ ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* s
trCommitCosts.numClear++;
keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
if (self->options.expensiveClearCostEstimation) {
try {
StorageMetrics m = wait(self->getStorageMetrics(keyRange, INT_MAX));
trCommitCosts.bytesClearEst.present() ? (trCommitCosts.bytesClearEst.get() += m.bytes)
: (trCommitCosts.bytesClearEst = m.bytes);
continue;
} catch (...) {
} // do a local estimation
StorageMetrics m = wait(self->getStorageMetrics(keyRange, INT_MAX));
trCommitCosts.bytesClearEst += m.bytes;
}
else {
std::vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
self->getDatabase(), keyRange, INT_MAX, false, &StorageServerInterface::getShardState, self->info));
trCommitCosts.numClearShards += locations.size();
}
std::vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
self->getDatabase(), keyRange, INT_MAX, false, &StorageServerInterface::getShardState, self->info));
trCommitCosts.numClearShards.present() ? (trCommitCosts.numClearShards.get() += locations.size())
: (trCommitCosts.numClearShards = locations.size());
}
}
return trCommitCosts;

View File

@ -1340,8 +1340,7 @@ ACTOR Future<Void> commitBatch(
ASSERT_WE_THINK(commitVersion != invalidVersion);
trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter));
// aggregate commit cost estimation iff committed
ASSERT((trs[t].commitCostEstimation.present() && trs[t].tagSet.present()) ||
(!trs[t].commitCostEstimation.present() && !trs[t].tagSet.present()));
ASSERT(trs[t].commitCostEstimation.present() == trs[t].tagSet.present());
if (trs[t].tagSet.present()) {
TransactionCommitCostEstimation& costEstimation = trs[t].commitCostEstimation.get();
for (auto& tag : trs[t].tagSet.get()) {

View File

@ -1282,17 +1282,17 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
for(auto tag : req.throttledTagCounts) {
self.throttledTags.addRequests(tag.first, tag.second);
}
for (auto& tag : req.throttledTagCommitCostEst) {
for (const auto &[tagName, cost] : req.throttledTagCommitCostEst) {
TraceEvent(SevInfo, "ThrottledTagCommitCostEst")
.suppressFor(1.0)
.detail("Tag", tag.first)
.detail("NumWrite", tag.second.numWrite)
.detail("ByteWrites", tag.second.bytesWrite)
.detail("NumAtomicWrite", tag.second.numAtomicWrite)
.detail("BytesAtomicWrite", tag.second.bytesAtomicWrite)
.detail("NumClear", tag.second.numClear)
.detail("NumClearShards", tag.second.numClearShards.orDefault(0))
.detail("BytesClearEst", tag.second.bytesClearEst.orDefault(0));
.detail("Tag", tagName)
.detail("NumWrite", cost.numWrite)
.detail("ByteWrites", cost.bytesWrite)
.detail("NumAtomicWrite", cost.numAtomicWrite)
.detail("BytesAtomicWrite", cost.bytesAtomicWrite)
.detail("NumClear", cost.numClear)
.detail("NumClearShards", cost.numClearShards)
.detail("BytesClearEst", cost.bytesClearEst);
}
}
if(p.batchTransactions > 0) {

View File

@ -76,13 +76,13 @@ struct ClientTagThrottleLimits {
};
struct TransactionCommitCostEstimation {
int64_t numWrite = 0;
int64_t numAtomicWrite = 0;
int64_t numClear = 0;
unsigned long bytesWrite = 0;
unsigned long bytesAtomicWrite = 0;
Optional<unsigned long> numClearShards;
Optional<unsigned long> bytesClearEst;
int numWrite = 0;
int numAtomicWrite = 0;
int numClear = 0;
int numClearShards = 0;
uint64_t bytesWrite = 0;
uint64_t bytesAtomicWrite = 0;
uint64_t bytesClearEst = 0;
template <class Ar>
void serialize(Ar& ar) {
@ -95,10 +95,8 @@ struct TransactionCommitCostEstimation {
numClear += other.numClear;
bytesWrite += other.bytesWrite;
bytesAtomicWrite += other.numAtomicWrite;
if (other.bytesClearEst.present() || bytesClearEst.present())
bytesClearEst = bytesClearEst.orDefault(0) + other.bytesClearEst.orDefault(0);
if (other.numClearShards.present() || numClearShards.present())
numClearShards = numClearShards.orDefault(0) + other.numClearShards.orDefault(0);
numClearShards += other.numClearShards;
bytesClearEst += other.bytesClearEst;
return *this;
}
};
@ -139,7 +137,7 @@ struct GetRateInfoRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, throttledTagCommitCostEst, detailed, reply);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply, throttledTagCommitCostEst);
}
};