Merge pull request #3639 from xumengpanda/tmp/merge-PR

Merge release 6.3 to master and resolve conflicts
This commit is contained in:
Meng Xu 2020-08-12 22:52:35 -07:00 committed by GitHub
commit b1e9e9cd79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 63 deletions

View File

@ -847,7 +847,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
: connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"),
transactionReadVersions("ReadVersions", cc),
transactionReadVersions("ReadVersions", cc),
transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc),
transactionReadVersionBatches("ReadVersionBatches", cc),
@ -991,19 +991,19 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc), transactionReadVersionBatches("ReadVersionBatches", cc), transactionBatchReadVersions("BatchPriorityReadVersions", cc),
transactionDefaultReadVersions("DefaultPriorityReadVersions", cc), transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc), transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc), transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc), transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc),
transactionBytesRead("BytesRead", cc), transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc), transactionCommittedMutations("CommittedMutations", cc),
transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc), transactionClearMutations("ClearMutations", cc),
transactionAtomicMutations("AtomicMutations", cc), transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
transactionKeyServerLocationRequests("KeyServerLocationRequests", cc), transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc), transactionsTooOld("TooOld", cc),
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc), transactionReadVersionBatches("ReadVersionBatches", cc), transactionBatchReadVersions("BatchPriorityReadVersions", cc),
transactionDefaultReadVersions("DefaultPriorityReadVersions", cc), transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc), transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
transactionImmediateReadVersionsCompleted("ImmediatePriorityReadVersionsCompleted", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc), transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
transactionGetRangeRequests("GetRangeRequests", cc), transactionWatchRequests("WatchRequests", cc), transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc),
transactionBytesRead("BytesRead", cc), transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc), transactionCommittedMutations("CommittedMutations", cc),
transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionSetMutations("SetMutations", cc), transactionClearMutations("ClearMutations", cc),
transactionAtomicMutations("AtomicMutations", cc), transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc),
transactionKeyServerLocationRequests("KeyServerLocationRequests", cc), transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc), transactionsTooOld("TooOld", cc),
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc),
transactionsResourceConstrained("ResourceConstrained", cc), transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
internal(false) {}
@ -2323,7 +2323,7 @@ void getRangeFinished(Database cx, Reference<TransactionLogInfo> trLogInfo, doub
cx->transactionBytesRead += bytes;
cx->transactionKeysRead += result.size();
if( trLogInfo ) {
trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, cx->clientLocality.dcId(), now()-startTime, bytes, begin.getKey(), end.getKey()));
}
@ -2593,11 +2593,11 @@ Future<Standalone<RangeResultRef>> getRange( Database const& cx, Future<Version>
}
bool DatabaseContext::debugUseTags = false;
const std::vector<std::string> DatabaseContext::debugTransactionTagChoices = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t" };
const std::vector<std::string> DatabaseContext::debugTransactionTagChoices = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t" };
void debugAddTags(Transaction *tr) {
int numTags = deterministicRandom()->randomInt(0, CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION+1);
for(int i = 0; i < numTags; ++i) {
for(int i = 0; i < numTags; ++i) {
TransactionTag tag;
if(deterministicRandom()->random01() < 0.7) {
tag = TransactionTagRef(deterministicRandom()->randomChoice(DatabaseContext::debugTransactionTagChoices));
@ -3667,7 +3667,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
if (info.debugID.present()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trLogInfo->identifier)
.detail("ServerTraceID", info.debugID.get().toString());
.detail("ServerTraceID", info.debugID.get().first());
}
break;
@ -3703,7 +3703,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
if (trLogInfo && !trLogInfo->identifier.empty()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trLogInfo->identifier)
.detail("ServerTraceID", info.debugID.get().toString());
.detail("ServerTraceID", info.debugID.get().first());
}
break;
@ -3969,7 +3969,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
case TransactionPriority::DEFAULT:
flags |= GetReadVersionRequest::PRIORITY_DEFAULT;
++cx->transactionDefaultReadVersions;
break;
break;
case TransactionPriority::BATCH:
flags |= GetReadVersionRequest::PRIORITY_BATCH;
++cx->transactionBatchReadVersions;
@ -4205,7 +4205,7 @@ ACTOR Future< StorageMetrics > extractMetrics( Future<std::pair<Optional<Storage
return x.first.get();
}
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, KeyRange keys) {
ACTOR Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(Database cx, KeyRange keys) {
state Span span("NAPI:GetReadHotRanges"_loc);
loop {
int64_t shardLimit = 100; // Shard limit here does not really matter since this function is currently only used
@ -4233,7 +4233,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, K
}
wait(waitForAll(fReplies));
Standalone<VectorRef<KeyRangeRef>> results;
Standalone<VectorRef<ReadHotRangeWithMetrics>> results;
for (int i = 0; i < nLocs; i++)
results.append(results.arena(), fReplies[i].get().readHotRanges.begin(),
@ -4250,7 +4250,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, K
}
}
}
ACTOR Future< std::pair<Optional<StorageMetrics>, int> > waitStorageMetrics(
Database cx,
KeyRange keys,
@ -4342,7 +4342,7 @@ ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsLis
}
}
Future<Standalone<VectorRef<KeyRangeRef>>> Transaction::getReadHotRanges(KeyRange const& keys) {
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> Transaction::getReadHotRanges(KeyRange const& keys) {
return ::getReadHotRanges(cx, keys);
}

View File

@ -266,7 +266,7 @@ public:
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit );
Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated );
Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(KeyRange const& keys);
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(KeyRange const& keys);
// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true );

