Update per comments
This commit is contained in:
parent
da31eb3d89
commit
0fd43a5bbb
|
@ -305,8 +305,13 @@ public:
|
|||
const Reference<Histogram> bytes;
|
||||
const Reference<Histogram> bandwidth;
|
||||
|
||||
FetchKeysHistograms(const Reference<Histogram> latency_, const Reference<Histogram> bytes_, const Reference<Histogram> bandwidth_)
|
||||
: latency(latency_), bytes(bytes_), bandwidth(bandwidth_) {}
|
||||
FetchKeysHistograms()
|
||||
: latency(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_LATENCY_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
bytes(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_HISTOGRAM,
|
||||
Histogram::Unit::bytes)),
|
||||
bandwidth(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM,
|
||||
Histogram::Unit::bytes)) {}
|
||||
} fetchKeysHistograms;
|
||||
|
||||
class CurrentRunningFetchKeys {
|
||||
|
@ -331,11 +336,11 @@ public:
|
|||
return {-1, emptyKeyRange};
|
||||
}
|
||||
|
||||
const double now_= now();
|
||||
const double currentTime = now();
|
||||
double longest = 0;
|
||||
UID UIDofLongest;
|
||||
for (const auto kv: startTimeMap) {
|
||||
const double currentRunningTime = now_ - kv.second;
|
||||
const double currentRunningTime = currentTime - kv.second;
|
||||
if (longest < currentRunningTime) {
|
||||
longest = currentRunningTime;
|
||||
UIDofLongest = kv.first;
|
||||
|
@ -586,27 +591,18 @@ public:
|
|||
}
|
||||
} counters;
|
||||
|
||||
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
|
||||
: fetchKeysHistograms(
|
||||
Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_LATENCY_HISTOGRAM, Histogram::Unit::microseconds),
|
||||
Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_HISTOGRAM, Histogram::Unit::bytes),
|
||||
Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM, Histogram::Unit::bytes)
|
||||
),
|
||||
instanceID(deterministicRandom()->randomUniqueID().first()),
|
||||
storage(this, storage), db(db),
|
||||
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
||||
rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
|
||||
durableInProgress(Void()),
|
||||
versionLag(0), primaryLocality(tagLocalityInvalid),
|
||||
updateEagerReads(0),
|
||||
shardChangeCounter(0),
|
||||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
|
||||
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
|
||||
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
|
||||
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
|
||||
behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
|
||||
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
|
||||
{
|
||||
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db,
|
||||
StorageServerInterface const& ssi)
|
||||
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
|
||||
db(db), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
||||
rebootAfterDurableVersion(std::numeric_limits<Version>::max()), durableInProgress(Void()), versionLag(0),
|
||||
primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
|
||||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
|
||||
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0),
|
||||
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
|
||||
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), behind(false), versionBehind(false),
|
||||
byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()),
|
||||
poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0) {
|
||||
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
|
||||
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
|
||||
durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
|
||||
|
@ -718,7 +714,7 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
const StringRef StorageServer::CurrentRunningFetchKeys::emptyString= LiteralStringRef("");
|
||||
const StringRef StorageServer::CurrentRunningFetchKeys::emptyString = LiteralStringRef("");
|
||||
const KeyRangeRef StorageServer::CurrentRunningFetchKeys::emptyKeyRange = KeyRangeRef(StorageServer::CurrentRunningFetchKeys::emptyString, StorageServer::CurrentRunningFetchKeys::emptyString);
|
||||
|
||||
// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
|
||||
|
@ -1996,12 +1992,20 @@ void splitMutation(StorageServer* data, KeyRangeMap<T>& map, MutationRef const&
|
|||
ASSERT(false); // Unknown mutation type in splitMutations
|
||||
}
|
||||
|
||||
constexpr double FETCH_KEY_TOO_LONG_TIME_CRITERIA = 300.0;
|
||||
|
||||
ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
|
||||
state double startTime = now();
|
||||
loop {
|
||||
state double waitSeconds = BUGGIFY ? 5.0 : 600.0;
|
||||
wait(delay(waitSeconds));
|
||||
TraceEvent(waitSeconds > 300.0 ? SevWarnAlways : SevInfo, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
|
||||
|
||||
const auto traceEventLevel = waitSeconds > FETCH_KEY_TOO_LONG_TIME_CRITERIA ? SevWarnAlways : SevInfo;
|
||||
TraceEvent(traceEventLevel, "FetchKeysTooLong")
|
||||
.detail("Duration", now() - startTime)
|
||||
.detail("Phase", shard->phase)
|
||||
.detail("Begin", shard->keys.begin.printable())
|
||||
.detail("End", shard->keys.end.printable());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2022,8 +2026,10 @@ public:
|
|||
void addFetchedBytes(const int bytes) { fetchedBytes += bytes; }
|
||||
|
||||
~FetchKeysMetricReporter() {
|
||||
const double latency = now() - startTime;
|
||||
if (latency == 0) return;
|
||||
double latency = now() - startTime;
|
||||
|
||||
// If fetchKeys is *NOT* run, i.e. returning immediately, still report a record.
|
||||
if (latency == 0) latency = 1e6;
|
||||
|
||||
const uint32_t bandwidth = fetchedBytes / latency;
|
||||
|
||||
|
@ -3581,12 +3587,18 @@ ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
|
|||
}
|
||||
|
||||
const auto longestRunningFetchKeys = self->currentRunningFetchKeys.longestTime();
|
||||
TraceEvent(SevInfo, "FetchKeyCurrentStatus")
|
||||
.detail("Timestamp", now())
|
||||
.detail("LongestRunningTime", longestRunningFetchKeys.first)
|
||||
.detail("StartKey", longestRunningFetchKeys.second.begin)
|
||||
.detail("EndKey", longestRunningFetchKeys.second.end)
|
||||
.detail("NumRunning", numRunningFetchKeys);
|
||||
|
||||
auto level = SevInfo;
|
||||
if (longestRunningFetchKeys.first >= FETCH_KEY_TOO_LONG_TIME_CRITERIA) {
|
||||
level = SevWarnAlways;
|
||||
}
|
||||
|
||||
TraceEvent(level, "FetchKeyCurrentStatus")
|
||||
.detail("Timestamp", now())
|
||||
.detail("LongestRunningTime", longestRunningFetchKeys.first)
|
||||
.detail("StartKey", longestRunningFetchKeys.second.begin.printable())
|
||||
.detail("EndKey", longestRunningFetchKeys.second.end.printable())
|
||||
.detail("NumRunning", numRunningFetchKeys);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue