Sample actors waiting on network

This commit is contained in:
Lukas Joswiak 2021-04-06 17:28:28 -07:00
parent c90be2003f
commit 433872e17d
7 changed files with 86 additions and 4 deletions

View File

@ -0,0 +1,50 @@
/*
* InstrumentRequest.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "flow/flow.h"
#include "flow/network.h"
// Used to manually instrument waiting actors to collect samples for the
// sampling profiler.
struct InstrumentRequest {
unsigned index;
InstrumentRequest() {}
// This API isn't great. Ideally, no cleanup call is needed. I ran into an
// issue around the destructor being called twice because an instance of
// this class has to be stored as a class member (otherwise it goes away
// when wait is called), and due to how Flow does code generation the
// member will be default initialized and then initialized again when it is
// initially set. Then, the destructor will be called twice, causing issues
// when the WriteOnlySet tries to erase the same index twice. I'm working
// on this :)
void start() {
index = g_network->getActorLineageSet().insert(currentLineage);
}
void complete() {
g_network->getActorLineageSet().erase(index);
}
};

View File

@ -36,6 +36,7 @@
#include "fdbclient/ClusterInterface.h" #include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h" #include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h" #include "fdbclient/DatabaseContext.h"
#include "fdbclient/InstrumentRequest.h"
#include "fdbclient/JsonBuilder.h" #include "fdbclient/JsonBuilder.h"
#include "fdbclient/KeyRangeMap.h" #include "fdbclient/KeyRangeMap.h"
#include "fdbclient/Knobs.h" #include "fdbclient/Knobs.h"
@ -1770,6 +1771,7 @@ void runNetwork() {
if (networkOptions.traceDirectory.present() && networkOptions.runLoopProfilingEnabled) { if (networkOptions.traceDirectory.present() && networkOptions.runLoopProfilingEnabled) {
setupRunLoopProfiler(); setupRunLoopProfiler();
} }
setupSamplingProfiler();
g_network->run(); g_network->run();
@ -3025,6 +3027,8 @@ ACTOR Future<Standalone<RangeResultRef>> getRange(Database cx,
throw deterministicRandom()->randomChoice( throw deterministicRandom()->randomChoice(
std::vector<Error>{ transaction_too_old(), future_version() }); std::vector<Error>{ transaction_too_old(), future_version() });
} }
state InstrumentRequest request;
request.start();
GetKeyValuesReply _rep = GetKeyValuesReply _rep =
wait(loadBalance(cx.getPtr(), wait(loadBalance(cx.getPtr(),
beginServer.second, beginServer.second,
@ -3035,6 +3039,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange(Database cx,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr)); cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr));
rep = _rep; rep = _rep;
++cx->transactionPhysicalReadsCompleted; ++cx->transactionPhysicalReadsCompleted;
request.complete();
} catch (Error&) { } catch (Error&) {
++cx->transactionPhysicalReadsCompleted; ++cx->transactionPhysicalReadsCompleted;
throw; throw;

View File

@ -24,6 +24,7 @@
#include "flow/UnitTest.h" #include "flow/UnitTest.h"
#include "flow/DeterministicRandom.h" #include "flow/DeterministicRandom.h"
#include "flow/IThreadPool.h" #include "flow/IThreadPool.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/fdbrpc.h" #include "fdbrpc/fdbrpc.h"
#include "fdbrpc/IAsyncFile.h" #include "fdbrpc/IAsyncFile.h"
#include "flow/TLSConfig.actor.h" #include "flow/TLSConfig.actor.h"
@ -283,6 +284,9 @@ struct YieldMockNetwork final : INetwork, ReferenceCounted<YieldMockNetwork> {
static TLSConfig emptyConfig; static TLSConfig emptyConfig;
return emptyConfig; return emptyConfig;
} }
ActorLineageSet& getActorLineageSet() override {
throw std::exception();
}
ProtocolVersion protocolVersion() override { return baseNetwork->protocolVersion(); } ProtocolVersion protocolVersion() override { return baseNetwork->protocolVersion(); }
}; };

View File

@ -31,6 +31,7 @@
#include "flow/IThreadPool.h" #include "flow/IThreadPool.h"
#include "flow/ProtocolVersion.h" #include "flow/ProtocolVersion.h"
#include "flow/Util.h" #include "flow/Util.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/IAsyncFile.h" #include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h" #include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileNonDurable.actor.h" #include "fdbrpc/AsyncFileNonDurable.actor.h"
@ -975,6 +976,10 @@ public:
bool checkRunnable() override { return net2->checkRunnable(); } bool checkRunnable() override { return net2->checkRunnable(); }
ActorLineageSet& getActorLineageSet() override {
return actorLineageSet;
}
void stop() override { isStopped = true; } void stop() override { isStopped = true; }
void addStopCallback(std::function<void()> fn) override { stopCallbacks.emplace_back(std::move(fn)); } void addStopCallback(std::function<void()> fn) override { stopCallbacks.emplace_back(std::move(fn)); }
bool isSimulated() const override { return true; } bool isSimulated() const override { return true; }
@ -2117,6 +2122,8 @@ public:
// Whether or not yield has returned true during the current iteration of the run loop // Whether or not yield has returned true during the current iteration of the run loop
bool yielded; bool yielded;
int yield_limit; // how many more times yield may return false before next returning true int yield_limit; // how many more times yield may return false before next returning true
ActorLineageSet actorLineageSet;
}; };
class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> { class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {

View File

@ -198,6 +198,8 @@ public:
bool checkRunnable() override; bool checkRunnable() override;
ActorLineageSet& getActorLineageSet() override;
bool useThreadPool; bool useThreadPool;
// private: // private:
@ -225,6 +227,8 @@ public:
std::atomic<bool> stopped; std::atomic<bool> stopped;
mutable std::map<IPAddress, bool> addressOnHostCache; mutable std::map<IPAddress, bool> addressOnHostCache;
ActorLineageSet actorLineageSet;
std::atomic<bool> started; std::atomic<bool> started;
uint64_t numYields; uint64_t numYields;
@ -1377,6 +1381,10 @@ bool Net2::checkRunnable() {
return !started.exchange(true); return !started.exchange(true);
} }
ActorLineageSet& Net2::getActorLineageSet() {
return actorLineageSet;
}
void Net2::run() { void Net2::run() {
TraceEvent::setNetworkThread(); TraceEvent::setNetworkThread();
TraceEvent("Net2Running"); TraceEvent("Net2Running");

View File

@ -3679,8 +3679,7 @@ void* sampleThread(void* arg) {
while (true) { while (true) {
threadSleep(1.0); // TODO: Read sample rate from global config threadSleep(1.0); // TODO: Read sample rate from global config
// TODO: Copy actor lineage of currently running actor // Get actor lineage of currently running actor.
// Read currentLineage
auto actorLineage = currentLineageThreadSafe.get(); auto actorLineage = currentLineageThreadSafe.get();
printf("Currently running actor lineage (%p):\n", actorLineage.getPtr()); printf("Currently running actor lineage (%p):\n", actorLineage.getPtr());
auto stack = actorLineage->stack(&StackLineage::actorName); auto stack = actorLineage->stack(&StackLineage::actorName);
@ -3690,11 +3689,16 @@ void* sampleThread(void* arg) {
} }
printf("\n"); printf("\n");
// Get lineage of actors waiting on disk.
auto diskAlps = IAsyncFileSystem::filesystem()->getActorLineageSet().copy(); auto diskAlps = IAsyncFileSystem::filesystem()->getActorLineageSet().copy();
printf("Disk ALPs: %d\n", diskAlps.size()); // printf("Disk ALPs: %d\n", diskAlps.size());
// TODO: Get lineage of actors waiting on network
auto networkAlps = g_network->getActorLineageSet().copy();
printf("Network ALPs: %d\n", networkAlps.size());
// TODO: Call collect on all actor lineages // TODO: Call collect on all actor lineages
for (auto actorLineage : diskAlps) { for (auto actorLineage : networkAlps) {
auto stack = actorLineage->stack(&StackLineage::actorName); auto stack = actorLineage->stack(&StackLineage::actorName);
while (!stack.empty()) { while (!stack.empty()) {
printf("%s ", stack.top()); printf("%s ", stack.top());

View File

@ -34,6 +34,7 @@
#include "flow/Arena.h" #include "flow/Arena.h"
#include "flow/IRandom.h" #include "flow/IRandom.h"
#include "flow/Trace.h" #include "flow/Trace.h"
#include "flow/WriteOnlySet.h"
enum class TaskPriority { enum class TaskPriority {
Max = 1000000, Max = 1000000,
@ -535,6 +536,9 @@ public:
// returns false. // returns false.
virtual bool checkRunnable() = 0; virtual bool checkRunnable() = 0;
// Returns the shared memory data structure used to store actor lineages.
virtual ActorLineageSet& getActorLineageSet() = 0;
virtual ProtocolVersion protocolVersion() = 0; virtual ProtocolVersion protocolVersion() = 0;
// Shorthand for transport().getLocalAddress() // Shorthand for transport().getLocalAddress()