From 433872e17d2a128acc87d1efcc51fac152d0d706 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Tue, 6 Apr 2021 17:28:28 -0700 Subject: [PATCH] Sample actors waiting on network --- fdbclient/InstrumentRequest.h | 50 +++++++++++++++++++++++++++++++++++ fdbclient/NativeAPI.actor.cpp | 5 ++++ fdbrpc/FlowTests.actor.cpp | 4 +++ fdbrpc/sim2.actor.cpp | 7 +++++ flow/Net2.actor.cpp | 8 ++++++ flow/Platform.actor.cpp | 12 ++++++--- flow/network.h | 4 +++ 7 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 fdbclient/InstrumentRequest.h diff --git a/fdbclient/InstrumentRequest.h b/fdbclient/InstrumentRequest.h new file mode 100644 index 0000000000..77adbd1490 --- /dev/null +++ b/fdbclient/InstrumentRequest.h @@ -0,0 +1,50 @@ +/* + * InstrumentRequest.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "flow/flow.h" +#include "flow/network.h" + +// Used to manually instrument waiting actors to collect samples for the +// sampling profiler. +struct InstrumentRequest { + unsigned index; + + InstrumentRequest() {} + + // This API isn't great. Ideally, no cleanup call is needed. I ran into an + // issue around the destructor being called twice because an instance of + // this class has to be stored as a class member (otherwise it goes away + // when wait is called), and due to how Flow does code generation the + // member will be default initialized and then initialized again when it is + // initially set. Then, the destructor will be called twice, causing issues + // when the WriteOnlySet tries to erase the same index twice. I'm working + // on this :) + + void start() { + index = g_network->getActorLineageSet().insert(currentLineage); + } + + void complete() { + g_network->getActorLineageSet().erase(index); + } +}; + diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index a0ed70997c..41e63c68f8 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -36,6 +36,7 @@ #include "fdbclient/ClusterInterface.h" #include "fdbclient/CoordinationInterface.h" #include "fdbclient/DatabaseContext.h" +#include "fdbclient/InstrumentRequest.h" #include "fdbclient/JsonBuilder.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/Knobs.h" @@ -1770,6 +1771,7 @@ void runNetwork() { if (networkOptions.traceDirectory.present() && networkOptions.runLoopProfilingEnabled) { setupRunLoopProfiler(); } + setupSamplingProfiler(); g_network->run(); @@ -3025,6 +3027,8 @@ ACTOR Future> getRange(Database cx, throw deterministicRandom()->randomChoice( std::vector{ transaction_too_old(), future_version() }); } + state InstrumentRequest request; + request.start(); GetKeyValuesReply _rep = wait(loadBalance(cx.getPtr(), beginServer.second, @@ -3035,6 +3039,7 @@ ACTOR Future> getRange(Database cx, cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr)); rep = _rep; ++cx->transactionPhysicalReadsCompleted; + request.complete(); } catch (Error&) { ++cx->transactionPhysicalReadsCompleted; throw; diff --git a/fdbrpc/FlowTests.actor.cpp b/fdbrpc/FlowTests.actor.cpp index 40e4ed1c52..c965149f70 100644 --- a/fdbrpc/FlowTests.actor.cpp +++ b/fdbrpc/FlowTests.actor.cpp @@ -24,6 +24,7 @@ #include "flow/UnitTest.h" #include "flow/DeterministicRandom.h" #include "flow/IThreadPool.h" +#include "flow/WriteOnlySet.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/IAsyncFile.h" #include "flow/TLSConfig.actor.h" @@ -283,6 +284,9 @@ struct YieldMockNetwork final : INetwork, ReferenceCounted { static TLSConfig emptyConfig; return emptyConfig; } + ActorLineageSet& getActorLineageSet() override { + throw std::exception(); + } ProtocolVersion protocolVersion() override { return baseNetwork->protocolVersion(); } }; diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index e9219f3ff3..4bd2c9399e 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -31,6 +31,7 @@ #include "flow/IThreadPool.h" #include "flow/ProtocolVersion.h" #include "flow/Util.h" +#include "flow/WriteOnlySet.h" #include "fdbrpc/IAsyncFile.h" #include "fdbrpc/AsyncFileCached.actor.h" #include "fdbrpc/AsyncFileNonDurable.actor.h" @@ -975,6 +976,10 @@ public: bool checkRunnable() override { return net2->checkRunnable(); } + ActorLineageSet& getActorLineageSet() override { + return actorLineageSet; + } + void stop() override { isStopped = true; } void addStopCallback(std::function fn) override { stopCallbacks.emplace_back(std::move(fn)); } bool isSimulated() const override { return true; } @@ -2117,6 +2122,8 @@ public: // Whether or not yield has returned true during the current iteration of the run loop bool yielded; int yield_limit; // how many more times yield may return false before next returning true + + ActorLineageSet actorLineageSet; }; class UDPSimSocket : public IUDPSocket, ReferenceCounted { diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 5026d6a982..bb3c675de4 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -198,6 +198,8 @@ public: bool checkRunnable() override; + ActorLineageSet& getActorLineageSet() override; + bool useThreadPool; // private: @@ -225,6 +227,8 @@ public: std::atomic stopped; mutable std::map addressOnHostCache; + ActorLineageSet actorLineageSet; + std::atomic started; uint64_t numYields; @@ -1377,6 +1381,10 @@ bool Net2::checkRunnable() { return !started.exchange(true); } +ActorLineageSet& Net2::getActorLineageSet() { + return actorLineageSet; +} + void Net2::run() { TraceEvent::setNetworkThread(); TraceEvent("Net2Running"); diff --git a/flow/Platform.actor.cpp b/flow/Platform.actor.cpp index 50f252021b..5be9b6423f 100644 --- a/flow/Platform.actor.cpp +++ b/flow/Platform.actor.cpp @@ -3679,8 +3679,7 @@ void* sampleThread(void* arg) { while (true) { threadSleep(1.0); // TODO: Read sample rate from global config - // TODO: Copy actor lineage of currently running actor - // Read currentLineage + // Get actor lineage of currently running actor. auto actorLineage = currentLineageThreadSafe.get(); printf("Currently running actor lineage (%p):\n", actorLineage.getPtr()); auto stack = actorLineage->stack(&StackLineage::actorName); @@ -3690,11 +3689,16 @@ void* sampleThread(void* arg) { } printf("\n"); + // Get lineage of actors waiting on disk. auto diskAlps = IAsyncFileSystem::filesystem()->getActorLineageSet().copy(); - printf("Disk ALPs: %d\n", diskAlps.size()); + // printf("Disk ALPs: %d\n", diskAlps.size()); + + // TODO: Get lineage of actors waiting on network + auto networkAlps = g_network->getActorLineageSet().copy(); + printf("Network ALPs: %d\n", networkAlps.size()); // TODO: Call collect on all actor lineages - for (auto actorLineage : diskAlps) { + for (auto actorLineage : networkAlps) { auto stack = actorLineage->stack(&StackLineage::actorName); while (!stack.empty()) { printf("%s ", stack.top()); diff --git a/flow/network.h b/flow/network.h index 33fb7b0f26..b335db3c2d 100644 --- a/flow/network.h +++ b/flow/network.h @@ -34,6 +34,7 @@ #include "flow/Arena.h" #include "flow/IRandom.h" #include "flow/Trace.h" +#include "flow/WriteOnlySet.h" enum class TaskPriority { Max = 1000000, @@ -535,6 +536,9 @@ public: // returns false. virtual bool checkRunnable() = 0; + // Returns the shared memory data structure used to store actor lineages. + virtual ActorLineageSet& getActorLineageSet() = 0; + virtual ProtocolVersion protocolVersion() = 0; // Shorthand for transport().getLocalAddress()