Added storage engine metrics in SS.
This commit is contained in:
parent
0496f43dec
commit
f14c832031
|
@ -665,6 +665,10 @@ struct RangeResultRef : VectorRef<KeyValueRef> {
|
|||
serializer(ar, ((VectorRef<KeyValueRef>&)*this), more, readThrough, readToBegin, readThroughEnd);
|
||||
}
|
||||
|
||||
int logicalSize() const {
|
||||
return VectorRef<KeyValueRef>::expectedSize() - VectorRef<KeyValueRef>::size() * sizeof(KeyValueRef);
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return "more:" + std::to_string(more) +
|
||||
" readThrough:" + (readThrough.present() ? readThrough.get().toString() : "[unset]") +
|
||||
|
|
|
@ -232,10 +232,12 @@ struct StorageServerDisk {
|
|||
StorageBytes getStorageBytes() const { return storage->getStorageBytes(); }
|
||||
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
|
||||
|
||||
Counter* kvBytesCommit;
|
||||
Counter* kvClearRange;
|
||||
|
||||
private:
|
||||
struct StorageServer* data;
|
||||
IKeyValueStore* storage;
|
||||
|
||||
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
|
||||
|
||||
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, IKeyValueStore::ReadType type) {
|
||||
|
@ -799,6 +801,14 @@ public:
|
|||
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
|
||||
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
|
||||
Counter bytesInput;
|
||||
// Bytes pulled from TLogs, it counts the size of the key value pairs, without any overhead.
|
||||
Counter logicalBytesInput;
|
||||
// Bytes pulled from TLogs for moving-in shards.
|
||||
Counter logicalBytesMoveInOverhead;
|
||||
// Bytes committed to the underlying storage engine by SS, it counts the size of key value pairs.
|
||||
Counter kvBytesCommit;
|
||||
// Count of clearRange operatons to the storage engine.
|
||||
Counter kvClearRange;
|
||||
// Bytes of the mutations that have been removed from memory because they durable. The counting is same as
|
||||
// bytesInput, instead of the actual bytes taken in the storages, so that (bytesInput - bytesDurable) can
|
||||
// reflect the current memory footprint of MVCC.
|
||||
|
@ -826,6 +836,9 @@ public:
|
|||
// fallback).
|
||||
Counter quickGetValueHit, quickGetValueMiss, quickGetKeyValuesHit, quickGetKeyValuesMiss;
|
||||
|
||||
Counter kvBytesScan;
|
||||
Counter kvBytesGet;
|
||||
|
||||
LatencySample readLatencySample;
|
||||
LatencyBands readLatencyBands;
|
||||
|
||||
|
@ -836,7 +849,9 @@ public:
|
|||
getRangeStreamQueries("GetRangeStreamQueries", cc), finishedQueries("FinishedQueries", cc),
|
||||
lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
|
||||
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
|
||||
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
|
||||
bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc),
|
||||
logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc), kvBytesCommit("KVBytesCommit", cc),
|
||||
kvClearRange("KVClearRange", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
|
||||
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
|
||||
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
|
||||
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
|
||||
|
@ -847,6 +862,7 @@ public:
|
|||
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
|
||||
quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc),
|
||||
quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
|
||||
kvBytesScan("KVBytesScan", cc), kvBytesGet("KVBytesGet", cc),
|
||||
readLatencySample("ReadLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
|
@ -928,6 +944,9 @@ public:
|
|||
addShard(ShardInfo::newNotAssigned(allKeys));
|
||||
|
||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
|
||||
|
||||
this->storage.kvBytesCommit = &counters.kvBytesCommit;
|
||||
this->storage.kvClearRange = &counters.kvClearRange;
|
||||
}
|
||||
|
||||
//~StorageServer() { fclose(log); }
|
||||
|
@ -1340,6 +1359,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||
} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
|
||||
path = 2;
|
||||
Optional<Value> vv = wait(data->storage.readValue(req.key, IKeyValueStore::ReadType::NORMAL, req.debugID));
|
||||
data->counters.kvBytesGet += vv.expectedSize();
|
||||
// Validate that while we were reading the data we didn't lose the version or shard
|
||||
if (version < data->storageVersion()) {
|
||||
TEST(true); // transaction_too_old after readValue
|
||||
|
@ -1774,6 +1794,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
|
|||
changeFeedDurableKey(req.rangeID, req.end)),
|
||||
1 << 30,
|
||||
remainingDurableBytes));
|
||||
data->counters.kvBytesScan += res.logicalSize();
|
||||
|
||||
if (!req.range.empty()) {
|
||||
data->checkChangeCounter(changeCounter, req.range);
|
||||
|
@ -2187,6 +2208,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
|||
readEnd = vCurrent ? std::min(vCurrent.key(), range.end) : range.end;
|
||||
RangeResult atStorageVersion =
|
||||
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
|
||||
data->counters.kvBytesScan += atStorageVersion.logicalSize();
|
||||
|
||||
ASSERT(atStorageVersion.size() <= limit);
|
||||
if (data->storageVersion() > version)
|
||||
|
@ -2268,6 +2290,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
|||
: range.begin;
|
||||
RangeResult atStorageVersion =
|
||||
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
|
||||
data->counters.kvBytesScan += atStorageVersion.logicalSize();
|
||||
|
||||
ASSERT(atStorageVersion.size() <= -limit);
|
||||
if (data->storageVersion() > version)
|
||||
|
@ -3367,6 +3390,9 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
|
|||
|
||||
state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd);
|
||||
state std::vector<Key> keyEndVal = wait(futureKeyEnds);
|
||||
for (const auto& key : keyEndVal) {
|
||||
data->counters.kvBytesScan += key.expectedSize();
|
||||
}
|
||||
eager->keyEnd = keyEndVal;
|
||||
}
|
||||
|
||||
|
@ -3377,6 +3403,11 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
|
|||
|
||||
state Future<std::vector<Optional<Value>>> futureValues = getAll(value);
|
||||
std::vector<Optional<Value>> optionalValues = wait(futureValues);
|
||||
for (const auto& value : optionalValues) {
|
||||
if (value.present()) {
|
||||
data->counters.kvBytesGet += value.expectedSize();
|
||||
}
|
||||
}
|
||||
eager->value = optionalValues;
|
||||
|
||||
return Void();
|
||||
|
@ -4416,6 +4447,7 @@ AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys)
|
|||
}
|
||||
|
||||
void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const& mutation) {
|
||||
server->counters.logicalBytesMoveInOverhead += mutation.expectedSize();
|
||||
if (mutation.type == mutation.ClearRange) {
|
||||
ASSERT(keys.begin <= mutation.param1 && mutation.param2 <= keys.end);
|
||||
} else if (isSingleKeyMutation((MutationRef::Type)mutation.type)) {
|
||||
|
@ -5289,6 +5321,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
updater.applyMutation(data, msg, ver, false);
|
||||
mutationBytes += msg.totalSize();
|
||||
data->counters.mutationBytes += msg.totalSize();
|
||||
data->counters.logicalBytesInput += msg.expectedSize();
|
||||
++data->counters.mutations;
|
||||
switch (msg.type) {
|
||||
case MutationRef::SetValue:
|
||||
|
@ -5641,17 +5674,21 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
|
|||
|
||||
void StorageServerDisk::clearRange(KeyRangeRef keys) {
|
||||
storage->clear(keys);
|
||||
*kvClearRange++;
|
||||
}
|
||||
|
||||
void StorageServerDisk::writeKeyValue(KeyValueRef kv) {
|
||||
storage->set(kv);
|
||||
*kvBytesCommit += kv.expectedSize();
|
||||
}
|
||||
|
||||
void StorageServerDisk::writeMutation(MutationRef mutation) {
|
||||
if (mutation.type == MutationRef::SetValue) {
|
||||
storage->set(KeyValueRef(mutation.param1, mutation.param2));
|
||||
*kvBytesCommit += mutation.expectedSize();
|
||||
} else if (mutation.type == MutationRef::ClearRange) {
|
||||
storage->clear(KeyRangeRef(mutation.param1, mutation.param2));
|
||||
*kvClearRange++;
|
||||
} else
|
||||
ASSERT(false);
|
||||
}
|
||||
|
@ -5663,8 +5700,10 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
|
|||
DEBUG_MUTATION(debugContext, debugVersion, m, data->thisServerID);
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
storage->set(KeyValueRef(m.param1, m.param2));
|
||||
*kvBytesCommit += m.expectedSize();
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
storage->clear(KeyRangeRef(m.param1, m.param2));
|
||||
*kvClearRange++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5695,7 +5734,9 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
|
|||
|
||||
// Update data->storage to persist the changes from (data->storageVersion(),version]
|
||||
void StorageServerDisk::makeVersionDurable(Version version) {
|
||||
storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())));
|
||||
KeyValueRef kv(persistVersion, BinaryWriter::toValue(version, Unversioned()));
|
||||
storage->set(kv);
|
||||
*kvBytesCommit += kv.expectedSize();
|
||||
|
||||
// TraceEvent("MakeDurable", data->thisServerID)
|
||||
// .detail("FromVersion", prevStorageVersion)
|
||||
|
@ -5724,8 +5765,10 @@ ACTOR Future<Void> applyByteSampleResult(StorageServer* data,
|
|||
loop {
|
||||
RangeResult bs = wait(storage->readRange(
|
||||
KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES));
|
||||
if (results)
|
||||
if (results) {
|
||||
results->push_back(bs.castTo<VectorRef<KeyValueRef>>());
|
||||
data->counters.kvBytesScan += bs.logicalSize();
|
||||
}
|
||||
int rangeSize = bs.expectedSize();
|
||||
totalFetches++;
|
||||
totalKeys += bs.size();
|
||||
|
|
Loading…
Reference in New Issue