foundationdb/fdbserver/CoroFlow.actor.cpp

303 lines
8.3 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* CoroFlow.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
2022-03-22 04:36:23 +08:00
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/CoroFlow.h"
2017-05-26 04:48:44 +08:00
#include "flow/ActorCollection.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
2020-12-02 05:19:25 +08:00
#include <boost/coroutine2/all.hpp>
#include <boost/coroutine2/coroutine.hpp>
#include <functional>
#include "flow/flow.h"
#include "flow/network.h"
2019-02-18 10:46:59 +08:00
#include "flow/actorcompiler.h" // has to be last include
2017-05-26 04:48:44 +08:00
2021-01-27 08:23:17 +08:00
using coro_t = boost::coroutines2::coroutine<Future<Void>>;
2017-05-26 04:48:44 +08:00
2021-01-27 08:23:17 +08:00
// Coro *current_coro = 0, *main_coro = 0;
// Coro* swapCoro( Coro* n ) {
// Coro* t = current_coro;
// current_coro = n;
// return t;
// }
2017-05-26 04:48:44 +08:00
/*struct IThreadlike {
public:
virtual void start() = 0; // Call at most once! Causes run() to be called on the 'thread'.
virtual ~IThreadlike() {} // Pre: start hasn't been called, or run() has returned
virtual void unblock() = 0; // Pre: block() has been called by run(). Causes block() to return.
2017-05-26 04:48:44 +08:00
protected:
virtual void block() = 0; // Call only from run(). Returns when unblock() is called elsewhere.
virtual void run() = 0; // To be overridden by client. Returning causes the thread to block until it is
destroyed.
2017-05-26 04:48:44 +08:00
};*/
2021-01-27 08:23:17 +08:00
struct Coroutine;
Coroutine* current_coro;
2017-05-26 04:48:44 +08:00
struct Coroutine /*: IThreadlike*/ {
2020-12-02 05:19:25 +08:00
Coroutine() = default;
~Coroutine() { *alive = false; }
2017-05-26 04:48:44 +08:00
void start() {
2020-12-02 05:19:25 +08:00
coro.reset(new coro_t::pull_type([this](coro_t::push_type& sink) { entry(sink); }));
switcher(this);
2017-05-26 04:48:44 +08:00
}
void unblock() {
// Coro_switchTo_( swapCoro(coro), coro );
// Copy blocked before calling send, since the call to send might destroy it.
auto b = blocked;
b.send(Void());
2017-05-26 04:48:44 +08:00
}
void waitFor(Future<Void> const& what) {
ASSERT(current_coro == this);
current_coro = nullptr;
(*sink)(what); // Pass control back to the switcher actor
2021-01-27 08:23:17 +08:00
ASSERT(what.isReady());
current_coro = this;
2021-01-27 08:23:17 +08:00
}
2017-05-26 04:48:44 +08:00
protected:
2020-12-02 05:19:25 +08:00
ACTOR static void switcher(Coroutine* self) {
state std::shared_ptr<bool> alive = self->alive;
while (*alive && *self->coro) {
try {
wait(self->coro->get());
} catch (Error& e) {
// We just want to transfer control back to the coroutine. The coroutine will handle the error.
}
(*self->coro)(); // Transfer control to the coroutine. This call "returns" after waitFor is called.
2020-12-02 05:19:25 +08:00
wait(delay(0, g_network->getCurrentTask()));
}
}
void entry(coro_t::push_type& sink) {
current_coro = this;
this->sink = &sink;
run();
}
2017-05-26 04:48:44 +08:00
void block() {
// Coro_switchTo_( swapCoro(main_coro), main_coro );
2017-05-26 04:48:44 +08:00
blocked = Promise<Void>();
double before = now();
CoroThreadPool::waitFor(blocked.getFuture());
if (g_network->isSimulated() && g_simulator.getCurrentProcess()->rebooting)
TraceEvent("CoroUnblocked").detail("After", now() - before);
2017-05-26 04:48:44 +08:00
}
virtual void run() = 0;
private:
2021-01-27 08:23:17 +08:00
coro_t::push_type* sink;
2017-05-26 04:48:44 +08:00
Promise<Void> blocked;
2020-12-02 05:19:25 +08:00
std::shared_ptr<bool> alive{ std::make_shared<bool>(true) };
std::unique_ptr<coro_t::pull_type> coro;
2017-05-26 04:48:44 +08:00
};
template <class Threadlike, class Mutex, bool IS_CORO>
class WorkPool final : public IThreadPool, public ReferenceCounted<WorkPool<Threadlike, Mutex, IS_CORO>> {
2017-05-26 04:48:44 +08:00
struct Worker;
// Pool can survive the destruction of WorkPool while it waits for workers to terminate
struct Pool : ReferenceCounted<Pool> {
Mutex queueLock;
Deque<PThreadAction> work;
std::vector<Worker*> idle, workers;
ActorCollection anyError, allStopped;
Future<Void> m_holdRefUntilStopped;
Pool() : anyError(false), allStopped(true) { m_holdRefUntilStopped = holdRefUntilStopped(this); }
2017-05-26 04:48:44 +08:00
~Pool() {
for (int c = 0; c < workers.size(); c++)
2017-05-26 04:48:44 +08:00
delete workers[c];
}
ACTOR Future<Void> holdRefUntilStopped(Pool* p) {
2017-05-26 04:48:44 +08:00
p->addref();
wait(p->allStopped.getResult());
2017-05-26 04:48:44 +08:00
p->delref();
return Void();
}
};
struct Worker final : Threadlike {
2017-05-26 04:48:44 +08:00
Pool* pool;
IThreadPoolReceiver* userData;
bool stop;
ThreadReturnPromise<Void> stopped;
ThreadReturnPromise<Void> error;
Worker(Pool* pool, IThreadPoolReceiver* userData) : pool(pool), userData(userData), stop(false) {}
2017-05-26 04:48:44 +08:00
void run() override {
2017-05-26 04:48:44 +08:00
try {
if (!stop)
2017-05-26 04:48:44 +08:00
userData->init();
2021-01-27 08:23:17 +08:00
2017-05-26 04:48:44 +08:00
while (!stop) {
pool->queueLock.enter();
if (pool->work.empty()) {
pool->idle.push_back(this);
2017-05-26 04:48:44 +08:00
pool->queueLock.leave();
Threadlike::block();
} else {
PThreadAction a = pool->work.front();
pool->work.pop_front();
pool->queueLock.leave();
(*a)(userData);
if (IS_CORO)
CoroThreadPool::waitFor(yield());
2017-05-26 04:48:44 +08:00
}
}
TraceEvent("CoroStop").log();
2017-05-26 04:48:44 +08:00
delete userData;
stopped.send(Void());
return;
} catch (Error& e) {
TraceEvent("WorkPoolError").errorUnsuppressed(e);
2017-05-26 04:48:44 +08:00
error.sendError(e);
} catch (...) {
TraceEvent("WorkPoolError").log();
2017-05-26 04:48:44 +08:00
error.sendError(unknown_error());
}
try {
delete userData;
} catch (...) {
TraceEvent(SevError, "WorkPoolErrorShutdownError").log();
2017-05-26 04:48:44 +08:00
}
stopped.send(Void());
}
};
Reference<Pool> pool;
Future<Void> m_stopOnError; // must be last, because its cancellation calls stop()!
2017-05-26 04:48:44 +08:00
Error error;
ACTOR Future<Void> stopOnError(WorkPool* w) {
2021-01-27 08:23:17 +08:00
try {
2020-11-04 04:29:32 +08:00
wait(w->getError());
ASSERT(false);
2017-05-26 04:48:44 +08:00
} catch (Error& e) {
w->stop(e);
2017-05-26 04:48:44 +08:00
}
return Void();
}
void checkError() {
if (error.code() != invalid_error_code) {
ASSERT(error.code() != error_code_success); // Calling post or addThread after stop is an error
2017-05-26 04:48:44 +08:00
throw error;
}
}
public:
WorkPool() : pool(new Pool) { m_stopOnError = stopOnError(this); }
2017-05-26 04:48:44 +08:00
2020-10-08 06:55:11 +08:00
Future<Void> getError() const override { return pool->anyError.getResult(); }
void addThread(IThreadPoolReceiver* userData, const char*) override {
2017-05-26 04:48:44 +08:00
checkError();
auto w = new Worker(pool.getPtr(), userData);
pool->queueLock.enter();
pool->workers.push_back(w);
2017-05-26 04:48:44 +08:00
pool->queueLock.leave();
pool->anyError.add(w->error.getFuture());
pool->allStopped.add(w->stopped.getFuture());
2017-05-26 04:48:44 +08:00
startWorker(w);
}
ACTOR static void startWorker(Worker* w) {
2017-05-26 04:48:44 +08:00
// We want to make sure that coroutines are always started after Net2::run() is called, so the main coroutine is
// initialized.
wait(delay(0, g_network->getCurrentTask()));
2017-05-26 04:48:44 +08:00
w->start();
}
2020-10-08 06:55:11 +08:00
void post(PThreadAction action) override {
2017-05-26 04:48:44 +08:00
checkError();
pool->queueLock.enter();
pool->work.push_back(action);
if (!pool->idle.empty()) {
Worker* c = pool->idle.back();
pool->idle.pop_back();
pool->queueLock.leave();
c->unblock();
} else
pool->queueLock.leave();
}
2020-10-08 06:55:11 +08:00
Future<Void> stop(Error const& e) override {
if (error.code() == invalid_error_code) {
error = e;
}
2017-05-26 04:48:44 +08:00
pool->queueLock.enter();
TraceEvent("WorkPool_Stop")
.errorUnsuppressed(e)
.detail("Workers", pool->workers.size())
.detail("Idle", pool->idle.size())
.detail("Work", pool->work.size());
for (uint32_t i = 0; i < pool->work.size(); i++)
pool->work[i]->cancel(); // What if cancel() does something to this?
2017-05-26 04:48:44 +08:00
pool->work.clear();
for (int i = 0; i < pool->workers.size(); i++)
2017-05-26 04:48:44 +08:00
pool->workers[i]->stop = true;
2021-01-27 08:23:17 +08:00
std::vector<Worker*> idle;
2017-05-26 04:48:44 +08:00
std::swap(idle, pool->idle);
pool->queueLock.leave();
for (int i = 0; i < idle.size(); i++)
2017-05-26 04:48:44 +08:00
idle[i]->unblock();
pool->allStopped.add(Void());
2017-05-26 04:48:44 +08:00
return pool->allStopped.getResult();
}
2020-10-08 06:55:11 +08:00
bool isCoro() const override { return IS_CORO; }
void addref() override { ReferenceCounted<WorkPool>::addref(); }
void delref() override { ReferenceCounted<WorkPool>::delref(); }
2017-05-26 04:48:44 +08:00
};
typedef WorkPool<Coroutine, ThreadUnsafeSpinLock, true> CoroPool;
void CoroThreadPool::waitFor(Future<Void> what) {
ASSERT(current_coro != nullptr);
if (what.isReady())
return;
2019-04-17 00:48:15 +08:00
// double t = now();
current_coro->waitFor(what);
what.get(); // Throw if |what| is an error
2017-05-26 04:48:44 +08:00
}
// Right After INet2::run
void CoroThreadPool::init() {}
2017-05-26 04:48:44 +08:00
Reference<IThreadPool> CoroThreadPool::createThreadPool() {
return Reference<IThreadPool>(new CoroPool);
2017-05-26 04:48:44 +08:00
}