diff --git a/documentation/sphinx/source/release-notes/release-notes-620.rst b/documentation/sphinx/source/release-notes/release-notes-620.rst index 3c1c902724..fb7ade4edc 100644 --- a/documentation/sphinx/source/release-notes/release-notes-620.rst +++ b/documentation/sphinx/source/release-notes/release-notes-620.rst @@ -2,12 +2,16 @@ Release Notes ############# +6.2.29 +====== +* Fix invalid memory access on data distributor when snapshotting large clusters. `(PR #4076) `_ +* Add human-readable DateTime to trace events `(PR #4087) `_ + 6.2.28 ====== * Log detailed team collection information when median available space ratio of all teams is too low. `(PR #3912) `_ * Bug fix, blob client did not support authentication key sizes over 64 bytes. `(PR #3964) `_ - 6.2.27 ====== * For clusters with a large number of shards, avoid slow tasks in the data distributor by adding yields to the shard map destruction. `(PR #3834) `_ diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 90e9ce7610..6ae199f8be 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -37,16 +37,16 @@ typedef StringRef ValueRef; typedef int64_t Generation; enum { - tagLocalitySpecial = -1, + tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2) tagLocalityLogRouter = -2, - tagLocalityRemoteLog = -3, + tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs tagLocalityUpgraded = -4, tagLocalitySatellite = -5, - tagLocalityLogRouterMapped = -6, // used by log router to pop from TLogs + tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2) tagLocalityTxs = -7, tagLocalityBackup = -8, // used by backup role to pop from TLogs tagLocalityInvalid = -99 -}; //The TLog and LogRouter require these number to be as compact as possible +}; // The TLog and LogRouter require these number to be as compact as possible inline bool isPseudoLocality(int8_t locality) { return locality == tagLocalityLogRouterMapped || locality == tagLocalityBackup; @@ -54,6 +54,11 @@ inline bool isPseudoLocality(int8_t locality) { #pragma pack(push, 1) struct Tag { + // if locality > 0, + // locality decides which DC id the tLog is in; + // id decides which SS owns the tag; id <-> SS mapping is in the system keyspace: serverTagKeys. + // if locality < 0, locality decides the type of tLog set: satellite, LR, or remote tLog, etc. + // id decides which tLog in the tLog type will be used. int8_t locality; uint16_t id; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 15310c901d..0e3141b552 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1515,6 +1515,12 @@ ACTOR Future< vector< pair> > > getKeyRangeLoca } } +// Get the SS locations for each shard in the 'keys' key-range; +// Returned vector size is the number of shards in the input keys key-range. +// Returned vector element is pairs, where +// ShardRange is the whole shard key-range, not a part of the given key range. +// Example: If query the function with key range (b, d), the returned list of pairs could be something like: +// [([a, b1), locationInfo), ([b1, c), locationInfo), ([c, d1), locationInfo)]. 
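As a concrete illustration of the comment above, here is a small standalone sketch of the boundary behaviour it describes, using plain std::map and std::string keys rather than FoundationDB types; the shard layout and the helper name shardsIntersecting are made up for the example. Querying (b, d) returns whole shard ranges, so the first and last results extend past the queried keys:

#include <iostream>
#include <map>
#include <string>
#include <vector>

using Key = std::string;
struct KeyRange { Key begin, end; };

// Shards modeled as a map from shard begin-key to shard end-key.
std::vector<KeyRange> shardsIntersecting(const std::map<Key, Key>& shards, const KeyRange& query) {
    std::vector<KeyRange> result;
    for (const auto& [begin, end] : shards) {
        if (end > query.begin && begin < query.end) result.push_back({begin, end});
    }
    return result;
}

int main() {
    std::map<Key, Key> shards = { {"a", "b1"}, {"b1", "c"}, {"c", "d1"}, {"d1", "e"} };
    for (const auto& r : shardsIntersecting(shards, {"b", "d"}))
        std::cout << "[" << r.begin << ", " << r.end << ")" << std::endl; // prints [a, b1) [b1, c) [c, d1)
    return 0;
}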
template Future< vector< pair> > > getKeyRangeLocations( Database const& cx, KeyRange const& keys, int limit, bool reverse, F StorageServerInterface::*member, TransactionInfo const& info ) { ASSERT (!keys.empty()); diff --git a/fdbrpc/LoadBalance.actor.h b/fdbrpc/LoadBalance.actor.h index 71c89ea406..74ebe1d003 100644 --- a/fdbrpc/LoadBalance.actor.h +++ b/fdbrpc/LoadBalance.actor.h @@ -171,14 +171,11 @@ void addLaggingRequest(Future> reply, Promise requestFinis // failMon's information for load balancing and avoiding failed servers // If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers ACTOR template -Future< REPLY_TYPE(Request) > loadBalance( - Reference> alternatives, - RequestStream Interface::* channel, - Request request = Request(), - TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint, - bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically - QueueModel* model = NULL) -{ +Future loadBalance( + Reference> alternatives, RequestStream Interface::*channel, + Request request = Request(), TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint, + bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically + QueueModel* model = NULL) { state Future> firstRequest; state Optional firstRequestEndpoint; state Future> secondRequest; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index e909de515a..7d9d87c46b 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -4779,7 +4779,9 @@ ACTOR Future dataDistribution(Reference self, Promise state MoveKeysLock lock; state Reference primaryTeamCollection; state Reference remoteTeamCollection; + state bool trackerCancelled; loop { + trackerCancelled = false; try { loop { TraceEvent("DDInitTakingMoveKeysLock", self->ddId); @@ -4943,7 +4945,7 @@ ACTOR Future dataDistribution(Reference self, Promise actors.push_back(reportErrorsExcept( dataDistributionTracker(initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart, - anyZeroHealthyTeams, self->ddId, &shards), + anyZeroHealthyTeams, self->ddId, &shards, &trackerCancelled), "DDTracker", self->ddId, &normalDDQueueErrors())); actors.push_back(reportErrorsExcept( dataDistributionQueue(cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, @@ -4977,6 +4979,7 @@ ACTOR Future dataDistribution(Reference self, Promise return Void(); } catch( Error &e ) { + trackerCancelled = true; state Error err = e; TraceEvent("DataDistributorDestroyTeamCollections").error(e); self->teamCollection = nullptr; diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index 225f7ac5e2..22d95bcd9a 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -212,7 +212,7 @@ struct InitialDataDistribution : ReferenceCounted { struct ShardMetrics { StorageMetrics metrics; double lastLowBandwidthStartTime; - int shardCount; + int shardCount; // number of smaller shards whose metrics are aggregated in the ShardMetrics bool operator==(ShardMetrics const& rhs) const { return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime && @@ -229,33 +229,22 @@ struct ShardTrackedData { Reference>> stats; }; -Future dataDistributionTracker( - Reference const& initData, - Database const& cx, - 
PromiseStream const& output, - Reference const& shardsAffectedByTeamFailure, - PromiseStream const& getShardMetrics, - PromiseStream const& getShardMetricsList, - FutureStream> const& getAverageShardBytes, - Promise const& readyToStart, - Reference> const& zeroHealthyTeams, - UID const& distributorId, - KeyRangeMap* const& shards); +ACTOR Future dataDistributionTracker(Reference initData, Database cx, + PromiseStream output, + Reference shardsAffectedByTeamFailure, + PromiseStream getShardMetrics, + PromiseStream getShardMetricsList, + FutureStream> getAverageShardBytes, + Promise readyToStart, Reference> anyZeroHealthyTeams, + UID distributorId, KeyRangeMap* shards, + bool const* trackerCancelled); -Future dataDistributionQueue( - Database const& cx, - PromiseStream const& output, - FutureStream const& input, - PromiseStream const& getShardMetrics, - Reference> const& processingUnhealthy, - vector const& teamCollection, - Reference const& shardsAffectedByTeamFailure, - MoveKeysLock const& lock, - PromiseStream> const& getAverageShardBytes, - UID const& distributorId, - int const& teamSize, - int const& singleRegionTeamSize, - double* const& lastLimited); +ACTOR Future dataDistributionQueue( + Database cx, PromiseStream output, FutureStream input, + PromiseStream getShardMetrics, Reference> processingUnhealthy, + vector teamCollection, Reference shardsAffectedByTeamFailure, + MoveKeysLock lock, PromiseStream> getAverageShardBytes, UID distributorId, int teamSize, + int singleRegionTeamSize, double* lastLimited); //Holds the permitted size and IO Bounds for a shard struct ShardSizeBounds { diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index d3ad7eaa41..eb45756e02 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -18,8 +18,9 @@ * limitations under the License. 
*/ -#include #include +#include +#include #include "flow/ActorCollection.h" #include "flow/Util.h" @@ -82,8 +83,8 @@ struct RelocateData { }; class ParallelTCInfo : public ReferenceCounted, public IDataDistributionTeam { - vector> teams; - vector tempServerIDs; + std::vector> teams; + std::vector tempServerIDs; int64_t sum(std::function func) const { int64_t result = 0; @@ -94,11 +95,11 @@ class ParallelTCInfo : public ReferenceCounted, public IDataDist } template - vector collect(std::function(IDataDistributionTeam const&)> func) const { - vector result; + std::vector collect(std::function(IDataDistributionTeam const&)> func) const { + std::vector result; for (const auto& team : teams) { - vector newItems = func(*team); + std::vector newItems = func(*team); result.insert(result.end(), newItems.begin(), newItems.end()); } return result; @@ -124,7 +125,7 @@ public: return !any([func](IDataDistributionTeam const& team) { return !func(team); }); } - vector getLastKnownServerInterfaces() const override { + std::vector getLastKnownServerInterfaces() const override { return collect( [](IDataDistributionTeam const& team) { return team.getLastKnownServerInterfaces(); }); } @@ -137,11 +138,11 @@ public: return totalSize; } - vector const& getServerIDs() const override { + std::vector const& getServerIDs() const override { static vector tempServerIDs; tempServerIDs.clear(); for (const auto& team : teams) { - vector const &childIDs = team->getServerIDs(); + std::vector const& childIDs = team->getServerIDs(); tempServerIDs.insert(tempServerIDs.end(), childIDs.begin(), childIDs.end()); } return tempServerIDs; @@ -184,7 +185,7 @@ public: } virtual Future updateStorageMetrics() { - vector> futures; + std::vector> futures; for (auto& team : teams) { futures.push_back(team->updateStorageMetrics()); @@ -247,7 +248,7 @@ public: }; struct Busyness { - vector ledger; + std::vector ledger; Busyness() : ledger( 10, 0 ) {} @@ -551,8 +552,8 @@ struct DDQueueData { if(keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) { for( int shard = 0; shard < keyServersEntries.size(); shard++ ) { - vector src, dest; - decodeKeyServersValue( UIDtoTagMap, keyServersEntries[shard].value, src, dest ); + std::vector src, dest; + decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest); ASSERT( src.size() ); for( int i = 0; i < src.size(); i++ ) { servers.insert( src[i] ); @@ -855,7 +856,7 @@ struct DDQueueData { startedHere++; // update both inFlightActors and inFlight key range maps, cancelling deleted RelocateShards - vector ranges; + std::vector ranges; inFlightActors.getRangesAffectedByInsertion( rd.keys, ranges ); inFlightActors.cancel( KeyRangeRef( ranges.front().begin, ranges.back().end ) ); inFlight.insert( rd.keys, rd ); @@ -1437,7 +1438,7 @@ ACTOR Future dataDistributionQueue( state RelocateData launchData; state Future recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL); - state vector> balancingFutures; + state std::vector> balancingFutures; state ActorCollectionNoErrors actors; state PromiseStream rangesComplete; diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index 14640a980b..03fa937b64 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -91,14 +91,43 @@ struct DataDistributionTracker { // Read hot detection PromiseStream readHotShard; + // The reference to trackerCancelled must be extracted by actors, + // because by the time (trackerCancelled == true) this 
memory cannot + // be accessed + bool const& trackerCancelled; + + // This class extracts the trackerCancelled reference from a DataDistributionTracker object + // Because some actors spawned by the dataDistributionTracker outlive the DataDistributionTracker + // object, we must guard against memory errors by using a GetTracker functor to access + // the DataDistributionTracker object. + class SafeAccessor { + bool const& trackerCancelled; + DataDistributionTracker& tracker; + + public: + SafeAccessor(DataDistributionTracker* tracker) + : trackerCancelled(tracker->trackerCancelled), tracker(*tracker) { + ASSERT(!trackerCancelled); + } + + DataDistributionTracker* operator()() { + if (trackerCancelled) { + TEST(true); // Trying to access DataDistributionTracker after tracker has been cancelled + throw dd_tracker_cancelled(); + } + return &tracker; + } + }; + DataDistributionTracker(Database cx, UID distributorId, Promise const& readyToStart, PromiseStream const& output, Reference shardsAffectedByTeamFailure, - Reference> anyZeroHealthyTeams, KeyRangeMap& shards) + Reference> anyZeroHealthyTeams, KeyRangeMap& shards, + bool const& trackerCancelled) : cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar()), systemSizeEstimate(0), maxShardSize(new AsyncVar>()), sizeChanges(false), readyToStart(readyToStart), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams), - shards(shards) {} + shards(shards), trackerCancelled(trackerCancelled) {} ~DataDistributionTracker() { @@ -150,7 +179,7 @@ int64_t getMaxShardSize( double dbSizeEstimate ) { (int64_t)SERVER_KNOBS->MAX_SHARD_BYTES); } -ACTOR Future trackShardMetrics(DataDistributionTracker* self, KeyRange keys, +ACTOR Future trackShardMetrics(DataDistributionTracker::SafeAccessor self, KeyRange keys, Reference>> shardMetrics) { state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal; state double lastLowBandwidthStartTime = shardMetrics->get().present() ? 
shardMetrics->get().get().lastLowBandwidthStartTime : now(); @@ -209,7 +238,7 @@ ACTOR Future trackShardMetrics(DataDistributionTracker* self, KeyRange key // TraceEvent("RHDTriggerReadHotLoggingForShard") // .detail("ShardBegin", keys.begin.printable().c_str()) // .detail("ShardEnd", keys.end.printable().c_str()); - self->readHotShard.send(keys); + self()->readHotShard.send(keys); } else { ASSERT(false); } @@ -230,7 +259,8 @@ ACTOR Future trackShardMetrics(DataDistributionTracker* self, KeyRange key bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity; loop { - Transaction tr(self->cx); + Transaction tr(self()->cx); + // metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range std::pair, int> metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, shardCount ) ); if(metrics.first.present()) { BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics.first.get() ); @@ -253,9 +283,11 @@ ACTOR Future trackShardMetrics(DataDistributionTracker* self, KeyRange key .detail("TrackerID", trackerID);*/ if( shardMetrics->get().present() ) { - self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes ); + self()->dbSizeEstimate->set(self()->dbSizeEstimate->get() + metrics.first.get().bytes - + shardMetrics->get().get().metrics.bytes); if(keys.begin >= systemKeys.begin) { - self->systemSizeEstimate += metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes; + self()->systemSizeEstimate += + metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes; } } @@ -272,8 +304,9 @@ ACTOR Future trackShardMetrics(DataDistributionTracker* self, KeyRange key } } } catch( Error &e ) { - if (e.code() != error_code_actor_cancelled) - self->output.sendError(e); // Propagate failure to dataDistributionTracker + if (e.code() != error_code_actor_cancelled && e.code() != error_code_dd_tracker_cancelled) { + self()->output.sendError(e); // Propagate failure to dataDistributionTracker + } throw e; } } @@ -561,6 +594,8 @@ Future shardMerger( shardsMerged++; auto shardBounds = getShardSizeBounds( merged, maxShardSize ); + // If we got the current shard's metrics only recently (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), the + // shard's metrics may not be stable yet, so we cannot continue merging in this direction. if( endingStats.bytes >= shardBounds.min.bytes || getBandwidthStatus( endingStats ) != BandwidthStatusLow || now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY || @@ -591,13 +626,21 @@ Future shardMerger( //restarting shard tracker will derefenced values in the shard map, so make a copy KeyRange mergeRange = merged; + // OldKeys: Shards in the key range are merged as one shard defined by NewKeys; + // NewKeys: New key range after shards are merged; + // EndingSize: The new merged shard size in bytes; + // BatchedMerges: The number of shards merged. Each shard is defined in self->shards; + // LastLowBandwidthStartTime: When the shard's bandwidth status became BandwidthStatusLow. If a shard's status + // became BandwidthStatusLow less than DD_LOW_BANDWIDTH_DELAY ago, the merging logic will stop at that shard; + // ShardCount: The number of non-splittable shards that are merged. Each shard defined in self->shards may + // consist of more than one such shard. 
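The self()-> accesses above go through the SafeAccessor functor introduced earlier in this patch. A minimal standalone sketch of that guarded-access idea follows; the names Tracker and std::runtime_error are made-up stand-ins (the real code throws dd_tracker_cancelled()), and the key assumption is that the cancellation flag outlives the guarded object:

#include <iostream>
#include <stdexcept>

struct Tracker {
    long dbSizeEstimate = 0;
};

class SafeAccessor {
    const bool& cancelled; // owned by the long-lived caller, so it stays valid after Tracker is destroyed
    Tracker& tracker;

public:
    SafeAccessor(const bool& cancelled, Tracker& tracker) : cancelled(cancelled), tracker(tracker) {}

    Tracker* operator()() {
        if (cancelled) throw std::runtime_error("dd_tracker_cancelled"); // stand-in for the new error code
        return &tracker;
    }
};

int main() {
    bool trackerCancelled = false;
    Tracker tracker;
    SafeAccessor self(trackerCancelled, tracker);

    self()->dbSizeEstimate += 100; // fine: the tracker is still alive
    trackerCancelled = true;       // simulates dataDistribution() tearing the tracker down
    try {
        self()->dbSizeEstimate += 100; // refuses to touch state that may already be freed
    } catch (const std::exception& e) {
        std::cout << "access blocked: " << e.what() << std::endl;
    }
    return 0;
}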
TraceEvent("RelocateShardMergeMetrics", self->distributorId) - .detail("OldKeys", keys) - .detail("NewKeys", mergeRange) - .detail("EndingSize", endingStats.bytes) - .detail("BatchedMerges", shardsMerged) - .detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime) - .detail("ShardCount", shardCount); + .detail("OldKeys", keys) + .detail("NewKeys", mergeRange) + .detail("EndingSize", endingStats.bytes) + .detail("BatchedMerges", shardsMerged) + .detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime) + .detail("ShardCount", shardCount); if(mergeRange.begin < systemKeys.begin) { self->systemSizeEstimate -= systemBytes; @@ -667,18 +710,14 @@ ACTOR Future shardEvaluator( return Void(); } -ACTOR Future shardTracker( - DataDistributionTracker* self, - KeyRange keys, - Reference>> shardSize) -{ - wait( yieldedFuture(self->readyToStart.getFuture()) ); +ACTOR Future shardTracker(DataDistributionTracker::SafeAccessor self, KeyRange keys, + Reference>> shardSize) { + wait(yieldedFuture(self()->readyToStart.getFuture())); if( !shardSize->get().present() ) wait( shardSize->onChange() ); - if( !self->maxShardSize->get().present() ) - wait( yieldedFuture(self->maxShardSize->onChange()) ); + if (!self()->maxShardSize->get().present()) wait(yieldedFuture(self()->maxShardSize->onChange())); // Since maxShardSize will become present for all shards at once, avoid slow tasks with a short delay wait( delay( 0, TaskPriority::DataDistribution ) ); @@ -686,27 +725,26 @@ ACTOR Future shardTracker( // Survives multiple calls to shardEvaluator and keeps merges from happening too quickly. state Reference wantsToMerge( new HasBeenTrueFor( shardSize->get() ) ); - /*TraceEvent("ShardTracker", self->distributorId) - .detail("Begin", keys.begin) - .detail("End", keys.end) - .detail("TrackerID", trackerID) - .detail("MaxBytes", self->maxShardSize->get().get()) - .detail("ShardSize", shardSize->get().get().bytes) - .detail("BytesPerKSec", shardSize->get().get().bytesPerKSecond);*/ + /*TraceEvent("ShardTracker", self()->distributorId) + .detail("Begin", keys.begin) + .detail("End", keys.end) + .detail("TrackerID", trackerID) + .detail("MaxBytes", self()->maxShardSize->get().get()) + .detail("ShardSize", shardSize->get().get().bytes) + .detail("BytesPerKSec", shardSize->get().get().bytesPerKSecond);*/ try { loop { // Use the current known size to check for (and start) splits and merges. - wait( shardEvaluator( self, keys, shardSize, wantsToMerge ) ); + wait(shardEvaluator(self(), keys, shardSize, wantsToMerge)); // We could have a lot of actors being released from the previous wait at the same time. 
Immediately calling // delay(0) mitigates the resulting SlowTask wait( delay(0, TaskPriority::DataDistribution) ); } } catch (Error& e) { - // If e is broken_promise then self may have already been deleted - if (e.code() != error_code_actor_cancelled && e.code() != error_code_broken_promise) { - self->output.sendError(e); // Propagate failure to dataDistributionTracker + if (e.code() != error_code_actor_cancelled && e.code() != error_code_dd_tracker_cancelled) { + self()->output.sendError(e); // Propagate failure to dataDistributionTracker } throw e; } @@ -738,8 +776,8 @@ void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optio ShardTrackedData data; data.stats = shardMetrics; - data.trackShard = shardTracker(self, ranges[i], shardMetrics); - data.trackBytes = trackShardMetrics(self, ranges[i], shardMetrics); + data.trackShard = shardTracker(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics); + data.trackBytes = trackShardMetrics(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics); self->shards.insert( ranges[i], data ); } } @@ -862,9 +900,10 @@ ACTOR Future dataDistributionTracker(Reference in PromiseStream getShardMetricsList, FutureStream> getAverageShardBytes, Promise readyToStart, Reference> anyZeroHealthyTeams, - UID distributorId, KeyRangeMap* shards) { + UID distributorId, KeyRangeMap* shards, + bool const* trackerCancelled) { state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure, - anyZeroHealthyTeams, *shards); + anyZeroHealthyTeams, *shards, *trackerCancelled); state Future loggingTrigger = Void(); state Future readHotDetect = readHotDetector(&self); try { diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index c44b708c3f..92adeb7387 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -579,6 +579,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0; init( OPERATION_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) OPERATION_COST_BYTE_FACTOR = 4096; init( PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS, true ); if( randomize && BUGGIFY ) PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS = false; + init( REPORT_DD_METRICS, true ); + init( DD_METRICS_REPORT_INTERVAL, 30.0 ); + init( FETCH_KEYS_TOO_LONG_TIME_CRITERIA, 300.0 ); //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; @@ -611,6 +614,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( MAX_STATUS_REQUESTS_PER_SECOND, 256.0 ); init( CONFIGURATION_ROWS_TO_FETCH, 20000 ); init( DISABLE_DUPLICATE_LOG_WARNING, false ); + init( HISTOGRAM_REPORT_INTERVAL, 300.0 ); // IPager init( PAGER_RESERVED_PAGES, 1 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 3e2e7d7166..6c10e5a268 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -508,6 +508,9 @@ public: double READ_TAG_MEASUREMENT_INTERVAL; int64_t OPERATION_COST_BYTE_FACTOR; bool PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS; + bool REPORT_DD_METRICS; + double DD_METRICS_REPORT_INTERVAL; + double FETCH_KEYS_TOO_LONG_TIME_CRITERIA; //Wait Failure int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS; @@ -540,6 +543,7 @@ public: double MAX_STATUS_REQUESTS_PER_SECOND; int CONFIGURATION_ROWS_TO_FETCH; bool DISABLE_DUPLICATE_LOG_WARNING; + double HISTOGRAM_REPORT_INTERVAL; // IPager int PAGER_RESERVED_PAGES; diff --git 
a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 83930fe536..ce50cd65a0 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -78,20 +78,23 @@ struct LogRouterData { const UID dbgid; Reference>> logSystem; Optional primaryPeekLocation; - NotifiedVersion version; - NotifiedVersion minPopped; + NotifiedVersion version; // The largest version at which the log router has peeked mutations + // from satellite tLog or primary tLogs. + NotifiedVersion minPopped; // The minimum version among all tags that has been popped by remote tLogs. const Version startVersion; - Version minKnownCommittedVersion; + Version minKnownCommittedVersion; // The minimum durable version among all LRs. + // A LR's durable version is the maximum version of mutations that have been + // popped by remote tLog. Version poppedVersion; Deque>>> messageBlocks; Tag routerTag; bool allowPops; LogSet logSet; - bool foundEpochEnd; - double waitForVersionTime = 0; - double maxWaitForVersionTime = 0; - double getMoreTime = 0; - double maxGetMoreTime = 0; + bool foundEpochEnd; // Cluster is not fully recovered yet. LR has to handle recovery + double waitForVersionTime = 0; // The total amount of time LR waits for remote tLog to peek and pop its data. + double maxWaitForVersionTime = 0; // The max one-instance wait time when LR must wait for remote tLog to pop data. + double getMoreTime = 0; // The total amount of time LR waits for satellite tLog's data to become available. + double maxGetMoreTime = 0; // The max wait time LR spent in a pull-data-request to satellite tLog. int64_t generation = -1; Reference peekLatencyDist; @@ -103,7 +106,9 @@ struct LogRouterData { std::map peekTracker; CounterCollection cc; - Counter getMoreCount, getMoreBlockedCount; + Counter getMoreCount; // Increase by 1 when LR tries to pull data from satellite tLog. + Counter + getMoreBlockedCount; // Increase by 1 if data is not available when LR tries to pull data from satellite tLog. Future logger; Reference eventCacheHolder; @@ -148,8 +153,10 @@ struct LogRouterData { eventCacheHolder = Reference( new EventCacheHolder(dbgid.shortString() + ".PeekLocation") ); - specialCounter(cc, "Version", [this](){ return this->version.get(); }); + // FetchedVersions: How many version of mutations buffered at LR and have not been popped by remote tLogs + specialCounter(cc, "Version", [this]() { return this->version.get(); }); specialCounter(cc, "MinPopped", [this](){ return this->minPopped.get(); }); + // TODO: Add minPopped locality and minPoppedId, similar as tLog Metrics specialCounter(cc, "FetchedVersions", [this](){ return std::max(0, std::min(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS, this->version.get() - this->minPopped.get())); }); specialCounter(cc, "MinKnownCommittedVersion", [this](){ return this->minKnownCommittedVersion; }); specialCounter(cc, "PoppedVersion", [this](){ return this->poppedVersion; }); @@ -222,8 +229,15 @@ ACTOR Future waitForVersion( LogRouterData *self, Version ver ) { // Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version. 
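The FetchedVersions counter defined above reports how far the log router's peeked version has run ahead of what remote tLogs have popped, clamped to MAX_READ_TRANSACTION_LIFE_VERSIONS. A tiny standalone check of that formula (the knob value below is a made-up stand-in):

#include <algorithm>
#include <cstdint>
#include <iostream>

using Version = int64_t;

// FetchedVersions = max(0, min(MAX_READ_TRANSACTION_LIFE_VERSIONS, version - minPopped))
Version fetchedVersions(Version version, Version minPopped, Version maxReadTxnLifeVersions) {
    return std::max<Version>(0, std::min(maxReadTxnLifeVersions, version - minPopped));
}

int main() {
    const Version maxLife = 5000000; // illustrative stand-in for SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS
    std::cout << fetchedVersions(12000000, 11500000, maxLife) << std::endl; // 500000: backlog the LR is buffering
    std::cout << fetchedVersions(12000000, 2000000, maxLife) << std::endl;  // clamped to 5000000
    return 0;
}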
state double startTime = now(); if(self->version.get() < self->startVersion) { + // Log router needs to wait for remote tLogs to process data, whose version is less than self->startVersion, + // before the log router can pull more data (i.e., data after self->startVersion) from satellite tLog; + // This prevents LR from getting OOM due to it pulls too much data from satellite tLog at once; + // Note: each commit writes data to both primary tLog and satellite tLog. Satellite tLog can be viewed as + // a part of primary tLogs. if(ver > self->startVersion) { self->version.set(self->startVersion); + // Wait for remote tLog to peek and pop from LR, + // so that LR's minPopped version can increase to self->startVersion wait(self->minPopped.whenAtLeast(self->version.get())); } self->waitForVersionTime += now() - startTime; @@ -231,6 +245,9 @@ ACTOR Future waitForVersion( LogRouterData *self, Version ver ) { return Void(); } if(!self->foundEpochEnd) { + // Similar to proxy that does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions oustanding; + // Log router does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions outstanding because + // remote SS cannot roll back to more than MAX_READ_TRANSACTION_LIFE_VERSIONS ago. wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))); } else { while(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) { @@ -250,6 +267,7 @@ ACTOR Future waitForVersion( LogRouterData *self, Version ver ) { return Void(); } +// Log router pull data from satellite tLog ACTOR Future pullAsyncData( LogRouterData *self ) { state Future dbInfoChange = Void(); state Reference r; @@ -581,6 +599,7 @@ ACTOR Future logRouterCore( addActor.send( logRouterPeekMessages( &logRouterData, req ) ); } when( TLogPopRequest req = waitNext( interf.popMessages.getFuture() ) ) { + // Request from remote tLog to pop data from LR addActor.send( logRouterPop( &logRouterData, req ) ); } when (wait(error)) {} diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 2634fe1b42..e8e81c1a4b 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -87,6 +87,14 @@ struct ProxyStats { Counter conflictRanges; Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors; Version lastCommitVersionAssigned; + double transactionRateAllowed, batchTransactionRateAllowed; + double transactionLimit, batchTransactionLimit; + // how much of the GRV requests queue was processed in one attempt to hand out read version. 
+ double percentageOfDefaultGRVQueueProcessed; + double percentageOfBatchGRVQueueProcessed; + + LatencySample defaultTxnGRVTimeInQueue; + LatencySample batchTxnGRVTimeInQueue; LatencySample commitLatencySample; LatencySample grvLatencySample; @@ -136,36 +144,53 @@ struct ProxyStats { return r; } - explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr) - : cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), - maxComputeNS(0), minComputeNS(1e12), - bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS), - txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), - txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), - txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), - txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), - txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), - txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc), - txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), - txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc), - txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), - txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc), - txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc), - txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc), - commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), - conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc), - keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc), - lastCommitVersionAssigned(0), - commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), - grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE), - commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), - grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { + explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, + int64_t* commitBatchesMemBytesCountPtr) + : cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), maxComputeNS(0), minComputeNS(1e12), + bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS), + txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc), + txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), + txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), + txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), + txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), + txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc), + txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), + txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc), + txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), + txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc), + 
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc), + txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc), + commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), + conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc), + keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc), + lastCommitVersionAssigned(0), + commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), + grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), + defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0), + percentageOfDefaultGRVQueueProcessed(0), percentageOfBatchGRVQueueProcessed(0) { specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;}); specialCounter(cc, "Version", [pVersion](){return *pVersion; }); specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); }); specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; }); specialCounter(cc, "MaxCompute", [this](){ return this->getAndResetMaxCompute(); }); specialCounter(cc, "MinCompute", [this](){ return this->getAndResetMinCompute(); }); + // The rate at which the limit(budget) is allowed to grow. 
+ specialCounter(cc, "SystemAndDefaultTxnRateAllowed", [this]() { return this->transactionRateAllowed; }); + specialCounter(cc, "BatchTransactionRateAllowed", [this]() { return this->batchTransactionRateAllowed; }); + specialCounter(cc, "SystemAndDefaultTxnLimit", [this]() { return this->transactionLimit; }); + specialCounter(cc, "BatchTransactionLimit", [this]() { return this->batchTransactionLimit; }); + specialCounter(cc, "PercentageOfDefaultGRVQueueProcessed", + [this]() { return this->percentageOfDefaultGRVQueueProcessed; }); + specialCounter(cc, "PercentageOfBatchGRVQueueProcessed", + [this]() { return this->percentageOfBatchGRVQueueProcessed; }); logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics"); for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) { requestBuckets.push_back(0); @@ -246,10 +271,12 @@ struct TransactionRateInfo { } }; - -ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, TransactionRateInfo *transactionRateInfo, - TransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply, - TransactionTagMap* transactionTagCounter, PrioritizedTransactionTagMap* throttledTags) { +ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, + int64_t* inBatchTransactionCount, TransactionRateInfo* transactionRateInfo, + TransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, + GetHealthMetricsReply* detailedHealthMetricsReply, + TransactionTagMap* transactionTagCounter, + PrioritizedTransactionTagMap* throttledTags, ProxyStats* stats) { state Future nextRequestTimer = Never(); state Future leaseTimeout = Never(); state Future reply = Never(); @@ -290,6 +317,15 @@ ACTOR Future getRate(UID myID, Reference> db, int64 transactionRateInfo->setRate(rep.transactionRate); batchTransactionRateInfo->setRate(rep.batchTransactionRate); //TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC); + + stats->transactionRateAllowed = rep.transactionRate; + stats->batchTransactionRateAllowed = rep.batchTransactionRate; + // TraceEvent("MasterProxyTxRate", myID) + // .detail("RKID", db->get().ratekeeper.get().id()) + // .detail("RateAllowed", rep.transactionRate) + // .detail("BatchRateAllowed", rep.batchTransactionRate) + // .detail("Lease", rep.leaseDuration) + // .detail("ReleasedTransactions", *inTransactionCount - lastTC); lastTC = *inTransactionCount; leaseTimeout = delay(rep.leaseDuration); nextRequestTimer = delayJittered(rep.leaseDuration / 2); @@ -1556,13 +1592,10 @@ ACTOR Future sendGrvReplies(Future replyFuture, std:: return Void(); } -ACTOR static Future transactionStarter( - MasterProxyInterface proxy, - Reference> db, - PromiseStream> addActor, - ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply, - GetHealthMetricsReply* detailedHealthMetricsReply) -{ +ACTOR static Future transactionStarter(MasterProxyInterface proxy, Reference> db, + PromiseStream> addActor, ProxyCommitData* commitData, + GetHealthMetricsReply* healthMetricsReply, + GetHealthMetricsReply* detailedHealthMetricsReply, ProxyStats* stats) { state double lastGRVTime = 0; state PromiseStream GRVTimer; state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN; @@ -1581,8 +1614,9 @@ 
ACTOR static Future transactionStarter( state PrioritizedTransactionTagMap throttledTags; state PromiseStream replyTimes; - - addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags)); + addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, + healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags, + stats)); addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo, &transactionTagCounter)); @@ -1611,6 +1645,9 @@ ACTOR static Future transactionStarter( normalRateInfo.reset(); batchRateInfo.reset(); + stats->transactionLimit = normalRateInfo.limit; + stats->batchTransactionLimit = batchRateInfo.limit; + int transactionsStarted[2] = {0,0}; int systemTransactionsStarted[2] = {0,0}; int defaultPriTransactionsStarted[2] = { 0, 0 }; @@ -1621,6 +1658,8 @@ ACTOR static Future transactionStarter( int requestsToStart = 0; + uint32_t defaultQueueSize = defaultQueue.size(); + uint32_t batchQueueSize = batchQueue.size(); while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) { Deque* transactionQueue; if(!systemQueue.empty()) { @@ -1649,12 +1688,16 @@ ACTOR static Future transactionStarter( } transactionsStarted[req.flags&1] += tc; - if (req.priority >= TransactionPriority::IMMEDIATE) + double currentTime = g_network->timer(); + if (req.priority >= TransactionPriority::IMMEDIATE) { systemTransactionsStarted[req.flags & 1] += tc; - else if (req.priority >= TransactionPriority::DEFAULT) + } else if (req.priority >= TransactionPriority::DEFAULT) { defaultPriTransactionsStarted[req.flags & 1] += tc; - else + stats->defaultTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime()); + } else { batchPriTransactionsStarted[req.flags & 1] += tc; + stats->batchTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime()); + } start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value"); transactionQueue->pop_front(); @@ -1691,6 +1734,8 @@ ACTOR static Future transactionStarter( g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast"); } + int defaultGRVProcessed = 0; + int batchGRVProcessed = 0; for (int i = 0; i < start.size(); i++) { if (start[i].size()) { Future readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]); @@ -1701,8 +1746,12 @@ ACTOR static Future transactionStarter( if (i == 0) { addActor.send(timeReply(readVersionReply, replyTimes)); } + defaultGRVProcessed += defaultPriTransactionsStarted[i]; + batchGRVProcessed += batchPriTransactionsStarted[i]; } } + stats->percentageOfDefaultGRVQueueProcessed = (double)defaultGRVProcessed / defaultQueueSize; + stats->percentageOfBatchGRVQueueProcessed = (double)batchGRVProcessed / batchQueueSize; } } @@ -2113,7 +2162,8 @@ ACTOR Future masterProxyServerCore( TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit); addActor.send(monitorRemoteCommitted(&commitData)); - 
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply)); + addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, + &detailedHealthMetricsReply, &commitData.stats)); addActor.send(readRequestServer(proxy, addActor, &commitData)); addActor.send(rejoinServer(proxy, &commitData)); addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply)); diff --git a/fdbserver/StorageMetrics.actor.h b/fdbserver/StorageMetrics.actor.h index f4c58f2da6..67ee26a19f 100644 --- a/fdbserver/StorageMetrics.actor.h +++ b/fdbserver/StorageMetrics.actor.h @@ -27,6 +27,11 @@ #include "fdbserver/Knobs.h" #include "flow/actorcompiler.h" // This must be the last #include. +const StringRef STORAGESERVER_HISTOGRAM_GROUP = LiteralStringRef("StorageServer"); +const StringRef FETCH_KEYS_LATENCY_HISTOGRAM = LiteralStringRef("FetchKeysLatency"); +const StringRef FETCH_KEYS_BYTES_HISTOGRAM = LiteralStringRef("FetchKeysSize"); +const StringRef FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM = LiteralStringRef("FetchKeysBandwidth"); + struct StorageMetricSample { IndexedSet sample; int64_t metricUnitsPerSample; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 6ea751eef5..1c4c88e873 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -41,6 +41,7 @@ #include "fdbserver/WaitFailure.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/FDBExecHelper.actor.h" +#include "flow/Histogram.h" #include "flow/actorcompiler.h" // This must be the last #include. using std::pair; @@ -332,6 +333,7 @@ struct TLogData : NonCopyable { FlowLock concurrentLogRouterReads; FlowLock persistentDataCommitLock; + // Beginning of fields used by snapshot based backup and restore bool ignorePopRequest; // ignore pop request from storage servers double ignorePopDeadline; // time until which the ignorePopRequest will be // honored @@ -343,19 +345,26 @@ struct TLogData : NonCopyable { std::map toBePopped; // map of Tag->Version for all the pops // that came when ignorePopRequest was set Reference> degraded; + // End of fields used by snapshot based backup and restore + std::vector tempTagMessages; - TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference> dbInfo, Reference> degraded, std::string folder) - : dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()), - persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), - dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0), - diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0), - peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES), - concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), - ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped() - { - cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true); - } + Reference commitLatencyDist; + + TLogData(UID dbgid, UID workerID, IKeyValueStore* persistentData, IDiskQueue* persistentQueue, + Reference> dbInfo, Reference> degraded, std::string folder) + : dbgid(dbgid), workerID(workerID), 
instanceID(deterministicRandom()->randomUniqueID().first()), + persistentData(persistentData), rawPersistentQueue(persistentQueue), + persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), + queueCommitEnd(0), diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), + targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0), + peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES), + concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false), + ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped(), + commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"), LiteralStringRef("commit"), + Histogram::Unit::microseconds)) { + cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true); + } }; struct LogData : NonCopyable, public ReferenceCounted { @@ -441,13 +450,19 @@ struct LogData : NonCopyable, public ReferenceCounted { bool stopped, initialized; DBRecoveryCount recoveryCount; + // If persistentDataVersion != persistentDurableDataVersion, + // then spilling is happening from persistentDurableDataVersion to persistentDataVersion. + // Data less than persistentDataDurableVersion is spilled on disk (or fully popped from the TLog); VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to persistentData - NotifiedVersion version, queueCommittedVersion; + NotifiedVersion version; + NotifiedVersion queueCommittedVersion; // The disk queue has committed up until the queueCommittedVersion version. Version queueCommittingVersion; - Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion; - Version queuePoppedVersion; + Version knownCommittedVersion; // The maximum version that a proxy has told us that is committed (all TLogs have + // ack'd a commit for this version). + Version durableKnownCommittedVersion, minKnownCommittedVersion; + Version queuePoppedVersion; // The disk queue has been popped up until the location which represents this version. Version minPoppedTagVersion; - Tag minPoppedTag; + Tag minPoppedTag; // The tag that makes tLog hold its data and cause tLog's disk queue increasing. Deque>>> messageBlocks; std::vector>> tag_data; //tag.locality | tag.id @@ -490,7 +505,8 @@ struct LogData : NonCopyable, public ReferenceCounted { Version unrecoveredBefore, recoveredAt; struct PeekTrackerData { - std::map>> sequence_version; + std::map>> + sequence_version; // second: Version is peeked begin version. 
bool is onlySpilled double lastUpdate; Tag tag; @@ -565,12 +581,15 @@ struct LogData : NonCopyable, public ReferenceCounted { queueCommittedVersion.initMetric(LiteralStringRef("TLog.QueueCommittedVersion"), cc.id); specialCounter(cc, "Version", [this](){ return this->version.get(); }); - specialCounter(cc, "QueueCommittedVersion", [this](){ return this->queueCommittedVersion.get(); }); + specialCounter(cc, "QueueCommittedVersion", [this]() { return this->queueCommittedVersion.get(); }); specialCounter(cc, "PersistentDataVersion", [this](){ return this->persistentDataVersion; }); specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; }); specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; }); specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; }); - specialCounter(cc, "MinPoppedTagVersion", [this](){ return this->minPoppedTagVersion; }); + specialCounter(cc, "MinPoppedTagVersion", [this]() { return this->minPoppedTagVersion; }); + // The locality and id of the tag that is responsible for making the TLog hold onto its oldest piece of data. + // If disk queues are growing and no one is sure why, then you shall look at this to find the tag responsible + // for why the TLog thinks it can't throw away data. specialCounter(cc, "MinPoppedTagLocality", [this](){ return this->minPoppedTag.locality; }); specialCounter(cc, "MinPoppedTagId", [this](){ return this->minPoppedTag.id; }); specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; }); @@ -792,6 +811,9 @@ ACTOR Future updatePoppedLocation( TLogData* self, Reference logD return Void(); } +// It runs against the oldest TLog instance, calculates the first location in the disk queue that contains un-popped +// data, and then issues a pop to the disk queue at that location so that anything earlier can be +// removed/forgotten/overwritten. In effect, it applies the effect of TLogPop RPCs to disk. ACTOR Future popDiskQueue( TLogData* self, Reference logData ) { if (!logData->initialized) return Void(); @@ -1084,9 +1106,11 @@ ACTOR Future tLogPop( TLogData* self, TLogPopRequest req, Reference updateStorage( TLogData* self ) { while(self->spillOrder.size() && !self->id_data.count(self->spillOrder.front())) { self->spillOrder.pop_front(); @@ -1868,7 +1892,11 @@ ACTOR Future tLogCommit( return Void(); } - if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!) + state double beforeCommitT = now(); + + // Not a duplicate (check relies on critical section between here self->version.set() below!) 
+ state bool isNotDuplicate = (logData->version.get() == req.prevVersion); + if (isNotDuplicate) { if(req.debugID.present()) g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before"); @@ -1906,6 +1934,10 @@ ACTOR Future tLogCommit( return Void(); } + if (isNotDuplicate) { + self->commitLatencyDist->sampleSeconds(now() - beforeCommitT); + } + if(req.debugID.present()) g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After"); @@ -2266,6 +2298,7 @@ void removeLog( TLogData* self, Reference logData ) { } } +// remote tLog pull data from log routers ACTOR Future pullAsyncData( TLogData* self, Reference logData, std::vector tags, Version beginVersion, Optional endVersion, bool poppedIsKnownCommitted ) { state Future dbInfoChange = Void(); state Reference r; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index ab79ab7170..0725d36965 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -166,7 +166,7 @@ OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData) struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { const UID dbgid; LogSystemType logSystemType; - std::vector> tLogs; // LogSets in different locations: primary, remote or satellite + std::vector> tLogs; // LogSets in different locations: primary, satellite, or remote int expectedLogSets; int logRouterTags; int txsTags; @@ -195,7 +195,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted, std::pair > outstandingPops; // For each currently running popFromLog actor, (log server #, tag)->popped version + // For each currently running popFromLog actor, outstandingPops is + // (logID, tag)->(max popped version, durableKnownCommittedVersion). + // Why do we need durableKnownCommittedVersion? knownCommittedVersion gives the lower bound of what data + // will need to be copied into the next generation to restore the replication factor. + // Guess: It probably serves as a minimum version of what data should be on a TLog in the next generation and + // sending a pop for anything less than durableKnownCommittedVersion for the TLog will be absurd. + std::map, std::pair> outstandingPops; + Optional>> addActor; ActorCollection popActors; std::vector oldLogData; // each element has the log info. in one old epoch. 
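A standalone sketch of the pop-coalescing idea the outstandingPops comment above describes: remember only the highest version requested per (logID, tag), and let the single in-flight popFromLog-style worker send the latest value, skipping work when nothing has advanced. All types and names below are simplified stand-ins, not the FDB types:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <utility>

using Version = int64_t;
using LogID = int;
using Tag = int;

struct PopCoalescer {
    // (logID, tag) -> (max popped version requested, durable known committed version)
    std::map<std::pair<LogID, Tag>, std::pair<Version, Version>> outstandingPops;

    // Called whenever a new pop request arrives; only the maximum is kept.
    void requestPop(LogID log, Tag tag, Version upTo, Version durableKnownCommitted) {
        auto& entry = outstandingPops[{log, tag}];
        entry.first = std::max(entry.first, upTo);
        entry.second = std::max(entry.second, durableKnownCommitted);
    }

    // Called by the per-(log, tag) worker; returns the version to send, or -1 if already covered.
    Version nextPopToSend(LogID log, Tag tag, Version lastSent) const {
        auto it = outstandingPops.find({log, tag});
        if (it == outstandingPops.end() || it->second.first <= lastSent) return -1;
        return it->second.first;
    }
};

int main() {
    PopCoalescer c;
    c.requestPop(/*log*/ 1, /*tag*/ 7, /*upTo*/ 100, /*durable*/ 90);
    c.requestPop(1, 7, 250, 200); // supersedes the earlier request
    std::cout << c.nextPopToSend(1, 7, /*lastSent*/ 100) << std::endl; // 250
    std::cout << c.nextPopToSend(1, 7, /*lastSent*/ 250) << std::endl; // -1, nothing new to send
    return 0;
}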
@@ -270,6 +277,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoutstandingPops[].first ACTOR static Future popFromLog(TagPartitionedLogSystem* self, Reference>> log, Tag tag, double time) { @@ -1141,6 +1153,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted to = self->outstandingPops[ std::make_pair(log->get().id(),tag) ]; if (to.first <= last) { diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 79e816fa12..f7b1157af7 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -366,6 +366,14 @@ void failAfter( Future trigger, Endpoint e ) { failAfter( trigger, g_simulator.getProcess( e ) ); } +ACTOR Future histogramReport() { + loop { + wait(delay(SERVER_KNOBS->HISTOGRAM_REPORT_INTERVAL)); + + GetHistogramRegistry().logReport(); + } +} + void testSerializationSpeed() { double tstart; double build = 0, serialize = 0, deserialize = 0, copy = 0, deallocate = 0; @@ -1728,6 +1736,8 @@ int main(int argc, char* argv[]) { if (role == Simulation) { TraceEvent("Simulation").detail("TestFile", opts.testFile); + auto histogramReportActor = histogramReport(); + clientKnobs->trace(); flowKnobs->trace(); serverKnobs->trace(); @@ -1885,6 +1895,7 @@ int main(int argc, char* argv[]) { actors.push_back(fdbd(opts.connectionFile, opts.localities, opts.processClass, dataFolder, dataFolder, opts.storageMemLimit, opts.metricsConnFile, opts.metricsPrefix, opts.rsssize, opts.whitelistBinPaths)); + actors.push_back(histogramReport()); // actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement f = stopAfter(waitForAll(actors)); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index ac6c0a9434..0f1fb7b6bc 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -19,11 +19,16 @@ */ #include +#include +#include +#include + #include "fdbrpc/fdbrpc.h" #include "fdbrpc/LoadBalance.h" -#include "flow/IndexedSet.h" -#include "flow/Hash3.h" #include "flow/ActorCollection.h" +#include "flow/Hash3.h" +#include "flow/Histogram.h" +#include "flow/IndexedSet.h" #include "flow/SystemMonitor.h" #include "flow/Util.h" #include "fdbclient/Atomic.h" @@ -52,11 +57,8 @@ #include "fdbrpc/Smoother.h" #include "fdbrpc/Stats.h" #include "flow/TDMetric.actor.h" -#include -#include "flow/actorcompiler.h" // This must be the last #include. -using std::pair; -using std::make_pair; +#include "flow/actorcompiler.h" // This must be the last #include. 
#ifndef __INTEL_COMPILER #pragma region Data Structures @@ -224,13 +226,13 @@ struct UpdateEagerReadInfo { void finishKeyBegin() { std::sort(keyBegin.begin(), keyBegin.end()); keyBegin.resize( std::unique(keyBegin.begin(), keyBegin.end()) - keyBegin.begin() ); - std::sort(keys.begin(), keys.end(), [](const pair& lhs, const pair& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } ); - keys.resize(std::unique(keys.begin(), keys.end(), [](const pair& lhs, const pair& rhs) { return lhs.first == rhs.first; } ) - keys.begin()); + std::sort(keys.begin(), keys.end(), [](const std::pair& lhs, const std::pair& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } ); + keys.resize(std::unique(keys.begin(), keys.end(), [](const std::pair& lhs, const std::pair& rhs) { return lhs.first == rhs.first; } ) - keys.begin()); //value gets populated in doEagerReads } Optional& getValue(KeyRef key) { - int i = std::lower_bound(keys.begin(), keys.end(), pair(key, 0), [](const pair& lhs, const pair& rhs) { return lhs.first < rhs.first; } ) - keys.begin(); + int i = std::lower_bound(keys.begin(), keys.end(),std::pair(key, 0), [](const std::pair& lhs, const std::pair& rhs) { return lhs.first < rhs.first; } ) - keys.begin(); ASSERT( i < keys.size() && keys[i].first == key ); return value[i]; } @@ -284,9 +286,63 @@ private: std::map> mutationLog; // versions (durableVersion, version] public: +public: + // Histograms + struct FetchKeysHistograms { + const Reference latency; + const Reference bytes; + const Reference bandwidth; + + FetchKeysHistograms() + : latency(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_LATENCY_HISTOGRAM, + Histogram::Unit::microseconds)), + bytes(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_HISTOGRAM, + Histogram::Unit::bytes)), + bandwidth(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM, + Histogram::Unit::bytes_per_second)) {} + } fetchKeysHistograms; + + class CurrentRunningFetchKeys { + std::unordered_map startTimeMap; + std::unordered_map keyRangeMap; + + static const StringRef emptyString; + static const KeyRangeRef emptyKeyRange; + public: + void recordStart(const UID id, const KeyRange keyRange) { + startTimeMap[id] = now(); + keyRangeMap[id] = keyRange; + } + + void recordFinish(const UID id) { + startTimeMap.erase(id); + keyRangeMap.erase(id); + } + + std::pair longestTime() const { + if (numRunning() == 0) { + return {-1, emptyKeyRange}; + } + + const double currentTime = now(); + double longest = 0; + UID UIDofLongest; + for (const auto kv: startTimeMap) { + const double currentRunningTime = currentTime - kv.second; + if (longest < currentRunningTime) { + longest = currentRunningTime; + UIDofLongest = kv.first; + } + } + return {longest, keyRangeMap.at(UIDofLongest)}; + } + + int numRunning() const { return startTimeMap.size(); } + } currentRunningFetchKeys; + Tag tag; - vector> history; - vector> allHistory; + vector> history; + vector> allHistory; Version poppedAllAfter; std::map freeable; // for each version, an Arena that must be held until that version is < oldestVersion Arena lastArena; @@ -333,8 +389,8 @@ public: poppedAllAfter = std::numeric_limits::max(); } - vector>* hist = &history; - vector> allHistoryCopy; + vector>* hist = &history; + vector> allHistoryCopy; if(popAllTags) { allHistoryCopy = allHistory; hist = &allHistoryCopy; @@ -601,22 +657,18 @@ public: } } counters; - 
@@ -601,22 +657,18 @@ public:
 		}
 	} counters;
 
-	StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
-		: instanceID(deterministicRandom()->randomUniqueID().first()),
-		  storage(this, storage), db(db), actors(false),
-		  lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
-		  rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
-		  durableInProgress(Void()),
-		  versionLag(0), primaryLocality(tagLocalityInvalid),
-		  updateEagerReads(0),
-		  shardChangeCounter(0),
-		  fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
-		  shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
-		  logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
-		  readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
-		  behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
-		  lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
-	{
+	StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db,
+	              StorageServerInterface const& ssi)
+	  : fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
+	    db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
+	    rebootAfterDurableVersion(std::numeric_limits<Version>::max()), durableInProgress(Void()), versionLag(0),
+	    primaryLocality(tagLocalityInvalid), updateEagerReads(0), shardChangeCounter(0),
+	    fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES), shuttingDown(false),
+	    debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0),
+	    counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
+	    readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), behind(false), versionBehind(false),
+	    byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()),
+	    poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0) {
 		version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
 		oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
 		durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
@@ -729,6 +781,9 @@ public:
 	}
 };
 
+const StringRef StorageServer::CurrentRunningFetchKeys::emptyString = LiteralStringRef("");
+const KeyRangeRef StorageServer::CurrentRunningFetchKeys::emptyKeyRange = KeyRangeRef(StorageServer::CurrentRunningFetchKeys::emptyString, StorageServer::CurrentRunningFetchKeys::emptyString);
+
 // If and only if key:=value is in (storage+versionedData),
 // NOT ACTUALLY: and key < allKeys.end,
 //  and H(key) < |key+value|/bytesPerSample,
 //  let sampledSize = max(|key+value|,bytesPerSample)
@@ -1791,7 +1846,7 @@ bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion )
 		setDataDurableVersion(data->thisServerID, data->durableVersion.get());
 	if (checkFatalError.isReady()) checkFatalError.get();
 
-	//TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
+	// TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
 	validate(data);
 
 	return nextDurableVersion == desiredDurableVersion;
@@ -2107,16 +2162,56 @@ ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
 	loop {
 		state double waitSeconds = BUGGIFY ? 5.0 : 600.0;
 		wait(delay(waitSeconds));
-		TraceEvent(waitSeconds > 300.0 ? SevWarnAlways : SevInfo, "FetchKeysTooLong").detail("Duration", now() - startTime).detail("Phase", shard->phase).detail("Begin", shard->keys.begin.printable()).detail("End", shard->keys.end.printable());
+
+		const auto traceEventLevel = waitSeconds > SERVER_KNOBS->FETCH_KEYS_TOO_LONG_TIME_CRITERIA ? SevWarnAlways : SevInfo;
+		TraceEvent(traceEventLevel, "FetchKeysTooLong")
+		    .detail("Duration", now() - startTime)
+		    .detail("Phase", shard->phase)
+		    .detail("Begin", shard->keys.begin.printable())
+		    .detail("End", shard->keys.end.printable());
 	}
 }
 
+class FetchKeysMetricReporter {
+	const UID uid;
+	const double startTime;
+	int fetchedBytes;
+	StorageServer::FetchKeysHistograms& histograms;
+	StorageServer::CurrentRunningFetchKeys& currentRunning;
+
+public:
+	FetchKeysMetricReporter(const UID& uid_, const double startTime_, const KeyRange& keyRange, StorageServer::FetchKeysHistograms& histograms_, StorageServer::CurrentRunningFetchKeys& currentRunning_)
+	  : uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_) {
+
+		currentRunning.recordStart(uid, keyRange);
+	}
+
+	void addFetchedBytes(const int bytes) { fetchedBytes += bytes; }
+
+	~FetchKeysMetricReporter() {
+		double latency = now() - startTime;
+
+		// If fetchKeys is *NOT* run, i.e. returning immediately, still report a record.
+		if (latency == 0) latency = 1e6;
+
+		const uint32_t bandwidth = fetchedBytes / latency;
+
+		histograms.latency->sampleSeconds(latency);
+		histograms.bytes->sample(fetchedBytes);
+		histograms.bandwidth->sample(bandwidth);
+
+		currentRunning.recordFinish(uid);
+	}
+};
+
 ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
+	state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
 	state TraceInterval interval("FetchKeys");
 	state KeyRange keys = shard->keys;
 	state Future<Void> warningLogger = logFetchKeysWarning(shard);
-	state double startt = now();
+	state const double startTime = now();
 	state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
+	state FetchKeysMetricReporter metricReporter(fetchKeysID, startTime, keys, data->fetchKeysHistograms, data->currentRunningFetchKeys);
 
 	// delay(0) to force a return to the run loop before the work of fetchKeys is started.
 	//  This allows adding->start() to be called inline with CSK.
@@ -2154,7 +2249,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 			state double executeStart = now();
 			++data->counters.fetchWaitingCount;
-			data->counters.fetchWaitingMS += 1000*(executeStart - startt);
+			data->counters.fetchWaitingMS += 1000 * (executeStart - startTime);
 
 			// Fetch keys gets called while the update actor is processing mutations. data->version will not be updated until all mutations for a version
 			// have been processed. We need to take the durableVersionLock to ensure data->version is greater than the version of the mutation which caused
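FetchKeysMetricReporter above is an RAII wrapper: construction marks the fetch as running, addFetchedBytes() accumulates the volume moved, and the destructor derives latency and bandwidth (fetched bytes divided by elapsed seconds) and pushes all three into the histograms before clearing the running entry. A hedged sketch of the lifecycle, assuming a StorageServer* named data and a KeyRange named keys are in scope as they are inside fetchKeys():

    {
        // Construction: recordStart() registers this fetch in currentRunningFetchKeys.
        FetchKeysMetricReporter reporter(deterministicRandom()->randomUniqueID(), now(), keys,
                                         data->fetchKeysHistograms, data->currentRunningFetchKeys);

        reporter.addFetchedBytes(1 << 20); // e.g. one 1 MiB block fetched
        // ... more blocks ...

    } // Destruction: samples latency, bytes and bytes/second, then recordFinish() removes the entry.

Making the reporter a state variable in fetchKeys() means the destructor runs on every exit path, including errors and actor cancellation, so aborted fetches are reported as well.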
@@ -2194,6 +2289,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 				debugKeyRange("fetchRange", fetchVersion, keys);
 				for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
 
+				metricReporter.addFetchedBytes(expectedSize);
 				data->counters.bytesFetched += expectedSize;
 				if( fetchBlockBytes > expectedSize ) {
 					holdingFKPL.release( fetchBlockBytes - expectedSize );
@@ -2261,8 +2357,9 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 				while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front();
 
 				//FIXME: remove when we no longer support upgrades from 5.X
-				if(debug_getRangeRetries >= 100) {
+				if (debug_getRangeRetries >= 100) {
 					data->cx->enableLocalityLoadBalance = false;
+					// TODO: Add SevWarnAlways to say it was disabled.
 				}
 
 				debug_getRangeRetries++;
@@ -2379,7 +2476,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
 		TraceEvent(SevError, "FetchKeysError", data->thisServerID)
 		    .error(e)
-		    .detail("Elapsed", now()-startt)
+		    .detail("Elapsed", now() - startTime)
 		    .detail("KeyBegin", keys.begin)
 		    .detail("KeyEnd",keys.end);
 		if (e.code() != error_code_actor_cancelled)
@@ -3226,7 +3323,9 @@ bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion
 void StorageServerDisk::makeVersionDurable( Version version ) {
 	storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())) );
 
-	//TraceEvent("MakeDurable", data->thisServerID).detail("FromVersion", prevStorageVersion).detail("ToVersion", version);
+	// TraceEvent("MakeDurable", data->thisServerID)
+	//     .detail("FromVersion", prevStorageVersion)
+	//     .detail("ToVersion", version);
 }
 
 void StorageServerDisk::changeLogProtocol(Version version, ProtocolVersion protocol) {
@@ -3736,6 +3835,35 @@ ACTOR Future<Void> serveWatchValueRequests( StorageServer* self, FutureStream<WatchValueRequest>
+ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
+	if (!SERVER_KNOBS->REPORT_DD_METRICS) {
+		return Void();
+	}
+
+	loop {
+		wait(delay(SERVER_KNOBS->DD_METRICS_REPORT_INTERVAL));
+
+		const auto numRunningFetchKeys = self->currentRunningFetchKeys.numRunning();
+		if (numRunningFetchKeys == 0) {
+			continue;
+		}
+
+		const auto longestRunningFetchKeys = self->currentRunningFetchKeys.longestTime();
+
+		auto level = SevInfo;
+		if (longestRunningFetchKeys.first >= SERVER_KNOBS->FETCH_KEYS_TOO_LONG_TIME_CRITERIA) {
+			level = SevWarnAlways;
+		}
+
+		TraceEvent(level, "FetchKeyCurrentStatus")
+		    .detail("Timestamp", now())
+		    .detail("LongestRunningTime", longestRunningFetchKeys.first)
+		    .detail("StartKey", longestRunningFetchKeys.second.begin.printable())
+		    .detail("EndKey", longestRunningFetchKeys.second.end.printable())
+		    .detail("NumRunning", numRunningFetchKeys);
+	}
+}
+
 ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi ) {
 	state Future<Void> doUpdate = Void();
@@ -3756,6 +3884,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
 	self->actors.add(serveGetKeyRequests(self, ssi.getKey.getFuture()));
 	self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
 	self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
+	self->actors.add(reportStorageServerState(self));
 
 	self->transactionTagCounter.startNewInterval(self->thisServerID);
 	self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
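reportStorageServerState() is gated on the REPORT_DD_METRICS knob and wakes up every DD_METRICS_REPORT_INTERVAL seconds; the only non-obvious logic is the severity escalation, which can be restated as a tiny helper (a paraphrase of the code above, not an addition to the patch):

    // Fetches that have been running longer than FETCH_KEYS_TOO_LONG_TIME_CRITERIA seconds
    // escalate the periodic FetchKeyCurrentStatus event from SevInfo to SevWarnAlways.
    Severity fetchKeysStatusSeverity(double longestRunningSeconds, double tooLongSeconds) {
        return longestRunningSeconds >= tooLongSeconds ? SevWarnAlways : SevInfo;
    }

Note that the same FETCH_KEYS_TOO_LONG_TIME_CRITERIA knob now also selects the severity of the per-fetch FetchKeysTooLong warning in logFetchKeysWarning().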
diff --git a/flow/Histogram.cpp b/flow/Histogram.cpp
index e5281093c9..4de14970ef 100644
--- a/flow/Histogram.cpp
+++ b/flow/Histogram.cpp
@@ -41,6 +41,8 @@ thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = nullptr;
 //     we have a simulated contex here; we'd just use the current context regardless.
 static HistogramRegistry* globalHistograms = nullptr;
 
+#pragma region HistogramRegistry
+
 HistogramRegistry& GetHistogramRegistry() {
 	ISimulator::ProcessInfo* info = g_simulator.getCurrentProcess();
 
@@ -89,6 +91,16 @@ void HistogramRegistry::logReport() {
 	}
 }
 
+#pragma endregion // HistogramRegistry
+
+#pragma region Histogram
+
+const std::unordered_map<Histogram::Unit, std::string> Histogram::UnitToStringMapper = {
+	{ Histogram::Unit::microseconds, "microseconds" },
+	{ Histogram::Unit::bytes, "bytes" },
+	{ Histogram::Unit::bytes_per_second, "bytes_per_second" }
+};
+
 void Histogram::writeToLog() {
 	bool active = false;
 	for (uint32_t i = 0; i < 32; i++) {
@@ -102,17 +114,19 @@ void Histogram::writeToLog() {
 	}
 
 	TraceEvent e(SevInfo, "Histogram");
-	e.detail("Group", group).detail("Op", op);
+	e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper.at(unit));
+
 	for (uint32_t i = 0; i < 32; i++) {
+		uint32_t value = ((uint32_t)1) << (i + 1);
+
 		if (buckets[i]) {
 			switch (unit) {
-			case Unit::microseconds: {
-				uint32_t usec = ((uint32_t)1) << (i + 1);
-				e.detail(format("LessThan%u.%03u", usec / 1000, usec % 1000), buckets[i]);
+			case Unit::microseconds:
+				e.detail(format("LessThan%u.%03u", value / 1000, value % 1000), buckets[i]);
 				break;
-			}
 			case Unit::bytes:
-				e.detail(format("LessThan%u", ((uint32_t)1) << (i + 1)), buckets[i]);
+			case Unit::bytes_per_second:
+				e.detail(format("LessThan%u", value), buckets[i]);
 				break;
 			default:
 				ASSERT(false);
@@ -121,6 +135,8 @@ void Histogram::writeToLog() {
 	}
 }
 
+#pragma endregion // Histogram
+
 TEST_CASE("/flow/histogram/smoke_test") {
 	{
@@ -168,4 +184,4 @@ TEST_CASE("/flow/histogram/smoke_test") {
 		GetHistogramRegistry().logReport();
 
 	return Void();
-}
\ No newline at end of file
+}
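Histogram buckets are powers of two: writeToLog() labels bucket i as "LessThan" 2^(i+1) in the histogram's unit, with microseconds pretty-printed as milliseconds. Assuming sample() places a value into the first bucket whose upper bound exceeds it (the bucketing itself is not shown in this patch), a byte-count histogram behaves roughly like this illustrative helper:

    // Illustrative mapping from a sample value to the bucket label writeToLog() would emit.
    // Not taken verbatim from Histogram::sample(); it only restates the power-of-two layout.
    uint32_t bucketIndexFor(uint32_t value) {
        for (uint32_t i = 0; i < 32; i++) {
            if (value < (uint32_t(1) << (i + 1))) return i; // bucket i is reported as LessThan 2^(i+1)
        }
        return 31;
    }
    // e.g. a 1500-byte fetch block increments the bucket reported as "LessThan2048".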
diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp
index 9a6d47b837..3b82a315e2 100644
--- a/flow/Knobs.cpp
+++ b/flow/Knobs.cpp
@@ -173,6 +173,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
 	init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
 	init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
 	init( MAX_TRACE_SUPPRESSIONS, 1e4 );
+	init( TRACE_DATETIME_ENABLED, true ); // trace time in human readable format (always real time)
 	init( TRACE_SYNC_ENABLED, 0 );
 	init( TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, 500 );
 	init( TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY, 1800.0 ); // 30 mins
diff --git a/flow/Knobs.h b/flow/Knobs.h
index 8dd9eb2d10..b6078cdd5e 100644
--- a/flow/Knobs.h
+++ b/flow/Knobs.h
@@ -192,6 +192,7 @@ public:
 	double TRACE_RETRY_OPEN_INTERVAL;
 	int MIN_TRACE_SEVERITY;
 	int MAX_TRACE_SUPPRESSIONS;
+	bool TRACE_DATETIME_ENABLED;
 	int TRACE_SYNC_ENABLED;
 	int TRACE_EVENT_METRIC_UNITS_PER_SAMPLE;
 	int TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY;
diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp
index bc03585dc3..e8f99d9a3a 100644
--- a/flow/SystemMonitor.cpp
+++ b/flow/SystemMonitor.cpp
@@ -60,7 +60,6 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
 	netData.init();
 	if (!g_network->isSimulated() && currentStats.initialized) {
 		{
-			GetHistogramRegistry().logReport();
 			TraceEvent(eventName.c_str())
 				.detail("Elapsed", currentStats.elapsed)
 				.detail("CPUSeconds", currentStats.processCPUSeconds)
diff --git a/flow/Trace.cpp b/flow/Trace.cpp
index ffd826efb9..f2542196c4 100644
--- a/flow/Trace.cpp
+++ b/flow/Trace.cpp
@@ -30,7 +30,7 @@
 #include
 #include
 #include
-
+#include
 #include "flow/IThreadPool.h"
 #include "flow/ThreadHelper.actor.h"
 #include "flow/FastRef.h"
@@ -474,6 +474,7 @@ public:
 		if (roll) {
 			auto o = new WriterThread::Roll;
+			double time = 0;
 			writer->post(o);
 
 			std::vector<TraceEventFields> events = latestEventCache.getAllUnsafe();
@@ -482,9 +483,15 @@ public:
 				TraceEventFields rolledFields;
 				for(auto itr = events[idx].begin(); itr != events[idx].end(); ++itr) {
 					if(itr->first == "Time") {
-						rolledFields.addField("Time", format("%.6f", TraceEvent::getCurrentTime()));
+						time = TraceEvent::getCurrentTime();
+						rolledFields.addField("Time", format("%.6f", time));
 						rolledFields.addField("OriginalTime", itr->second);
 					}
+					else if (itr->first == "DateTime") {
+						UNSTOPPABLE_ASSERT(time > 0); // "Time" field should always come first
+						rolledFields.addField("DateTime", TraceEvent::printRealTime(time));
+						rolledFields.addField("OriginalDateTime", itr->second);
+					}
 					else if(itr->first == "TrackLatestType") {
 						rolledFields.addField("TrackLatestType", "Rolled");
 					}
@@ -912,6 +919,9 @@ bool TraceEvent::init() {
 	detail("Severity", int(severity));
 	detail("Time", "0.000000");
 	timeIndex = fields.size() - 1;
+	if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
+		detail("DateTime", "");
+	}
 	detail("Type", type);
 
 	if(g_network && g_network->isSimulated()) {
@@ -1112,7 +1122,7 @@ TraceEvent& TraceEvent::GetLastError() {
 
 // We're cheating in counting, as in practice, we only use {10,20,30,40}.
 static_assert(SevMaxUsed / 10 + 1 == 5, "Please bump eventCounts[5] to SevMaxUsed/10+1");
-unsigned long TraceEvent::eventCounts[5] = {0,0,0,0,0};
+unsigned long TraceEvent::eventCounts[5] = { 0, 0, 0, 0, 0 };
 
 unsigned long TraceEvent::CountEventsLoggedAt(Severity sev) {
 	return TraceEvent::eventCounts[sev/10];
@@ -1130,7 +1140,11 @@ void TraceEvent::log() {
 	++g_allocation_tracing_disabled;
 	try {
 		if (enabled) {
-			fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime());
+			double time = TraceEvent::getCurrentTime();
+			fields.mutate(timeIndex).second = format("%.6f", time);
+			if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
+				fields.mutate(timeIndex + 1).second = TraceEvent::printRealTime(time);
+			}
 
 			if (this->severity == SevError) {
 				severity = SevInfo;
@@ -1201,6 +1215,31 @@ double TraceEvent::getCurrentTime() {
 	}
 }
 
+// converts the given flow time into a string
+// with format: %Y-%m-%dT%H:%M:%S
+// This only has second-resolution for the simple reason
+// that std::put_time does not support higher resolution.
+// This is fine since we always log the flow time as well.
+std::string TraceEvent::printRealTime(double time) {
+	using Clock = std::chrono::system_clock;
+	time_t ts = Clock::to_time_t(Clock::time_point(
+	    std::chrono::duration_cast<Clock::duration>(std::chrono::duration<double, std::ratio<1>>(time))));
+	if (g_network && g_network->isSimulated()) {
+		// The clock is simulated, so return the real time
+		ts = Clock::to_time_t(Clock::now());
+	}
+	std::stringstream ss;
+#ifdef _WIN32
+	// MSVC gmtime is threadsafe
+	ss << std::put_time(::gmtime(&ts), "%Y-%m-%dT%H:%M:%SZ");
+#else
+	// use threadsafe gmt
+	struct tm result;
+	ss << std::put_time(::gmtime_r(&ts, &result), "%Y-%m-%dT%H:%M:%SZ");
+#endif
+	return ss.str();
+}
+
 TraceInterval& TraceInterval::begin() {
 	pairID = nondeterministicRandom()->randomUniqueID();
 	count = 0;
@@ -1278,6 +1317,9 @@ void TraceBatch::dump() {
 
 TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, const char *location) {
 	fields.addField("Severity", format("%d", (int)SevInfo));
 	fields.addField("Time", format("%.6f", time));
+	if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
+		fields.addField("DateTime", TraceEvent::printRealTime(time));
+	}
 	fields.addField("Type", name);
 	fields.addField("ID", format("%016" PRIx64, id));
 	fields.addField("Location", location);
@@ -1286,6 +1328,9 @@ TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, con
 TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, uint64_t to) {
 	fields.addField("Severity", format("%d", (int)SevInfo));
 	fields.addField("Time", format("%.6f", time));
+	if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
+		fields.addField("DateTime", TraceEvent::printRealTime(time));
+	}
 	fields.addField("Type", name);
 	fields.addField("ID", format("%016" PRIx64, id));
 	fields.addField("To", format("%016" PRIx64, to));
@@ -1294,6 +1339,9 @@ TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, u
 TraceBatch::BuggifyInfo::BuggifyInfo(double time, int activated, int line, std::string file) {
 	fields.addField("Severity", format("%d", (int)SevInfo));
 	fields.addField("Time", format("%.6f", time));
+	if (FLOW_KNOBS->TRACE_DATETIME_ENABLED) {
+		fields.addField("DateTime", TraceEvent::printRealTime(time));
+	}
 	fields.addField("Type", "BuggifySection");
 	fields.addField("Activated", format("%d", activated));
 	fields.addField("File", std::move(file));
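printRealTime() renders the flow time as a UTC timestamp with one-second resolution; the DateTime field enabled by TRACE_DATETIME_ENABLED is therefore a convenience, and the fractional Time field remains the authoritative clock. A quick worked example for a real-time (non-simulated) process:

    #include "flow/Trace.h"

    // 1,600,000,000 seconds after the Unix epoch is 2020-09-13 12:26:40 UTC, so:
    std::string s = TraceEvent::printRealTime(1600000000.123); // "2020-09-13T12:26:40Z"

Under simulation the simulated time is deliberately ignored and the current wall-clock time is printed instead, per the isSimulated() branch above.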
diff --git a/flow/Trace.h b/flow/Trace.h
index e5029afcaa..5a9d8e4f35 100644
--- a/flow/Trace.h
+++ b/flow/Trace.h
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -394,6 +395,7 @@ struct TraceEvent {
 
 	static bool isNetworkThread();
 	static double getCurrentTime();
+	static std::string printRealTime(double time);
 
 	//Must be called directly after constructing the trace event
 	TraceEvent& error(const class Error& e, bool includeCancelled=false) {
diff --git a/flow/error_definitions.h b/flow/error_definitions.h
index cd2d81ba5a..2041ce9578 100755
--- a/flow/error_definitions.h
+++ b/flow/error_definitions.h
@@ -92,6 +92,7 @@ ERROR( master_resolver_failed, 1210, "Master terminating because a Resolver fail
 ERROR( server_overloaded, 1211, "Server is under too much load and cannot respond" )
 ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup worker failed")
 ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
+ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
 
 // 15xx Platform errors
 ERROR( platform_error, 1500, "Platform error" )
diff --git a/versions.target b/versions.target
new file mode 100644
index 0000000000..8b1467c77e
--- /dev/null
+++ b/versions.target
@@ -0,0 +1,7 @@
+<?xml version="1.0"?>
+<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Version>6.3.10</Version>
+    <PackageName>6.3</PackageName>
+  </PropertyGroup>
+</Project>