switch to a separate queue for ordered tasks
This commit is contained in:
parent
c44035a27b
commit
06fe6917ab
|
@ -45,6 +45,7 @@
|
|||
#include "fdbrpc/Replication.h"
|
||||
#include "fdbrpc/ReplicationUtils.h"
|
||||
#include "fdbrpc/AsyncFileWriteChecker.h"
|
||||
#include "flow/Deque.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
bool simulator_should_inject_fault(const char* context, const char* file, int line, int error_code) {
|
||||
|
@ -859,19 +860,20 @@ public:
|
|||
seconds = std::max(0.0, seconds);
|
||||
Future<Void> f;
|
||||
|
||||
bool ordered = currentProcess->rebooting || machine != currentProcess || currentProcess->shutdownSignal.isSet();
|
||||
bool ordered = (seconds == 0.0) && (currentProcess->rebooting || machine != currentProcess ||
|
||||
currentProcess->shutdownSignal.isSet());
|
||||
if (!ordered && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
|
||||
deterministicRandom()->random01() < 0.25) { // FIXME: why doesnt this work when we are changing machines?
|
||||
seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0);
|
||||
}
|
||||
|
||||
mutex.enter();
|
||||
tasks.push(Task(actualTime + seconds,
|
||||
taskID,
|
||||
ordered,
|
||||
ordered ? taskCount++ : (deterministicRandom()->randomUInt64() << 32) | taskCount++,
|
||||
machine,
|
||||
f));
|
||||
if (ordered) {
|
||||
orderedTasks.emplace_back(actualTime, taskID, taskCount++, machine, f);
|
||||
} else {
|
||||
tasks.push(Task(
|
||||
actualTime + seconds, taskID, (deterministicRandom()->randomUInt64() << 32) | taskCount++, machine, f));
|
||||
}
|
||||
mutex.leave();
|
||||
|
||||
return f;
|
||||
|
@ -1120,12 +1122,6 @@ public:
|
|||
self->tasks.pop();
|
||||
}
|
||||
std::sort(self->instantTasks.begin(), self->instantTasks.end(), [](const Task& a, const Task& b) {
|
||||
if (a.ordered || b.ordered) {
|
||||
if (!a.ordered || !b.ordered) {
|
||||
return a.ordered;
|
||||
}
|
||||
return a.stable < b.stable;
|
||||
}
|
||||
if (a.taskID != b.taskID) {
|
||||
return a.taskID > b.taskID;
|
||||
}
|
||||
|
@ -1133,6 +1129,13 @@ public:
|
|||
});
|
||||
self->mutex.leave();
|
||||
for (auto& t : self->instantTasks) {
|
||||
while (self->orderedTasks.size()) {
|
||||
Task o = std::move(self->orderedTasks.front());
|
||||
self->orderedTasks.pop_front();
|
||||
self->time = o.time;
|
||||
self->execTask(o);
|
||||
self->yielded = false;
|
||||
}
|
||||
self->execTask(t);
|
||||
self->yielded = false;
|
||||
}
|
||||
|
@ -2029,45 +2032,31 @@ public:
|
|||
// Implementation
|
||||
struct Task {
|
||||
TaskPriority taskID;
|
||||
bool ordered;
|
||||
double time;
|
||||
uint64_t stable;
|
||||
ProcessInfo* machine;
|
||||
Promise<Void> action;
|
||||
Task(double time,
|
||||
TaskPriority taskID,
|
||||
bool ordered,
|
||||
uint64_t stable,
|
||||
ProcessInfo* machine,
|
||||
Promise<Void>&& action)
|
||||
: time(time), taskID(taskID), ordered(ordered), stable(stable), machine(machine), action(std::move(action)) {}
|
||||
Task(double time,
|
||||
TaskPriority taskID,
|
||||
bool ordered,
|
||||
uint64_t stable,
|
||||
ProcessInfo* machine,
|
||||
Future<Void>& future)
|
||||
: time(time), taskID(taskID), ordered(ordered), stable(stable), machine(machine) {
|
||||
Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Promise<Void>&& action)
|
||||
: time(time), taskID(taskID), stable(stable), machine(machine), action(std::move(action)) {}
|
||||
Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Future<Void>& future)
|
||||
: time(time), taskID(taskID), stable(stable), machine(machine) {
|
||||
future = action.getFuture();
|
||||
}
|
||||
Task(Task&& rhs) noexcept
|
||||
: time(rhs.time), taskID(rhs.taskID), ordered(rhs.ordered), stable(rhs.stable), machine(rhs.machine),
|
||||
: time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine),
|
||||
action(std::move(rhs.action)) {}
|
||||
void operator=(Task const& rhs) {
|
||||
taskID = rhs.taskID;
|
||||
ordered = rhs.ordered;
|
||||
time = rhs.time;
|
||||
stable = rhs.stable;
|
||||
machine = rhs.machine;
|
||||
action = rhs.action;
|
||||
}
|
||||
Task(Task const& rhs)
|
||||
: taskID(rhs.taskID), ordered(rhs.ordered), time(rhs.time), stable(rhs.stable), machine(rhs.machine),
|
||||
action(rhs.action) {}
|
||||
: taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {}
|
||||
void operator=(Task&& rhs) noexcept {
|
||||
time = rhs.time;
|
||||
taskID = rhs.taskID;
|
||||
ordered = rhs.ordered;
|
||||
stable = rhs.stable;
|
||||
machine = rhs.machine;
|
||||
action = std::move(rhs.action);
|
||||
|
@ -2118,7 +2107,7 @@ public:
|
|||
|
||||
mutex.enter();
|
||||
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
|
||||
tasks.push(Task(actualTime, taskID, true, taskCount++, getCurrentProcess(), std::move(signal)));
|
||||
orderedTasks.emplace_back(actualTime, taskID, taskCount++, getCurrentProcess(), std::move(signal));
|
||||
mutex.leave();
|
||||
}
|
||||
bool isOnMainThread() const override { return net2->isOnMainThread(); }
|
||||
|
@ -2150,6 +2139,7 @@ public:
|
|||
// tasks is guarded by ISimulator::mutex
|
||||
std::priority_queue<Task, std::vector<Task>> tasks;
|
||||
std::vector<Task> instantTasks;
|
||||
Deque<Task> orderedTasks;
|
||||
|
||||
std::vector<std::function<void()>> stopCallbacks;
|
||||
|
||||
|
|
Loading…
Reference in New Issue