ordered tasks in a batch are executed first and in their creation order

This commit is contained in:
Evan Tschannen 2021-03-17 17:07:25 -07:00
parent 5af2962d04
commit c44035a27b
1 changed files with 30 additions and 10 deletions

View File

@ -859,8 +859,7 @@ public:
seconds = std::max(0.0, seconds);
Future<Void> f;
bool ordered = currentProcess->rebooting || machine != currentProcess || !machine->machine ||
currentProcess->shutdownSignal.isSet();
bool ordered = currentProcess->rebooting || machine != currentProcess || currentProcess->shutdownSignal.isSet();
if (!ordered && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
deterministicRandom()->random01() < 0.25) { // FIXME: why doesnt this work when we are changing machines?
seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0);
@ -868,7 +867,8 @@ public:
mutex.enter();
tasks.push(Task(actualTime + seconds,
ordered ? TaskPriority::Max : taskID,
taskID,
ordered,
ordered ? taskCount++ : (deterministicRandom()->randomUInt64() << 32) | taskCount++,
machine,
f));
@ -1120,6 +1120,12 @@ public:
self->tasks.pop();
}
std::sort(self->instantTasks.begin(), self->instantTasks.end(), [](const Task& a, const Task& b) {
if (a.ordered || b.ordered) {
if (!a.ordered || !b.ordered) {
return a.ordered;
}
return a.stable < b.stable;
}
if (a.taskID != b.taskID) {
return a.taskID > b.taskID;
}
@ -2023,31 +2029,45 @@ public:
// Implementation
struct Task {
TaskPriority taskID;
bool ordered;
double time;
uint64_t stable;
ProcessInfo* machine;
Promise<Void> action;
Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Promise<Void>&& action)
: time(time), taskID(taskID), stable(stable), machine(machine), action(std::move(action)) {}
Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Future<Void>& future)
: time(time), taskID(taskID), stable(stable), machine(machine) {
Task(double time,
TaskPriority taskID,
bool ordered,
uint64_t stable,
ProcessInfo* machine,
Promise<Void>&& action)
: time(time), taskID(taskID), ordered(ordered), stable(stable), machine(machine), action(std::move(action)) {}
Task(double time,
TaskPriority taskID,
bool ordered,
uint64_t stable,
ProcessInfo* machine,
Future<Void>& future)
: time(time), taskID(taskID), ordered(ordered), stable(stable), machine(machine) {
future = action.getFuture();
}
Task(Task&& rhs) noexcept
: time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine),
: time(rhs.time), taskID(rhs.taskID), ordered(rhs.ordered), stable(rhs.stable), machine(rhs.machine),
action(std::move(rhs.action)) {}
void operator=(Task const& rhs) {
taskID = rhs.taskID;
ordered = rhs.ordered;
time = rhs.time;
stable = rhs.stable;
machine = rhs.machine;
action = rhs.action;
}
Task(Task const& rhs)
: taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {}
: taskID(rhs.taskID), ordered(rhs.ordered), time(rhs.time), stable(rhs.stable), machine(rhs.machine),
action(rhs.action) {}
void operator=(Task&& rhs) noexcept {
time = rhs.time;
taskID = rhs.taskID;
ordered = rhs.ordered;
stable = rhs.stable;
machine = rhs.machine;
action = std::move(rhs.action);
@ -2098,7 +2118,7 @@ public:
mutex.enter();
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
tasks.push(Task(actualTime, taskID, taskCount++, getCurrentProcess(), std::move(signal)));
tasks.push(Task(actualTime, taskID, true, taskCount++, getCurrentProcess(), std::move(signal)));
mutex.leave();
}
bool isOnMainThread() const override { return net2->isOnMainThread(); }