diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index 87c5d04c7a..bf6af3aab7 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -357,6 +357,13 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db, .extractPtr()); } +// Get network thread busyness (updated every 1s) +// A value of 0 indicates that the client is more or less idle +// A value of 1 (or more) indicates that the client is saturated +extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d) { + return DB(d)->getMainThreadBusyness(); +} + extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) { try { TXN(tr)->delref(); diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index ca046db699..2086cbd775 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -187,6 +187,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase uint8_t const* snap_command, int snap_command_length); +DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db); + DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr); DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr); diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp index c92430f95e..f3f97476c2 100644 --- a/bindings/c/test/unit/unit_tests.cpp +++ b/bindings/c/test/unit/unit_tests.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #define DOCTEST_CONFIG_IMPLEMENT #include "doctest.h" @@ -2126,6 +2127,24 @@ TEST_CASE("block_from_callback") { context.event.wait(); } +// monitors network busyness for 2 sec (40 readings) +TEST_CASE("monitor_network_busyness") { + bool containsGreaterZero = false; + for (int i = 0; i < 40; i++) { + double busyness = fdb_database_get_main_thread_busyness(db); + // make sure the busyness is between 0 and 1 + CHECK(busyness >= 0); + CHECK(busyness <= 1); + if (busyness > 0) { + containsGreaterZero = 
true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + // assert that at least one of the busyness readings was greater than 0 + CHECK(containsGreaterZero); +} + int main(int argc, char** argv) { if (argc != 3 && argc != 4) { std::cout << "Unit tests for the FoundationDB C API.\n" diff --git a/documentation/sphinx/source/api-c.rst b/documentation/sphinx/source/api-c.rst index 2c2e256884..0d02dc18dd 100644 --- a/documentation/sphinx/source/api-c.rst +++ b/documentation/sphinx/source/api-c.rst @@ -481,7 +481,11 @@ An |database-blurb1| Modifications to a database are performed via transactions. |length-of| ``snapshot_command`` .. note:: The function is exposing the functionality of the fdbcli command ``snapshot``. Please take a look at the documentation before using (see :ref:`disk-snapshot-backups`). - + +.. function:: double fdb_database_get_main_thread_busyness(FDBDatabase* database) + + Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated. By default, this value is updated every second. 
+ Transaction =========== diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index 25f5098b0f..6f3ad07cd1 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -96,6 +96,7 @@ public: virtual Reference createTransaction() = 0; virtual void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) = 0; + virtual double getMainThreadBusyness() = 0; virtual void addref() = 0; virtual void delref() = 0; diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index 62aa32478c..bcca5ed166 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -38,6 +38,7 @@ void ClientKnobs::initialize(bool randomize) { init( TOO_MANY, 1000000 ); init( SYSTEM_MONITOR_INTERVAL, 5.0 ); + init( NETWORK_BUSYNESS_MONITOR_INTERVAL, 1.0 ); init( FAILURE_MAX_DELAY, 5.0 ); init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0; diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h index 3c21e58bff..3d22b5a24b 100644 --- a/fdbclient/Knobs.h +++ b/fdbclient/Knobs.h @@ -30,6 +30,7 @@ public: int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically double SYSTEM_MONITOR_INTERVAL; + double NETWORK_BUSYNESS_MONITOR_INTERVAL; // The interval in which we should update the network busyness metric double FAILURE_MAX_DELAY; double FAILURE_MIN_DELAY; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 0dd10a218c..ac1855c811 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -347,6 +347,15 @@ ThreadFuture DLDatabase::createSnapshot(const StringRef& uid, const String return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); }); } +// Get network thread busyness +double DLDatabase::getMainThreadBusyness() { + if (api->databaseGetMainThreadBusyness != nullptr) { + return api->databaseGetMainThreadBusyness(db); + } + + return 0; +} + // DLApi template void 
loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) { @@ -360,6 +369,7 @@ void loadClientFunction(T* fp, void* lib, std::string libPath, const char* funct DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad) : api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {} +// Loads client API functions (definitions are in FdbCApi struct) void DLApi::init() { if (isLibraryLoaded(fdbCPath.c_str())) { throw external_client_already_loaded(); @@ -388,6 +398,11 @@ void DLApi::init() { loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction"); loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option"); + loadClientFunction(&api->databaseGetMainThreadBusyness, + lib, + fdbCPath, + "fdb_database_get_main_thread_busyness", + headerVersion >= 700); loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy"); loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700); loadClientFunction(&api->databaseForceRecoveryWithDataLoss, @@ -917,6 +932,15 @@ ThreadFuture MultiVersionDatabase::createSnapshot(const StringRef& uid, co return abortableFuture(f, dbState->dbVar->get().onChange); } +// Get network thread busyness +double MultiVersionDatabase::getMainThreadBusyness() { + if (dbState->db) { + return dbState->db->getMainThreadBusyness(); + } + + return 0; +} + void MultiVersionDatabase::Connector::connect() { addref(); onMainThreadVoid( diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 625eea061c..ea16f4f35e 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int uidLength, uint8_t const* snapshotCommmand, int snapshotCommandLength); + double (*databaseGetMainThreadBusyness)(FDBDatabase* database); 
// Transaction fdb_error_t (*transactionSetOption)(FDBTransaction* tr, @@ -262,6 +263,7 @@ public: Reference createTransaction() override; void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override; + double getMainThreadBusyness() override; void addref() override { ThreadSafeReferenceCounted::addref(); } void delref() override { ThreadSafeReferenceCounted::delref(); } @@ -422,6 +424,7 @@ public: Reference createTransaction() override; void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override; + double getMainThreadBusyness() override; void addref() override { ThreadSafeReferenceCounted::addref(); } void delref() override { ThreadSafeReferenceCounted::delref(); } diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index a0ed70997c..8b55757621 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1743,6 +1743,30 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional valu } } +// update the network busyness on a 1s cadence +ACTOR Future monitorNetworkBusyness() { + state double prevTime = now(); + loop { + wait(delay(CLIENT_KNOBS->NETWORK_BUSYNESS_MONITOR_INTERVAL, TaskPriority::FlushTrace)); + double elapsed = now() - prevTime; // get elapsed time from last execution + prevTime = now(); + struct NetworkMetrics::PriorityStats& tracker = g_network->networkInfo.metrics.starvationTrackerNetworkBusyness; + + if (tracker.active) { // update metrics + tracker.duration += now() - tracker.windowedTimer; + tracker.maxDuration = std::max(tracker.maxDuration, now() - tracker.timer); + tracker.windowedTimer = now(); + } + + g_network->networkInfo.metrics.networkBusyness = + std::min(elapsed, tracker.duration) / elapsed; // average duration spent doing "work" + + tracker.duration = 0; + tracker.maxDuration = 0; + } +} + +// Setup g_network and start monitoring for network busyness void setupNetwork(uint64_t transportId, bool useMetrics) { if (g_network) 
throw network_already_setup(); @@ -1756,6 +1780,8 @@ void setupNetwork(uint64_t transportId, bool useMetrics) { g_network->addStopCallback(TLS::DestroyOpenSSLGlobalState); FlowTransport::createInstance(true, transportId); Net2FileSystem::newFileSystem(); + + uncancellable(monitorNetworkBusyness()); } void runNetwork() { diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 05a3c58a49..0e0877f9af 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -91,6 +91,12 @@ ThreadFuture ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons return onMainThread([db, snapUID, cmd]() -> Future { return db->createSnapshot(snapUID, cmd); }); } +// Return the main network thread busyness +double ThreadSafeDatabase::getMainThreadBusyness() { + ASSERT(g_network); + return g_network->networkInfo.metrics.networkBusyness; +} + ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) { ClusterConnectionFile* connFile = new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first); diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 4c7b26fb94..a62e503c11 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -35,6 +35,7 @@ public: Reference createTransaction() override; void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override; + double getMainThreadBusyness() override; ThreadFuture onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. 
The diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 5026d6a982..bb0b0325c6 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -135,6 +135,12 @@ thread_local INetwork* thread_network = 0; class Net2 final : public INetwork, public INetworkConnections { +private: + void updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats, + TaskPriority priority, + TaskPriority lastPriority, + double now); + public: Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics); void initTLS(ETLSInitState targetState) override; @@ -1582,6 +1588,28 @@ void Net2::run() { #endif } +// Updates the PriorityStats found in NetworkMetrics +void Net2::updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats, + TaskPriority priority, + TaskPriority lastPriority, + double now) { + + // Busy -> idle at binStats.priority + if (binStats.priority > priority && binStats.priority <= lastPriority) { + binStats.active = false; + binStats.duration += now - binStats.windowedTimer; + binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer); + } + + // Idle -> busy at binStats.priority + else if (binStats.priority <= priority && binStats.priority > lastPriority) { + binStats.active = true; + binStats.timer = now; + binStats.windowedTimer = now; + } +} + +// Update both vectors of starvation trackers (one that updates every 5s and the other every 1s) void Net2::trackAtPriority(TaskPriority priority, double now) { if (lastPriorityStats == nullptr || priority != lastPriorityStats->priority) { // Start tracking current priority @@ -1601,22 +1629,12 @@ void Net2::trackAtPriority(TaskPriority priority, double now) { if (binStats.priority > lastPriority && binStats.priority > priority) { break; } - - // Busy -> idle at binStats.priority - if (binStats.priority > priority && binStats.priority <= lastPriority) { - binStats.active = false; - binStats.duration += now - binStats.windowedTimer; - binStats.maxDuration = 
std::max(binStats.maxDuration, now - binStats.timer); - } - - // Idle -> busy at binStats.priority - else if (binStats.priority <= priority && binStats.priority > lastPriority) { - binStats.active = true; - binStats.timer = now; - binStats.windowedTimer = now; - } + updateStarvationTracker(binStats, priority, lastPriority, now); } + // Update starvation trackers for network busyness + updateStarvationTracker(networkInfo.metrics.starvationTrackerNetworkBusyness, priority, lastPriority, now); + lastPriorityStats = &activeStatsItr.first->second; } } diff --git a/flow/network.h b/flow/network.h index 33fb7b0f26..d0f117dede 100644 --- a/flow/network.h +++ b/flow/network.h @@ -27,6 +27,7 @@ #include #include #include +#include #include "boost/asio.hpp" #ifndef TLS_DISABLED #include "boost/asio/ssl.hpp" @@ -320,6 +321,7 @@ class Future; template class Promise; +// Metrics which represent various network properties struct NetworkMetrics { enum { SLOW_EVENT_BINS = 16 }; uint64_t countSlowEvents[SLOW_EVENT_BINS] = {}; @@ -340,16 +342,37 @@ struct NetworkMetrics { }; std::unordered_map activeTrackers; - double lastRunLoopBusyness; + double lastRunLoopBusyness; // network thread busyness (measured every 5s by default) + std::atomic networkBusyness; // network thread busyness which is returned to the client (measured every 1s by default) + + // starvation trackers which keep track of different task priorities std::vector starvationTrackers; + struct PriorityStats starvationTrackerNetworkBusyness; static const std::vector starvationBins; - NetworkMetrics() : lastRunLoopBusyness(0) { - for (int priority : starvationBins) { + NetworkMetrics() + : lastRunLoopBusyness(0), networkBusyness(0), + starvationTrackerNetworkBusyness(PriorityStats(static_cast(starvationBins.at(0)))) { + for (int priority : starvationBins) { // initialize starvation trackers with given priorities starvationTrackers.emplace_back(static_cast(priority)); } } + + // Since networkBusyness is atomic we need to 
redefine copy assignment operator + NetworkMetrics& operator=(const NetworkMetrics& rhs) { + for (int i = 0; i < SLOW_EVENT_BINS; i++) { + countSlowEvents[i] = rhs.countSlowEvents[i]; + } + secSquaredSubmit = rhs.secSquaredSubmit; + secSquaredDiskStall = rhs.secSquaredDiskStall; + activeTrackers = rhs.activeTrackers; + lastRunLoopBusyness = rhs.lastRunLoopBusyness; + networkBusyness = rhs.networkBusyness.load(); + starvationTrackers = rhs.starvationTrackers; + starvationTrackerNetworkBusyness = rhs.starvationTrackerNetworkBusyness; + return *this; + } }; struct FlowLock;