Merge branch 'fzhao/feature-testing' into fzhao/RedwoodIOLaunchLimit

This commit is contained in:
Steve Atherton 2022-09-20 01:14:04 -07:00
commit 3c306cc558
8 changed files with 434 additions and 243 deletions

View File

@ -290,7 +290,8 @@ else()
add_link_options(-stdlib=libc++ -Wl,-build-id=sha1)
endif()
endif()
if (NOT APPLE)
if (NOT APPLE AND NOT USE_LIBCXX)
message(STATUS "Linking libatomic")
add_link_options(-latomic)
endif()
if (OPEN_FOR_IDE)

View File

@ -778,7 +778,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 );
init( STORAGESERVER_MAX_RANK, 2 );
// Priorities which each ReadType maps to, in enumeration order
init( STORAGESERVER_READ_RANKS, "0,2,1,1,1" );
init( STORAGESERVER_READ_PRIORITIES, "48,32,8" );
@ -893,7 +892,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );
init( REDWOOD_DEFAULT_EXTENT_READ_SIZE, 1024 * 1024 );
init( REDWOOD_EXTENT_CONCURRENT_READS, 4 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
init( REDWOOD_KVSTORE_RANGE_PREFETCH, true );
init( REDWOOD_PAGE_REBUILD_MAX_SLACK, 0.33 );
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
@ -906,7 +904,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
init( REDWOOD_IO_MAX_PRIORITY, 3 );
init( REDWOOD_PRIORITY_LAUNCHS, "32,32,32,32" );
// Server request latency measurement

View File

@ -733,7 +733,6 @@ public:
int QUICK_GET_KEY_VALUES_LIMIT;
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
int STORAGE_FEED_QUERY_HARD_LIMIT;
int STORAGESERVER_MAX_RANK;
std::string STORAGESERVER_READ_RANKS;
std::string STORAGESERVER_READ_PRIORITIES;
@ -865,7 +864,6 @@ public:
int REDWOOD_DEFAULT_EXTENT_SIZE; // Extent size for new Redwood files
int REDWOOD_DEFAULT_EXTENT_READ_SIZE; // Extent read size for Redwood files
int REDWOOD_EXTENT_CONCURRENT_READS; // Max number of simultaneous extent disk reads in progress.
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
bool REDWOOD_KVSTORE_RANGE_PREFETCH; // Whether to use range read prefetching
double REDWOOD_PAGE_REBUILD_MAX_SLACK; // When rebuilding pages, max slack to allow in page
int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at
@ -884,7 +882,6 @@ public:
bool REDWOOD_EVICT_UPDATED_PAGES; // Whether to prioritize eviction of updated pages from cache.
int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
int REDWOOD_IO_MAX_PRIORITY;
std::string REDWOOD_PRIORITY_LAUNCHS;
// Server request latency measurement

View File

@ -28,6 +28,7 @@
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/Histogram.h"
#include "flow/PriorityMultiLock.actor.h"
#include <limits>
#include <random>
#include "fdbrpc/ContinuousSample.h"
@ -102,8 +103,6 @@ std::string addPrefix(std::string prefix, std::string lines) {
return s;
}
#define PRIORITYMULTILOCK_DEBUG 0
// Some convenience functions for debugging to stringify various structures
// Classes can add compatibility by either specializing toString<T> or implementing
// std::string toString() const;
@ -223,7 +222,7 @@ std::string toString(const std::pair<F, S>& o) {
constexpr static int ioMinPriority = 0;
constexpr static int ioLeafPriority = 1;
constexpr static int ioMaxPriority = 3;
constexpr static int maxConcurrentReadsLaunchLimit = std::numeric_limits<int>::max();
// A FIFO queue of T stored as a linked list of pages.
// Main operations are pop(), pushBack(), pushFront(), and flush().
//
@ -2009,9 +2008,7 @@ public:
bool memoryOnly,
Reference<IEncryptionKeyProvider> keyProvider,
Promise<Void> errorPromise = {})
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING,
SERVER_KNOBS->REDWOOD_IO_MAX_PRIORITY,
SERVER_KNOBS->REDWOOD_PRIORITY_LAUNCHS),
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_PRIORITY_LAUNCHS),
pageCacheBytes(pageCacheSizeBytes), desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize),
filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
remapCleanupWindowBytes(remapCleanupWindowBytes), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
@ -7513,10 +7510,7 @@ RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff"))
class KeyValueStoreRedwood : public IKeyValueStore {
public:
KeyValueStoreRedwood(std::string filename, UID logID, Reference<IEncryptionKeyProvider> encryptionKeyProvider)
: m_filename(filename), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS,
0,
std::to_string(maxConcurrentReadsLaunchLimit)),
prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
: m_filename(filename), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
int pageSize =
BUGGIFY ? deterministicRandom()->randomInt(1000, 4096 * 4) : SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE;
@ -7678,7 +7672,6 @@ public:
f.get();
} else {
CODE_PROBE(true, "Uncached forward range read seek");
wait(store(lock, self->m_concurrentReads.lock()));
wait(f);
}
@ -7734,7 +7727,6 @@ public:
f.get();
} else {
CODE_PROBE(true, "Uncached reverse range read seek");
wait(store(lock, self->m_concurrentReads.lock()));
wait(f);
}
@ -7801,9 +7793,6 @@ public:
wait(self->m_tree->initBTreeCursor(
&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead, options));
// Not locking for point reads, instead relying on IO priority lock
// state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock());
++g_redwoodMetrics.metric.opGet;
wait(cur.seekGTE(key));
if (cur.isValid() && cur.get().key == key) {
@ -7839,7 +7828,6 @@ private:
Future<Void> m_init;
Promise<Void> m_closed;
Promise<Void> m_error;
PriorityMultiLock m_concurrentReads;
bool prefetch;
Version m_nextCommitVersion;
Reference<IEncryptionKeyProvider> m_keyProvider;
@ -8482,11 +8470,11 @@ void RedwoodMetrics::getIOLockFields(TraceEvent* e, std::string* s) {
int maxPriority = ioLock->maxPriority();
if (e != nullptr) {
e->detail("ActiveReads", ioLock->totalWorkers());
e->detail("ActiveReads", ioLock->totalRunners());
e->detail("AwaitReads", ioLock->totalWaiters());
for (int priority = 0; priority <= maxPriority; ++priority) {
e->detail(format("ActiveP%d", priority), ioLock->numWorkers(priority));
e->detail(format("ActiveP%d", priority), ioLock->numRunners(priority));
e->detail(format("AwaitP%d", priority), ioLock->numWaiters(priority));
}
}
@ -8496,13 +8484,13 @@ void RedwoodMetrics::getIOLockFields(TraceEvent* e, std::string* s) {
std::string await = "Await";
*s += "\n";
*s += format("%-15s %-8u ", "ActiveReads", ioLock->totalWorkers());
*s += format("%-15s %-8u ", "ActiveReads", ioLock->totalRunners());
*s += format("%-15s %-8u ", "AwaitReads", ioLock->totalWaiters());
*s += "\n";
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (active + 'P' + std::to_string(priority)).c_str(), ioLock->numWorkers(priority));
format("%-15s %-8u ", (active + 'P' + std::to_string(priority)).c_str(), ioLock->numRunners(priority));
}
*s += "\n";
for (int priority = 0; priority <= maxPriority; ++priority) {
@ -10999,3 +10987,57 @@ TEST_CASE(":/redwood/performance/histograms") {
return Void();
}
ACTOR Future<Void> waitLockIncrement(PriorityMultiLock* pml, int priority, int* pout) {
state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
wait(delay(deterministicRandom()->random01() * .1));
++*pout;
return Void();
}
TEST_CASE("/redwood/PriorityMultiLock") {
state std::vector<int> priorities = { 10, 20, 40 };
state int concurrency = 25;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock buy taking concurrency locks at each level
state std::vector<Future<PriorityMultiLock::Lock>> lockFutures;
for (int i = 0; i < priorities.size(); ++i) {
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(i));
}
}
// Wait for n = concurrency locks to be acquired
wait(quorum(lockFutures, concurrency));
state std::vector<Future<Void>> futures;
for (int i = 0; i < 10e3; ++i) {
int p = i % priorities.size();
futures.push_back(waitLockIncrement(pml, p, &counts[p]));
}
state Future<Void> f = waitForAll(futures);
// Release the locks
lockFutures.clear();
// Print stats and wait for all futures to be ready
loop {
choose {
when(wait(delay(1))) {
printf("counts: ");
for (auto c : counts) {
printf("%d ", c);
}
printf(" pml: %s\n", pml->toString().c_str());
}
when(wait(f)) { break; }
}
}
delete pml;
return Void();
}

