Use DefaultEndpoint as the default priority for storage server reads
parent 2322730345
commit f761f9a03a
@@ -740,6 +740,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
     init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
     init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
+    init( STORAGESERVER_MAX_RANK, 4 );
+    init( STORAGESERVER_READ_RANKS, "0,1,2,3,4" );
     init( STORAGESERVER_READ_PRIORITIES, "32,8,12,32,48" );
 
     //Wait Failure
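Taken together, these knobs define a two-level mapping: STORAGESERVER_READ_RANKS assigns each ReadType (EAGER, FETCH, LOW, NORMAL, HIGH) a rank in the storage server's PriorityMultiLock, and STORAGESERVER_READ_PRIORITIES gives each rank its launch limit inside the lock. A minimal standalone sketch of that lookup under the defaults above; the ReadType enum here is an illustrative stand-in, not the FDB definition:

    #include <cassert>
    #include <vector>

    // Illustrative stand-in for the read classes used in this commit.
    enum class ReadType { EAGER = 0, FETCH = 1, LOW = 2, NORMAL = 3, HIGH = 4 };

    int main() {
        // Parsed forms of the knob defaults above.
        std::vector<int> ranks = { 0, 1, 2, 3, 4 };          // STORAGESERVER_READ_RANKS
        std::vector<int> priorities = { 32, 8, 12, 32, 48 }; // STORAGESERVER_READ_PRIORITIES

        // A read path turns its ReadType into a rank index, then locks the
        // PriorityMultiLock at that rank; the rank's entry in the priorities
        // string is the launch limit the lock applies to it.
        int rank = ranks[(int)ReadType::FETCH];
        assert(rank == 1);
        assert(priorities[rank] == 8); // fetch reads get the smallest launch limit
        return 0;
    }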
@@ -698,6 +698,8 @@ public:
     int CHECKPOINT_TRANSFER_BLOCK_BYTES;
     int QUICK_GET_KEY_VALUES_LIMIT;
     int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
+    int STORAGESERVER_MAX_RANK;
+    std::string STORAGESERVER_READ_RANKS;
     std::string STORAGESERVER_READ_PRIORITIES;
 
     // Wait Failure
@@ -853,6 +853,7 @@ public:
     FlowLock serveFetchCheckpointParallelismLock;
 
     PriorityMultiLock ssLock;
+    std::vector<int> readPriorityRanks;
 
     int64_t instanceID;
 
@@ -1061,12 +1062,15 @@ public:
     fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
     fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
     serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
-    ssLock(FLOW_KNOBS->MAX_OUTSTANDING, (int)ReadType::MAX, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
+    ssLock(FLOW_KNOBS->MAX_OUTSTANDING,
+           SERVER_KNOBS->STORAGESERVER_MAX_RANK,
+           SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
     instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
     versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
     lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
     storageServerSourceTLogIDEventHolder(
         makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
+    readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READ_RANKS, ',');
     version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
     oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
     durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
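readPriorityRanks is parsed once, in the constructor body, from the comma-separated knob. parseStringToVector itself is not part of this diff; the sketch below (hypothetical name, assumed behavior) shows the semantics the call relies on: split on the separator and convert each token in order.

    #include <sstream>
    #include <string>
    #include <vector>

    // Sketch of the assumed behavior of parseStringToVector<T>(s, sep).
    template <typename T>
    std::vector<T> parseStringToVectorSketch(const std::string& s, char sep) {
        std::vector<T> out;
        std::istringstream ss(s);
        std::string tok;
        while (std::getline(ss, tok, sep)) {
            std::istringstream conv(tok);
            T v;
            conv >> v;
            out.push_back(v);
        }
        return out;
    }

    int main() {
        // "0,1,2,3,4" -> {0, 1, 2, 3, 4}, one rank per ReadType.
        auto ranks = parseStringToVectorSketch<int>("0,1,2,3,4", ',');
        return ranks.size() == 5 ? 0 : 1;
    }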
@@ -1568,9 +1572,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
 
     // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
     // so we need to downgrade here
-    wait(delay(0));
-    state int readPriority = data->getQueryPriority();
-    wait(store(lock, data->ssLock.lock(readPriority)));
+    wait(delay(0, TaskPriority::DefaultEndpoint));
+    state int rankIndex = data->getQueryPriority();
+    wait(store(lock, data->ssLock.lock(data->readPriorityRanks[rankIndex])));
 
     if (req.debugID.present())
         g_traceBatch.addEvent("GetValueDebug",
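The same indirection is applied in getKeyValuesQ, getMappedKeyValuesQ, getKeyValuesStreamQ, and getKeyQ below: the ReadType-derived value becomes an index into readPriorityRanks instead of being passed to the lock directly, so the ranks knob can remap read classes without touching code. A condensed, hypothetical model of just that lookup:

    #include <vector>

    // Hypothetical minimal model of the indirection added in these hunks.
    struct StorageServerModel {
        std::vector<int> readPriorityRanks; // parsed from STORAGESERVER_READ_RANKS
        int getQueryPriority() const { return 3; } // e.g. (int)ReadType::NORMAL
    };

    int lockRankFor(const StorageServerModel& data) {
        // Before: ssLock.lock(readPriority) with readPriority = getQueryPriority().
        // After:  ssLock.lock(readPriorityRanks[rankIndex]).
        int rankIndex = data.getQueryPriority();
        return data.readPriorityRanks[rankIndex];
    }

    int main() {
        StorageServerModel data{ { 0, 1, 2, 3, 4 } };
        return lockRankFor(data) == 3 ? 0 : 1; // identity mapping under the defaults
    }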
@@ -3381,10 +3385,10 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
 
     // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
     // so we need to downgrade here
-    wait(delay(0));
-    state int readPriority =
+    wait(delay(0, TaskPriority::DefaultEndpoint));
+    state int rankIndex =
         (req.isFetchKeys && SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY) ? (int)ReadType::FETCH : data->getQueryPriority();
-    state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
+    state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[rankIndex]));
 
     try {
         if (req.debugID.present())
@@ -4099,10 +4103,10 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
 
     // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
     // so we need to downgrade here
-    wait(delay(0));
-    state int readPriority =
+    wait(delay(0, TaskPriority::DefaultEndpoint));
+    state int rankIndex =
         (req.isFetchKeys && SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY) ? (int)ReadType::FETCH : data->getQueryPriority();
-    state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
+    state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[rankIndex]));
 
     try {
         if (req.debugID.present())
@@ -4290,8 +4294,9 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
     state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
     state int64_t resultSize = 0;
     state ReadType type = req.isFetchKeys ? ReadType::FETCH : ReadType::NORMAL;
-    state int readPriority =
+    state int rankIndex =
         (req.isFetchKeys && SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY) ? (int)ReadType::FETCH : (int)ReadType::NORMAL;
+    state int readPriority = data->readPriorityRanks[rankIndex];
 
     if (req.tenantInfo.name.present()) {
         span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@@ -4306,7 +4311,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
 
     // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
     // so we need to downgrade here
-    wait(delay(0));
+    wait(delay(0, TaskPriority::DefaultEndpoint));
     state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
 
     try {
@@ -4464,6 +4469,9 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
             end = lastKey;
         }
 
+        lock.release();
+        wait(store(lock, data->ssLock.lock(readPriority)));
+
         data->transactionTagCounter.addRequest(req.tags, resultSize);
     }
 }
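Between stream batches the lock is now released and re-acquired at the same rank, so a long-lived stream re-queues behind other waiters after each batch rather than holding its slot for the stream's lifetime. A runnable analogue of the yield pattern using a plain counting semaphore; this models only the slot accounting, not PriorityMultiLock's per-rank queues:

    #include <semaphore>

    std::counting_semaphore<8> slots(8); // stands in for the lock's concurrency

    void serveStream(int batches) {
        slots.acquire(); // initial ssLock.lock(readPriority) at the top of the actor
        for (int i = 0; i < batches; ++i) {
            // ... serve one batch and reply to the stream ...
            slots.release(); // lock.release(): give the slot back between batches
            slots.acquire(); // wait(store(lock, ssLock.lock(readPriority))): re-queue
        }
        slots.release();
    }

    int main() {
        serveStream(3);
        return 0;
    }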
@@ -4499,9 +4507,9 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
 
     // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
     // so we need to downgrade here
-    wait(delay(0));
-    state int readPriority = data->getQueryPriority();
-    wait(store(lock, data->ssLock.lock(readPriority)));
+    wait(delay(0, TaskPriority::DefaultEndpoint));
+    state int rankIndex = data->getQueryPriority();
+    wait(store(lock, data->ssLock.lock(data->readPriorityRanks[rankIndex])));
 
     try {
         Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
@@ -8671,6 +8679,24 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
     [self = self](TraceEvent& te) {
         te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
         te.detail("Tag", self->tag.toString());
+        std::vector<int> rpr = self->readPriorityRanks;
+        te.detail("ActiveReads", self->ssLock.totalWorkers());
+        te.detail("AwaitReads", self->ssLock.totalWaiters());
+        int type = (int)ReadType::EAGER;
+        te.detail("ActiveEager", self->ssLock.numWorkers(rpr[type]));
+        te.detail("AwaitEager", self->ssLock.numWaiters(rpr[type]));
+        type = (int)ReadType::FETCH;
+        te.detail("ActiveFetch", self->ssLock.numWorkers(rpr[type]));
+        te.detail("AwaitFetch", self->ssLock.numWaiters(rpr[type]));
+        type = (int)ReadType::LOW;
+        te.detail("ActiveLow", self->ssLock.numWorkers(rpr[type]));
+        te.detail("AwaitLow", self->ssLock.numWaiters(rpr[type]));
+        type = (int)ReadType::NORMAL;
+        te.detail("ActiveNormal", self->ssLock.numWorkers(rpr[type]));
+        te.detail("AwaitNormal", self->ssLock.numWaiters(rpr[type]));
+        type = (int)ReadType::HIGH;
+        te.detail("ActiveHigh", self->ssLock.numWorkers(rpr[type]));
+        te.detail("AwaitHigh", self->ssLock.numWaiters(rpr[type]));
         StorageBytes sb = self->storage.getStorageBytes();
         te.detail("KvstoreBytesUsed", sb.used);
         te.detail("KvstoreBytesFree", sb.free);
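Each read class gets an Active/Await pair from the two new accessors, indexed through readPriorityRanks so the metrics follow any remapping of the ranks knob. A standalone model of the added block, with stubs (placeholder values) in place of TraceEvent and PriorityMultiLock:

    #include <cstdio>
    #include <utility>
    #include <vector>

    enum class ReadType { EAGER = 0, FETCH = 1, LOW = 2, NORMAL = 3, HIGH = 4 };

    struct LockStub { // placeholder for PriorityMultiLock's new accessors
        int numWorkers(int rank) const { return rank; } // dummy values
        int numWaiters(int rank) const { return 0; }
    };

    int main() {
        std::vector<int> rpr = { 0, 1, 2, 3, 4 }; // self->readPriorityRanks
        LockStub ssLock;
        const std::pair<const char*, ReadType> classes[] = {
            { "Eager", ReadType::EAGER }, { "Fetch", ReadType::FETCH },
            { "Low", ReadType::LOW },     { "Normal", ReadType::NORMAL },
            { "High", ReadType::HIGH },
        };
        // One Active/Await detail pair per read class, as in the added trace code.
        for (const auto& [name, rt] : classes) {
            int rank = rpr[(int)rt];
            std::printf("Active%s=%d Await%s=%d\n",
                        name, ssLock.numWorkers(rank), name, ssLock.numWaiters(rank));
        }
        return 0;
    }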
@@ -2175,6 +2175,7 @@ public:
     this->launchLimit = parseStringToVector<int>(launchLimit, ',');
     ASSERT(this->launchLimit.size() == maxPriority + 1);
     waiters.resize(maxPriority + 1);
+    workerCounts.resize(maxPriority + 1, 0);
     fRunner = runner(this);
 }
 
@@ -2185,8 +2186,9 @@ public:
     // This shortcut may enable a waiter to jump the line when the releaser loop yields
     if (available > 0) {
         --available;
+        workerCounts[priority] += 1;
         Lock p;
-        addRunner(p);
+        addRunner(p, priority);
         return p;
     }
 
@@ -2234,6 +2236,20 @@ public:
         return s;
     }
 
+    int totalWaiters() { return waiting; }
+
+    int numWaiters(const unsigned int priority) {
+        ASSERT(priority < waiters.size());
+        return waiters[priority].size();
+    }
+
+    int totalWorkers() { return concurrency - available; }
+
+    int numWorkers(const unsigned int priority) {
+        ASSERT(priority < waiters.size());
+        return workerCounts[priority];
+    }
+
 private:
     struct Waiter {
         Waiter() : queuedTime(now()) {}
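numWaiters reads the per-rank queue length while numWorkers reads the new workerCounts vector; totalWorkers falls out of the slot accounting as concurrency - available. Assuming the increments and decrements in the hunks below stay balanced, workerCounts partitions the active workers by rank; a small sketch of that invariant with illustrative numbers:

    #include <cassert>
    #include <numeric>
    #include <vector>

    int main() {
        // A snapshot of the lock's accounting: 8 slots, 3 handed out.
        int concurrency = 8;
        int available = 5;
        std::vector<int> workerCounts = { 1, 0, 2, 0, 0 }; // active workers per rank

        // totalWorkers() == concurrency - available, and workerCounts
        // partitions those active workers by the rank they locked at.
        int totalWorkers = concurrency - available;
        assert(totalWorkers ==
               std::accumulate(workerCounts.begin(), workerCounts.end(), 0));
        return 0;
    }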
@@ -2247,14 +2263,16 @@ private:
     typedef Deque<Waiter> Queue;
     std::vector<int> launchLimit;
     std::vector<Queue> waiters;
+    std::vector<int> workerCounts;
     Deque<Future<Void>> runners;
     Future<Void> fRunner;
     AsyncTrigger release;
     Promise<Void> brokenOnDestruct;
 
-    void addRunner(Lock& lock) {
+    void addRunner(Lock& lock, int priority) {
         runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
             ++available;
+            workerCounts[priority] -= 1;
             if (waiting > 0 || runners.size() > 100) {
                 release.trigger();
             }
@@ -2307,7 +2325,8 @@ private:
 
     // If the lock was not already released, add it to the runners future queue
     if (lock.promise.canBeSet()) {
-        self->addRunner(lock);
+        self->workerCounts[priority] += 1;
+        self->addRunner(lock, priority);
 
         // A slot has been consumed, so stop reading from this queue if there aren't any more
         if (--self->available == 0) {
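Assembled from the hunks above, workerCounts[priority] changes in exactly three places: the lock() fast path and the releaser loop increment it when a Lock is handed out, and addRunner's completion callback decrements it when the lock is released. A toy model (hypothetical names) of that lifecycle:

    #include <cassert>
    #include <vector>

    // Toy model of the workerCounts bookkeeping added in this commit.
    struct CountsModel {
        int concurrency = 4, available = 4;
        std::vector<int> workerCounts = std::vector<int>(5, 0);

        void handOut(int priority) { // lock() fast path / releaser loop
            --available;
            workerCounts[priority] += 1;
        }
        void runnerDone(int priority) { // addRunner's completion callback
            ++available;
            workerCounts[priority] -= 1;
        }
    };

    int main() {
        CountsModel m;
        m.handOut(1);
        m.handOut(3);
        assert(m.workerCounts[1] == 1 && m.workerCounts[3] == 1);
        m.runnerDone(1);
        m.runnerDone(3);
        assert(m.available == m.concurrency); // every hand-out pairs with a done
        return 0;
    }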