Merge master branch and keep master proxy reporting txn cost estimation to ratekeeper
commit e87327b33b
@@ -161,7 +161,8 @@ public:
 	void invalidateCache( const KeyRef&, bool isBackward = false );
 	void invalidateCache( const KeyRangeRef& );
 
-	bool sampleReadTags();
+	bool sampleReadTags() const;
+	bool sampleOnCost(uint64_t cost) const;
 
 	void updateProxies();
 	Reference<ProxyInfo> getMasterProxies(bool useProvisionalProxies);
@@ -309,6 +310,7 @@ public:
 	Counter transactionsResourceConstrained;
 	Counter transactionsProcessBehind;
 	Counter transactionsThrottled;
+	Counter transactionsExpensiveClearCostEstCount;
 
 	ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;
 
@@ -341,6 +343,7 @@ public:
 	HealthMetrics healthMetrics;
 	double healthMetricsLastUpdated;
 	double detailedHealthMetricsLastUpdated;
+	Smoother smoothMidShardSize;
 
 	UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
 
@@ -66,6 +66,8 @@ void ClientKnobs::initialize(bool randomize) {
 	init( BACKOFF_GROWTH_RATE, 2.0 );
 	init( RESOURCE_CONSTRAINED_MAX_BACKOFF, 30.0 );
 	init( PROXY_COMMIT_OVERHEAD_BYTES, 23 ); //The size of serializing 7 tags (3 primary, 3 remote, 1 log router) + 2 for the tag length
+	init( SHARD_STAT_SMOOTH_AMOUNT, 5.0 );
+	init( INIT_MID_SHARD_BYTES, 200000 ); if( randomize && BUGGIFY ) INIT_MID_SHARD_BYTES = 40000; // The same value as SERVER_KNOBS->MIN_SHARD_BYTES
 
 	init( TRANSACTION_SIZE_LIMIT, 1e7 );
 	init( KEY_SIZE_LIMIT, 1e4 );
@@ -90,6 +92,7 @@ void ClientKnobs::initialize(bool randomize) {
 	init( STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, 15.0 );
 	init( AGGREGATE_HEALTH_METRICS_MAX_STALENESS, 0.5 );
 	init( DETAILED_HEALTH_METRICS_MAX_STALENESS, 5.0 );
+	init( MID_SHARD_SIZE_MAX_STALENESS, 10.0 );
 	init( TAG_ENCODE_KEY_SERVERS, true ); if( randomize && BUGGIFY ) TAG_ENCODE_KEY_SERVERS = false;
 
 	//KeyRangeMap
@@ -229,6 +232,9 @@ void ClientKnobs::initialize(bool randomize) {
 	// transaction tags
 	init( MAX_TAGS_PER_TRANSACTION, 5 );
 	init( MAX_TRANSACTION_TAG_LENGTH, 16 );
+	init( COMMIT_SAMPLE_COST, 100 ); if( randomize && BUGGIFY ) COMMIT_SAMPLE_COST = 10;
+	init( WRITE_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) WRITE_COST_BYTE_FACTOR = 4096;
+	init( INCOMPLETE_SHARD_PLUS, 4096 );
 	init( READ_TAG_SAMPLE_RATE, 0.01 ); if( randomize && BUGGIFY ) READ_TAG_SAMPLE_RATE = 1.0; // Communicated to clients from cluster
 	init( TAG_THROTTLE_SMOOTHING_WINDOW, 2.0 );
 	init( TAG_THROTTLE_RECHECK_INTERVAL, 5.0 ); if( randomize && BUGGIFY ) TAG_THROTTLE_RECHECK_INTERVAL = 0.0;
@@ -61,6 +61,8 @@ public:
 	double BACKOFF_GROWTH_RATE;
 	double RESOURCE_CONSTRAINED_MAX_BACKOFF;
 	int PROXY_COMMIT_OVERHEAD_BYTES;
+	double SHARD_STAT_SMOOTH_AMOUNT;
+	int INIT_MID_SHARD_BYTES;
 
 	int TRANSACTION_SIZE_LIMIT;
 	int64_t KEY_SIZE_LIMIT;
@@ -86,6 +88,7 @@ public:
 	double STORAGE_METRICS_TOO_MANY_SHARDS_DELAY;
 	double AGGREGATE_HEALTH_METRICS_MAX_STALENESS;
 	double DETAILED_HEALTH_METRICS_MAX_STALENESS;
+	double MID_SHARD_SIZE_MAX_STALENESS;
 	bool TAG_ENCODE_KEY_SERVERS;
 
 	//KeyRangeMap
@@ -216,6 +219,9 @@ public:
 	// transaction tags
 	int MAX_TRANSACTION_TAG_LENGTH;
 	int MAX_TAGS_PER_TRANSACTION;
+	int COMMIT_SAMPLE_COST; // In expectation, one sample is taken per COMMIT_SAMPLE_COST of commit cost
+	int WRITE_COST_BYTE_FACTOR;
+	int INCOMPLETE_SHARD_PLUS; // The assumed size of a (possibly) incomplete shard when estimating a clear range
 	double READ_TAG_SAMPLE_RATE; // Communicated to clients from cluster
 	double TAG_THROTTLE_SMOOTHING_WINDOW;
 	double TAG_THROTTLE_RECHECK_INTERVAL;
@@ -107,8 +107,12 @@ struct ClientDBInfo {
 	int64_t clientTxnInfoSizeLimit;
 	Optional<Value> forward;
 	double transactionTagSampleRate;
+	double transactionTagSampleCost;
 
-	ClientDBInfo() : clientTxnInfoSampleRate(std::numeric_limits<double>::infinity()), clientTxnInfoSizeLimit(-1), transactionTagSampleRate(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE) {}
+	ClientDBInfo()
+	  : clientTxnInfoSampleRate(std::numeric_limits<double>::infinity()), clientTxnInfoSizeLimit(-1),
+	    transactionTagSampleRate(CLIENT_KNOBS->READ_TAG_SAMPLE_RATE),
+	    transactionTagSampleCost(CLIENT_KNOBS->COMMIT_SAMPLE_COST) {}
 
 	bool operator == (ClientDBInfo const& r) const { return id == r.id; }
 	bool operator != (ClientDBInfo const& r) const { return id != r.id; }
@@ -118,7 +122,8 @@ struct ClientDBInfo {
 		if constexpr (!is_fb_function<Archive>) {
 			ASSERT(ar.protocolVersion().isValid());
 		}
-		serializer(ar, grvProxies, masterProxies, id, clientTxnInfoSampleRate, clientTxnInfoSizeLimit, forward, transactionTagSampleRate);
+		serializer(ar, grvProxies, masterProxies, id, clientTxnInfoSampleRate, clientTxnInfoSizeLimit, forward,
+		           transactionTagSampleRate, transactionTagSampleCost);
 	}
 };
 
@@ -157,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest {
 	ReplyPromise<CommitID> reply;
 	uint32_t flags;
 	Optional<UID> debugID;
-	Optional<TransactionCommitCostEstimation> commitCostEstimation;
+	Optional<ClientTrCommitCostEstimation> commitCostEstimation;
 	Optional<TagSet> tagSet;
 
 	CommitTransactionRequest() : flags(0) {}
@@ -186,6 +191,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
 	Version version;
 	bool locked;
 	Optional<Value> metadataVersion;
+	int64_t midShardSize = 0;
 
 	TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
 
@@ -193,7 +199,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, BasicLoadBalancedReply::recentRequests, version, locked, metadataVersion, tagThrottleInfo);
+		serializer(ar, BasicLoadBalancedReply::recentRequests, version, locked, metadataVersion, tagThrottleInfo,
+		           midShardSize);
 	}
 };
 
@@ -850,8 +850,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
   : connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
     clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
     apiVersion(apiVersion), switchable(switchable), proxyProvisional(false), cc("TransactionMetrics"),
-    transactionReadVersions("ReadVersions", cc),
-    transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
+    transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
     transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
     transactionReadVersionBatches("ReadVersionBatches", cc),
     transactionBatchReadVersions("BatchPriorityReadVersions", cc),
@@ -878,6 +877,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
     transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0), latencies(1000), readLatencies(1000),
     commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
     healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
+    smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
+    transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
     specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)) {
 	dbId = deterministicRandom()->randomUniqueID();
 	connected = (clientInfo->get().masterProxies.size() && clientInfo->get().grvProxies.size())
@@ -900,6 +901,9 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
 	monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
 	clientStatusUpdater.actor = clientStatusUpdateActor(this);
 	cacheListMonitor = monitorCacheList(this);
+
+	smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
+
 	if (apiVersionAtLeast(700)) {
 		registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ERRORMSG, SpecialKeySpace::IMPLTYPE::READONLY,
 		                              std::make_unique<SingleSpecialKeyImpl>(
@@ -995,22 +999,36 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
 	}
 }
 
-DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
-	transactionReadVersionsCompleted("ReadVersionsCompleted", cc), transactionReadVersionBatches("ReadVersionBatches", cc), transactionBatchReadVersions("BatchPriorityReadVersions", cc),
-	transactionDefaultReadVersions("DefaultPriorityReadVersions", cc), transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
-	transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc), transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
-	transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
-	transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc), transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
-	transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc), transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc),
-	transactionBytesRead("BytesRead", cc), transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc), transactionCommittedMutations("CommittedMutations", cc),
-	transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc), transactionClearMutations("ClearMutations", cc),
-	transactionAtomicMutations("AtomicMutations", cc), transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
-	transactionKeyServerLocationRequests("KeyServerLocationRequests", cc), transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc), transactionsTooOld("TooOld", cc),
-	transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
-	transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), latencies(1000), readLatencies(1000), commitLatencies(1000),
-	GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
-	internal(false) {}
+DatabaseContext::DatabaseContext(const Error& err)
+  : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
+    transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
+    transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
+    transactionReadVersionBatches("ReadVersionBatches", cc),
+    transactionBatchReadVersions("BatchPriorityReadVersions", cc),
+    transactionDefaultReadVersions("DefaultPriorityReadVersions", cc),
+    transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
+    transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc),
+    transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
+    transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc),
+    transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
+    transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
+    transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
+    transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc),
+    transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
+    transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
+    transactionCommittedMutations("CommittedMutations", cc),
+    transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc),
+    transactionClearMutations("ClearMutations", cc), transactionAtomicMutations("AtomicMutations", cc),
+    transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
+    transactionKeyServerLocationRequests("KeyServerLocationRequests", cc),
+    transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc),
+    transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc),
+    transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
+    transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc),
+    transactionsProcessBehind("ProcessBehind", cc), latencies(1000), readLatencies(1000), commitLatencies(1000),
+    GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
+    smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
+    transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), internal(false) {}
 
 Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, TaskPriority taskID, bool lockAware, int apiVersion, bool switchable) {
 	return Database( new DatabaseContext( Reference<AsyncVar<Reference<ClusterConnectionFile>>>(), clientInfo, clientInfoMonitor, taskID, clientLocality, enableLocalityLoadBalance, lockAware, true, apiVersion, switchable ) );
@@ -1101,10 +1119,15 @@ Future<Void> DatabaseContext::onProxiesChanged() {
 	return this->proxiesChangeTrigger.onTrigger();
 }
 
-bool DatabaseContext::sampleReadTags() {
+bool DatabaseContext::sampleReadTags() const {
 	return clientInfo->get().transactionTagSampleRate > 0 && deterministicRandom()->random01() <= clientInfo->get().transactionTagSampleRate;
 }
 
+bool DatabaseContext::sampleOnCost(uint64_t cost) const {
+	if (clientInfo->get().transactionTagSampleCost <= 0) return false;
+	return deterministicRandom()->random01() <= (double)cost / clientInfo->get().transactionTagSampleCost;
+}
+
 int64_t extractIntOption( Optional<StringRef> value, int64_t minValue, int64_t maxValue ) {
 	validateOptionValue(value, true);
 	if( value.get().size() != 8 ) {
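Note: sampleOnCost makes the sampling probability proportional to the commit's cost, so in expectation one commit is sampled per transactionTagSampleCost units of cost, no matter how that cost is split across commits. A minimal standalone sketch of that expectation (100 stands in for the COMMIT_SAMPLE_COST default, and std::mt19937 for deterministicRandom()):

    #include <cstdio>
    #include <random>

    int main() {
        std::mt19937 rng(42);
        std::uniform_real_distribution<double> u(0.0, 1.0);
        const double sampleCost = 100.0; // stands in for transactionTagSampleCost

        // 10000 commits of cost 25 each: total cost 250000, so we expect
        // roughly 250000 / 100 = 2500 of them to be sampled.
        int sampled = 0;
        for (int i = 0; i < 10000; ++i)
            if (u(rng) <= 25.0 / sampleCost) ++sampled;
        printf("sampled %d of 10000 (expected ~2500)\n", sampled);
    }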
@@ -2025,7 +2048,7 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version, Spa
 		         cx->getGrvProxies(false), &GrvProxyInterface::getConsistentReadVersion,
 		         GetReadVersionRequest(span.context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) {
 			cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
-
+			if (v.midShardSize > 0) cx->smoothMidShardSize.setTotal(v.midShardSize);
 			if (v.version >= version)
 				return v.version;
 			// SOMEDAY: Do the wait on the server side, possibly use less expensive source of committed version (causal consistency is not needed for this purpose)
@@ -3041,7 +3064,6 @@ void Transaction::clear( const KeyRef& key, bool addConflictRange ) {
 	data[key.size()] = 0;
 	t.mutations.emplace_back(req.arena, MutationRef::ClearRange, KeyRef(data, key.size()),
 	                         KeyRef(data, key.size() + 1));
-
 	if(addConflictRange)
 		t.write_conflict_ranges.emplace_back(req.arena, KeyRef(data, key.size()), KeyRef(data, key.size() + 1));
 }
@@ -3324,33 +3346,73 @@ void Transaction::setupWatches() {
 	}
 }
 
-ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* self,
-                                                                  CommitTransactionRef* transaction) {
-	state MutationRef* it = transaction->mutations.begin();
-	state MutationRef* end = transaction->mutations.end();
-	state TransactionCommitCostEstimation trCommitCosts;
-	state KeyRange keyRange;
-	for (; it != end; ++it) {
-		if (it->type == MutationRef::Type::SetValue) {
-			trCommitCosts.bytesWrite += it->expectedSize();
-			trCommitCosts.numWrite++;
-		} else if (it->isAtomicOp()) {
-			trCommitCosts.bytesAtomicWrite += it->expectedSize();
-			trCommitCosts.numAtomicWrite++;
+ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Transaction* self,
+                                                                         CommitTransactionRef const* transaction) {
+	state ClientTrCommitCostEstimation trCommitCosts;
+	state KeyRangeRef keyRange;
+	state int i = 0;
+
+	for (; i < transaction->mutations.size(); ++i) {
+		auto* it = &transaction->mutations[i];
+
+		if (it->type == MutationRef::Type::SetValue || it->isAtomicOp()) {
+			trCommitCosts.opsCount++;
+			trCommitCosts.writeCosts += getWriteOperationCost(it->expectedSize());
 		} else if (it->type == MutationRef::Type::ClearRange) {
-			trCommitCosts.numClear++;
-			keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
+			trCommitCosts.opsCount++;
+			keyRange = KeyRangeRef(it->param1, it->param2);
 			if (self->options.expensiveClearCostEstimation) {
-				StorageMetrics m = wait(self->getStorageMetrics(keyRange, std::numeric_limits<int>::max()));
-				trCommitCosts.bytesClearEst += m.bytes;
+				StorageMetrics m = wait(self->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY));
+				trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes));
+				trCommitCosts.writeCosts += getWriteOperationCost(m.bytes);
+				++trCommitCosts.expensiveCostEstCount;
+				++self->getDatabase()->transactionsExpensiveClearCostEstCount;
 			}
 			else {
-				std::vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
-				    self->getDatabase(), keyRange, std::numeric_limits<int>::max(), false, &StorageServerInterface::getShardState, self->info));
-				trCommitCosts.numClearShards += locations.size();
+				std::vector<pair<KeyRange, Reference<LocationInfo>>> locations =
+				    wait(getKeyRangeLocations(self->getDatabase(), keyRange, CLIENT_KNOBS->TOO_MANY, false,
+				                              &StorageServerInterface::getShardState, self->info));
+				if (locations.empty()) continue;
+
+				uint64_t bytes = 0;
+				if (locations.size() == 1) {
+					bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS;
+				} else { // small clear on the boundary will hit two shards but be much smaller than the shard size
+					bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 +
+					        (locations.size() - 2) * (int64_t)self->getDatabase()->smoothMidShardSize.smoothTotal();
+				}
+
+				trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(bytes));
+				trCommitCosts.writeCosts += getWriteOperationCost(bytes);
 			}
 		}
 	}
+
+	// sample on written bytes
+	if (!self->getDatabase()->sampleOnCost(trCommitCosts.writeCosts)) return Optional<ClientTrCommitCostEstimation>();
+
+	// Sample clear ops: in expectation, one op is sampled per COMMIT_SAMPLE_COST of cost. Mutations cheaper than
+	// COMMIT_SAMPLE_COST are scaled up to scaledCost = max(COMMIT_SAMPLE_COST, cost) so the totals stay unbiased.
+	// For example, with four transactions:
+	//   A - 100 1-cost mutations:              E[sampled ops] = 1, E[sampled cost] = 100
+	//   B - 1 100-cost mutation:               E[sampled ops] = 1, E[sampled cost] = 100
+	//   C - 50 2-cost mutations:               E[sampled ops] = 1, E[sampled cost] = 100
+	//   D - 1 150-cost + 150 1-cost mutations: E[sampled ops] = 3, E[sampled cost] = 150 * 1 + 150 * 100 * 0.01 = 300
+	ASSERT(trCommitCosts.writeCosts > 0);
+	std::deque<std::pair<int, uint64_t>> newClearIdxCosts;
+	for (const auto& [idx, cost] : trCommitCosts.clearIdxCosts) {
+		if (trCommitCosts.writeCosts >= CLIENT_KNOBS->COMMIT_SAMPLE_COST) {
+			double mul = trCommitCosts.writeCosts / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_COST);
+			if (deterministicRandom()->random01() < cost * mul / trCommitCosts.writeCosts) {
+				newClearIdxCosts.emplace_back(
+				    idx, cost < CLIENT_KNOBS->COMMIT_SAMPLE_COST ? CLIENT_KNOBS->COMMIT_SAMPLE_COST : cost);
+			}
+		} else if (deterministicRandom()->random01() < (double)cost / trCommitCosts.writeCosts) {
+			newClearIdxCosts.emplace_back(
+			    idx, cost < CLIENT_KNOBS->COMMIT_SAMPLE_COST ? CLIENT_KNOBS->COMMIT_SAMPLE_COST : cost);
+		}
+	}
+
+	trCommitCosts.clearIdxCosts.swap(newClearIdxCosts);
 	return trCommitCosts;
 }
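Note: the cheap (non-expensive) clear path above prices a clear range by shard count rather than by measured bytes: a clear touching a single shard is assumed small (INCOMPLETE_SHARD_PLUS bytes), and one spanning n > 1 shards is assumed to cover its two boundary shards partially and its n-2 interior shards fully at the smoothed mid-shard size. A standalone rendering of that estimate (4096 and 200000 stand in for INCOMPLETE_SHARD_PLUS and the smoothed mid-shard size; the defaults initialized in Knobs.cpp above):

    #include <cstdint>
    #include <cstdio>

    // Mirrors the shard-count based clear estimate above.
    uint64_t clearRangeBytesEstimate(size_t shards, uint64_t incompleteShardPlus, uint64_t midShardSize) {
        if (shards == 1) return incompleteShardPlus;
        // Two partial boundary shards plus fully covered interior shards.
        return incompleteShardPlus * 2 + (shards - 2) * midShardSize;
    }

    int main() {
        for (size_t n : { 1, 2, 3, 10 })
            printf("%zu shard(s) -> %llu bytes\n", n,
                   (unsigned long long)clearRangeBytesEstimate(n, 4096, 200000));
        // 1 -> 4096, 2 -> 8192, 3 -> 208192, 10 -> 1608192
    }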
@@ -3370,11 +3432,11 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
 		                          commit_unknown_result()});
 	}
 
-	if (!req.tagSet.present()) {
-		wait(store(req.transaction.read_snapshot, readVersion));
+	if (req.tagSet.present() && tr->options.priority < TransactionPriority::IMMEDIATE) {
+		wait(store(req.transaction.read_snapshot, readVersion) &&
+		     store(req.commitCostEstimation, estimateCommitCosts(tr, &req.transaction)));
 	} else {
-		req.commitCostEstimation = TransactionCommitCostEstimation();
-		wait(store(req.transaction.read_snapshot, readVersion) && store(req.commitCostEstimation.get(), estimateCommitCosts(tr, &req.transaction)));
+		wait(store(req.transaction.read_snapshot, readVersion));
 	}
 
 	startTime = now();
@@ -357,5 +357,8 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
 // Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
 ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
 
+inline uint64_t getWriteOperationCost(uint64_t bytes) {
+	return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;
+}
 #include "flow/unactorcompiler.h"
 #endif
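Note: with WRITE_COST_BYTE_FACTOR at its default of 16384, getWriteOperationCost is essentially "how many 16 KiB pages does this mutation span, plus one", so even a zero-byte write costs one unit. A standalone sketch of the same arithmetic (the knob value is hard-coded here for illustration):

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    // Mirrors getWriteOperationCost above; 16384 stands in for
    // CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR (its non-BUGGIFY default).
    uint64_t writeOperationCost(uint64_t bytes) {
        return bytes / std::max(1, 16384) + 1;
    }

    int main() {
        printf("%llu\n", (unsigned long long)writeOperationCost(0));      // 1: even empty writes cost one unit
        printf("%llu\n", (unsigned long long)writeOperationCost(100));    // 1
        printf("%llu\n", (unsigned long long)writeOperationCost(16384));  // 2
        printf("%llu\n", (unsigned long long)writeOperationCost(100000)); // 7
    }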
@@ -206,4 +206,6 @@ using TransactionTagMap = std::unordered_map<TransactionTag, Value, std::hash<Tr
 template<class Value>
 using PrioritizedTransactionTagMap = std::map<TransactionPriority, TransactionTagMap<Value>>;
 
+template <class Value>
+using UIDTransactionTagMap = std::unordered_map<UID, TransactionTagMap<Value>>;
 #endif
@@ -512,10 +512,10 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
 			beginKey = keyServers.end()[-1].key;
 			break;
 		} catch (Error& e) {
-			wait( tr.onError(e) );
+			TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e);
+
+			wait(tr.onError(e));
 			ASSERT(!succeeded); //We shouldn't be retrying if we have already started modifying result in this loop
-			TraceEvent("GetInitialTeamsKeyServersRetry", distributorId);
 		}
 	}
 
@@ -4826,21 +4826,6 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
 	return Void();
 }
 
-ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req, PromiseStream<GetMetricsListRequest> getShardMetricsList) {
-	ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(errorOr(brokenPromiseToNever(
-	    getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
-
-	if(result.isError()) {
-		req.reply.sendError(result.getError());
-	} else {
-		GetDataDistributorMetricsReply rep;
-		rep.storageMetricsList = result.get();
-		req.reply.send(rep);
-	}
-
-	return Void();
-}
-
 ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
 	state Future<Void> dbInfoChange = db->onChange();
 	if (!setDDEnabled(false, snapReq.snapUID)) {
@@ -4990,6 +4975,37 @@ ACTOR Future<Void> cacheServerWatcher(Database* db) {
 	}
 }
 
+static int64_t getMedianShardSize(VectorRef<DDMetricsRef> metricVec) {
+	std::nth_element(metricVec.begin(), metricVec.begin() + metricVec.size() / 2, metricVec.end(),
+	                 [](const DDMetricsRef& d1, const DDMetricsRef& d2) { return d1.shardBytes < d2.shardBytes; });
+	return metricVec[metricVec.size() / 2].shardBytes;
+}
+
+ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
+                                PromiseStream<GetMetricsListRequest> getShardMetricsList) {
+	ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(
+	    errorOr(brokenPromiseToNever(getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));
+
+	if (result.isError()) {
+		req.reply.sendError(result.getError());
+	} else {
+		GetDataDistributorMetricsReply rep;
+		if (!req.midOnly) {
+			rep.storageMetricsList = result.get();
+		} else {
+			auto& metricVec = result.get();
+			if (metricVec.empty())
+				rep.midShardSize = 0;
+			else {
+				rep.midShardSize = getMedianShardSize(metricVec.contents());
+			}
+		}
+		req.reply.send(rep);
+	}
+
+	return Void();
+}
+
 ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db ) {
 	state Reference<DataDistributorData> self( new DataDistributorData(db, di.id()) );
 	state Future<Void> collection = actorCollection( self->addActor.getFuture() );
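Note: getMedianShardSize relies on std::nth_element, which partially orders the range in O(n) so that the middle slot holds exactly the element a full sort would place there. A minimal demonstration of the same selection on plain integers:

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    int main() {
        std::vector<int> shardBytes = { 900, 100, 400, 700, 300 };
        // After nth_element, the middle slot holds the median; elements before
        // it are <= it and elements after are >= it (no full sort is done).
        std::nth_element(shardBytes.begin(), shardBytes.begin() + shardBytes.size() / 2, shardBytes.end());
        printf("median shard size: %d\n", shardBytes[shardBytes.size() / 2]); // 400
    }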
@@ -70,12 +70,13 @@ struct HaltDataDistributorRequest {
 struct GetDataDistributorMetricsReply {
 	constexpr static FileIdentifier file_identifier = 1284337;
 	Standalone<VectorRef<DDMetricsRef>> storageMetricsList;
+	Optional<int64_t> midShardSize;
 
 	GetDataDistributorMetricsReply() {}
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar,storageMetricsList);
+		serializer(ar, storageMetricsList, midShardSize);
 	}
 };
 
@@ -84,13 +85,15 @@ struct GetDataDistributorMetricsRequest {
 	KeyRange keys;
 	int shardLimit;
 	ReplyPromise<struct GetDataDistributorMetricsReply> reply;
+	bool midOnly = false;
 
 	GetDataDistributorMetricsRequest() {}
-	explicit GetDataDistributorMetricsRequest(KeyRange const& keys, const int shardLimit) : keys(keys), shardLimit(shardLimit) {}
+	explicit GetDataDistributorMetricsRequest(KeyRange const& keys, const int shardLimit, bool midOnly = false)
+	  : keys(keys), shardLimit(shardLimit), midOnly(midOnly) {}
 
 	template<class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, keys, shardLimit, reply);
+		serializer(ar, keys, shardLimit, reply, midOnly);
 	}
 };
 
@@ -257,10 +257,9 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
 					tagCounts[priorityThrottles.first] = (*transactionTagCounter)[priorityThrottles.first];
 				}
 			}
-			reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(
-			    GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter,
-			                       TransactionTagMap<TransactionCommitCostEstimation>(), detailed)));
-			transactionTagCounter->clear();
+			reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(
+			    myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, detailed)));
+			transactionTagCounter->clear();
 			expectingDetailedReply = detailed;
 		}
 		when ( GetRateInfoReply rep = wait(reply) ) {
@@ -437,7 +436,9 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, Grv
 }
 
 ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests,
-                                  GrvProxyStats* stats, Version minKnownCommittedVersion, PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags) {
+                                  GrvProxyStats* stats, Version minKnownCommittedVersion,
+                                  PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags,
+                                  int64_t midShardSize = 0) {
 	GetReadVersionReply _reply = wait(replyFuture);
 	GetReadVersionReply reply = _reply;
 	Version replyVersion = reply.version;
@@ -460,7 +461,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
 		else {
 			reply.version = replyVersion;
 		}
-
+		reply.midShardSize = midShardSize;
 		reply.tagThrottleInfo.clear();
 
 		if(!request.tags.empty()) {
@@ -493,14 +494,53 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
 	return Void();
 }
 
-ACTOR static Future<Void> getReadVersionServer(
-	GrvProxyInterface proxy,
-	Reference<AsyncVar<ServerDBInfo>> db,
-	PromiseStream<Future<Void>> addActor,
-	GrvProxyData* grvProxyData,
-	GetHealthMetricsReply* healthMetricsReply,
-	GetHealthMetricsReply* detailedHealthMetricsReply)
-{
+ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) {
+	state Future<Void> nextRequestTimer = Never();
+	state Future<GetDataDistributorMetricsReply> nextReply = Never();
+
+	if (db->get().distributor.present()) nextRequestTimer = Void();
+	loop {
+		try {
+			choose {
+				when(wait(db->onChange())) {
+					if (db->get().distributor.present()) {
+						TraceEvent("DataDistributorChanged", db->get().id)
+						    .detail("DDID", db->get().distributor.get().id());
+						nextRequestTimer = Void();
+					} else {
+						TraceEvent("DataDistributorDied", db->get().id);
+						nextRequestTimer = Never();
+					}
+					nextReply = Never();
+				}
+				when(wait(nextRequestTimer)) {
+					nextRequestTimer = Never();
+					if (db->get().distributor.present()) {
+						nextReply = brokenPromiseToNever(db->get().distributor.get().dataDistributorMetrics.getReply(
+						    GetDataDistributorMetricsRequest(normalKeys, CLIENT_KNOBS->TOO_MANY, true)));
+					} else
+						nextReply = Never();
+				}
+				when(GetDataDistributorMetricsReply reply = wait(nextReply)) {
+					nextReply = Never();
+					ASSERT(reply.midShardSize.present());
+					*midShardSize = reply.midShardSize.get();
+					nextRequestTimer = delay(CLIENT_KNOBS->MID_SHARD_SIZE_MAX_STALENESS);
+				}
+			}
+		} catch (Error& e) {
+			TraceEvent("DDMidShardSizeUpdateFail").error(e);
+			if (e.code() != error_code_timed_out && e.code() != error_code_dd_not_found) throw;
+			nextRequestTimer = delay(CLIENT_KNOBS->MID_SHARD_SIZE_MAX_STALENESS);
+			nextReply = Never();
+		}
+	}
+}
+
+ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db,
+                                             PromiseStream<Future<Void>> addActor, GrvProxyData* grvProxyData,
+                                             GetHealthMetricsReply* healthMetricsReply,
+                                             GetHealthMetricsReply* detailedHealthMetricsReply) {
 	state double lastGRVTime = 0;
 	state PromiseStream<Void> GRVTimer;
 	state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
@@ -510,9 +550,9 @@ ACTOR static Future<Void> getReadVersionServer(
 	state GrvTransactionRateInfo normalRateInfo(10);
 	state GrvTransactionRateInfo batchRateInfo(0);
 
-	state SpannedDeque<GetReadVersionRequest> systemQueue("GP:getReadVersionServerSystemQueue"_loc);
-	state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:getReadVersionServerDefaultQueue"_loc);
-	state SpannedDeque<GetReadVersionRequest> batchQueue("GP:getReadVersionServerBatchQueue"_loc);
+	state SpannedDeque<GetReadVersionRequest> systemQueue("GP:transactionStarterSystemQueue"_loc);
+	state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:transactionStarterDefaultQueue"_loc);
+	state SpannedDeque<GetReadVersionRequest> batchQueue("GP:transactionStarterBatchQueue"_loc);
 
 	state TransactionTagMap<uint64_t> transactionTagCounter;
 	state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
@@ -520,6 +560,9 @@ ACTOR static Future<Void> getReadVersionServer(
 	state PromiseStream<double> normalGRVLatency;
 	state Span span;
 
+	state int64_t midShardSize = SERVER_KNOBS->MIN_SHARD_BYTES;
+	addActor.send(monitorDDMetricsChanges(&midShardSize, db));
+
 	addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
 	addActor.send(queueGetReadVersionRequests(db, &systemQueue, &defaultQueue, &batchQueue,
 	                                          proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime,
@@ -627,7 +670,8 @@ ACTOR static Future<Void> getReadVersionServer(
 			batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(), elapsed);
 
 			if (debugID.present()) {
-				g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "GrvProxyServer.getReadVersionServer.AskLiveCommittedVersionFromMaster");
+				g_traceBatch.addEvent("TransactionDebug", debugID.get().first(),
+				                      "GrvProxyServer.transactionStarter.AskLiveCommittedVersionFromMaster");
 			}
 
 			for (int i = 0; i < start.size(); i++) {
@@ -635,7 +679,7 @@ ACTOR static Future<Void> getReadVersionServer(
 				Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(
 				    span.context, grvProxyData, i, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
 				addActor.send(sendGrvReplies(readVersionReply, start[i], &grvProxyData->stats,
-				                             grvProxyData->minKnownCommittedVersion, throttledTags));
+				                             grvProxyData->minKnownCommittedVersion, throttledTags, midShardSize));
 
 				// Use normal priority transaction's GRV latency to dynamically calculate transaction batching interval.
 				if (i == 0) {
@@ -673,7 +717,8 @@ ACTOR Future<Void> grvProxyServerCore(
 
 	grvProxyData.updateLatencyBandConfig(grvProxyData.db->get().latencyBandConfig);
 
-	addActor.send(getReadVersionServer(proxy, grvProxyData.db, addActor, &grvProxyData, &healthMetricsReply, &detailedHealthMetricsReply));
+	addActor.send(transactionStarter(proxy, grvProxyData.db, addActor, &grvProxyData, &healthMetricsReply,
+	                                 &detailedHealthMetricsReply));
 	addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
 
 	if(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION > 0) {
@@ -369,6 +369,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( PROXY_COMPUTE_BUCKETS, 20000 );
 	init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
 	init( TXN_STATE_SEND_AMOUNT, 4 );
+	init( REPORT_TRANSACTION_COST_ESTIMATION_DELAY, 0.1 );
 
 	// Master Server
 	// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistribution)
@@ -560,9 +561,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( BEHIND_CHECK_COUNT, 2 );
 	init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
 	init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
-	init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0;
-	init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
-	init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096;
+	init( MIN_TAG_PAGES_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_RATE = 0;
+	init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 1.0;
+	init( READ_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) READ_COST_BYTE_FACTOR = 4096;
 	init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, true ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = false;
 
 	//Wait Failure
@@ -662,6 +663,11 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 
 	// clang-format on
 
-	if(clientKnobs)
-		clientKnobs->IS_ACCEPTABLE_DELAY = clientKnobs->IS_ACCEPTABLE_DELAY*std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS)/(5.0*VERSIONS_PER_SECOND);
+	if (clientKnobs) {
+		clientKnobs->IS_ACCEPTABLE_DELAY =
+		    clientKnobs->IS_ACCEPTABLE_DELAY *
+		    std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS) /
+		    (5.0 * VERSIONS_PER_SECOND);
+		clientKnobs->INIT_MID_SHARD_BYTES = MIN_SHARD_BYTES;
+	}
 }
@@ -299,6 +299,7 @@ public:
 	int PROXY_COMPUTE_BUCKETS;
 	double PROXY_COMPUTE_GROWTH_RATE;
 	int TXN_STATE_SEND_AMOUNT;
+	double REPORT_TRANSACTION_COST_ESTIMATION_DELAY;
 
 	// Master Server
 	double COMMIT_SLEEP_TIME;
@@ -489,9 +490,9 @@ public:
 	int BEHIND_CHECK_COUNT;
 	int64_t BEHIND_CHECK_VERSIONS;
 	double WAIT_METRICS_WRONG_SHARD_CHANCE;
-	int64_t MIN_TAG_PAGES_READ_RATE;
-	double READ_TAG_MEASUREMENT_INTERVAL;
-	int64_t OPERATION_COST_BYTE_FACTOR;
+	int64_t MIN_TAG_PAGES_RATE;
+	double TAG_MEASUREMENT_INTERVAL;
+	int64_t READ_COST_BYTE_FACTOR;
 	bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS;
 
 	//Wait Failure
@@ -45,6 +45,7 @@
 #include "fdbserver/MasterInterface.h"
 #include "fdbserver/MutationTracking.h"
 #include "fdbserver/ProxyCommitData.actor.h"
+#include "fdbserver/RatekeeperInterface.h"
 #include "fdbserver/RecoveryState.h"
 #include "fdbserver/ServerDBInfo.h"
 #include "fdbserver/WaitFailure.h"
@@ -53,6 +54,7 @@
 #include "flow/IRandom.h"
 #include "flow/Knobs.h"
 #include "flow/TDMetric.actor.h"
 #include "flow/Trace.h"
+#include "flow/Tracing.h"
 
 #include "flow/actorcompiler.h" // This must be the last #include.
@@ -796,6 +798,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
 			continue;
 		}
 
+		state bool checkSample = trs[self->transactionNum].commitCostEstimation.present();
+		state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
 		state int mutationNum = 0;
 		state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
 		for (; mutationNum < pMutations->size(); mutationNum++) {
@@ -818,6 +822,25 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
 			if (isSingleKeyMutation((MutationRef::Type) m.type)) {
 				auto& tags = pProxyCommitData->tagsForKey(m.param1);
 
+				// Sample single-key mutations based on cost; in expectation, one mutation is sampled per
+				// COMMIT_SAMPLE_COST of cost.
+				if (checkSample) {
+					double totalCosts = trCost->get().writeCosts;
+					double cost = getWriteOperationCost(m.expectedSize());
+					double mul = std::max(1.0, totalCosts / std::max(1.0, (double)CLIENT_KNOBS->COMMIT_SAMPLE_COST));
+					ASSERT(totalCosts > 0);
+					double prob = mul * cost / totalCosts;
+
+					if (deterministicRandom()->random01() < prob) {
+						for (const auto& ssInfo : pProxyCommitData->keyInfo[m.param1].src_info) {
+							auto id = ssInfo->interf.id();
+							// scale cost
+							cost = cost < CLIENT_KNOBS->COMMIT_SAMPLE_COST ? CLIENT_KNOBS->COMMIT_SAMPLE_COST : cost;
+							pProxyCommitData->updateSSTagCost(id, trs[self->transactionNum].tagSet.get(), m, cost);
+						}
+					}
+				}
+
 				if(pProxyCommitData->singleKeyMutationEvent->enabled) {
 					KeyRangeRef shard = pProxyCommitData->keyInfo.rangeContaining(m.param1).range();
 					pProxyCommitData->singleKeyMutationEvent->tag1 = (int64_t)tags[0].id;
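Note: the probability above works out to prob = max(1, totalCosts / COMMIT_SAMPLE_COST) * cost / totalCosts, so a cheap transaction samples roughly one mutation in total while an expensive one samples about one mutation per COMMIT_SAMPLE_COST of cost; the sampled cost is then clamped up to COMMIT_SAMPLE_COST so reported totals stay unbiased. A standalone check of the expected sampled cost for the four example transactions described in the estimateCommitCosts comment (COMMIT_SAMPLE_COST assumed 100):

    #include <algorithm>
    #include <cstdio>
    #include <numeric>
    #include <vector>

    // Expected total sampled cost for one transaction under the scheme above.
    // sampleCost stands in for CLIENT_KNOBS->COMMIT_SAMPLE_COST (default 100).
    double expectedSampledCost(const std::vector<double>& costs, double sampleCost) {
        double total = std::accumulate(costs.begin(), costs.end(), 0.0);
        double mul = std::max(1.0, total / sampleCost);
        double expected = 0;
        for (double c : costs) {
            double prob = std::min(1.0, mul * c / total); // chance this mutation is sampled
            expected += prob * std::max(c, sampleCost);   // cheap mutations are scaled up
        }
        return expected;
    }

    int main() {
        std::vector<double> a(100, 1), c(50, 2), d(150, 1);
        d.push_back(150);
        printf("A: %.0f\n", expectedSampledCost(a, 100));       // 100 1-cost mutations -> 100
        printf("B: %.0f\n", expectedSampledCost({ 100 }, 100)); // one 100-cost mutation -> 100
        printf("C: %.0f\n", expectedSampledCost(c, 100));       // 50 2-cost mutations -> 100
        printf("D: %.0f\n", expectedSampledCost(d, 100));       // 150x1 + one 150-cost -> 300
    }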
@@ -846,6 +869,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
 
 					ranges.begin().value().populateTags();
 					self->toCommit.addTags(ranges.begin().value().tags);
+
+					// check whether clear is sampled
+					if (checkSample && !trCost->get().clearIdxCosts.empty() &&
+					    trCost->get().clearIdxCosts[0].first == mutationNum) {
+						for (const auto& ssInfo : ranges.begin().value().src_info) {
+							auto id = ssInfo->interf.id();
+							pProxyCommitData->updateSSTagCost(id, trs[self->transactionNum].tagSet.get(), m,
+							                                  trCost->get().clearIdxCosts[0].second);
+						}
+						trCost->get().clearIdxCosts.pop_front();
+					}
 				}
 				else {
 					TEST(true); //A clear range extends past a shard boundary
@@ -853,6 +887,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
 					for (auto r : ranges) {
 						r.value().populateTags();
 						allSources.insert(r.value().tags.begin(), r.value().tags.end());
+
+						// check whether clear is sampled
+						if (checkSample && !trCost->get().clearIdxCosts.empty() &&
+						    trCost->get().clearIdxCosts[0].first == mutationNum) {
+							for (const auto& ssInfo : r.value().src_info) {
+								auto id = ssInfo->interf.id();
+								pProxyCommitData->updateSSTagCost(id, trs[self->transactionNum].tagSet.get(), m,
+								                                  trCost->get().clearIdxCosts[0].second);
+							}
+							trCost->get().clearIdxCosts.pop_front();
+						}
 					}
 					DEBUG_MUTATION("ProxyCommit", self->commitVersion, m).detail("Dbgid", pProxyCommitData->dbgid).detail("To", allSources).detail("Mutation", m);
 
@@ -901,6 +946,11 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
 				}
 			}
 		}
+
+		if (checkSample) {
+			self->pProxyCommitData->stats.txnExpensiveClearCostEstCount +=
+			    trs[self->transactionNum].commitCostEstimation.get().expensiveCostEstCount;
+		}
 	}
 
 	return Void();
@@ -1118,15 +1168,6 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
 		if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || tr.isLockAware())) {
 			ASSERT_WE_THINK(self->commitVersion != invalidVersion);
 			tr.reply.send(CommitID(self->commitVersion, t, self->metadataVersionAfter));
-
-			// aggregate commit cost estimation if committed
-			ASSERT(tr.commitCostEstimation.present() == tr.tagSet.present());
-			if (tr.tagSet.present()) {
-				TransactionCommitCostEstimation& costEstimation = tr.commitCostEstimation.get();
-				for (auto& tag : tr.tagSet.get()) {
-					pProxyCommitData->transactionTagCommitCostEst[tag] += costEstimation;
-				}
-			}
 		}
 		else if (self->committed[t] == ConflictBatch::TransactionTooOld) {
 			tr.reply.sendError(transaction_too_old());
@@ -1219,6 +1260,7 @@ ACTOR Future<Void> commitBatch(
 
 	context.pProxyCommitData->lastVersionTime = context.startTime;
 	++context.pProxyCommitData->stats.commitBatchIn;
+	context.setupTraceBatch();
 
 	/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
 	wait(CommitBatch::preresolutionProcessing(&context));
@@ -1377,8 +1419,14 @@ ACTOR Future<Void> ddMetricsRequestServer(MasterProxyInterface proxy, Reference<
 		choose {
 			when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture()))
 			{
-				ErrorOr<GetDataDistributorMetricsReply> reply = wait(errorOr(db->get().distributor.get().dataDistributorMetrics.getReply(GetDataDistributorMetricsRequest(req.keys, req.shardLimit))));
-				if ( reply.isError() ) {
+				if (!db->get().distributor.present()) {
+					req.reply.sendError(dd_not_found());
+					continue;
+				}
+				ErrorOr<GetDataDistributorMetricsReply> reply =
+				    wait(errorOr(db->get().distributor.get().dataDistributorMetrics.getReply(
+				        GetDataDistributorMetricsRequest(req.keys, req.shardLimit))));
+				if (reply.isError()) {
 					req.reply.sendError(reply.getError());
 				} else {
 					GetDDMetricsReply newReply;
@@ -1492,7 +1540,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
 	// send a snap request to DD
 	if (!commitData->db->get().distributor.present()) {
 		TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
-		throw operation_failed();
+		throw dd_not_found();
 	}
 	state Future<ErrorOr<Void>> ddSnapReq =
 	    commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
@@ -1551,6 +1599,38 @@ ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db,
 	return Void();
 }
 
+ACTOR Future<Void> reportTxnTagCommitCost(UID myID, Reference<AsyncVar<ServerDBInfo>> db,
+                                          UIDTransactionTagMap<TransactionCommitCostEstimation>* ssTrTagCommitCost) {
+	state Future<Void> nextRequestTimer = Never();
+	state Future<Void> nextReply = Never();
+	if (db->get().ratekeeper.present()) nextRequestTimer = Void();
+	loop choose {
+		when(wait(db->onChange())) {
+			if (db->get().ratekeeper.present()) {
+				TraceEvent("ProxyRatekeeperChanged", myID).detail("RKID", db->get().ratekeeper.get().id());
+				nextRequestTimer = Void();
+			} else {
+				TraceEvent("ProxyRatekeeperDied", myID);
+				nextRequestTimer = Never();
+			}
+		}
+		when(wait(nextRequestTimer)) {
+			nextRequestTimer = Never();
+			if (db->get().ratekeeper.present()) {
+				nextReply = brokenPromiseToNever(db->get().ratekeeper.get().reportCommitCostEstimation.getReply(
+				    ReportCommitCostEstimationRequest(*ssTrTagCommitCost)));
+			} else {
+				nextReply = Never();
+			}
+		}
+		when(wait(nextReply)) {
+			nextReply = Never();
+			ssTrTagCommitCost->clear();
+			nextRequestTimer = delay(SERVER_KNOBS->REPORT_TRANSACTION_COST_ESTIMATION_DELAY);
+		}
+	}
+}
+
 ACTOR Future<Void> masterProxyServerCore(
 	MasterProxyInterface proxy,
 	MasterInterface master,
@@ -1612,6 +1692,7 @@ ACTOR Future<Void> masterProxyServerCore(
 	addActor.send(readRequestServer(proxy, addActor, &commitData));
 	addActor.send(rejoinServer(proxy, &commitData));
 	addActor.send(ddMetricsRequestServer(proxy, db));
+	addActor.send(reportTxnTagCommitCost(proxy.id(), db, &commitData.ssTrTagCommitCost));
 
 	// wait for txnStateStore recovery
 	wait(success(commitData.txnStateStore->readValue(StringRef())));
@@ -57,6 +57,7 @@ struct ProxyStats {
 	Counter mutations;
 	Counter conflictRanges;
 	Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
+	Counter txnExpensiveClearCostEstCount;
 	Version lastCommitVersionAssigned;
 
 	LatencySample commitLatencySample;
@@ -94,15 +95,14 @@ struct ProxyStats {
 	           int64_t* commitBatchesMemBytesCountPtr)
 	  : cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
 	    bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
-	    txnCommitIn("TxnCommitIn", cc),
-	    txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
-	    txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
-	    txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
-	    txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc),
+	    txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc),
+	    txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc),
+	    txnCommitOut("TxnCommitOut", cc), txnCommitOutSuccess("TxnCommitOutSuccess", cc),
+	    txnCommitErrors("TxnCommitErrors", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc),
 	    commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
 	    conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
 	    keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
-	    lastCommitVersionAssigned(0),
+	    lastCommitVersionAssigned(0), txnExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
 	    commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
 	                        SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
 	    commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
@@ -171,7 +171,7 @@ struct ProxyCommitData {
 	NotifiedDouble lastCommitTime;
 
 	vector<double> commitComputePerOperation;
-	TransactionTagMap<TransactionCommitCostEstimation> transactionTagCommitCostEst;
+	UIDTransactionTagMap<TransactionCommitCostEstimation> ssTrTagCommitCost;
 
 	// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
 	// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
@@ -180,13 +180,7 @@ struct ProxyCommitData {
 		auto& tags = keyInfo[key].tags;
 		if (!tags.size()) {
 			auto& r = keyInfo.rangeContaining(key).value();
-			for (auto info : r.src_info) {
-				r.tags.push_back(info->tag);
-			}
-			for (auto info : r.dest_info) {
-				r.tags.push_back(info->tag);
-			}
-			uniquify(r.tags);
+			r.populateTags();
 			return r.tags;
 		}
 		return tags;
@@ -218,6 +212,18 @@ struct ProxyCommitData {
 		latencyBandConfig = newLatencyBandConfig;
 	}
 
+	void updateSSTagCost(const UID& id, const TagSet& tagSet, MutationRef m, int cost) {
+		auto [it, _] = ssTrTagCommitCost.try_emplace(id, TransactionTagMap<TransactionCommitCostEstimation>());
+
+		for (auto& tag : tagSet) {
+			auto& costItem = it->second[tag];
+			if (m.isAtomicOp() || m.type == MutationRef::Type::SetValue || m.type == MutationRef::Type::ClearRange) {
+				costItem.opsSum++;
+				costItem.costSum += cost;
+			}
+		}
+	}
+
 	ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion,
 	                Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit,
 	                Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
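Note: updateSSTagCost buckets cost first by storage server and then by tag; try_emplace inserts a fresh per-server map only when the key is absent and otherwise leaves the existing one untouched, with a valid iterator either way. A minimal sketch of the same two-level aggregation (plain std::map, std::string, and int stand in for the FDB types):

    #include <cstdio>
    #include <map>
    #include <string>

    struct CostEst { int opsSum = 0; int costSum = 0; };
    using TagMap = std::map<std::string, CostEst>;

    int main() {
        std::map<int, TagMap> ssTrTagCommitCost; // storage-server id -> tag -> cost

        auto update = [&](int ssId, const std::string& tag, int cost) {
            // try_emplace only inserts if ssId is absent; `it` is valid either way.
            auto [it, inserted] = ssTrTagCommitCost.try_emplace(ssId, TagMap());
            auto& item = it->second[tag]; // operator[] default-constructs on first use
            item.opsSum++;
            item.costSum += cost;
        };

        update(1, "batchLoader", 100);
        update(1, "batchLoader", 250);
        update(2, "webFrontend", 40);
        printf("ss1/batchLoader: ops=%d cost=%d\n",
               ssTrTagCommitCost[1]["batchLoader"].opsSum,
               ssTrTagCommitCost[1]["batchLoader"].costSum); // ops=2 cost=350
    }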
@@ -97,17 +97,21 @@ struct StorageQueueInfo {
 	Smoother smoothTotalSpace;
 	limitReason_t limitReason;
 
-	Optional<TransactionTag> busiestTag;
-	double busiestTagFractionalBusyness;
-	double busiestTagRate;
+	Optional<TransactionTag> busiestReadTag, busiestWriteTag;
+	double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0;
+	double busiestReadTagRate = 0, busiestWriteTagRate = 0;
+
+	// refresh periodically
+	TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
+	uint64_t totalWriteCosts = 0;
+	int totalWriteOps = 0;
 
 	StorageQueueInfo(UID id, LocalityData locality)
 	  : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
 	    smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
-	    smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
-	    smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
-	    smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited), busiestTagFractionalBusyness(0),
-	    busiestTagRate(0) {
+	    smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
+	    smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
+	    limitReason(limitReason_t::unlimited) {
 		// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
 		lastReply.instanceID = -1;
 	}
@@ -551,6 +555,7 @@ struct RatekeeperData {
 
 	double lastWarning;
 	double lastSSListFetchedTimestamp;
+	double lastBusiestCommitTagPick;
 
 	RkTagThrottleCollection throttledTags;
 	uint64_t throttledTagChangeId;
@@ -570,7 +575,7 @@ struct RatekeeperData {
 	    smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
 	    smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
 	    actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()),
-	    throttledTagChangeId(0),
+	    throttledTagChangeId(0), lastBusiestCommitTagPick(0),
 	    normalLimits(TransactionPriority::DEFAULT, "", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
 	                 SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG,
 	                 SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE,
@@ -615,9 +620,9 @@ ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageSer
 				myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
 			}
 
-			myQueueInfo->value.busiestTag = reply.get().busiestTag;
-			myQueueInfo->value.busiestTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
-			myQueueInfo->value.busiestTagRate = reply.get().busiestTagRate;
+			myQueueInfo->value.busiestReadTag = reply.get().busiestTag;
+			myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
+			myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate;
 		} else {
 			if(myQueueInfo->value.valid) {
 				TraceEvent("RkStorageServerDidNotRespond", self->id)
@ -852,23 +857,87 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
    }
}

void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
    if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST) {
        TEST(true); // Transaction tag auto-throttled
Future<Void> refreshStorageServerCommitCost(RatekeeperData* self) {
    if (self->lastBusiestCommitTagPick == 0) { // the first call should be skipped
        self->lastBusiestCommitTagPick = now();
        return Void();
    }
    double elapsed = now() - self->lastBusiestCommitTagPick;
    // for each SS, select the busiest commit tag from ssTrTagCommitCost
    for (auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) {
        it->value.busiestWriteTag.reset();
        TransactionTag busiestTag;
        TransactionCommitCostEstimation maxCost;
        double maxRate = 0, maxBusyness = 0;
        for (const auto& [tag, cost] : it->value.tagCostEst) {
            double rate = cost.getOpsSum() / elapsed;
            if (rate > maxRate) {
                busiestTag = tag;
                maxRate = rate;
                maxCost = cost;
            }
        }
        if (maxRate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
            it->value.busiestWriteTag = busiestTag;
            // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps);
            ASSERT(it->value.totalWriteCosts > 0);
            maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts;
            it->value.busiestWriteTagFractionalBusyness = maxBusyness;
            it->value.busiestWriteTagRate = maxRate;
        }

        Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
        if(clientRate.present()) {
        TraceEvent("BusiestWriteTag", it->key)
            .detail("Elapsed", elapsed)
            .detail("Tag", printable(busiestTag))
            .detail("TagOps", maxCost.getOpsSum())
            .detail("TagCosts", maxCost.getCostSum())
            .detail("TagRate", maxRate)
            .detail("TagBusyness", maxBusyness)
            .detail("Reported", it->value.busiestWriteTag.present())
            .trackLatest(it->key.toString() + "/BusiestWriteTag");

        // reset statistics
        it->value.tagCostEst.clear();
        it->value.totalWriteOps = 0;
        it->value.totalWriteCosts = 0;
    }
    self->lastBusiestCommitTagPick = now();
    return Void();
}
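The selection above reduces to two ratios per storage server: an op rate over the measurement interval, and the tag's fraction of the server's total write cost. A minimal worked sketch with invented numbers (the tag, counts, and totals are illustrative only):

// Suppose elapsed = 5 s and tag "hot" accumulated opsSum = 600, costSum = 3000,
// while this storage server's totalWriteCosts for the interval was 10000.
double rate = 600 / 5.0;              // 120 ops/s; reported only if above MIN_TAG_PAGES_RATE
double busyness = 3000.0 / 10000.0;   // 0.3: "hot" caused 30% of this server's write cost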
void tryAutoThrottleTag(RatekeeperData* self, TransactionTag tag, double rate, double busyness,
                        TagThrottledReason reason) {
    if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
        TEST(true); // Transaction tag auto-throttled
        Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, tag, busyness);
        if (clientRate.present()) {
            TagSet tags;
            tags.addTag(ss.busiestTag.get());
            tags.addTag(tag);

            self->addActor.send(ThrottleApi::throttleTags(
                self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO,
                TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
                TagThrottledReason::BUSY_READ));
            TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, reason));
        }
    }
}
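A hypothetical invocation of this generic overload, with an invented tag and numbers, showing how the read (and eventually write) paths are meant to feed it:

// e.g. a busy read tag at 120 pages/s and 85% fractional busyness:
// tryAutoThrottleTag(self, TransactionTag(LiteralStringRef("hot")), 120.0, 0.85,
//                    TagThrottledReason::BUSY_READ);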
void tryAutoThrottleTag(RatekeeperData* self, StorageQueueInfo& ss, int64_t storageQueue,
                        int64_t storageDurabilityLag) {
    // TODO: reasonable criteria for write saturation should be investigated in experiment
    // if (ss.busiestWriteTag.present() && storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES &&
    //     storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
    //     // write-saturated
    //     tryAutoThrottleTag(self, ss.busiestWriteTag.get(), ss.busiestWriteTagRate,
    //                        ss.busiestWriteTagFractionalBusyness); } else
    if (ss.busiestReadTag.present() &&
        (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
         storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
        // read saturated
        tryAutoThrottleTag(self, ss.busiestReadTag.get(), ss.busiestReadTagRate, ss.busiestReadTagFractionalBusyness,
                           TagThrottledReason::BUSY_READ);
    }
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
    //double controlFactor = ; // dt / eFoldingTime
@ -936,8 +1005,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {

        double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);

        if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
            tryAutoThrottleTag(self, ss);
        if (limits->priority == TransactionPriority::DEFAULT) {
            tryAutoThrottleTag(self, ss, storageQueue, storageDurabilityLag);
        }

        double inputRate = ss.smoothInputBytes.smoothRate();
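The spring formula above can be sanity-checked with concrete numbers (these values are illustrative, not from the diff):

// With targetBytes = 1000 MB, springBytes = 100 MB, and storageQueue = 1050 MB:
// targetRateRatio = min((1050 - 1000 + 100) / 100.0, 2.0) = 1.5
// A ratio of 1.0 means the queue sits exactly one spring below target; larger
// ratios push admission down, and the correction is capped at 2.0 however far
// the queue overshoots.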
@ -1008,6 +1077,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
            break;
        }

    // Calculate limited durability lag
    int64_t limitingDurabilityLag = 0;

    std::set<Optional<Standalone<StringRef>>> ignoredDurabilityLagMachines;
@ -1219,6 +1289,19 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
    }
}

static void updateCommitCostEstimation(RatekeeperData* self,
                                       UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation) {
    for (auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) {
        auto tagCostIt = costEstimation.find(it->key);
        if (tagCostIt == costEstimation.end()) continue;
        for (const auto& [tagName, cost] : tagCostIt->second) {
            it->value.tagCostEst[tagName] += cost;
            it->value.totalWriteCosts += cost.getCostSum();
            it->value.totalWriteOps += cost.getOpsSum();
        }
    }
}
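As the lookup by `it->key` shows, the map nests a per-tag map inside a per-storage-server map, so each report is folded in per (server, tag) pair. A minimal sketch of building one input (the UID, tag, and numbers are hypothetical):

// Hypothetical report: storage server `ssId` saw tag "hot" commit 40 ops costing 200 units.
UIDTransactionTagMap<TransactionCommitCostEstimation> costs;
TransactionCommitCostEstimation est;
est.opsSum = 40;
est.costSum = 200;
costs[ssId][TransactionTag(LiteralStringRef("hot"))] = est;
updateCommitCostEstimation(self, costs); // accumulated via the struct's operator+=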
ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
    loop {
        state ReadYourWritesTransaction tr(self->db);
@ -1261,6 +1344,9 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
    self.addActor.send( traceRole(Role::RATEKEEPER, rkInterf.id()) );

    self.addActor.send(monitorThrottlingChanges(&self));
    RatekeeperData* selfPtr = &self; // let flow compiler capture self
    self.addActor.send(
        recurring([selfPtr]() { refreshStorageServerCommitCost(selfPtr); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));

    TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()).detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
        .detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
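Per the in-line comment, the `selfPtr` alias exists so the lambda captures a plain pointer by value rather than the actor's state variable directly. recurring() itself is flow's fixed-interval driver, roughly:

// Future<Void> recurring(std::function<void()> what, double interval, ...);  // flow/genericactors
// Calls what() every `interval` seconds until the returned future is cancelled;
// parking it in addActor ties the refresh loop to the ratekeeper's lifetime.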
@ -1303,10 +1389,6 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
                for(auto tag : req.throttledTagCounts) {
                    self.throttledTags.addRequests(tag.first, tag.second);
                }
                // TODO process commitCostEstimation
                // for (const auto &[tagName, cost] : req.throttledTagCommitCostEst) {
                //
                // }
            }
            if(p.batchTransactions > 0) {
                self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions );
@ -1339,6 +1421,10 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
                TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID);
                break;
            }
            when(ReportCommitCostEstimationRequest req = waitNext(rkInterf.reportCommitCostEstimation.getFuture())) {
                updateCommitCostEstimation(&self, req.ssTrTagCommitCost);
                req.reply.send(Void());
            }
            when (wait(err.getFuture())) {}
            when (wait(dbInfo->onChange())) {
                if( tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs() ) {
@ -30,6 +30,7 @@ struct RatekeeperInterface {
    RequestStream<ReplyPromise<Void>> waitFailure;
    RequestStream<struct GetRateInfoRequest> getRateInfo;
    RequestStream<struct HaltRatekeeperRequest> haltRatekeeper;
    RequestStream<struct ReportCommitCostEstimationRequest> reportCommitCostEstimation;
    struct LocalityData locality;
    UID myId;
@ -48,7 +49,7 @@ struct RatekeeperInterface {

    template <class Archive>
    void serialize(Archive& ar) {
        serializer(ar, waitFailure, getRateInfo, haltRatekeeper, locality, myId);
        serializer(ar, waitFailure, getRateInfo, haltRatekeeper, reportCommitCostEstimation, locality, myId);
    }
};
@ -76,31 +77,35 @@ struct ClientTagThrottleLimits {
};

struct TransactionCommitCostEstimation {
    int numWrite = 0;
    int numAtomicWrite = 0;
    int numClear = 0;
    int numClearShards = 0;
    uint64_t bytesWrite = 0;
    uint64_t bytesAtomicWrite = 0;
    uint64_t bytesClearEst = 0;
    int opsSum = 0;
    uint64_t costSum = 0;

    uint64_t getCostSum() const { return costSum; }
    int getOpsSum() const { return opsSum; }

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear, numClearShards);
        serializer(ar, opsSum, costSum);
    }

    TransactionCommitCostEstimation& operator+=(const TransactionCommitCostEstimation& other) {
        numWrite += other.numWrite;
        numAtomicWrite += other.numAtomicWrite;
        numClear += other.numClear;
        bytesWrite += other.bytesWrite;
        bytesAtomicWrite += other.numAtomicWrite;
        numClearShards += other.numClearShards;
        bytesClearEst += other.bytesClearEst;
        opsSum += other.opsSum;
        costSum += other.costSum;
        return *this;
    }
};
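A minimal sketch of the new aggregate-only accounting (numbers invented):

TransactionCommitCostEstimation a, b;
a.opsSum = 3;  a.costSum = 12;   // e.g. three small writes
b.opsSum = 1;  b.costSum = 40;   // e.g. one large clear
a += b;                          // field-wise accumulation, as defined above
ASSERT(a.getOpsSum() == 4 && a.getCostSum() == 52);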
struct ClientTrCommitCostEstimation {
    int opsCount = 0;
    uint64_t writeCosts = 0;
    std::deque<std::pair<int, uint64_t>> clearIdxCosts;
    uint32_t expensiveCostEstCount = 0;
    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, opsCount, writeCosts, clearIdxCosts, expensiveCostEstCount);
    }
};
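The field semantics are inferred from the names and element types rather than stated in this hunk; a hypothetical population under that reading:

// ClientTrCommitCostEstimation est;
// est.opsCount = 5;                      // mutations in the transaction
// est.writeCosts = 12;                   // accumulated write cost units
// est.clearIdxCosts.emplace_back(4, 7);  // reading: mutation #4 is a clear estimated at 7 units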
struct GetRateInfoReply {
    constexpr static FileIdentifier file_identifier = 7845006;
    double transactionRate;
@ -123,21 +128,20 @@ struct GetRateInfoRequest {
    int64_t batchReleasedTransactions;

    TransactionTagMap<uint64_t> throttledTagCounts;
    TransactionTagMap<TransactionCommitCostEstimation> throttledTagCommitCostEst;
    bool detailed;
    ReplyPromise<struct GetRateInfoReply> reply;

    GetRateInfoRequest() {}
    GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions,
                       TransactionTagMap<uint64_t> throttledTagCounts,
                       TransactionTagMap<TransactionCommitCostEstimation> throttledTagCommitCostEst, bool detailed)
                       TransactionTagMap<uint64_t> throttledTagCounts, bool detailed)
        : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions),
          batchReleasedTransactions(batchReleasedTransactions), throttledTagCounts(throttledTagCounts),
          throttledTagCommitCostEst(throttledTagCommitCostEst), detailed(detailed) {}
          detailed(detailed) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply, throttledTagCommitCostEst);
        serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed,
                   reply);
    }
};
@ -155,4 +159,19 @@ struct HaltRatekeeperRequest {
    }
};

struct ReportCommitCostEstimationRequest {
    constexpr static FileIdentifier file_identifier = 8314904;
    UIDTransactionTagMap<TransactionCommitCostEstimation> ssTrTagCommitCost;
    ReplyPromise<Void> reply;

    ReportCommitCostEstimationRequest() {}
    ReportCommitCostEstimationRequest(UIDTransactionTagMap<TransactionCommitCostEstimation> ssTrTagCommitCost)
        : ssTrTagCommitCost(ssTrTagCommitCost) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, ssTrTagCommitCost, reply);
    }
};

#endif //FDBSERVER_RATEKEEPERINTERFACE_H
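The sending side is outside this excerpt; presumably a proxy aggregates per-server, per-tag costs and uses the standard request/reply pattern. A hedged sketch (the variable names are invented):

// `costs` : UIDTransactionTagMap<TransactionCommitCostEstimation> built on the proxy
ReportCommitCostEstimationRequest req(costs);
wait(brokenPromiseToNever(rkInterf.reportCommitCostEstimation.getReply(req)));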
@ -476,9 +476,7 @@ public:

    Optional<TagInfo> previousBusiestTag;

    int64_t costFunction(int64_t bytes) {
        return bytes / SERVER_KNOBS->OPERATION_COST_BYTE_FACTOR + 1;
    }
    int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }

    void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
        if(tags.present()) {
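The "+ 1" floors every read at one cost unit. Assuming a READ_COST_BYTE_FACTOR of 16384 (the knob's actual value is not shown in this excerpt):

// costFunction(100)   == 100 / 16384 + 1   == 1  (small reads still cost one page)
// costFunction(40000) == 40000 / 16384 + 1 == 3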
@ -502,7 +500,7 @@ public:
        previousBusiestTag.reset();
        if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
            double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
            if(rate > SERVER_KNOBS->MIN_TAG_PAGES_READ_RATE) {
            if (rate > SERVER_KNOBS->MIN_TAG_PAGES_RATE) {
                previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
            }
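Dividing by the sample rate rescales the sampled count to an estimate of the true request rate. With a 1% sample rate, matching the READ_TAG_SAMPLE_RATE default set elsewhere in this commit:

// 60 sampled reads for the busiest tag over a 30 s interval:
// rate = 60 / 0.01 / 30.0 == 200  // estimated true ops/s, not the sampled rate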
@ -3780,7 +3778,8 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
    self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));

    self->transactionTagCounter.startNewInterval(self->thisServerID);
    self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
    self->actors.add(recurring([&]() { self->transactionTagCounter.startNewInterval(self->thisServerID); },
                               SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));

    self->coreStarted.send( Void() );
@ -1258,6 +1258,7 @@ ACTOR Future<Void> workerServer(
    DUMPTOKEN( recruited.waitFailure );
    DUMPTOKEN( recruited.getRateInfo );
    DUMPTOKEN( recruited.haltRatekeeper );
    DUMPTOKEN(recruited.reportCommitCostEstimation);

    Future<Void> ratekeeperProcess = ratekeeper(recruited, dbInfo);
    errorForwarders.add(
@ -73,6 +73,7 @@ ERROR( connection_idle, 1049, "Connection closed after idle timeout" )
ERROR( disk_adapter_reset, 1050, "The disk queue adpater reset" )
ERROR( batch_transaction_throttled, 1051, "Batch GRV request rate limit exceeded")
ERROR( dd_cancelled, 1052, "Data distribution components cancelled")
ERROR( dd_not_found, 1053, "Data distributor not found")

ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
@ -36,6 +36,7 @@
#include <typeinfo>
#include <typeindex>
#include <unordered_map>
#include <deque>
#include "flow/FileIdentifier.h"
#include "flow/ObjectSerializerTraits.h"
@ -129,6 +130,33 @@ struct vector_like_traits<std::vector<T, Alloc>> : std::true_type {
    }
};

template <class T, class Alloc>
struct vector_like_traits<std::deque<T, Alloc>> : std::true_type {
    using Deq = std::deque<T, Alloc>;
    using value_type = typename Deq::value_type;
    using iterator = typename Deq::const_iterator;
    using insert_iterator = std::back_insert_iterator<Deq>;

    template <class Context>
    static size_t num_entries(const Deq& v, Context&) {
        return v.size();
    }
    template <class Context>
    static void reserve(Deq& v, size_t size, Context&) {
        v.resize(size);
        v.clear();
    }

    template <class Context>
    static insert_iterator insert(Deq& v, Context&) {
        return std::back_inserter(v);
    }
    template <class Context>
    static iterator begin(const Deq& v, Context&) {
        return v.begin();
    }
};

template <class T, size_t N>
struct vector_like_traits<std::array<T, N>> : std::true_type {
    using Vec = std::array<T, N>;
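With this specialization, a std::deque member serializes through the flatbuffers path the same way a std::vector does, which is what clearIdxCosts in ClientTrCommitCostEstimation relies on. A minimal sketch with an invented struct:

struct Sample {
    std::deque<int> items;
    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, items); // resolved via vector_like_traits<std::deque<...>> above
    }
};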
@ -31,6 +31,7 @@
#include "flow/FileIdentifier.h"
#include "flow/ObjectSerializer.h"
#include <algorithm>
#include <deque>

// Though similar, is_binary_serializable cannot be replaced by std::is_pod, as doing so would prefer
// memcpy over a defined serialize() method on a POD struct. As not all of our structs are packed,
@ -172,6 +173,26 @@ inline void save( Archive& ar, const std::vector<T>& value ) {
}
template <class Archive, class T>
inline void load( Archive& ar, std::vector<T>& value ) {
    int s;
    ar >> s;
    value.clear();
    value.reserve(s);
    for (int i = 0; i < s; i++) {
        value.push_back(T());
        ar >> value[i];
    }
    ASSERT(ar.protocolVersion().isValid());
}

template <class Archive, class T>
inline void save(Archive& ar, const std::deque<T>& value) {
    ar << (int)value.size();
    for (auto it = value.begin(); it != value.end(); ++it) ar << *it;
    ASSERT(ar.protocolVersion().isValid());
}

template <class Archive, class T>
inline void load(Archive& ar, std::deque<T>& value) {
    int s;
    ar >> s;
    value.clear();
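The hunk is cut off here by the page capture; by symmetry with the std::vector overload above (minus the reserve() call, which std::deque lacks), the remainder presumably reads:

    for (int i = 0; i < s; i++) {
        value.push_back(T());
        ar >> value[i];
    }
    ASSERT(ar.protocolVersion().isValid());
}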