View File

@ -37,6 +37,7 @@
#include "flow/Error.h"
#include "flow/Hash3.h"
#include "flow/Histogram.h"
#include "flow/PriorityMultiLock.actor.h"
#include "flow/IRandom.h"
#include "flow/IndexedSet.h"
#include "flow/SystemMonitor.h"
@ -1291,9 +1292,7 @@ public:
changeFeedDiskReadsLock(SERVER_KNOBS->CHANGE_FEED_DISK_READS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
ssLock(FLOW_KNOBS->MAX_OUTSTANDING * 2,
SERVER_KNOBS->STORAGESERVER_MAX_RANK,
SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
ssLock(FLOW_KNOBS->MAX_OUTSTANDING * 2, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
@ -9923,22 +9922,22 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
te.detail("Tag", self->tag.toString());
std::vector<int> rpr = self->readPriorityRanks;
te.detail("ActiveReads", self->ssLock.totalWorkers());
te.detail("ActiveReads", self->ssLock.totalRunners());
te.detail("AwaitReads", self->ssLock.totalWaiters());
int type = (int)ReadType::EAGER;
te.detail("ActiveEager", self->ssLock.numWorkers(rpr[type]));
te.detail("ActiveEager", self->ssLock.numRunners(rpr[type]));
te.detail("AwaitEager", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::FETCH;
te.detail("ActiveFetch", self->ssLock.numWorkers(rpr[type]));
te.detail("ActiveFetch", self->ssLock.numRunners(rpr[type]));
te.detail("AwaitFetch", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::LOW;
te.detail("ActiveLow", self->ssLock.numWorkers(rpr[type]));
te.detail("ActiveLow", self->ssLock.numRunners(rpr[type]));
te.detail("AwaitLow", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::NORMAL;
te.detail("ActiveNormal", self->ssLock.numWorkers(rpr[type]));
te.detail("ActiveNormal", self->ssLock.numRunners(rpr[type]));
te.detail("AwaitNormal", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::HIGH;
te.detail("ActiveHigh", self->ssLock.numWorkers(rpr[type]));
te.detail("ActiveHigh", self->ssLock.numRunners(rpr[type]));
te.detail("AwaitHigh", self->ssLock.numWaiters(rpr[type]));
StorageBytes sb = self->storage.getStorageBytes();
te.detail("KvstoreBytesUsed", sb.used);

View File

@ -0,0 +1,325 @@
/*
* PriorityMultiLock.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_PRIORITYMULTILOCK_ACTOR_G_H)
#define FLOW_PRIORITYMULTILOCK_ACTOR_G_H
#include "flow/PriorityMultiLock.actor.g.h"
#elif !defined(PRIORITYMULTILOCK_ACTOR_H)
#define PRIORITYMULTILOCK_ACTOR_H
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define PRIORITYMULTILOCK_DEBUG 0
#if PRIORITYMULTILOCK_DEBUG
#define pml_debug_printf(...) printf(__VA_ARGS__)
#else
#define pml_debug_printf(...)
#endif
// A multi user lock with a concurrent holder limit where waiters request a lock with a priority
// id and are granted locks based on a total concurrency and relative importants of the priority
// ids defined.
//
// Scheduling logic
// launchLimits[n] = configured amount from the launchLimit vector for priority n
// waiters[n] = the number of waiters for priority n
// runnerCounts[n] = number of runners at priority n
//
// totalActiveLaunchLimits = sum of limits for all priorities with waiters
// When waiters[n] becomes == 0, totalActiveLaunchLimits -= launchLimits[n]
// When waiters[n] becomes > 0, totaltotalActiveLaunchLimitsLimits += launchLimits[n]
//
// The total capacity of a priority to be considered when launching tasks is
// ceil(launchLimits[n] / totalLimits * concurrency)
//
// The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>
// Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
PriorityMultiLock(int concurrency, std::string launchLimits)
: PriorityMultiLock(concurrency, parseStringToVector<int>(launchLimits, ',')) {}
PriorityMultiLock(int concurrency, std::vector<int> launchLimitsByPriority)
: concurrency(concurrency), available(concurrency), waiting(0), totalActiveLaunchLimits(0),
launchLimits(launchLimitsByPriority) {
waiters.resize(launchLimits.size());
runnerCounts.resize(launchLimits.size(), 0);
fRunner = runner(this);
}
~PriorityMultiLock() {}
Future<Lock> lock(int priority = 0) {
auto& q = waiters[priority];
Waiter w;
// If this priority currently has no waiters
if (q.empty()) {
// Add this priority's launch limit to totalLimits
totalActiveLaunchLimits += launchLimits[priority];
// If there are slots available and the priority has capacity then don't make the caller wait
if (available > 0 && runnerCounts[priority] < currentCapacity(priority)) {
// Remove this priority's launch limit from the total since it will remain empty
totalActiveLaunchLimits -= launchLimits[priority];
// Return a Lock to the caller
Lock p;
addRunner(p, priority);
return p;
}
}
q.push_back(w);
++waiting;
pml_debug_printf("lock line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return w.lockPromise.getFuture();
}
void kill() {
for (int i = 0; i < runners.size(); ++i) {
if (!runners[i].isReady()) {
runners[i].cancel();
}
}
runners.clear();
brokenOnDestruct.sendError(broken_promise());
waiting = 0;
waiters.clear();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone);
for (int i = 0; i < waiters.size(); ++i) {
s += format("p%d: limit=%d run=%d wait=%d cap=%d ",
i,
launchLimits[i],
runnerCounts[i],
waiters[i].size(),
waiters[i].empty() ? 0 : currentCapacity(i));
}
s += "}";
return s;
}
int maxPriority() const { return launchLimits.size() - 1; }
int totalWaiters() const { return waiting; }
int numWaiters(const unsigned int priority) const {
ASSERT(priority < waiters.size());
return waiters[priority].size();
}
int totalRunners() const { return concurrency - available; }
int numRunners(const unsigned int priority) const {
ASSERT(priority < waiters.size());
return runnerCounts[priority];
}
private:
struct Waiter {
Waiter() : queuedTime(now()) {}
Promise<Lock> lockPromise;
double queuedTime;
};
// Total execution slots allowed across all priorities
int concurrency;
// Current available execution slots
int available;
// Total waiters across all priorities
int waiting;
// Sum of launch limits for all priorities with 1 or more waiters
int totalActiveLaunchLimits;
typedef Deque<Waiter> Queue;
// Launch limits by priority
std::vector<int> launchLimits;
// Queue of waiters by priority
std::vector<Queue> waiters;
// Count of active runners by priority
std::vector<int> runnerCounts;
// Current or recent (ended) runners
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger wakeRunner;
Promise<Void> brokenOnDestruct;
void addRunner(Lock& lock, int priority) {
runnerCounts[priority] += 1;
--available;
runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
++available;
runnerCounts[priority] -= 1;
// If there are any waiters or if the runners array is getting large, trigger the runner loop
if (waiting > 0 || runners.size() > 1000) {
wakeRunner.trigger();
}
return Void();
}));
}
// Current maximum running tasks for the specified priority, which must have waiters
// or the result is undefined
int currentCapacity(int priority) const {
// The total concurrency allowed for this priority at present is the total concurrency times
// priority's launch limit divided by the total launch limits for all priorities with waiters.
return ceil((float)launchLimits[priority] / totalActiveLaunchLimits * concurrency);
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
// Priority to try to run tasks from next
state int priority = 0;
loop {
pml_debug_printf(
"runner loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
wait(self->wakeRunner.onTrigger());
pml_debug_printf(
"runner loop wake line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
if (++sinceYield == 100) {
sinceYield = 0;
wait(delay(0));
pml_debug_printf(
" runner afterDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
pml_debug_printf(
" launch loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Find the next priority with waiters and capacity. There must be at least one.
while (self->waiters[priority].empty() ||
self->runnerCounts[priority] >= self->currentCapacity(priority)) {
if (++priority == self->waiters.size()) {
priority = 0;
}
pml_debug_printf(" launch loop scan line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
}
Queue& queue = self->waiters[priority];
while (!queue.empty() && self->runnerCounts[priority] < self->currentCapacity(priority)) {
pml_debug_printf(
" launching line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
Waiter w = queue.front();
queue.pop_front();
// If this priority is now empty, subtract its launch limit from totalLimits
if (queue.empty()) {
self->totalActiveLaunchLimits -= self->launchLimits[priority];
pml_debug_printf(" emptied priority line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
}
--self->waiting;
Lock lock;
w.lockPromise.send(lock);
// Self may have been destructed during the lock callback
if (error.isReady()) {
throw error.getError();
}
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->addRunner(lock, priority);
// A slot has been consumed, so stop reading from this queue if there aren't any more
if (self->available == 0) {
pml_debug_printf(" avail now 0 line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
break;
}
}
}
}
}
}
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -2192,213 +2192,6 @@ private:
static std::unordered_map<NetworkAddress, Reference<T>> instanceMap;
};
// A multi user lock with a concurrent holder limit where waiters are granted the lock according to
// an integer priority from 0 to maxPriority, inclusive, where higher integers are given priority.
//
// The interface is similar to FlowMutex except that lock holders can drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>
// Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
PriorityMultiLock(int concurrency, int maxPriority, std::string launchLimit)
: concurrency(concurrency), available(concurrency), waiting(0) {
this->launchLimit = parseStringToVector<int>(launchLimit, ',');
ASSERT(this->launchLimit.size() == maxPriority + 1);
waiters.resize(maxPriority + 1);
workerCounts.resize(maxPriority + 1, 0);
fRunner = runner(this);
}
~PriorityMultiLock() {}
Future<Lock> lock(int priority = 0) {
// This shortcut may enable a waiter to jump the line when the releaser loop yields
if (available > 0) {
--available;
workerCounts[priority] += 1;
Lock p;
addRunner(p, priority);
return p;
}
Waiter w;
waiters[priority].push_back(w);
++waiting;
return w.lockPromise.getFuture();
}
void kill() {
for (int i = 0; i < runners.size(); ++i) {
if (!runners[i].isReady()) {
runners[i].cancel();
}
}
runners.clear();
brokenOnDestruct.sendError(broken_promise());
waiting = 0;
waiters.clear();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone);
for (int i = 0; i < waiters.size(); ++i) {
s += format("p%d_waiters=%u ", i, waiters[i].size());
}
s += "}";
return s;
}
int maxPriority() const { return launchLimit.size() - 1; }
int totalWaiters() const { return waiting; }
int numWaiters(const unsigned int priority) const {
ASSERT(priority < waiters.size());
return waiters[priority].size();
}
int totalWorkers() const { return concurrency - available; }
int numWorkers(const unsigned int priority) const {
ASSERT(priority < waiters.size());
return workerCounts[priority];
}
private:
struct Waiter {
Waiter() : queuedTime(now()) {}
Promise<Lock> lockPromise;
double queuedTime;
};
int concurrency;
int available;
int waiting;
typedef Deque<Waiter> Queue;
std::vector<int> launchLimit;
std::vector<Queue> waiters;
std::vector<int> workerCounts;
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger release;
Promise<Void> brokenOnDestruct;
void addRunner(Lock& lock, int priority) {
runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
++available;
workerCounts[priority] -= 1;
if (waiting > 0 || runners.size() > 100) {
release.trigger();
}
return Void();
}));
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
state int maxPriority = self->waiters.size() - 1;
// Priority to try to run tasks from next
state int priority = maxPriority;
state int ioLaunchLimit = self->launchLimit[priority];
state Queue* pQueue = &self->waiters[maxPriority];
// Track the number of waiters unlocked at the same priority in a row
state int lastPriorityCount = 0;
loop {
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
wait(self->release.onTrigger());
if (++sinceYield == 1000) {
sinceYield = 0;
wait(delay(0));
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
while (!pQueue->empty() && ++lastPriorityCount < ioLaunchLimit) {
Waiter w = pQueue->front();
pQueue->pop_front();
--self->waiting;
Lock lock;
w.lockPromise.send(lock);
// Self may have been destructed during the lock callback
if (error.isReady()) {
throw error.getError();
}
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->workerCounts[priority] += 1;
self->addRunner(lock, priority);
// A slot has been consumed, so stop reading from this queue if there aren't any more
if (--self->available == 0) {
break;
}
}
}
// If there are no more slots available, then don't move to the next priority
if (self->available == 0) {
break;
}
// Decrease priority, wrapping around to max from 0
if (priority == 0) {
priority = maxPriority;
} else {
--priority;
}
ioLaunchLimit = self->launchLimit[priority];
pQueue = &self->waiters[priority];
lastPriorityCount = 0;
}
}
}
};
template <class T>
std::unordered_map<NetworkAddress, Reference<T>> FlowSingleton<T>::instanceMap;

View File

@ -0,0 +1,37 @@
/*
* BenchStream.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "benchmark/benchmark.h"
#include "flow/flow.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/PriorityMultiLock.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
return Void();
}
static void bench_priorityMultiLock(benchmark::State& benchState) {
onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
}
BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);