Capture how fast an SS is catching up to its tLog-SS lag
Changes: storagegroupserver.actor.cpp: - Report "fetchedVersions" and "duration" as part of StorageMetrics trace event. - Report "sourceTLogID" as a separte trace event (and report this only when it changes)..
This commit is contained in:
parent
b0554b4554
commit
78ef6822f6
|
@ -545,9 +545,10 @@ public:
|
|||
int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage
|
||||
// server
|
||||
|
||||
int64_t versionCount;
|
||||
double duration;
|
||||
Optional<UID> sourceTLogID;
|
||||
// Metrics about the latest batch of versions fetched by this StorageServer
|
||||
int64_t fetchedVersions; // how many versions were fetched
|
||||
double duration; // how long (in seconds) it took to fetch the versions
|
||||
Optional<UID> sourceTLogID; // the tLog from which the versions were fetched
|
||||
|
||||
ProtocolVersion logProtocol;
|
||||
|
||||
|
@ -710,6 +711,8 @@ public:
|
|||
specialCounter(cc, "DurableVersion", [self]() { return self->durableVersion.get(); });
|
||||
specialCounter(cc, "DesiredOldestVersion", [self]() { return self->desiredOldestVersion.get(); });
|
||||
specialCounter(cc, "VersionLag", [self]() { return self->versionLag; });
|
||||
specialCounter(cc, "FetchedVersions", [self]() { return self->fetchedVersions; });
|
||||
specialCounter(cc, "Duration", [self]() { return self->duration; });
|
||||
specialCounter(cc, "LocalRate", [self] { return self->currentRate() * 100; });
|
||||
|
||||
specialCounter(cc, "BytesReadSampleCount", [self]() { return self->metrics.bytesReadSample.queue.size(); });
|
||||
|
@ -736,7 +739,7 @@ public:
|
|||
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
|
||||
db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
|
||||
rebootAfterDurableVersion(std::numeric_limits<Version>::max()), durableInProgress(Void()), versionLag(0),
|
||||
versionCount(0), duration(0), primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
|
||||
fetchedVersions(0), duration(0.0), primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
|
||||
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
|
||||
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0),
|
||||
counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
|
||||
|
@ -3527,18 +3530,20 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
if (data->otherError.getFuture().isReady())
|
||||
data->otherError.getFuture().get();
|
||||
|
||||
auto curTime = now();
|
||||
data->versionCount = ver - data->version.get();
|
||||
data->duration = curTime - data->lastUpdate;
|
||||
data->sourceTLogID = cursor->getCurrentPeekLocation();
|
||||
data->fetchedVersions = ver - data->version.get();
|
||||
data->duration = now() - data->lastUpdate;
|
||||
Optional<UID> curSourceTLogID = cursor->getCurrentPeekLocation();
|
||||
|
||||
TraceEvent("StorageServerCatchUpRate", data->thisServerID)
|
||||
.detail("VersionCount", data->versionCount)
|
||||
.detail("Duration", data->duration)
|
||||
.detail("SourceTLogId", data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown");
|
||||
if (curSourceTLogID != data->sourceTLogID) {
|
||||
data->sourceTLogID = curSourceTLogID;
|
||||
|
||||
TraceEvent("StorageServerSourceTLogID", data->thisServerID)
|
||||
.detail("SourceTLogID", data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
|
||||
.trackLatest(data->thisServerID.toString() + "/StorageServerSourceTLogID");
|
||||
}
|
||||
|
||||
data->noRecentUpdates.set(false);
|
||||
data->lastUpdate = curTime;
|
||||
data->lastUpdate = now();
|
||||
data->version.set(ver); // Triggers replies to waiting gets for new version(s)
|
||||
|
||||
setDataVersion(data->thisServerID, data->version.get());
|
||||
|
|
Loading…
Reference in New Issue