Merge pull request #1874 from xumengpanda/mengxu/DD-code-read

DataDistribution:Add comments to help understand the code
This commit is contained in:
Evan Tschannen 2019-07-26 13:30:44 -07:00 committed by GitHub
commit 04dd293af0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 6 deletions

View File

@ -135,7 +135,7 @@ ACTOR Future<Void> updateServerMetrics( Reference<TCServerInfo> server ) {
return Void();
}
// Machine team information
// TeamCollection's machine team information
class TCMachineTeamInfo : public ReferenceCounted<TCMachineTeamInfo> {
public:
vector<Reference<TCMachineInfo>> machines;
@ -170,6 +170,7 @@ public:
bool operator==(TCMachineTeamInfo& rhs) const { return this->machineIDs == rhs.machineIDs; }
};
// TeamCollection's server team info.
class TCTeamInfo : public ReferenceCounted<TCTeamInfo>, public IDataDistributionTeam {
private:
vector< Reference<TCServerInfo> > servers;
@ -3459,6 +3460,7 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection* self, RecruitStorageRepl
return Void();
}
// Recruit a worker as a storage server
ACTOR Future<Void> storageRecruiter( DDTeamCollection* self, Reference<AsyncVar<struct ServerDBInfo>> db ) {
state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest;
@ -3601,6 +3603,8 @@ ACTOR Future<Void> dataDistributionTeamCollection(
wait( self->readyToStart || error );
TraceEvent("DDTeamCollectionReadyToStart", self->distributorId).detail("Primary", self->primary);
// removeBadTeams() does not always run. We may need to restart the actor when needed.
// So we need the badTeamRemover variable to check if the actor is ready.
if(self->badTeamRemover.isReady()) {
self->badTeamRemover = removeBadTeams(self);
self->addActor.send(self->badTeamRemover);
@ -3621,6 +3625,8 @@ ACTOR Future<Void> dataDistributionTeamCollection(
self->addActor.send(updateReplicasKey(self, self->includedDCs[0]));
}
// The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
// they are always running.
self->addActor.send(storageRecruiter( self, db ));
self->addActor.send(monitorStorageServerRecruitment( self ));
self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));

View File

@ -286,6 +286,7 @@ int getWorkFactor( RelocateData const& relocation ) {
return WORK_FULL_UTILIZATION / relocation.src.size() / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
}
// Data movement's resource control: Do not overload source servers used for the RelocateData
// return true if servers are not too busy to launch the relocation
bool canLaunch( RelocateData & relocation, int teamSize, std::map<UID, Busyness> & busymap,
std::vector<RelocateData> cancellableRelocations ) {
@ -354,10 +355,10 @@ struct DDQueueData {
std::set<RelocateData, std::greater<RelocateData>> fetchingSourcesQueue;
std::set<RelocateData, std::greater<RelocateData>> fetchKeysComplete;
KeyRangeActorMap getSourceActors;
std::map<UID, std::set<RelocateData, std::greater<RelocateData>>> queue;
std::map<UID, std::set<RelocateData, std::greater<RelocateData>>> queue; //Key UID is serverID, value is the serverID's set of RelocateData to relocate
KeyRangeMap< RelocateData > inFlight;
KeyRangeActorMap inFlightActors;
KeyRangeActorMap inFlightActors; //Key: RelocatData, Value: Actor to move the data
Promise<Void> error;
PromiseStream<RelocateData> dataTransferComplete;
@ -552,7 +553,10 @@ struct DDQueueData {
ASSERT(servers.size() > 0);
}
//If the size of keyServerEntries is large, then just assume we are using all storage servers
// If the size of keyServerEntries is large, then just assume we are using all storage servers
// Why the size can be large?
// When a shard is inflight and DD crashes, some destination servers may have already got the data.
// The new DD will treat the destination servers as source servers. So the size can be large.
else {
Standalone<RangeResultRef> serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
@ -787,6 +791,7 @@ struct DDQueueData {
}
}
// Data movement avoids overloading source servers in moving data.
// SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the queue
// FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY
if( !canLaunch( rd, teamSize, busymap, cancellableRelocations ) ) {
@ -794,6 +799,9 @@ struct DDQueueData {
continue;
}
// From now on, the source servers for the RelocateData rd have enough resource to move the data away,
// because they do not have too much inflight data movement.
//logRelocation( rd, "LaunchingRelocation" );
//TraceEvent(rd.interval.end(), distributorId).detail("Result","Success");
@ -849,7 +857,7 @@ struct DDQueueData {
extern bool noUnseed;
// This actor relocates the specified keys to a good place.
// These live in the inFlightActor key range map.
// The inFlightActor key range map stores the actor for each RelocateData
ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd )
{
state Promise<Void> errorOut( self->error );
@ -996,6 +1004,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state Error error = success();
state Promise<Void> dataMovementComplete;
// Move keys from source to destination by chaning the serverKeyList and keyServerList system keys
state Future<Void> doMoveKeys = moveKeys(self->cx, rd.keys, destIds, healthyIds, self->lock, dataMovementComplete, &self->startMoveKeysParallelismLock, &self->finishMoveKeysParallelismLock, self->teamCollections.size() > 1, relocateShardInterval.pairID );
state Future<Void> pollHealth = signalledTransferComplete ? Never() : delay( SERVER_KNOBS->HEALTH_POLL_TIME, TaskPriority::DataDistributionLaunch );
try {
@ -1086,6 +1095,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
}
}
// Move a random shard of sourceTeam's to destTeam if sourceTeam has much more data than destTeam
ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<IDataDistributionTeam> sourceTeam, Reference<IDataDistributionTeam> destTeam, bool primary ) {
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
return false;
@ -1233,6 +1243,7 @@ ACTOR Future<Void> dataDistributionQueue(
// For the given servers that caused us to go around the loop, find the next item(s) that can be launched.
if( launchData.startTime != -1 ) {
// Launch dataDistributionRelocator actor to relocate the launchData
self.launchQueuedWork( launchData );
launchData = RelocateData();
}
@ -1256,6 +1267,7 @@ ACTOR Future<Void> dataDistributionQueue(
launchQueuedWorkTimeout = Never();
}
when ( RelocateData results = waitNext( self.fetchSourceServersComplete.getFuture() ) ) {
// This when is triggered by queueRelocation() which is triggered by sending self.input
self.completeSourceFetch( results );
launchData = results;
}

View File

@ -26,6 +26,7 @@
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// The used bandwidth of a shard. The higher the value is, the busier the shard is.
enum BandwidthStatus {
BandwidthStatusLow,
BandwidthStatusNormal,
@ -427,7 +428,7 @@ Future<Void> shardMerger(
if( endingStats.bytes >= shardBounds.min.bytes ||
getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) {
// The merged range is larger than the min bounds se we cannot continue merging in this direction.
// The merged range is larger than the min bounds so we cannot continue merging in this direction.
// This means that:
// 1. If we were going forwards (the starting direction), we roll back the last speculative merge.
// In this direction we do not want to go above this boundary since we will merge at least one in
@ -752,6 +753,7 @@ void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
check();
}
// Move keys to destinationTeams by updating shard_teams
void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team> destinationTeams ) {
/*TraceEvent("ShardsAffectedByTeamFailureMove")
.detail("KeyBegin", keys.begin)