Merge pull request #4009 from Daniel-B-Smith/rocksdb-named-threads

Name the RocksDB background threads
This commit is contained in:
Russell Sears 2021-03-03 20:34:32 -08:00 committed by GitHub
commit d2d6fcccfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 33 additions and 25 deletions

View File

@ -58,8 +58,7 @@ struct Coroutine /*: IThreadlike*/ {
void start() {
int result = Coro_startCoro_( swapCoro(coro), coro, this, &entry );
if (result == ENOMEM)
platform::outOfMemory();
if (result == ENOMEM) platform::outOfMemory();
}
void unblock() {
@ -136,7 +135,7 @@ class WorkPool : public IThreadPool, public ReferenceCounted<WorkPool<Threadlike
try {
if(!stop)
userData->init();
while (!stop) {
pool->queueLock.enter();
if (pool->work.empty()) {
@ -178,8 +177,8 @@ class WorkPool : public IThreadPool, public ReferenceCounted<WorkPool<Threadlike
Error error;
ACTOR Future<Void> stopOnError( WorkPool* w ) {
try {
wait( w->getError() );
try {
wait(w->getError());
ASSERT(false);
} catch (Error& e) {
w->stop(e);
@ -200,7 +199,7 @@ public:
}
virtual Future<Void> getError() { return pool->anyError.getResult(); }
virtual void addThread( IThreadPoolReceiver* userData ) {
virtual void addThread(IThreadPoolReceiver* userData, const char*) {
checkError();
auto w = new Worker(pool.getPtr(), userData);
@ -245,7 +244,7 @@ public:
for(int i=0; i<pool->workers.size(); i++)
pool->workers[i]->stop = true;
std::vector<Worker*> idle;
std::vector<Worker*> idle;
std::swap(idle, pool->idle);
pool->queueLock.leave();
@ -294,8 +293,7 @@ void CoroThreadPool::init()
{
if (!current_coro) {
current_coro = main_coro = Coro_new();
if (main_coro == NULL)
platform::outOfMemory();
if (main_coro == NULL) platform::outOfMemory();
Coro_initializeMainCoro(main_coro);
//printf("Main thread: %d bytes stack presumed available\n", Coro_bytesLeftOnStack(current_coro));

View File

@ -369,9 +369,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
{
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
writeThread->addThread(new Writer(db, id));
writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr");
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(db));
readThreads->addThread(new Reader(db), "fdb-rocksdb-re");
}
}

View File

@ -95,9 +95,9 @@ public:
virtual Future<Void> getError() { return Never(); } // FIXME
virtual void addref() { ReferenceCounted<ThreadPool>::addref(); }
virtual void delref() { if (ReferenceCounted<ThreadPool>::delref_no_destroy()) stop(); }
void addThread( IThreadPoolReceiver* userData ) {
void addThread(IThreadPoolReceiver* userData, const char* name) {
threads.push_back(new Thread(this, userData));
startThread(start, threads.back(), stackSize);
startThread(start, threads.back(), stackSize, name);
}
void post( PThreadAction action ) {
ios.post( ActionWrapper( action ) );

View File

@ -22,6 +22,8 @@
#define FLOW_ITHREADPOOL_H
#pragma once
#include <string_view>
#include "flow/flow.h"
// The IThreadPool interface represents a thread pool suitable for doing blocking disk-intensive work
@ -47,7 +49,7 @@ public:
virtual void init() = 0;
};
struct ThreadAction {
struct ThreadAction {
virtual void operator()(IThreadPoolReceiver*) = 0; // self-destructs
virtual void cancel() = 0;
virtual double getTimeEstimate() = 0; // for simulation
@ -58,7 +60,7 @@ class IThreadPool {
public:
virtual ~IThreadPool() {}
virtual Future<Void> getError() = 0; // asynchronously throws an error if there is an internal error
virtual void addThread( IThreadPoolReceiver* userData ) = 0;
virtual void addThread(IThreadPoolReceiver* userData, const char* name = nullptr) = 0;
virtual void post( PThreadAction action ) = 0;
virtual Future<Void> stop(Error const& e = success()) = 0;
virtual bool isCoro() const { return false; }

View File

@ -1047,9 +1047,8 @@ void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint6
reads = total_transfers_read;
writes = total_transfers_write;
writeSectors = total_blocks_read;
readSectors = total_blocks_write;
readSectors = total_blocks_write;
}
}
dev_t getDeviceId(std::string path) {
@ -2519,11 +2518,11 @@ void setCloseOnExec( int fd ) {
} // namespace platform
#ifdef _WIN32
THREAD_HANDLE startThread(void (*func) (void *), void *arg, int stackSize) {
THREAD_HANDLE startThread(void (*func)(void*), void* arg, int stackSize, const char* name) {
return (void *)_beginthread(func, stackSize, arg);
}
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
THREAD_HANDLE startThread(void *(*func) (void *), void *arg, int stackSize) {
THREAD_HANDLE startThread(void* (*func)(void*), void* arg, int stackSize, const char* name) {
pthread_t t;
pthread_attr_t attr;
@ -2542,6 +2541,13 @@ THREAD_HANDLE startThread(void *(*func) (void *), void *arg, int stackSize) {
pthread_create(&t, &attr, func, arg);
pthread_attr_destroy(&attr);
#if defined(__linux__)
if (name != nullptr) {
// TODO: Should this just truncate?
ASSERT_EQ(pthread_setname_np(t, name), 0);
}
#endif
return t;
}
#else
@ -3273,7 +3279,7 @@ int64_t getNumProfilesCaptured() {
void profileHandler(int sig) {
#ifdef __linux__
if(!profileThread) {
if (!profileThread) {
return;
}
@ -3311,7 +3317,7 @@ void profileHandler(int sig) {
#endif
}
void setProfilingEnabled(int enabled) {
void setProfilingEnabled(int enabled) {
#ifdef __linux__
if(profileThread && enabled && !profilingEnabled && profileRequested) {
profilingEnabled = true;
@ -3323,7 +3329,7 @@ void setProfilingEnabled(int enabled) {
}
#else
// No profiling for other platforms!
#endif
#endif
}
void* checkThread(void *arg) {

View File

@ -153,13 +153,15 @@ inline static T& makeDependent(T& value) { return value; }
#define THREAD_FUNC static void __cdecl
#define THREAD_FUNC_RETURN void
#define THREAD_HANDLE void *
THREAD_HANDLE startThread(void (func) (void *), void *arg, int stackSize = 0);
THREAD_HANDLE startThread(void(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr);
#define THREAD_RETURN return
#elif defined(__unixish__)
#define THREAD_FUNC static void *
#define THREAD_FUNC_RETURN void *
#define THREAD_HANDLE pthread_t
THREAD_HANDLE startThread(void *(func) (void *), void *arg, int stackSize = 0);
// The last parameter is an optional name for the thread. It is only supported on Linux and has a
// limit of 16 characters.
THREAD_HANDLE startThread(void*(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr);
#define THREAD_RETURN return NULL
#else
#error How do I start a new thread on this platform?

View File

@ -61,7 +61,7 @@ public:
Future<Void> getError() {
return errors.getFuture();
}
void addThread( IThreadPoolReceiver* userData ) {
void addThread(IThreadPoolReceiver* userData, const char*) {
ASSERT( !thread );
thread = userData;
}