sample on cost instead of bytes

This commit is contained in:
Xiaoxi Wang 2020-08-10 22:29:59 +00:00
parent 696e77c94e
commit cae6f04070
10 changed files with 82 additions and 71 deletions

View File

@ -161,7 +161,7 @@ public:
void invalidateCache( const KeyRangeRef& );
bool sampleReadTags();
bool sampleOnBytes(uint64_t bytes);
bool sampleOnCost(uint64_t cost);
Reference<ProxyInfo> getMasterProxies(bool useProvisionalProxies);
Future<Reference<ProxyInfo>> getMasterProxiesFuture(bool useProvisionalProxies);

View File

@ -230,7 +230,8 @@ void ClientKnobs::initialize(bool randomize) {
// transaction tags
init( MAX_TAGS_PER_TRANSACTION, 5 );
init( MAX_TRANSACTION_TAG_LENGTH, 16 );
init( COMMIT_SAMPLE_BYTE, 65536 );
init( COMMIT_SAMPLE_COST, 10 ); if( randomize && BUGGIFY ) COMMIT_SAMPLE_COST = 1;
init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
init( INCOMPLETE_SHARD_PLUS, 4096 );
init( READ_TAG_SAMPLE_RATE, 0.01 ); if( randomize && BUGGIFY ) READ_TAG_SAMPLE_RATE = 1.0; // Communicated to clients from cluster
init( TAG_THROTTLE_SMOOTHING_WINDOW, 2.0 );

View File

@ -217,7 +217,8 @@ public:
// transaction tags
int MAX_TRANSACTION_TAG_LENGTH;
int MAX_TAGS_PER_TRANSACTION;
int COMMIT_SAMPLE_BYTE; // The expectation of sampling is every COMMIT_SAMPLE_BYTE sample once
int COMMIT_SAMPLE_COST; // The expectation of sampling is every COMMIT_SAMPLE_COST sample once
int OPERATION_COST_BYTE_FACTOR; // The same value as SERVER_KNOBS->OPERATION_COST_BYTE_FACTOR
int INCOMPLETE_SHARD_PLUS; // The size of (possible) incomplete shard when estimate clear range
double READ_TAG_SAMPLE_RATE; // Communicated to clients from cluster
double TAG_THROTTLE_SMOOTHING_WINDOW;

View File

@ -108,12 +108,11 @@ struct ClientDBInfo {
int64_t clientTxnInfoSizeLimit;
Optional<Value> forward;
double transactionTagSampleRate;
int transactionTagSampleBytes;
double transactionTagSampleCost;
ClientDBInfo()
: clientTxnInfoSampleRate(std::numeric_limits<double>::infinity()), clientTxnInfoSizeLimit(-1),
transactionTagSampleRate(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE),
transactionTagSampleBytes(CLIENT_KNOBS->COMMIT_SAMPLE_BYTE) {}
transactionTagSampleRate(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE), transactionTagSampleCost(CLIENT_KNOBS->COMMIT_SAMPLE_COST) {}
bool operator == (ClientDBInfo const& r) const { return id == r.id; }
bool operator != (ClientDBInfo const& r) const { return id != r.id; }
@ -124,7 +123,7 @@ struct ClientDBInfo {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, proxies, id, clientTxnInfoSampleRate, clientTxnInfoSizeLimit, forward, transactionTagSampleRate,
transactionTagSampleBytes);
transactionTagSampleCost);
}
};

View File

