Added systemClearRange.
This commit is contained in:
parent
f14c832031
commit
da8453eb5d
|
@ -809,6 +809,8 @@ public:
|
|||
Counter kvBytesCommit;
|
||||
// Count of clearRange operatons to the storage engine.
|
||||
Counter kvClearRange;
|
||||
// ClearRange operations issued by FDB, e.g., removing a shard.
|
||||
Counter systemClearRange;
|
||||
// Bytes of the mutations that have been removed from memory because they durable. The counting is same as
|
||||
// bytesInput, instead of the actual bytes taken in the storages, so that (bytesInput - bytesDurable) can
|
||||
// reflect the current memory footprint of MVCC.
|
||||
|
@ -851,22 +853,22 @@ public:
|
|||
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
|
||||
bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc),
|
||||
logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc), kvBytesCommit("KVBytesCommit", cc),
|
||||
kvClearRange("KVClearRange", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
|
||||
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
|
||||
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
|
||||
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
|
||||
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
|
||||
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
|
||||
fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc),
|
||||
readsRejected("ReadsRejected", cc), wrongShardServer("WrongShardServer", cc),
|
||||
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
|
||||
quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc),
|
||||
quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
|
||||
kvBytesScan("KVBytesScan", cc), kvBytesGet("KVBytesGet", cc),
|
||||
readLatencySample("ReadLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
kvClearRange("KVClearRange", cc), systemClearRange("SystemClearRange", cc),
|
||||
bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc),
|
||||
sampledBytesCleared("SampledBytesCleared", cc), kvFetched("KVFetched", cc), mutations("Mutations", cc),
|
||||
setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc),
|
||||
atomicMutations("AtomicMutations", cc), updateBatches("UpdateBatches", cc),
|
||||
updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc),
|
||||
fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc),
|
||||
fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc),
|
||||
wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc),
|
||||
fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc),
|
||||
quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc),
|
||||
quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), kvBytesScan("KVBytesScan", cc),
|
||||
kvBytesGet("KVBytesGet", cc), readLatencySample("ReadLatencyMetrics",
|
||||
self->thisServerID,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
|
||||
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
|
||||
specialCounter(cc, "Version", [self]() { return self->version.get(); });
|
||||
|
@ -1626,6 +1628,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
|||
if (feed->second->storageVersion != invalidVersion) {
|
||||
self->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
||||
changeFeedDurableKey(feed->second->id, req.version)));
|
||||
self->counters.systemClearRange++;
|
||||
if (req.version > feed->second->storageVersion) {
|
||||
feed->second->storageVersion = invalidVersion;
|
||||
feed->second->durableVersion = invalidVersion;
|
||||
|
@ -3698,6 +3701,7 @@ void removeDataRange(StorageServer* ss,
|
|||
MutationRef m(MutationRef::ClearRange, range.end, endClear->getEndKey());
|
||||
m = ss->addMutationToMutationLog(mLV, m);
|
||||
data.insert(m.param1, ValueOrClearToRef::clearTo(m.param2));
|
||||
ss->counters.systemClearRange++;
|
||||
}
|
||||
|
||||
auto beginClear = data.atLatest().lastLess(range.begin);
|
||||
|
@ -4416,6 +4420,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
|
||||
if (shard->phase < AddingShard::Waiting) {
|
||||
data->storage.clearRange(keys);
|
||||
data->counters.systemClearRange++;
|
||||
data->byteSampleApplyClear(keys, invalidVersion);
|
||||
} else {
|
||||
ASSERT(data->data().getLatestVersion() > data->version.get());
|
||||
|
@ -4655,6 +4660,7 @@ void changeServerKeys(StorageServer* data,
|
|||
data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads);
|
||||
data->newestAvailableVersion.insert(range, latestVersion);
|
||||
setAvailableStatus(data, range, true);
|
||||
data->counters.systemClearRange++;
|
||||
}
|
||||
validate(data);
|
||||
|
||||
|
@ -4680,10 +4686,12 @@ void changeServerKeys(StorageServer* data,
|
|||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||
data->counters.systemClearRange++;
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(f.first, 0),
|
||||
changeFeedDurableKey(f.first, version)));
|
||||
data->counters.systemClearRange++;
|
||||
auto rs = data->keyChangeFeed.modify(f.second);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
auto& feedList = r->value();
|
||||
|
@ -4931,10 +4939,12 @@ private:
|
|||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||
data->counters.systemClearRange++;
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(feed->second->id, 0),
|
||||
changeFeedDurableKey(feed->second->id, currentVersion)));
|
||||
data->counters.systemClearRange++;
|
||||
auto rs = data->keyChangeFeed.modify(feed->second->range);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
auto& feedList = r->value();
|
||||
|
@ -4955,6 +4965,7 @@ private:
|
|||
if (feed->second->storageVersion != invalidVersion) {
|
||||
data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
||||
changeFeedDurableKey(feed->second->id, popVersion)));
|
||||
data->counters.systemClearRange++;
|
||||
if (popVersion > feed->second->storageVersion) {
|
||||
feed->second->storageVersion = invalidVersion;
|
||||
feed->second->durableVersion = invalidVersion;
|
||||
|
@ -5639,6 +5650,7 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
|
|||
//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end);
|
||||
|
||||
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, availableKeys.begin, availableKeys.end));
|
||||
self->counters.systemClearRange++;
|
||||
self->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
availableKeys.begin,
|
||||
|
@ -5659,6 +5671,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
|
|||
persistShardAssignedKeys.begin.toString() + keys.end.toString());
|
||||
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
|
||||
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end));
|
||||
self->counters.systemClearRange++;
|
||||
self->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
assignedKeys.begin,
|
||||
|
@ -5735,8 +5748,8 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
|
|||
// Update data->storage to persist the changes from (data->storageVersion(),version]
|
||||
void StorageServerDisk::makeVersionDurable(Version version) {
|
||||
KeyValueRef kv(persistVersion, BinaryWriter::toValue(version, Unversioned()));
|
||||
storage->set(kv);
|
||||
*kvBytesCommit += kv.expectedSize();
|
||||
storage->set(kv);
|
||||
*kvBytesCommit += kv.expectedSize();
|
||||
|
||||
// TraceEvent("MakeDurable", data->thisServerID)
|
||||
// .detail("FromVersion", prevStorageVersion)
|
||||
|
@ -6048,6 +6061,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||
++it) {
|
||||
if (it->value() == invalidVersion) {
|
||||
KeyRangeRef clearRange(it->begin(), it->end());
|
||||
data->counters.systemClearRange++;
|
||||
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
|
||||
// DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
|
||||
storage->clear(clearRange);
|
||||
|
@ -6128,6 +6142,7 @@ void StorageServer::byteSampleApplySet(KeyValueRef kv, Version ver) {
|
|||
auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
|
||||
addMutationToMutationLogOrStorage(ver,
|
||||
MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
|
||||
counters.systemClearRange++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6174,6 +6189,7 @@ void StorageServer::byteSampleApplyClear(KeyRangeRef range, Version ver) {
|
|||
byteSample.eraseAsync(range.begin, range.end);
|
||||
auto diskRange = range.withPrefix(persistByteSampleKeys.begin);
|
||||
addMutationToMutationLogOrStorage(ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
|
||||
counters.systemClearRange++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue