Merge pull request #8797 from sfc-gh-satherton/pml-delay

Another ProrityMultiLock refactor and re-add StorageServer priority read locking without perf regression
This commit is contained in:
Steve Atherton 2022-11-14 11:30:01 -08:00 committed by GitHub
commit 2b133e5bd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 129 additions and 224 deletions

View File

@ -820,10 +820,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 );
// Read priority definitions in the form of a list of their relative concurrency share weights
init( STORAGESERVER_READ_PRIORITIES, "120,10,20,40,60" );
// The total concurrency which will be shared by active priorities according to their relative weights
init( STORAGE_SERVER_READ_CONCURRENCY, 70 );
// Priorities which each ReadType maps to, in enumeration order
init( STORAGESERVER_READ_RANKS, "0,2,1,1,1" );
init( STORAGESERVER_READ_PRIORITIES, "48,32,8" );
// The priority number which each ReadType maps to in enumeration order
// This exists for flexibility but assigning each ReadType to its own unique priority number makes the most sense
// The enumeration is currently: eager, fetch, low, normal, high
init( STORAGESERVER_READTYPE_PRIORITY_MAP, "0,1,2,3,4" );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
@ -947,7 +951,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
init( REDWOOD_PRIORITY_LAUNCHS, "32,32,32,32" );
init( REDWOOD_IO_PRIORITIES, "32,32,32,32" );
init( REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT, false );
// Server request latency measurement

View File

@ -770,9 +770,9 @@ public:
int QUICK_GET_KEY_VALUES_LIMIT;
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
int STORAGE_FEED_QUERY_HARD_LIMIT;
int STORAGE_SERVER_READ_CONCURRENCY;
std::string STORAGESERVER_READ_RANKS;
std::string STORAGESERVER_READ_PRIORITIES;
int STORAGE_SERVER_READ_CONCURRENCY;
std::string STORAGESERVER_READTYPE_PRIORITY_MAP;
// Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
@ -921,7 +921,7 @@ public:
int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
bool REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT; // Whether to split pages by tenant if encryption is enabled
std::string REDWOOD_PRIORITY_LAUNCHS;
std::string REDWOOD_IO_PRIORITIES;
// Server request latency measurement
int LATENCY_SAMPLE_SIZE;

View File

