Emit traces regularly about role assignment
We currently emit Role transition traces only when a role starts and when it ends. While this is useful for debugging, it does not work well for tools that ingest trace data and may miss some trace lines. We do decorate each trace line with the roles assigned to that particular process, but that is not sufficient for tools that want to maintain a UID -> Role mapping.
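As a rough illustration of the consumer side (not part of this change), the sketch below shows how a trace-processing tool could rebuild the UID -> Role mapping purely from the periodic Role refresh events introduced here. It assumes XML-style trace lines whose events carry attributes named after the detail fields visible in the diff (Type, ID, Transition, As); the attribute syntax and the standalone role_map tool are assumptions for illustration only.

// Hypothetical consumer sketch (not part of this commit): rebuild a
// UID -> Role mapping from the periodic Role/Refresh trace events.
// Assumes trace lines carrying attributes like
//   Type="Role" ID="<role UID>" Transition="Refresh" As="<role name>"
// which mirror the .detail() fields added in the diff below; the exact
// on-disk trace format is an assumption.
#include <fstream>
#include <iostream>
#include <map>
#include <string>

// Return the value of key="value" on a trace line, or "" if absent.
static std::string attr(const std::string& line, const std::string& key) {
    std::string needle = key + "=\"";
    size_t begin = line.find(needle);
    if (begin == std::string::npos) return "";
    begin += needle.size();
    size_t end = line.find('"', begin);
    return end == std::string::npos ? "" : line.substr(begin, end - begin);
}

int main(int argc, char** argv) {
    if (argc < 2) {
        std::cerr << "usage: role_map <trace-file>\n";
        return 1;
    }
    std::ifstream in(argv[1]);
    std::map<std::string, std::string> uidToRole; // role UID -> role name
    std::string line;
    while (std::getline(in, line)) {
        // Because every role now refreshes its Role event periodically,
        // the mapping converges even if earlier transition lines were missed.
        if (attr(line, "Type") == "Role" && attr(line, "Transition") == "Refresh")
            uidToRole[attr(line, "ID")] = attr(line, "As");
    }
    for (const auto& entry : uidToRole)
        std::cout << entry.first << " -> " << entry.second << "\n";
    return 0;
}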
parent a08bbcc539
commit 5f9b127e56
@@ -3092,6 +3092,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
 self.addActor.send( monitorStorageCache(&self) );
 self.addActor.send( dbInfoUpdater(&self) );
 self.addActor.send( traceCounters("ClusterControllerMetrics", self.id, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self.clusterControllerMetrics, self.id.toString() + "/ClusterControllerMetrics") );
+self.addActor.send( traceRole(Role::CLUSTER_CONTROLLER, interf.id()) );
 //printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
 
 loop choose {
@@ -454,7 +454,7 @@ struct LeaderRegisterCollection {
 try {
 // FIXME: Get worker ID here
 startRole(Role::COORDINATOR, id, UID());
-wait(actor);
+wait(actor || traceRole(Role::COORDINATOR, id));
 endRole(Role::COORDINATOR, id, "Coordinator changed");
 } catch (Error& err) {
 endRole(Role::COORDINATOR, id, err.what(), err.code() == error_code_actor_cancelled, err);
@@ -4855,6 +4855,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
 state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
 state ActorCollection actors(false);
 self->addActor.send(actors.getResult());
+self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id()));
 
 try {
 TraceEvent("DataDistributorRunning", di.id());
@@ -525,6 +525,7 @@ ACTOR Future<Void> logRouterCore(
 
 addActor.send( pullAsyncData(&logRouterData) );
 addActor.send( cleanupPeekTrackers(&logRouterData) );
+addActor.send( traceRole(Role::LOG_ROUTER, interf.id()) );
 
 loop choose {
 when( wait( dbInfoChange ) ) {
@@ -1914,6 +1914,7 @@ ACTOR Future<Void> masterProxyServerCore(
 state GetHealthMetricsReply detailedHealthMetricsReply;
 
 addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
+addActor.send( traceRole(Role::MASTER_PROXY, proxy.id()) );
 
 //TraceEvent("ProxyInit1", proxy.id());
 
@@ -414,6 +414,7 @@ namespace oldTLog_4_6 {
 recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), recovery(Void())
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, "Restored");
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@@ -487,6 +487,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@@ -559,6 +559,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@@ -18,6 +18,7 @@
 * limitations under the License.
 */
 
+#include "fdbserver/WorkerInterface.actor.h"
 #include "flow/IndexedSet.h"
 #include "fdbrpc/FailureMonitor.h"
 #include "fdbrpc/Smoother.h"
@@ -731,6 +732,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
 PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
 self.addActor.send( monitorServerListChange(&self, dbInfo, serverChanges) );
 self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
+self.addActor.send( traceRole(Role::RATEKEEPER, rkInterf.id()) );
 
 TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
 .detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
@@ -315,6 +315,7 @@ ACTOR Future<Void> resolverCore(
 state ActorCollection actors(false);
 state Future<Void> doPollMetrics = self->resolverCount > 1 ? Void() : Future<Void>(Never());
 actors.add( waitFailureServer(resolver.waitFailure.getFuture()) );
+actors.add( traceRole(Role::RESOLVER, resolver.id()) );
 
 TraceEvent("ResolverInit", resolver.id()).detail("RecoveryCount", initReq.recoveryCount);
 loop choose {
@@ -968,6 +968,7 @@ ACTOR Future<Void> storageCache(StorageServerInterface ssi, uint16_t id, Referen
 
 // pullAsyncData actor pulls mutations from the TLog and also applies them.
 actors.add(pullAsyncData(&self));
+actors.add(traceRole(Role::STORAGE_CACHE, ssi.id()));
 
 loop {
 ++self.counters.loops;
@@ -556,6 +556,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
 {
 startRole(Role::TRANSACTION_LOG, interf.id(), tLogData->workerID, {{"SharedTLog", tLogData->dbgid.shortString()}}, context);
+addActor.send(traceRole(Role::TRANSACTION_LOG, interf.id()));
 
 persistentDataVersion.init(LiteralStringRef("TLog.PersistentDataVersion"), cc.id);
 persistentDataDurableVersion.init(LiteralStringRef("TLog.PersistentDataDurableVersion"), cc.id);
@@ -2914,6 +2915,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 
 self.sharedActors.send( commitQueue(&self) );
 self.sharedActors.send( updateStorageLoop(&self) );
+self.sharedActors.send( traceRole(Role::SHARED_TRANSACTION_LOG, tlogId) );
 state Future<Void> activeSharedChange = Void();
 
 loop {
@@ -688,6 +688,7 @@ private:
 
 void startRole(const Role &role, UID roleId, UID workerId, const std::map<std::string, std::string> &details = std::map<std::string, std::string>(), const std::string &origination = "Recruited");
 void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error e = Error());
+ACTOR Future<Void> traceRole(Role role, UID roleId);
 
 struct ServerDBInfo;
 
@@ -1620,6 +1620,7 @@ ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDB
 state PromiseStream<Future<Void>> addActor;
 state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, LiteralStringRef(""), addActor, forceRecovery ) );
 state Future<Void> collection = actorCollection( self->addActor.getFuture() );
+self->addActor.send(traceRole(Role::MASTER, mi.id()));
 
 TEST( !lifetime.isStillValid( db->get().masterLifetime, mi.id()==db->get().master.id() ) ); // Master born doomed
 TraceEvent("MasterLifetime", self->dbgid).detail("LifetimeToken", lifetime.toString());
@@ -3576,6 +3576,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
 actors.add(metricsCore(self, ssi));
 actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
 actors.add(checkBehind(self));
+actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
 
 self->coreStarted.send( Void() );
 
@@ -518,7 +518,7 @@ ACTOR Future<Void> testerServerWorkload( WorkloadRequest work, Reference<Cluster
 fprintf(stderr, "ERROR: The workload could not be created.\n");
 throw test_specification_invalid();
 }
-Future<Void> test = runWorkloadAsync(cx, workIface, workload, work.databasePingDelay);
+Future<Void> test = runWorkloadAsync(cx, workIface, workload, work.databasePingDelay) || traceRole(Role::TESTER, workIface.id());
 work.reply.send(workIface);
 replied = true;
 
@@ -41,6 +41,7 @@
 #include "fdbclient/ClientWorkerInterface.h"
 #include "flow/Profiler.h"
 #include "flow/ThreadHelper.actor.h"
+#include "flow/Trace.h"
 
 #ifdef __linux__
 #include <fcntl.h>
@@ -782,6 +783,18 @@ void endRole(const Role &role, UID id, std::string reason, bool ok, Error e) {
 }
 }
 
+ACTOR Future<Void>
+traceRole(Role role, UID roleId, UID workerId)
+{
+	loop {
+		wait(delay(5.0));
+		TraceEvent("Role", roleId)
+			.detail("Transition", "Refresh")
+			.detail("As", role.roleName)
+			.detail("OnWorker", workerId);
+	}
+}
+
 ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFolder) {
 state ExecCmdValueString snapArg(snapReq.snapPayload);
 try {
@@ -1040,6 +1053,7 @@ ACTOR Future<Void> workerServer(
 details["DataFolder"] = folder;
 details["StoresPresent"] = format("%d", stores.size());
 startRole( Role::WORKER, interf.id(), interf.id(), details );
+errorForwarders.add(traceRole(Role::WORKER, interf.id()));
 
 wait(waitForAll(recoveries));
 recoveredDiskFiles.send(Void());