Merge pull request #3102 from mpilman/features/trace-roles

Emit traces regularly about role assignment
commit d3f465fd56
A.J. Beamon, 2020-05-15 08:12:25 -07:00, committed by GitHub
17 changed files with 33 additions and 3 deletions
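startRole() and endRole() each emit a "Role" trace event at a transition point, but a long-lived role on a stable cluster can run for days without either firing, so a bounded window of trace logs may contain no evidence of which roles a process holds. This change adds a traceRole() actor (implemented alongside startRole()/endRole() in the last file of this diff) that re-emits the "Role" event with Transition="Refresh" every SERVER_KNOBS->WORKER_LOGGING_INTERVAL seconds and wires it into each role. The resulting lifecycle is sketched below in Flow actor syntax; myRoleCore, Role::EXAMPLE, and doRoleWork are illustrative placeholders, not names from this commit, and error handling is elided.

// Illustrative sketch only (requires Flow's actor compiler); placeholder names.
ACTOR Future<Void> myRoleCore(UID id, PromiseStream<Future<Void>> addActor) {
	startRole(Role::EXAMPLE, id, UID());           // one-shot Role event: Begin
	addActor.send(traceRole(Role::EXAMPLE, id));   // periodic Role events: Refresh
	wait(doRoleWork());                            // the role's real work
	endRole(Role::EXAMPLE, id, "Done");            // one-shot Role event: End
	return Void();
}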


@@ -3094,6 +3094,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 self.addActor.send( monitorStorageCache(&self) );
 self.addActor.send( dbInfoUpdater(&self) );
 self.addActor.send( traceCounters("ClusterControllerMetrics", self.id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self.clusterControllerMetrics, self.id.toString() + "/ClusterControllerMetrics") );
+self.addActor.send( traceRole(Role::CLUSTER_CONTROLLER, interf.id()) );
 //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
 loop choose {


@@ -490,7 +490,7 @@ struct LeaderRegisterCollection {
 try {
 // FIXME: Get worker ID here
 startRole(Role::COORDINATOR, id, UID());
-wait(actor);
+wait(actor || traceRole(Role::COORDINATOR, id));
 endRole(Role::COORDINATOR, id, "Coordinator changed");
 } catch (Error& err) {
 endRole(Role::COORDINATOR, id, err.what(), err.code() == error_code_actor_cancelled, err);
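The coordinator (and the tester below) has no long-lived actor collection to park the trace loop in, so it uses Flow's || combinator on futures instead: a || b becomes ready when either side does, and dropping the combined future cancels both sides. Because traceRole() loops forever and never becomes ready on its own, actor || traceRole(...) completes exactly when actor does, with the trace loop kept alive alongside it and torn down with it. A minimal sketch of the idiom, where doWork() is a hypothetical stand-in:

// Sketch: completes when doWork() completes; the never-ready
// traceRole() future is cancelled along with the combined future.
wait( doWork() || traceRole(Role::COORDINATOR, id) );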


@@ -4859,6 +4859,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
 state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
 state ActorCollection actors(false);
 self->addActor.send(actors.getResult());
+self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id()));
 try {
 TraceEvent("DataDistributorRunning", di.id());


@@ -525,6 +525,7 @@ ACTOR Future<Void> logRouterCore(
 addActor.send( pullAsyncData(&logRouterData) );
 addActor.send( cleanupPeekTrackers(&logRouterData) );
+addActor.send( traceRole(Role::LOG_ROUTER, interf.id()) );
 loop choose {
 when( wait( dbInfoChange ) ) {


@@ -1962,6 +1962,7 @@ ACTOR Future<Void> masterProxyServerCore(
 state GetHealthMetricsReply detailedHealthMetricsReply;
 addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
+addActor.send( traceRole(Role::MASTER_PROXY, proxy.id()) );
 //TraceEvent("ProxyInit1", proxy.id());


@@ -414,6 +414,7 @@ namespace oldTLog_4_6 {
 recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void())
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, "Restored");
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);


@@ -487,6 +487,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);


@@ -559,6 +559,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);


@@ -18,6 +18,7 @@
  * limitations under the License.
  */
+#include "fdbserver/WorkerInterface.actor.h"
 #include "flow/IndexedSet.h"
 #include "fdbrpc/FailureMonitor.h"
 #include "fdbrpc/Smoother.h"
@@ -1206,6 +1207,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
 PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
 self.addActor.send( monitorServerListChange(&self, serverChanges) );
 self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
+self.addActor.send( traceRole(Role::RATEKEEPER, rkInterf.id()) );
 self.addActor.send(monitorThrottlingChanges(&self));


@@ -315,6 +315,7 @@ ACTOR Future<Void> resolverCore(
 state ActorCollection actors(false);
 state Future<Void> doPollMetrics = self->resolverCount > 1 ? Void() : Future<Void>(Never());
 actors.add( waitFailureServer(resolver.waitFailure.getFuture()) );
+actors.add( traceRole(Role::RESOLVER, resolver.id()) );
 TraceEvent("ResolverInit", resolver.id()).detail("RecoveryCount", initReq.recoveryCount);
 loop choose {


@@ -968,6 +968,7 @@ ACTOR Future<Void> storageCache(StorageServerInterface ssi, uint16_t id, Referen
 // pullAsyncData actor pulls mutations from the TLog and also applies them.
 actors.add(pullAsyncData(&self));
+actors.add(traceRole(Role::STORAGE_CACHE, ssi.id()));
 loop {
 ++self.counters.loops;


@@ -556,6 +556,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@@ -2914,6 +2915,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 self.sharedActors.send( commitQueue(&self) );
 self.sharedActors.send( updateStorageLoop(&self) );
+self.sharedActors.send( traceRole(Role::SHARED_TRANSACTION_LOG, tlogId) );
 state Future<Void> activeSharedChange = Void();
 loop {
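The TLog gets role traces at two granularities: each LogData (roughly one per TLog generation) registers Role::TRANSACTION_LOG from its constructor, while the shared tLog actor hosting those instances registers Role::SHARED_TRANSACTION_LOG once. The identical constructor change is repeated in the oldTLog_4_6/6_0/6_2 hunks above because FoundationDB keeps prior TLog implementations in the tree so an upgraded binary can still recover log data written in the older on-disk formats. The two lines from this file, side by side:

// Per-generation role, sent from each LogData constructor:
addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
// Single shared role for the hosting tLog actor:
self.sharedActors.send( traceRole(Role::SHARED_TRANSACTION_LOG, tlogId) );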


@@ -688,6 +688,7 @@ private:
 void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details = std::map<std::string, std::string>(), const std::string &origination = "Recruited");
 void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error e = Error());
+ACTOR Future<Void> traceRole(Role role, UID roleId);
 struct ServerDBInfo;
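The declaration sits beside startRole() and endRole(), so the three calls form one role-lifecycle API. ACTOR is Flow syntax, not standard C++: the actor compiler rewrites traceRole into an ordinary function returning Future<Void> plus a generated state machine, which is why the call sites throughout this diff can treat it like any other future-returning function, e.g.:

// As in masterProxyServerCore above:
addActor.send( traceRole(Role::MASTER_PROXY, proxy.id()) );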


@@ -1619,6 +1619,7 @@ ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDB
 state PromiseStream<Future<Void>> addActor;
 state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, LiteralStringRef(""), addActor, forceRecovery ) );
 state Future<Void> collection = actorCollection( self->addActor.getFuture() );
+self->addActor.send(traceRole(Role::MASTER, mi.id()));
 TEST( !lifetime.isStillValid( db->get().masterLifetime, mi.id()==db->get().master.id() ) ); // Master born doomed
 TraceEvent("MasterLifetime", self->dbgid).detail("LifetimeToken", lifetime.toString());


@@ -3711,9 +3711,10 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
 self->actors.add(serveGetKeyValuesRequests(self, ssi.getKeyValues.getFuture()));
 self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture()));
 self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
+self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
 self->transactionTagCounter.startNewInterval(self->thisServerID);
-self->actors.add(recurring([this](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
+self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
 self->coreStarted.send( Void() );
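Besides adding the STORAGE_SERVER trace, this hunk corrects the capture list of the recurring() lambda: storageServerCore is a free actor function with a StorageServer* self parameter, not a member function, so [&] (capturing self by reference) is the appropriate capture rather than [this]. That is safe here because the future recurring() returns is held in self->actors, which does not outlive self. For orientation, flow's recurring() helper looks roughly like this (paraphrased from flow/genericactors.actor.h; treat the exact signature as an assumption):

// Paraphrase, not verbatim: invokes what() every `interval` seconds
// until the returned Future<Void> is dropped or cancelled.
ACTOR template <class Func>
Future<Void> recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay);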


@@ -518,7 +518,7 @@ ACTOR Future<Void> testerServerWorkload( WorkloadRequest work, Reference<Cluster
 fprintf(stderr, "ERROR: The workload could not be created.\n");
 throw test_specification_invalid();
 }
-Future<Void> test = runWorkloadAsync(cx, workIface, workload, work.databasePingDelay);
+Future<Void> test = runWorkloadAsync(cx, workIface, workload, work.databasePingDelay) || traceRole(Role::TESTER, workIface.id());
 work.reply.send(workIface);
 replied = true;


@@ -21,6 +21,7 @@
 #include <tuple>
 #include <boost/lexical_cast.hpp>
 #include "fdbserver/Knobs.h"
+#include "flow/ActorCollection.h"
 #include "flow/SystemMonitor.h"
 #include "flow/TDMetric.actor.h"
@@ -42,6 +43,7 @@
 #include "fdbclient/ClientWorkerInterface.h"
 #include "flow/Profiler.h"
 #include "flow/ThreadHelper.actor.h"
+#include "flow/Trace.h"
 #ifdef __linux__
 #include <fcntl.h>
@@ -783,6 +785,17 @@ void endRole(const Role &role, UID id, std::string reason, bool ok, Error e) {
 }
 }
+ACTOR Future<Void> traceRole(Role role, UID roleId) {
+	loop {
+		wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL));
+		TraceEvent("Role", roleId)
+		    .detail("Transition", "Refresh")
+		    .detail("As", role.roleName);
+	}
+}
 ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFolder) {
 state ExecCmdValueString snapArg(snapReq.snapPayload);
 try {
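traceRole() delays before tracing, so the first Refresh event appears one WORKER_LOGGING_INTERVAL after the event startRole() emits, and the loop never exits on its own: it stops when the owning actor collection (or || expression) drops the future, after which endRole() emits the terminal Role event. In a trace file the refresh should then surface as a periodically repeated event roughly like the following; this rendering is illustrative only, with Time/ID/Machine elided and the exact attribute set determined by TraceEvent:

<Event Severity="10" Time="..." Type="Role" ID="..." Machine="..."
       Transition="Refresh" As="ClusterController" />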
@@ -1040,6 +1053,7 @@ ACTOR Future<Void> workerServer(
 details["DataFolder"] = folder;
 details["StoresPresent"] = format("%d", stores.size());
 startRole( Role::WORKER, interf.id(), interf.id(), details );
+errorForwarders.add(traceRole(Role::WORKER, interf.id()));
 wait(waitForAll(recoveries));
 recoveredDiskFiles.send(Void());