Merge pull request #4504 from sfc-gh-nwijetunga/network_busyness
Monitor Network Thread Busyness
This commit is contained in:
commit
ed811008ca
|
@ -357,6 +357,13 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
|
|||
.extractPtr());
|
||||
}
|
||||
|
||||
// Get network thread busyness (updated every 1s)
|
||||
// A value of 0 indicates that the client is more or less idle
|
||||
// A value of 1 (or more) indicates that the client is saturated
|
||||
extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d) {
|
||||
return DB(d)->getMainThreadBusyness();
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
|
||||
try {
|
||||
TXN(tr)->delref();
|
||||
|
|
|
@ -187,6 +187,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_create_snapshot(FDBDatabase
|
|||
uint8_t const* snap_command,
|
||||
int snap_command_length);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
|
||||
|
||||
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
|
||||
|
||||
DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr);
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include <tuple>
|
||||
#include <vector>
|
||||
#include <random>
|
||||
#include <chrono>
|
||||
|
||||
#define DOCTEST_CONFIG_IMPLEMENT
|
||||
#include "doctest.h"
|
||||
|
@ -2126,6 +2127,24 @@ TEST_CASE("block_from_callback") {
|
|||
context.event.wait();
|
||||
}
|
||||
|
||||
// monitors network busyness for 2 sec (40 readings)
|
||||
TEST_CASE("monitor_network_busyness") {
|
||||
bool containsGreaterZero = false;
|
||||
for (int i = 0; i < 40; i++) {
|
||||
double busyness = fdb_database_get_main_thread_busyness(db);
|
||||
// make sure the busyness is between 0 and 1
|
||||
CHECK(busyness >= 0);
|
||||
CHECK(busyness <= 1);
|
||||
if (busyness > 0) {
|
||||
containsGreaterZero = true;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
// assert that at least one of the busyness readings was greater than 0
|
||||
CHECK(containsGreaterZero);
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
if (argc != 3 && argc != 4) {
|
||||
std::cout << "Unit tests for the FoundationDB C API.\n"
|
||||
|
|
|
@ -481,7 +481,11 @@ An |database-blurb1| Modifications to a database are performed via transactions.
|
|||
|length-of| ``snapshot_command``
|
||||
|
||||
.. note:: The function is exposing the functionality of the fdbcli command ``snapshot``. Please take a look at the documentation before using (see :ref:`disk-snapshot-backups`).
|
||||
|
||||
|
||||
.. function:: double fdb_database_get_main_thread_busyness(FDBDatabase* database)
|
||||
|
||||
Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated. By default, this value is updated every second.
|
||||
|
||||
Transaction
|
||||
===========
|
||||
|
||||
|
|
|
@ -96,6 +96,7 @@ public:
|
|||
|
||||
virtual Reference<ITransaction> createTransaction() = 0;
|
||||
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
|
||||
virtual double getMainThreadBusyness() = 0;
|
||||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
|
|
|
@ -38,6 +38,7 @@ void ClientKnobs::initialize(bool randomize) {
|
|||
init( TOO_MANY, 1000000 );
|
||||
|
||||
init( SYSTEM_MONITOR_INTERVAL, 5.0 );
|
||||
init( NETWORK_BUSYNESS_MONITOR_INTERVAL, 1.0 );
|
||||
|
||||
init( FAILURE_MAX_DELAY, 5.0 );
|
||||
init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0;
|
||||
|
|
|
@ -30,6 +30,7 @@ public:
|
|||
int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically
|
||||
|
||||
double SYSTEM_MONITOR_INTERVAL;
|
||||
double NETWORK_BUSYNESS_MONITOR_INTERVAL; // The interval in which we should update the network busyness metric
|
||||
|
||||
double FAILURE_MAX_DELAY;
|
||||
double FAILURE_MIN_DELAY;
|
||||
|
|
|
@ -347,6 +347,15 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
|
|||
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
|
||||
}
|
||||
|
||||
// Get network thread busyness
|
||||
double DLDatabase::getMainThreadBusyness() {
|
||||
if (api->databaseGetMainThreadBusyness != nullptr) {
|
||||
return api->databaseGetMainThreadBusyness(db);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// DLApi
|
||||
template <class T>
|
||||
void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
|
||||
|
@ -360,6 +369,7 @@ void loadClientFunction(T* fp, void* lib, std::string libPath, const char* funct
|
|||
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
|
||||
: api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
|
||||
|
||||
// Loads client API functions (definitions are in FdbCApi struct)
|
||||
void DLApi::init() {
|
||||
if (isLibraryLoaded(fdbCPath.c_str())) {
|
||||
throw external_client_already_loaded();
|
||||
|
@ -388,6 +398,11 @@ void DLApi::init() {
|
|||
|
||||
loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
|
||||
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
|
||||
loadClientFunction(&api->databaseGetMainThreadBusyness,
|
||||
lib,
|
||||
fdbCPath,
|
||||
"fdb_database_get_main_thread_busyness",
|
||||
headerVersion >= 700);
|
||||
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
|
||||
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
|
||||
loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
|
||||
|
@ -917,6 +932,15 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
|
|||
return abortableFuture(f, dbState->dbVar->get().onChange);
|
||||
}
|
||||
|
||||
// Get network thread busyness
|
||||
double MultiVersionDatabase::getMainThreadBusyness() {
|
||||
if (dbState->db) {
|
||||
return dbState->db->getMainThreadBusyness();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void MultiVersionDatabase::Connector::connect() {
|
||||
addref();
|
||||
onMainThreadVoid(
|
||||
|
|
|
@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int uidLength,
|
||||
uint8_t const* snapshotCommmand,
|
||||
int snapshotCommandLength);
|
||||
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
|
||||
|
||||
// Transaction
|
||||
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
|
||||
|
@ -262,6 +263,7 @@ public:
|
|||
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
double getMainThreadBusyness() override;
|
||||
|
||||
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
|
||||
|
@ -422,6 +424,7 @@ public:
|
|||
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
double getMainThreadBusyness() override;
|
||||
|
||||
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
|
||||
|
|
|
@ -1743,6 +1743,30 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
|
|||
}
|
||||
}
|
||||
|
||||
// update the network busyness on a 1s cadence
|
||||
ACTOR Future<Void> monitorNetworkBusyness() {
|
||||
state double prevTime = now();
|
||||
loop {
|
||||
wait(delay(CLIENT_KNOBS->NETWORK_BUSYNESS_MONITOR_INTERVAL, TaskPriority::FlushTrace));
|
||||
double elapsed = now() - prevTime; // get elapsed time from last execution
|
||||
prevTime = now();
|
||||
struct NetworkMetrics::PriorityStats& tracker = g_network->networkInfo.metrics.starvationTrackerNetworkBusyness;
|
||||
|
||||
if (tracker.active) { // update metrics
|
||||
tracker.duration += now() - tracker.windowedTimer;
|
||||
tracker.maxDuration = std::max(tracker.maxDuration, now() - tracker.timer);
|
||||
tracker.windowedTimer = now();
|
||||
}
|
||||
|
||||
g_network->networkInfo.metrics.networkBusyness =
|
||||
std::min(elapsed, tracker.duration) / elapsed; // average duration spent doing "work"
|
||||
|
||||
tracker.duration = 0;
|
||||
tracker.maxDuration = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Setup g_network and start monitoring for network busyness
|
||||
void setupNetwork(uint64_t transportId, bool useMetrics) {
|
||||
if (g_network)
|
||||
throw network_already_setup();
|
||||
|
@ -1756,6 +1780,8 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
|
|||
g_network->addStopCallback(TLS::DestroyOpenSSLGlobalState);
|
||||
FlowTransport::createInstance(true, transportId);
|
||||
Net2FileSystem::newFileSystem();
|
||||
|
||||
uncancellable(monitorNetworkBusyness());
|
||||
}
|
||||
|
||||
void runNetwork() {
|
||||
|
|
|
@ -91,6 +91,12 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
|
|||
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
|
||||
}
|
||||
|
||||
// Return the main network thread busyness
|
||||
double ThreadSafeDatabase::getMainThreadBusyness() {
|
||||
ASSERT(g_network);
|
||||
return g_network->networkInfo.metrics.networkBusyness;
|
||||
}
|
||||
|
||||
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
|
||||
ClusterConnectionFile* connFile =
|
||||
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
|
||||
|
|
|
@ -35,6 +35,7 @@ public:
|
|||
Reference<ITransaction> createTransaction() override;
|
||||
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
double getMainThreadBusyness() override;
|
||||
|
||||
ThreadFuture<Void>
|
||||
onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The
|
||||
|
|
|
@ -135,6 +135,12 @@ thread_local INetwork* thread_network = 0;
|
|||
|
||||
class Net2 final : public INetwork, public INetworkConnections {
|
||||
|
||||
private:
|
||||
void updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats,
|
||||
TaskPriority priority,
|
||||
TaskPriority lastPriority,
|
||||
double now);
|
||||
|
||||
public:
|
||||
Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics);
|
||||
void initTLS(ETLSInitState targetState) override;
|
||||
|
@ -1582,6 +1588,28 @@ void Net2::run() {
|
|||
#endif
|
||||
}
|
||||
|
||||
// Updates the PriorityStats found in NetworkMetrics
|
||||
void Net2::updateStarvationTracker(struct NetworkMetrics::PriorityStats& binStats,
|
||||
TaskPriority priority,
|
||||
TaskPriority lastPriority,
|
||||
double now) {
|
||||
|
||||
// Busy -> idle at binStats.priority
|
||||
if (binStats.priority > priority && binStats.priority <= lastPriority) {
|
||||
binStats.active = false;
|
||||
binStats.duration += now - binStats.windowedTimer;
|
||||
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
|
||||
}
|
||||
|
||||
// Idle -> busy at binStats.priority
|
||||
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
|
||||
binStats.active = true;
|
||||
binStats.timer = now;
|
||||
binStats.windowedTimer = now;
|
||||
}
|
||||
}
|
||||
|
||||
// Update both vectors of starvation trackers (one that updates every 5s and the other every 1s)
|
||||
void Net2::trackAtPriority(TaskPriority priority, double now) {
|
||||
if (lastPriorityStats == nullptr || priority != lastPriorityStats->priority) {
|
||||
// Start tracking current priority
|
||||
|
@ -1601,22 +1629,12 @@ void Net2::trackAtPriority(TaskPriority priority, double now) {
|
|||
if (binStats.priority > lastPriority && binStats.priority > priority) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Busy -> idle at binStats.priority
|
||||
if (binStats.priority > priority && binStats.priority <= lastPriority) {
|
||||
binStats.active = false;
|
||||
binStats.duration += now - binStats.windowedTimer;
|
||||
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
|
||||
}
|
||||
|
||||
// Idle -> busy at binStats.priority
|
||||
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
|
||||
binStats.active = true;
|
||||
binStats.timer = now;
|
||||
binStats.windowedTimer = now;
|
||||
}
|
||||
updateStarvationTracker(binStats, priority, lastPriority, now);
|
||||
}
|
||||
|
||||
// Update starvation trackers for network busyness
|
||||
updateStarvationTracker(networkInfo.metrics.starvationTrackerNetworkBusyness, priority, lastPriority, now);
|
||||
|
||||
lastPriorityStats = &activeStatsItr.first->second;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include <string>
|
||||
#include <stdint.h>
|
||||
#include <variant>
|
||||
#include <atomic>
|
||||
#include "boost/asio.hpp"
|
||||
#ifndef TLS_DISABLED
|
||||
#include "boost/asio/ssl.hpp"
|
||||
|
@ -320,6 +321,7 @@ class Future;
|
|||
template <class T>
|
||||
class Promise;
|
||||
|
||||
// Metrics which represent various network properties
|
||||
struct NetworkMetrics {
|
||||
enum { SLOW_EVENT_BINS = 16 };
|
||||
uint64_t countSlowEvents[SLOW_EVENT_BINS] = {};
|
||||
|
@ -340,16 +342,37 @@ struct NetworkMetrics {
|
|||
};
|
||||
|
||||
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
|
||||
double lastRunLoopBusyness;
|
||||
double lastRunLoopBusyness; // network thread busyness (measured every 5s by default)
|
||||
std::atomic<double> networkBusyness; // network thread busyness which is returned to the client (measured every 1s by default)
|
||||
|
||||
// starvation trackers which keeps track of different task priorities
|
||||
std::vector<struct PriorityStats> starvationTrackers;
|
||||
struct PriorityStats starvationTrackerNetworkBusyness;
|
||||
|
||||
static const std::vector<int> starvationBins;
|
||||
|
||||
NetworkMetrics() : lastRunLoopBusyness(0) {
|
||||
for (int priority : starvationBins) {
|
||||
NetworkMetrics()
|
||||
: lastRunLoopBusyness(0), networkBusyness(0),
|
||||
starvationTrackerNetworkBusyness(PriorityStats(static_cast<TaskPriority>(starvationBins.at(0)))) {
|
||||
for (int priority : starvationBins) { // initialize starvation trackers with given priorities
|
||||
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
|
||||
}
|
||||
}
|
||||
|
||||
// Since networkBusyness is atomic we need to redefine copy assignment operator
|
||||
NetworkMetrics& operator=(const NetworkMetrics& rhs) {
|
||||
for (int i = 0; i < SLOW_EVENT_BINS; i++) {
|
||||
countSlowEvents[i] = rhs.countSlowEvents[i];
|
||||
}
|
||||
secSquaredSubmit = rhs.secSquaredSubmit;
|
||||
secSquaredDiskStall = rhs.secSquaredDiskStall;
|
||||
activeTrackers = rhs.activeTrackers;
|
||||
lastRunLoopBusyness = rhs.lastRunLoopBusyness;
|
||||
networkBusyness = rhs.networkBusyness.load();
|
||||
starvationTrackers = rhs.starvationTrackers;
|
||||
starvationTrackerNetworkBusyness = rhs.starvationTrackerNetworkBusyness;
|
||||
return *this;
|
||||
}
|
||||
};
|
||||
|
||||
struct FlowLock;
|
||||
|
|
Loading…
Reference in New Issue