Completely rewrote PriorityMultiLock scheduling and added a unit test for it.

This commit is contained in:
Steve Atherton 2022-09-20 00:45:29 -07:00
parent 74b152e550
commit ab41da174c
3 changed files with 217 additions and 56 deletions

View File

@ -103,8 +103,6 @@ std::string addPrefix(std::string prefix, std::string lines) {
return s;
}
#define PRIORITYMULTILOCK_DEBUG 0
// Some convenience functions for debugging to stringify various structures
// Classes can add compatibility by either specializing toString<T> or implementing
// std::string toString() const;
@ -10989,3 +10987,57 @@ TEST_CASE(":/redwood/performance/histograms") {
return Void();
}
ACTOR Future<Void> waitLockIncrement(PriorityMultiLock* pml, int priority, int* pout) {
state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
wait(delay(deterministicRandom()->random01() * .1));
++*pout;
return Void();
}
TEST_CASE("/redwood/PriorityMultiLock") {
state std::vector<int> priorities = { 10, 20, 40 };
state int concurrency = 25;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock buy taking concurrency locks at each level
state std::vector<Future<PriorityMultiLock::Lock>> lockFutures;
for (int i = 0; i < priorities.size(); ++i) {
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(i));
}
}
// Wait for n = concurrency locks to be acquired
wait(quorum(lockFutures, concurrency));
state std::vector<Future<Void>> futures;
for (int i = 0; i < 10e3; ++i) {
int p = i % priorities.size();
futures.push_back(waitLockIncrement(pml, p, &counts[p]));
}
state Future<Void> f = waitForAll(futures);
// Release the locks
lockFutures.clear();
// Print stats and wait for all futures to be ready
loop {
choose {
when(wait(delay(1))) {
printf("counts: ");
for (auto c : counts) {
printf("%d ", c);
}
printf(" pml: %s\n", pml->toString().c_str());
}
when(wait(f)) { break; }
}
}
delete pml;
return Void();
}

View File

