[ORC] Move SimpleRemoteEPCServer::Dispatcher into OrcShared.

Renames SimpleRemoteEPCServer::Dispatcher to SimpleRemoteEPCDispatcher and
moves it into OrcShared. SimpleRemoteEPCServer::ThreadDispatcher is similarly
moved and renamed to DynamicThreadPoolSimpleRemoteEPCDispatcher.

This will allow these classes to be reused by SimpleRemoteEPC on the controller
side of the connection.
This commit is contained in:
Lang Hames 2021-10-07 22:04:48 -07:00
parent b0f68791f0
commit dfd74db981
6 changed files with 57 additions and 53 deletions

View File

@ -15,6 +15,7 @@
#define LLVM_EXECUTIONENGINE_ORC_SHARED_SIMPLEREMOTEEPCUTILS_H
#include "llvm/ADT/ArrayRef.h"
#include "llvm/ADT/FunctionExtras.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/ADT/StringMap.h"
#include "llvm/ExecutionEngine/Orc/Shared/ExecutorAddress.h"
@ -138,6 +139,29 @@ private:
std::atomic<bool> Disconnected{false};
};
/// Dispatches calls to runWrapper.
class SimpleRemoteEPCDispatcher {
public:
virtual ~SimpleRemoteEPCDispatcher();
virtual void dispatch(unique_function<void()> Work) = 0;
virtual void shutdown() = 0;
};
#if LLVM_ENABLE_THREADS
class DynamicThreadPoolSimpleRemoteEPCDispatcher
: public SimpleRemoteEPCDispatcher {
public:
void dispatch(unique_function<void()> Work) override;
void shutdown() override;
private:
std::mutex DispatchMutex;
bool Running = true;
size_t Outstanding = 0;
std::condition_variable OutstandingCV;
};
#endif
struct RemoteSymbolLookupSetElement {
std::string Name;
bool Required;

View File

@ -37,28 +37,6 @@ class SimpleRemoteEPCServer : public SimpleRemoteEPCTransportClient {
public:
using ReportErrorFunction = unique_function<void(Error)>;
/// Dispatches calls to runWrapper.
class Dispatcher {
public:
virtual ~Dispatcher();
virtual void dispatch(unique_function<void()> Work) = 0;
virtual void shutdown() = 0;
};
#if LLVM_ENABLE_THREADS
class ThreadDispatcher : public Dispatcher {
public:
void dispatch(unique_function<void()> Work) override;
void shutdown() override;
private:
std::mutex DispatchMutex;
bool Running = true;
size_t Outstanding = 0;
std::condition_variable OutstandingCV;
};
#endif
class Setup {
friend class SimpleRemoteEPCServer;
@ -68,7 +46,9 @@ public:
std::vector<std::unique_ptr<ExecutorBootstrapService>> &services() {
return Services;
}
void setDispatcher(std::unique_ptr<Dispatcher> D) { S.D = std::move(D); }
void setDispatcher(std::unique_ptr<SimpleRemoteEPCDispatcher> D) {
S.D = std::move(D);
}
void setErrorReporter(unique_function<void(Error)> ReportError) {
S.ReportError = std::move(ReportError);
}
@ -166,7 +146,7 @@ private:
enum { ServerRunning, ServerShuttingDown, ServerShutDown } RunState;
Error ShutdownErr = Error::success();
std::unique_ptr<SimpleRemoteEPCTransport> T;
std::unique_ptr<Dispatcher> D;
std::unique_ptr<SimpleRemoteEPCDispatcher> D;
std::vector<std::unique_ptr<ExecutorBootstrapService>> Services;
ReportErrorFunction ReportError;

View File

@ -241,5 +241,32 @@ void FDSimpleRemoteEPCTransport::listenLoop() {
C.handleDisconnect(std::move(Err));
}
SimpleRemoteEPCDispatcher::~SimpleRemoteEPCDispatcher() {}
#if LLVM_ENABLE_THREADS
void DynamicThreadPoolSimpleRemoteEPCDispatcher::dispatch(
unique_function<void()> Work) {
{
std::lock_guard<std::mutex> Lock(DispatchMutex);
if (!Running)
return;
++Outstanding;
}
std::thread([this, Work = std::move(Work)]() mutable {
Work();
std::lock_guard<std::mutex> Lock(DispatchMutex);
--Outstanding;
OutstandingCV.notify_all();
}).detach();
}
void DynamicThreadPoolSimpleRemoteEPCDispatcher::shutdown() {
std::unique_lock<std::mutex> Lock(DispatchMutex);
Running = false;
OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
}
#endif
} // end namespace orc
} // end namespace llvm

View File

@ -24,33 +24,6 @@ namespace orc {
ExecutorBootstrapService::~ExecutorBootstrapService() {}
SimpleRemoteEPCServer::Dispatcher::~Dispatcher() {}
#if LLVM_ENABLE_THREADS
void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
unique_function<void()> Work) {
{
std::lock_guard<std::mutex> Lock(DispatchMutex);
if (!Running)
return;
++Outstanding;
}
std::thread([this, Work = std::move(Work)]() mutable {
Work();
std::lock_guard<std::mutex> Lock(DispatchMutex);
--Outstanding;
OutstandingCV.notify_all();
}).detach();
}
void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
std::unique_lock<std::mutex> Lock(DispatchMutex);
Running = false;
OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
}
#endif
StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
StringMap<ExecutorAddr> DBS;
rt_bootstrap::addTo(DBS);

View File

@ -54,7 +54,7 @@ int main(int argc, char *argv[]) {
ExitOnErr(SimpleRemoteEPCServer::Create<FDSimpleRemoteEPCTransport>(
[](SimpleRemoteEPCServer::Setup &S) -> Error {
S.setDispatcher(
std::make_unique<SimpleRemoteEPCServer::ThreadDispatcher>());
std::make_unique<DynamicThreadPoolSimpleRemoteEPCDispatcher>());
S.bootstrapSymbols() =
SimpleRemoteEPCServer::defaultBootstrapSymbols();
S.services().push_back(

View File

@ -158,7 +158,7 @@ int main(int argc, char *argv[]) {
ExitOnErr(SimpleRemoteEPCServer::Create<FDSimpleRemoteEPCTransport>(
[](SimpleRemoteEPCServer::Setup &S) -> Error {
S.setDispatcher(
std::make_unique<SimpleRemoteEPCServer::ThreadDispatcher>());
std::make_unique<DynamicThreadPoolSimpleRemoteEPCDispatcher>());
S.bootstrapSymbols() =
SimpleRemoteEPCServer::defaultBootstrapSymbols();
S.services().push_back(