@ -1104,10 +1104,10 @@ Future<Void> DatabaseContext::onMasterProxiesChanged() {
bool DatabaseContext::sampleReadTags() {
return clientInfo->get().transactionTagSampleRate > 0 && deterministicRandom()->random01() <= clientInfo->get().transactionTagSampleRate;
}
bool DatabaseContext::sampleOnBytes(uint64_t bytes) {
if(bytes >= clientInfo->get().transactionTagSampleBytes) return true;
ASSERT(clientInfo->get().transactionTagSampleBytes > 0);
return deterministicRandom()->random01() < (double)bytes / clientInfo->get().transactionTagSampleBytes;
bool DatabaseContext::sampleOnCost(uint64_t cost) {
if(clientInfo->get().transactionTagSampleCost <= 0) return false;
return deterministicRandom()->random01() <= (double)cost / clientInfo->get().transactionTagSampleCost;
}
int64_t extractIntOption( Optional<StringRef> value, int64_t minValue, int64_t maxValue ) {
@ -3311,18 +3311,18 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transac
auto* it = &transaction->mutations[i];
if (it->type == MutationRef::Type::SetValue || it->isAtomicOp()) {
trCommitCosts.opsCount++;
trCommitCosts.writtenBytes += it->expectedSize();
trCommitCosts.writeCosts += getOperationCost(it->expectedSize());
} else if (it->type == MutationRef::Type::ClearRange) {
trCommitCosts.opsCount++;
if(it->clearSingleKey) {
trCommitCosts.clearIdxBytes.emplace(i, it->expectedSize()); // NOTE: whether we need a weight here?
trCommitCosts.clearIdxCosts.emplace(i, getOperationCost(it->expectedSize())); // NOTE: whether we need a weight here?
continue;
}
keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
if (self->options.expensiveClearCostEstimation) {
StorageMetrics m = wait(self->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY));
trCommitCosts.clearIdxBytes.emplace(i, m.bytes);
trCommitCosts.writtenBytes += m.bytes;
trCommitCosts.clearIdxCosts.emplace(i, getOperationCost(m.bytes));
trCommitCosts.writeCosts += getOperationCost(m.bytes);
} else {
std::vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(self->getDatabase(), keyRange, CLIENT_KNOBS->TOO_MANY, false,
@ -3333,32 +3333,32 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transac
TraceEvent("NAPIAvgShardSizex100").detail("SmoothTotal", self->getDatabase()->smoothMidShardSize.smoothTotal());
uint64_t bytes = 0;
if (locations .size() == 1)
if (locations.size() == 1)
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS;
else
bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 + (locations.size() - 2) * self->getDatabase()->smoothMidShardSize.smoothTotal();
trCommitCosts.clearIdxBytes.emplace(i, bytes);
trCommitCosts.writtenBytes += bytes;
trCommitCosts.clearIdxCosts.emplace(i, getOperationCost(bytes));
trCommitCosts.writeCosts += getOperationCost(bytes);
}
}
}
// sample on written bytes
if (!self->getDatabase()->sampleOnBytes(trCommitCosts.writtenBytes))
if (!self->getDatabase()->sampleOnCost(trCommitCosts.writeCosts))
return Optional<ClientTrCommitCostEstimation>();
// sample clear op: the expectation of sampling is every COMMIT_SAMPLE_BYTE sample once
ASSERT(trCommitCosts.writtenBytes > 0);
std::map<int, uint64_t> newClearIdxBytes;
for (const auto& [idx, bytes] : trCommitCosts.clearIdxBytes) {
if(trCommitCosts.writtenBytes >= CLIENT_KNOBS->COMMIT_SAMPLE_BYTE){
double mul = trCommitCosts.writtenBytes / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_BYTE);
if(deterministicRandom()->random01() < bytes * mul / trCommitCosts.writtenBytes)
newClearIdxBytes.emplace(idx, bytes);
// sample clear op: the expectation of sampling is every COMMIT_SAMPLE_COST sample once
ASSERT(trCommitCosts.writeCosts > 0);
std::map<int, uint64_t> newClearIdxCosts;
for (const auto& [idx, cost] : trCommitCosts.clearIdxCosts) {
if(trCommitCosts.writeCosts >= CLIENT_KNOBS->COMMIT_SAMPLE_COST){
double mul = trCommitCosts.writeCosts / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_COST);
if(deterministicRandom()->random01() < cost * mul / trCommitCosts.writeCosts)
newClearIdxCosts.emplace(idx, cost);
}
else if(deterministicRandom()->random01() < (double)bytes / trCommitCosts.writtenBytes){
newClearIdxBytes.emplace(idx, bytes);
else if(deterministicRandom()->random01() < (double)cost / trCommitCosts.writeCosts){
newClearIdxCosts.emplace(idx, cost);
}
}
trCommitCosts.clearIdxBytes = newClearIdxBytes;
trCommitCosts.clearIdxCosts.swap(newClearIdxCosts);
return trCommitCosts;
}

View File

@ -357,5 +357,8 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
// Converts a mutation's byte size into an abstract commit-cost unit used for
// tag-throttling accounting: 1 base unit per operation plus 1 additional unit
// per OPERATION_COST_BYTE_FACTOR bytes (result is always >= 1).
inline uint64_t getOperationCost(uint64_t bytes) {
return bytes / CLIENT_KNOBS->OPERATION_COST_BYTE_FACTOR + 1;
}
#include "flow/unactorcompiler.h"
#endif

View File

@ -654,6 +654,12 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// clang-format on
if(clientKnobs)
clientKnobs->IS_ACCEPTABLE_DELAY = clientKnobs->IS_ACCEPTABLE_DELAY*std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS)/(5.0*VERSIONS_PER_SECOND);
if(clientKnobs) {
clientKnobs->IS_ACCEPTABLE_DELAY =
clientKnobs->IS_ACCEPTABLE_DELAY *
std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS) /
(5.0 * VERSIONS_PER_SECOND);
clientKnobs->OPERATION_COST_BYTE_FACTOR = OPERATION_COST_BYTE_FACTOR;
clientKnobs->INIT_MID_SHARD_BYTES = MIN_SHARD_BYTES;
}
}

