ordered tasks are executed at the highest priority instead of disabling batching

This commit is contained in:
Evan Tschannen 2021-03-17 16:44:49 -07:00
parent bf4fcbdb5e
commit 5af2962d04
1 changed files with 20 additions and 37 deletions

View File

@ -859,7 +859,8 @@ public:
seconds = std::max(0.0, seconds); seconds = std::max(0.0, seconds);
Future<Void> f; Future<Void> f;
bool ordered = currentProcess->rebooting || machine != currentProcess || currentProcess->shutdownSignal.isSet(); bool ordered = currentProcess->rebooting || machine != currentProcess || !machine->machine ||
currentProcess->shutdownSignal.isSet();
if (!ordered && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 && if (!ordered && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
deterministicRandom()->random01() < 0.25) { // FIXME: why doesnt this work when we are changing machines? deterministicRandom()->random01() < 0.25) { // FIXME: why doesnt this work when we are changing machines?
seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0); seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0);
@ -867,9 +868,8 @@ public:
mutex.enter(); mutex.enter();
tasks.push(Task(actualTime + seconds, tasks.push(Task(actualTime + seconds,
taskID, ordered ? TaskPriority::Max : taskID,
ordered ? taskCount++ : (deterministicRandom()->randomUInt64() << 32) | taskCount++, ordered ? taskCount++ : (deterministicRandom()->randomUInt64() << 32) | taskCount++,
ordered,
machine, machine,
f)); f));
mutex.leave(); mutex.leave();
@ -1113,11 +1113,9 @@ public:
self->instantTasks.push_back( self->instantTasks.push_back(
std::move(self->tasks.top())); // Unfortunately still a copy under gcc where .top() returns const& std::move(self->tasks.top())); // Unfortunately still a copy under gcc where .top() returns const&
self->time = self->tasks.top().time; self->time = self->tasks.top().time;
bool ordered = self->tasks.top().ordered;
self->tasks.pop();
if (!ordered) {
double batchTime = deterministicRandom()->random01() * FLOW_KNOBS->MAX_RUNLOOP_TIME_BATCHING; double batchTime = deterministicRandom()->random01() * FLOW_KNOBS->MAX_RUNLOOP_TIME_BATCHING;
while (self->tasks.top().time - self->time < batchTime && !self->tasks.top().ordered) { self->tasks.pop();
while (self->tasks.top().time - self->time < batchTime) {
self->instantTasks.push_back(std::move(self->tasks.top())); self->instantTasks.push_back(std::move(self->tasks.top()));
self->tasks.pop(); self->tasks.pop();
} }
@ -1127,7 +1125,6 @@ public:
} }
return a.stable < b.stable; return a.stable < b.stable;
}); });
}
self->mutex.leave(); self->mutex.leave();
for (auto& t : self->instantTasks) { for (auto& t : self->instantTasks) {
self->execTask(t); self->execTask(t);
@ -2028,45 +2025,31 @@ public:
TaskPriority taskID; TaskPriority taskID;
double time; double time;
uint64_t stable; uint64_t stable;
bool ordered;
ProcessInfo* machine; ProcessInfo* machine;
Promise<Void> action; Promise<Void> action;
Task(double time, Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Promise<Void>&& action)
TaskPriority taskID, : time(time), taskID(taskID), stable(stable), machine(machine), action(std::move(action)) {}
uint64_t stable, Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Future<Void>& future)
bool ordered, : time(time), taskID(taskID), stable(stable), machine(machine) {
ProcessInfo* machine,
Promise<Void>&& action)
: time(time), taskID(taskID), stable(stable), machine(machine), ordered(ordered), action(std::move(action)) {}
Task(double time,
TaskPriority taskID,
uint64_t stable,
bool ordered,
ProcessInfo* machine,
Future<Void>& future)
: time(time), taskID(taskID), stable(stable), machine(machine), ordered(ordered) {
future = action.getFuture(); future = action.getFuture();
} }
Task(Task&& rhs) noexcept Task(Task&& rhs) noexcept
: time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine), ordered(rhs.ordered), : time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine),
action(std::move(rhs.action)) {} action(std::move(rhs.action)) {}
void operator=(Task const& rhs) { void operator=(Task const& rhs) {
taskID = rhs.taskID; taskID = rhs.taskID;
time = rhs.time; time = rhs.time;
stable = rhs.stable; stable = rhs.stable;
machine = rhs.machine; machine = rhs.machine;
ordered = rhs.ordered;
action = rhs.action; action = rhs.action;
} }
Task(Task const& rhs) Task(Task const& rhs)
: taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), ordered(rhs.ordered), : taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {}
action(rhs.action) {}
void operator=(Task&& rhs) noexcept { void operator=(Task&& rhs) noexcept {
time = rhs.time; time = rhs.time;
taskID = rhs.taskID; taskID = rhs.taskID;
stable = rhs.stable; stable = rhs.stable;
machine = rhs.machine; machine = rhs.machine;
ordered = rhs.ordered;
action = std::move(rhs.action); action = std::move(rhs.action);
} }
@ -2115,7 +2098,7 @@ public:
mutex.enter(); mutex.enter();
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max); ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
tasks.push(Task(actualTime, taskID, taskCount++, true, getCurrentProcess(), std::move(signal))); tasks.push(Task(actualTime, taskID, taskCount++, getCurrentProcess(), std::move(signal)));
mutex.leave(); mutex.leave();
} }
bool isOnMainThread() const override { return net2->isOnMainThread(); } bool isOnMainThread() const override { return net2->isOnMainThread(); }