diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index 08eb9ce93f..eb9a281bee 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -135,7 +135,7 @@ ACTOR Future updateServerMetrics( Reference server ) {
 	return Void();
 }
 
-// Machine team information
+// TeamCollection's machine team information
 class TCMachineTeamInfo : public ReferenceCounted {
 public:
 	vector> machines;
@@ -170,6 +170,7 @@ public:
 	bool operator==(TCMachineTeamInfo& rhs) const { return this->machineIDs == rhs.machineIDs; }
 };
 
+// TeamCollection's server team info.
 class TCTeamInfo : public ReferenceCounted, public IDataDistributionTeam {
 private:
 	vector< Reference > servers;
@@ -3459,6 +3460,7 @@ ACTOR Future initializeStorage( DDTeamCollection* self, RecruitStorageRepl
 	return Void();
 }
 
+// Recruit a worker as a storage server
 ACTOR Future storageRecruiter( DDTeamCollection* self, Reference> db ) {
 	state Future fCandidateWorker;
 	state RecruitStorageRequest lastRequest;
@@ -3601,6 +3603,8 @@ ACTOR Future dataDistributionTeamCollection(
 		wait( self->readyToStart || error );
 		TraceEvent("DDTeamCollectionReadyToStart", self->distributorId).detail("Primary", self->primary);
 
+		// removeBadTeams() does not always run. We may need to restart it later, so we keep the badTeamRemover
+		// variable to check whether the previous run has finished before starting a new one.
 		if(self->badTeamRemover.isReady()) {
 			self->badTeamRemover = removeBadTeams(self);
 			self->addActor.send(self->badTeamRemover);
@@ -3621,6 +3625,8 @@ ACTOR Future dataDistributionTeamCollection(
 			self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
 		}
 
+		// The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
+		// they are always running.
 		self->addActor.send(storageRecruiter( self, db ));
 		self->addActor.send(monitorStorageServerRecruitment( self ));
 		self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index fe964f6894..ce320eb663 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -286,6 +286,7 @@ int getWorkFactor( RelocateData const& relocation ) {
 	return WORK_FULL_UTILIZATION / relocation.src.size() / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
 }
 
+// Data movement's resource control: do not overload the source servers used for the RelocateData
 // return true if servers are not too busy to launch the relocation
 bool canLaunch( RelocateData & relocation, int teamSize, std::map & busymap,
 	std::vector cancellableRelocations ) {
@@ -354,10 +355,10 @@ struct DDQueueData {
 	std::set> fetchingSourcesQueue;
 	std::set> fetchKeysComplete;
 	KeyRangeActorMap getSourceActors;
-	std::map>> queue;
+	std::map>> queue; // Key: server UID; value: the set of RelocateData queued for that server
 
 	KeyRangeMap< RelocateData > inFlight;
-	KeyRangeActorMap inFlightActors;
+	KeyRangeActorMap inFlightActors; // Key: RelocateData; value: the actor that moves the data
 
 	Promise error;
 	PromiseStream dataTransferComplete;
@@ -552,7 +553,10 @@ struct DDQueueData {
 					ASSERT(servers.size() > 0);
 				}
 
-				//If the size of keyServerEntries is large, then just assume we are using all storage servers
+				// If the size of keyServerEntries is large, then just assume we are using all storage servers
+				// Why can the size be large?
+				// When a shard is in flight and DD crashes, some destination servers may have already got the data.
+				// The new DD will treat the destination servers as source servers, so the size can be large.
 				else {
 					Standalone serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
 					ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
@@ -787,6 +791,7 @@ struct DDQueueData {
 			}
 		}
 
+		// Data movement avoids overloading the source servers when moving data.
 		// SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the queue
 		// FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY
 		if( !canLaunch( rd, teamSize, busymap, cancellableRelocations ) ) {
@@ -794,6 +799,9 @@ struct DDQueueData {
 			continue;
 		}
 
+		// From now on, the source servers for the RelocateData rd have enough resources to move the data away,
+		// because they do not have too much in-flight data movement.
+
 		//logRelocation( rd, "LaunchingRelocation" );
 
 		//TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
@@ -849,7 +857,7 @@ struct DDQueueData {
 extern bool noUnseed;
 
 // This actor relocates the specified keys to a good place.
-// These live in the inFlightActor key range map.
+// The inFlightActor key range map stores the relocation actor for each in-flight RelocateData.
 ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd )
 {
 	state Promise errorOut( self->error );
@@ -996,6 +1004,7 @@ ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd
 				state Error error = success();
 				state Promise dataMovementComplete;
+				// Move keys from source to destination by changing the serverKeyList and keyServerList system keys
 				state Future doMoveKeys = moveKeys(self->cx, rd.keys, destIds, healthyIds, self->lock, dataMovementComplete, &self->startMoveKeysParallelismLock, &self->finishMoveKeysParallelismLock, self->teamCollections.size() > 1, relocateShardInterval.pairID );
 				state Future pollHealth = signalledTransferComplete ? Never() : delay( SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch );
 				try {
@@ -1086,6 +1095,7 @@ ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd
 	}
 }
+// Move a random shard from sourceTeam to destTeam if sourceTeam has significantly more data than destTeam
 ACTOR Future rebalanceTeams( DDQueueData* self, int priority, Reference sourceTeam, Reference destTeam, bool primary ) {
 	if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
 		return false;
 	}
@@ -1233,6 +1243,7 @@ ACTOR Future dataDistributionQueue(
 			// For the given servers that caused us to go around the loop, find the next item(s) that can be launched.
 			if( launchData.startTime != -1 ) {
+				// Launch the dataDistributionRelocator actor to relocate the launchData
 				self.launchQueuedWork( launchData );
 				launchData = RelocateData();
 			}
@@ -1256,6 +1267,7 @@ ACTOR Future dataDistributionQueue(
 					launchQueuedWorkTimeout = Never();
 				}
 				when ( RelocateData results = waitNext( self.fetchSourceServersComplete.getFuture() ) ) {
+					// This when clause is triggered by queueRelocation(), which is triggered by sending a request to self.input
 					self.completeSourceFetch( results );
 					launchData = results;
 				}
diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp
index ca4a849a33..66fdf3d0d9 100644
--- a/fdbserver/DataDistributionTracker.actor.cpp
+++ b/fdbserver/DataDistributionTracker.actor.cpp
@@ -26,6 +26,7 @@
 #include "flow/ActorCollection.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
+// The bandwidth usage status of a shard. The higher the value, the busier the shard is.
 enum BandwidthStatus {
 	BandwidthStatusLow,
 	BandwidthStatusNormal,
@@ -427,7 +428,7 @@ Future shardMerger(
 		if( endingStats.bytes >= shardBounds.min.bytes ||
 			getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
 			shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) {
-			// The merged range is larger than the min bounds se we cannot continue merging in this direction.
+			// The merged range is larger than the min bounds so we cannot continue merging in this direction.
 			// This means that:
 			// 1. If we were going forwards (the starting direction), we roll back the last speculative merge.
 			//    In this direction we do not want to go above this boundary since we will merge at least one in
@@ -752,6 +753,7 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
 	check();
 }
 
+// Move keys to destinationTeams by updating shard_teams
 void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector destinationTeams ) {
 	/*TraceEvent("ShardsAffectedByTeamFailureMove")
 		.detail("KeyBegin", keys.begin)
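
The following is an illustrative sketch, separate from the patch above, of the "resource control" idea the new canLaunch() comment describes: each source server has a work budget, and a relocation is launched only if every source server can absorb the relocation's work factor. It is not the actual FoundationDB implementation; the names, constants, and the simplified busymap are assumptions, and it ignores cancellable relocations.

#include <map>
#include <string>
#include <vector>

constexpr int WORK_FULL_UTILIZATION = 10000;                // assumed per-server work budget
constexpr int RELOCATION_PARALLELISM_PER_SOURCE_SERVER = 5; // assumed knob value

struct Relocation {
	std::vector<std::string> src; // IDs of the source servers holding the shard
};

// Work that each source server contributes to one relocation, mirroring
// WORK_FULL_UTILIZATION / src.size() / RELOCATION_PARALLELISM_PER_SOURCE_SERVER.
int getWorkFactor(const Relocation& r) {
	return WORK_FULL_UTILIZATION / static_cast<int>(r.src.size()) / RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
}

// Return true only if no source server would be pushed past its work budget.
bool canLaunch(const Relocation& r, const std::map<std::string, int>& busymap) {
	if (r.src.empty()) return false;
	const int workFactor = getWorkFactor(r);
	for (const auto& server : r.src) {
		auto it = busymap.find(server);
		const int current = (it == busymap.end()) ? 0 : it->second;
		if (current + workFactor > WORK_FULL_UTILIZATION)
			return false; // this source server is already too busy
	}
	return true;
}

Once a relocation passes this check, its work factor would be added to each source server's busymap entry and removed again when the data movement completes, which is the state the "From now on, the source servers ... have enough resources" comment refers to.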
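
Likewise, a minimal sketch of the classification the new BandwidthStatus comment refers to: a shard's measured bandwidth is bucketed into low/normal/high. This is not the real tracker code; the threshold values are assumptions rather than the actual SERVER_KNOBS settings.

enum class BandwidthStatus { Low, Normal, High };

BandwidthStatus getBandwidthStatus(double shardBytesPerKSecond) {
	constexpr double highThreshold = 1e8; // assumed threshold for a "busy" shard
	constexpr double lowThreshold = 1e6;  // assumed threshold; the shardMerger hunk above only keeps merging while bandwidth is low
	if (shardBytesPerKSecond > highThreshold) return BandwidthStatus::High;
	if (shardBytesPerKSecond < lowThreshold) return BandwidthStatus::Low;
	return BandwidthStatus::Normal;
}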