@ -2025,7 +2025,8 @@ public:
bool memoryOnly,
Reference<IPageEncryptionKeyProvider> keyProvider,
Promise<Void> errorPromise = {})
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_PRIORITY_LAUNCHS),
: keyProvider(keyProvider),
ioLock(makeReference<PriorityMultiLock>(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_IO_PRIORITIES)),
pageCacheBytes(pageCacheSizeBytes), desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize),
filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
remapCleanupWindowBytes(remapCleanupWindowBytes), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
@ -2037,7 +2038,7 @@ public:
// This sets the page cache size for all PageCacheT instances using the same evictor
pageCache.evictor().sizeLimit = pageCacheBytes;
g_redwoodMetrics.ioLock = &ioLock;
g_redwoodMetrics.ioLock = ioLock.getPtr();
if (!g_redwoodMetricsActor.isValid()) {
g_redwoodMetricsActor = redwoodMetricsLogger();
}
@ -2499,7 +2500,7 @@ public:
unsigned int level,
bool header) {
state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(header ? ioMaxPriority : ioMinPriority));
state PriorityMultiLock::Lock lock = wait(self->ioLock->lock(header ? ioMaxPriority : ioMinPriority));
++g_redwoodMetrics.metric.pagerDiskWrite;
g_redwoodMetrics.level(level).metrics.events.addEventReason(PagerEvents::PageWrite, reason);
if (self->memoryOnly) {
@ -2779,7 +2780,7 @@ public:
int blockSize,
int64_t offset,
int priority) {
state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority)));
state PriorityMultiLock::Lock lock = wait(self->ioLock->lock(std::min(priority, ioMaxPriority)));
++g_redwoodMetrics.metric.pagerDiskRead;
int bytes = wait(self->pageFile->read(pageBuffer->rawData() + pageOffset, blockSize, offset));
return bytes;
@ -3593,7 +3594,7 @@ public:
// The next section explicitly cancels all pending operations held in the pager
debug_printf("DWALPager(%s) shutdown kill ioLock\n", self->filename.c_str());
self->ioLock.kill();
self->ioLock->kill();
debug_printf("DWALPager(%s) shutdown cancel recovery\n", self->filename.c_str());
self->recoverFuture.cancel();
@ -3802,7 +3803,7 @@ private:
Reference<IPageEncryptionKeyProvider> keyProvider;
PriorityMultiLock ioLock;
Reference<PriorityMultiLock> ioLock;
int64_t pageCacheBytes;
@ -8894,32 +8895,25 @@ void RedwoodMetrics::getIOLockFields(TraceEvent* e, std::string* s) {
int maxPriority = ioLock->maxPriority();
if (e != nullptr) {
e->detail("ActiveReads", ioLock->totalRunners());
e->detail("AwaitReads", ioLock->totalWaiters());
e->detail("IOActiveTotal", ioLock->getRunnersCount());
e->detail("IOWaitingTotal", ioLock->getWaitersCount());
for (int priority = 0; priority <= maxPriority; ++priority) {
e->detail(format("ActiveP%d", priority), ioLock->numRunners(priority));
e->detail(format("AwaitP%d", priority), ioLock->numWaiters(priority));
e->detail(format("IOActiveP%d", priority), ioLock->getRunnersCount(priority));
e->detail(format("IOWaitingP%d", priority), ioLock->getWaitersCount(priority));
}
}
if (s != nullptr) {
std::string active = "Active";
std::string await = "Await";
*s += "\n";
*s += format("%-15s %-8u ", "ActiveReads", ioLock->totalRunners());
*s += format("%-15s %-8u ", "AwaitReads", ioLock->totalWaiters());
*s += "\n";
*s += format("%-15s %-8u ", "IOActiveTotal", ioLock->getRunnersCount());
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (active + 'P' + std::to_string(priority)).c_str(), ioLock->numRunners(priority));
*s += format("IOActiveP%-6d %-8u ", priority, ioLock->getRunnersCount(priority));
}
*s += "\n";
*s += format("%-15s %-8u ", "IOWaitingTotal", ioLock->getWaitersCount());
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (await + 'P' + std::to_string(priority)).c_str(), ioLock->numWaiters(priority));
*s += format("IOWaitingP%-5d %-8u ", priority, ioLock->getWaitersCount(priority));
}
}
}
@ -11407,57 +11401,3 @@ TEST_CASE(":/redwood/performance/histograms") {
return Void();
}
ACTOR Future<Void> waitLockIncrement(PriorityMultiLock* pml, int priority, int* pout) {
state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
wait(delay(deterministicRandom()->random01() * .1));
++*pout;
return Void();
}
TEST_CASE("/redwood/PriorityMultiLock") {
state std::vector<int> priorities = { 10, 20, 40 };
state int concurrency = 25;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock buy taking concurrency locks at each level
state std::vector<Future<PriorityMultiLock::Lock>> lockFutures;
for (int i = 0; i < priorities.size(); ++i) {
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(i));
}
}
// Wait for n = concurrency locks to be acquired
wait(quorum(lockFutures, concurrency));
state std::vector<Future<Void>> futures;
for (int i = 0; i < 10e3; ++i) {
int p = i % priorities.size();
futures.push_back(waitLockIncrement(pml, p, &counts[p]));
}
state Future<Void> f = waitForAll(futures);
// Release the locks
lockFutures.clear();
// Print stats and wait for all futures to be ready
loop {
choose {
when(wait(delay(1))) {
printf("counts: ");
for (auto c : counts) {
printf("%d ", c);
}
printf(" pml: %s\n", pml->toString().c_str());
}
when(wait(f)) { break; }
}
}
delete pml;
return Void();
}

View File

