From 90b887f3945af7c46455e598f292f9e5b5b8526b Mon Sep 17 00:00:00 2001 From: Xiaoge Su Date: Thu, 7 Jul 2022 22:45:56 -0700 Subject: [PATCH] fixup! Update per comments --- .../fdbclient/StorageServerInterface.h | 5 ++++- fdbserver/Ratekeeper.actor.cpp | 1 + fdbserver/TransactionTagCounter.cpp | 13 +---------- fdbserver/storageserver.actor.cpp | 22 +++++++++++++++++-- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/fdbclient/include/fdbclient/StorageServerInterface.h b/fdbclient/include/fdbclient/StorageServerInterface.h index abde99ce6f..d1d7cf080b 100644 --- a/fdbclient/include/fdbclient/StorageServerInterface.h +++ b/fdbclient/include/fdbclient/StorageServerInterface.h @@ -58,6 +58,9 @@ struct VersionReply { struct UpdateCommitCostRequest { constexpr static FileIdentifier file_identifier = 4159439; + // The time the request being posted + double postTime; + double elapsed; TransactionTag busiestTag; @@ -72,7 +75,7 @@ struct UpdateCommitCostRequest { template void serialize(Ar& ar) { - serializer(ar, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply); + serializer(ar, postTime, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply); } }; diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index d90a653216..81b20659f2 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -1016,6 +1016,7 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) { } UpdateCommitCostRequest updateCommitCostRequest; + updateCommitCostRequest.postTime = now(); updateCommitCostRequest.elapsed = elapsed; updateCommitCostRequest.busiestTag = busiestTag; updateCommitCostRequest.opsSum = maxCost.getOpsSum(); diff --git a/fdbserver/TransactionTagCounter.cpp b/fdbserver/TransactionTagCounter.cpp index 29134b87f8..bd09d19cd4 100644 --- a/fdbserver/TransactionTagCounter.cpp +++ b/fdbserver/TransactionTagCounter.cpp @@ -90,17 +90,12 @@ class TransactionTagCounterImpl { std::vector previousBusiestTags; Reference busiestReadTagEventHolder; - const std::string busiestWriteTagTrackingKey; - Reference busiestWriteTagEventHolder; - static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; } public: TransactionTagCounterImpl(UID thisServerID) : thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED), - busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")), - busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"), - busiestWriteTagEventHolder(makeReference(busiestWriteTagTrackingKey)) {} + busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} void addRequest(Optional const& tags, int64_t bytes) { if (tags.present()) { @@ -116,8 +111,6 @@ public: } } - const std::string& getBusiestWritingTagTrackingKey() const { return busiestWriteTagTrackingKey; } - void startNewInterval() { double elapsed = now() - intervalStart; previousBusiestTags.clear(); @@ -155,10 +148,6 @@ void TransactionTagCounter::startNewInterval() { return impl->startNewInterval(); } -const std::string& TransactionTagCounter::getBusiestWriteTagTrackingKey() const { - return impl->getBusiestWritingTagTrackingKey(); -} - std::vector const& TransactionTagCounter::getBusiestTags() const { return impl->getBusiestTags(); } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 3650a7f457..81ba402346 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -620,6 +620,16 @@ public: : key(key), value(value), version(version), tags(tags), debugID(debugID) {} }; +struct BusiestWriteTagContext { + const std::string busiestWriteTagTrackingKey; + Reference busiestWriteTagEventHolder; + double lastUpdateTime; + + BusiestWriteTagContext(const UID& thisServerID) + : busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"), + busiestWriteTagEventHolder(makeReference(busiestWriteTagTrackingKey)), lastUpdateTime(0.0) {} +}; + struct StorageServer { typedef VersionedMap VersionedData; @@ -1018,6 +1028,7 @@ public: } TransactionTagCounter transactionTagCounter; + BusiestWriteTagContext busiestWriteTagContext; Optional latencyBandConfig; @@ -1226,7 +1237,8 @@ public: serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM), instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false), versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0), - lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this), + lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), + busiestWriteTagContext(ssi.id()), counters(this), storageServerSourceTLogIDEventHolder( makeReference(ssi.id().toString() + "/StorageServerSourceTLogID")) { version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id); @@ -10067,6 +10079,12 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface self->actors.add(fetchCheckpointQ(self, req)); } when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) { + // In case we received an old request/duplicate request, due to, e.g. network problem + if (req.postTime < self->busiestWriteTagContext.lastUpdateTime) { + continue; + } + + self->busiestWriteTagContext.lastUpdateTime = req.postTime; TraceEvent("BusiestWriteTag", self->thisServerID) .detail("Elapsed", req.elapsed) .detail("Tag", printable(req.busiestTag)) @@ -10074,7 +10092,7 @@ ACTOR Future storageServerCore(StorageServer* self, StorageServerInterface .detail("TagCost", req.costSum) .detail("TotalCost", req.totalWriteCosts) .detail("Reported", req.reported) - .trackLatest(self->transactionTagCounter.getBusiestWriteTagTrackingKey()); + .trackLatest(self->busiestWriteTagContext.busiestWriteTagTrackingKey); req.reply.send(Void()); }