add clear cost estimation

This commit is contained in:
Xiaoxi Wang 2020-07-14 00:18:52 +00:00
parent 6ec2f92a8d
commit d512170cd8
6 changed files with 44 additions and 31 deletions

View File

@ -232,6 +232,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
init( COMMIT_CLEAR_COST_ESTIMATE_METHOD, 0.0);
// clang-format on
}

View File

@ -219,6 +219,9 @@ public:
double TAG_THROTTLE_RECHECK_INTERVAL;
double TAG_THROTTLE_EXPIRATION_INTERVAL;
double COMMIT_CLEAR_COST_ESTIMATE_METHOD; // 0 --> estimate through shard-map; 1 --> estimate by asking StorageServer
// (more accurate but more expensive)
ClientKnobs();
void initialize(bool randomize = false);
};

View File

@ -3257,6 +3257,32 @@ void Transaction::setupWatches() {
}
}
ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* self, CommitTransactionRef* transaction) {
state MutationRef* it = transaction->mutations.begin();
state MutationRef* end = transaction->mutations.end();
state TransactionCommitCostEstimation trCommitCosts;
for (; it != end; ++it) {
if (it->type == MutationRef::Type::SetValue) {
trCommitCosts.bytesWrite += it->expectedSize();
trCommitCosts.numWrite++;
} else if (it->isAtomicOp()) {
trCommitCosts.bytesAtomicWrite += it->expectedSize();
trCommitCosts.numAtomicWrite++;
} else if (it->type == MutationRef::Type::ClearRange) {
if (deterministicRandom()->random01() < CLIENT_KNOBS->COMMIT_CLEAR_COST_ESTIMATE_METHOD) {
try {
StorageMetrics m = wait(self->getStorageMetrics(KeyRangeRef(it->param1, it->param2), -1));
trCommitCosts.bytesClearEst += m.bytes;
continue;
} catch (...) {} // do a local estimation
}
// TODO how much data a clear would delete: 1. shard-map
}
}
return trCommitCosts;
}
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
state TraceInterval interval( "TransactionCommit" );
state double startTime = now();
@ -3275,6 +3301,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
Version v = wait( readVersion );
req.transaction.read_snapshot = v;
TransactionCommitCostEstimation costEst = wait( estimateCommitCosts(tr, &req.transaction) );
req.commitCostEstimation = costEst; // estimateCommitCosts(tr, &req.transaction, false);
startTime = now();
state Optional<UID> commitID = Optional<UID>();
@ -3401,33 +3429,6 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
}
}
TransactionCommitCostEstimation Transaction::estimateCommitCosts(bool traceEvent) const {
auto it = tr.transaction.mutations.cbegin();
auto end = tr.transaction.mutations.cend();
TransactionCommitCostEstimation trCommitCosts;
for (; it != end; ++it) {
if (it->type == MutationRef::Type::SetValue) {
trCommitCosts.bytesWrite += it->expectedSize();
trCommitCosts.numWrite ++;
}
else if (it->isAtomicOp()) {
trCommitCosts.bytesAtomicWrite += it->expectedSize();
trCommitCosts.numAtomicWrite ++;
}
else if (it->type == MutationRef::Type::ClearRange) {
// TODO how much data a clear would delete: 1. shard-map 2. storage server
}
}
if (traceEvent) {
TraceEvent(SevDebug, "TransactionCostEstimation")
.detail("numWrite", trCommitCosts.numWrite)
.detail("byteWrites", trCommitCosts.bytesWrite)
.detail("numAtomicWrite", trCommitCosts.numAtomicWrite)
.detail("bytesAtomicWrite", trCommitCosts.bytesAtomicWrite)
.detail("bytesClearEst", trCommitCosts.bytesClearEst);
}
return trCommitCosts;
}
Future<Void> Transaction::commitMutations() {
try {
@ -3447,7 +3448,6 @@ Future<Void> Transaction::commitMutations() {
cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
tr.commitCostEstimation = estimateCommitCosts(true);
tr.tagSet = options.tags;
size_t transactionSize = getSize();

View File

@ -327,8 +327,6 @@ public:
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
}
TransactionCommitCostEstimation estimateCommitCosts(bool traceEvent = false) const;
private:
Future<Version> getReadVersion(uint32_t flags);
Database cx;

View File

@ -1248,9 +1248,20 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
if (p.totalTransactions > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.totalTransactions );
// TODO process commitCostEstimation
for(auto tag : req.throttledTagCounts) {
self.throttledTags.addRequests(tag.first, tag.second);
}
for(auto& tag : req.throttledTagCommitCostEst) {
TraceEvent(SevInfo, "ThrottledTagCommitCostEst")
.suppressFor(1.0)
.detail("Tag", tag.first)
.detail("NumWrite", tag.second.numWrite)
.detail("ByteWrites", tag.second.bytesWrite)
.detail("NumAtomicWrite", tag.second.numAtomicWrite)
.detail("BytesAtomicWrite", tag.second.bytesAtomicWrite)
.detail("BytesClearEst", tag.second.bytesClearEst);
}
}
if(p.batchTransactions > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions );

View File

@ -133,7 +133,7 @@ struct GetRateInfoRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, throttledTagCommitCostEst, detailed, reply);
}
};