From b92b930b9353d8c628f3b78a85ff82a056dbd738 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 8 Jun 2021 17:23:02 -0700 Subject: [PATCH 01/30] Add exportLibrary for exporting external clients from jar After this change, users would be able to add all fdb shared libraries they need in the jar itself with something like `jar uf`. --- .../main/com/apple/foundationdb/JNIUtil.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/bindings/java/src/main/com/apple/foundationdb/JNIUtil.java b/bindings/java/src/main/com/apple/foundationdb/JNIUtil.java index 99c2f8a322..a3811bd029 100644 --- a/bindings/java/src/main/com/apple/foundationdb/JNIUtil.java +++ b/bindings/java/src/main/com/apple/foundationdb/JNIUtil.java @@ -30,7 +30,7 @@ import java.io.OutputStream; * Utility for loading a dynamic library from the classpath. * */ -class JNIUtil { +public class JNIUtil { private static final String SEPARATOR = "/"; private static final String LOADABLE_PREFIX = "FDB_LIBRARY_PATH_"; private static final String TEMPFILE_PREFIX = "fdbjni"; @@ -92,7 +92,7 @@ class JNIUtil { File exported; try { - exported = exportResource(path); + exported = exportResource(path, libName); } catch (IOException e) { throw new UnsatisfiedLinkError(e.getMessage()); @@ -109,6 +109,19 @@ class JNIUtil { } } + /** + * Export a library from classpath resources to a temporary file. + * + * @param libName the name of the library to attempt to export. This name should be + * undecorated with file extensions and, in the case of *nix, "lib" prefixes. + * @return the exported temporary file + */ + public static File exportLibrary(String libName) throws IOException { + OS os = getRunningOS(); + String path = getPath(os, libName); + return exportResource(path, libName); + } + /** * Gets a relative path for a library. The path will be of the form: * {@code {os}/{arch}/{name}}. @@ -127,20 +140,21 @@ class JNIUtil { * Export a resource from the classpath to a temporary file. 
* * @param path the relative path of the file to load from the classpath + * @param name an optional descriptive name to include in the temporary file's path * * @return the absolute path to the exported file * @throws IOException */ - private static File exportResource(String path) throws IOException { + private static File exportResource(String path, String name) throws IOException { InputStream resource = JNIUtil.class.getResourceAsStream(path); if(resource == null) throw new IllegalStateException("Embedded library jar:" + path + " not found"); - File f = saveStreamAsTempFile(resource); + File f = saveStreamAsTempFile(resource, name); return f; } - private static File saveStreamAsTempFile(InputStream resource) throws IOException { - File f = File.createTempFile(TEMPFILE_PREFIX, TEMPFILE_SUFFIX); + private static File saveStreamAsTempFile(InputStream resource, String name) throws IOException { + File f = File.createTempFile(name.length() > 0 ? name : TEMPFILE_PREFIX, TEMPFILE_SUFFIX); FileOutputStream outputStream = new FileOutputStream(f); copyStream(resource, outputStream); outputStream.flush(); From 399c2c96f0affa1cf7822b211f0b9d2f2dbcc020 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Wed, 9 Jun 2021 11:37:14 -0700 Subject: [PATCH 02/30] Remove unnecessary std::string copies from flow --- fdbclient/DatabaseConfiguration.cpp | 8 ++++---- flow/FileTraceLogWriter.cpp | 20 ++++++++++---------- flow/FileTraceLogWriter.h | 16 ++++++++-------- flow/Histogram.cpp | 2 +- flow/Histogram.h | 8 ++++---- flow/ITrace.h | 4 ++-- flow/Platform.actor.cpp | 4 ++-- flow/Platform.h | 4 ++-- flow/SystemMonitor.cpp | 2 +- flow/SystemMonitor.h | 14 ++++++++------ flow/Trace.cpp | 14 +++++++------- flow/Trace.h | 8 ++++---- flow/flow.cpp | 6 +++--- flow/flow.h | 6 +++--- flow/network.cpp | 2 +- flow/network.h | 2 +- 16 files changed, 61 insertions(+), 59 deletions(-) diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index 
a2cfc435b3..8bb7fc90f0 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -533,11 +533,11 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) { return true; // All of the above options currently require recovery to take effect } -inline static KeyValueRef* lower_bound(VectorRef& config, KeyRef const& key) { +static KeyValueRef* lower_bound(VectorRef& config, KeyRef const& key) { return std::lower_bound(config.begin(), config.end(), KeyValueRef(key, ValueRef()), KeyValueRef::OrderByKey()); } -inline static KeyValueRef const* lower_bound(VectorRef const& config, KeyRef const& key) { - return lower_bound(const_cast&>(config), key); +static KeyValueRef const* lower_bound(VectorRef const& config, KeyRef const& key) { + return std::lower_bound(config.begin(), config.end(), KeyValueRef(key, ValueRef()), KeyValueRef::OrderByKey()); } void DatabaseConfiguration::applyMutation(MutationRef m) { @@ -649,7 +649,7 @@ void DatabaseConfiguration::fromKeyValues(Standalone> raw } bool DatabaseConfiguration::isOverridden(std::string key) const { - key = configKeysPrefix.toString() + key; + key = configKeysPrefix.toString() + std::move(key); if (mutableConfiguration.present()) { return mutableConfiguration.get().find(key) != mutableConfiguration.get().end(); diff --git a/flow/FileTraceLogWriter.cpp b/flow/FileTraceLogWriter.cpp index cc402534f3..e418c2081d 100644 --- a/flow/FileTraceLogWriter.cpp +++ b/flow/FileTraceLogWriter.cpp @@ -49,7 +49,7 @@ struct IssuesListImpl { IssuesListImpl() {} - void addIssue(std::string issue) { + void addIssue(std::string const& issue) { MutexHolder h(mutex); issues.insert(issue); } @@ -61,7 +61,7 @@ struct IssuesListImpl { } } - void resolveIssue(std::string issue) { + void resolveIssue(std::string const& issue) { MutexHolder h(mutex); issues.erase(issue); } @@ -73,23 +73,23 @@ private: IssuesList::IssuesList() : impl(std::make_unique()) {} IssuesList::~IssuesList() = default; -void 
IssuesList::addIssue(std::string issue) { +void IssuesList::addIssue(std::string const& issue) { impl->addIssue(issue); } void IssuesList::retrieveIssues(std::set& out) const { impl->retrieveIssues(out); } -void IssuesList::resolveIssue(std::string issue) { +void IssuesList::resolveIssue(std::string const& issue) { impl->resolveIssue(issue); } -FileTraceLogWriter::FileTraceLogWriter(std::string directory, - std::string processName, - std::string basename, - std::string extension, +FileTraceLogWriter::FileTraceLogWriter(std::string const& directory, + std::string const& processName, + std::string const& basename, + std::string const& extension, uint64_t maxLogsSize, - std::function onError, - Reference issues) + std::function const& onError, + Reference const& issues) : directory(directory), processName(processName), basename(basename), extension(extension), maxLogsSize(maxLogsSize), traceFileFD(-1), index(0), onError(onError), issues(issues) {} diff --git a/flow/FileTraceLogWriter.h b/flow/FileTraceLogWriter.h index e1dbbb2060..bc6ed8c4eb 100644 --- a/flow/FileTraceLogWriter.h +++ b/flow/FileTraceLogWriter.h @@ -32,11 +32,11 @@ struct IssuesListImpl; struct IssuesList final : ITraceLogIssuesReporter, ThreadSafeReferenceCounted { IssuesList(); ~IssuesList() override; - void addIssue(std::string issue) override; + void addIssue(std::string const& issue) override; void retrieveIssues(std::set& out) const override; - void resolveIssue(std::string issue) override; + void resolveIssue(std::string const& issue) override; void addref() override { ThreadSafeReferenceCounted::addref(); } void delref() override { ThreadSafeReferenceCounted::delref(); } @@ -62,13 +62,13 @@ private: void write(const char* str, size_t size); public: - FileTraceLogWriter(std::string directory, - std::string processName, - std::string basename, - std::string extension, + FileTraceLogWriter(std::string const& directory, + std::string const& processName, + std::string const& basename, + std::string 
const& extension, uint64_t maxLogsSize, - std::function onError, - Reference issues); + std::function const& onError, + Reference const& issues); void addref() override; void delref() override; diff --git a/flow/Histogram.cpp b/flow/Histogram.cpp index d229511bfb..595dcc8fef 100644 --- a/flow/Histogram.cpp +++ b/flow/Histogram.cpp @@ -77,7 +77,7 @@ void HistogramRegistry::unregisterHistogram(Histogram* h) { ASSERT(count == 1); } -Histogram* HistogramRegistry::lookupHistogram(std::string name) { +Histogram* HistogramRegistry::lookupHistogram(std::string const& name) { auto h = histograms.find(name); if (h == histograms.end()) { return nullptr; diff --git a/flow/Histogram.h b/flow/Histogram.h index f730996fde..de9b6ad4d2 100644 --- a/flow/Histogram.h +++ b/flow/Histogram.h @@ -39,7 +39,7 @@ class HistogramRegistry { public: void registerHistogram(Histogram* h); void unregisterHistogram(Histogram* h); - Histogram* lookupHistogram(std::string name); + Histogram* lookupHistogram(std::string const& name); void logReport(); private: @@ -63,7 +63,7 @@ public: private: static const std::unordered_map UnitToStringMapper; - Histogram(std::string group, std::string op, Unit unit, HistogramRegistry& registry) + Histogram(std::string const& group, std::string const& op, Unit unit, HistogramRegistry& registry) : group(group), op(op), unit(unit), registry(registry), ReferenceCounted() { ASSERT(UnitToStringMapper.find(unit) != UnitToStringMapper.end()); @@ -71,7 +71,7 @@ private: clear(); } - static std::string generateName(std::string group, std::string op) { return group + ":" + op; } + static std::string generateName(std::string const& group, std::string const& op) { return group + ":" + op; } public: ~Histogram() { registry.unregisterHistogram(this); } @@ -125,7 +125,7 @@ public: } void writeToLog(); - std::string name() { return generateName(this->group, this->op); } + std::string name() const { return generateName(this->group, this->op); } std::string const group; std::string 
const op; diff --git a/flow/ITrace.h b/flow/ITrace.h index 9fcd7ac390..3af788ca85 100644 --- a/flow/ITrace.h +++ b/flow/ITrace.h @@ -51,8 +51,8 @@ struct ITraceLogFormatter { struct ITraceLogIssuesReporter { virtual ~ITraceLogIssuesReporter(); - virtual void addIssue(std::string issue) = 0; - virtual void resolveIssue(std::string issue) = 0; + virtual void addIssue(std::string const& issue) = 0; + virtual void resolveIssue(std::string const& issue) = 0; virtual void retrieveIssues(std::set& out) const = 0; diff --git a/flow/Platform.actor.cpp b/flow/Platform.actor.cpp index 42d8decccc..eb1a8b3392 100644 --- a/flow/Platform.actor.cpp +++ b/flow/Platform.actor.cpp @@ -1482,7 +1482,7 @@ void initPdhStrings(SystemStatisticsState* state, std::string dataFolder) { } #endif -SystemStatistics getSystemStatistics(std::string dataFolder, +SystemStatistics getSystemStatistics(std::string const& dataFolder, const IPAddress* ip, SystemStatisticsState** statState, bool logDetails) { @@ -2640,7 +2640,7 @@ Future> listDirectoriesAsync(std::string const& directory) { return findFiles(directory, "", true /* directoryOnly */, true); } -void findFilesRecursively(std::string path, std::vector& out) { +void findFilesRecursively(std::string const& path, std::vector& out) { // Add files to output, prefixing path std::vector files = platform::listFiles(path); for (auto const& f : files) diff --git a/flow/Platform.h b/flow/Platform.h index 74c9395c53..cb2e337344 100644 --- a/flow/Platform.h +++ b/flow/Platform.h @@ -232,7 +232,7 @@ struct SystemStatisticsState; struct IPAddress; -SystemStatistics getSystemStatistics(std::string dataFolder, +SystemStatistics getSystemStatistics(std::string const& dataFolder, const IPAddress* ip, SystemStatisticsState** statState, bool logDetails); @@ -369,7 +369,7 @@ std::vector listFiles(std::string const& directory, std::string con // returns directory names relative to directory std::vector listDirectories(std::string const& directory); -void 
findFilesRecursively(std::string path, std::vector& out); +void findFilesRecursively(std::string const& path, std::vector& out); // Tag the given file as "temporary", i.e. not really needing commits to disk void makeTemporary(const char* filename); diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp index 37dadf9dc1..d6da40b68b 100644 --- a/flow/SystemMonitor.cpp +++ b/flow/SystemMonitor.cpp @@ -61,7 +61,7 @@ SystemStatistics getSystemStatistics() { .detail("ApproximateUnusedMemory" #size, FastAllocator::getApproximateMemoryUnused()) \ .detail("ActiveThreads" #size, FastAllocator::getActiveThreads()) -SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics) { +SystemStatistics customSystemMonitor(std::string const& eventName, StatisticsState* statState, bool machineMetrics) { const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress(); SystemStatistics currentStats = getSystemStatistics( machineState.folder.present() ? 
machineState.folder.get() : "", &ipAddr, &statState->systemState, true); diff --git a/flow/SystemMonitor.h b/flow/SystemMonitor.h index 2d2470bd48..c43ca35885 100644 --- a/flow/SystemMonitor.h +++ b/flow/SystemMonitor.h @@ -36,11 +36,11 @@ struct SystemMonitorMachineState { SystemMonitorMachineState() : monitorStartTime(0) {} explicit SystemMonitorMachineState(const IPAddress& ip) : ip(ip), monitorStartTime(0) {} - SystemMonitorMachineState(std::string folder, - Optional> dcId, - Optional> zoneId, - Optional> machineId, - const IPAddress& ip) + SystemMonitorMachineState(std::string const& folder, + Optional> const& dcId, + Optional> const& zoneId, + Optional> const& machineId, + IPAddress const& ip) : folder(folder), dcId(dcId), zoneId(zoneId), machineId(machineId), ip(ip), monitorStartTime(0) {} }; @@ -148,7 +148,9 @@ struct StatisticsState { }; void systemMonitor(); -SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics = false); +SystemStatistics customSystemMonitor(std::string const& eventName, + StatisticsState* statState, + bool machineMetrics = false); SystemStatistics getSystemStatistics(); #endif /* FLOW_SYSTEM_MONITOR_H */ diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 06c7f33ec1..84fe3f120b 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -503,7 +503,7 @@ public: } } - void addRole(std::string role) { + void addRole(std::string const& role) { MutexHolder holder(mutex); RoleInfo& r = mutateRoleInfo(); @@ -511,7 +511,7 @@ public: r.refreshRolesString(); } - void removeRole(std::string role) { + void removeRole(std::string const& role) { MutexHolder holder(mutex); RoleInfo& r = mutateRoleInfo(); @@ -557,13 +557,13 @@ NetworkAddress getAddressIndex() { } // This does not check for simulation, and as such is not safe for external callers -void clearPrefix_internal(std::map& data, std::string prefix) { +void clearPrefix_internal(std::map& data, std::string const& prefix) { auto first = 
data.lower_bound(prefix); auto last = data.lower_bound(strinc(prefix).toString()); data.erase(first, last); } -void LatestEventCache::clear(std::string prefix) { +void LatestEventCache::clear(std::string const& prefix) { clearPrefix_internal(latest[getAddressIndex()], prefix); } @@ -575,7 +575,7 @@ void LatestEventCache::set(std::string tag, const TraceEventFields& contents) { latest[getAddressIndex()][tag] = contents; } -TraceEventFields LatestEventCache::get(std::string tag) { +TraceEventFields LatestEventCache::get(std::string const& tag) { return latest[getAddressIndex()][tag]; } @@ -757,11 +757,11 @@ bool traceFileIsOpen() { return g_traceLog.isOpen(); } -void addTraceRole(std::string role) { +void addTraceRole(std::string const& role) { g_traceLog.addRole(role); } -void removeTraceRole(std::string role) { +void removeTraceRole(std::string const& role) { g_traceLog.removeRole(role); } diff --git a/flow/Trace.h b/flow/Trace.h index 4c2eadb215..5fdfb96abd 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -519,11 +519,11 @@ struct TraceInterval { struct LatestEventCache { public: void set(std::string tag, const TraceEventFields& fields); - TraceEventFields get(std::string tag); + TraceEventFields get(std::string const& tag); std::vector getAll(); std::vector getAllUnsafe(); - void clear(std::string prefix); + void clear(std::string const& prefix); void clear(); // Latest error tracking only tracks errors when called from the main thread. Other errors are silently ignored. @@ -577,8 +577,8 @@ bool selectTraceClockSource(std::string source); // Returns true iff source is recognized. 
bool validateTraceClockSource(std::string source); -void addTraceRole(std::string role); -void removeTraceRole(std::string role); +void addTraceRole(std::string const& role); +void removeTraceRole(std::string const& role); void retrieveTraceLogIssues(std::set& out); void setTraceLogGroup(const std::string& role); template diff --git a/flow/flow.cpp b/flow/flow.cpp index 74f0b334f5..6aed19fdfe 100644 --- a/flow/flow.cpp +++ b/flow/flow.cpp @@ -93,7 +93,7 @@ std::string UID::shortString() const { void detectFailureAfter(int const& address, double const& delay); -Optional parse_with_suffix(std::string toparse, std::string default_unit) { +Optional parse_with_suffix(std::string const& toparse, std::string const& default_unit) { char* endptr; uint64_t ret = strtoull(toparse.c_str(), &endptr, 10); @@ -144,7 +144,7 @@ Optional parse_with_suffix(std::string toparse, std::string default_un // m - minutes // h - hours // d - days -Optional parseDuration(std::string str, std::string defaultUnit) { +Optional parseDuration(std::string const& str, std::string const& defaultUnit) { char* endptr; uint64_t ret = strtoull(str.c_str(), &endptr, 10); @@ -284,7 +284,7 @@ std::vector P_BUGGIFIED_SECTION_FIRES{ .25, .25 }; double P_EXPENSIVE_VALIDATION = .05; -int getSBVar(std::string file, int line, BuggifyType type) { +int getSBVar(std::string const& file, int line, BuggifyType type) { if (!buggifyActivated[int(type)]) return 0; diff --git a/flow/flow.h b/flow/flow.h index e03d598d9b..baf8be9dec 100644 --- a/flow/flow.h +++ b/flow/flow.h @@ -72,7 +72,7 @@ extern double P_EXPENSIVE_VALIDATION; enum class BuggifyType : uint8_t { General = 0, Client }; bool isBuggifyEnabled(BuggifyType type); void clearBuggifySections(BuggifyType type); -int getSBVar(std::string file, int line, BuggifyType); +int getSBVar(std::string const& file, int line, BuggifyType); void enableBuggify(bool enabled, BuggifyType type); // Currently controls buggification and (randomized) expensive validation bool 
validationIsEnabled(BuggifyType type); @@ -83,8 +83,8 @@ bool validationIsEnabled(BuggifyType type); #define EXPENSIVE_VALIDATION \ (validationIsEnabled(BuggifyType::General) && deterministicRandom()->random01() < P_EXPENSIVE_VALIDATION) -extern Optional parse_with_suffix(std::string toparse, std::string default_unit = ""); -extern Optional parseDuration(std::string str, std::string defaultUnit = ""); +extern Optional parse_with_suffix(std::string const& toparse, std::string const& default_unit = ""); +extern Optional parseDuration(std::string const& str, std::string const& defaultUnit = ""); extern std::string format(const char* form, ...); // On success, returns the number of characters written. On failure, returns a negative number. diff --git a/flow/network.cpp b/flow/network.cpp index 9dc71cdc7d..f50bbc6060 100644 --- a/flow/network.cpp +++ b/flow/network.cpp @@ -46,7 +46,7 @@ std::string IPAddress::toString() const { } } -Optional IPAddress::parse(std::string str) { +Optional IPAddress::parse(std::string const& str) { try { auto addr = boost::asio::ip::address::from_string(str); return addr.is_v6() ? 
IPAddress(addr.to_v6().to_bytes()) : IPAddress(addr.to_v4().to_ulong()); diff --git a/flow/network.h b/flow/network.h index 1eeb5bdc2d..8760ff484f 100644 --- a/flow/network.h +++ b/flow/network.h @@ -150,7 +150,7 @@ public: const IPAddressStore& toV6() const { return std::get(addr); } std::string toString() const; - static Optional parse(std::string str); + static Optional parse(std::string const& str); bool operator==(const IPAddress& addr) const; bool operator!=(const IPAddress& addr) const; From 04694a6f706f36d846f71db24954f8f51cc08111 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Wed, 9 Jun 2021 16:21:50 -0700 Subject: [PATCH 03/30] Respect JOSHUA_SEED if set Closes #4913 --- contrib/TestHarness/Program.cs.cmake | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/contrib/TestHarness/Program.cs.cmake b/contrib/TestHarness/Program.cs.cmake index 075a2758d6..767af1fa5f 100644 --- a/contrib/TestHarness/Program.cs.cmake +++ b/contrib/TestHarness/Program.cs.cmake @@ -51,9 +51,10 @@ namespace SummarizeTest bool traceToStdout = false; try { + string joshuaSeed = System.Environment.GetEnvironmentVariable("JOSHUA_SEED"); byte[] seed = new byte[4]; new System.Security.Cryptography.RNGCryptoServiceProvider().GetBytes(seed); - random = new Random(new BinaryReader(new MemoryStream(seed)).ReadInt32()); + random = new Random(joshuaSeed != null ? 
Int32.Parse(joshuaSeed) : new BinaryReader(new MemoryStream(seed)).ReadInt32()); if (args.Length < 1) return UsageMessage(); From 948f4993b968661c794e93e131f9fd9e8cdade79 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Wed, 9 Jun 2021 17:07:45 -0700 Subject: [PATCH 04/30] Convert to 32 bit int --- contrib/TestHarness/Program.cs.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/TestHarness/Program.cs.cmake b/contrib/TestHarness/Program.cs.cmake index 767af1fa5f..c9edc4f29b 100644 --- a/contrib/TestHarness/Program.cs.cmake +++ b/contrib/TestHarness/Program.cs.cmake @@ -54,7 +54,7 @@ namespace SummarizeTest string joshuaSeed = System.Environment.GetEnvironmentVariable("JOSHUA_SEED"); byte[] seed = new byte[4]; new System.Security.Cryptography.RNGCryptoServiceProvider().GetBytes(seed); - random = new Random(joshuaSeed != null ? Int32.Parse(joshuaSeed) : new BinaryReader(new MemoryStream(seed)).ReadInt32()); + random = new Random(joshuaSeed != null ? Convert.ToInt32(Int64.Parse(joshuaSeed) % 2147483648) : new BinaryReader(new MemoryStream(seed)).ReadInt32()); if (args.Length < 1) return UsageMessage(); From 3524ceeb09d6d4ff63fb9ff100693b940d6cf46b Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 10 Jun 2021 15:24:42 -0700 Subject: [PATCH 05/30] Fix some documentation rendering issues around the estimated range size bytes and similar functions. --- documentation/sphinx/source/api-c.rst | 4 ++++ documentation/sphinx/source/api-python.rst | 11 ++++------- documentation/sphinx/source/api-ruby.rst | 3 ++- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/documentation/sphinx/source/api-c.rst b/documentation/sphinx/source/api-c.rst index 0acafea8ba..5d746eff7a 100644 --- a/documentation/sphinx/source/api-c.rst +++ b/documentation/sphinx/source/api-c.rst @@ -541,13 +541,17 @@ Applications must provide error handling and an appropriate retry loop around th |snapshot| .. 
function:: FDBFuture* fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length) + Returns an estimated byte size of the key range. + .. note:: The estimated size is calculated based on the sampling done by FDB server. The sampling algorithm works roughly in this way: the larger the key-value pair is, the more likely it would be sampled and the more accurate its sampled size would be. And due to that reason it is recommended to use this API to query against large ranges for accuracy considerations. For a rough reference, if the returned size is larger than 3MB, one can consider the size to be accurate. |future-return0| the estimated size of the key range given. |future-return1| call :func:`fdb_future_get_int64()` to extract the size, |future-return2| .. function:: FDBFuture* fdb_transaction_get_range_split_points( FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length, int64_t chunk_size) + Returns a list of keys that can split the given range into (roughly) equally sized chunks based on ``chunk_size``. + .. note:: The returned split points contain the start key and end key of the given range |future-return0| the list of split points. |future-return1| call :func:`fdb_future_get_key_array()` to extract the array, |future-return2| diff --git a/documentation/sphinx/source/api-python.rst b/documentation/sphinx/source/api-python.rst index 0cd1e8f078..2897be0153 100644 --- a/documentation/sphinx/source/api-python.rst +++ b/documentation/sphinx/source/api-python.rst @@ -800,6 +800,7 @@ Transaction misc functions .. method:: Transaction.get_estimated_range_size_bytes(begin_key, end_key) Gets the estimated byte size of the given key range. Returns a :class:`FutureInt64`. + .. note:: The estimated size is calculated based on the sampling done by FDB server. 
The sampling algorithm works roughly in this way: the larger the key-value pair is, the more likely it would be sampled and the more accurate its sampled size would be. And due to that reason it is recommended to use this API to query against large ranges for accuracy considerations. For a rough reference, if the returned size is larger than 3MB, one can consider the size to be accurate. .. method:: Transaction.get_range_split_points(self, begin_key, end_key, chunk_size) @@ -807,15 +808,11 @@ Transaction misc functions Gets a list of keys that can split the given range into (roughly) equally sized chunks based on ``chunk_size``. Returns a :class:`FutureKeyArray`. .. note:: The returned split points contain the start key and end key of the given range - -.. _api-python-transaction-options: - -Transaction misc functions --------------------------- - .. method:: Transaction.get_approximate_size() - |transaction-get-approximate-size-blurb|. Returns a :class:`FutureInt64`. + |transaction-get-approximate-size-blurb| Returns a :class:`FutureInt64`. + +.. _api-python-transaction-options: Transaction options ------------------- diff --git a/documentation/sphinx/source/api-ruby.rst b/documentation/sphinx/source/api-ruby.rst index ddb721a0d0..4d92b5ea0e 100644 --- a/documentation/sphinx/source/api-ruby.rst +++ b/documentation/sphinx/source/api-ruby.rst @@ -744,6 +744,7 @@ Transaction misc functions .. method:: Transaction.get_estimated_range_size_bytes(begin_key, end_key) -> Int64Future Gets the estimated byte size of the given key range. Returns a :class:`Int64Future`. + .. note:: The estimated size is calculated based on the sampling done by FDB server. The sampling algorithm works roughly in this way: the larger the key-value pair is, the more likely it would be sampled and the more accurate its sampled size would be. And due to that reason it is recommended to use this API to query against large ranges for accuracy considerations. 
For a rough reference, if the returned size is larger than 3MB, one can consider the size to be accurate. .. method:: Transaction.get_range_split_points(begin_key, end_key, chunk_size) -> FutureKeyArray @@ -753,7 +754,7 @@ Transaction misc functions .. method:: Transaction.get_approximate_size() -> Int64Future - |transaction-get-approximate-size-blurb|. Returns a :class:`Int64Future`. + |transaction-get-approximate-size-blurb| Returns a :class:`Int64Future`. Transaction options ------------------- From d46fccc30f391ecba0685024ac7b47957b297a71 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 11 Jun 2021 22:58:05 +0000 Subject: [PATCH 06/30] Revert "Revert "Properly set simulation test for perpetual storage wiggle and bug fixing"" This reverts commit ad576e8c2022b1e9ff92ce3adbf5086e317b9353. --- fdbserver/DataDistribution.actor.cpp | 197 ++++++++++++------ fdbserver/DataDistributionQueue.actor.cpp | 2 +- fdbserver/Knobs.cpp | 2 +- fdbserver/tester.actor.cpp | 18 ++ .../workloads/ConsistencyCheck.actor.cpp | 1 + fdbserver/workloads/workloads.actor.h | 10 +- 6 files changed, 162 insertions(+), 68 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 78b846139d..e1f95aaf36 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -52,6 +52,7 @@ class TCMachineTeamInfo; ACTOR Future checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self); ACTOR Future removeWrongStoreType(DDTeamCollection* self); ACTOR Future waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams); +bool _exclusionSafetyCheck(vector& excludeServerIDs, DDTeamCollection* teamCollection); struct TCServerInfo : public ReferenceCounted { UID id; @@ -375,14 +376,16 @@ struct ServerStatus { LocalityData locality; ServerStatus() : isWiggling(false), isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {} - ServerStatus(bool isFailed, bool 
isUndesired, LocalityData const& locality) + ServerStatus(bool isFailed, bool isUndesired, bool isWiggling, LocalityData const& locality) : isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false), - initialized(true), isWiggling(false) {} + initialized(true), isWiggling(isWiggling) {} bool isUnhealthy() const { return isFailed || isUndesired; } - const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; } + const char* toString() const { + return isFailed ? "Failed" : isUndesired ? "Undesired" : isWiggling ? "Wiggling" : "Healthy"; + } bool operator==(ServerStatus const& r) const { - return isFailed == r.isFailed && isUndesired == r.isUndesired && + return isFailed == r.isFailed && isUndesired == r.isUndesired && isWiggling == r.isWiggling && isWrongConfiguration == r.isWrongConfiguration && locality == r.locality && initialized == r.initialized; } bool operator!=(ServerStatus const& r) const { return !(*this == r); } @@ -621,6 +624,7 @@ struct DDTeamCollection : ReferenceCounted { std::map priority_teams; std::map> server_info; std::map>> pid2server_info; // some process may serve as multiple storage servers + std::vector wiggle_addresses; // collection of wiggling servers' address std::map> tss_info_by_pair; std::map> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures? 
std::map lagging_zones; // zone to number of storage servers lagging @@ -2826,6 +2830,7 @@ struct DDTeamCollection : ReferenceCounted { this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) { continue; // don't overwrite the value set by actor trackExcludedServer } + this->wiggle_addresses.push_back(addr); this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING); moveFutures.push_back( waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this)); @@ -2837,19 +2842,19 @@ struct DDTeamCollection : ReferenceCounted { return moveFutures; } - // Include storage servers held on process of which the Process Id is “pid” by setting their status from `WIGGLING` + // Include wiggled storage servers by setting their status from `WIGGLING` // to `NONE`. The storage recruiter will recruit them as new storage servers - void includeStorageServersForWiggle(const Value& pid) { + void includeStorageServersForWiggle() { bool included = false; - for (auto& info : this->pid2server_info[pid]) { - AddressExclusion addr(info->lastKnownInterface.address().ip); - if (!this->excludedServers.count(addr) || - this->excludedServers.get(addr) != DDTeamCollection::Status::WIGGLING) { + for (auto& address : this->wiggle_addresses) { + if (!this->excludedServers.count(address) || + this->excludedServers.get(address) != DDTeamCollection::Status::WIGGLING) { continue; } included = true; - this->excludedServers.set(addr, DDTeamCollection::Status::NONE); + this->excludedServers.set(address, DDTeamCollection::Status::NONE); } + this->wiggle_addresses.clear(); if (included) { this->restartRecruiting.trigger(); } @@ -3531,8 +3536,7 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea } change.push_back(self->zeroHealthyTeams->onChange()); - bool healthy = - !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize && !anyWigglingServer; + bool healthy = !badTeam && !anyUndesired && serversLeft == 
self->configuration.storageTeamSize; team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam bool optimal = team->isOptimal() && healthy; bool containsFailed = teamContainsFailedServer(self, team); @@ -3829,10 +3833,12 @@ ACTOR Future trackExcludedServers(DDTeamCollection* self) { // Reset and reassign self->excludedServers based on excluded, but we only // want to trigger entries that are different - // Do not retrigger and double-overwrite failed servers + // Do not retrigger and double-overwrite failed or wiggling servers auto old = self->excludedServers.getKeys(); for (const auto& o : old) { - if (!excluded.count(o) && !failed.count(o)) { + if (!excluded.count(o) && !failed.count(o) && + !(self->excludedServers.count(o) && + self->excludedServers.get(o) == DDTeamCollection::Status::WIGGLING)) { self->excludedServers.set(o, DDTeamCollection::Status::NONE); } } @@ -3884,6 +3890,7 @@ ACTOR Future>> getServerL // to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0. 
ACTOR Future updateNextWigglingStoragePID(DDTeamCollection* teamCollection) { state ReadYourWritesTransaction tr(teamCollection->cx); + state Value writeValue = LiteralStringRef("0"); loop { try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); @@ -3896,11 +3903,14 @@ ACTOR Future updateNextWigglingStoragePID(DDTeamCollection* teamCollection auto nextIt = teamCollection->pid2server_info.upper_bound(value.get()); if (nextIt == teamCollection->pid2server_info.end()) { tr.set(wigglingStorageServerKey, pid); + writeValue = pid; } else { tr.set(wigglingStorageServerKey, nextIt->first); + writeValue = nextIt->first; } } else { tr.set(wigglingStorageServerKey, pid); + writeValue = pid; } } wait(tr.commit()); @@ -3909,6 +3919,9 @@ ACTOR Future updateNextWigglingStoragePID(DDTeamCollection* teamCollection wait(tr.onError(e)); } } + TraceEvent(SevDebug, "PerpetualNextWigglingStoragePID", teamCollection->distributorId) + .detail("WriteValue", writeValue); + return Void(); } @@ -3918,9 +3931,6 @@ ACTOR Future updateNextWigglingStoragePID(DDTeamCollection* teamCollection ACTOR Future perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, FutureStream finishStorageWiggleSignal, DDTeamCollection* teamCollection) { - // initialize PID - wait(updateNextWigglingStoragePID(teamCollection)); - loop choose { when(wait(stopSignal->onTrigger())) { break; } when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); } @@ -3931,8 +3941,8 @@ ACTOR Future perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, // Watch the value change of `wigglingStorageServerKey`. // Return the watch future and the current value of `wigglingStorageServerKey`. 
-ACTOR Future, Value>> watchPerpetualStoragePIDChange(Database cx) { - state ReadYourWritesTransaction tr(cx); +ACTOR Future, Value>> watchPerpetualStoragePIDChange(DDTeamCollection* self) { + state ReadYourWritesTransaction tr(self->cx); state Future watchFuture; state Value ret; loop { @@ -3960,7 +3970,7 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, PromiseStream finishStorageWiggleSignal, DDTeamCollection* self, const DDEnabledState* ddEnabledState) { - state Future watchFuture; + state Future watchFuture = Never(); state Future moveFinishFuture = Never(); state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY); state AsyncTrigger restart; @@ -3968,13 +3978,16 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow); state int movingCount = 0; state bool isPaused = false; - - state std::pair, Value> res = wait(watchPerpetualStoragePIDChange(self->cx)); - watchFuture = res.first; + state vector excludedServerIds; + state std::pair, Value> res = wait(watchPerpetualStoragePIDChange(self)); + ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed self->wigglingPid = Optional(res.second); // start with the initial pid - if (self->healthyTeamCount > 1) { // pre-check health status + for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { + excludedServerIds.push_back(info->id); + } + if (self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { // pre-check health status TEST(true); // start the first wiggling auto fv = self->excludeStorageServersForWiggle(self->wigglingPid.get()); @@ -3993,15 +4006,20 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, choose { when(wait(stopSignal->onTrigger())) { break; } when(wait(watchFuture)) { + ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished + watchFuture = Never(); + // read new pid and set the next 
watch Future - wait(store(res, watchPerpetualStoragePIDChange(self->cx))); - watchFuture = res.first; + wait(store(res, watchPerpetualStoragePIDChange(self))); self->wigglingPid = Optional(res.second); StringRef pid = self->wigglingPid.get(); - if (self->healthyTeamCount <= 1) { // pre-check health status - pauseWiggle.trigger(); - } else { + // pre-check health status + excludedServerIds.clear(); + for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { + excludedServerIds.push_back(info->id); + } + if (self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { TEST(true); // start wiggling auto fv = self->excludeStorageServersForWiggle(pid); @@ -4010,6 +4028,8 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, TraceEvent("PerpetualStorageWiggleStart", self->distributorId) .detail("ProcessId", pid) .detail("StorageCount", movingCount); + } else { + pauseWiggle.trigger(); } } when(wait(restart.onTrigger())) { @@ -4030,12 +4050,13 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, StringRef pid = self->wigglingPid.get(); moveFinishFuture = Never(); - self->includeStorageServersForWiggle(pid); + self->includeStorageServersForWiggle(); TraceEvent("PerpetualStorageWiggleFinish", self->distributorId) .detail("ProcessId", pid.toString()) .detail("StorageCount", movingCount); self->wigglingPid.reset(); + watchFuture = res.first; finishStorageWiggleSignal.send(Void()); } when(wait(self->zeroHealthyTeams->onChange())) { @@ -4050,11 +4071,11 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) { pauseWiggle.trigger(); - } else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1 && - isPaused) { + } else if (isPaused && count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && + self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { restart.trigger(); } - 
ddQueueCheck = delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow); + ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow); } when(wait(pauseWiggle.onTrigger())) { if (self->wigglingPid.present()) { @@ -4062,7 +4083,7 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, StringRef pid = self->wigglingPid.get(); isPaused = true; moveFinishFuture = Never(); - self->includeStorageServersForWiggle(pid); + self->includeStorageServersForWiggle(); TraceEvent("PerpetualStorageWigglePause", self->distributorId) .detail("ProcessId", pid) .detail("StorageCount", movingCount); @@ -4072,7 +4093,9 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, } if (self->wigglingPid.present()) { - self->includeStorageServersForWiggle(self->wigglingPid.get()); + self->includeStorageServersForWiggle(); + TraceEvent("PerpetualStorageWiggleExitingPause", self->distributorId) + .detail("ProcessId", self->wigglingPid.get()); self->wigglingPid.reset(); } @@ -4088,7 +4111,7 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio state AsyncTrigger stopWiggleSignal; state PromiseStream finishStorageWiggleSignal; state SignalableActorCollection collection; - + state bool started = false; loop { state ReadYourWritesTransaction tr(teamCollection->cx); loop { @@ -4103,16 +4126,18 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio wait(tr.commit()); ASSERT(speed == 1 || speed == 0); - if (speed == 1) { + if (speed == 1 && !started) { collection.add(perpetualStorageWiggleIterator( &stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection)); collection.add(perpetualStorageWiggler( &stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState)); TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId); - } else { + started = true; + } else if (speed == 0 && started) { stopWiggleSignal.trigger(); 
wait(collection.signalAndReset()); TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId); + started = false; } wait(watchFuture); break; @@ -4410,7 +4435,7 @@ ACTOR Future storageServerTracker( bool isTss) { state Future failureTracker; - state ServerStatus status(false, false, server->lastKnownInterface.locality); + state ServerStatus status(false, false, false, server->lastKnownInterface.locality); state bool lastIsUnhealthy = false; state Future metricsTracker = serverMetricsPolling(server); @@ -4427,6 +4452,7 @@ ACTOR Future storageServerTracker( loop { status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get(); status.isWrongConfiguration = false; + status.isWiggling = false; hasWrongDC = !isCorrectDC(self, server); hasInvalidLocality = !self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality); @@ -4506,10 +4532,21 @@ ACTOR Future storageServerTracker( status.isWrongConfiguration = true; } + // An invalid wiggle server should set itself the right status. Otherwise, it cannot be re-included by + // wiggler. 
+ auto invalidWiggleServer = + [](const AddressExclusion& addr, const DDTeamCollection* tc, const TCServerInfo* server) { + return server->lastKnownInterface.locality.processId() != tc->wigglingPid; + }; // If the storage server is in the excluded servers list, it is undesired NetworkAddress a = server->lastKnownInterface.address(); AddressExclusion worstAddr(a.ip, a.port); DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr); + + if (worstStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(worstAddr, self, server)) { + self->excludedServers.set(worstAddr, DDTeamCollection::Status::NONE); + worstStatus = DDTeamCollection::Status::NONE; + } otherChanges.push_back(self->excludedServers.onChange(worstAddr)); for (int i = 0; i < 3; i++) { @@ -4525,6 +4562,12 @@ ACTOR Future storageServerTracker( else if (i == 2) testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip); DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr); + + if (testStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(testAddr, self, server)) { + self->excludedServers.set(testAddr, DDTeamCollection::Status::NONE); + testStatus = DDTeamCollection::Status::NONE; + } + if (testStatus > worstStatus) { worstStatus = testStatus; worstAddr = testAddr; @@ -4543,6 +4586,7 @@ ACTOR Future storageServerTracker( status.isWiggling = true; TraceEvent("PerpetualWigglingStorageServer", self->distributorId) .detail("Server", server->id) + .detail("ProcessId", server->lastKnownInterface.locality.processId()) .detail("Address", worstAddr.toString()); } else if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) { TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId) @@ -4607,11 +4651,14 @@ ACTOR Future storageServerTracker( bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality; bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() != 
newInterface.first.locality.zoneId().get(); + bool processIdChanged = server->lastKnownInterface.locality.processId().get() != + newInterface.first.locality.processId().get(); TraceEvent("StorageServerInterfaceChanged", self->distributorId) .detail("ServerID", server->id) .detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token) .detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token) .detail("LocalityChanged", localityChanged) + .detail("ProcessIdChanged", processIdChanged) .detail("MachineLocalityChanged", machineLocalityChanged); server->lastKnownInterface = newInterface.first; @@ -4656,6 +4703,20 @@ ACTOR Future storageServerTracker( ASSERT(destMachine.isValid()); } + // update pid2server_info if the process id has changed + if (processIdChanged) { + self->pid2server_info[newInterface.first.locality.processId().get()].push_back( + self->server_info[server->id]); + // delete the old one + auto& old_infos = + self->pid2server_info[server->lastKnownInterface.locality.processId().get()]; + for (int i = 0; i < old_infos.size(); ++i) { + if (old_infos[i].getPtr() == server) { + std::swap(old_infos[i--], old_infos.back()); + old_infos.pop_back(); + } + } + } // Ensure the server's server team belong to a machine team, and // Get the newBadTeams due to the locality change vector> newBadTeams; @@ -4702,7 +4763,8 @@ ACTOR Future storageServerTracker( interfaceChanged = server->onInterfaceChanged; // Old failureTracker for the old interface will be actorCancelled since the handler of the old // actor now points to the new failure monitor actor. 
- status = ServerStatus(status.isFailed, status.isUndesired, server->lastKnownInterface.locality); + status = ServerStatus( + status.isFailed, status.isUndesired, status.isWiggling, server->lastKnownInterface.locality); // self->traceTeamCollectionInfo(); recordTeamCollectionInfo = true; @@ -5462,8 +5524,10 @@ ACTOR Future dataDistributionTeamCollection(Reference te self->addActor.send(trackExcludedServers(self)); self->addActor.send(monitorHealthyTeams(self)); self->addActor.send(waitHealthyZoneChange(self)); - self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState)); + if (self->primary) { // the primary dc also handle the satellite dc's perpetual wiggling + self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState)); + } // SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them loop choose { @@ -6215,6 +6279,30 @@ ACTOR Future ddSnapCreate(DistributorSnapRequest snapReq, return Void(); } +// Find size of set intersection of excludeServerIDs and serverIDs on each team and see if the leftover team is valid +bool _exclusionSafetyCheck(vector& excludeServerIDs, DDTeamCollection* teamCollection) { + std::sort(excludeServerIDs.begin(), excludeServerIDs.end()); + for (const auto& team : teamCollection->teams) { + vector teamServerIDs = team->getServerIDs(); + std::sort(teamServerIDs.begin(), teamServerIDs.end()); + TraceEvent(SevDebug, "DDExclusionSafetyCheck", teamCollection->distributorId) + .detail("Excluding", describe(excludeServerIDs)) + .detail("Existing", team->getDesc()); + // Find size of set intersection of both vectors and see if the leftover team is valid + vector intersectSet(teamServerIDs.size()); + auto it = std::set_intersection(excludeServerIDs.begin(), + excludeServerIDs.end(), + teamServerIDs.begin(), + teamServerIDs.end(), + intersectSet.begin()); + intersectSet.resize(it - intersectSet.begin()); + if (teamServerIDs.size() - intersectSet.size() < 
SERVER_KNOBS->DD_EXCLUDE_MIN_REPLICAS) { + return false; + } + } + return true; +} + ACTOR Future ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req, Reference self, Database cx) { @@ -6244,26 +6332,7 @@ ACTOR Future ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest } } } - std::sort(excludeServerIDs.begin(), excludeServerIDs.end()); - for (const auto& team : self->teamCollection->teams) { - vector teamServerIDs = team->getServerIDs(); - std::sort(teamServerIDs.begin(), teamServerIDs.end()); - TraceEvent(SevDebug, "DDExclusionSafetyCheck", self->ddId) - .detail("Excluding", describe(excludeServerIDs)) - .detail("Existing", team->getDesc()); - // Find size of set intersection of both vectors and see if the leftover team is valid - vector intersectSet(teamServerIDs.size()); - auto it = std::set_intersection(excludeServerIDs.begin(), - excludeServerIDs.end(), - teamServerIDs.begin(), - teamServerIDs.end(), - intersectSet.begin()); - intersectSet.resize(it - intersectSet.begin()); - if (teamServerIDs.size() - intersectSet.size() < SERVER_KNOBS->DD_EXCLUDE_MIN_REPLICAS) { - reply.safe = false; - break; - } - } + reply.safe = _exclusionSafetyCheck(excludeServerIDs, self->teamCollection); TraceEvent("DDExclusionSafetyCheckFinish", self->ddId); req.reply.send(reply); return Void(); @@ -6440,7 +6509,7 @@ std::unique_ptr testTeamCollection(int teamSize, interface.locality.set(LiteralStringRef("data_hall"), Standalone(std::to_string(id % 3))); collection->server_info[uid] = makeReference( interface, collection.get(), ProcessClass(), true, collection->storageServerSet); - collection->server_status.set(uid, ServerStatus(false, false, interface.locality)); + collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); collection->checkAndCreateMachine(collection->server_info[uid]); } @@ -6497,7 +6566,7 @@ std::unique_ptr testMachineTeamCollection(int teamSize, collection->server_info[uid] = makeReference( interface, 
collection.get(), ProcessClass(), true, collection->storageServerSet); - collection->server_status.set(uid, ServerStatus(false, false, interface.locality)); + collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); } int totalServerIndex = collection->constructMachinesFromServers(); diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 2881932380..bb87bf4e33 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -993,7 +993,7 @@ ACTOR Future dataDistributionRelocator(DDQueueData* self, RelocateData rd, allHealthy = true; anyWithSource = false; bestTeams.clear(); - // Get team from teamCollections in diffrent DCs and find the best one + // Get team from teamCollections in different DCs and find the best one while (tciIndex < self->teamCollections.size()) { double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY; if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 4ccb08b9f6..c43fc77906 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -133,7 +133,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( PRIORITY_RECOVER_MOVE, 110 ); init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 ); init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 ); - init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 140 ); + init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 139 ); init( PRIORITY_TEAM_HEALTHY, 140 ); init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 ); init( PRIORITY_TEAM_REDUNDANT, 200 ); diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 7a23e96d98..fe4d0034ee 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -890,6 +890,7 @@ ACTOR Future checkConsistency(Database cx, StringRef performTSSCheck = LiteralStringRef("false"); if (doQuiescentCheck) { performQuiescent = 
LiteralStringRef("true"); + spec.restorePerpetualWiggleSetting = false; } if (doCacheCheck) { performCacheCheck = LiteralStringRef("true"); @@ -1385,6 +1386,8 @@ ACTOR Future runTests(Reference runTests(ReferencewaitForQuiescenceEnd) waitForQuiescenceEnd = true; + if (iter->restorePerpetualWiggleSetting) + restorePerpetualWiggleSetting = true; startDelay = std::max(startDelay, iter->startDelay); databasePingDelay = std::min(databasePingDelay, iter->databasePingDelay); if (iter->simBackupAgents != ISimulator::BackupAgentType::NoBackupAgents) @@ -1437,6 +1442,15 @@ ACTOR Future runTests(Reference(startingConfiguration.begin()), + startingConfiguration.size()); + const std::string setting = "perpetual_storage_wiggle:="; + auto pos = confView.find(setting); + if (pos != confView.npos && confView.at(pos + setting.size()) == '1') { + perpetualWiggleEnabled = true; + } + } } if (useDB && waitForQuiescenceBegin) { @@ -1452,6 +1466,10 @@ ACTOR Future runTests(ReferenceisSimulated()), runConsistencyCheckOnCache(false), runConsistencyCheckOnTSS(false), waitForQuiescenceBegin(true), - waitForQuiescenceEnd(true), simCheckRelocationDuration(false), simConnectionFailuresDisableDuration(0), - simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents), + waitForQuiescenceEnd(true), restorePerpetualWiggleSetting(true), simCheckRelocationDuration(false), + simConnectionFailuresDisableDuration(0), simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents), simDrAgents(ISimulator::BackupAgentType::NoBackupAgents) { phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS; if (databasePingDelay < 0) @@ -191,6 +192,11 @@ public: bool runConsistencyCheckOnTSS; bool waitForQuiescenceBegin; bool waitForQuiescenceEnd; + bool restorePerpetualWiggleSetting; // whether set perpetual_storage_wiggle as the value after run + // QuietDatabase. QuietDatabase always disables perpetual storage wiggle on + // purpose. 
If waitForQuiescenceBegin == true and we want to keep perpetual + // storage wiggle the same setting as before during testing, this value should + // be set true. bool simCheckRelocationDuration; // If set to true, then long duration relocations generate SevWarnAlways messages. // Once any workload sets this to true, it will be true for the duration of the From 4f29b94c61d820ef7e509178e1bf2c7c5d59933a Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sat, 12 Jun 2021 17:27:00 -0700 Subject: [PATCH 07/30] Fix TraceEventOverflow error in HealthMetricsApi workload --- fdbserver/workloads/HealthMetricsApi.actor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/fdbserver/workloads/HealthMetricsApi.actor.cpp b/fdbserver/workloads/HealthMetricsApi.actor.cpp index 9566514b04..ec22baa1a2 100644 --- a/fdbserver/workloads/HealthMetricsApi.actor.cpp +++ b/fdbserver/workloads/HealthMetricsApi.actor.cpp @@ -168,6 +168,7 @@ struct HealthMetricsApiWorkload : TestWorkload { traceDiskUsage.detail(format("Storage-%s", ss.first.toString().c_str()), storageStats.diskUsage); } TraceEvent traceTLogQueue("TLogQueue"); + traceTLogQueue.setMaxFieldLength(10000).setMaxEventLength(11000); for (const auto& ss : healthMetrics.tLogQueue) { self->detailedWorstTLogQueue = std::max(self->detailedWorstTLogQueue, ss.second); traceTLogQueue.detail(format("TLog-%s", ss.first.toString().c_str()), ss.second); From 8af0affeff61f90b89f409540b7182e721906eaf Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 11 Jun 2021 18:31:44 -0400 Subject: [PATCH 08/30] Add support for running non-simulation tests on Joshua --- contrib/TestHarness/Program.cs.cmake | 38 ++++++++++++++++-------- fdbserver/KeyValueStoreRocksDB.actor.cpp | 2 +- tests/CMakeLists.txt | 1 + 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/contrib/TestHarness/Program.cs.cmake b/contrib/TestHarness/Program.cs.cmake index 075a2758d6..5081031fe6 100644 --- a/contrib/TestHarness/Program.cs.cmake +++ 
b/contrib/TestHarness/Program.cs.cmake @@ -246,6 +246,7 @@ namespace SummarizeTest string testFile = null; string testDir = ""; string oldServerName = ""; + bool noSim = false; if (Directory.Exists(testFolder)) { @@ -254,9 +255,10 @@ namespace SummarizeTest if( Directory.Exists(Path.Combine(testFolder, "slow")) ) poolSize += 5; if( Directory.Exists(Path.Combine(testFolder, "fast")) ) poolSize += 14; if( Directory.Exists(Path.Combine(testFolder, "restarting")) ) poolSize += 1; + if( Directory.Exists(Path.Combine(testFolder, "noSim")) ) poolSize += 1; if( poolSize == 0 ) { - Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, or restarting sub-folder", testFolder); + Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, restarting, or noSim sub-folder", testFolder); return 1; } int selection = random.Next(poolSize); @@ -272,11 +274,20 @@ namespace SummarizeTest testDir = Path.Combine(testFolder, "restarting"); else { - if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5; + if (Directory.Exists(Path.Combine(testFolder, "noSim"))) selectionWindow += 1; if (selection < selectionWindow) - testDir = Path.Combine(testFolder, "slow"); + { + testDir = Path.Combine(testFolder, "noSim"); + noSim = true; + } else - testDir = Path.Combine(testFolder, "fast"); + { + if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5; + if (selection < selectionWindow) + testDir = Path.Combine(testFolder, "slow"); + else + testDir = Path.Combine(testFolder, "fast"); + } } } string[] files = Directory.GetFiles(testDir, "*", SearchOption.AllDirectories); @@ -341,11 +352,11 @@ namespace SummarizeTest bool useNewPlugin = (oldServerName == fdbserverName) || versionGreaterThanOrEqual(oldServerName.Split('-').Last(), "5.2.0"); bool useToml = File.Exists(testFile + "-1.toml"); string testFile1 = useToml ? testFile + "-1.toml" : testFile + "-1.txt"; - result = RunTest(firstServerName, useNewPlugin ? 
tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout); + result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout, noSim); if (result == 0) { string testFile2 = useToml ? testFile + "-2.toml" : testFile + "-2.txt"; - result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout); + result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout, noSim); } } else @@ -353,13 +364,13 @@ namespace SummarizeTest int expectedUnseed = -1; if (!useValgrind && unseedCheck) { - result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout); + result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout, noSim); } if (!retryableError) { int unseed; - result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout); + 
result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout, noSim); } } @@ -374,7 +385,7 @@ namespace SummarizeTest private static int RunTest(string fdbserverName, string tlsPluginFile, string summaryFileName, string errorFileName, int seed, bool buggify, string testFile, string runDir, string uid, int expectedUnseed, out int unseed, out bool retryableError, bool logOnRetryableError, bool useValgrind, bool restarting = false, - bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false) + bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false, bool noSim = false) { unseed = -1; @@ -408,16 +419,17 @@ namespace SummarizeTest tlsPluginArg = "--tls_plugin=" + tlsPluginFile; } process.StartInfo.RedirectStandardOutput = true; + string role = (noSim) ? "test" : "simulation"; var args = ""; if (willRestart && oldBinaryName.EndsWith("alpha6")) { - args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", - IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1000000000 -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash", + role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); } else { - args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", - IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1GB -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash", + role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? 
"on" : "off", tlsPluginArg); } if (restarting) args = args + " --restarting"; if (useValgrind && !willRestart) diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 7377a6d5cf..2f2c77c42e 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -488,7 +488,7 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path, namespace { -TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") { +TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/Reopen") { state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db"; platform::eraseDirectoryRecursive(rocksDBTestDir); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f84abc0faf..d1b08d9ffd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -155,6 +155,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/Watches.toml) add_fdb_test(TEST_FILES fast/WriteDuringRead.toml) add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml) + add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml) add_fdb_test(TEST_FILES rare/CheckRelocation.toml) add_fdb_test(TEST_FILES rare/ClogUnclog.toml) add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml) From ef2a2cd1a564ec61827644bd3da842afec080711 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 14 Jun 2021 13:11:43 -0400 Subject: [PATCH 09/30] Actually add test file --- tests/noSim/RandomUnitTests.toml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 tests/noSim/RandomUnitTests.toml diff --git a/tests/noSim/RandomUnitTests.toml b/tests/noSim/RandomUnitTests.toml new file mode 100644 index 0000000000..769ea7596f --- /dev/null +++ b/tests/noSim/RandomUnitTests.toml @@ -0,0 +1,9 @@ +[[test]] +testTitle = 'UnitTests' +useDB = false +startDelay = 0 + + [[test.workload]] + testName = 'UnitTests' + maxTestCases = 1 + testsMatching = 'noSim/' From 16a53fe531f6875f97cde6c6e8a0cf1994e2c727 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Mon, 14 Jun 2021 
13:43:31 -0700 Subject: [PATCH 10/30] Update fdbserver/workloads/HealthMetricsApi.actor.cpp Co-authored-by: A.J. Beamon --- fdbserver/workloads/HealthMetricsApi.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/HealthMetricsApi.actor.cpp b/fdbserver/workloads/HealthMetricsApi.actor.cpp index ec22baa1a2..fff9c1ae91 100644 --- a/fdbserver/workloads/HealthMetricsApi.actor.cpp +++ b/fdbserver/workloads/HealthMetricsApi.actor.cpp @@ -168,7 +168,7 @@ struct HealthMetricsApiWorkload : TestWorkload { traceDiskUsage.detail(format("Storage-%s", ss.first.toString().c_str()), storageStats.diskUsage); } TraceEvent traceTLogQueue("TLogQueue"); - traceTLogQueue.setMaxFieldLength(10000).setMaxEventLength(11000); + traceTLogQueue.setMaxEventLength(10000); for (const auto& ss : healthMetrics.tLogQueue) { self->detailedWorstTLogQueue = std::max(self->detailedWorstTLogQueue, ss.second); traceTLogQueue.detail(format("TLog-%s", ss.first.toString().c_str()), ss.second); From 2cd4e6d62fee7f79db6bfe9a90750ad88d035acd Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 11 Jun 2021 23:00:44 +0000 Subject: [PATCH 11/30] check healthy team count, dd queue and disk space; code refactor --- fdbserver/DataDistribution.actor.cpp | 148 ++++++++++++--------------- 1 file changed, 65 insertions(+), 83 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index e1f95aaf36..538efa697b 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -630,6 +630,7 @@ struct DDTeamCollection : ReferenceCounted { std::map lagging_zones; // zone to number of storage servers lagging AsyncVar disableFailingLaggingServers; Optional wigglingPid; // Process id of current wiggling storage server; + Reference> pauseWiggle; // machine_info has all machines info; key must be unique across processes on the same machine std::map, Reference> machine_info; @@ -655,6 +656,8 @@ struct 
DDTeamCollection : ReferenceCounted { int optimalTeamCount; AsyncVar zeroOptimalTeams; + bool bestTeamStuck = false; + bool isTssRecruiting; // If tss recruiting is waiting on a pair, don't consider DD recruiting for the purposes of QuietDB // WIGGLING if an address is under storage wiggling. @@ -1007,8 +1010,14 @@ struct DDTeamCollection : ReferenceCounted { } // Log BestTeamStuck reason when we have healthy teams but they do not have healthy free space - if (g_network->isSimulated() && randomTeams.empty() && !self->zeroHealthyTeams->get()) { - TraceEvent(SevWarn, "GetTeamReturnEmpty").detail("HealthyTeams", self->healthyTeamCount); + if (randomTeams.empty() && !self->zeroHealthyTeams->get()) { + self->bestTeamStuck = true; + if(g_network->isSimulated()) { + TraceEvent(SevWarn, "GetTeamReturnEmpty").detail("HealthyTeams", self->healthyTeamCount); + } + } + else { + self->bestTeamStuck = false; } for (int i = 0; i < randomTeams.size(); i++) { @@ -3962,6 +3971,25 @@ ACTOR Future, Value>> watchPerpetualStoragePIDChange(DDTe return std::make_pair(watchFuture, ret); } +// periodically check whether the cluster is healthy if we continue perpetual wiggle +ACTOR Future clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self) { + loop { + Promise countp; + self->getUnhealthyRelocationCount.send(countp); + int count = wait(countp.getFuture()); + // pause wiggle when + // a. DDQueue is busy with unhealthy relocation request + // b. no healthy team + // c. 
the overall disk space is not enough + if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount == 0 || self->bestTeamStuck) { + self->pauseWiggle->set(true); + } + else { + self->pauseWiggle->set(false); + } + wait(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow)); + } +} // Watches the value (pid) change of \xff/storageWigglePID, and adds storage servers held on process of which the // Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker // start to move data off the affected teams. The wiggling process of current storage servers will be paused if the @@ -3972,56 +4000,31 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, const DDEnabledState* ddEnabledState) { state Future watchFuture = Never(); state Future moveFinishFuture = Never(); - state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY); - state AsyncTrigger restart; - state Future ddQueueCheck = - delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow); + state Future ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self); state int movingCount = 0; - state bool isPaused = false; state vector excludedServerIds; state std::pair, Value> res = wait(watchPerpetualStoragePIDChange(self)); ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed self->wigglingPid = Optional(res.second); - // start with the initial pid - for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { - excludedServerIds.push_back(info->id); - } - if (self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { // pre-check health status - TEST(true); // start the first wiggling - - auto fv = self->excludeStorageServersForWiggle(self->wigglingPid.get()); - movingCount = fv.size(); - moveFinishFuture = waitForAll(fv); - TraceEvent("PerpetualStorageWiggleInitialStart", self->distributorId) - .detail("ProcessId", 
self->wigglingPid.get()) - .detail("StorageCount", movingCount); - } else { - isPaused = true; - TraceEvent("PerpetualStorageWiggleInitialPause", self->distributorId) - .detail("ProcessId", self->wigglingPid.get()); - } - loop { - choose { - when(wait(stopSignal->onTrigger())) { break; } - when(wait(watchFuture)) { - ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished - watchFuture = Never(); - - // read new pid and set the next watch Future - wait(store(res, watchPerpetualStoragePIDChange(self))); - self->wigglingPid = Optional(res.second); - StringRef pid = self->wigglingPid.get(); - - // pre-check health status + if (self->wigglingPid.present()) { + StringRef pid = self->wigglingPid.get(); + if (self->pauseWiggle->get()) { + TEST(true); // paused because cluster is unhealthy + moveFinishFuture = Never(); + self->includeStorageServersForWiggle(); + TraceEvent("PerpetualStorageWigglePause", self->distributorId) + .detail("ProcessId", pid) + .detail("StorageCount", movingCount); + } else { + // pre-check whether wiggling chosen servers still satisfy replica requirement excludedServerIds.clear(); for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { excludedServerIds.push_back(info->id); } - if (self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { + if (_exclusionSafetyCheck(excludedServerIds, self)) { TEST(true); // start wiggling - auto fv = self->excludeStorageServersForWiggle(pid); movingCount = fv.size(); moveFinishFuture = waitForAll(fv); @@ -4029,20 +4032,24 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, .detail("ProcessId", pid) .detail("StorageCount", movingCount); } else { - pauseWiggle.trigger(); + TEST(true); // skip wiggling current process + TraceEvent("PerpetualStorageWiggleSkip", self->distributorId).detail("ProcessId", pid.toString()); + + self->wigglingPid.reset(); + watchFuture = res.first; + finishStorageWiggleSignal.send(Void()); } } - 
when(wait(restart.onTrigger())) { - if (self->wigglingPid.present()) { - TEST(true); // restart paused wiggling - StringRef pid = self->wigglingPid.get(); - auto fv = self->excludeStorageServersForWiggle(pid); - moveFinishFuture = waitForAll(fv); - TraceEvent("PerpetualStorageWiggleRestart", self->distributorId) - .detail("ProcessId", pid) - .detail("StorageCount", fv.size()); - isPaused = false; - } + } + + choose { + when(wait(stopSignal->onTrigger())) { break; } + when(wait(watchFuture)) { + ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished + watchFuture = Never(); + // read new pid and set the next watch Future + wait(store(res, watchPerpetualStoragePIDChange(self))); + self->wigglingPid = Optional(res.second); } when(wait(moveFinishFuture)) { TEST(true); // finish wiggling this process @@ -4059,36 +4066,8 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, watchFuture = res.first; finishStorageWiggleSignal.send(Void()); } - when(wait(self->zeroHealthyTeams->onChange())) { - if (self->zeroHealthyTeams->get() && !isPaused) { - pauseWiggle.trigger(); - } - } - when(wait(ddQueueCheck)) { // check health status periodically - Promise countp; - self->getUnhealthyRelocationCount.send(countp); - int count = wait(countp.getFuture()); - - if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) { - pauseWiggle.trigger(); - } else if (isPaused && count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && - self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { - restart.trigger(); - } - ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow); - } - when(wait(pauseWiggle.onTrigger())) { - if (self->wigglingPid.present()) { - TEST(true); // paused because cluster is unhealthy - StringRef pid = self->wigglingPid.get(); - isPaused = true; - moveFinishFuture = Never(); - self->includeStorageServersForWiggle(); - TraceEvent("PerpetualStorageWigglePause", 
self->distributorId) - .detail("ProcessId", pid) - .detail("StorageCount", movingCount); - } - } + when(wait(ddQueueCheck)) {} + when(wait(self->pauseWiggle->onChange())) {} } } @@ -4112,7 +4091,9 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio state PromiseStream finishStorageWiggleSignal; state SignalableActorCollection collection; state bool started = false; - loop { + teamCollection->pauseWiggle = makeReference>(true); + + loop { state ReadYourWritesTransaction tr(teamCollection->cx); loop { try { @@ -4138,6 +4119,7 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio wait(collection.signalAndReset()); TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId); started = false; + teamCollection->pauseWiggle->set(true); } wait(watchFuture); break; From d33e43fd2bdd72b4d7cc44609cc954fc61ac4e13 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Mon, 14 Jun 2021 23:00:02 +0000 Subject: [PATCH 12/30] code format --- fdbserver/DataDistribution.actor.cpp | 33 ++++++++++++++-------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 538efa697b..644e3c414f 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -1012,11 +1012,10 @@ struct DDTeamCollection : ReferenceCounted { // Log BestTeamStuck reason when we have healthy teams but they do not have healthy free space if (randomTeams.empty() && !self->zeroHealthyTeams->get()) { self->bestTeamStuck = true; - if(g_network->isSimulated()) { + if (g_network->isSimulated()) { TraceEvent(SevWarn, "GetTeamReturnEmpty").detail("HealthyTeams", self->healthyTeamCount); } - } - else { + } else { self->bestTeamStuck = false; } @@ -3974,21 +3973,21 @@ ACTOR Future, Value>> watchPerpetualStoragePIDChange(DDTe // periodically check whether the cluster is healthy if we continue perpetual wiggle ACTOR Future 
clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self) { loop { - Promise countp; - self->getUnhealthyRelocationCount.send(countp); - int count = wait(countp.getFuture()); + Promise countp; + self->getUnhealthyRelocationCount.send(countp); + int count = wait(countp.getFuture()); // pause wiggle when // a. DDQueue is busy with unhealthy relocation request // b. no healthy team // c. the overall disk space is not enough - if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount == 0 || self->bestTeamStuck) { - self->pauseWiggle->set(true); - } - else { - self->pauseWiggle->set(false); - } - wait(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow)); - } + if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount == 0 || + self->bestTeamStuck) { + self->pauseWiggle->set(true); + } else { + self->pauseWiggle->set(false); + } + wait(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow)); + } } // Watches the value (pid) change of \xff/storageWigglePID, and adds storage servers held on process of which the // Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker @@ -4041,7 +4040,7 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, } } } - + choose { when(wait(stopSignal->onTrigger())) { break; } when(wait(watchFuture)) { @@ -4091,9 +4090,9 @@ ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio state PromiseStream finishStorageWiggleSignal; state SignalableActorCollection collection; state bool started = false; - teamCollection->pauseWiggle = makeReference>(true); + teamCollection->pauseWiggle = makeReference>(true); - loop { + loop { state ReadYourWritesTransaction tr(teamCollection->cx); loop { try { From 549cf0512b61fc78207c13ff49cb29d50e18027e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 14 Jun 2021 19:23:56 -0400 Subject: [PATCH 13/30] Revert "Add support for 
running non-simulation tests on Joshua" --- contrib/TestHarness/Program.cs.cmake | 38 ++++++++---------------- fdbserver/KeyValueStoreRocksDB.actor.cpp | 2 +- tests/CMakeLists.txt | 1 - tests/noSim/RandomUnitTests.toml | 9 ------ 4 files changed, 14 insertions(+), 36 deletions(-) delete mode 100644 tests/noSim/RandomUnitTests.toml diff --git a/contrib/TestHarness/Program.cs.cmake b/contrib/TestHarness/Program.cs.cmake index ab5d557dfd..c9edc4f29b 100644 --- a/contrib/TestHarness/Program.cs.cmake +++ b/contrib/TestHarness/Program.cs.cmake @@ -247,7 +247,6 @@ namespace SummarizeTest string testFile = null; string testDir = ""; string oldServerName = ""; - bool noSim = false; if (Directory.Exists(testFolder)) { @@ -256,10 +255,9 @@ namespace SummarizeTest if( Directory.Exists(Path.Combine(testFolder, "slow")) ) poolSize += 5; if( Directory.Exists(Path.Combine(testFolder, "fast")) ) poolSize += 14; if( Directory.Exists(Path.Combine(testFolder, "restarting")) ) poolSize += 1; - if( Directory.Exists(Path.Combine(testFolder, "noSim")) ) poolSize += 1; if( poolSize == 0 ) { - Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, restarting, or noSim sub-folder", testFolder); + Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, or restarting sub-folder", testFolder); return 1; } int selection = random.Next(poolSize); @@ -275,20 +273,11 @@ namespace SummarizeTest testDir = Path.Combine(testFolder, "restarting"); else { - if (Directory.Exists(Path.Combine(testFolder, "noSim"))) selectionWindow += 1; + if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5; if (selection < selectionWindow) - { - testDir = Path.Combine(testFolder, "noSim"); - noSim = true; - } + testDir = Path.Combine(testFolder, "slow"); else - { - if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5; - if (selection < selectionWindow) - testDir = Path.Combine(testFolder, "slow"); - else - testDir = Path.Combine(testFolder, 
"fast"); - } + testDir = Path.Combine(testFolder, "fast"); } } string[] files = Directory.GetFiles(testDir, "*", SearchOption.AllDirectories); @@ -353,11 +342,11 @@ namespace SummarizeTest bool useNewPlugin = (oldServerName == fdbserverName) || versionGreaterThanOrEqual(oldServerName.Split('-').Last(), "5.2.0"); bool useToml = File.Exists(testFile + "-1.toml"); string testFile1 = useToml ? testFile + "-1.toml" : testFile + "-1.txt"; - result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout, noSim); + result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout); if (result == 0) { string testFile2 = useToml ? 
testFile + "-2.toml" : testFile + "-2.txt"; - result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout, noSim); + result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout); } } else @@ -365,13 +354,13 @@ namespace SummarizeTest int expectedUnseed = -1; if (!useValgrind && unseedCheck) { - result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout, noSim); + result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout); } if (!retryableError) { int unseed; - result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout, noSim); + result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout); } } @@ -386,7 +375,7 @@ namespace SummarizeTest private static int RunTest(string fdbserverName, string tlsPluginFile, string summaryFileName, string errorFileName, int seed, bool buggify, string testFile, string runDir, string uid, int expectedUnseed, out int unseed, out bool retryableError, bool 
logOnRetryableError, bool useValgrind, bool restarting = false, - bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false, bool noSim = false) + bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false) { unseed = -1; @@ -420,17 +409,16 @@ namespace SummarizeTest tlsPluginArg = "--tls_plugin=" + tlsPluginFile; } process.StartInfo.RedirectStandardOutput = true; - string role = (noSim) ? "test" : "simulation"; var args = ""; if (willRestart && oldBinaryName.EndsWith("alpha6")) { - args = string.Format("-Rs 1000000000 -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash", - role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", + IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); } else { - args = string.Format("-Rs 1GB -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash", - role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", + IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? 
"on" : "off", tlsPluginArg); } if (restarting) args = args + " --restarting"; if (useValgrind && !willRestart) diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 2f2c77c42e..7377a6d5cf 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -488,7 +488,7 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path, namespace { -TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/Reopen") { +TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") { state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db"; platform::eraseDirectoryRecursive(rocksDBTestDir); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d1b08d9ffd..f84abc0faf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -155,7 +155,6 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/Watches.toml) add_fdb_test(TEST_FILES fast/WriteDuringRead.toml) add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml) - add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml) add_fdb_test(TEST_FILES rare/CheckRelocation.toml) add_fdb_test(TEST_FILES rare/ClogUnclog.toml) add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml) diff --git a/tests/noSim/RandomUnitTests.toml b/tests/noSim/RandomUnitTests.toml deleted file mode 100644 index 769ea7596f..0000000000 --- a/tests/noSim/RandomUnitTests.toml +++ /dev/null @@ -1,9 +0,0 @@ -[[test]] -testTitle = 'UnitTests' -useDB = false -startDelay = 0 - - [[test.workload]] - testName = 'UnitTests' - maxTestCases = 1 - testsMatching = 'noSim/' From f19d256e0da9ade7e211cd2365762f27ade4f314 Mon Sep 17 00:00:00 2001 From: RenxuanW Date: Fri, 11 Jun 2021 23:01:23 -0700 Subject: [PATCH 14/30] Bug fix: grvLatencyBands should take "GRVLatencyBands" as name. 
--- fdbserver/GrvProxyServer.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 341b42c1ef..d2389c2b2d 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -107,7 +107,7 @@ struct GrvProxyStats { id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), - grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { + grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { // The rate at which the limit(budget) is allowed to grow. specialCounter( cc, "SystemAndDefaultTxnRateAllowed", [this]() { return int64_t(this->transactionRateAllowed); }); From a3e3a18cda3a593133c30f3fc68c8caa5ac862cf Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 15 Jun 2021 11:12:14 -0700 Subject: [PATCH 15/30] Update fdbcli/ForceRecoveryWithDataLossCommand.actor.cpp --- fdbcli/ForceRecoveryWithDataLossCommand.actor.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/fdbcli/ForceRecoveryWithDataLossCommand.actor.cpp b/fdbcli/ForceRecoveryWithDataLossCommand.actor.cpp index 9573df9a20..de03af3359 100644 --- a/fdbcli/ForceRecoveryWithDataLossCommand.actor.cpp +++ b/fdbcli/ForceRecoveryWithDataLossCommand.actor.cpp @@ -38,7 +38,6 @@ ACTOR Future forceRecoveryWithDataLossCommandActor(Reference db return true; } -// hidden commands, no help text for now CommandFactory forceRecoveryWithDataLossFactory( "force_recovery_with_data_loss", CommandHelp("force_recovery_with_data_loss ", From efdda3cd0e7df697bc390acdbc0e927ff6a20428 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 14 Jun 2021 19:43:08 -0400 Subject: [PATCH 16/30] Revert "Revert "Add support for running non-simulation tests on Joshua"" --- contrib/TestHarness/Program.cs.cmake | 38 ++++++++++++++++-------- fdbserver/KeyValueStoreRocksDB.actor.cpp | 2 +- tests/CMakeLists.txt | 1 + tests/noSim/RandomUnitTests.toml | 9 
++++++ 4 files changed, 36 insertions(+), 14 deletions(-) create mode 100644 tests/noSim/RandomUnitTests.toml diff --git a/contrib/TestHarness/Program.cs.cmake b/contrib/TestHarness/Program.cs.cmake index c9edc4f29b..ab5d557dfd 100644 --- a/contrib/TestHarness/Program.cs.cmake +++ b/contrib/TestHarness/Program.cs.cmake @@ -247,6 +247,7 @@ namespace SummarizeTest string testFile = null; string testDir = ""; string oldServerName = ""; + bool noSim = false; if (Directory.Exists(testFolder)) { @@ -255,9 +256,10 @@ namespace SummarizeTest if( Directory.Exists(Path.Combine(testFolder, "slow")) ) poolSize += 5; if( Directory.Exists(Path.Combine(testFolder, "fast")) ) poolSize += 14; if( Directory.Exists(Path.Combine(testFolder, "restarting")) ) poolSize += 1; + if( Directory.Exists(Path.Combine(testFolder, "noSim")) ) poolSize += 1; if( poolSize == 0 ) { - Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, or restarting sub-folder", testFolder); + Console.WriteLine("Passed folder ({0}) did not have a fast, slow, rare, restarting, or noSim sub-folder", testFolder); return 1; } int selection = random.Next(poolSize); @@ -273,11 +275,20 @@ namespace SummarizeTest testDir = Path.Combine(testFolder, "restarting"); else { - if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5; + if (Directory.Exists(Path.Combine(testFolder, "noSim"))) selectionWindow += 1; if (selection < selectionWindow) - testDir = Path.Combine(testFolder, "slow"); + { + testDir = Path.Combine(testFolder, "noSim"); + noSim = true; + } else - testDir = Path.Combine(testFolder, "fast"); + { + if (Directory.Exists(Path.Combine(testFolder, "slow"))) selectionWindow += 5; + if (selection < selectionWindow) + testDir = Path.Combine(testFolder, "slow"); + else + testDir = Path.Combine(testFolder, "fast"); + } } } string[] files = Directory.GetFiles(testDir, "*", SearchOption.AllDirectories); @@ -342,11 +353,11 @@ namespace SummarizeTest bool useNewPlugin = (oldServerName 
== fdbserverName) || versionGreaterThanOrEqual(oldServerName.Split('-').Last(), "5.2.0"); bool useToml = File.Exists(testFile + "-1.toml"); string testFile1 = useToml ? testFile + "-1.toml" : testFile + "-1.txt"; - result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout); + result = RunTest(firstServerName, useNewPlugin ? tlsPluginFile : tlsPluginFile_5_1, summaryFileName, errorFileName, seed, buggify, testFile1, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, true, oldServerName, traceToStdout, noSim); if (result == 0) { string testFile2 = useToml ? testFile + "-2.toml" : testFile + "-2.txt"; - result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout); + result = RunTest(secondServerName, tlsPluginFile, summaryFileName, errorFileName, seed+1, buggify, testFile2, runDir, uid, expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, true, false, oldServerName, traceToStdout, noSim); } } else @@ -354,13 +365,13 @@ namespace SummarizeTest int expectedUnseed = -1; if (!useValgrind && unseedCheck) { - result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout); + result = RunTest(fdbserverName, tlsPluginFile, null, null, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), -1, out expectedUnseed, out retryableError, logOnRetryableError, false, false, false, "", traceToStdout, noSim); } if (!retryableError) { int unseed; 
- result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout); + result = RunTest(fdbserverName, tlsPluginFile, summaryFileName, errorFileName, seed, buggify, testFile, runDir, Guid.NewGuid().ToString(), expectedUnseed, out unseed, out retryableError, logOnRetryableError, useValgrind, false, false, "", traceToStdout, noSim); } } @@ -375,7 +386,7 @@ namespace SummarizeTest private static int RunTest(string fdbserverName, string tlsPluginFile, string summaryFileName, string errorFileName, int seed, bool buggify, string testFile, string runDir, string uid, int expectedUnseed, out int unseed, out bool retryableError, bool logOnRetryableError, bool useValgrind, bool restarting = false, - bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false) + bool willRestart = false, string oldBinaryName = "", bool traceToStdout = false, bool noSim = false) { unseed = -1; @@ -409,16 +420,17 @@ namespace SummarizeTest tlsPluginArg = "--tls_plugin=" + tlsPluginFile; } process.StartInfo.RedirectStandardOutput = true; + string role = (noSim) ? "test" : "simulation"; var args = ""; if (willRestart && oldBinaryName.EndsWith("alpha6")) { - args = string.Format("-Rs 1000000000 -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", - IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); + args = string.Format("-Rs 1000000000 -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash", + role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); } else { - args = string.Format("-Rs 1GB -r simulation {0} -s {1} -f \"{2}\" -b {3} {4} --crash", - IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? 
"on" : "off", tlsPluginArg); + args = string.Format("-Rs 1GB -r {0} {1} -s {2} -f \"{3}\" -b {4} {5} --crash", + role, IsRunningOnMono() ? "" : "-q", seed, testFile, buggify ? "on" : "off", tlsPluginArg); } if (restarting) args = args + " --restarting"; if (useValgrind && !willRestart) diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 7377a6d5cf..2f2c77c42e 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -488,7 +488,7 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path, namespace { -TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") { +TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/Reopen") { state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db"; platform::eraseDirectoryRecursive(rocksDBTestDir); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f84abc0faf..d1b08d9ffd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -155,6 +155,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/Watches.toml) add_fdb_test(TEST_FILES fast/WriteDuringRead.toml) add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml) + add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml) add_fdb_test(TEST_FILES rare/CheckRelocation.toml) add_fdb_test(TEST_FILES rare/ClogUnclog.toml) add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml) diff --git a/tests/noSim/RandomUnitTests.toml b/tests/noSim/RandomUnitTests.toml new file mode 100644 index 0000000000..769ea7596f --- /dev/null +++ b/tests/noSim/RandomUnitTests.toml @@ -0,0 +1,9 @@ +[[test]] +testTitle = 'UnitTests' +useDB = false +startDelay = 0 + + [[test.workload]] + testName = 'UnitTests' + maxTestCases = 1 + testsMatching = 'noSim/' From 6eb383cfc330568a9e435c2f183b4a1ad245e602 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 15 Jun 2021 13:03:19 -0400 Subject: [PATCH 17/30] Run noSim test as a unit test --- tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d1b08d9ffd..ebb94c4c94 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -155,7 +155,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/Watches.toml) add_fdb_test(TEST_FILES fast/WriteDuringRead.toml) add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml) - add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml) + add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT) add_fdb_test(TEST_FILES rare/CheckRelocation.toml) add_fdb_test(TEST_FILES rare/ClogUnclog.toml) add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml) From 44cba555ebd5755a32da96442bbe2582aa829bbc Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 15 Jun 2021 13:32:35 -0400 Subject: [PATCH 18/30] Don't fail on missing simfdb dir --- tests/TestRunner/TestRunner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/TestRunner/TestRunner.py b/tests/TestRunner/TestRunner.py index 207bec08c0..f001e68181 100755 --- a/tests/TestRunner/TestRunner.py +++ b/tests/TestRunner/TestRunner.py @@ -390,7 +390,11 @@ def run_simulation_test(basedir, options): os.remove(trace) if options.keep_simdirs == 'NONE' or options.keep_simdirs == 'FAILED' and res: print("Delete {}".format(os.path.join(wd, 'simfdb'))) - shutil.rmtree(os.path.join(wd, 'simfdb')) + # Don't fail if the directory doesn't exist. + try: + shutil.rmtree(os.path.join(wd, 'simfdb')) + except FileNotFoundError: + pass if len(os.listdir(wd)) == 0: print("Delete {} - empty".format(wd)) os.rmdir(wd) From fe936207a92272589cb0d00314c2b027399ad9a4 Mon Sep 17 00:00:00 2001 From: RenxuanW Date: Tue, 15 Jun 2021 13:38:14 -0700 Subject: [PATCH 19/30] Replace lower priority txn request when limit is hit. 
--- fdbclient/NativeAPI.actor.cpp | 3 -- fdbserver/GrvProxyServer.actor.cpp | 50 +++++++++++++++++++++++++----- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 3f6ed7530b..30f48b06f7 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -4976,9 +4976,6 @@ ACTOR Future extractReadVersion(Location location, if (trLogInfo) trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V3( startTime, cx->clientLocality.dcId(), latency, priority, rep.version)); - if (rep.version == 1 && rep.locked) { - throw proxy_memory_limit_exceeded(); - } if (rep.locked && !lockAware) throw database_locked(); diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index d2389c2b2d..04bc8dec96 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -346,6 +346,26 @@ ACTOR Future getRate(UID myID, } } +// Respond with an error to the GetReadVersion request when the GRV limit is hit. +void proxyGRVThresholdExceeded(const GetReadVersionRequest* req, GrvProxyStats* stats) { + ++stats->txnRequestErrors; + req->reply.sendError(proxy_memory_limit_exceeded()); + if (req->priority == TransactionPriority::IMMEDIATE) { + TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededSystem").suppressFor(60); + } else if (req->priority == TransactionPriority::DEFAULT) { + TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededDefault").suppressFor(60); + } else { + TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceededBatch").suppressFor(60); + } +} + +// Drop a GetReadVersion request from a queue, by responding an error to the request. +void dropRequestFromQueue(Deque* queue, GrvProxyStats* stats) { + proxyGRVThresholdExceeded(&queue->front(), stats); + queue->pop_front(); +} + +// Put a GetReadVersion request into the queue corresponding to its priority. 
ACTOR Future queueGetReadVersionRequests(Reference> db, SpannedDeque* systemQueue, SpannedDeque* defaultQueue, @@ -361,16 +381,30 @@ ACTOR Future queueGetReadVersionRequests(Reference> loop choose { when(GetReadVersionRequest req = waitNext(readVersionRequests)) { // WARNING: this code is run at a high priority, so it needs to do as little work as possible + bool canBeQueued = true; if (stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE) { - ++stats->txnRequestErrors; - // FIXME: send an error instead of giving an unreadable version when the client can support the error: - // req.reply.sendError(proxy_memory_limit_exceeded()); - GetReadVersionReply rep; - rep.version = 1; - rep.locked = true; - req.reply.send(rep); - TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60); + // When the limit is hit, try to drop requests from the lower priority queues. + if (req.priority == TransactionPriority::BATCH) { + canBeQueued = false; + } else if (req.priority == TransactionPriority::DEFAULT) { + if (!batchQueue->empty()) { + dropRequestFromQueue(batchQueue, stats); + } else { + canBeQueued = false; + } + } else { + if (!batchQueue->empty()) { + dropRequestFromQueue(batchQueue, stats); + } else if (!defaultQueue->empty()) { + dropRequestFromQueue(defaultQueue, stats); + } else { + canBeQueued = false; + } + } + } + if (!canBeQueued) { + proxyGRVThresholdExceeded(&req, stats); } else { stats->addRequest(req.transactionCount); // TODO: check whether this is reasonable to do in the fast path From 8f3e8ecf101cb7e7514fafd85a1bf9663a1567b0 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Tue, 15 Jun 2021 21:47:16 +0000 Subject: [PATCH 20/30] add injected fault check --- tests/TestRunner/TestRunner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/TestRunner/TestRunner.py b/tests/TestRunner/TestRunner.py index 207bec08c0..ba48720369 100755 --- a/tests/TestRunner/TestRunner.py +++ 
b/tests/TestRunner/TestRunner.py @@ -117,7 +117,7 @@ class LogParser: continue if 'Type' not in obj: continue - if obj['Severity'] == '40': + if obj['Severity'] == '40' and obj.get('ErrorIsInjectedFault', None) != '1': self.fail() if self.name is not None: obj['testname'] = self.name From c6f47c1f158689e5aa10acaedf191dccef92491c Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 15 Jun 2021 16:00:19 -0600 Subject: [PATCH 21/30] Fixed segfault with simple mode --- fdbserver/SimulatedCluster.actor.cpp | 149 ++++++++++++++++++++------- 1 file changed, 114 insertions(+), 35 deletions(-) diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 56614fdd2d..0121a22338 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -53,55 +53,126 @@ const int MACHINE_REBOOT_TIME = 10; bool destructed = false; +template +struct concatenate_variant_t; + +template +struct concatenate_variant_t, Args2...> { + using type = std::variant; +}; + +template +using concatenate_variant = typename concatenate_variant_t::type; + +template +struct variant_with_optional_t; + +template +struct variant_with_optional_t { + using type = std::variant>; +}; + +template +struct variant_with_optional_t { + using type = concatenate_variant::type, Head, Optional>; +}; + +template +using variant_with_optional = typename variant_with_optional_t::type; + +template +struct add_pointers_t; + +template +struct add_pointers_t> { + using type = std::variant; +}; + +template +using add_pointers = typename add_pointers_t::type; + // Configuration details specified in workload test files that change the simulation // environment details class TestConfig { class ConfigBuilder { using value_type = toml::basic_value; - std::unordered_map> confMap; + using types = add_pointers>>; + std::unordered_map confMap; + + struct visitor { + const value_type& value; + visitor(const value_type& v) : value(v) {} + void operator()(int* val) const { 
*val = value.as_integer(); } + void operator()(Optional* val) const { *val = value.as_integer(); } + void operator()(bool* val) const { *val = value.as_boolean(); } + void operator()(Optional* val) const { *val = value.as_boolean(); } + void operator()(std::string* val) const { *val = value.as_string(); } + void operator()(Optional* val) const { *val = value.as_string(); } + void operator()(std::vector* val) const { + auto arr = value.as_array(); + for (const auto& i : arr) { + val->emplace_back(i.as_integer()); + } + } + void operator()(Optional>* val) const { + std::vector res; + (*this)(&res); + *val = std::move(res); + } + }; + + struct trace_visitor { + std::string key; + TraceEvent& evt; + trace_visitor(std::string const& key, TraceEvent& e) : key("Key" + key), evt(e) {} + void operator()(int* val) const { evt.detail(key.c_str(), *val); } + void operator()(Optional* val) const { evt.detail(key.c_str(), *val); } + void operator()(bool* val) const { evt.detail(key.c_str(), *val); } + void operator()(Optional* val) const { evt.detail(key.c_str(), *val); } + void operator()(std::string* val) const { evt.detail(key.c_str(), *val); } + void operator()(Optional* val) const { evt.detail(key.c_str(), *val); } + void operator()(std::vector* val) const { + if (val->empty()) { + evt.detail(key.c_str(), "[]"); + return; + } + std::stringstream value; + value << "[" << val->at(0); + for (int i = 1; i < val->size(); ++i) { + value << "," << val->at(i); + } + value << "]"; + evt.detail(key.c_str(), value.str()); + } + void operator()(Optional>* val) const { + std::vector res; + (*this)(&res); + *val = std::move(res); + } + }; public: - ConfigBuilder& add(std::string_view key, int* value) { - confMap.emplace(key, [value](value_type const& v) { *value = v.as_integer(); }); + ~ConfigBuilder() { + TraceEvent evt("SimulatorConfigFromToml"); + for (const auto& p : confMap) { + std::visit(trace_visitor(std::string(p.first), evt), p.second); + } + } + + template + ConfigBuilder& 
add(std::string_view key, V value) { + confMap.emplace(key, value); return *this; } - ConfigBuilder& add(std::string_view key, Optional* value) { - confMap.emplace(key, [value](value_type const& v) { *value = v.as_integer(); }); - return *this; - } - ConfigBuilder& add(std::string_view key, bool* value) { - confMap.emplace(key, [value](value_type const& v) { *value = v.as_boolean(); }); - return *this; - } - ConfigBuilder& add(std::string_view key, Optional* value) { - confMap.emplace(key, [value](value_type const& v) { *value = v.as_boolean(); }); - return *this; - } - ConfigBuilder& add(std::string_view key, std::string* value) { - confMap.emplace(key, [value](value_type const& v) { *value = v.as_string(); }); - return *this; - } - ConfigBuilder& add(std::string_view key, Optional* value) { - confMap.emplace(key, [value](value_type const& v) { *value = v.as_string(); }); - return *this; - } - ConfigBuilder& add(std::string_view key, std::vector* value) { - confMap.emplace(key, [value](value_type const& v) { - auto arr = v.as_array(); - for (const auto& i : arr) { - value->push_back(i.as_integer()); - } - }); - return *this; - } - void set(std::string const& key, value_type const& val) { + + void set(std::string_view key, const value_type& value) { auto iter = confMap.find(key); if (iter == confMap.end()) { std::cerr << "Unknown configuration attribute " << key << std::endl; - TraceEvent("UnknownConfigurationAttribute").detail("Name", key); + TraceEvent("UnknownConfigurationAttribute").detail("Name", std::string(key)); throw unknown_error(); } - iter->second(val); + std::visit(visitor(value), iter->second); } }; @@ -274,6 +345,13 @@ public: TraceEvent("TOMLParseError").detail("Error", printable(e.what())); throw unknown_error(); } + // Verify that we can use the passed config + if (simpleConfig) { + if (minimumRegions > 1) { + TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0); + flushAndExit(0); + } + } } }; @@ 
-1738,6 +1816,7 @@ void setupSimulatedSystem(vector>* systemActors, } } + ASSERT(coordinatorAddresses.size() > 0); deterministicRandom()->randomShuffle(coordinatorAddresses); for (int i = 0; i < (coordinatorAddresses.size() / 2) + 1; i++) { TraceEvent("ProtectCoordinator") From b2271f21768b230845bd6169414ee4f0620773c6 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 15 Jun 2021 16:00:28 -0600 Subject: [PATCH 22/30] additional tracing for quietDatabase --- fdbserver/QuietDatabase.actor.cpp | 125 +++++++++++++++++++++++------- 1 file changed, 95 insertions(+), 30 deletions(-) diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index c475e505ba..8f05a0d054 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -284,6 +284,52 @@ ACTOR Future> getStorageWorkers(Database cx, return result; } +int64_t extractMaxQueueSize(const std::vector>& messages, + const std::vector& servers) { + + int64_t maxQueueSize = 0; + UID maxQueueServer; + + for (int i = 0; i < messages.size(); i++) { + try { + auto queueSize = getQueueSize(messages[i].get()); + if (queueSize > maxQueueSize) { + maxQueueSize = queueSize; + maxQueueServer = servers[i].id(); + } + } catch (Error& e) { + TraceEvent("QuietDatabaseFailure") + .detail("Reason", "Failed to extract MaxStorageServerQueue") + .detail("SS", servers[i].id()); + for (auto& m : messages) { + TraceEvent("Messages").detail("Info", m.get().toString()); + } + throw; + } + } + + TraceEvent("QuietDatabaseGotMaxStorageServerQueueSize") + .detail("Stage", "MaxComputed") + .detail("Max", maxQueueSize) + .detail("MaxQueueServer", format("%016" PRIx64, maxQueueServer.first())); + return maxQueueSize; +} + +ACTOR Future getStorageMetricsTimeout(UID storage, WorkerInterface wi) { + state Future result = + wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics"))); + state Future timeout = delay(1.0); + choose { + when(TraceEventFields res = 
wait(result)) { return res; } + when(wait(timeout)) { + TraceEvent("QuietDatabaseFailure") + .detail("Reason", "Could not fetch StorageMetrics") + .detail("Storage", format("%016" PRIx64, storage.first())); + throw timed_out(); + } + } +}; + // Gets the maximum size of all the storage server queues ACTOR Future getMaxStorageServerQueueSize(Database cx, Reference> dbInfo) { TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers"); @@ -311,9 +357,7 @@ ACTOR Future getMaxStorageServerQueueSize(Database cx, ReferenceisSimulated() || g_simulator.tssMode != ISimulator::TSSMode::EnabledAddDelay) { - messages.push_back(timeoutError(itr->second.eventLogRequest.getReply(EventLogRequest( - StringRef(servers[i].id().toString() + "/StorageMetrics"))), - 1.0)); + messages.push_back(getStorageMetricsTimeout(servers[i].id(), itr->second)); } } @@ -321,23 +365,7 @@ ACTOR Future getMaxStorageServerQueueSize(Database cx, Reference waitForQuietDatabase(Database cx, int64_t maxPoppedVersionLag = 30e6) { state Future reconfig = reconfigureAfter(cx, 100 + (deterministicRandom()->random01() * 100), dbInfo, "QuietDatabase"); + state Future dataInFlight; + state Future> tLogQueueInfo; + state Future dataDistributionQueueSize; + state Future teamCollectionValid; + state Future storageQueueSize; + state Future dataDistributionActive; + state Future storageServersRecruiting; auto traceMessage = "QuietDatabase" + phase + "Begin"; TraceEvent(traceMessage.c_str()); @@ -613,15 +648,13 @@ ACTOR Future waitForQuietDatabase(Database cx, TraceEvent("QuietDatabaseGotDataDistributor", distributorUID) .detail("Locality", distributorWorker.locality.toString()); - state Future dataInFlight = getDataInFlight(cx, distributorWorker); - state Future> tLogQueueInfo = getTLogQueueInfo(cx, dbInfo); - state Future dataDistributionQueueSize = - getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0); - state Future teamCollectionValid = getTeamCollectionValid(cx, 
distributorWorker); - state Future storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo); - state Future dataDistributionActive = getDataDistributionActive(cx, distributorWorker); - state Future storageServersRecruiting = - getStorageServersRecruiting(cx, distributorWorker, distributorUID); + dataInFlight = getDataInFlight(cx, distributorWorker); + tLogQueueInfo = getTLogQueueInfo(cx, dbInfo); + dataDistributionQueueSize = getDataDistributionQueueSize(cx, distributorWorker, dataInFlightGate == 0); + teamCollectionValid = getTeamCollectionValid(cx, distributorWorker); + storageQueueSize = getMaxStorageServerQueueSize(cx, dbInfo); + dataDistributionActive = getDataDistributionActive(cx, distributorWorker); + storageServersRecruiting = getStorageServersRecruiting(cx, distributorWorker, distributorUID); wait(success(dataInFlight) && success(tLogQueueInfo) && success(dataDistributionQueueSize) && success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) && @@ -661,6 +694,7 @@ ACTOR Future waitForQuietDatabase(Database cx, } } } catch (Error& e) { + TraceEvent(("QuietDatabase" + phase + "Error").c_str()).error(e, true); if (e.code() != error_code_actor_cancelled && e.code() != error_code_attribute_not_found && e.code() != error_code_timed_out) TraceEvent(("QuietDatabase" + phase + "Error").c_str()).error(e); @@ -670,7 +704,38 @@ ACTOR Future waitForQuietDatabase(Database cx, if (e.code() != error_code_attribute_not_found && e.code() != error_code_timed_out) throw; - TraceEvent(("QuietDatabase" + phase + "Retry").c_str()).error(e); + auto evtType = "QuietDatabase" + phase + "Retry"; + TraceEvent evt(evtType.c_str()); + evt.error(e); + int notReadyCount = 0; + if (dataInFlight.isReady() && dataInFlight.isError()) { + auto key = "NotReady" + std::to_string(notReadyCount++); + evt.detail(key.c_str(), "dataInFlight"); + } + if (tLogQueueInfo.isReady() && tLogQueueInfo.isError()) { + auto key = "NotReady" + 
std::to_string(notReadyCount++); + evt.detail(key.c_str(), "tLogQueueInfo"); + } + if (dataDistributionQueueSize.isReady() && dataDistributionQueueSize.isError()) { + auto key = "NotReady" + std::to_string(notReadyCount++); + evt.detail(key.c_str(), "dataDistributionQueueSize"); + } + if (teamCollectionValid.isReady() && teamCollectionValid.isError()) { + auto key = "NotReady" + std::to_string(notReadyCount++); + evt.detail(key.c_str(), "teamCollectionValid"); + } + if (storageQueueSize.isReady() && storageQueueSize.isError()) { + auto key = "NotReady" + std::to_string(notReadyCount++); + evt.detail(key.c_str(), "storageQueueSize"); + } + if (dataDistributionActive.isReady() && dataDistributionActive.isError()) { + auto key = "NotReady" + std::to_string(notReadyCount++); + evt.detail(key.c_str(), "dataDistributionActive"); + } + if (storageServersRecruiting.isReady() && storageServersRecruiting.isError()) { + auto key = "NotReady" + std::to_string(notReadyCount++); + evt.detail(key.c_str(), "storageServersRecruiting"); + } wait(delay(1.0)); numSuccesses = 0; } From dc17be093c99464c4b6f2067d4077e66590ab0c8 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 15 Jun 2021 16:00:43 -0600 Subject: [PATCH 23/30] don't allow double recruitement for storages --- fdbserver/worker.actor.cpp | 43 +++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index e45a1d6617..a2ab8f8ce2 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -743,7 +743,21 @@ ACTOR Future monitorHighMemory(int64_t threshold) { return Void(); } -ACTOR Future storageServerRollbackRebooter(Future prevStorageServer, +struct TrackRunningStorage { + UID self; + KeyValueStoreType storeType; + std::set>* runningStorages; + TrackRunningStorage(UID self, + KeyValueStoreType storeType, + std::set>* runningStorages) + : self(self), storeType(storeType), runningStorages(runningStorages) { + 
runningStorages->emplace(self, storeType); + } + ~TrackRunningStorage() { runningStorages->erase(std::make_pair(self, storeType)); }; +}; + +ACTOR Future storageServerRollbackRebooter(std::set>* runningStorages, + Future prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, @@ -754,6 +768,7 @@ ACTOR Future storageServerRollbackRebooter(Future prevStorageServer, ActorCollection* filesClosed, int64_t memoryLimit, IKeyValueStore* store) { + state TrackRunningStorage _(id, storeType, runningStorages); loop { ErrorOr e = wait(errorOr(prevStorageServer)); if (!e.isError()) @@ -999,6 +1014,13 @@ struct SharedLogsValue { : actor(actor), uid(uid), requests(requests) {} }; +ACTOR template +Future sendDelayedError(ReplyPromise reply, Error e, double d) { + wait(delay(d)); + reply.sendError(e); + return Void(); +} + ACTOR Future workerServer(Reference connFile, Reference>> ccInterface, LocalityData locality, @@ -1038,6 +1060,7 @@ ACTOR Future workerServer(Reference connFile, state std::string coordFolder = abspath(_coordFolder); state WorkerInterface interf(locality); + state std::set> runningStorages; interf.initEndpoints(); state Reference>> issues(new AsyncVar>()); @@ -1150,7 +1173,8 @@ ACTOR Future workerServer(Reference connFile, Future f = storageServer(kv, recruited, dbInfo, folder, recovery, connFile); recoveries.push_back(recovery.getFuture()); f = handleIOErrors(f, kv, s.storeID, kvClosed); - f = storageServerRollbackRebooter(f, + f = storageServerRollbackRebooter(&runningStorages, + f, s.storeType, s.filename, recruited.id(), @@ -1512,7 +1536,11 @@ ACTOR Future workerServer(Reference connFile, activeSharedTLog->set(logData.uid); } when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) { - if (!storageCache.exists(req.reqId)) { + if (!storageCache.exists(req.reqId) && + (std::all_of(runningStorages.begin(), + runningStorages.end(), + [&req](const auto& p) { return p.second != req.storeType; }) || + req.seedTag != 
invalidTag)) { bool isTss = req.tssPairIDAndVersion.present(); @@ -1561,7 +1589,8 @@ ACTOR Future workerServer(Reference connFile, folder); s = handleIOErrors(s, data, recruited.id(), kvClosed); s = storageCache.removeOnReady(req.reqId, s); - s = storageServerRollbackRebooter(s, + s = storageServerRollbackRebooter(&runningStorages, + s, req.storeType, filename, recruited.id(), @@ -1573,8 +1602,12 @@ ACTOR Future workerServer(Reference connFile, memoryLimit, data); errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s)); - } else + } else if (storageCache.exists(req.reqId)) { forwardPromise(req.reply, storageCache.get(req.reqId)); + } else { + TraceEvent("AttemptedDoubleRecruitement", interf.id()).detail("ForRole", "StorageServer"); + errorForwarders.add(sendDelayedError(req.reply, recruitment_failed(), 0.5)); + } } when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) { CommitProxyInterface recruited; From 56eaf1bc839f2d58fa81b97f39677749dfd1dade Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 15 Jun 2021 16:49:27 -0600 Subject: [PATCH 24/30] added comments --- fdbserver/QuietDatabase.actor.cpp | 5 +++++ fdbserver/SimulatedCluster.actor.cpp | 11 +++++++---- fdbserver/worker.actor.cpp | 6 ++++++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 8f05a0d054..16598d6639 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -284,6 +284,8 @@ ACTOR Future> getStorageWorkers(Database cx, return result; } +// Helper function to extract he maximum SQ size of all provided messages. All futures in the +// messages vector have to be ready. int64_t extractMaxQueueSize(const std::vector>& messages, const std::vector& servers) { @@ -315,6 +317,7 @@ int64_t extractMaxQueueSize(const std::vector>& message return maxQueueSize; } +// Timeout wrapper when getting the storage metrics. 
This will do some additional tracing ACTOR Future getStorageMetricsTimeout(UID storage, WorkerInterface wi) { state Future result = wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics"))); @@ -608,6 +611,8 @@ ACTOR Future reconfigureAfter(Database cx, return Void(); } +// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This +// requires the database to be available and healthy in order to succeed. ACTOR Future waitForQuietDatabase(Database cx, Reference> dbInfo, std::string phase, diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 0121a22338..ce0af1d345 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -53,17 +53,19 @@ const int MACHINE_REBOOT_TIME = 10; bool destructed = false; +// given a std::variant T, this templated class will define type which will be a variant of all types in T plus Args... 
template -struct concatenate_variant_t; +struct variant_add_t; template -struct concatenate_variant_t, Args2...> { +struct variant_add_t, Args2...> { using type = std::variant; }; template -using concatenate_variant = typename concatenate_variant_t::type; +using variant_add = typename variant_add_t::type; +// variant_with_optional_t::type will be a std::variant...> template struct variant_with_optional_t; @@ -74,12 +76,13 @@ struct variant_with_optional_t { template struct variant_with_optional_t { - using type = concatenate_variant::type, Head, Optional>; + using type = variant_add::type, Head, Optional>; }; template using variant_with_optional = typename variant_with_optional_t::type; +// Expects a std::variant and will make a pointer out of each argument template struct add_pointers_t; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index a2ab8f8ce2..bea37b70fa 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1536,6 +1536,12 @@ ACTOR Future workerServer(Reference connFile, activeSharedTLog->set(logData.uid); } when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) { + // We want to prevent double recruiting on a worker unless we try to recruit something + // with a different storage engine (otherwise storage migration won't work for certain + // configuration). Additionally we also need to allow double recruitment for seed servers. + // The reason for this is that a storage will only remove itself if after it was able + // to read the system key space. But if recovery fails right after a `configure new ...` + // was run it won't be able to do so. 
if (!storageCache.exists(req.reqId) && (std::all_of(runningStorages.begin(), runningStorages.end(), From 5cd029967e81d78cf3cd7525100815c6111ac2c1 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 15 Jun 2021 17:37:05 -0600 Subject: [PATCH 25/30] minor simplification (address review) --- fdbserver/worker.actor.cpp | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index bea37b70fa..d775d09695 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1014,13 +1014,6 @@ struct SharedLogsValue { : actor(actor), uid(uid), requests(requests) {} }; -ACTOR template -Future sendDelayedError(ReplyPromise reply, Error e, double d) { - wait(delay(d)); - reply.sendError(e); - return Void(); -} - ACTOR Future workerServer(Reference connFile, Reference>> ccInterface, LocalityData locality, @@ -1612,7 +1605,10 @@ ACTOR Future workerServer(Reference connFile, forwardPromise(req.reply, storageCache.get(req.reqId)); } else { TraceEvent("AttemptedDoubleRecruitement", interf.id()).detail("ForRole", "StorageServer"); - errorForwarders.add(sendDelayedError(req.reply, recruitment_failed(), 0.5)); + errorForwarders.add(map(delay(0.5), [reply = req.reply](Void) { + reply.sendError(recruitment_failed()); + return Void(); + })); } } when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) { From efba472d9c9ddaed022ea7aee389b170b1a8b77d Mon Sep 17 00:00:00 2001 From: negoyal Date: Tue, 15 Jun 2021 18:25:13 -0700 Subject: [PATCH 26/30] Fix a heap use after free issue. 
--- fdbserver/VersionedBTree.actor.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index e3a18d26af..c988b8904d 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -820,20 +820,21 @@ public: if (mode == POP) { --queue->numPages; } - page.clear(); - debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", toString().c_str()); if (mode == POP && !queue->usesExtents) { // Freeing the old page must happen after advancing the cursor and clearing the page reference // because freePage() could cause a push onto a queue that causes a newPageID() call which could // pop() from this very same queue. Queue pages are freed at version 0 because they can be reused // after the next commit. + page.clear(); queue->pager->freePage(oldPageID, 0); } else if (queue->usesExtents && (p->extentCurPageID == p->extentEndPageID)) { + page.clear(); // Figure out the beginning of the extent int pagesPerExtent = queue->pagesPerExtent; queue->pager->freeExtent(oldPageID - pagesPerExtent + 1); } + debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", toString().c_str()); } debug_printf("FIFOQueue(%s) %s(upperBound=%s) -> %s\n", From fdd9c3079493c7d53351f802d4ae6e0304dfdcb9 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Wed, 16 Jun 2021 05:30:58 +0000 Subject: [PATCH 27/30] code refactor;change stopSignal; --- fdbserver/DataDistribution.actor.cpp | 43 ++++++++++++++++------------ 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 644e3c414f..121409337f 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3936,12 +3936,17 @@ ACTOR Future updateNextWigglingStoragePID(DDTeamCollection* teamCollection // Iterate over each storage process to do storage wiggle. 
After initializing the first Process ID, it waits a signal // from `perpetualStorageWiggler` indicating the wiggling of current process is finished. Then it writes the next // Process ID to a system key: `wigglingStorageServerKey` to show the next process to wiggle. -ACTOR Future perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, +ACTOR Future perpetualStorageWiggleIterator(AsyncVar* stopSignal, FutureStream finishStorageWiggleSignal, DDTeamCollection* teamCollection) { - loop choose { - when(wait(stopSignal->onTrigger())) { break; } - when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); } + loop { + choose { + when(wait(stopSignal->onChange())) {} + when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); } + } + if (stopSignal->get()) { + break; + } } return Void(); @@ -3993,7 +3998,7 @@ ACTOR Future clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self) // Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker // start to move data off the affected teams. The wiggling process of current storage servers will be paused if the // cluster is unhealthy and restarted once the cluster is healthy again. 
-ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, +ACTOR Future perpetualStorageWiggler(AsyncVar* stopSignal, PromiseStream finishStorageWiggleSignal, DDTeamCollection* self, const DDEnabledState* ddEnabledState) { @@ -4033,22 +4038,21 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, } else { TEST(true); // skip wiggling current process TraceEvent("PerpetualStorageWiggleSkip", self->distributorId).detail("ProcessId", pid.toString()); - - self->wigglingPid.reset(); - watchFuture = res.first; - finishStorageWiggleSignal.send(Void()); + moveFinishFuture = Void(); } } } choose { - when(wait(stopSignal->onTrigger())) { break; } when(wait(watchFuture)) { ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished watchFuture = Never(); // read new pid and set the next watch Future wait(store(res, watchPerpetualStoragePIDChange(self))); self->wigglingPid = Optional(res.second); + + // random delay + wait(delayJittered(5.0, TaskPriority::DataDistributionLow)); } when(wait(moveFinishFuture)) { TEST(true); // finish wiggling this process @@ -4065,8 +4069,11 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, watchFuture = res.first; finishStorageWiggleSignal.send(Void()); } - when(wait(ddQueueCheck)) {} - when(wait(self->pauseWiggle->onChange())) {} + when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {} + } + + if (stopSignal->get()) { + break; } } @@ -4086,10 +4093,9 @@ ACTOR Future perpetualStorageWiggler(AsyncTrigger* stopSignal, ACTOR Future monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection, const DDEnabledState* ddEnabledState) { state int speed = 0; - state AsyncTrigger stopWiggleSignal; + state AsyncVar stopWiggleSignal(false); state PromiseStream finishStorageWiggleSignal; state SignalableActorCollection collection; - state bool started = false; teamCollection->pauseWiggle = makeReference>(true); loop { @@ -4106,18 +4112,17 @@ ACTOR Future 
monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio wait(tr.commit()); ASSERT(speed == 1 || speed == 0); - if (speed == 1 && !started) { + if (speed == 1 && stopWiggleSignal.get()) { // avoid duplicated start + stopWiggleSignal.set(false); collection.add(perpetualStorageWiggleIterator( &stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection)); collection.add(perpetualStorageWiggler( &stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState)); TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId); - started = true; - } else if (speed == 0 && started) { - stopWiggleSignal.trigger(); + } else if (speed == 0 && !stopWiggleSignal.get()) { + stopWiggleSignal.set(true); wait(collection.signalAndReset()); TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId); - started = false; teamCollection->pauseWiggle->set(true); } wait(watchFuture); From 81202be424ae5dff5be8831316c6b246d19c73af Mon Sep 17 00:00:00 2001 From: negoyal Date: Wed, 16 Jun 2021 01:09:58 -0700 Subject: [PATCH 28/30] Better fix. 
--- fdbserver/VersionedBTree.actor.cpp | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index c988b8904d..0509b39f78 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -809,6 +809,8 @@ public: if (offset == p->endOffset) { debug_printf("FIFOQueue::Cursor(%s) Page exhausted\n", toString().c_str()); LogicalPageID oldPageID = pageID; + LogicalPageID extentCurPageID = p->extentCurPageID; + LogicalPageID extentEndPageID = p->extentEndPageID; pageID = p->nextPageID; offset = p->nextOffset; @@ -821,20 +823,21 @@ public: --queue->numPages; } - if (mode == POP && !queue->usesExtents) { - // Freeing the old page must happen after advancing the cursor and clearing the page reference - // because freePage() could cause a push onto a queue that causes a newPageID() call which could - // pop() from this very same queue. Queue pages are freed at version 0 because they can be reused - // after the next commit. - page.clear(); - queue->pager->freePage(oldPageID, 0); - } else if (queue->usesExtents && (p->extentCurPageID == p->extentEndPageID)) { - page.clear(); - // Figure out the beginning of the extent - int pagesPerExtent = queue->pagesPerExtent; - queue->pager->freeExtent(oldPageID - pagesPerExtent + 1); - } + page.clear(); debug_printf("FIFOQueue::Cursor(%s) readNext page exhausted, moved to new page\n", toString().c_str()); + if (mode == POP) { + if(!queue->usesExtents) { + // Freeing the old page must happen after advancing the cursor and clearing the page reference + // because freePage() could cause a push onto a queue that causes a newPageID() call which could + // pop() from this very same queue. Queue pages are freed at version 0 because they can be reused + // after the next commit. 
+ queue->pager->freePage(oldPageID, 0); + } else if (queue->usesExtents && (extentCurPageID == extentEndPageID)) { + // Figure out the beginning of the extent + int pagesPerExtent = queue->pagesPerExtent; + queue->pager->freeExtent(oldPageID - pagesPerExtent + 1); + } + } } debug_printf("FIFOQueue(%s) %s(upperBound=%s) -> %s\n", From 9a24a57007ffcea14030d9d78096f4b4e3312654 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 16 Jun 2021 10:46:21 -0600 Subject: [PATCH 29/30] refactor templates to make them more readable and reusable --- fdbserver/SimulatedCluster.actor.cpp | 46 ++--------------------- flow/CMakeLists.txt | 1 + flow/TypeTraits.h | 55 ++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 42 deletions(-) create mode 100644 flow/TypeTraits.h diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index ce0af1d345..3286e99aa7 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -39,6 +39,7 @@ #include "fdbclient/versions.h" #include "flow/ProtocolVersion.h" #include "flow/network.h" +#include "flow/TypeTraits.h" #include "flow/actorcompiler.h" // This must be the last #include. #undef max @@ -53,53 +54,14 @@ const int MACHINE_REBOOT_TIME = 10; bool destructed = false; -// given a std::variant T, this templated class will define type which will be a variant of all types in T plus Args... 
-template -struct variant_add_t; - -template -struct variant_add_t, Args2...> { - using type = std::variant; -}; - -template -using variant_add = typename variant_add_t::type; - -// variant_with_optional_t::type will be a std::variant...> -template -struct variant_with_optional_t; - -template -struct variant_with_optional_t { - using type = std::variant>; -}; - -template -struct variant_with_optional_t { - using type = variant_add::type, Head, Optional>; -}; - -template -using variant_with_optional = typename variant_with_optional_t::type; - -// Expects a std::variant and will make a pointer out of each argument -template -struct add_pointers_t; - -template -struct add_pointers_t> { - using type = std::variant; -}; - -template -using add_pointers = typename add_pointers_t::type; - // Configuration details specified in workload test files that change the simulation // environment details class TestConfig { class ConfigBuilder { using value_type = toml::basic_value; - using types = add_pointers>>; + using base_variant = std::variant>; + using types = + variant_map>, std::add_pointer_t>; std::unordered_map confMap; struct visitor { diff --git a/flow/CMakeLists.txt b/flow/CMakeLists.txt index c928aac757..09a81ee0c6 100644 --- a/flow/CMakeLists.txt +++ b/flow/CMakeLists.txt @@ -67,6 +67,7 @@ set(FLOW_SRCS Tracing.h Tracing.actor.cpp TreeBenchmark.h + TypeTraits.h UnitTest.cpp UnitTest.h XmlTraceLogFormatter.cpp diff --git a/flow/TypeTraits.h b/flow/TypeTraits.h new file mode 100644 index 0000000000..9552f5f34e --- /dev/null +++ b/flow/TypeTraits.h @@ -0,0 +1,55 @@ +/* + * TypeTraits.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file, similar to `type_traits` in the standard library, contains utility types that can be used for template +// metaprogramming. While they can be very useful and simplify certain things, please be aware that their use will +// increase compilation times significantly. Therefore it is not recommended to use them in header files if not +// absolutely necessary. +#pragma once +#include + +// This type class will take two std::variant types and concatenate them +template +struct variant_concat_t; + +template +struct variant_concat_t, std::variant> { + using type = std::variant; +}; + +// Helper definition for variant_concat_t. Instead of using `typename variant_concat_t<...>::type` one can simply use +// `variant_concat<...>` +template +using variant_concat = typename variant_concat_t::type; + +// Takes a std::variant as first argument and applies Fun to each of its alternative types. For example: typename +// variant_map_t, std::add_pointer_t>::type will be defined as std::variant +template class Fun> +struct variant_map_t; + +template class Fun> +struct variant_map_t, Fun> { + using type = std::variant...>; +}; + +// Helper definition for variant_map_t. Instead of using `typename variant_map_t<...>::type` one can simply use +// `variant_map<...>` which is equivalent but shorter. +template class Fun> +using variant_map = typename variant_map_t::type; From 82bdc3593125a6ce1866d77a3d16462a637b21c8 Mon Sep 17 00:00:00 2001 From: negoyal Date: Wed, 16 Jun 2021 10:00:27 -0700 Subject: [PATCH 30/30] Remove redundant check. 
--- fdbserver/VersionedBTree.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 0509b39f78..31f74084a4 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -832,7 +832,7 @@ public: // pop() from this very same queue. Queue pages are freed at version 0 because they can be reused // after the next commit. queue->pager->freePage(oldPageID, 0); - } else if (queue->usesExtents && (extentCurPageID == extentEndPageID)) { + } else if (extentCurPageID == extentEndPageID) { // Figure out the beginning of the extent int pagesPerExtent = queue->pagesPerExtent; queue->pager->freeExtent(oldPageID - pagesPerExtent + 1);