From a44ffd851e904f38565ddb1f9cb9bdd9e32976e0 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Fri, 29 Mar 2019 20:11:30 -0700 Subject: [PATCH 1/5] =?UTF-8?q?fix:=20the=20shared=20tlog=20could=20fail?= =?UTF-8?q?=20to=20update=20a=20stopped=20tlog=E2=80=99s=20queueCommitVers?= =?UTF-8?q?ion=20to=20version=20if=20a=20second=20tlog=20registered=20befo?= =?UTF-8?q?re=20it=20could=20issue=20the=20first=20commit=20for=20the=20tl?= =?UTF-8?q?og?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fdbserver/TLogServer.actor.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 2e966f5c65..6073ed665a 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1466,7 +1466,7 @@ ACTOR Future watchDegraded(TLogData* self) { return Void(); } -ACTOR Future doQueueCommit( TLogData* self, Reference logData ) { +ACTOR Future doQueueCommit( TLogData* self, Reference logData, std::vector> missingFinalCommit ) { state Version ver = logData->version.get(); state Version commitNumber = self->queueCommitBegin+1; state Version knownCommittedVersion = logData->knownCommittedVersion; @@ -1507,6 +1507,11 @@ ACTOR Future doQueueCommit( TLogData* self, Reference logData ) { logData->queueCommittedVersion.set(ver); self->queueCommitEnd.set(commitNumber); + for(auto& it : missingFinalCommit) { + TraceEvent("TLogCommitMissingFinalCommit", self->dbgid).detail("LogId", logData->logId).detail("Version", it->version.get()).detail("QueueVer", it->queueCommittedVersion.get()); + TEST(true); //A TLog was replaced before having a change to commit its queue + it->queueCommittedVersion.set(it->version.get()); + } return Void(); } @@ -1515,10 +1520,13 @@ ACTOR Future commitQueue( TLogData* self ) { loop { int foundCount = 0; + state std::vector> missingFinalCommit; for(auto it : self->id_data) { if(!it.second->stopped) { logData = it.second; foundCount++; + } else if(it.second->version.get() > std::max(it.second->queueCommittingVersion, it.second->queueCommittedVersion.get())) { + missingFinalCommit.push_back(it.second); } } @@ -1544,7 +1552,8 @@ ACTOR Future commitQueue( TLogData* self ) { while( self->queueCommitBegin != self->queueCommitEnd.get() && !self->largeDiskQueueCommitBytes.get() ) { wait( self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) || self->largeDiskQueueCommitBytes.onChange() ); } - self->sharedActors.send(doQueueCommit(self, logData)); + self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit)); + missingFinalCommit.clear(); } when(wait(self->newLogData.onTrigger())) {} } From e7ad39246c51ccc72b293be4818251f90516e1f6 Mon Sep 17 00:00:00 2001 From: Alex Miller <35046903+alexmiller-apple@users.noreply.github.com> Date: Fri, 29 Mar 2019 20:16:26 -0700 Subject: [PATCH 2/5] Fix typo --- fdbserver/TLogServer.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 6073ed665a..81d542a11f 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1509,7 +1509,7 @@ ACTOR Future doQueueCommit( TLogData* self, Reference logData, st for(auto& it : missingFinalCommit) { TraceEvent("TLogCommitMissingFinalCommit", self->dbgid).detail("LogId", logData->logId).detail("Version", it->version.get()).detail("QueueVer", it->queueCommittedVersion.get()); - TEST(true); //A TLog was replaced before having a change to commit its queue + TEST(true); //A TLog was replaced before having a chance to commit its queue it->queueCommittedVersion.set(it->version.get()); } return Void(); From 29a37beb207d9caa005f65a6cc12f6b6fc79e103 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Sat, 30 Mar 2019 12:01:36 -0700 Subject: [PATCH 3/5] fixed a valgrind correctness bug --- fdbclient/Knobs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index b577dcceba..e11cb8a127 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -57,7 +57,7 @@ ClientKnobs::ClientKnobs(bool randomize) { init( KEY_SIZE_LIMIT, 1e4 ); init( SYSTEM_KEY_SIZE_LIMIT, 3e4 ); init( VALUE_SIZE_LIMIT, 1e5 ); - init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - serverKeysPrefixFor(UID()).size() - 1; + init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1; init( METADATA_VERSION_CACHE_SIZE, 1000 ); init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; From d670b74d693ca16fbb87f8ccb9f777f4de0f50d2 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Sat, 30 Mar 2019 13:36:13 -0700 Subject: [PATCH 4/5] prevent trace event spam by combining huge arena samples --- flow/Arena.h | 2 +- flow/FastAlloc.cpp | 17 +++++++++++++++++ flow/FastAlloc.h | 1 + flow/Knobs.cpp | 1 + flow/Knobs.h | 1 + 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/flow/Arena.h b/flow/Arena.h index 519b369727..33b333a462 100644 --- a/flow/Arena.h +++ b/flow/Arena.h @@ -241,7 +241,7 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted b->bigUsed = sizeof(ArenaBlock); if(FLOW_KNOBS && g_nondeterministic_random && g_nondeterministic_random->random01() < (reqSize / FLOW_KNOBS->HUGE_ARENA_LOGGING_BYTES)) { - TraceEvent("HugeArenaSample").detail("Size", reqSize).backtrace(); + hugeArenaSample(reqSize); } g_hugeArenaMemory += reqSize; diff --git a/flow/FastAlloc.cpp b/flow/FastAlloc.cpp index dd5b41d66c..3000b0eafd 100644 --- a/flow/FastAlloc.cpp +++ b/flow/FastAlloc.cpp @@ -24,6 +24,7 @@ #include "flow/Trace.h" #include "flow/Error.h" #include "flow/Knobs.h" +#include "flow/flow.h" #include #include @@ -83,6 +84,22 @@ void setFastAllocatorThreadInitFunction( ThreadInitFunction f ) { int64_t g_hugeArenaMemory = 0; +double hugeArenaLastLogged = 0; +std::map> hugeArenaTraces; + +void hugeArenaSample(int size) { + auto& info = hugeArenaTraces[platform::get_backtrace()]; + info.first++; + info.second+=size; + if(now() - hugeArenaLastLogged > FLOW_KNOBS->HUGE_ARENA_LOGGING_INTERVAL) { + for(auto& it : hugeArenaTraces) { + TraceEvent("HugeArenaSample").detail("Count", it.second.first).detail("Size", it.second.second).detail("Backtrace", it.first); + } + hugeArenaLastLogged = now(); + hugeArenaTraces.clear(); + } +} + #ifdef ALLOC_INSTRUMENTATION INIT_SEG std::map allocInstr; INIT_SEG std::unordered_map> memSample; diff --git a/flow/FastAlloc.h b/flow/FastAlloc.h index 9966815216..dab4859bdf 100644 --- a/flow/FastAlloc.h +++ b/flow/FastAlloc.h @@ -152,6 +152,7 @@ private: }; extern int64_t g_hugeArenaMemory; +void hugeArenaSample(int size); void releaseAllThreadMagazines(); int64_t getTotalUnusedAllocatedMemory(); void setFastAllocatorThreadInitFunction( void (*)() ); // The given function will be called at least once in each thread that allocates from a FastAllocator. Currently just one such function is tracked. diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index ea011507e6..3c6804b175 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -49,6 +49,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) { init( RANDOMSEED_RETRY_LIMIT, 4 ); init( FAST_ALLOC_LOGGING_BYTES, 10e6 ); init( HUGE_ARENA_LOGGING_BYTES, 100e6 ); + init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 ); //connectionMonitor init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0; diff --git a/flow/Knobs.h b/flow/Knobs.h index 913a474b29..c1e1dcbea2 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -63,6 +63,7 @@ public: int RANDOMSEED_RETRY_LIMIT; double FAST_ALLOC_LOGGING_BYTES; double HUGE_ARENA_LOGGING_BYTES; + double HUGE_ARENA_LOGGING_INTERVAL; //slow task profiling double SLOWTASK_PROFILING_INTERVAL; From 8ebf771392a84de53741c345bf77c711695bbf23 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Sat, 30 Mar 2019 14:17:18 -0700 Subject: [PATCH 5/5] cleanup cluster controller trace events --- fdbserver/ClusterController.actor.cpp | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 28afc938bb..63f6ff9ba8 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -312,7 +312,6 @@ public: } if (logServerSet->size() < (addingDegraded == 0 ? desired : required)) { - TraceEvent(SevWarn,"GWFTADTooFew", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired).detail("AddingDegraded", addingDegraded); } else if (logServerSet->size() == required || logServerSet->size() <= desired) { if (logServerSet->validate(policy)) { @@ -1925,7 +1924,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { if (req.ratekeeperInterf.present()) { if((self->recruitingRatekeeperID.present() && self->recruitingRatekeeperID.get() != req.ratekeeperInterf.get().id()) || self->clusterControllerDcId != w.locality.dcId()) { - TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", req.ratekeeperInterf.get().id()) + TraceEvent("CC_HaltRegisteringRatekeeper", self->id).detail("RKID", req.ratekeeperInterf.get().id()) .detail("DcID", printable(self->clusterControllerDcId)) .detail("ReqDcID", printable(w.locality.dcId())) .detail("RecruitingRKID", self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID()); @@ -1935,7 +1934,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { const auto& ratekeeper = self->db.serverInfo->get().ratekeeper; TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", rki.id()); if (ratekeeper.present() && ratekeeper.get().id() != rki.id() && self->id_worker.count(ratekeeper.get().locality.processId())) { - TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id()) + TraceEvent("CC_HaltPreviousRatekeeper", self->id).detail("RKID", ratekeeper.get().id()) .detail("DcID", printable(self->clusterControllerDcId)) .detail("ReqDcID", printable(w.locality.dcId())) .detail("RecruitingRKID", self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID()); @@ -2519,8 +2518,8 @@ ACTOR Future monitorDataDistributor(ClusterControllerData *self) { loop { if ( self->db.serverInfo->get().distributor.present() ) { wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) ); - TraceEvent("ClusterController", self->id) - .detail("DataDistributorDied", self->db.serverInfo->get().distributor.get().id()); + TraceEvent("CC_DataDistributorDied", self->id) + .detail("DistributorId", self->db.serverInfo->get().distributor.get().id()); self->db.clearInterf(ProcessClass::DataDistributorClass); } else { self->recruitingDistributor = true; @@ -2555,17 +2554,16 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { } self->recruitingRatekeeperID = req.reqId; - TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", worker.interf.address()).detail("RKID", req.reqId); + TraceEvent("CC_RecruitRatekeeper", self->id).detail("Addr", worker.interf.address()).detail("RKID", req.reqId); ErrorOr interf = wait( worker.interf.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) ); if (interf.present()) { - TraceEvent("ClusterController_RatekeeperRecruited", self->id).detail("Addr", worker.interf.address()); self->recruitRatekeeper.set(false); self->recruitingRatekeeperID = interf.get().id(); const auto& ratekeeper = self->db.serverInfo->get().ratekeeper; - TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", interf.get().id()); + TraceEvent("CC_RatekeeperRecruited", self->id).detail("Addr", worker.interf.address()).detail("RKID", interf.get().id()); if (ratekeeper.present() && ratekeeper.get().id() != interf.get().id() && self->id_worker.count(ratekeeper.get().locality.processId())) { - TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id()) + TraceEvent("CC_HaltRatekeeperAfterRecruit", self->id).detail("RKID", ratekeeper.get().id()) .detail("DcID", printable(self->clusterControllerDcId)); self->id_worker[ratekeeper.get().locality.processId()].haltRatekeeper = brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))); } @@ -2577,7 +2575,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { } } catch (Error& e) { - TraceEvent("ClusterController_RatekeeperRecruitError", self->id).error(e); + TraceEvent("CC_RatekeeperRecruitError", self->id).error(e); if ( e.code() != error_code_no_more_servers ) { throw; } @@ -2595,7 +2593,7 @@ ACTOR Future monitorRatekeeper(ClusterControllerData *self) { if ( self->db.serverInfo->get().ratekeeper.present() && !self->recruitRatekeeper.get() ) { choose { when(wait(waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) { - TraceEvent("ClusterController_RatekeeperDied", self->id) + TraceEvent("CC_RatekeeperDied", self->id) .detail("RKID", self->db.serverInfo->get().ratekeeper.get().id()); self->db.clearInterf(ProcessClass::RatekeeperClass); }