@ -1110,15 +1110,13 @@ public:
FlowLock serveFetchCheckpointParallelismLock;
PriorityMultiLock ssLock;
Reference<PriorityMultiLock> ssLock;
std::vector<int> readPriorityRanks;
Future<PriorityMultiLock::Lock> getReadLock(const Optional<ReadOptions>& options) {
// TODO: Fix perf regression in 100% cache read case where taking this lock adds too much overhead
return PriorityMultiLock::Lock();
// int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
// readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
// return ssLock.lock(readPriorityRanks[readType]);
int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
return ssLock->lock(readPriorityRanks[readType]);
}
FlowLock serveAuditStorageParallelismLock;
@ -1407,7 +1405,8 @@ public:
fetchKeysParallelismFullLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_FULL),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
ssLock(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
ssLock(makeReference<PriorityMultiLock>(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY,
SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES)),
serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
@ -1415,7 +1414,7 @@ public:
busiestWriteTagContext(ssi.id()), counters(this),
storageServerSourceTLogIDEventHolder(
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READ_RANKS, ',');
readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READTYPE_PRIORITY_MAP, ',');
ASSERT(readPriorityRanks.size() > (int)ReadType::MAX);
version.initMetric("StorageServer.Version"_sr, counters.cc.getId());
oldestVersion.initMetric("StorageServer.OldestVersion"_sr, counters.cc.getId());
@ -10431,20 +10430,20 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
te.detail("Tag", self->tag.toString());
std::vector<int> rpr = self->readPriorityRanks;
te.detail("ReadsActive", self->ssLock.totalRunners());
te.detail("ReadsWaiting", self->ssLock.totalWaiters());
te.detail("ReadsTotalActive", self->ssLock->getRunnersCount());
te.detail("ReadsTotalWaiting", self->ssLock->getWaitersCount());
int type = (int)ReadType::FETCH;
te.detail("ReadFetchActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadFetchWaiting", self->ssLock.numWaiters(rpr[type]));
te.detail("ReadFetchActive", self->ssLock->getRunnersCount(rpr[type]));
te.detail("ReadFetchWaiting", self->ssLock->getWaitersCount(rpr[type]));
type = (int)ReadType::LOW;
te.detail("ReadLowActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadLowWaiting", self->ssLock.numWaiters(rpr[type]));
te.detail("ReadLowActive", self->ssLock->getRunnersCount(rpr[type]));
te.detail("ReadLowWaiting", self->ssLock->getWaitersCount(rpr[type]));
type = (int)ReadType::NORMAL;
te.detail("ReadNormalActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadNormalWaiting", self->ssLock.numWaiters(rpr[type]));
te.detail("ReadNormalActive", self->ssLock->getRunnersCount(rpr[type]));
te.detail("ReadNormalWaiting", self->ssLock->getWaitersCount(rpr[type]));
type = (int)ReadType::HIGH;
te.detail("ReadHighActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadHighWaiting", self->ssLock.numWaiters(rpr[type]));
te.detail("ReadHighActive", self->ssLock->getRunnersCount(rpr[type]));
te.detail("ReadHighWaiting", self->ssLock->getWaitersCount(rpr[type]));
StorageBytes sb = self->storage.getStorageBytes();
te.detail("KvstoreBytesUsed", sb.used);
te.detail("KvstoreBytesFree", sb.free);
@ -11260,7 +11259,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
// If the storage server dies while something that uses self is still on the stack,
// we want that actor to complete before we terminate and that memory goes out of scope
self.ssLock.kill();
self.ssLock->kill();
state Error err = e;
if (storageServerTerminated(self, persistentData, err)) {
@ -11358,7 +11357,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
throw internal_error();
} catch (Error& e) {
self.ssLock.kill();
self.ssLock->kill();
if (self.byteSampleRecovery.isValid()) {
self.byteSampleRecovery.cancel();

View File

@ -29,21 +29,25 @@
#define PRIORITYMULTILOCK_ACTOR_H
#include "flow/flow.h"
#include <boost/intrusive/list.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
#define PRIORITYMULTILOCK_DEBUG 0
#if PRIORITYMULTILOCK_DEBUG || !defined(NO_INTELLISENSE)
#define pml_debug_printf(...) \
if (now() > 0) \
printf(__VA_ARGS__)
if (now() > 0) { \
printf("pml line=%04d ", __LINE__); \
printf(__VA_ARGS__); \
}
#else
#define pml_debug_printf(...)
#endif
// A multi user lock with a concurrent holder limit where waiters request a lock with a priority
// id and are granted locks based on a total concurrency and relative weights of the current active
// priorities. Priority id's must start at 0 and are sequential integers.
// priorities. Priority id's must start at 0 and are sequential integers. Priority id numbers
// are not related to the importance of the priority in execution.
//
// Scheduling logic
// Let
@ -64,17 +68,17 @@
// The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// Lock lock = wait(prioritylock.lock(priority_id));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
class PriorityMultiLock : public ReferenceCounted<PriorityMultiLock> {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>
// Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
bool isLocked() const { return promise.canBeSet(); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
@ -84,10 +88,11 @@ public:
: PriorityMultiLock(concurrency, parseStringToVector<int>(weights, ',')) {}
PriorityMultiLock(int concurrency, std::vector<int> weightsByPriority)
: concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0), releaseDebugID(0) {
: concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0) {
priorities.resize(weightsByPriority.size());
for (int i = 0; i < priorities.size(); ++i) {
priorities[i].priority = i;
priorities[i].weight = weightsByPriority[i];
}
@ -102,7 +107,8 @@ public:
// If this priority currently has no waiters
if (q.empty()) {
// Add this priority's weight to the total for priorities with pending work
// Add this priority's weight to the total for priorities with pending work. This must be done
// so that currenctCapacity() below will assign capacaity to this priority.
totalPendingWeights += p.weight;
// If there are slots available and the priority has capacity then don't make the caller wait
@ -114,80 +120,69 @@ public:
Lock lock;
addRunner(lock, &p);
pml_debug_printf("lock nowait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
pml_debug_printf("lock nowait priority %d %s\n", priority, toString().c_str());
return lock;
}
// If we didn't return above then add the priority to the waitingPriorities list
waitingPriorities.push_back(p);
}
Waiter w;
q.push_back(w);
Waiter& w = q.emplace_back();
++waiting;
pml_debug_printf("lock wait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
pml_debug_printf("lock wait priority %d %s\n", priority, toString().c_str());
return w.lockPromise.getFuture();
}
void kill() {
pml_debug_printf("kill %s\n", toString().c_str());
brokenOnDestruct.reset();
// handleRelease will not free up any execution slots when it ends via cancel
fRunner.cancel();
available = 0;
runners.clear();
waitingPriorities.clear();
priorities.clear();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d "
"runnersDone=%d pendingWeights=%d ",
std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d "
"pendingWeights=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone,
totalPendingWeights);
for (int i = 0; i < priorities.size(); ++i) {
s += format("p%d:{%s} ", i, priorities[i].toString(this).c_str());
for (auto& p : priorities) {
s += format("{%s} ", p.toString(this).c_str());
}
s += "}";
if (concurrency - available != runners.size() - runnersDone) {
pml_debug_printf("%s\n", s.c_str());
ASSERT_EQ(concurrency - available, runners.size() - runnersDone);
}
return s;
}
int maxPriority() const { return priorities.size() - 1; }
int totalWaiters() const { return waiting; }
int getRunnersCount() const { return concurrency - available; }
int getWaitersCount() const { return waiting; }
int numWaiters(const unsigned int priority) const {
int getWaitersCount(const unsigned int priority) const {
ASSERT(priority < priorities.size());
return priorities[priority].queue.size();
}
int totalRunners() const { return concurrency - available; }
int numRunners(const unsigned int priority) const {
int getRunnersCount(const unsigned int priority) const {
ASSERT(priority < priorities.size());
return priorities[priority].runners;
}
private:
struct Waiter {
Waiter() {}
Promise<Lock> lockPromise;
};
@ -202,8 +197,8 @@ private:
typedef Deque<Waiter> Queue;
struct Priority {
Priority() : runners(0), weight(0) {}
struct Priority : boost::intrusive::list_base_hook<> {
Priority() : runners(0), weight(0), priority(-1) {}
// Queue of waiters at this priority
Queue queue;
@ -211,9 +206,12 @@ private:
int runners;
// Configured weight for this priority
int weight;
// Priority number for convenience, matches *this's index in PML priorities vector
int priority;
std::string toString(const PriorityMultiLock* pml) const {
return format("weight=%d run=%d wait=%d cap=%d",
return format("priority=%d weight=%d run=%d wait=%d cap=%d",
priority,
weight,
runners,
queue.size(),
@ -222,51 +220,41 @@ private:
};
std::vector<Priority> priorities;
typedef boost::intrusive::list<Priority, boost::intrusive::constant_time_size<false>> WaitingPrioritiesList;
// Current or recent (ended) runners
Deque<Future<Void>> runners;
// List of all priorities with 1 or more waiters. This list exists so that the scheduling loop
// does not have to iterage over the priorities vector checking priorities without waiters.
WaitingPrioritiesList waitingPriorities;
Future<Void> fRunner;
AsyncTrigger wakeRunner;
Promise<Void> brokenOnDestruct;
// Used for debugging, can roll over without issue
unsigned int releaseDebugID;
ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Future<Void> f, Priority* priority) {
state [[maybe_unused]] unsigned int id = self->releaseDebugID++;
pml_debug_printf("%f handleRelease self=%p id=%u start \n", now(), self, id);
ACTOR static void handleRelease(Reference<PriorityMultiLock> self, Priority* priority, Future<Void> holder) {
pml_debug_printf("%f handleRelease self=%p start\n", now(), self.getPtr());
try {
wait(f);
pml_debug_printf("%f handleRelease self=%p id=%u success\n", now(), self, id);
wait(holder);
pml_debug_printf("%f handleRelease self=%p success\n", now(), self.getPtr());
} catch (Error& e) {
pml_debug_printf("%f handleRelease self=%p id=%u error %s\n", now(), self, id, e.what());
if (e.code() == error_code_actor_cancelled) {
throw;
}
pml_debug_printf("%f handleRelease self=%p error %s\n", now(), self.getPtr(), e.what());
}
pml_debug_printf("lock release line %d priority %d %s\n",
__LINE__,
(int)(priority - &self->priorities.front()),
self->toString().c_str());
pml_debug_printf("lock release priority %d %s\n", (int)(priority->priority), self->toString().c_str());
pml_debug_printf("%f handleRelease self=%p id=%u releasing\n", now(), self, id);
pml_debug_printf("%f handleRelease self=%p releasing\n", now(), self.getPtr());
++self->available;
priority->runners -= 1;
// If there are any waiters or if the runners array is getting large, trigger the runner loop
if (self->waiting > 0 || self->runners.size() > 1000) {
if (self->waiting > 0) {
self->wakeRunner.trigger();
}
return Void();
}
void addRunner(Lock& lock, Priority* p) {
p->runners += 1;
void addRunner(Lock& lock, Priority* priority) {
priority->runners += 1;
--available;
runners.push_back(handleRelease(this, lock.promise.getFuture(), p));
handleRelease(Reference<PriorityMultiLock>::addRef(this), priority, lock.promise.getFuture());
}
// Current maximum running tasks for the specified priority, which must have waiters
@ -278,76 +266,50 @@ private:
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
// Priority to try to run tasks from next
state int priority = 0;
state WaitingPrioritiesList::iterator p = self->waitingPriorities.end();
loop {
pml_debug_printf(
"runner loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
pml_debug_printf("runner loop start priority=%d %s\n", p->priority, self->toString().c_str());
// Wait for a runner to release its lock
pml_debug_printf(
"runner loop waitTrigger line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
pml_debug_printf("runner loop waitTrigger priority=%d %s\n", p->priority, self->toString().c_str());
wait(self->wakeRunner.onTrigger());
pml_debug_printf(
"%f runner loop wake line %d priority=%d %s\n", now(), __LINE__, priority, self->toString().c_str());
if (++sinceYield == 100) {
sinceYield = 0;
pml_debug_printf(
" runner waitDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
wait(delay(0));
pml_debug_printf(
" runner afterDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
}
pml_debug_printf("%f runner loop wake priority=%d %s\n", now(), p->priority, self->toString().c_str());
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
pml_debug_printf(
" launch loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
Priority* pPriority;
pml_debug_printf(" launch loop start priority=%d %s\n", p->priority, self->toString().c_str());
// Find the next priority with waiters and capacity. There must be at least one.
loop {
// Rotate to next priority
if (++priority == self->priorities.size()) {
priority = 0;
if (p == self->waitingPriorities.end()) {
p = self->waitingPriorities.begin();
}
pPriority = &self->priorities[priority];
pml_debug_printf(" launch loop scan priority=%d %s\n", p->priority, self->toString().c_str());
pml_debug_printf(" launch loop scan line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
if (!pPriority->queue.empty() && pPriority->runners < self->currentCapacity(pPriority->weight)) {
if (!p->queue.empty() && p->runners < self->currentCapacity(p->weight)) {
break;
}
++p;
}
Queue& queue = pPriority->queue;
Queue& queue = p->queue;
Waiter w = queue.front();
queue.pop_front();
// If this priority is now empty, subtract its weight from the total pending weights
// If this priority is now empty, subtract its weight from the total pending weights an remove it
// from the waitingPriorities list
Priority* pPriority = &*p;
if (queue.empty()) {
p = self->waitingPriorities.erase(p);
self->totalPendingWeights -= pPriority->weight;
pml_debug_printf(" emptied priority line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
pml_debug_printf(
" emptied priority priority=%d %s\n", pPriority->priority, self->toString().c_str());
}
--self->waiting;
@ -365,10 +327,9 @@ private:
self->addRunner(lock, pPriority);
}
pml_debug_printf(" launched line %d alreadyDone=%d priority=%d %s\n",
__LINE__,
pml_debug_printf(" launched alreadyDone=%d priority=%d %s\n",
!lock.promise.canBeSet(),
priority,
pPriority->priority,
self->toString().c_str());
}
}

View File

@ -25,26 +25,28 @@
#include "flow/PriorityMultiLock.actor.h"
#include <deque>
#include "flow/actorcompiler.h" // This must be the last #include.
#include "fmt/printf.h"
ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
state std::vector<int> priorities;
// Arg1 is the number of active priorities to use
// Arg2 is the number of inactive priorities to use
state int active = benchState->range(0);
state int inactive = benchState->range(1);
// Set up priority list with limits 10, 20, 30, ...
while (priorities.size() < benchState->range(0)) {
state std::vector<int> priorities;
while (priorities.size() < active + inactive) {
priorities.push_back(10 * (priorities.size() + 1));
}
state int concurrency = priorities.size() * 10;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
state Reference<PriorityMultiLock> pml = makeReference<PriorityMultiLock>(concurrency, priorities);
// Clog the lock buy taking concurrency locks
// Clog the lock buy taking n=concurrency locks
state std::deque<Future<PriorityMultiLock::Lock>> lockFutures;
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(j % priorities.size()));
lockFutures.push_back(pml->lock(j % active));
}
// Wait for all of the initial locks to be taken
// This will work regardless of their priorities as there are only n = concurrency of them
wait(waitForAll(std::vector<Future<PriorityMultiLock::Lock>>(lockFutures.begin(), lockFutures.end())));
@ -64,7 +66,7 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
PriorityMultiLock::Lock lock = wait(f);
// Rotate to another priority
if (++p == priorities.size()) {
if (++p == active) {
p = 0;
}
@ -76,7 +78,6 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
benchState->SetItemsProcessed(static_cast<long>(benchState->iterations()));
delete pml;
return Void();
}
@ -84,4 +85,4 @@ static void bench_priorityMultiLock(benchmark::State& benchState) {
onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
}
BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);
BENCHMARK(bench_priorityMultiLock)->Args({ 5, 0 })->Ranges({ { 1, 64 }, { 0, 128 } })->ReportAggregatesOnly(true);