finish local shard estimation

This commit is contained in:
Xiaoxi Wang 2020-07-15 02:47:39 +00:00
parent bcb858288b
commit eb44ae0e86
4 changed files with 35 additions and 23 deletions

View File

@ -231,6 +231,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TAG_THROTTLE_SMOOTHING_WINDOW, 2.0 );
init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;
init( TAG_THROTTLE_EXPIRATION_INTERVAL, 60.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_EXPIRATION_INTERVAL = 1.0;
// clang-format on
}

View File

@ -3257,10 +3257,12 @@ void Transaction::setupWatches() {
}
}
ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* self, CommitTransactionRef* transaction) {
ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* self,
CommitTransactionRef* transaction) {
state MutationRef* it = transaction->mutations.begin();
state MutationRef* end = transaction->mutations.end();
state TransactionCommitCostEstimation trCommitCosts;
state KeyRange keyRange;
for (; it != end; ++it) {
if (it->type == MutationRef::Type::SetValue) {
trCommitCosts.bytesWrite += it->expectedSize();
@ -3269,16 +3271,21 @@ ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* s
trCommitCosts.bytesAtomicWrite += it->expectedSize();
trCommitCosts.numAtomicWrite++;
} else if (it->type == MutationRef::Type::ClearRange) {
trCommitCosts.numClear ++;
if(self->options.expensiveClearCostEstimation) {
trCommitCosts.numClear++;
keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
if (self->options.expensiveClearCostEstimation) {
try {
StorageMetrics m = wait(self->getStorageMetrics(KeyRangeRef(it->param1, it->param2), -1));
trCommitCosts.bytesClearEst += m.bytes;
StorageMetrics m = wait(self->getStorageMetrics(keyRange, INT_MAX));
trCommitCosts.bytesClearEst.present() ? (trCommitCosts.bytesClearEst.get() += m.bytes)
: (trCommitCosts.bytesClearEst = m.bytes);
continue;
} catch (...) {} // do a local estimation
} catch (...) {
} // do a local estimation
}
// TODO how much data a clear would delete: 1. shard-map
std::vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
self->getDatabase(), keyRange, INT_MAX, false, &StorageServerInterface::getShardState, self->info));
trCommitCosts.numClearShards.present() ? (trCommitCosts.numClearShards.get() += locations.size())
: (trCommitCosts.numClearShards = locations.size());
}
}
return trCommitCosts;
@ -3305,11 +3312,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
req.transaction.read_snapshot = v;
}
else {
Version v;
TransactionCommitCostEstimation costEst;
wait(store(v, readVersion) && store(costEst, estimateCommitCosts(tr, &req.transaction)));
req.transaction.read_snapshot = v;
req.commitCostEstimation = costEst; // estimateCommitCosts(tr, &req.transaction, false);
req.commitCostEstimation = TransactionCommitCostEstimation();
wait(store(req.transaction.read_snapshot, readVersion) && store(req.commitCostEstimation.get(), estimateCommitCosts(tr, &req.transaction)));
}
startTime = now();
@ -3732,15 +3736,17 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
span.addParent(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
case FDBTransactionOptions::EXPENSIVE_CLEAR_COST_ESTIMATION_ENABLE:
validateOptionValue(value, false);
options.expensiveClearCostEstimation = true;
break;
default:
default:
break;
}
}

View File

@ -1291,7 +1291,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
.detail("NumAtomicWrite", tag.second.numAtomicWrite)
.detail("BytesAtomicWrite", tag.second.bytesAtomicWrite)
.detail("NumClear", tag.second.numClear)
.detail("BytesClearEst", tag.second.bytesClearEst);
.detail("NumClearShards", tag.second.numClearShards.orDefault(0))
.detail("BytesClearEst", tag.second.bytesClearEst.orDefault(0));
}
}
if(p.batchTransactions > 0) {

View File

@ -80,21 +80,25 @@ struct TransactionCommitCostEstimation {
int64_t numAtomicWrite = 0;
int64_t numClear = 0;
unsigned long bytesWrite = 0;
unsigned long bytesClearEst = 0;
unsigned long bytesAtomicWrite = 0;
Optional<unsigned long> numClearShards;
Optional<unsigned long> bytesClearEst;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear);
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear, numClearShards);
}
TransactionCommitCostEstimation& operator += (const TransactionCommitCostEstimation& other) {
TransactionCommitCostEstimation& operator+=(const TransactionCommitCostEstimation& other) {
numWrite += other.numWrite;
numAtomicWrite += other.numAtomicWrite;
numClear += other.numClear;
bytesWrite += other.bytesWrite;
bytesClearEst += other.bytesClearEst;
bytesAtomicWrite += other.numAtomicWrite;
if (other.bytesClearEst.present() || bytesClearEst.present())
bytesClearEst = bytesClearEst.orDefault(0) + other.bytesClearEst.orDefault(0);
if (other.numClearShards.present() || numClearShards.present())
numClearShards = numClearShards.orDefault(0) + other.numClearShards.orDefault(0);
return *this;
}
};