diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp
index a5d6a206d8..f98c1c2404 100644
--- a/bindings/c/fdb_c.cpp
+++ b/bindings/c/fdb_c.cpp
@@ -366,6 +366,10 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
 	                    .extractPtr());
 }
 
+extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d) { // C entry point: forwards to DB(d)->getMainThreadBusyness()
+	return DB(d)->getMainThreadBusyness(); // NOTE(review): no try/catch here, unlike fdb_transaction_destroy below - confirm this call cannot throw
+}
+
 extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
 	try {
 		TXN(tr)->delref();
diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h
index 0bda60790b..4fdd852890 100644
--- a/bindings/c/foundationdb/fdb_c.h
+++ b/bindings/c/foundationdb/fdb_c.h
@@ -190,6 +190,8 @@ extern "C" {
                                   int uid_length, uint8_t const *snap_command,
                                   int snap_command_length);
 
+    DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db); /* busyness value reported for this database handle; defined in fdb_c.cpp */
+
     DLLEXPORT void fdb_transaction_destroy( FDBTransaction* tr);
 
     DLLEXPORT void fdb_transaction_cancel( FDBTransaction* tr);
diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h
index 25f5098b0f..6f3ad07cd1 100644
--- a/fdbclient/IClientApi.h
+++ b/fdbclient/IClientApi.h
@@ -96,6 +96,7 @@ public:
 
 	virtual Reference createTransaction() = 0;
 	virtual void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) = 0;
+	virtual double getMainThreadBusyness() = 0; // the implementations in this patch return 0 when no measurement is available
 
 	virtual void addref() = 0;
 	virtual void delref() = 0;
diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp
index 47f858e58f..13fc34222d 100644
--- a/fdbclient/Knobs.cpp
+++ b/fdbclient/Knobs.cpp
@@ -38,6 +38,7 @@ void ClientKnobs::initialize(bool randomize) {
 
 	init( TOO_MANY, 1000000 );
 
 	init( SYSTEM_MONITOR_INTERVAL, 5.0 );
+	init( NETWORK_BUSYNESS_MONITOR_INTERVAL, 1.0 ); // interval handed to recurring(&systemMonitorNetworkBusyness, ...) in setupNetwork(); presumably seconds - confirm
 
 	init( FAILURE_MAX_DELAY, 5.0 );
 	init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0;
diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h
index 3c21e58bff..3280005efb 100644
--- a/fdbclient/Knobs.h
+++ b/fdbclient/Knobs.h
@@ 
-30,6 +30,7 @@ public:
 	int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically
 
 	double SYSTEM_MONITOR_INTERVAL;
+	double NETWORK_BUSYNESS_MONITOR_INTERVAL;
 
 	double FAILURE_MAX_DELAY;
 	double FAILURE_MIN_DELAY;
diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp
index 3fe56620c5..66a1634359 100644
--- a/fdbclient/MultiVersionTransaction.actor.cpp
+++ b/fdbclient/MultiVersionTransaction.actor.cpp
@@ -347,6 +347,14 @@ ThreadFuture DLDatabase::createSnapshot(const StringRef& uid, const String
 	return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
 }
 
+// Asks the loaded client library for its busyness; reports 0 when that entry point was not loaded.
+double DLDatabase::getMainThreadBusyness() {
+	if (api->databaseGetMainThreadBusyness == nullptr) {
+		return 0;
+	}
+	return api->databaseGetMainThreadBusyness(db);
+}
+
 // DLApi
 template
 void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
@@ -388,6 +396,11 @@ void DLApi::init() {
 
 	loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
 	loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
+	loadClientFunction(&api->databaseGetMainThreadBusyness,
+	                   lib,
+	                   fdbCPath,
+	                   "fdb_database_get_main_thread_busyness",
+	                   headerVersion >= 700); // the symbol is only required from header version 700 on
 	loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
 	loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
 	loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
@@ -917,6 +930,14 @@ ThreadFuture MultiVersionDatabase::createSnapshot(const StringRef& uid, co
 	return abortableFuture(f, dbState->dbVar->get().onChange);
 }
 
+// Forwards to the database currently held in dbState->db; reports 0 while none is set.
+double MultiVersionDatabase::getMainThreadBusyness() {
+	if (!dbState->db) {
+		return 0;
+	}
+	return dbState->db->getMainThreadBusyness();
+}
+
 void MultiVersionDatabase::Connector::connect() {
 	addref();
onMainThreadVoid(
diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h
index 625eea061c..ea16f4f35e 100644
--- a/fdbclient/MultiVersionTransaction.h
+++ b/fdbclient/MultiVersionTransaction.h
@@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted {
 	                                     int uidLength,
 	                                     uint8_t const* snapshotCommmand,
 	                                     int snapshotCommandLength);
+	double (*databaseGetMainThreadBusyness)(FDBDatabase* database); // filled in by DLApi::init(); DLDatabase checks it for nullptr before calling
 
 	// Transaction
 	fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
@@ -262,6 +263,7 @@ public:
 
 	Reference createTransaction() override;
 	void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override;
+	double getMainThreadBusyness() override;
 
 	void addref() override { ThreadSafeReferenceCounted::addref(); }
 	void delref() override { ThreadSafeReferenceCounted::delref(); }
@@ -422,6 +424,7 @@ public:
 
 	Reference createTransaction() override;
 	void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override;
+	double getMainThreadBusyness() override;
 
 	void addref() override { ThreadSafeReferenceCounted::addref(); }
 	void delref() override { ThreadSafeReferenceCounted::delref(); }
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index f0a5fc1ad6..142cfedb2a 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -1746,6 +1746,12 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
 	g_network->addStopCallback(TLS::DestroyOpenSSLGlobalState);
 	FlowTransport::createInstance(true, transportId);
 	Net2FileSystem::newFileSystem();
+
+	// Sample network busyness once right away, then re-run the same sampler every NETWORK_BUSYNESS_MONITOR_INTERVAL.
+	systemMonitorNetworkBusyness();
+	uncancellable(recurring(&systemMonitorNetworkBusyness,
+	                        CLIENT_KNOBS->NETWORK_BUSYNESS_MONITOR_INTERVAL,
+	                        TaskPriority::FlushTrace));
 }
 
 void runNetwork() {
diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp
index 2eb59ebc15..765e1e1270 100644
--- a/fdbclient/ThreadSafeTransaction.cpp
+++ 
b/fdbclient/ThreadSafeTransaction.cpp
@@ -91,6 +91,17 @@ ThreadFuture ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
 	return onMainThread([db, snapUID, cmd]() -> Future { return db->createSnapshot(snapUID, cmd); });
 }
 
+// Reports the value currently stored in g_network->networkInfo.metrics.networkBusyness,
+// or 0.0 when g_network has not been created yet. Deliberately does no logging: this sits on a polling path.
+// NOTE(review): the field is a plain double that the busyness monitor assigns; if callers run off the
+// network thread (createSnapshot above hops via onMainThread), this read is unsynchronized - confirm.
+double ThreadSafeDatabase::getMainThreadBusyness() {
+	if (!g_network) {
+		return 0.0;
+	}
+	return g_network->networkInfo.metrics.networkBusyness;
+}
+
 ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
 	ClusterConnectionFile* connFile =
 	    new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h
index 4c7b26fb94..a62e503c11 100644
--- a/fdbclient/ThreadSafeTransaction.h
+++ b/fdbclient/ThreadSafeTransaction.h
@@ -35,6 +35,7 @@ public:
 
 	Reference createTransaction() override;
 	void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override;
+	double getMainThreadBusyness() override;
 
 	ThreadFuture onConnected(); // Returns after a majority of coordination servers are available and have
 	                            // reported a leader.
The
diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp
index 3a1e52dc4a..7086b89614 100644
--- a/flow/Net2.actor.cpp
+++ b/flow/Net2.actor.cpp
@@ -131,6 +131,8 @@ thread_local INetwork* thread_network = 0;
 
 class Net2 final : public INetwork, public INetworkConnections {
 
+private:
+	void updateStarvationTrackers(struct NetworkMetrics::PriorityStats& binStats, TaskPriority priority, TaskPriority lastPriority, double now);
 public:
 	Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics);
 	void initTLS(ETLSInitState targetState) override;
@@ -1572,6 +1574,24 @@ void Net2::run() {
 #endif
 }
 
+// Applies one priority transition (lastPriority -> priority) to a single PriorityStats bin.
+void Net2::updateStarvationTrackers(struct NetworkMetrics::PriorityStats& binStats, TaskPriority priority, TaskPriority lastPriority, double now) {
+	const bool wasBusy = binStats.priority <= lastPriority;
+	const bool isBusy = binStats.priority <= priority;
+
+	if (wasBusy && !isBusy) {
+		// Busy -> idle at binStats.priority: close the interval that was open
+		binStats.active = false;
+		binStats.duration += now - binStats.windowedTimer;
+		binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
+	} else if (!wasBusy && isBusy) {
+		// Idle -> busy at binStats.priority: open a new interval starting now
+		binStats.active = true;
+		binStats.timer = now;
+		binStats.windowedTimer = now;
+	}
+}
+
 void Net2::trackAtPriority(TaskPriority priority, double now) {
 	if (lastPriorityStats == nullptr || priority != lastPriorityStats->priority) {
 		// Start tracking current priority
@@ -1591,20 +1611,15 @@ void Net2::trackAtPriority(TaskPriority priority, double now) {
 			if (binStats.priority > lastPriority && binStats.priority > priority) {
 				break;
 			}
+			updateStarvationTrackers(binStats, priority, lastPriority, now);
+		}
 
-			// Busy -> idle at binStats.priority
-			if (binStats.priority > priority && binStats.priority <= lastPriority) {
-				binStats.active = false;
-				binStats.duration += now - binStats.windowedTimer;
-				binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
-			}
-
-			// Idle -> busy at binStats.priority
-			else if (binStats.priority <= priority && binStats.priority > lastPriority) {
-				binStats.active = true;
-				binStats.timer = now;
-				binStats.windowedTimer = now;
+		// Apply the same transition to the second tracker set, which customSystemMonitorNetworkBusyness()
+		for (auto& binStats : networkInfo.metrics.starvationTrackersOneSecondInterval) { // reads and resets on each run
+			if (binStats.priority > lastPriority && binStats.priority > priority) { // same early exit as the loop above
+				break;
 			}
+			updateStarvationTrackers(binStats, priority, lastPriority, now);
 		}
 
 		lastPriorityStats = &activeStatsItr.first->second;
diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp
index 37dadf9dc1..22800a2ae3 100644
--- a/flow/SystemMonitor.cpp
+++ b/flow/SystemMonitor.cpp
@@ -42,6 +42,11 @@ void systemMonitor() {
 	customSystemMonitor("ProcessMetrics", &statState, true);
 }
 
+void systemMonitorNetworkBusyness() { // no-argument entry point that setupNetwork() runs once and then passes to recurring()
+	static StatisticsState statStateNetworkBusyness = StatisticsState(); // function-local static: one state object shared by every call
+	customSystemMonitorNetworkBusyness("ProcessMetricsNetworkBusyness", &statStateNetworkBusyness);
+}
+
 SystemStatistics getSystemStatistics() {
 	static StatisticsState statState = StatisticsState();
 	const IPAddress ipAddr = machineState.ip.present() ? 
machineState.ip.get() : IPAddress();
@@ -61,6 +66,51 @@ SystemStatistics getSystemStatistics() {
 	    .detail("ApproximateUnusedMemory" #size, FastAllocator::getApproximateMemoryUnused()) \
 	    .detail("ActiveThreads" #size, FastAllocator::getActiveThreads())
 
+// Recomputes g_network->networkInfo.metrics.networkBusyness from the first entry of
+// starvationTrackersOneSecondInterval, then zeroes every tracker in that list for the next window.
+// The published value is min(elapsed, duration) / elapsed, or 0 when elapsed is not positive.
+// Trackers are only touched when the network is not simulated and the statistics sample is initialized.
+// eventName is currently unused. statState receives the sampled system, network and metrics state.
+// Returns the SystemStatistics sample taken by this call.
+SystemStatistics customSystemMonitorNetworkBusyness(std::string eventName, StatisticsState* statState) {
+	const IPAddress ipAddr = IPAddress();
+	SystemStatistics currentStats = getSystemStatistics("", &ipAddr, &statState->systemState, true);
+	NetworkData netData;
+	netData.init();
+
+	if (!g_network->isSimulated() && currentStats.initialized) {
+		auto& metrics = g_network->networkInfo.metrics;
+		// Read the clock once so every tracker is closed out against the same instant.
+		const double sampleTime = now();
+		bool firstTracker = true;
+		for (auto& tracker : metrics.starvationTrackersOneSecondInterval) {
+			if (tracker.active) {
+				// Still busy: fold the time since the last update into this window.
+				tracker.duration += sampleTime - tracker.windowedTimer;
+				tracker.maxDuration = std::max(tracker.maxDuration, sampleTime - tracker.timer);
+				tracker.windowedTimer = sampleTime;
+			}
+
+			if (firstTracker) {
+				// Only the first bin feeds the published value. A non-positive elapsed time would
+				// turn the ratio into NaN/inf, so report idle instead.
+				metrics.networkBusyness = currentStats.elapsed > 0
+				                              ? std::min(currentStats.elapsed, tracker.duration) / currentStats.elapsed
+				                              : 0.0;
+				firstTracker = false;
+			}
+
+			// Start the next measurement window from zero.
+			tracker.duration = 0;
+			tracker.maxDuration = 0;
+		}
+	}
+
+	statState->networkMetricsState = g_network->networkInfo.metrics;
+	statState->networkState = netData;
+	return currentStats;
+}
+
 SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics) {
 	const IPAddress ipAddr = machineState.ip.present() ? 
machineState.ip.get() : IPAddress();
 	SystemStatistics currentStats = getSystemStatistics(
diff --git a/flow/SystemMonitor.h b/flow/SystemMonitor.h
index 2d2470bd48..28bbcb100c 100644
--- a/flow/SystemMonitor.h
+++ b/flow/SystemMonitor.h
@@ -148,7 +148,9 @@ struct StatisticsState {
 };
 
 void systemMonitor();
+void systemMonitorNetworkBusyness(); // runs customSystemMonitorNetworkBusyness() against a function-local static StatisticsState
 SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics = false);
+SystemStatistics customSystemMonitorNetworkBusyness(std::string eventName, StatisticsState* statState); // updates NetworkMetrics::networkBusyness and resets starvationTrackersOneSecondInterval
 SystemStatistics getSystemStatistics();
 
 #endif /* FLOW_SYSTEM_MONITOR_H */
diff --git a/flow/network.h b/flow/network.h
index 33fb7b0f26..fb6c5cf5ec 100644
--- a/flow/network.h
+++ b/flow/network.h
@@ -340,14 +340,15 @@ struct NetworkMetrics {
 	};
 
 	std::unordered_map activeTrackers;
-	double lastRunLoopBusyness;
-	std::vector starvationTrackers;
+	double lastRunLoopBusyness, networkBusyness; // networkBusyness: assigned by customSystemMonitorNetworkBusyness(), returned by ThreadSafeDatabase::getMainThreadBusyness(). NOTE(review): plain double, no synchronization visible in this patch
+	std::vector starvationTrackers, starvationTrackersOneSecondInterval; // the second list is updated alongside the first in Net2::trackAtPriority and reset by the busyness monitor
 
 	static const std::vector starvationBins;
 
-	NetworkMetrics() : lastRunLoopBusyness(0) {
+	NetworkMetrics() : lastRunLoopBusyness(0), networkBusyness(0) { // busyness starts at 0 until the monitor first assigns it
 		for (int priority : starvationBins) {
 			starvationTrackers.emplace_back(static_cast(priority));
+			starvationTrackersOneSecondInterval.emplace_back(static_cast(priority)); // one tracker per starvation bin, mirroring starvationTrackers
 		}
 	}
 };