Merge pull request #5142 from Daniel-B-Smith/threads-in-simulation

Enable IThreadPool in simulation
This commit is contained in:
Steve Atherton 2021-07-12 14:24:34 -07:00 committed by GitHub
commit 9fd5f54992
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 21 additions and 21 deletions

View File

@ -1884,8 +1884,6 @@ bool ClientInfo::canReplace(Reference<ClientInfo> other) const {
}
// UNIT TESTS
extern bool noUnseed;
TEST_CASE("/fdbclient/multiversionclient/EnvironmentVariableParsing") {
auto vals = parseOptionValues("a");
ASSERT(vals.size() == 1 && vals[0] == "a");

View File

@ -262,8 +262,8 @@ struct YieldMockNetwork final : INetwork, ReferenceCounted<YieldMockNetwork> {
return baseNetwork->onMainThread(std::move(signal), taskID);
}
bool isOnMainThread() const override { return baseNetwork->isOnMainThread(); }
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) override {
return baseNetwork->startThread(func, arg);
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg, int stackSize, const char* name) override {
return baseNetwork->startThread(func, arg, stackSize, name);
}
Future<Reference<class IAsyncFile>> open(std::string filename, int64_t flags, int64_t mode) {
return IAsyncFileSystem::filesystem()->open(filename, flags, mode);

View File

@ -1007,9 +1007,9 @@ public:
THREAD_RETURN;
}
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) override {
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg, int stackSize, const char* name) override {
SimThreadArgs* simArgs = new SimThreadArgs(func, arg);
return ::startThread(simStartThread, simArgs);
return ::startThread(simStartThread, simArgs, stackSize, name);
}
void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) override {

View File

@ -941,8 +941,6 @@ struct DDQueueData {
}
};
extern bool noUnseed;
// This actor relocates the specified keys to a good place.
// The inFlightActor key range map stores the actor for each RelocateData
ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd, const DDEnabledState* ddEnabledState) {

View File

@ -31,8 +31,6 @@
#define OP_DISK_OVERHEAD (sizeof(OpHeader) + 1)
extern bool noUnseed;
template <typename Container>
class KeyValueStoreMemory final : public IKeyValueStore, NonCopyable {
public:

View File

@ -199,7 +199,6 @@ extern const char* getSourceVersion();
extern void flushTraceFileVoid();
extern bool noUnseed;
extern const int MAX_CLUSTER_FILE_BYTES;
#ifdef ALLOC_INSTRUMENTATION

View File

@ -100,8 +100,6 @@ private:
}
};
extern bool noUnseed;
// A workload which uses the thread safe API from multiple threads
struct ThreadSafetyWorkload : TestWorkload {
int threadsPerClient;

View File

@ -109,7 +109,7 @@ public:
}
void addThread(IThreadPoolReceiver* userData, const char* name) override {
threads.push_back(new Thread(this, userData));
threads.back()->handle = startThread(start, threads.back(), stackSize, name);
threads.back()->handle = g_network->startThread(start, threads.back(), stackSize, name);
}
void post(PThreadAction action) override { ios.post(ActionWrapper(action)); }
};

View File

@ -35,7 +35,9 @@ struct ThreadNameReceiver : IThreadPoolReceiver {
}
};
TEST_CASE("noSim/IThreadPool/NamedThread") {
TEST_CASE("/flow/IThreadPool/NamedThread") {
noUnseed = true;
state Reference<IThreadPool> pool = createGenericThreadPool();
pool->addThread(new ThreadNameReceiver(), "thread-foo");

View File

@ -184,7 +184,7 @@ public:
}
bool isSimulated() const override { return false; }
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) override;
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg, int stackSize, const char* name) override;
void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) override;
bool isAddressOnThisHost(NetworkAddress const& addr) const override;
@ -1513,7 +1513,7 @@ void Net2::run() {
double newTaskBegin = timer_monotonic();
if (check_yield(TaskPriority::Max, tscNow)) {
checkForSlowTask(tscBegin, tscNow, newTaskBegin - taskBegin, currentTaskID);
taskBegin = newTaskBegin;
taskBegin = newTaskBegin;
FDB_TRACE_PROBE(run_loop_yield);
++countYields;
break;
@ -1765,8 +1765,8 @@ void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
}
}
THREAD_HANDLE Net2::startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) {
return ::startThread(func, arg);
THREAD_HANDLE Net2::startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg, int stackSize, const char* name) {
return ::startThread(func, arg, stackSize, name);
}
Future<Reference<IConnection>> Net2::connect(NetworkAddress toAddr, const std::string& host) {

View File

@ -99,6 +99,9 @@ struct UnitTestCollection {
extern UnitTestCollection g_unittests;
// Set this to `true` to disable RNG state checking after simulation runs.
extern bool noUnseed;
#define APPEND(a, b) a##b
// FILE_UNIQUE_NAME(basename) expands to a name like basename456 if on line 456

View File

@ -347,7 +347,8 @@ struct NetworkMetrics {
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
double lastRunLoopBusyness; // network thread busyness (measured every 5s by default)
std::atomic<double> networkBusyness; // network thread busyness which is returned to the the client (measured every 1s by default)
std::atomic<double>
networkBusyness; // network thread busyness which is returned to the the client (measured every 1s by default)
// starvation trackers which keeps track of different task priorities
std::vector<struct PriorityStats> starvationTrackers;
@ -536,7 +537,10 @@ public:
virtual void onMainThread(Promise<Void>&& signal, TaskPriority taskID) = 0;
// Executes signal.send(Void()) on a/the thread belonging to this network
virtual THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) = 0;
virtual THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*),
void* arg,
int stackSize = 0,
const char* name = nullptr) = 0;
// Starts a thread and returns a handle to it
virtual void run() = 0;