@ -31,10 +31,31 @@
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// A multi user lock with a concurrent holder limit where waiters are granted the lock according to
// an integer priority from 0 to maxPriority, inclusive, where higher integers are given priority.
#define PRIORITYMULTILOCK_DEBUG 0
#if PRIORITYMULTILOCK_DEBUG
#define pml_debug_printf(...) printf(__VA_ARGS__)
#else
#define pml_debug_printf(...)
#endif
// A multi user lock with a concurrent holder limit where waiters request a lock with a priority
// id and are granted locks based on a total concurrency and relative importants of the priority
// ids defined.
//
// The interface is similar to FlowMutex except that lock holders can drop the lock to release it.
// Scheduling logic
// launchLimits[n] = configured amount from the launchLimit vector for priority n
// waiters[n] = the number of waiters for priority n
// runnerCounts[n] = number of runners at priority n
//
// totalActiveLaunchLimits = sum of limits for all priorities with waiters
// When waiters[n] becomes == 0, totalActiveLaunchLimits -= launchLimits[n]
// When waiters[n] becomes > 0, totaltotalActiveLaunchLimitsLimits += launchLimits[n]
//
// The total capacity of a priority to be considered when launching tasks is
// ceil(launchLimits[n] / totalLimits * concurrency)
//
// The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
@ -57,7 +78,8 @@ public:
: PriorityMultiLock(concurrency, parseStringToVector<int>(launchLimits, ',')) {}
PriorityMultiLock(int concurrency, std::vector<int> launchLimitsByPriority)
: concurrency(concurrency), available(concurrency), waiting(0), launchLimits(launchLimitsByPriority) {
: concurrency(concurrency), available(concurrency), waiting(0), totalActiveLaunchLimits(0),
launchLimits(launchLimitsByPriority) {
waiters.resize(launchLimits.size());
runnerCounts.resize(launchLimits.size(), 0);
@ -67,17 +89,29 @@ public:
~PriorityMultiLock() {}
Future<Lock> lock(int priority = 0) {
// This shortcut may enable a waiter to jump the line when the releaser loop yields
if (available > 0) {
Lock p;
addRunner(p, priority);
return p;
}
auto& q = waiters[priority];
Waiter w;
waiters[priority].push_back(w);
// If this priority currently has no waiters
if (q.empty()) {
// Add this priority's launch limit to totalLimits
totalActiveLaunchLimits += launchLimits[priority];
// If there are slots available and the priority has capacity then don't make the caller wait
if (available > 0 && runnerCounts[priority] < currentCapacity(priority)) {
// Remove this priority's launch limit from the total since it will remain empty
totalActiveLaunchLimits -= launchLimits[priority];
// Return a Lock to the caller
Lock p;
addRunner(p, priority);
return p;
}
}
q.push_back(w);
++waiting;
pml_debug_printf("lock line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return w.lockPromise.getFuture();
}
@ -102,7 +136,7 @@ public:
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
@ -112,7 +146,12 @@ public:
runnersDone);
for (int i = 0; i < waiters.size(); ++i) {
s += format("p%d_waiters=%u ", i, waiters[i].size());
s += format("p%d: limit=%d run=%d wait=%d cap=%d ",
i,
launchLimits[i],
runnerCounts[i],
waiters[i].size(),
waiters[i].empty() ? 0 : currentCapacity(i));
}
s += "}";
@ -141,16 +180,27 @@ private:
double queuedTime;
};
// Total execution slots allowed across all priorities
int concurrency;
// Current available execution slots
int available;
// Total waiters across all priorities
int waiting;
// Sum of launch limits for all priorities with 1 or more waiters
int totalActiveLaunchLimits;
typedef Deque<Waiter> Queue;
// Launch limits by priority
std::vector<int> launchLimits;
// Queue of waiters by priority
std::vector<Queue> waiters;
// Count of active runners by priority
std::vector<int> runnerCounts;
// Current or recent (ended) runners
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger release;
AsyncTrigger wakeRunner;
Promise<Void> brokenOnDestruct;
void addRunner(Lock& lock, int priority) {
@ -159,46 +209,88 @@ private:
runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
++available;
runnerCounts[priority] -= 1;
if (waiting > 0 || runners.size() > 100) {
release.trigger();
// If there are any waiters or if the runners array is getting large, trigger the runner loop
if (waiting > 0 || runners.size() > 1000) {
wakeRunner.trigger();
}
return Void();
}));
}
// Current maximum running tasks for the specified priority, which must have waiters
// or the result is undefined
int currentCapacity(int priority) const {
// The total concurrency allowed for this priority at present is the total concurrency times
// priority's launch limit divided by the total launch limits for all priorities with waiters.
return ceil((float)launchLimits[priority] / totalActiveLaunchLimits * concurrency);
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
state int maxPriority = self->waiters.size() - 1;
// Priority to try to run tasks from next
state int priority = maxPriority;
state int ioLaunchLimit = self->launchLimits[priority];
state Queue* pQueue = &self->waiters[maxPriority];
// Track the number of waiters unlocked at the same priority in a row
state int lastPriorityCount = 0;
state int priority = 0;
loop {
pml_debug_printf(
"runner loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
wait(self->release.onTrigger());
wait(self->wakeRunner.onTrigger());
pml_debug_printf(
"runner loop wake line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
if (++sinceYield == 1000) {
if (++sinceYield == 100) {
sinceYield = 0;
wait(delay(0));
pml_debug_printf(
" runner afterDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
pml_debug_printf(
" launch loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Find the next priority with waiters and capacity. There must be at least one.
while (self->waiters[priority].empty() ||
self->runnerCounts[priority] >= self->currentCapacity(priority)) {
if (++priority == self->waiters.size()) {
priority = 0;
}
pml_debug_printf(" launch loop scan line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
}
Queue& queue = self->waiters[priority];
while (!queue.empty() && self->runnerCounts[priority] < self->currentCapacity(priority)) {
pml_debug_printf(
" launching line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
Waiter w = queue.front();
queue.pop_front();
// If this priority is now empty, subtract its launch limit from totalLimits
if (queue.empty()) {
self->totalActiveLaunchLimits -= self->launchLimits[priority];
pml_debug_printf(" emptied priority line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
}
while (!pQueue->empty() && lastPriorityCount++ < ioLaunchLimit) {
Waiter w = pQueue->front();
pQueue->pop_front();
--self->waiting;
Lock lock;
@ -215,34 +307,14 @@ private:
// A slot has been consumed, so stop reading from this queue if there aren't any more
if (self->available == 0) {
pml_debug_printf(" avail now 0 line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
break;
}
}
}
// If there are no more slots available, then don't move to the next priority
if (self->available == 0) {
break;
}
// Decrease priority, wrapping around to max from 0
if (priority == 0) {
priority = maxPriority;
} else {
--priority;
}
// Set launch limit to configured limit for the new priority to launch from
ioLaunchLimit = self->launchLimits[priority];
// If there are waiters at other priority levels, then reduce the launch limit by the number of
// runners for priority, possibly reducing it all the way to 0.
if (self->numWaiters(priority) < self->waiting) {
ioLaunchLimit = std::max(0, self->numRunners(priority));
}
pQueue = &self->waiters[priority];
lastPriorityCount = 0;
}
}
}

View File

@ -0,0 +1,37 @@
/*
* BenchStream.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "benchmark/benchmark.h"
#include "flow/flow.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/PriorityMultiLock.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
return Void();
}
static void bench_priorityMultiLock(benchmark::State& benchState) {
onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
}
BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);