From f761f9a03a537a282f457410b38c52ec70070ebb Mon Sep 17 00:00:00 2001
From: Fuheng Zhao
Date: Mon, 25 Jul 2022 10:10:42 -0700
Subject: [PATCH] use DefaultEndpoint as the default priority for storage
 server reads

---
 fdbclient/ServerKnobs.cpp                 |  2 +
 fdbclient/include/fdbclient/ServerKnobs.h |  2 +
 fdbserver/storageserver.actor.cpp         | 56 +++++++++++++++++------
 flow/include/flow/genericactors.actor.h   | 25 ++++++++--
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 9eacc643c5..6cda742bb0 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -740,6 +740,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
 	init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
 	init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
+	init( STORAGESERVER_MAX_RANK, 4 );
+	init( STORAGESERVER_READ_RANKS, "0,1,2,3,4" );
 	init( STORAGESERVER_READ_PRIORITIES, "32,8,12,32,48" );
 
 	//Wait Failure
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 75e0b5b810..4b471db40e 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -698,6 +698,8 @@ public:
 	int CHECKPOINT_TRANSFER_BLOCK_BYTES;
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
+	int STORAGESERVER_MAX_RANK;
+	std::string STORAGESERVER_READ_RANKS;
 	std::string STORAGESERVER_READ_PRIORITIES;
 
 	// Wait Failure
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 9b0dba7d98..0df7536b27 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -853,6 +853,7 @@ public:
 	FlowLock serveFetchCheckpointParallelismLock;
 
 	PriorityMultiLock ssLock;
+	std::vector<int> readPriorityRanks;
 
 	int64_t instanceID;
 
@@ -1061,12 +1062,15 @@ public:
 	    fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
 	    fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
 	    serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
-	    ssLock(FLOW_KNOBS->MAX_OUTSTANDING, (int)ReadType::MAX, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
+	    ssLock(FLOW_KNOBS->MAX_OUTSTANDING,
+	           SERVER_KNOBS->STORAGESERVER_MAX_RANK,
+	           SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
 	    instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
 	    versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
 	    lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
 	    storageServerSourceTLogIDEventHolder(
 	        makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
+		readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READ_RANKS, ',');
 		version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
 		oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
 		durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
@@ -1568,9 +1572,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
 
 	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
 	// so we need to downgrade here
-	wait(delay(0));
-	state int readPriority = data->getQueryPriority();
-	wait(store(lock, data->ssLock.lock(readPriority)));
+	wait(delay(0, TaskPriority::DefaultEndpoint));
+	state int rankIndex = data->getQueryPriority();
+	wait(store(lock, data->ssLock.lock(data->readPriorityRanks[rankIndex])));
 
 	if (req.debugID.present())
 		g_traceBatch.addEvent("GetValueDebug",
@@ -3381,10 +3385,10 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
 
 	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
 	// so we need to downgrade here
-	wait(delay(0));
-	state int readPriority =
+	wait(delay(0, TaskPriority::DefaultEndpoint));
+	state int rankIndex =
 	    (req.isFetchKeys && SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY) ? (int)ReadType::FETCH : data->getQueryPriority();
-	state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
+	state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[rankIndex]));
 
 	try {
 		if (req.debugID.present())
@@ -4099,10 +4103,10 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
 
 	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
 	// so we need to downgrade here
-	wait(delay(0));
-	state int readPriority =
+	wait(delay(0, TaskPriority::DefaultEndpoint));
+	state int rankIndex =
 	    (req.isFetchKeys && SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY) ? (int)ReadType::FETCH : data->getQueryPriority();
-	state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
+	state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[rankIndex]));
 
 	try {
 		if (req.debugID.present())
@@ -4290,8 +4294,9 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
 	state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
 	state int64_t resultSize = 0;
 	state ReadType type = req.isFetchKeys ? ReadType::FETCH : ReadType::NORMAL;
-	state int readPriority =
+	state int rankIndex =
 	    (req.isFetchKeys && SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY) ? (int)ReadType::FETCH : (int)ReadType::NORMAL;
+	state int readPriority = data->readPriorityRanks[rankIndex];
 
 	if (req.tenantInfo.name.present()) {
 		span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@@ -4306,7 +4311,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
 
 	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
 	// so we need to downgrade here
-	wait(delay(0));
+	wait(delay(0, TaskPriority::DefaultEndpoint));
 	state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
 
 	try {
@@ -4464,6 +4469,9 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
 				end = lastKey;
 			}
 
+			lock.release();
+			wait(store(lock, data->ssLock.lock(readPriority)));
+
 			data->transactionTagCounter.addRequest(req.tags, resultSize);
 		}
 	}
@@ -4499,9 +4507,9 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
 
 	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
 	// so we need to downgrade here
-	wait(delay(0));
-	state int readPriority = data->getQueryPriority();
-	wait(store(lock, data->ssLock.lock(readPriority)));
+	wait(delay(0, TaskPriority::DefaultEndpoint));
+	state int rankIndex = data->getQueryPriority();
+	wait(store(lock, data->ssLock.lock(data->readPriorityRanks[rankIndex])));
 
 	try {
 		Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
@@ -8671,6 +8679,24 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
 		    [self = self](TraceEvent& te) {
 			    te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
 			    te.detail("Tag", self->tag.toString());
+			    std::vector<int> rpr = self->readPriorityRanks;
+			    te.detail("ActiveReads", self->ssLock.totalWorkers());
+			    te.detail("AwaitReads", self->ssLock.totalWaiters());
+			    int type = (int)ReadType::EAGER;
+			    te.detail("ActiveEager", self->ssLock.numWorkers(rpr[type]));
+			    te.detail("AwaitEager", self->ssLock.numWaiters(rpr[type]));
+			    type = (int)ReadType::FETCH;
+			    te.detail("ActiveFetch", self->ssLock.numWorkers(rpr[type]));
+			    te.detail("AwaitFetch", self->ssLock.numWaiters(rpr[type]));
+			    type = (int)ReadType::LOW;
+			    te.detail("ActiveLow", self->ssLock.numWorkers(rpr[type]));
+			    te.detail("AwaitLow", self->ssLock.numWaiters(rpr[type]));
+			    type = (int)ReadType::NORMAL;
+			    te.detail("ActiveNormal", self->ssLock.numWorkers(rpr[type]));
+			    te.detail("AwaitNormal", self->ssLock.numWaiters(rpr[type]));
+			    type = (int)ReadType::HIGH;
+			    te.detail("ActiveHigh", self->ssLock.numWorkers(rpr[type]));
+			    te.detail("AwaitHigh", self->ssLock.numWaiters(rpr[type]));
 			    StorageBytes sb = self->storage.getStorageBytes();
 			    te.detail("KvstoreBytesUsed", sb.used);
 			    te.detail("KvstoreBytesFree", sb.free);
diff --git a/flow/include/flow/genericactors.actor.h b/flow/include/flow/genericactors.actor.h
index 42d0298aa9..ec215cb5f0 100644
--- a/flow/include/flow/genericactors.actor.h
+++ b/flow/include/flow/genericactors.actor.h
@@ -2175,6 +2175,7 @@ public:
 		this->launchLimit = parseStringToVector<int>(launchLimit, ',');
 		ASSERT(this->launchLimit.size() == maxPriority + 1);
 		waiters.resize(maxPriority + 1);
+		workerCounts.resize(maxPriority + 1, 0);
 		fRunner = runner(this);
 	}
 
@@ -2185,8 +2186,9 @@ public:
 		// This shortcut may enable a waiter to jump the line when the releaser loop yields
 		if (available > 0) {
 			--available;
+			workerCounts[priority] += 1;
 			Lock p;
-			addRunner(p);
+			addRunner(p, priority);
 			return p;
 		}
 
@@ -2234,6 +2236,20 @@ public:
 		return s;
 	}
 
+	int totalWaiters() { return waiting; }
+
+	int numWaiters(const unsigned int priority) {
+		ASSERT(priority < waiters.size());
+		return waiters[priority].size();
+	}
+
+	int totalWorkers() { return concurrency - available; }
+
+	int numWorkers(const unsigned int priority) {
+		ASSERT(priority < workerCounts.size());
+		return workerCounts[priority];
+	}
+
 private:
 	struct Waiter {
 		Waiter() : queuedTime(now()) {}
@@ -2247,14 +2263,16 @@ private:
 	typedef Deque<Waiter> Queue;
 	std::vector<int> launchLimit;
 	std::vector<Queue> waiters;
+	std::vector<int> workerCounts;
 	Deque<Future<Void>> runners;
 	Future<Void> fRunner;
 	AsyncTrigger release;
 	Promise<Void> brokenOnDestruct;
 
-	void addRunner(Lock& lock) {
+	void addRunner(Lock& lock, int priority) {
 		runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
 			++available;
+			workerCounts[priority] -= 1;
 			if (waiting > 0 || runners.size() > 100) {
 				release.trigger();
 			}
@@ -2307,7 +2325,8 @@ private:
 
 			// If the lock was not already released, add it to the runners future queue
 			if (lock.promise.canBeSet()) {
-				self->addRunner(lock);
+				self->workerCounts[priority] += 1;
+				self->addRunner(lock, priority);
 
 				// A slot has been consumed, so stop reading from this queue if there aren't any more
 				if (--self->available == 0) {
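
Reviewer note, not part of the patch: the new STORAGESERVER_READ_RANKS knob adds a level of
indirection between a request's ReadType and the PriorityMultiLock priority that gates it, while
STORAGESERVER_READ_PRIORITIES keeps supplying the per-priority launch limits. The standalone
C++17 sketch below models that double lookup using the default knob values from this patch. It is
illustrative only: parseIntList is a local stand-in for flow's parseStringToVector<int>, the file
name rank_sketch.cpp is hypothetical, and the ReadType ordering (EAGER, FETCH, LOW, NORMAL, HIGH)
is assumed from the five-entry defaults.

    // rank_sketch.cpp -- standalone model of the rank/priority lookup; not FDB code.
    // Build: g++ -std=c++17 rank_sketch.cpp && ./a.out
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    // Local stand-in for flow's parseStringToVector<int>(): "0,1,2,3,4" -> {0, 1, 2, 3, 4}.
    static std::vector<int> parseIntList(const std::string& s, char sep) {
        std::vector<int> out;
        std::stringstream ss(s);
        std::string tok;
        while (std::getline(ss, tok, sep))
            out.push_back(std::stoi(tok));
        return out;
    }

    // Assumed ReadType order, matching the five-entry knob defaults above.
    enum class ReadType { EAGER = 0, FETCH = 1, LOW = 2, NORMAL = 3, HIGH = 4 };

    int main() {
        // Default knob values used by this patch.
        const auto ranks = parseIntList("0,1,2,3,4", ',');            // STORAGESERVER_READ_RANKS
        const auto launchLimits = parseIntList("32,8,12,32,48", ','); // STORAGESERVER_READ_PRIORITIES

        // A request's ReadType first indexes READ_RANKS to get the lock priority
        // (what data->readPriorityRanks[rankIndex] does in the patch); that priority
        // then selects the PriorityMultiLock launch limit.
        for (int t = (int)ReadType::EAGER; t <= (int)ReadType::HIGH; ++t) {
            int rank = ranks[t];
            std::cout << "ReadType " << t << " -> lock priority " << rank
                      << " (launch limit " << launchLimits[rank] << ")\n";
        }
        return 0;
    }

With the identity default ("0,1,2,3,4") the extra lookup is a pass-through; the point of the knob
is that read classes can be collapsed or reordered onto fewer lock priorities (for example
"0,1,1,2,2") without retuning the launch-limit string.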