View File

@ -501,23 +501,23 @@ struct ProxyCommitData {
latencyBandConfig = newLatencyBandConfig;
}
void updateSSTagCost(const UID& id, const TagSet& tagSet, MutationRef m, int bytes){
void updateSSTagCost(const UID& id, const TagSet& tagSet, MutationRef m, int cost){
if(ssTagCommitCost.count(id) == 0) {
ssTagCommitCost[id] = TransactionTagMap<TransactionCommitCostEstimation>();
}
for(auto& tag: tagSet) {
auto& cost = ssTagCommitCost[id][tag];
auto& costItem = ssTagCommitCost[id][tag];
if(m.isAtomicOp()) {
cost.numAtomicWrite ++;
cost.bytesAtomicWrite += bytes;
costItem.numAtomicWrite ++;
costItem.costAtomicWrite += cost;
}
else if (m.type == MutationRef::Type::SetValue){
cost.numWrite ++;
cost.bytesWrite += bytes;
costItem.numWrite ++;
costItem.costWrite += cost;
}
else if (m.type == MutationRef::Type::ClearRange){
cost.numClear ++;
cost.bytesClearEst += bytes;
costItem.numClear ++;
costItem.costClearEst += cost;
}
}
}
@ -1151,16 +1151,17 @@ ACTOR Future<Void> commitBatch(
if (isSingleKeyMutation((MutationRef::Type) m.type)) {
// sample single key mutation based on byte
// the expectation of sampling is every COMMIT_SAMPLE_BYTE sample once
// the expectation of sampling is every COMMIT_SAMPLE_COST sample once
if (checkSample) {
double totalSize = trCost->get().writtenBytes;
double mul = std::max(1.0, (double)mutationSize / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_BYTE));
ASSERT(totalSize > 0);
double prob = mul * mutationSize / totalSize;
double totalCosts = trCost->get().writeCosts;
double cost = getOperationCost(mutationSize);
double mul = std::max(1.0, totalCosts / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_COST));
ASSERT(totalCosts > 0);
double prob = mul * cost / totalCosts;
if(deterministicRandom()->random01() < prob) {
for(const auto& ssInfo : self->keyInfo[m.param1].src_info) {
auto id = ssInfo->interf.id();
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, mutationSize);
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, cost);
}
}
}
@ -1196,10 +1197,10 @@ ACTOR Future<Void> commitBatch(
ranges.begin().value().populateTags();
toCommit.addTags(ranges.begin().value().tags);
// check whether clear is sampled
if(checkSample && trCost->get().clearIdxBytes.count(mutationNum) > 0){
if(checkSample && trCost->get().clearIdxCosts.count(mutationNum) > 0){
for(const auto& ssInfo : ranges.begin().value().src_info) {
auto id = ssInfo->interf.id();
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxBytes[mutationNum]);
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxCosts[mutationNum]);
}
}
}
@ -1210,10 +1211,10 @@ ACTOR Future<Void> commitBatch(
r.value().populateTags();
allSources.insert(r.value().tags.begin(), r.value().tags.end());
// check whether clear is sampled
if(checkSample && trCost->get().clearIdxBytes.count(mutationNum) > 0){
if(checkSample && trCost->get().clearIdxCosts.count(mutationNum) > 0){
for(const auto& ssInfo : r.value().src_info) {
auto id = ssInfo->interf.id();
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxBytes[mutationNum]);
self->updateSSTagCost(id, trs[transactionNum].tagSet.get(), m, trCost->get().clearIdxCosts[mutationNum]);
}
}
}

View File

