From 6810a032838e257725d64bfe13cc18761bae81ec Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 21 Feb 2020 10:55:14 -0800 Subject: [PATCH 1/4] Add more logging to valley filler and mountain chopper --- fdbclient/NativeAPI.actor.cpp | 4 +- fdbserver/DataDistributionQueue.actor.cpp | 111 +++++++++++++++++----- flow/Trace.cpp | 55 ++++++++++- flow/Trace.h | 9 +- 4 files changed, 153 insertions(+), 26 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 9d9c6dcbed..182cddd231 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2548,8 +2548,8 @@ ACTOR void checkWrites( Database cx, Future committed, Promise outCo } else { Optional val = wait( tr.get( it->range().begin ) ); if( !val.present() || val.get() != m.setValue ) { - TraceEvent evt = TraceEvent(SevError, "CheckWritesFailed") - .detail("Class", "Set") + TraceEvent evt(SevError, "CheckWritesFailed"); + evt.detail("Class", "Set") .detail("Key", it->range().begin) .detail("Expected", m.setValue); if( !val.present() ) diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 0d2e4d0db1..0a35623753 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1128,8 +1128,10 @@ ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd } // Move a random shard of sourceTeam's to destTeam if sourceTeam has much more data than destTeam -ACTOR Future rebalanceTeams( DDQueueData* self, int priority, Reference sourceTeam, Reference destTeam, bool primary ) { +ACTOR Future rebalanceTeams( DDQueueData* self, int priority, Reference sourceTeam, + Reference destTeam, bool primary, TraceEvent *traceEvent ) { if(g_network->isSimulated() && g_simulator.speedUpSimulation) { + traceEvent->detail("CancelingDueToSimulationSpeedup", true); return false; } @@ -1139,6 +1141,9 @@ ACTOR Future rebalanceTeams( DDQueueData* self, int priority, Reference shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) ); + traceEvent->detail("AverageShardBytes", averageShardBytes) + .detail("ShardsInSource", shards.size()); + if( !shards.size() ) return false; @@ -1160,28 +1165,28 @@ ACTOR Future rebalanceTeams( DDQueueData* self, int priority, ReferencegetLoadBytes(false); int64_t destBytes = destTeam->getLoadBytes(); - if( sourceBytes - destBytes <= 3 * std::max( SERVER_KNOBS->MIN_SHARD_BYTES, metrics.bytes ) || metrics.bytes == 0 ) + + bool sourceAndDestTooSimilar = sourceBytes - destBytes <= 3 * std::max(SERVER_KNOBS->MIN_SHARD_BYTES, metrics.bytes); + traceEvent->detail("SourceBytes", sourceBytes) + .detail("DestBytes", destBytes) + .detail("ShardBytes", metrics.bytes) + .detail("SourceAndDestTooSimilar", sourceAndDestTooSimilar); + + if( sourceAndDestTooSimilar || metrics.bytes == 0 ) { return false; + } - { - //verify the shard is still in sabtf - std::vector shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) ); - for( int i = 0; i < shards.size(); i++ ) { - if( moveShard == shards[i] ) { - TraceEvent(priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ? "BgDDMountainChopper" : "BgDDValleyFiller", self->distributorId) - .detail("SourceBytes", sourceBytes) - .detail("DestBytes", destBytes) - .detail("ShardBytes", metrics.bytes) - .detail("AverageShardBytes", averageShardBytes) - .detail("SourceTeam", sourceTeam->getDesc()) - .detail("DestTeam", destTeam->getDesc()); - - self->output.send( RelocateShard( moveShard, priority ) ); - return true; - } + //verify the shard is still in sabtf + shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) ); + for( int i = 0; i < shards.size(); i++ ) { + if( moveShard == shards[i] ) { + traceEvent->detail("ShardStillPresent", true); + self->output.send( RelocateShard( moveShard, priority ) ); + return true; } } + traceEvent->detail("ShardStillPresent", false); return false; } @@ -1192,6 +1197,15 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd state double lastRead = 0; state bool skipCurrentLoop = false; loop { + state bool moved = false; + state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId); + traceEvent.suppressFor(5.0) + .detail("PollingInterval", rebalancePollingInterval); + + if(*self->lastLimited > 0) { + traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited); + } + try { state Future delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch); if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) { @@ -1204,6 +1218,9 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd } skipCurrentLoop = val.present(); } + + traceEvent.detail("Enabled", !skipCurrentLoop); + wait(delayF); if (skipCurrentLoop) { // set loop interval to avoid busy wait here. @@ -1211,18 +1228,31 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL); continue; } + + traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]); if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] < SERVER_KNOBS->DD_REBALANCE_PARALLELISM) { state Optional> randomTeam = wait(brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true, false, SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF)))); + + traceEvent.detail("DestTeam", printable(randomTeam.map([](const Reference& team){ + return team->getDesc(); + }))); + if (randomTeam.present()) { state Optional> loadedTeam = wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply( GetTeamRequest(true, true, false, true)))); + + traceEvent.detail("SourceTeam", printable(loadedTeam.map([](const Reference& team){ + return team->getDesc(); + }))); + if (loadedTeam.present()) { - bool moved = + bool _moved = wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(), - randomTeam.get(), teamCollectionIndex == 0)); + randomTeam.get(), teamCollectionIndex == 0, &traceEvent)); + moved = _moved; if (moved) { resetCount = 0; } else { @@ -1245,10 +1275,16 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT; } + + traceEvent.detail("ResetCount", resetCount); tr.reset(); } catch (Error& e) { + traceEvent.error(e); wait(tr.onError(e)); } + + traceEvent.detail("Moved", moved); + traceEvent.log(true); } } @@ -1259,6 +1295,15 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) state double lastRead = 0; state bool skipCurrentLoop = false; loop { + state bool moved = false; + state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId); + traceEvent.suppressFor(5.0) + .detail("PollingInterval", rebalancePollingInterval); + + if(*self->lastLimited > 0) { + traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited); + } + try { state Future delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch); if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) { @@ -1271,6 +1316,9 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) } skipCurrentLoop = val.present(); } + + traceEvent.detail("Enabled", !skipCurrentLoop); + wait(delayF); if (skipCurrentLoop) { // set loop interval to avoid busy wait here. @@ -1278,17 +1326,30 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL); continue; } + + traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]); if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] < SERVER_KNOBS->DD_REBALANCE_PARALLELISM) { state Optional> randomTeam = wait(brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false, true)))); + + traceEvent.detail("SourceTeam", printable(randomTeam.map([](const Reference& team){ + return team->getDesc(); + }))); + if (randomTeam.present()) { state Optional> unloadedTeam = wait(brokenPromiseToNever( self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, true, true, false, SERVER_KNOBS->FREE_SPACE_RATIO_DD_CUTOFF)))); + + traceEvent.detail("DestTeam", printable(unloadedTeam.map([](const Reference& team){ + return team->getDesc(); + }))); + if (unloadedTeam.present()) { - bool moved = + bool _moved = wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(), - unloadedTeam.get(), teamCollectionIndex == 0)); + unloadedTeam.get(), teamCollectionIndex == 0, &traceEvent)); + moved = _moved; if (moved) { resetCount = 0; } else { @@ -1311,10 +1372,16 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL; resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT; } + + traceEvent.detail("ResetCount", resetCount); tr.reset(); } catch (Error& e) { + traceEvent.error(e); wait(tr.onError(e)); } + + traceEvent.detail("Moved", moved); + traceEvent.log(true); } } diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 5c788ffcca..88ef09afd7 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -653,6 +653,50 @@ void removeTraceRole(std::string role) { g_traceLog.removeRole(role); } +TraceEvent::TraceEvent() : initialized(true), enabled(false), logged(true) {} + +TraceEvent::TraceEvent(TraceEvent &&ev) { + enabled = ev.enabled; + err = ev.err; + fields = std::move(ev.fields); + id = ev.id; + initialized = ev.initialized; + logged = ev.logged; + maxEventLength = ev.maxEventLength; + maxFieldLength = ev.maxFieldLength; + severity = ev.severity; + tmpEventMetric = ev.tmpEventMetric; + trackingKey = ev.trackingKey; + type = ev.type; + + ev.initialized = true; + ev.enabled = false; + ev.logged = true; + ev.tmpEventMetric = nullptr; +} + +TraceEvent& TraceEvent::operator=(TraceEvent &&ev) { + enabled = ev.enabled; + err = ev.err; + fields = std::move(ev.fields); + id = ev.id; + initialized = ev.initialized; + logged = ev.logged; + maxEventLength = ev.maxEventLength; + maxFieldLength = ev.maxFieldLength; + severity = ev.severity; + tmpEventMetric = ev.tmpEventMetric; + trackingKey = ev.trackingKey; + type = ev.type; + + ev.initialized = true; + ev.enabled = false; + ev.logged = true; + ev.tmpEventMetric = nullptr; + + return *this; +} + TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) { g_trace_depth++; setMaxFieldLength(0); @@ -730,6 +774,8 @@ bool TraceEvent::init() { detail("Severity", int(severity)); detailf("Time", "%.6f", getCurrentTime()); + timeIndex = fields.size() - 1; + detail("Type", type); if(g_network && g_network->isSimulated()) { NetworkAddress local = g_network->getLocalAddress(); @@ -932,11 +978,14 @@ TraceEvent& TraceEvent::backtrace(const std::string& prefix) { return detail(prefix + "Backtrace", platform::get_backtrace()); } -void TraceEvent::log() { +void TraceEvent::log(bool useCurrentTime) { if(!logged) { init(); try { if (enabled) { + if (useCurrentTime) { + fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime()); + } if (this->severity == SevError) { severity = SevInfo; backtrace(); @@ -1150,6 +1199,10 @@ std::string TraceEventFields::getValue(std::string key) const { } } +TraceEventFields::Field& TraceEventFields::mutate(int index) { + return fields.at(index); +} + namespace { void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) { double d = 0; diff --git a/flow/Trace.h b/flow/Trace.h index 532a084622..3cdab75040 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -80,6 +80,8 @@ public: int64_t getInt64(std::string key, bool permissive=false) const; double getDouble(std::string key, bool permissive=false) const; + Field &mutate(int index); + std::string toString() const; void validateFormat() const; template @@ -373,11 +375,15 @@ struct SpecialTraceMetricType TRACE_METRIC_TYPE(double, double); struct TraceEvent { + TraceEvent(); TraceEvent( const char* type, UID id = UID() ); // Assumes SevInfo severity TraceEvent( Severity, const char* type, UID id = UID() ); TraceEvent( struct TraceInterval&, UID id = UID() ); TraceEvent( Severity severity, struct TraceInterval& interval, UID id = UID() ); + TraceEvent( TraceEvent &&ev ); + TraceEvent& operator=( TraceEvent &&ev ); + static void setNetworkThread(); static bool isNetworkThread(); @@ -467,7 +473,7 @@ public: return enabled; } - void log(); + void log(bool useCurrentTime = false); ~TraceEvent(); // Actually logs the event @@ -489,6 +495,7 @@ private: int maxFieldLength; int maxEventLength; + int timeIndex; void setSizeLimits(); From 2e699fef55dfe4600d9639b34d4387d6355481ca Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 21 Feb 2020 11:28:59 -0800 Subject: [PATCH 2/4] Don't suppress actor cancellation because we've already initialized the trace event by adding details. --- fdbserver/DataDistributionQueue.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 0a35623753..edef406788 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1279,7 +1279,7 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd traceEvent.detail("ResetCount", resetCount); tr.reset(); } catch (Error& e) { - traceEvent.error(e); + traceEvent.error(e, true); // Log actor_cancelled because it's not legal to suppress an event that's initialized wait(tr.onError(e)); } @@ -1376,7 +1376,7 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) traceEvent.detail("ResetCount", resetCount); tr.reset(); } catch (Error& e) { - traceEvent.error(e); + traceEvent.error(e, true); // Log actor_cancelled because it's not legal to suppress an event that's initialized wait(tr.onError(e)); } From 2431d4d788ce7528ff2f260fb1452622235c0c6e Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 21 Feb 2020 13:57:04 -0800 Subject: [PATCH 3/4] Always compute the time for a trace event when it is being logged rather than when it is being created. Usually these are the same, but if they aren't, doing the opposite can lead to out of order trace events. --- flow/Error.h | 2 +- flow/Trace.cpp | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/flow/Error.h b/flow/Error.h index 0afe4d0d99..feb5b25bb1 100644 --- a/flow/Error.h +++ b/flow/Error.h @@ -111,7 +111,7 @@ extern bool isAssertDisabled( int line ); // ASSERT_WE_THINK() is to be used for assertions that we want to validate in testing, but which are judged too // risky to evaluate at runtime, because the code should work even if they are false and throwing internal_error() would // result in a bug. Don't use it for assertions that are *expensive*; look at EXPENSIVE_VALIDATION. -#define ASSERT_WE_THINK( condition ) ASSERT( !g_network->isSimulated() || (condition) ) +#define ASSERT_WE_THINK( condition ) ASSERT( !g_network || !g_network->isSimulated() || (condition) ) #define ABORT_ON_ERROR( code_to_run ) \ try { code_to_run; } \ diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 88ef09afd7..0222c2c2db 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -773,7 +773,7 @@ bool TraceEvent::init() { } detail("Severity", int(severity)); - detailf("Time", "%.6f", getCurrentTime()); + detail("Time", "0.000000"); timeIndex = fields.size() - 1; detail("Type", type); @@ -983,9 +983,8 @@ void TraceEvent::log(bool useCurrentTime) { init(); try { if (enabled) { - if (useCurrentTime) { - fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime()); - } + fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime()); + if (this->severity == SevError) { severity = SevInfo; backtrace(); From dfa5f76c01e40dd43435c56fb0b6aa10183cd8b0 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 21 Feb 2020 16:28:03 -0800 Subject: [PATCH 4/4] Remove unused parameter. Don't put check for g_network presence in ASSERT_WE_THINK. --- fdbserver/DataDistributionQueue.actor.cpp | 4 ++-- flow/Error.h | 2 +- flow/Trace.cpp | 7 +++++-- flow/Trace.h | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index edef406788..312cfcc47f 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1284,7 +1284,7 @@ ACTOR Future BgDDMountainChopper( DDQueueData* self, int teamCollectionInd } traceEvent.detail("Moved", moved); - traceEvent.log(true); + traceEvent.log(); } } @@ -1381,7 +1381,7 @@ ACTOR Future BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex) } traceEvent.detail("Moved", moved); - traceEvent.log(true); + traceEvent.log(); } } diff --git a/flow/Error.h b/flow/Error.h index feb5b25bb1..0afe4d0d99 100644 --- a/flow/Error.h +++ b/flow/Error.h @@ -111,7 +111,7 @@ extern bool isAssertDisabled( int line ); // ASSERT_WE_THINK() is to be used for assertions that we want to validate in testing, but which are judged too // risky to evaluate at runtime, because the code should work even if they are false and throwing internal_error() would // result in a bug. Don't use it for assertions that are *expensive*; look at EXPENSIVE_VALIDATION. -#define ASSERT_WE_THINK( condition ) ASSERT( !g_network || !g_network->isSimulated() || (condition) ) +#define ASSERT_WE_THINK( condition ) ASSERT( !g_network->isSimulated() || (condition) ) #define ABORT_ON_ERROR( code_to_run ) \ try { code_to_run; } \ diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 0222c2c2db..3a46381105 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -978,7 +978,7 @@ TraceEvent& TraceEvent::backtrace(const std::string& prefix) { return detail(prefix + "Backtrace", platform::get_backtrace()); } -void TraceEvent::log(bool useCurrentTime) { +void TraceEvent::log() { if(!logged) { init(); try { @@ -1327,6 +1327,9 @@ void TraceEventFields::validateFormat() const { } std::string traceableStringToString(const char* value, size_t S) { - ASSERT_WE_THINK(S > 0 && value[S - 1] == '\0'); + if(g_network) { + ASSERT_WE_THINK(S > 0 && value[S - 1] == '\0'); + } + return std::string(value, S - 1); // Exclude trailing \0 byte } diff --git a/flow/Trace.h b/flow/Trace.h index 3cdab75040..7bc4415035 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -473,7 +473,7 @@ public: return enabled; } - void log(bool useCurrentTime = false); + void log(); ~TraceEvent(); // Actually logs the event