View File

@ -434,9 +434,30 @@ struct SplitMetricsRequest {
}
};
// A key range flagged as read-hot, together with the metrics that triggered the flag.
// Should always be used inside a `Standalone` (the KeyRangeRef member does not own its memory).
struct ReadHotRangeWithMetrics {
	KeyRangeRef keys;
	// bytesRead / sizeBytes for this range (dimensionless ratio); see getReadHotRanges.
	double density;
	// Read bandwidth for this range in bytes per second.
	double readBandwidth;

	ReadHotRangeWithMetrics() = default;
	ReadHotRangeWithMetrics(KeyRangeRef const& keys, double density, double readBandwidth)
	  : keys(keys), density(density), readBandwidth(readBandwidth) {}

	// Deep-copies `rhs.keys` into `arena`; required so the type can live in an
	// arena-backed VectorRef / Standalone.
	ReadHotRangeWithMetrics(Arena& arena, const ReadHotRangeWithMetrics& rhs)
	  : keys(arena, rhs.keys), density(rhs.density), readBandwidth(rhs.readBandwidth) {}

	// Estimated serialized size. Marked const (it reads but never mutates members)
	// so it is callable through const references, matching expectedSize() conventions.
	int expectedSize() const { return keys.expectedSize() + sizeof(density) + sizeof(readBandwidth); }

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, keys, density, readBandwidth);
	}
};
struct ReadHotSubRangeReply {
constexpr static FileIdentifier file_identifier = 10424537;
Standalone<VectorRef<KeyRangeRef>> readHotRanges;
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -299,11 +299,14 @@ ACTOR Future<Void> readHotDetector(DataDistributionTracker* self) {
state Transaction tr(self->cx);
loop {
try {
Standalone<VectorRef<KeyRangeRef>> readHotRanges = wait(tr.getReadHotRanges(keys));
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges = wait(tr.getReadHotRanges(keys));
for (auto& keyRange : readHotRanges) {
TraceEvent("ReadHotRangeLog")
.detail("KeyRangeBegin", keyRange.begin)
.detail("KeyRangeEnd", keyRange.end);
.detail("ReadDensity", keyRange.density)
.detail("ReadBandwidth", keyRange.readBandwidth)
.detail("ReadDensityThreshold", SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO)
.detail("KeyRangeBegin", keyRange.keys.begin)
.detail("KeyRangeEnd", keyRange.keys.end);
}
break;
} catch (Error& e) {

View File

@ -144,17 +144,17 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( SHARD_BYTES_PER_SQRT_BYTES, 45 ); if( buggifySmallShards ) SHARD_BYTES_PER_SQRT_BYTES = 0;//Approximately 10000 bytes per shard
init( MAX_SHARD_BYTES, 500000000 );
init( KEY_SERVER_SHARD_BYTES, 500000000 );
init( SHARD_MAX_READ_DENSITY_RATIO, 2.0);
init( SHARD_MAX_READ_DENSITY_RATIO, 8.0); if (randomize && BUGGIFY) SHARD_MAX_READ_DENSITY_RATIO = 2.0;
/*
The bytesRead/byteSize ratio. Will be declared as read hot when larger than this. 2.0 was chosen to avoid reporting table scan as read hot.
The bytesRead/byteSize ratio. Will be declared as read hot when larger than this. 8.0 was chosen to avoid reporting table scan as read hot.
*/
init ( SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS, 166667 * 1000);
init ( SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS, 1666667 * 1000);
/*
The read bandwidth of a given shard needs to be larger than this value in order to be evaluated if it's read hot. The roughly 167KB per second is calculated as follows:
- Heuristic data suggests that each storage process can do max 50K read operations per second
The read bandwidth of a given shard needs to be larger than this value in order to be evaluated if it's read hot. The roughly 1.67MB per second is calculated as follows:
- Heuristic data suggests that each storage process can do max 500K read operations per second
- Each read has a minimum cost of EMPTY_READ_PENALTY, which is 20 bytes
- Thus that gives a minimum 1MB per second
- But to be conservative, set that number to be 1/6 of 1MB, which is roughly 166,667 bytes per second
- Thus that gives a minimum 10MB per second
- But to be conservative, set that number to be 1/6 of 10MB, which is roughly 1,666,667 bytes per second
Shard with a read bandwidth smaller than this value will never be too busy to handle the reads.
*/
init( SHARD_MAX_BYTES_READ_PER_KSEC_JITTER, 0.1 );

View File

@ -81,7 +81,7 @@ ACTOR Future<Void> sampleBackups(Reference<RestoreControllerData> self, RestoreC
.detail("SampleID", req.id)
.detail("BatchIndex", req.batchIndex)
.detail("Samples", req.samples.size());
ASSERT(req.batchIndex < self->batch.size());
ASSERT(req.batchIndex <= self->batch.size()); // batchIndex starts from 1
Reference<ControllerBatchData> batch = self->batch[req.batchIndex];
if (batch->sampleMsgs.find(req.id) != batch->sampleMsgs.end()) {

View File

@ -419,9 +419,11 @@ struct StorageServerMetrics {
// Given a read hot shard, this function will divide the shard into chunks and find those chunks whose
// readBytes/sizeBytes exceeds the `readDensityRatio`. Please make sure to run unit tests
// `StorageMetricsSampleTests.txt` after change made.
std::vector<KeyRangeRef> getReadHotRanges(KeyRangeRef shard, double readDensityRatio, int64_t baseChunkSize,
int64_t minShardReadBandwidthPerKSeconds) const {
std::vector<KeyRangeRef> toReturn;
std::vector<ReadHotRangeWithMetrics> getReadHotRanges(KeyRangeRef shard, double readDensityRatio,
int64_t baseChunkSize,
int64_t minShardReadBandwidthPerKSeconds) const {
std::vector<ReadHotRangeWithMetrics> toReturn;
double shardSize = (double)byteSample.getEstimate(shard);
int64_t shardReadBandwidth = bytesReadSample.getEstimate(shard);
if (shardReadBandwidth * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS <=
@ -431,7 +433,9 @@ struct StorageServerMetrics {
if (shardSize <= baseChunkSize) {
// Shard is small, use it as is
if (bytesReadSample.getEstimate(shard) > (readDensityRatio * shardSize)) {
toReturn.push_back(shard);
toReturn.emplace_back(shard, bytesReadSample.getEstimate(shard) / shardSize,
bytesReadSample.getEstimate(shard) /
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL);
}
return toReturn;
}
@ -453,14 +457,15 @@ struct StorageServerMetrics {
if (bytesReadSample.getEstimate(KeyRangeRef(beginKey, *endKey)) >
(readDensityRatio * std::max(baseChunkSize, byteSample.getEstimate(KeyRangeRef(beginKey, *endKey))))) {
auto range = KeyRangeRef(beginKey, *endKey);
if (!toReturn.empty() && toReturn.back().end == range.begin) {
if (!toReturn.empty() && toReturn.back().keys.end == range.begin) {
// in case two consecutive chunks both are over the ratio, merge them.
auto updatedTail = KeyRangeRef(toReturn.back().begin, *endKey);
range = KeyRangeRef(toReturn.back().keys.begin, *endKey);
toReturn.pop_back();
toReturn.push_back(updatedTail);
} else {
toReturn.push_back(range);
}
toReturn.emplace_back(
range,
(double)bytesReadSample.getEstimate(range) / std::max(baseChunkSize, byteSample.getEstimate(range)),
bytesReadSample.getEstimate(range) / SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL);
}
beginKey = *endKey;
endKey = byteSample.sample.index(byteSample.sample.sumTo(byteSample.sample.lower_bound(beginKey)) +
@ -471,10 +476,10 @@ struct StorageServerMetrics {
void getReadHotRanges(ReadHotSubRangeRequest req) const {
ReadHotSubRangeReply reply;
std::vector<KeyRangeRef> v = getReadHotRanges(req.keys, SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
reply.readHotRanges = VectorRef<KeyRangeRef>(v.data(), v.size());
auto _ranges = getReadHotRanges(req.keys, SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
reply.readHotRanges = VectorRef(_ranges.data(), _ranges.size());
req.reply.send(reply);
}
@ -518,11 +523,11 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/simple") {
ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
vector<KeyRangeRef> t =
std::vector<ReadHotRangeWithMetrics> t =
ssm.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 2.0, 200 * sampleUnit, 0);
ASSERT(t.size() == 1 && (*t.begin()).begin == LiteralStringRef("Bah") &&
(*t.begin()).end == LiteralStringRef("Bob"));
ASSERT(t.size() == 1 && (*t.begin()).keys.begin == LiteralStringRef("Bah") &&
(*t.begin()).keys.end == LiteralStringRef("Bob"));
return Void();
}
@ -549,12 +554,12 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/moreThanOneRange") {
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Dah"), 300 * sampleUnit);
vector<KeyRangeRef> t =
std::vector<ReadHotRangeWithMetrics> t =
ssm.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 2.0, 200 * sampleUnit, 0);
ASSERT(t.size() == 2 && (*t.begin()).begin == LiteralStringRef("Bah") &&
(*t.begin()).end == LiteralStringRef("Bob"));
ASSERT(t.at(1).begin == LiteralStringRef("Cat") && t.at(1).end == LiteralStringRef("Dah"));
ASSERT(t.size() == 2 && (*t.begin()).keys.begin == LiteralStringRef("Bah") &&
(*t.begin()).keys.end == LiteralStringRef("Bob"));
ASSERT(t.at(1).keys.begin == LiteralStringRef("Cat") && t.at(1).keys.end == LiteralStringRef("Dah"));
return Void();
}
@ -582,12 +587,12 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/consecutiveRanges") {
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Dah"), 300 * sampleUnit);
vector<KeyRangeRef> t =
std::vector<ReadHotRangeWithMetrics> t =
ssm.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 2.0, 200 * sampleUnit, 0);
ASSERT(t.size() == 2 && (*t.begin()).begin == LiteralStringRef("Bah") &&
(*t.begin()).end == LiteralStringRef("But"));
ASSERT(t.at(1).begin == LiteralStringRef("Cat") && t.at(1).end == LiteralStringRef("Dah"));
ASSERT(t.size() == 2 && (*t.begin()).keys.begin == LiteralStringRef("Bah") &&
(*t.begin()).keys.end == LiteralStringRef("But"));
ASSERT(t.at(1).keys.begin == LiteralStringRef("Cat") && t.at(1).keys.end == LiteralStringRef("Dah"));
return Void();
}

View File

@ -99,15 +99,15 @@ struct ReadHotDetectionWorkload : TestWorkload {
// TraceEvent("RHDCheckPhaseLog")
// .detail("KeyRangeSize", sm.bytes)
// .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond);
Standalone<VectorRef<KeyRangeRef>> keyRanges = wait(tr.getReadHotRanges(self->wholeRange));
Standalone<VectorRef<ReadHotRangeWithMetrics>> keyRanges = wait(tr.getReadHotRanges(self->wholeRange));
// TraceEvent("RHDCheckPhaseLog")
// .detail("KeyRangesSize", keyRanges.size())
// .detail("ReadKey", self->readKey.printable().c_str())
// .detail("KeyRangesBackBeginKey", keyRanges.back().begin)
// .detail("KeyRangesBackEndKey", keyRanges.back().end);
// Loose check.
for (auto kr : keyRanges) {
if (kr.contains(self->readKey)) {
for (const auto& kr : keyRanges) {
if (kr.keys.contains(self->readKey)) {
self->passed = true;
}
}