fixup! Update per comments
This commit is contained in:
parent
ec40c6bfec
commit
90b887f394
|
@ -58,6 +58,9 @@ struct VersionReply {
|
|||
struct UpdateCommitCostRequest {
|
||||
constexpr static FileIdentifier file_identifier = 4159439;
|
||||
|
||||
// The time the request being posted
|
||||
double postTime;
|
||||
|
||||
double elapsed;
|
||||
TransactionTag busiestTag;
|
||||
|
||||
|
@ -72,7 +75,7 @@ struct UpdateCommitCostRequest {
|
|||
|
||||
template <typename Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply);
|
||||
serializer(ar, postTime, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1016,6 +1016,7 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) {
|
|||
}
|
||||
|
||||
UpdateCommitCostRequest updateCommitCostRequest;
|
||||
updateCommitCostRequest.postTime = now();
|
||||
updateCommitCostRequest.elapsed = elapsed;
|
||||
updateCommitCostRequest.busiestTag = busiestTag;
|
||||
updateCommitCostRequest.opsSum = maxCost.getOpsSum();
|
||||
|
|
|
@ -90,17 +90,12 @@ class TransactionTagCounterImpl {
|
|||
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
||||
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||
|
||||
const std::string busiestWriteTagTrackingKey;
|
||||
Reference<EventCacheHolder> busiestWriteTagEventHolder;
|
||||
|
||||
static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
||||
|
||||
public:
|
||||
TransactionTagCounterImpl(UID thisServerID)
|
||||
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
|
||||
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")),
|
||||
busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"),
|
||||
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(busiestWriteTagTrackingKey)) {}
|
||||
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
|
||||
|
||||
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
if (tags.present()) {
|
||||
|
@ -116,8 +111,6 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
const std::string& getBusiestWritingTagTrackingKey() const { return busiestWriteTagTrackingKey; }
|
||||
|
||||
void startNewInterval() {
|
||||
double elapsed = now() - intervalStart;
|
||||
previousBusiestTags.clear();
|
||||
|
@ -155,10 +148,6 @@ void TransactionTagCounter::startNewInterval() {
|
|||
return impl->startNewInterval();
|
||||
}
|
||||
|
||||
const std::string& TransactionTagCounter::getBusiestWriteTagTrackingKey() const {
|
||||
return impl->getBusiestWritingTagTrackingKey();
|
||||
}
|
||||
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> const& TransactionTagCounter::getBusiestTags() const {
|
||||
return impl->getBusiestTags();
|
||||
}
|
||||
|
|
|
@ -620,6 +620,16 @@ public:
|
|||
: key(key), value(value), version(version), tags(tags), debugID(debugID) {}
|
||||
};
|
||||
|
||||
struct BusiestWriteTagContext {
|
||||
const std::string busiestWriteTagTrackingKey;
|
||||
Reference<EventCacheHolder> busiestWriteTagEventHolder;
|
||||
double lastUpdateTime;
|
||||
|
||||
BusiestWriteTagContext(const UID& thisServerID)
|
||||
: busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"),
|
||||
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(busiestWriteTagTrackingKey)), lastUpdateTime(0.0) {}
|
||||
};
|
||||
|
||||
struct StorageServer {
|
||||
typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
|
||||
|
||||
|
@ -1018,6 +1028,7 @@ public:
|
|||
}
|
||||
|
||||
TransactionTagCounter transactionTagCounter;
|
||||
BusiestWriteTagContext busiestWriteTagContext;
|
||||
|
||||
Optional<LatencyBandConfig> latencyBandConfig;
|
||||
|
||||
|
@ -1226,7 +1237,8 @@ public:
|
|||
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
|
||||
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
|
||||
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
|
||||
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
|
||||
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
|
||||
busiestWriteTagContext(ssi.id()), counters(this),
|
||||
storageServerSourceTLogIDEventHolder(
|
||||
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
|
||||
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
|
||||
|
@ -10067,6 +10079,12 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
|||
self->actors.add(fetchCheckpointQ(self, req));
|
||||
}
|
||||
when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) {
|
||||
// In case we received an old request/duplicate request, due to, e.g. network problem
|
||||
if (req.postTime < self->busiestWriteTagContext.lastUpdateTime) {
|
||||
continue;
|
||||
}
|
||||
|
||||
self->busiestWriteTagContext.lastUpdateTime = req.postTime;
|
||||
TraceEvent("BusiestWriteTag", self->thisServerID)
|
||||
.detail("Elapsed", req.elapsed)
|
||||
.detail("Tag", printable(req.busiestTag))
|
||||
|
@ -10074,7 +10092,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
|||
.detail("TagCost", req.costSum)
|
||||
.detail("TotalCost", req.totalWriteCosts)
|
||||
.detail("Reported", req.reported)
|
||||
.trackLatest(self->transactionTagCounter.getBusiestWriteTagTrackingKey());
|
||||
.trackLatest(self->busiestWriteTagContext.busiestWriteTagTrackingKey);
|
||||
|
||||
req.reply.send(Void());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue