Change runners from a Deque to an intrusive linked list of FastAllocated items so they can be removed as they complete.

This commit is contained in:
Steve Atherton 2022-11-10 19:34:43 -08:00
parent 199fb3be54
commit 3c7129b7a8
1 changed files with 58 additions and 47 deletions

View File

@ -79,6 +79,7 @@ public:
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
bool isLocked() const { return promise.canBeSet(); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
@ -88,7 +89,7 @@ public:
: PriorityMultiLock(concurrency, parseStringToVector<int>(weights, ',')) {}
PriorityMultiLock(int concurrency, std::vector<int> weightsByPriority)
: concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0), releaseDebugID(0) {
: concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0) {
priorities.resize(weightsByPriority.size());
for (int i = 0; i < priorities.size(); ++i) {
@ -138,33 +139,35 @@ public:
}
void kill() {
pml_debug_printf("kill %s\n", toString(false).c_str());
pml_debug_printf("kill %s\n", toString().c_str());
brokenOnDestruct.reset();
// handleRelease will not free up any execution slots when it ends via cancel
fRunner.cancel();
available = 0;
runners.clear();
// Cancel and clean up runners
auto r = runners.begin();
while (r != runners.end()) {
r->handler.cancel();
Runner* runner = &*r;
r = runners.erase(r);
delete runner;
}
waitingPriorities.clear();
priorities.clear();
}
std::string toString(bool leakCheck = true) const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].taskFuture.isReady()) {
++runnersDone;
}
}
std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d "
"runnersDone=%d pendingWeights=%d ",
std::string toString() const {
std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersList=%d "
"pendingWeights=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone,
totalPendingWeights);
for (auto& p : priorities) {
@ -173,9 +176,9 @@ public:
s += "}";
if (leakCheck && (concurrency - available != runners.size() - runnersDone)) {
if (concurrency - available != runners.size()) {
pml_debug_printf("%s\n", s.c_str());
ASSERT_EQ(concurrency - available, runners.size() - runnersDone);
ASSERT_EQ(concurrency - available, runners.size());
}
return s;
@ -205,12 +208,6 @@ private:
UserTag userTag;
};
struct Runner {
Runner(const UserTag& u, Future<Void> f) : userTag(u), taskFuture(f) {}
UserTag userTag;
Future<Void> taskFuture;
};
// Total execution slots allowed across all priorities
int concurrency;
// Current available execution slots
@ -251,48 +248,67 @@ private:
// does not have to iterage over the priorities vector checking priorities without waiters.
WaitingPrioritiesList waitingPriorities;
// Current or recent (ended) runners
Deque<Runner> runners;
struct Runner : boost::intrusive::list_base_hook<>, FastAllocated<Runner> {
Runner(Priority* p, const UserTag& u) : priority(p), userTag(u) {
#if PRIORITYMULTILOCK_DEBUG || !defined(NO_INTELLISENSE)
debugID = deterministicRandom()->randomUniqueID();
#endif
}
Future<Void> handler;
UserTag userTag;
Priority* priority;
#if PRIORITYMULTILOCK_DEBUG || !defined(NO_INTELLISENSE)
UID debugID;
#endif
};
// Current runners list. This is an intrusive list of FastAllocated items so that they can remove themselves
// efficiently as they complete. size() will be linear because it's only used in toString() for debugging
typedef boost::intrusive::list<Runner, boost::intrusive::constant_time_size<false>> RunnerList;
RunnerList runners;
Future<Void> fRunner;
AsyncTrigger wakeRunner;
Promise<Void> brokenOnDestruct;
// Used for debugging, can roll over without issue
unsigned int releaseDebugID;
ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Future<Void> f, Priority* priority) {
state [[maybe_unused]] unsigned int id = self->releaseDebugID++;
pml_debug_printf("%f handleRelease self=%p id=%u start \n", now(), self, id);
ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Runner* r, Future<Void> holder) {
pml_debug_printf("%f handleRelease self=%p id=%s start \n", now(), self, r->debugID.toString().c_str());
try {
wait(f);
pml_debug_printf("%f handleRelease self=%p id=%u success\n", now(), self, id);
wait(holder);
pml_debug_printf("%f handleRelease self=%p id=%s success\n", now(), self, r->debugID.toString().c_str());
} catch (Error& e) {
pml_debug_printf("%f handleRelease self=%p id=%u error %s\n", now(), self, id, e.what());
pml_debug_printf(
"%f handleRelease self=%p id=%s error %s\n", now(), self, r->debugID.toString().c_str(), e.what());
if (e.code() == error_code_actor_cancelled) {
// self is shutting down so no need to clean up r, this is done in kill()
throw;
}
}
pml_debug_printf(
"lock release priority %d %s\n", (int)(priority - &self->priorities.front()), self->toString().c_str());
pml_debug_printf("lock release priority %d %s\n", (int)(r->priority->priority), self->toString().c_str());
pml_debug_printf("%f handleRelease self=%p id=%u releasing\n", now(), self, id);
pml_debug_printf("%f handleRelease self=%p id=%s releasing\n", now(), self, r->debugID.toString().c_str());
++self->available;
priority->runners -= 1;
r->priority->runners -= 1;
// Remove r from runners list and delete it
self->runners.erase(RunnerList::s_iterator_to(*r));
delete r;
// If there are any waiters or if the runners array is getting large, trigger the runner loop
if (self->waiting > 0 || self->runners.size() > 1000) {
if (self->waiting > 0) {
self->wakeRunner.trigger();
}
return Void();
}
void addRunner(Lock& lock, UserTag userTag, Priority* p) {
p->runners += 1;
void addRunner(Lock& lock, UserTag userTag, Priority* priority) {
priority->runners += 1;
--available;
runners.emplace_back(userTag, handleRelease(this, lock.promise.getFuture(), p));
Runner* runner = new Runner(priority, userTag);
runners.push_back(*runner);
runner->handler = handleRelease(this, runner, lock.promise.getFuture());
}
// Current maximum running tasks for the specified priority, which must have waiters
@ -312,11 +328,6 @@ private:
loop {
pml_debug_printf("runner loop start priority=%d %s\n", p->priority, self->toString().c_str());
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().taskFuture.isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
pml_debug_printf("runner loop waitTrigger priority=%d %s\n", p->priority, self->toString().c_str());
wait(self->wakeRunner.onTrigger());