Optimize hot paths and high-churn structures in PriorityMultiLock (PML). Add a second benchmark argument, the number of inactive priorities, to measure their cost impact.
This commit is contained in:
@ -68,8 +68,9 @@
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
typedef int64_t UserTag;
// Waiting on the lock returns a Lock, which is really just a Promise<Void>.
// Calling release() is not necessary; it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
@ -98,10 +99,9 @@ public:
Future<Lock> lock(int priority = 0,
TaskPriority flowDelayPriority = TaskPriority::DefaultEndpoint,
int64_t userTag = 0) {
UserTag userTag = 0) {
Priority& p = priorities[priority];
Queue& q = p.queue;
Waiter w({ flowDelayPriority, userTag });
// If this priority currently has no waiters
if (q.empty()) {
@ -115,14 +115,14 @@ public:
// Return a Lock to the caller
Lock lock;
addRunner(lock, w.task, &p);
addRunner(lock, userTag, &p);
pml_debug_printf("lock nowait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return lock;
Waiter& w = q.emplace_back(flowDelayPriority, userTag);
pml_debug_printf("lock wait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
@ -188,19 +188,16 @@ public:
struct Task {
TaskPriority flowDelayPriority;
int64_t userTag;
struct Waiter {
Waiter(Task task) : task(task) {}
Waiter(const TaskPriority& t, const UserTag& u) : flowDelayPriority(t), userTag(u) {}
Promise<Lock> lockPromise;
Task task;
TaskPriority flowDelayPriority;
UserTag userTag;
struct Runner {
Task task;
Runner(const UserTag& u, Future<Void> f) : userTag(u), taskFuture(f) {}
UserTag userTag;
Future<Void> taskFuture;
@ -276,10 +273,10 @@ private:
return Void();
void addRunner(Lock& lock, const Task& task, Priority* p) {
void addRunner(Lock& lock, UserTag userTag, Priority* p) {
p->runners += 1;
runners.push_back({ task, handleRelease(this, lock.promise.getFuture(), p) });
runners.emplace_back(userTag, handleRelease(this, lock.promise.getFuture(), p));
// Current maximum running tasks for the specified priority, which must have waiters
@ -365,7 +362,7 @@ private:
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->addRunner(lock, w.task, pPriority);
self->addRunner(lock, w.userTag, pPriority);
pml_debug_printf(" launched line %d alreadyDone=%d priority=%d %s\n",
@ -377,7 +374,7 @@ private:
// If the task returned the lock immediately and did not wait, then delay to let the Flow run loop
// schedule other tasks if needed
if (!lock.promise.canBeSet()) {
wait(delay(0, w.task.flowDelayPriority));
wait(delay(0, w.flowDelayPriority));
@ -25,26 +25,28 @@
#include "flow/PriorityMultiLock.actor.h"
#include <deque>
#include "flow/actorcompiler.h" // This must be the last #include.
#include "fmt/printf.h"
ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
state std::vector<int> priorities;
// Arg1 is the number of active priorities to use
// Arg2 is the number of inactive priorities to use
state int active = benchState->range(0);
state int inactive = benchState->range(1);
// Set up priority list with limits 10, 20, 30, ...
while (priorities.size() < benchState->range(0)) {
state std::vector<int> priorities;
while (priorities.size() < active + inactive) {
priorities.push_back(10 * (priorities.size() + 1));
state int concurrency = priorities.size() * 10;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock by taking concurrency locks
// Clog the lock by taking n=concurrency locks
state std::deque<Future<PriorityMultiLock::Lock>> lockFutures;
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(j % priorities.size()));
lockFutures.push_back(pml->lock(j % active));
// Wait for all of the initial locks to be taken
// This will work regardless of their priorities as there are only n = concurrency of them
wait(waitForAll(std::vector<Future<PriorityMultiLock::Lock>>(lockFutures.begin(), lockFutures.end())));
@ -64,7 +66,7 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
PriorityMultiLock::Lock lock = wait(f);
// Rotate to another priority
if (++p == priorities.size()) {
if (++p == active) {
p = 0;
@ -84,4 +86,4 @@ static void bench_priorityMultiLock(benchmark::State& benchState) {
onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);
BENCHMARK(bench_priorityMultiLock)->Args({ 5, 0 })->Ranges({ { 1, 64 }, { 0, 128 } })->ReportAggregatesOnly(true);
Reference in New Issue