From b7478f5dd3f40a39cf03c666c038c793f03535d7 Mon Sep 17 00:00:00 2001
From: Meng Xu
Date: Fri, 19 Jul 2019 16:22:15 -0700
Subject: [PATCH] DD: Add comments to help understand code

Add comments to explain the functionality of some code.
---
 fdbserver/DataDistribution.actor.cpp        |  8 +++++++-
 fdbserver/DataDistributionQueue.actor.cpp   | 20 ++++++++++++++++----
 fdbserver/DataDistributionTracker.actor.cpp |  4 +++-
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index ebb599781d..f2a6a682a8 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -132,7 +132,7 @@ ACTOR Future updateServerMetrics( Reference server ) {
     return Void();
 }
 
-// Machine team information
+// TeamCollection's machine team information
 class TCMachineTeamInfo : public ReferenceCounted {
 public:
     vector> machines;
@@ -167,6 +167,7 @@ public:
     bool operator==(TCMachineTeamInfo& rhs) const { return this->machineIDs == rhs.machineIDs; }
 };
 
+// TeamCollection's server team information
 class TCTeamInfo : public ReferenceCounted, public IDataDistributionTeam {
 private:
     vector< Reference > servers;
@@ -3456,6 +3457,7 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl
     return Void();
 }
 
+// Recruit a worker as a storage server
 ACTOR Future storageRecruiter( DDTeamCollection* self, Reference> db ) {
     state Future fCandidateWorker;
     state RecruitStorageRequest lastRequest;
@@ -3598,6 +3600,8 @@ ACTOR Future dataDistributionTeamCollection(
         wait( self->readyToStart || error );
         TraceEvent("DDTeamCollectionReadyToStart", self->distributorId).detail("Primary", self->primary);
 
+        // removeBadTeams() does not always run, so it may need to be restarted here.
+        // We keep it in the badTeamRemover variable so we can check whether the previous run has finished.
         if(self->badTeamRemover.isReady()) {
             self->badTeamRemover = removeBadTeams(self);
             self->addActor.send(self->badTeamRemover);
@@ -3618,6 +3622,8 @@ ACTOR Future dataDistributionTeamCollection(
             self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
         }
 
+        // The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
+        // they are always running.
         self->addActor.send(storageRecruiter( self, db ));
         self->addActor.send(monitorStorageServerRecruitment( self ));
         self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index d11fc63146..47420fcdbb 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -286,6 +286,7 @@ int getWorkFactor( RelocateData const& relocation ) {
     return WORK_FULL_UTILIZATION / relocation.src.size() / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
 }
 
+// Data movement's resource control: do not overload the source servers used by the RelocateData
 // return true if servers are not too busy to launch the relocation
 bool canLaunch( RelocateData & relocation, int teamSize, std::map & busymap,
     std::vector cancellableRelocations ) {
@@ -354,10 +355,10 @@ struct DDQueueData {
     std::set> fetchingSourcesQueue;
     std::set> fetchKeysComplete;
     KeyRangeActorMap getSourceActors;
-    std::map>> queue;
+    std::map>> queue; // Key: server UID; Value: the set of RelocateData queued on that server
 
     KeyRangeMap< RelocateData > inFlight;
-    KeyRangeActorMap inFlightActors;
+    KeyRangeActorMap inFlightActors; // Key: RelocateData; Value: the actor that moves the data
 
     Promise error;
     PromiseStream dataTransferComplete;
@@ -547,7 +548,10 @@ struct DDQueueData {
             ASSERT(servers.size() > 0);
         }
-        //If the size of keyServerEntries is large, then just assume we are using all storage servers
+        // If the size of keyServerEntries is large, then just assume we are using all storage servers.
+        // Why can the size be large?
+        // When a shard is in flight and DD crashes, some destination servers may already have received the data.
+        // The new DD will treat those destination servers as source servers, so the size can be large.
         else {
             Standalone serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
             ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
 
@@ -783,6 +787,7 @@ struct DDQueueData {
                 }
             }
 
+            // Data movement avoids overloading the source servers when moving data.
             // SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the queue
             // FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY
             if( !canLaunch( rd, teamSize, busymap, cancellableRelocations ) ) {
@@ -790,6 +795,9 @@ struct DDQueueData {
                 continue;
             }
 
+            // From now on, the source servers for the RelocateData rd have enough resources to move the data away,
+            // because they do not have too much in-flight data movement.
+
             //logRelocation( rd, "LaunchingRelocation" );
             //TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
 
@@ -845,7 +853,7 @@ struct DDQueueData {
 
 extern bool noUnseed;
 
 // This actor relocates the specified keys to a good place.
-// These live in the inFlightActor key range map.
+// The inFlightActor key range map stores the actor for each in-flight RelocateData.
 ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd ) {
     state Promise errorOut( self->error );
@@ -992,6 +1000,7 @@ ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd
 
                 state Error error = success();
                 state Promise dataMovementComplete;
+                // Move keys from source to destination by changing the serverKeyList and keyServerList system keys
                 state Future doMoveKeys = moveKeys(self->cx, rd.keys, destIds, healthyIds, self->lock, dataMovementComplete, &self->startMoveKeysParallelismLock, &self->finishMoveKeysParallelismLock, self->teamCollections.size() > 1, relocateShardInterval.pairID );
                 state Future pollHealth = signalledTransferComplete ? Never() : delay( SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch );
                 try {
@@ -1082,6 +1091,7 @@
     }
 }
 
+// Move a random shard from sourceTeam to destTeam if sourceTeam has much more data than destTeam
 ACTOR Future rebalanceTeams( DDQueueData* self, int priority, Reference sourceTeam, Reference destTeam, bool primary ) {
     if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
         return false;
     }
@@ -1229,6 +1239,7 @@ ACTOR Future dataDistributionQueue(
 
             // For the given servers that caused us to go around the loop, find the next item(s) that can be launched.
             if( launchData.startTime != -1 ) {
+                // Launch the dataDistributionRelocator actor to relocate the launchData
                 self.launchQueuedWork( launchData );
                 launchData = RelocateData();
             }
@@ -1252,6 +1263,7 @@ ACTOR Future dataDistributionQueue(
                     launchQueuedWorkTimeout = Never();
                 }
                 when ( RelocateData results = waitNext( self.fetchSourceServersComplete.getFuture() ) ) {
+                    // This when clause is triggered by queueRelocation(), which is triggered by sending requests to self.input
                     self.completeSourceFetch( results );
                     launchData = results;
                 }
diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp
index ca4a849a33..66fdf3d0d9 100644
--- a/fdbserver/DataDistributionTracker.actor.cpp
+++ b/fdbserver/DataDistributionTracker.actor.cpp
@@ -26,6 +26,7 @@
 #include "flow/ActorCollection.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
+// The bandwidth usage of a shard. The higher the value, the busier the shard is.
 enum BandwidthStatus {
     BandwidthStatusLow,
     BandwidthStatusNormal,
@@ -427,7 +428,7 @@ Future shardMerger(
         if( endingStats.bytes >= shardBounds.min.bytes ||
             getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
             shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) {
-            // The merged range is larger than the min bounds se we cannot continue merging in this direction.
+            // The merged range is larger than the min bounds so we cannot continue merging in this direction.
             // This means that:
             // 1. If we were going forwards (the starting direction), we roll back the last speculative merge.
             //    In this direction we do not want to go above this boundary since we will merge at least one in
@@ -752,6 +753,7 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
     check();
 }
 
+// Move keys to destinationTeams by updating shard_teams
 void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector destinationTeams ) {
     /*TraceEvent("ShardsAffectedByTeamFailureMove")
         .detail("KeyBegin", keys.begin)
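
The resource-control idea described by the new canLaunch() comment above can be summarized with a
small, self-contained C++ sketch. This is an illustration only, not the FoundationDB implementation:
the names and values below (UID as a plain string, the Busyness struct, the WORK_FULL_UTILIZATION
budget, RELOCATION_PARALLELISM_PER_SOURCE_SERVER) are simplified stand-ins.

#include <map>
#include <string>
#include <vector>

using UID = std::string;                                     // stand-in for FDB's UID type
constexpr int WORK_FULL_UTILIZATION = 10000;                 // assumed per-server work budget
constexpr int RELOCATION_PARALLELISM_PER_SOURCE_SERVER = 4;  // assumed knob value

// Per-server busyness: the total work factor of relocations currently reading from this server.
struct Busyness {
    int work = 0;
    bool canLaunch(int workFactor) const { return work + workFactor <= WORK_FULL_UTILIZATION; }
    void addWork(int workFactor) { work += workFactor; }
};

// Each relocation takes an equal share of every source server's budget.
int getWorkFactor(int numSources) {
    return WORK_FULL_UTILIZATION / numSources / RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
}

// A relocation may launch only if every one of its source servers still has spare budget; if so,
// the work is reserved on each source server so no single server is overloaded by concurrent moves.
bool tryLaunch(const std::vector<UID>& sources, std::map<UID, Busyness>& busymap) {
    if (sources.empty()) return false;
    int workFactor = getWorkFactor(static_cast<int>(sources.size()));
    for (const UID& src : sources)
        if (!busymap[src].canLaunch(workFactor)) return false;
    for (const UID& src : sources)
        busymap[src].addWork(workFactor);
    return true;
}

Dividing the budget by the number of source servers mirrors getWorkFactor() in the patch above: a
relocation with more source servers places proportionally less load on each of them.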