@ -103,7 +103,7 @@ struct StorageQueueInfo {
// refresh periodically
TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
uint64_t totalWriteBytes = 0;
uint64_t totalWriteCosts = 0;
int totalWriteOps = 0;
StorageQueueInfo(UID id, LocalityData locality)
@ -855,8 +855,8 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
TransactionCommitCostEstimation maxCost;
double maxRate = 0, maxBusyness = 0;
for(const auto& [tag, cost] : it->value.tagCostEst) {
// opsBeforeSample / opsSampled = COMMIT_SAMPLE_BYTE / 1
double rate = CLIENT_KNOBS->COMMIT_SAMPLE_BYTE * cost.getOpsSum() / elapsed;
// costBeforeSample / costSampled = COMMIT_SAMPLE_COST / 1
double rate = CLIENT_KNOBS->COMMIT_SAMPLE_COST * cost.getCostSum() / elapsed;
if(rate > maxRate) {
busiestTag = tag;
maxRate = rate;
@ -865,10 +865,10 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
}
if(maxRate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
it->value.busiestWriteTag = busiestTag;
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteBytes", it->value.totalWriteBytes).detail("TotalWriteOps",it->value.totalWriteOps);
ASSERT(it->value.totalWriteBytes > 0 && it->value.totalWriteOps > 0);
maxBusyness = (SERVER_KNOBS->OPERATION_COST_BYTE_FACTOR * maxCost.getOpsSum() + maxCost.getBytesSum()) /
(SERVER_KNOBS->OPERATION_COST_BYTE_FACTOR * it->value.totalWriteOps + it->value.totalWriteBytes);
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteCosts", it->value.totalWriteCosts).detail("TotalWriteOps",it->value.totalWriteOps);
ASSERT(it->value.totalWriteCosts > 0 && it->value.totalWriteOps > 0);
maxBusyness = double(maxCost.getOpsSum() + maxCost.getCostSum()) /
(it->value.totalWriteOps + it->value.totalWriteCosts);
it->value.busiestWriteTagFractionalBusyness = maxBusyness;
it->value.busiestWriteTagRate = maxRate;
}
@ -877,7 +877,7 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagOps", maxCost.getOpsSum())
.detail("TagBytes", maxCost.getBytesSum())
.detail("TagCosts", maxCost.getCostSum())
.detail("TagRate", maxRate)
.detail("TagBusyness", maxBusyness)
.detail("Reported", it->value.busiestWriteTag.present())
@ -886,7 +886,7 @@ Future<Void> refreshStorageServerCommitCost(RatekeeperData *self) {
// reset statistics
it->value.tagCostEst.clear();
it->value.totalWriteOps = 0;
it->value.totalWriteBytes = 0;
it->value.totalWriteCosts = 0;
}
self->lastBusiestCommitTagPick = now();
return Void();
@ -1273,7 +1273,7 @@ void updateCommitCostEstimation(RatekeeperData* self, UIDTransactionTagMap<Trans
if(tagCostIt == costEstimation.end()) continue;
for(const auto& [tagName, cost] : tagCostIt->second) {
it->value.tagCostEst[tagName] += cost;
it->value.totalWriteBytes += cost.getBytesSum();
it->value.totalWriteCosts += cost.getCostSum();
it->value.totalWriteOps += cost.getOpsSum();
}
}

View File

@ -79,35 +79,35 @@ struct TransactionCommitCostEstimation {
int numWrite = 0;
int numAtomicWrite = 0;
int numClear = 0;
uint64_t bytesWrite = 0;
uint64_t bytesAtomicWrite = 0;
uint64_t bytesClearEst = 0;
uint64_t costWrite = 0;
uint64_t costAtomicWrite = 0;
uint64_t costClearEst = 0;
uint64_t getBytesSum() const { return bytesClearEst + bytesAtomicWrite + bytesWrite; }
uint64_t getCostSum() const { return costClearEst + costAtomicWrite + costWrite; }
int getOpsSum() const { return numWrite + numAtomicWrite + numClear; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear);
serializer(ar, costWrite, costClearEst, costAtomicWrite, numWrite, numAtomicWrite, numClear);
}
TransactionCommitCostEstimation& operator+=(const TransactionCommitCostEstimation& other) {
numWrite += other.numWrite;
numAtomicWrite += other.numAtomicWrite;
numClear += other.numClear;
bytesWrite += other.bytesWrite;
bytesAtomicWrite += other.numAtomicWrite;
bytesClearEst += other.bytesClearEst;
costWrite += other.costWrite;
costAtomicWrite += other.costAtomicWrite;
costClearEst += other.costClearEst;
return *this;
}
};
struct ClientTrCommitCostEstimation {
int opsCount = 0;
uint64_t writtenBytes = 0;
std::map<int, uint64_t> clearIdxBytes;
uint64_t writeCosts = 0;
std::map<int, uint64_t> clearIdxCosts;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, opsCount, writtenBytes, clearIdxBytes);
serializer(ar, opsCount, writeCosts, clearIdxCosts);
}
};
struct GetRateInfoReply {