Inital implementation of network busyness

This commit is contained in:
Nim Wijetunga 2021-03-15 23:23:56 +00:00
parent c1cf8d41fe
commit f0d79b3d86
14 changed files with 120 additions and 15 deletions

View File

@ -366,6 +366,10 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_create_snapshot(FDBDatabase* db,
.extractPtr());
}
extern "C" DLLEXPORT double fdb_database_get_main_thread_busyness(FDBDatabase* d) {
return DB(d)->getMainThreadBusyness();
}
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
try {
TXN(tr)->delref();

View File

@ -190,6 +190,8 @@ extern "C" {
int uid_length, uint8_t const *snap_command,
int snap_command_length);
DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDatabase* db);
DLLEXPORT void fdb_transaction_destroy( FDBTransaction* tr);
DLLEXPORT void fdb_transaction_cancel( FDBTransaction* tr);

View File

@ -96,6 +96,7 @@ public:
virtual Reference<ITransaction> createTransaction() = 0;
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;
virtual void addref() = 0;
virtual void delref() = 0;

View File

@ -38,6 +38,7 @@ void ClientKnobs::initialize(bool randomize) {
init( TOO_MANY, 1000000 );
init( SYSTEM_MONITOR_INTERVAL, 5.0 );
init( NETWORK_BUSYNESS_MONITOR_INTERVAL, 1.0 );
init( FAILURE_MAX_DELAY, 5.0 );
init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0;

View File

@ -30,6 +30,7 @@ public:
int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically
double SYSTEM_MONITOR_INTERVAL;
double NETWORK_BUSYNESS_MONITOR_INTERVAL;
double FAILURE_MAX_DELAY;
double FAILURE_MIN_DELAY;

View File

@ -347,6 +347,14 @@ ThreadFuture<Void> DLDatabase::createSnapshot(const StringRef& uid, const String
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
double DLDatabase::getMainThreadBusyness() {
if (api->databaseGetMainThreadBusyness != nullptr) {
return api->databaseGetMainThreadBusyness(db);
}
return 0;
}
// DLApi
template <class T>
void loadClientFunction(T* fp, void* lib, std::string libPath, const char* functionName, bool requireFunction = true) {
@ -388,6 +396,11 @@ void DLApi::init() {
loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
loadClientFunction(&api->databaseGetMainThreadBusyness,
lib,
fdbCPath,
"fdb_database_get_main_thread_busyness",
headerVersion >= 700);
loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
loadClientFunction(&api->databaseRebootWorker, lib, fdbCPath, "fdb_database_reboot_worker", headerVersion >= 700);
loadClientFunction(&api->databaseForceRecoveryWithDataLoss,
@ -917,6 +930,14 @@ ThreadFuture<Void> MultiVersionDatabase::createSnapshot(const StringRef& uid, co
return abortableFuture(f, dbState->dbVar->get().onChange);
}
double MultiVersionDatabase::getMainThreadBusyness() {
if (dbState->db) {
return dbState->db->getMainThreadBusyness();
}
return 0;
}
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid(

View File

@ -80,6 +80,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
int uidLength,
uint8_t const* snapshotCommmand,
int snapshotCommandLength);
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
// Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
@ -262,6 +263,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
@ -422,6 +424,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }

View File

@ -1746,6 +1746,10 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
g_network->addStopCallback(TLS::DestroyOpenSSLGlobalState);
FlowTransport::createInstance(true, transportId);
Net2FileSystem::newFileSystem();
TraceEvent("Nim_setupNetwork");
systemMonitorNetworkBusyness();
uncancellable(recurring(&systemMonitorNetworkBusyness, CLIENT_KNOBS->NETWORK_BUSYNESS_MONITOR_INTERVAL, TaskPriority::FlushTrace));
}
void runNetwork() {

View File

@ -91,6 +91,17 @@ ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, cons
return onMainThread([db, snapUID, cmd]() -> Future<Void> { return db->createSnapshot(snapUID, cmd); });
}
double ThreadSafeDatabase::getMainThreadBusyness() {
// Return the main network thread busyness
if (!g_network) {
TraceEvent("Nim_getBusyness g_network null");
// TODO: Is this the right thing to do in this case?
return 0.0;
}
TraceEvent("Nim_getBusyness g_network good");
return g_network->networkInfo.metrics.networkBusyness;
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);

View File

@ -35,6 +35,7 @@ public:
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
ThreadFuture<Void>
onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The

View File

@ -131,6 +131,8 @@ thread_local INetwork* thread_network = 0;
class Net2 final : public INetwork, public INetworkConnections {
private:
void updateStarvationTrackers(struct NetworkMetrics::PriorityStats &binStats, TaskPriority priority, TaskPriority lastPriority, double now);
public:
Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics);
void initTLS(ETLSInitState targetState) override;
@ -1572,6 +1574,24 @@ void Net2::run() {
#endif
}
void Net2::updateStarvationTrackers(struct NetworkMetrics::PriorityStats &binStats, TaskPriority priority, TaskPriority lastPriority, double now) {
// Updates the PriorityStats found in NetworkMetrics
// Busy -> idle at binStats.priority
if (binStats.priority > priority && binStats.priority <= lastPriority) {
binStats.active = false;
binStats.duration += now - binStats.windowedTimer;
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
}
// Idle -> busy at binStats.priority
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
binStats.active = true;
binStats.timer = now;
binStats.windowedTimer = now;
}
}
void Net2::trackAtPriority(TaskPriority priority, double now) {
if (lastPriorityStats == nullptr || priority != lastPriorityStats->priority) {
// Start tracking current priority
@ -1591,20 +1611,15 @@ void Net2::trackAtPriority(TaskPriority priority, double now) {
if (binStats.priority > lastPriority && binStats.priority > priority) {
break;
}
updateStarvationTrackers(binStats, priority, lastPriority, now);
}
// Busy -> idle at binStats.priority
if (binStats.priority > priority && binStats.priority <= lastPriority) {
binStats.active = false;
binStats.duration += now - binStats.windowedTimer;
binStats.maxDuration = std::max(binStats.maxDuration, now - binStats.timer);
}
// Idle -> busy at binStats.priority
else if (binStats.priority <= priority && binStats.priority > lastPriority) {
binStats.active = true;
binStats.timer = now;
binStats.windowedTimer = now;
// Update starvation trackers for 1s measurment interval
for (auto& binStats : networkInfo.metrics.starvationTrackersOneSecondInterval) {
if (binStats.priority > lastPriority && binStats.priority > priority) {
break;
}
updateStarvationTrackers(binStats, priority, lastPriority, now);
}
lastPriorityStats = &activeStatsItr.first->second;

View File

@ -42,6 +42,11 @@ void systemMonitor() {
customSystemMonitor("ProcessMetrics", &statState, true);
}
void systemMonitorNetworkBusyness() {
static StatisticsState statStateNetworkBusyness = StatisticsState();
customSystemMonitorNetworkBusyness("ProcessMetricsNetworkBusyness", &statStateNetworkBusyness);
}
SystemStatistics getSystemStatistics() {
static StatisticsState statState = StatisticsState();
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
@ -61,6 +66,39 @@ SystemStatistics getSystemStatistics() {
.detail("ApproximateUnusedMemory" #size, FastAllocator<size>::getApproximateMemoryUnused()) \
.detail("ActiveThreads" #size, FastAllocator<size>::getActiveThreads())
SystemStatistics customSystemMonitorNetworkBusyness(std::string eventName, StatisticsState* statState) {
const IPAddress ipAddr = IPAddress();
SystemStatistics currentStats = getSystemStatistics("", &ipAddr, &statState->systemState, true);
NetworkData netData;
netData.init();
if (!g_network->isSimulated() && currentStats.initialized) {
{
bool firstTracker = true;
for (auto& itr : g_network->networkInfo.metrics.starvationTrackersOneSecondInterval) {
if (itr.active) {
itr.duration += now() - itr.windowedTimer;
itr.maxDuration = std::max(itr.maxDuration, now() - itr.timer);
itr.windowedTimer = now();
}
if (firstTracker) {
g_network->networkInfo.metrics.networkBusyness =
std::min(currentStats.elapsed, itr.duration) / currentStats.elapsed;
TraceEvent("Nim_system monitor").detail("busyness", g_network->networkInfo.metrics.networkBusyness);
firstTracker = false;
}
itr.duration = 0;
itr.maxDuration = 0;
}
}
}
statState->networkMetricsState = g_network->networkInfo.metrics;
statState->networkState = netData;
return currentStats;
}
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics) {
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
SystemStatistics currentStats = getSystemStatistics(

View File

@ -148,7 +148,9 @@ struct StatisticsState {
};
void systemMonitor();
void systemMonitorNetworkBusyness();
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* statState, bool machineMetrics = false);
SystemStatistics customSystemMonitorNetworkBusyness(std::string eventName, StatisticsState* statState);
SystemStatistics getSystemStatistics();
#endif /* FLOW_SYSTEM_MONITOR_H */

View File

@ -340,14 +340,15 @@ struct NetworkMetrics {
};
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
double lastRunLoopBusyness;
std::vector<struct PriorityStats> starvationTrackers;
double lastRunLoopBusyness, networkBusyness;
std::vector<struct PriorityStats> starvationTrackers, starvationTrackersOneSecondInterval;
static const std::vector<int> starvationBins;
NetworkMetrics() : lastRunLoopBusyness(0) {
NetworkMetrics() : lastRunLoopBusyness(0), networkBusyness(0) {
for (int priority : starvationBins) {
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
starvationTrackersOneSecondInterval.emplace_back(static_cast<TaskPriority>(priority));
}
}
};