From 95f5a73a3d8bcc24089e893bc4303a6c8ea7733d Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Fri, 26 Feb 2021 11:22:31 -0800 Subject: [PATCH 01/14] go: Update generated.go --- bindings/go/src/fdb/generated.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/bindings/go/src/fdb/generated.go b/bindings/go/src/fdb/generated.go index 81fdbbdeb5..afbedf4e15 100644 --- a/bindings/go/src/fdb/generated.go +++ b/bindings/go/src/fdb/generated.go @@ -220,11 +220,18 @@ func (o NetworkOptions) SetExternalClientDirectory(param string) error { return o.setOpt(63, []byte(param)) } -// Prevents connections through the local client, allowing only connections through externally loaded client libraries. Intended primarily for testing. +// Prevents connections through the local client, allowing only connections through externally loaded client libraries. func (o NetworkOptions) SetDisableLocalClient() error { return o.setOpt(64, nil) } +// Spawns multiple worker threads for each version of the client that is loaded. Setting this to a number greater than one implies disable_local_client. +// +// Parameter: Number of client threads to be spawned. Each cluster will be serviced by a single client thread. +func (o NetworkOptions) SetClientThreadsPerVersion(param int64) error { + return o.setOpt(65, int64ToBytes(param)) +} + // Disables logging of client statistics, such as sampled transaction activity. func (o NetworkOptions) SetDisableClientStatisticsLogging() error { return o.setOpt(70, nil) @@ -521,6 +528,25 @@ func (o TransactionOptions) SetReportConflictingKeys() error { return o.setOpt(712, nil) } +// By default, the special key space will only allow users to read from exactly one module (a subspace in the special key space). Use this option to allow reading from zero or more modules. Users who set this option should be prepared for new modules, which may have different behaviors than the modules they're currently reading. For example, a new module might block or return an error. +func (o TransactionOptions) SetSpecialKeySpaceRelaxed() error { + return o.setOpt(713, nil) +} + +// Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction. +// +// Parameter: String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters. +func (o TransactionOptions) SetTag(param string) error { + return o.setOpt(800, []byte(param)) +} + +// Adds a tag to the transaction that can be used to apply manual or automatic targeted throttling. At most 5 tags can be set on a transaction. +// +// Parameter: String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters. +func (o TransactionOptions) SetAutoThrottleTag(param string) error { + return o.setOpt(801, []byte(param)) +} + type StreamingMode int const ( From 179dea5a1b7a3c5eea3b0dc25c9f46faf2455e96 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 3 Nov 2020 20:29:32 +0000 Subject: [PATCH 02/14] Name the RocksDB background threads --- fdbserver/CoroFlow.actor.cpp | 16 ++++++-------- fdbserver/KeyValueStoreRocksDB.actor.cpp | 4 ++-- flow/IThreadPool.cpp | 4 ++-- flow/IThreadPool.h | 6 +++-- flow/Platform.cpp | 28 +++++++++++++++++------- flow/Platform.h | 4 ++-- flow/Trace.cpp | 2 +- 7 files changed, 38 insertions(+), 26 deletions(-) diff --git a/fdbserver/CoroFlow.actor.cpp b/fdbserver/CoroFlow.actor.cpp index 751e89855c..b5012f9803 100644 --- a/fdbserver/CoroFlow.actor.cpp +++ b/fdbserver/CoroFlow.actor.cpp @@ -58,8 +58,7 @@ struct Coroutine /*: IThreadlike*/ { void start() { int result = Coro_startCoro_( swapCoro(coro), coro, this, &entry ); - if (result == ENOMEM) - platform::outOfMemory(); + if (result == ENOMEM) platform::outOfMemory(); } void unblock() { @@ -136,7 +135,7 @@ class WorkPool : public IThreadPool, public ReferenceCountedinit(); - + while (!stop) { pool->queueLock.enter(); if (pool->work.empty()) { @@ -178,8 +177,8 @@ class WorkPool : public IThreadPool, public ReferenceCounted stopOnError( WorkPool* w ) { - try { - wait( w->getError() ); + try { + wait(w->getError()); ASSERT(false); } catch (Error& e) { w->stop(e); @@ -200,7 +199,7 @@ public: } virtual Future getError() { return pool->anyError.getResult(); } - virtual void addThread( IThreadPoolReceiver* userData ) { + virtual void addThread(IThreadPoolReceiver* userData, const char*) { checkError(); auto w = new Worker(pool.getPtr(), userData); @@ -245,7 +244,7 @@ public: for(int i=0; iworkers.size(); i++) pool->workers[i]->stop = true; - std::vector idle; + std::vector idle; std::swap(idle, pool->idle); pool->queueLock.leave(); @@ -294,8 +293,7 @@ void CoroThreadPool::init() { if (!current_coro) { current_coro = main_coro = Coro_new(); - if (main_coro == NULL) - platform::outOfMemory(); + if (main_coro == NULL) platform::outOfMemory(); Coro_initializeMainCoro(main_coro); //printf("Main thread: %d bytes stack presumed available\n", Coro_bytesLeftOnStack(current_coro)); diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 6a8edf1f7f..8eea7aacc2 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -359,9 +359,9 @@ struct RocksDBKeyValueStore : IKeyValueStore { { writeThread = createGenericThreadPool(); readThreads = createGenericThreadPool(); - writeThread->addThread(new Writer(db, id)); + writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr"); for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) { - readThreads->addThread(new Reader(db)); + readThreads->addThread(new Reader(db), "fdb-rocksdb-re"); } } diff --git a/flow/IThreadPool.cpp b/flow/IThreadPool.cpp index 3e375048f0..9c85f8d2cd 100644 --- a/flow/IThreadPool.cpp +++ b/flow/IThreadPool.cpp @@ -95,9 +95,9 @@ public: virtual Future getError() { return Never(); } // FIXME virtual void addref() { ReferenceCounted::addref(); } virtual void delref() { if (ReferenceCounted::delref_no_destroy()) stop(); } - void addThread( IThreadPoolReceiver* userData ) { + void addThread(IThreadPoolReceiver* userData, const char* name) { threads.push_back(new Thread(this, userData)); - startThread(start, threads.back(), stackSize); + startThread(start, threads.back(), stackSize, name); } void post( PThreadAction action ) { ios.post( ActionWrapper( action ) ); diff --git a/flow/IThreadPool.h b/flow/IThreadPool.h index 651d7c5f5a..4f94d7a5a7 100644 --- a/flow/IThreadPool.h +++ b/flow/IThreadPool.h @@ -22,6 +22,8 @@ #define FLOW_ITHREADPOOL_H #pragma once +#include + #include "flow/flow.h" // The IThreadPool interface represents a thread pool suitable for doing blocking disk-intensive work @@ -47,7 +49,7 @@ public: virtual void init() = 0; }; -struct ThreadAction { +struct ThreadAction { virtual void operator()(IThreadPoolReceiver*) = 0; // self-destructs virtual void cancel() = 0; virtual double getTimeEstimate() = 0; // for simulation @@ -58,7 +60,7 @@ class IThreadPool { public: virtual ~IThreadPool() {} virtual Future getError() = 0; // asynchronously throws an error if there is an internal error - virtual void addThread( IThreadPoolReceiver* userData ) = 0; + virtual void addThread(IThreadPoolReceiver* userData, const char* name = nullptr) = 0; virtual void post( PThreadAction action ) = 0; virtual Future stop(Error const& e = success()) = 0; virtual bool isCoro() const { return false; } diff --git a/flow/Platform.cpp b/flow/Platform.cpp index c43ec4439b..6cea07c096 100644 --- a/flow/Platform.cpp +++ b/flow/Platform.cpp @@ -1047,9 +1047,8 @@ void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint6 reads = total_transfers_read; writes = total_transfers_write; writeSectors = total_blocks_read; - readSectors = total_blocks_write; + readSectors = total_blocks_write; } - } dev_t getDeviceId(std::string path) { @@ -2519,11 +2518,19 @@ void setCloseOnExec( int fd ) { } // namespace platform #ifdef _WIN32 -THREAD_HANDLE startThread(void (*func) (void *), void *arg, int stackSize) { - return (void *)_beginthread(func, stackSize, arg); +THREAD_HANDLE startThread(void (*func)(void*), void* arg, int stackSize, const char* name) { + // Convert `const char*` to `const wchar*` because Windows. + size_t newsize = strlen(orig) + 1; + wchar_t* wcstring = new wchar_t[newsize]; + size_t convertedChars = 0; + mbstowcs_s(&convertedChars, wcstring, newsize, name, _TRUNCATE); + auto handle = _beginthread(func, stackSize, arg); + SetThreadDescription(handle, wcstring); + delete[] wcstring; + return (void*)handle; } #elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)) -THREAD_HANDLE startThread(void *(*func) (void *), void *arg, int stackSize) { +THREAD_HANDLE startThread(void* (*func)(void*), void* arg, int stackSize, const char* name) { pthread_t t; pthread_attr_t attr; @@ -2542,6 +2549,11 @@ THREAD_HANDLE startThread(void *(*func) (void *), void *arg, int stackSize) { pthread_create(&t, &attr, func, arg); pthread_attr_destroy(&attr); + if (name != nullptr) { + // TODO: Should this just truncate? + ASSERT_EQ(pthread_setname_np(t, name), 0); + } + return t; } #else @@ -3273,7 +3285,7 @@ int64_t getNumProfilesCaptured() { void profileHandler(int sig) { #ifdef __linux__ - if(!profileThread) { + if (!profileThread) { return; } @@ -3311,7 +3323,7 @@ void profileHandler(int sig) { #endif } -void setProfilingEnabled(int enabled) { +void setProfilingEnabled(int enabled) { #ifdef __linux__ if(profileThread && enabled && !profilingEnabled && profileRequested) { profilingEnabled = true; @@ -3323,7 +3335,7 @@ void setProfilingEnabled(int enabled) { } #else // No profiling for other platforms! -#endif +#endif } void* checkThread(void *arg) { diff --git a/flow/Platform.h b/flow/Platform.h index 776f560dde..55dd6f3d0e 100644 --- a/flow/Platform.h +++ b/flow/Platform.h @@ -153,13 +153,13 @@ inline static T& makeDependent(T& value) { return value; } #define THREAD_FUNC static void __cdecl #define THREAD_FUNC_RETURN void #define THREAD_HANDLE void * -THREAD_HANDLE startThread(void (func) (void *), void *arg, int stackSize = 0); +THREAD_HANDLE startThread(void(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr); #define THREAD_RETURN return #elif defined(__unixish__) #define THREAD_FUNC static void * #define THREAD_FUNC_RETURN void * #define THREAD_HANDLE pthread_t -THREAD_HANDLE startThread(void *(func) (void *), void *arg, int stackSize = 0); +THREAD_HANDLE startThread(void*(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr); #define THREAD_RETURN return NULL #else #error How do I start a new thread on this platform? diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 2ee97b3996..3a5da85a25 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -61,7 +61,7 @@ public: Future getError() { return errors.getFuture(); } - void addThread( IThreadPoolReceiver* userData ) { + void addThread(IThreadPoolReceiver* userData, const char*) { ASSERT( !thread ); thread = userData; } From 4adce4eb838ec7b75847f9df7229ca6805451b99 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 1 Mar 2021 20:59:25 +0000 Subject: [PATCH 03/14] Limit named thread support to Linux and add a comment documenting that. --- flow/Platform.cpp | 12 +++--------- flow/Platform.h | 2 ++ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/flow/Platform.cpp b/flow/Platform.cpp index 6cea07c096..9b7c77fc5b 100644 --- a/flow/Platform.cpp +++ b/flow/Platform.cpp @@ -2519,15 +2519,7 @@ void setCloseOnExec( int fd ) { #ifdef _WIN32 THREAD_HANDLE startThread(void (*func)(void*), void* arg, int stackSize, const char* name) { - // Convert `const char*` to `const wchar*` because Windows. - size_t newsize = strlen(orig) + 1; - wchar_t* wcstring = new wchar_t[newsize]; - size_t convertedChars = 0; - mbstowcs_s(&convertedChars, wcstring, newsize, name, _TRUNCATE); - auto handle = _beginthread(func, stackSize, arg); - SetThreadDescription(handle, wcstring); - delete[] wcstring; - return (void*)handle; + return (void *)_beginthread(func, stackSize, arg); } #elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)) THREAD_HANDLE startThread(void* (*func)(void*), void* arg, int stackSize, const char* name) { @@ -2549,10 +2541,12 @@ THREAD_HANDLE startThread(void* (*func)(void*), void* arg, int stackSize, const pthread_create(&t, &attr, func, arg); pthread_attr_destroy(&attr); +#if defined(__linux__) if (name != nullptr) { // TODO: Should this just truncate? ASSERT_EQ(pthread_setname_np(t, name), 0); } +#endif return t; } diff --git a/flow/Platform.h b/flow/Platform.h index 55dd6f3d0e..491d49b74a 100644 --- a/flow/Platform.h +++ b/flow/Platform.h @@ -159,6 +159,8 @@ THREAD_HANDLE startThread(void(func)(void*), void* arg, int stackSize = 0, const #define THREAD_FUNC static void * #define THREAD_FUNC_RETURN void * #define THREAD_HANDLE pthread_t +// The last parameter is an optional name for the thread. It is only supported on Linux and has a +// limit of 16 characters. THREAD_HANDLE startThread(void*(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr); #define THREAD_RETURN return NULL #else From cc4733247826bd0e04fe8d9e9634fc03294c31cd Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 2 Mar 2021 16:38:51 -0700 Subject: [PATCH 04/14] Added an actor to allow for async file renames --- fdbrpc/AsyncFileEIO.actor.h | 27 +++++++++++++++++++++++++++ fdbrpc/IAsyncFile.h | 4 ++++ fdbrpc/Net2FileSystem.cpp | 4 ++++ fdbrpc/Net2FileSystem.h | 9 ++++++--- fdbrpc/sim2.actor.cpp | 6 ++++++ fdbrpc/simulator.h | 9 ++++++--- 6 files changed, 53 insertions(+), 6 deletions(-) diff --git a/fdbrpc/AsyncFileEIO.actor.h b/fdbrpc/AsyncFileEIO.actor.h index 512a6c95aa..4042a72302 100644 --- a/fdbrpc/AsyncFileEIO.actor.h +++ b/fdbrpc/AsyncFileEIO.actor.h @@ -107,6 +107,33 @@ public: return Void(); } + ACTOR static Future renameFile(std::string from, std::string to) { + state TaskPriority taskID = g_network->getCurrentTask(); + state Promise p; + state eio_req* r = eio_rename(from.c_str(), to.c_str(), 0, eio_callback, &p); + try { + wait(p.getFuture()); + } catch (...) { + g_network->setCurrentTask(taskID); + eio_cancel(r); + throw; + } + try { + state int result = r->result; + if(result == -1) { + TraceEvent(SevError, "FileRenameError").detail("Errno", r->errorno); + throw internal_error(); + } else { + wait(delay(0, taskID)); + return Void(); + } + } catch (Error& e) { + state Error _e = e; + wait(delay(0, taskID)); + throw _e; + } + } + ACTOR static Future lastWriteTime( std::string filename ) { EIO_STRUCT_STAT statdata = wait(stat_impl(filename)); return statdata.st_mtime; diff --git a/fdbrpc/IAsyncFile.h b/fdbrpc/IAsyncFile.h index 8656d542bb..8ab13b6f2b 100644 --- a/fdbrpc/IAsyncFile.h +++ b/fdbrpc/IAsyncFile.h @@ -20,6 +20,7 @@ #ifndef FLOW_IASYNCFILE_H #define FLOW_IASYNCFILE_H +#include #pragma once #include @@ -98,6 +99,9 @@ public: // Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure. virtual Future< Void > deleteFile( std::string filename, bool mustBeDurable ) = 0; + // renames the file, doesn't sync the directory + virtual Future renameFile(std::string const& from, std::string const& to) = 0; + // Unlinks a file and then deletes it slowly by truncating the file repeatedly. // If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure. virtual Future incrementalDeleteFile( std::string filename, bool mustBeDurable ); diff --git a/fdbrpc/Net2FileSystem.cpp b/fdbrpc/Net2FileSystem.cpp index 92ffd3340a..ef4a07d4bf 100644 --- a/fdbrpc/Net2FileSystem.cpp +++ b/fdbrpc/Net2FileSystem.cpp @@ -115,6 +115,10 @@ Net2FileSystem::Net2FileSystem(double ioTimeout, std::string fileSystemPath) #endif } +Future Net2FileSystem::renameFile(const std::string &from, const std::string &to) { + return Void(); +} + void Net2FileSystem::stop() { Net2AsyncFile::stop(); } diff --git a/fdbrpc/Net2FileSystem.h b/fdbrpc/Net2FileSystem.h index 19cd223c5f..d6e269fd5c 100644 --- a/fdbrpc/Net2FileSystem.h +++ b/fdbrpc/Net2FileSystem.h @@ -20,6 +20,7 @@ #ifndef FLOW_NET2FILESYSTEM_H #define FLOW_NET2FILESYSTEM_H +#include #pragma once #include "fdbrpc/IAsyncFile.h" @@ -27,13 +28,15 @@ class Net2FileSystem : public IAsyncFileSystem { public: // Opens a file for asynchronous I/O - virtual Future< Reference > open( std::string filename, int64_t flags, int64_t mode ); + Future< Reference > open( std::string filename, int64_t flags, int64_t mode ) override; // Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure. - virtual Future< Void > deleteFile( std::string filename, bool mustBeDurable ); + Future< Void > deleteFile( std::string filename, bool mustBeDurable ) override; + + Future renameFile(std::string const& from, std::string const& to) override; // Returns the time of the last modification of the file. - virtual Future< std::time_t > lastWriteTime( std::string filename ); + Future< std::time_t > lastWriteTime( std::string filename ) override; //void init(); static void stop(); diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 5a0b610f58..d07ab2b7dd 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -20,6 +20,7 @@ #include +#include "IRandom.h" #include "fdbrpc/simulator.h" #include "flow/IThreadPool.h" #include "flow/Util.h" @@ -1847,6 +1848,11 @@ Future< Void > Sim2FileSystem::deleteFile( std::string filename, bool mustBeDura return Sim2::deleteFileImpl(&g_sim2, filename, mustBeDurable); } +Future Sim2FileSystem::renameFile(std::string const& from, std::string const& to) { + ::renameFile(from, to); + return delay(deterministicRandom()->random01()); +} + Future< std::time_t > Sim2FileSystem::lastWriteTime( std::string filename ) { // TODO: update this map upon file writes. static std::map fileWrites; diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index b45a8e8cfd..14f40a3d16 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -20,6 +20,7 @@ #ifndef FLOW_SIMULATOR_H #define FLOW_SIMULATOR_H +#include #pragma once #include "flow/flow.h" @@ -361,12 +362,14 @@ extern Future waitUntilDiskReady(Reference parameters, int class Sim2FileSystem : public IAsyncFileSystem { public: // Opens a file for asynchronous I/O - virtual Future< Reference > open( std::string filename, int64_t flags, int64_t mode ); + Future> open( std::string filename, int64_t flags, int64_t mode ) override; // Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure. - virtual Future< Void > deleteFile( std::string filename, bool mustBeDurable ); + Future deleteFile(std::string filename, bool mustBeDurable) override; - virtual Future< std::time_t > lastWriteTime( std::string filename ); + Future renameFile(std::string const& from, std::string const& to) override; + + Future< std::time_t > lastWriteTime( std::string filename ) override; Sim2FileSystem() {} From d2bd63d0cbf2f0c39b748f1f39395dbda4c31a18 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 2 Mar 2021 17:05:27 -0700 Subject: [PATCH 05/14] fix includes --- fdbrpc/sim2.actor.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index d07ab2b7dd..17ea397a06 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -20,7 +20,6 @@ #include -#include "IRandom.h" #include "fdbrpc/simulator.h" #include "flow/IThreadPool.h" #include "flow/Util.h" From a33bf684449581a217fd4e9b464df15520f1d843 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 2 Mar 2021 17:34:23 -0700 Subject: [PATCH 06/14] make sure rename code is actually called --- fdbrpc/Net2FileSystem.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbrpc/Net2FileSystem.cpp b/fdbrpc/Net2FileSystem.cpp index ef4a07d4bf..e38579339c 100644 --- a/fdbrpc/Net2FileSystem.cpp +++ b/fdbrpc/Net2FileSystem.cpp @@ -116,7 +116,7 @@ Net2FileSystem::Net2FileSystem(double ioTimeout, std::string fileSystemPath) } Future Net2FileSystem::renameFile(const std::string &from, const std::string &to) { - return Void(); + return Net2AsyncFile::renameFile(from, to); } void Net2FileSystem::stop() { From 7cb529ddb3b3742f089e7c6cbb9b6dcb5a3d3836 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 2 Mar 2021 17:34:30 -0700 Subject: [PATCH 07/14] address review comment --- fdbrpc/sim2.actor.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 17ea397a06..74f8faafbd 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -19,8 +19,10 @@ */ #include +#include #include "fdbrpc/simulator.h" +#include "flow.h" #include "flow/IThreadPool.h" #include "flow/Util.h" #include "fdbrpc/IAsyncFile.h" @@ -1847,9 +1849,15 @@ Future< Void > Sim2FileSystem::deleteFile( std::string filename, bool mustBeDura return Sim2::deleteFileImpl(&g_sim2, filename, mustBeDurable); } -Future Sim2FileSystem::renameFile(std::string const& from, std::string const& to) { +ACTOR Future renameFileImpl(std::string from, std::string to) { + wait(delay(0.5*deterministicRandom()->random01())); ::renameFile(from, to); - return delay(deterministicRandom()->random01()); + wait(delay(0.5*deterministicRandom()->random01())); + return Void(); +} + +Future Sim2FileSystem::renameFile(std::string const& from, std::string const& to) { + return renameFileImpl(from, to); } Future< std::time_t > Sim2FileSystem::lastWriteTime( std::string filename ) { From 7f758328c0b1b0eb124b2ef2d08fa9fc2e5e5f5c Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 3 Mar 2021 09:35:19 -0700 Subject: [PATCH 08/14] add renameFile to WinASIO --- fdbrpc/AsyncFileWinASIO.actor.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fdbrpc/AsyncFileWinASIO.actor.h b/fdbrpc/AsyncFileWinASIO.actor.h index 19b6ffcb9c..f8e3ddebca 100644 --- a/fdbrpc/AsyncFileWinASIO.actor.h +++ b/fdbrpc/AsyncFileWinASIO.actor.h @@ -128,6 +128,12 @@ public: return result.getFuture(); } + + static Future renameFile(std::string const& from, std::string const& to) { + ::renameFile(from, to); + return Void(); + } + virtual Future write( void const* data, int length, int64_t offset ) { /* FIXME From fd1a865737817b09439701cf4ba0834d646fef7d Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 3 Mar 2021 09:36:04 -0700 Subject: [PATCH 09/14] fix botched include --- fdbrpc/sim2.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 74f8faafbd..35521affb9 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -22,7 +22,7 @@ #include #include "fdbrpc/simulator.h" -#include "flow.h" +#include "flow/flow.h" #include "flow/IThreadPool.h" #include "flow/Util.h" #include "fdbrpc/IAsyncFile.h" From 37d9e975e9abcac6070e59476fe12e9c92871b09 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 3 Mar 2021 10:18:03 -0700 Subject: [PATCH 10/14] Fix multiple compiler warnings --- fdbclient/MultiVersionTransaction.actor.cpp | 2 +- fdbclient/MultiVersionTransaction.h | 1 - fdbclient/SystemData.cpp | 6 +++--- fdbserver/BackupProgress.actor.cpp | 2 +- fdbserver/BackupWorker.actor.cpp | 6 +++--- fdbserver/MasterProxyServer.actor.cpp | 2 +- 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 6a931e9cd2..695510fb75 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -697,7 +697,7 @@ void MultiVersionTransaction::reset() { // MultiVersionDatabase MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, int threadIdx, std::string clusterFilePath, Reference db, bool openConnectors) - : dbState(new DatabaseState()), threadIdx(threadIdx) { + : dbState(new DatabaseState()) { dbState->db = db; dbState->dbVar->set(db); diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 4a3fb0a9a3..f17925eb73 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -369,7 +369,6 @@ private: }; const Reference dbState; - const int threadIdx; friend class MultiVersionTransaction; }; diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 3b8845d0aa..f150dc0a78 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -55,7 +55,7 @@ const Value keyServersValue( Standalone result, const std::vecto std::vector destTag; bool foundOldLocality = false; - for (const KeyValueRef kv : result) { + for (const KeyValueRef& kv : result) { UID uid = decodeServerTagKey(kv.key); if (std::find(src.begin(), src.end(), uid) != src.end()) { srcTag.push_back( decodeServerTagValue(kv.value) ); @@ -107,7 +107,7 @@ void decodeKeyServersValue( Standalone result, const ValueRef& v src.clear(); dest.clear(); - for (const KeyValueRef kv : result) { + for (const KeyValueRef& kv : result) { Tag tag = decodeServerTagValue(kv.value); if (std::find(srcTag.begin(), srcTag.end(), tag) != srcTag.end()) { src.push_back( decodeServerTagKey(kv.key) ); @@ -120,7 +120,7 @@ void decodeKeyServersValue( Standalone result, const ValueRef& v std::sort(dest.begin(), dest.end()); if(missingIsError && (src.size() != srcTag.size() || dest.size() != destTag.size())) { TraceEvent(SevError, "AttemptedToDecodeMissingTag"); - for (const KeyValueRef kv : result) { + for (const KeyValueRef& kv : result) { Tag tag = decodeServerTagValue(kv.value); UID serverID = decodeServerTagKey(kv.key); TraceEvent("TagUIDMap").detail("Tag", tag.toString()).detail("UID", serverID.toString()); diff --git a/fdbserver/BackupProgress.actor.cpp b/fdbserver/BackupProgress.actor.cpp index 3f1d564c16..f496ec0558 100644 --- a/fdbserver/BackupProgress.actor.cpp +++ b/fdbserver/BackupProgress.actor.cpp @@ -121,7 +121,7 @@ std::map, std::map> BackupProgr } } - for (const Tag tag : tags) { // tags without progress data + for (const Tag& tag : tags) { // tags without progress data tagVersions.insert({ tag, adjustedBeginVersion }); TraceEvent("BackupVersionRange", dbgid) .detail("OldEpoch", epoch) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index bf71892934..cf18776d48 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -504,7 +504,7 @@ ACTOR Future setBackupKeys(BackupData* self, std::map savedL state std::vector>> prevVersions; state std::vector versionConfigs; state std::vector>> allWorkersReady; - for (const auto [uid, version] : savedLogVersions) { + for (const auto& [uid, version] : savedLogVersions) { versionConfigs.emplace_back(uid); prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr)); allWorkersReady.push_back(versionConfigs.back().allWorkerStarted().get(tr)); @@ -569,7 +569,7 @@ ACTOR Future monitorBackupProgress(BackupData* self) { if (self->recruitedEpoch == self->oldestBackupEpoch) { // update update progress so far if previous epochs are done Version v = std::numeric_limits::max(); - for (const auto [tag, version] : tagVersions) { + for (const auto& [tag, version] : tagVersions) { v = std::min(v, version); } savedLogVersions.emplace(uid, v); @@ -781,7 +781,7 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int .detail("TagId", self->tag.id) .detail("File", file->getFileName()); } - for (const UID uid : activeUids) { + for (const UID& uid : activeUids) { self->backups[uid].lastSavedVersion = popVersion + 1; } diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index d3c69651df..2c42eb1283 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -2294,7 +2294,7 @@ ACTOR Future masterProxyServerCore( state KeyRange txnKeys = allKeys; Standalone UIDtoTagMap = commitData.txnStateStore->readRange( serverTagKeys ).get(); state std::map tag_uid; - for (const KeyValueRef kv : UIDtoTagMap) { + for (const KeyValueRef& kv : UIDtoTagMap) { tag_uid[decodeServerTagValue(kv.value)] = decodeServerTagKey(kv.key); } loop { From 08c98ec2a9c070dec864d591f2ccf8300fcd89e3 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 3 Mar 2021 14:25:00 -0700 Subject: [PATCH 11/14] Unit test for rename function --- fdbrpc/IAsyncFile.actor.cpp | 52 +++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/fdbrpc/IAsyncFile.actor.cpp b/fdbrpc/IAsyncFile.actor.cpp index 7a4854e89d..4fb7109f95 100644 --- a/fdbrpc/IAsyncFile.actor.cpp +++ b/fdbrpc/IAsyncFile.actor.cpp @@ -132,3 +132,55 @@ TEST_CASE("/fileio/incrementalDelete" ) { wait(IAsyncFileSystem::filesystem()->incrementalDeleteFile(filename, true)); return Void(); } + +TEST_CASE("/fileio/rename") { + // create a file + state int64_t fileSize = 100e6; + state std::string filename = "/tmp/__JUNK__." + deterministicRandom()->randomUniqueID().toString(); + state std::string renamedFile = "/tmp/__RENAMED_JUNK__." + deterministicRandom()->randomUniqueID().toString(); + state std::unique_ptr data(new char[4096]); + state std::unique_ptr readData(new char[4096]); + state Reference f = wait(IAsyncFileSystem::filesystem()->open( + filename, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, + 0644)); + ; + wait(f->sync()); + wait(f->truncate(fileSize)); + memset(data.get(), 0, 4096); + // write a random string at the beginning of the file which we can verify after rename + for (int i = 0; i < 16; ++i) { + data[i] = deterministicRandom()->randomAlphaNumeric(); + } + // write first and block + wait(f->write(data.get(), 4096, 0)); + wait(f->write(data.get(), 4096, fileSize - 4096)); + wait(f->sync()); + // close file + f.clear(); + wait(IAsyncFileSystem::filesystem()->renameFile(filename, renamedFile)); + Reference _f = wait(IAsyncFileSystem::filesystem()->open(renamedFile, IAsyncFile::OPEN_READONLY, 0)); + f = _f; + + // verify rename happened + bool renamedExists = false; + auto bName = basename(renamedFile); + auto files = platform::listFiles("/tmp/"); + for (const auto& file : files) { + if (file == bName) { + renamedExists = true; + } + ASSERT(file != filename); + } + ASSERT(renamedExists); + + // verify magic string at beginning of file + int length = wait(f->read(readData.get(), 4096, 0)); + ASSERT(length == 4096); + ASSERT(memcmp(readData.get(), data.get(), 4096) == 0); + // close the file + f.clear(); + + // clean up + wait(IAsyncFileSystem::filesystem()->deleteFile(renamedFile, true)); + return Void(); +} From e40e78da64a46b7abd4751b5b82d512cb8cfd7c5 Mon Sep 17 00:00:00 2001 From: Meng Xu <42559636+xumengpanda@users.noreply.github.com> Date: Wed, 3 Mar 2021 19:08:51 -0800 Subject: [PATCH 12/14] Add 6.3.11 release notes (#4424) --- .../release-notes/release-notes-630.rst | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/documentation/sphinx/source/release-notes/release-notes-630.rst b/documentation/sphinx/source/release-notes/release-notes-630.rst index 234e263767..7ca1a2e721 100644 --- a/documentation/sphinx/source/release-notes/release-notes-630.rst +++ b/documentation/sphinx/source/release-notes/release-notes-630.rst @@ -4,6 +4,28 @@ Release Notes ############# +6.3.11 +====== +* Added a hint field in the trace event when all replicas of some data are lost. `(PR #4209) `_ +* Rewrote SQLite injected fault handling. `(PR #4212) `_ +* Add a SevWarnAlways trace line to help debug a rare failure. `(PR #4214) `_ +* Use VFSAsyncFile::checkInjectedError to detect injected faults. `(PR #4253) `_ +* Build on Windows using VS 2019 + LLVM/Clang. `(PR #4258) `_ +* RateControl support in AFCCached to enable write op throttling. The feature is disabled by default. `(PR #4229) `_ +* Add knobs for prefix bloom filters and larger block cache for RocksDB. `(PR #4201) `_ +* Adding debug tools to FDB runtime image. `(PR #4247) `_ +* Fix bug in simulated coordinator selection. `(PR #4285) `_ +* Add option to prevent synchronous file deletes on reads for RocksDB. `(PR #4270) `_ +* Report warning when TLS verification fails. `(PR #4299) `_ +* Support multiple worker threads for each version of client that is loaded so that each cluster will be serviced by a client thread. `(PR #4269) `_ +* Reboot simulated process on io_timeout error. `(PR #4345) `_ +* Fix Snapshot backup test failure. `(PR #4372) `_ +* fdbcli: Output errors and warnings to stderr. `(PR #4332) `_ +* Do not generate machine id in locality field if it is set by the user. `(PR #4022) `_ +* Make the RocksDB init method idempotent. `(PR #4400) `_ +* Fix bugs turned up by _GLIBCXX_DEBUG. `(PR #4301) `_ +* Add New Unit and Integration Tests, and associated infrastructure. `(PR #4366) `_ + 6.3.10 ====== * Make fault tolerance metric calculation in HA clusters consistent with 6.2 branch. `(PR #4175) `_ @@ -90,7 +112,7 @@ Status * Removed fields ``worst_version_lag_storage_server`` and ``limiting_version_lag_storage_server`` from the ``cluster.qos`` section. The ``worst_data_lag_storage_server`` and ``limiting_data_lag_storage_server`` objects can be used instead. `(PR #3196) `_ * If a process is unable to flush trace logs to disk, the problem will now be reported via the output of ``status`` command inside ``fdbcli``. `(PR #2605) `_ `(PR #2820) `_ * When a configuration key is changed, it will always be included in ``status json`` output, even the value is reverted back to the default value. [6.3.5] `(PR #3610) `_ -* Added transactions.rejected_for_queued_too_long for bookkeeping the number of transactions rejected by commit proxy because its queuing time exceeds MVCC window. `(PR #4353) `_ +* Added transactions.rejected_for_queued_too_long for bookkeeping the number of transactions rejected by commit proxy because its queuing time exceeds MVCC window.[6.3.11] `(PR #4353) `_ Bindings -------- @@ -142,6 +164,7 @@ Fixes from previous versions * The 6.3.5 patch release includes all fixes from the patch releases 6.2.24 and 6.2.25. :doc:`(6.2 Release Notes) ` * The 6.3.9 patch release includes all fixes from the patch releases 6.2.26. :doc:`(6.2 Release Notes) ` * The 6.3.10 patch release includes all fixes from the patch releases 6.2.27-6.2.29 :doc:`(6.2 Release Notes) ` +* The 6.3.11 patch release includes all fixes from the patch releases 6.2.30-6.2.32 :doc:`(6.2 Release Notes) ` Fixes only impacting 6.3.0+ --------------------------- From 59181245c145795dab0068ff6d8ecc3eff922c70 Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Wed, 3 Mar 2021 23:33:48 -0800 Subject: [PATCH 13/14] Change SSVersionDiffLarge event log level to warning --- fdbserver/DataDistribution.actor.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index c4a4c8059e..8e265cec2f 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -2687,7 +2687,9 @@ ACTOR Future updateServerMetrics( TCServerInfo *server ) { } } else if ( server->serverMetrics.get().versionLag > SERVER_KNOBS->DD_SS_FAILURE_VERSIONLAG ) { if (server->ssVersionTooFarBehind.get() == false) { - TraceEvent("SSVersionDiffLarge", server->collection->distributorId).detail("ServerId", server->id.toString()).detail("VersionLag", server->serverMetrics.get().versionLag); + TraceEvent(SevWarn, "SSVersionDiffLarge", server->collection->distributorId) + .detail("ServerId", server->id.toString()) + .detail("VersionLag", server->serverMetrics.get().versionLag); server->ssVersionTooFarBehind.set(true); server->collection->addLaggingStorageServer(server->lastKnownInterface.locality.zoneId().get()); } From 9645f489e6b7f1d247550ddeb2856f826f3c7811 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Mon, 8 Mar 2021 15:20:50 -0800 Subject: [PATCH 14/14] Fix base trace event name inconsistency --- fdbserver/DataDistribution.actor.cpp | 2 +- fdbserver/QuietDatabase.actor.cpp | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 8e265cec2f..0e4e2215ba 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -2124,7 +2124,7 @@ struct DDTeamCollection : ReferenceCounted { .detail("Primary", primary) .detail("AddedTeams", addedTeams) .detail("TeamsToBuild", teamsToBuild) - .detail("CurrentTeams", teams.size()) + .detail("CurrentServerTeams", teams.size()) .detail("DesiredTeams", desiredTeams) .detail("MaxTeams", maxTeams) .detail("StorageTeamSize", configuration.storageTeamSize) diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 6324bb8627..460c460862 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -365,7 +365,8 @@ ACTOR Future getTeamCollectionValid(Database cx, WorkerInterface dataDistr TraceEvent("GetTeamCollectionValid").detail("Stage", "GotString"); - state int64_t currentTeams = boost::lexical_cast(teamCollectionInfoMessage.getValue("CurrentTeams")); + state int64_t currentTeams = + boost::lexical_cast(teamCollectionInfoMessage.getValue("CurrentServerTeams")); state int64_t desiredTeams = boost::lexical_cast(teamCollectionInfoMessage.getValue("DesiredTeams")); state int64_t maxTeams = boost::lexical_cast(teamCollectionInfoMessage.getValue("MaxTeams")); state int64_t currentMachineTeams = boost::lexical_cast(teamCollectionInfoMessage.getValue("CurrentMachineTeams")); @@ -411,7 +412,7 @@ ACTOR Future getTeamCollectionValid(Database cx, WorkerInterface dataDistr // TODO: Remove the constraint SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER == 3 to ensure that // the minimun team number per server (and per machine) is always > 0 for any number of replicas TraceEvent("GetTeamCollectionValid") - .detail("CurrentTeams", currentTeams) + .detail("CurrentServerTeams", currentTeams) .detail("DesiredTeams", desiredTeams) .detail("MaxTeams", maxTeams) .detail("CurrentHealthyMachineTeams", healthyMachineTeams)