Extract TaskQueue out of Net2 and reuse it in sim2 (#8330)

* Extract TaskQueue out of Net2 and reuse it in sim2

* empty commit

* Address review comments

* Introduce MAX_RUNLOOP_SLEEP_DELAY

* Apply clang-format
This commit is contained in:
Marian Dvorsky 2022-10-10 21:46:06 +02:00 committed by GitHub
parent 4b238e3985
commit c6c449d047
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 256 additions and 206 deletions

View File

@ -311,14 +311,14 @@ public:
if (refreshAtTS == std::numeric_limits<int64_t>::max()) {
return false;
}
return now() >= refreshAtTS ? true : false;
return now() + INetwork::TIME_EPS >= refreshAtTS ? true : false;
}
inline bool isExpired() {
if (expireAtTS == std::numeric_limits<int64_t>::max()) {
return false;
}
return now() >= expireAtTS ? true : false;
return now() + INetwork::TIME_EPS >= expireAtTS ? true : false;
}
void reset();
@ -667,4 +667,4 @@ void computeAuthToken(const std::vector<std::pair<const uint8_t*, size_t>>& payl
EncryptAuthTokenMode getEncryptAuthTokenMode(const EncryptAuthTokenMode mode);
#endif // FDBCLIENT_BLOB_CIPHER_H
#endif // FDBCLIENT_BLOB_CIPHER_H

View File

@ -53,6 +53,7 @@
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/AsyncFileWriteChecker.h"
#include "flow/FaultInjection.h"
#include "flow/TaskQueue.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ISimulator* g_simulator = nullptr;
@ -908,20 +909,25 @@ public:
}
Future<class Void> delay(double seconds, TaskPriority taskID, ProcessInfo* machine, bool ordered = false) {
ASSERT(seconds >= -0.0001);
seconds = std::max(0.0, seconds);
Future<Void> f;
if (!ordered && !currentProcess->rebooting && machine == currentProcess &&
!currentProcess->shutdownSignal.isSet() && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
deterministicRandom()->random01() < 0.25) { // FIXME: why doesn't this work when we are changing machines?
seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0);
if (seconds >= 4e12) // Intervals that overflow an int64_t in microseconds (more than 100,000 years) are treated
// as infinite
return Never();
PromiseTask* t = new PromiseTask(machine);
if (seconds <= TIME_EPS) {
taskQueue.addReady(taskID, t);
} else {
if (!ordered && !currentProcess->rebooting && machine == currentProcess &&
!currentProcess->shutdownSignal.isSet() && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 &&
deterministicRandom()->random01() < 0.25) {
// FIXME: why doesn't this work when we are changing machines?
seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY * pow(deterministicRandom()->random01(), 1000.0);
}
double at = now() + seconds;
taskQueue.addTimer(at, taskID, t);
}
mutex.enter();
tasks.push(Task(time + seconds, taskID, taskCount++, machine, f));
mutex.leave();
return f;
return t->promise.getFuture();
}
ACTOR static Future<Void> checkShutdown(Sim2* self, TaskPriority taskID) {
wait(success(self->getCurrentProcess()->shutdownSignal.getFuture()));
@ -1214,20 +1220,28 @@ public:
static void runLoop(Sim2* self) {
ISimulator::ProcessInfo* callingMachine = self->currentProcess;
while (!self->isStopped) {
self->mutex.enter();
if (self->tasks.size() == 0) {
self->mutex.leave();
ASSERT(false);
if (self->taskQueue.canSleep()) {
double sleepTime = self->taskQueue.getSleepTime(self->time);
self->time +=
sleepTime + FLOW_KNOBS->MAX_RUNLOOP_SLEEP_DELAY * pow(deterministicRandom()->random01(), 1000.0);
if (self->printSimTime) {
printf("Time: %d\n", (int)self->time);
}
self->timerTime = std::max(self->timerTime, self->time);
}
// if (!randLog/* && now() >= 32.0*/)
// randLog = fopen("randLog.txt", "wt");
Task t = std::move(self->tasks.top()); // Unfortunately still a copy under gcc where .top() returns const&
self->currentTaskID = t.taskID;
self->tasks.pop();
self->mutex.leave();
self->execTask(t);
self->yielded = false;
self->taskQueue.processReadyTimers(self->time);
self->taskQueue.processThreadReady();
while (self->taskQueue.hasReadyTask()) {
self->currentTaskID = self->taskQueue.getReadyTaskID();
PromiseTask* task = self->taskQueue.getReadyTask();
self->taskQueue.popReadyTask();
self->execTask(*task);
self->yielded = false;
}
}
self->currentProcess = callingMachine;
for (auto& fn : self->stopCallbacks) {
@ -2226,7 +2240,7 @@ public:
}
Sim2(bool printSimTime)
: time(0.0), timerTime(0.0), currentTaskID(TaskPriority::Zero), taskCount(0), yielded(false), yield_limit(0),
: time(0.0), timerTime(0.0), currentTaskID(TaskPriority::Zero), yielded(false), yield_limit(0),
printSimTime(printSimTime) {
// Not letting currentProcess be nullptr eliminates some annoying special cases
currentProcess =
@ -2247,61 +2261,20 @@ public:
}
// Implementation
struct Task {
TaskPriority taskID;
double time;
uint64_t stable;
struct PromiseTask final {
Promise<Void> promise;
ProcessInfo* machine;
Promise<Void> action;
Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Promise<Void>&& action)
: taskID(taskID), time(time), stable(stable), machine(machine), action(std::move(action)) {}
Task(double time, TaskPriority taskID, uint64_t stable, ProcessInfo* machine, Future<Void>& future)
: taskID(taskID), time(time), stable(stable), machine(machine) {
future = action.getFuture();
}
Task(Task&& rhs) noexcept
: taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine),
action(std::move(rhs.action)) {}
void operator=(Task const& rhs) {
taskID = rhs.taskID;
time = rhs.time;
stable = rhs.stable;
machine = rhs.machine;
action = rhs.action;
}
Task(Task const& rhs)
: taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {}
void operator=(Task&& rhs) noexcept {
time = rhs.time;
taskID = rhs.taskID;
stable = rhs.stable;
machine = rhs.machine;
action = std::move(rhs.action);
}
bool operator<(Task const& rhs) const {
// Ordering is reversed for priority_queue
if (time != rhs.time)
return time > rhs.time;
return stable > rhs.stable;
}
explicit PromiseTask(ProcessInfo* machine) : machine(machine) {}
PromiseTask(ProcessInfo* machine, Promise<Void>&& promise) : machine(machine), promise(std::move(promise)) {}
};
void execTask(struct Task& t) {
void execTask(struct PromiseTask& t) {
if (t.machine->failed) {
t.action.send(Never());
t.promise.send(Never());
} else {
mutex.enter();
if (printSimTime && (int)this->time < (int)t.time) {
printf("Time: %d\n", (int)t.time);
}
this->time = t.time;
this->timerTime = std::max(this->timerTime, this->time);
mutex.leave();
this->currentProcess = t.machine;
try {
t.action.send(Void());
t.promise.send(Void());
ASSERT(this->currentProcess == t.machine);
} catch (Error& e) {
TraceEvent(SevError, "UnhandledSimulationEventError").errorUnsuppressed(e);
@ -2310,11 +2283,10 @@ public:
if (randLog)
fmt::print(randLog,
"T {0} {1} {2} {3}\n",
"T {0} {1} {2}\n",
this->time,
int(deterministicRandom()->peek() % 10000),
t.machine ? t.machine->name : "none",
t.stable);
t.machine ? t.machine->name : "none");
}
}
@ -2322,11 +2294,10 @@ public:
// This is presumably coming from either a "fake" thread pool thread, i.e. it is actually on this thread
// or a thread created with g_network->startThread
ASSERT(getCurrentProcess());
mutex.enter();
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
tasks.push(Task(time, taskID, taskCount++, getCurrentProcess(), std::move(signal)));
mutex.leave();
PromiseTask* p = new PromiseTask(getCurrentProcess(), std::move(signal));
taskQueue.addReadyThreadSafe(isOnMainThread(), taskID, p);
}
bool isOnMainThread() const override { return net2->isOnMainThread(); }
Future<Void> onProcess(ISimulator::ProcessInfo* process, TaskPriority taskID) override {
@ -2340,21 +2311,15 @@ public:
ProtocolVersion protocolVersion() const override { return getCurrentProcess()->protocolVersion; }
// time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because
// time should only be modified from the main thread.
double time;
double timerTime;
TaskPriority currentTaskID;
// taskCount is guarded by ISimulator::mutex
uint64_t taskCount;
std::map<Optional<Standalone<StringRef>>, MachineInfo> machines;
std::map<NetworkAddress, ProcessInfo*> addressMap;
std::map<ProcessInfo*, Promise<Void>> filesDeadMap;
// tasks is guarded by ISimulator::mutex
std::priority_queue<Task, std::vector<Task>> tasks;
TaskQueue<PromiseTask> taskQueue;
std::vector<std::function<void()>> stopCallbacks;

View File

@ -222,6 +222,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( SLOW_NETWORK_LATENCY, 100e-3 );
init( MAX_CLOGGING_LATENCY, 0 ); if( randomize && BUGGIFY ) MAX_CLOGGING_LATENCY = 0.1 * deterministicRandom()->random01();
init( MAX_BUGGIFIED_DELAY, 0 ); if( randomize && BUGGIFY ) MAX_BUGGIFIED_DELAY = 0.2 * deterministicRandom()->random01();
init( MAX_RUNLOOP_SLEEP_DELAY, 0 );
init( SIM_CONNECT_ERROR_MODE, deterministicRandom()->randomInt(0,3) );
//Tracefiles

View File

@ -42,7 +42,7 @@
#include "flow/IAsyncFile.h"
#include "flow/ActorCollection.h"
#include "flow/ThreadSafeQueue.h"
#include "flow/TaskQueue.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/TDMetric.actor.h"
#include "flow/AsioReactor.h"
@ -115,27 +115,6 @@ class Connection;
// Outlives main
Net2* g_net2 = nullptr;
class Task {
public:
virtual void operator()() = 0;
};
struct OrderedTask {
int64_t priority;
TaskPriority taskID;
Task* task;
OrderedTask(int64_t priority, TaskPriority taskID, Task* task) : priority(priority), taskID(taskID), task(task) {}
bool operator<(OrderedTask const& rhs) const { return priority < rhs.priority; }
};
template <class T>
class ReadyQueue : public std::priority_queue<T, std::vector<T>> {
public:
typedef typename std::priority_queue<T, std::vector<T>>::size_type size_type;
ReadyQueue(size_type capacity = 0) { reserve(capacity); };
void reserve(size_type capacity) { this->c.reserve(capacity); }
};
thread_local INetwork* thread_network = 0;
class Net2 final : public INetwork, public INetworkConnections {
@ -252,7 +231,6 @@ public:
int64_t tscBegin, tscEnd;
double taskBegin;
TaskPriority currentTaskID;
uint64_t tasksIssued;
TDMetricCollection tdmetrics;
ChaosMetrics chaosMetrics;
// we read now() from a different thread. On Intel, reading a double is atomic anyways, but on other platforms it's
@ -272,20 +250,21 @@ public:
NetworkMetrics::PriorityStats* lastPriorityStats;
ReadyQueue<OrderedTask> ready;
ThreadSafeQueue<OrderedTask> threadReady;
struct PromiseTask final : public FastAllocated<PromiseTask> {
Promise<Void> promise;
PromiseTask() {}
explicit PromiseTask(Promise<Void>&& promise) noexcept : promise(std::move(promise)) {}
struct DelayedTask : OrderedTask {
double at;
DelayedTask(double at, int64_t priority, TaskPriority taskID, Task* task)
: OrderedTask(priority, taskID, task), at(at) {}
bool operator<(DelayedTask const& rhs) const { return at > rhs.at; } // Ordering is reversed for priority_queue
void operator()() {
promise.send(Void());
delete this;
}
};
std::priority_queue<DelayedTask, std::vector<DelayedTask>> timers;
TaskQueue<PromiseTask> taskQueue;
void checkForSlowTask(int64_t tscBegin, int64_t tscEnd, double duration, TaskPriority priority);
bool check_yield(TaskPriority taskId, int64_t tscNow);
void processThreadReady();
void trackAtPriority(TaskPriority priority, double now);
void stopImmediately() {
#ifdef ADDRESS_SANITIZER
@ -293,10 +272,7 @@ public:
__lsan_do_leak_check();
#endif
stopped = true;
decltype(ready) _1;
ready.swap(_1);
decltype(timers) _2;
timers.swap(_2);
taskQueue.clear();
}
Future<Void> timeOffsetLogger;
@ -312,9 +288,6 @@ public:
Int64MetricHandle countWrites;
Int64MetricHandle countUDPWrites;
Int64MetricHandle countRunLoop;
Int64MetricHandle countCantSleep;
Int64MetricHandle countWontSleep;
Int64MetricHandle countTimers;
Int64MetricHandle countTasks;
Int64MetricHandle countYields;
Int64MetricHandle countYieldBigStack;
@ -1226,17 +1199,6 @@ private:
}
};
struct PromiseTask final : public Task, public FastAllocated<PromiseTask> {
Promise<Void> promise;
PromiseTask() {}
explicit PromiseTask(Promise<Void>&& promise) noexcept : promise(std::move(promise)) {}
void operator()() override {
promise.send(Void());
delete this;
}
};
// 5MB for loading files into memory
Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
@ -1245,8 +1207,8 @@ Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
boost::asio::ssl::context(boost::asio::ssl::context::tls)) }),
sslHandshakerThreadsStarted(0), sslPoolHandshakesInProgress(0), tlsConfig(tlsConfig),
tlsInitializedState(ETLSInitState::NONE), network(this), tscBegin(0), tscEnd(0), taskBegin(0),
currentTaskID(TaskPriority::DefaultYield), tasksIssued(0), stopped(false), started(false), numYields(0),
lastPriorityStats(nullptr), ready(FLOW_KNOBS->READY_QUEUE_RESERVED_SIZE) {
currentTaskID(TaskPriority::DefaultYield), stopped(false), started(false), numYields(0),
lastPriorityStats(nullptr) {
// Until run() is called, yield() will always yield
TraceEvent("Net2Starting").log();
@ -1400,9 +1362,6 @@ void Net2::initMetrics() {
countWouldBlock.init("Net2.CountWouldBlock"_sr);
countWrites.init("Net2.CountWrites"_sr);
countRunLoop.init("Net2.CountRunLoop"_sr);
countCantSleep.init("Net2.CountCantSleep"_sr);
countWontSleep.init("Net2.CountWontSleep"_sr);
countTimers.init("Net2.CountTimers"_sr);
countTasks.init("Net2.CountTasks"_sr);
countYields.init("Net2.CountYields"_sr);
countYieldBigStack.init("Net2.CountYieldBigStack"_sr);
@ -1416,6 +1375,7 @@ void Net2::initMetrics() {
slowTaskMetric.init("Net2.SlowTask"_sr);
countLaunchTime.init("Net2.CountLaunchTime"_sr);
countReactTime.init("Net2.CountReactTime"_sr);
taskQueue.initMetrics();
}
bool Net2::checkRunnable() {
@ -1473,19 +1433,10 @@ void Net2::run() {
}
double sleepTime = 0;
bool b = ready.empty();
if (b) {
b = threadReady.canSleep();
if (!b)
++countCantSleep;
} else
++countWontSleep;
if (b) {
if (taskQueue.canSleep()) {
sleepTime = 1e99;
double sleepStart = timer_monotonic();
if (!timers.empty()) {
sleepTime = timers.top().at - sleepStart; // + 500e-6?
}
sleepTime = taskQueue.getSleepTime(sleepStart);
if (sleepTime > 0) {
#if defined(__linux__)
// notify the run loop monitoring thread that we have gone idle
@ -1517,33 +1468,24 @@ void Net2::run() {
nondeterministicRandom()->random01() < (now - nnow) * FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE)
TraceEvent("SomewhatSlowRunLoopTop").detail("Elapsed", now - nnow);
int numTimers = 0;
while (!timers.empty() && timers.top().at < now) {
++numTimers;
++countTimers;
ready.push(timers.top());
timers.pop();
}
// FIXME: Is this double counting?
countTimers += numTimers;
FDB_TRACE_PROBE(run_loop_ready_timers, numTimers);
taskQueue.processReadyTimers(now);
processThreadReady();
taskQueue.processThreadReady();
tscBegin = timestampCounter();
tscEnd = tscBegin + FLOW_KNOBS->TSC_YIELD_TIME;
taskBegin = timer_monotonic();
numYields = 0;
TaskPriority minTaskID = TaskPriority::Max;
[[maybe_unused]] int queueSize = ready.size();
[[maybe_unused]] int queueSize = taskQueue.getNumReadyTasks();
FDB_TRACE_PROBE(run_loop_tasks_start, queueSize);
while (!ready.empty()) {
while (taskQueue.hasReadyTask()) {
++countTasks;
currentTaskID = ready.top().taskID;
currentTaskID = taskQueue.getReadyTaskID();
priorityMetric = static_cast<int64_t>(currentTaskID);
Task* task = ready.top().task;
ready.pop();
PromiseTask* task = taskQueue.getReadyTask();
taskQueue.popReadyTask();
try {
++tasksSinceReact;
@ -1584,7 +1526,7 @@ void Net2::run() {
trackAtPriority(TaskPriority::RunLoop, taskBegin);
queueSize = ready.size();
queueSize = taskQueue.getNumReadyTasks();
FDB_TRACE_PROBE(run_loop_done, queueSize);
#if defined(__linux__)
@ -1699,20 +1641,6 @@ void Net2::trackAtPriority(TaskPriority priority, double now) {
}
}
void Net2::processThreadReady() {
int numReady = 0;
while (true) {
Optional<OrderedTask> t = threadReady.pop();
if (!t.present())
break;
t.get().priority -= ++tasksIssued;
ASSERT(t.get().task != 0);
ready.push(t.get());
++numReady;
}
FDB_TRACE_PROBE(run_loop_thread_ready, numReady);
}
void Net2::checkForSlowTask(int64_t tscBegin, int64_t tscEnd, double duration, TaskPriority priority) {
int64_t elapsed = tscEnd - tscBegin;
if (elapsed > FLOW_KNOBS->TSC_YIELD_TIME && tscBegin > 0) {
@ -1752,11 +1680,11 @@ bool Net2::check_yield(TaskPriority taskID, int64_t tscNow) {
return true;
}
processThreadReady();
taskQueue.processThreadReady();
if (taskID == TaskPriority::DefaultYield)
taskID = currentTaskID;
if (!ready.empty() && ready.top().priority > int64_t(taskID) << 32) {
if (taskQueue.hasReadyTask() && taskQueue.getReadyTaskPriority() > int64_t(taskID) << 32) {
return true;
}
@ -1794,18 +1722,17 @@ Future<class Void> Net2::yield(TaskPriority taskID) {
}
Future<Void> Net2::delay(double seconds, TaskPriority taskId) {
if (seconds <= 0.) {
PromiseTask* t = new PromiseTask;
this->ready.push(OrderedTask((int64_t(taskId) << 32) - (++tasksIssued), taskId, t));
return t->promise.getFuture();
}
if (seconds >=
4e12) // Intervals that overflow an int64_t in microseconds (more than 100,000 years) are treated as infinite
if (seconds >= 4e12) // Intervals that overflow an int64_t in microseconds (more than 100,000 years) are treated
// as infinite
return Never();
double at = now() + seconds;
PromiseTask* t = new PromiseTask;
this->timers.push(DelayedTask(at, (int64_t(taskId) << 32) - (++tasksIssued), taskId, t));
if (seconds <= 0.) {
taskQueue.addReady(taskId, t);
} else {
double at = now() + seconds;
taskQueue.addTimer(at, taskId, t);
}
return t->promise.getFuture();
}
@ -1818,14 +1745,8 @@ void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
if (stopped)
return;
PromiseTask* p = new PromiseTask(std::move(signal));
int64_t priority = int64_t(taskID) << 32;
if (thread_network == this) {
processThreadReady();
this->ready.push(OrderedTask(priority - (++tasksIssued), taskID, p));
} else {
if (threadReady.push(OrderedTask(priority, taskID, p)))
reactor.wake();
if (taskQueue.addReadyThreadSafe(isOnMainThread(), taskID, p)) {
reactor.wake();
}
}
@ -2246,8 +2167,6 @@ TEST_CASE("flow/Net2/ThreadSafeQueue/Threaded") {
return Void();
}
// NB: This could be a test for any INetwork implementation, but Sim2 doesn't
// satisfy this requirement yet.
TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
// Verifies that signals processed by onMainThread() are executed in order.
noUnseed = true; // multi-threading inherently non-deterministic

View File

@ -287,6 +287,7 @@ public:
double SLOW_NETWORK_LATENCY;
double MAX_CLOGGING_LATENCY;
double MAX_BUGGIFIED_DELAY;
double MAX_RUNLOOP_SLEEP_DELAY;
int SIM_CONNECT_ERROR_MODE;
double SIM_SPEEDUP_AFTER_SECONDS;
int MAX_TRACE_LINES;

View File

@ -0,0 +1,161 @@
/*
* TaskQueue.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_TASK_QUEUE_H
#define FLOW_TASK_QUEUE_H
#pragma once
#include <queue>
#include <vector>
#include "flow/TDMetric.actor.h"
#include "flow/network.h"
#include "flow/ThreadSafeQueue.h"
template <typename Task>
// A queue of ordered tasks, both ready to execute, and delayed for later execution.
// All functions must be called on the main thread, except for addReadyThreadSafe() which can be called from any thread.
class TaskQueue {
public:
TaskQueue() : tasksIssued(0), ready(FLOW_KNOBS->READY_QUEUE_RESERVED_SIZE) {}
// Add a task that is ready to be executed.
void addReady(TaskPriority taskId, Task* t) { this->ready.push(OrderedTask(getFIFOPriority(taskId), taskId, t)); }
// Add a task to be executed at a given future time instant (a "timer").
void addTimer(double at, TaskPriority taskId, Task* t) {
this->timers.push(DelayedTask(at, getFIFOPriority(taskId), taskId, t));
}
// Add a task that is ready to be executed, potentially called from a thread that is different from main.
// Returns true iff the main thread need to be woken up to execute this task.
bool addReadyThreadSafe(bool isMainThread, TaskPriority taskID, Task* t) {
if (isMainThread) {
processThreadReady();
addReady(taskID, t);
} else {
if (threadReady.push(std::make_pair(taskID, t)))
return true;
}
return false;
}
// Returns true if the there are no tasks that are ready to be executed.
bool canSleep() {
bool b = ready.empty();
if (b) {
b = threadReady.canSleep();
if (!b)
++countCantSleep;
} else
++countWontSleep;
return b;
}
// Returns a time interval a caller should sleep from now until the next timer.
double getSleepTime(double now) {
if (!timers.empty()) {
return timers.top().at - now;
}
return 0;
}
// Moves all timers that are scheduled to be executed at or before now to the ready queue.
void processReadyTimers(double now) {
int numTimers = 0;
while (!timers.empty() && timers.top().at <= now + INetwork::TIME_EPS) {
++numTimers;
++countTimers;
ready.push(timers.top());
timers.pop();
}
FDB_TRACE_PROBE(run_loop_ready_timers, numTimers);
}
// Moves all tasks scheduled from a different thread to the ready queue.
void processThreadReady() {
int numReady = 0;
while (true) {
Optional<std::pair<TaskPriority, Task*>> t = threadReady.pop();
if (!t.present())
break;
ASSERT(t.get().second != nullptr);
addReady(t.get().first, t.get().second);
++numReady;
}
FDB_TRACE_PROBE(run_loop_thread_ready, numReady);
}
bool hasReadyTask() { return !ready.empty(); }
size_t getNumReadyTasks() { return ready.size(); }
TaskPriority getReadyTaskID() { return ready.top().taskID; }
int64_t getReadyTaskPriority() { return ready.top().priority; }
Task* getReadyTask() { return ready.top().task; }
void popReadyTask() { ready.pop(); }
void initMetrics() {
countTimers.init("Net2.CountTimers"_sr);
countCantSleep.init("Net2.CountCantSleep"_sr);
countWontSleep.init("Net2.CountWontSleep"_sr);
}
void clear() {
decltype(ready) _1;
ready.swap(_1);
decltype(timers) _2;
timers.swap(_2);
}
private:
struct OrderedTask {
int64_t priority;
TaskPriority taskID;
Task* task;
OrderedTask(int64_t priority, TaskPriority taskID, Task* task)
: priority(priority), taskID(taskID), task(task) {}
bool operator<(OrderedTask const& rhs) const { return priority < rhs.priority; }
};
struct DelayedTask : OrderedTask {
double at;
DelayedTask(double at, int64_t priority, TaskPriority taskID, Task* task)
: OrderedTask(priority, taskID, task), at(at) {}
bool operator<(DelayedTask const& rhs) const { return at > rhs.at; } // Ordering is reversed for priority_queue
};
template <class T>
class ReadyQueue : public std::priority_queue<T, std::vector<T>> {
public:
typedef typename std::priority_queue<T, std::vector<T>>::size_type size_type;
ReadyQueue(size_type capacity = 0) { reserve(capacity); };
void reserve(size_type capacity) { this->c.reserve(capacity); }
};
// Returns a unique priority value for a task which preserves FIFO ordering
// for tasks with the same priority.
int64_t getFIFOPriority(TaskPriority taskId) { return (int64_t(taskId) << 32) - (++tasksIssued); }
uint64_t tasksIssued;
ReadyQueue<OrderedTask> ready;
ThreadSafeQueue<std::pair<TaskPriority, Task*>> threadReady;
std::priority_queue<DelayedTask, std::vector<DelayedTask>> timers;
Int64MetricHandle countTimers;
Int64MetricHandle countCantSleep;
Int64MetricHandle countWontSleep;
};
#endif /* FLOW_TASK_QUEUE_H */

View File

@ -507,6 +507,9 @@ public:
// on. Note that there are tools for disk access, scheduling, etc as well as networking, and that almost all access
// to the network should be through FlowTransport, not directly through these low level interfaces!
// Time instants (e.g. from now()) within TIME_EPS are considered to be equal.
static constexpr double TIME_EPS = 1e-7; // 100ns
enum enumGlobal {
enFailureMonitor = 0,
enFlowTransport = 1,