Make more DDTeamCollection methods private.

The methods only used by DDTeamCollection::run can now be made private.
sfc-gh-tclinkenbeard 2022-02-10 16:18:46 -08:00
parent c4508330d2
commit 641a38bd0b
3 changed files with 79 additions and 77 deletions
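The refactor is an access-narrowing change: every DDTeamCollection helper that is now invoked only from the DDTeamCollection::run code path moves from the public section of the class declaration to the private one, and methods that stay public purely for unit tests are marked with a "// FIXME: Public for testing only" comment. A minimal sketch of the pattern, using illustrative names rather than FoundationDB's real declarations:

#include <iostream>

class TeamCollection {
    // Helpers that only the run() code path needs; hiding them shrinks the
    // surface other components can depend on, without breaking any caller.
    void buildTeams() { std::cout << "building teams\n"; }
    void traceTeamInfo() const { std::cout << "tracing team info\n"; }

public:
    // FIXME: Public for testing only (kept visible so unit tests can call it)
    int constructMachines() { return 0; }

    // The single supported entry point; as a member function it can still
    // reach the private helpers above.
    static void run(TeamCollection& self) {
        self.buildTeams();
        self.traceTeamInfo();
    }
};

int main() {
    TeamCollection tc;
    TeamCollection::run(tc); // code outside the class goes through run()
    return 0;
}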


@@ -2893,10 +2893,10 @@ public:
}
ACTOR static Future<Void> run(Reference<DDTeamCollection> teamCollection,
- Reference<InitialDataDistribution> initData,
- TeamCollectionInterface tci,
- Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
- DDEnabledState const* ddEnabledState) {
+ Reference<InitialDataDistribution> initData,
+ TeamCollectionInterface tci,
+ Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
+ DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@@ -2974,16 +2974,15 @@
}
TraceEvent("TotalDataInFlight", self->distributorId)
.detail("Primary", self->primary)
.detail("TotalBytes", self->getDebugTotalDataInFlight())
.detail("UnhealthyServers", self->unhealthyServers)
.detail("ServerCount", self->server_info.size())
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("HighestPriority", highestPriority)
.trackLatest(
self->primary
? "TotalDataInFlight"
: "TotalDataInFlightRemote"); // This trace event's trackLatest lifetime is controlled by
.detail("Primary", self->primary)
.detail("TotalBytes", self->getDebugTotalDataInFlight())
.detail("UnhealthyServers", self->unhealthyServers)
.detail("ServerCount", self->server_info.size())
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("HighestPriority", highestPriority)
.trackLatest(self->primary ? "TotalDataInFlight"
: "TotalDataInFlightRemote"); // This trace event's trackLatest
// lifetime is controlled by
// DataDistributorData::totalDataInFlightEventHolder or
// DataDistributorData::totalDataInFlightRemoteEventHolder.
// The track latest key we use here must match the key used in
@@ -4858,9 +4857,9 @@ bool DDTeamCollection::exclusionSafetyCheck(std::vector<UID>& excludeServerIDs)
}
Future<Void> DDTeamCollection::run(Reference<DDTeamCollection> teamCollection,
- Reference<InitialDataDistribution> initData,
- TeamCollectionInterface tci,
- Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
- DDEnabledState const &ddEnabledState) {
+ Reference<InitialDataDistribution> initData,
+ TeamCollectionInterface tci,
+ Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
+ DDEnabledState const& ddEnabledState) {
return DDTeamCollectionImpl::run(teamCollection, initData, tci, recruitStorage, &ddEnabledState);
}
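The hunk above also shows how the now-private helpers remain reachable from the actor code: the public DDTeamCollection::run is a thin wrapper that forwards to the static DDTeamCollectionImpl::run, and the impl class presumably has access to DDTeamCollection's private members (via a friend declaration in the header, which this diff does not show). A rough sketch of that forwarding arrangement, with the friendship and the names treated as assumptions:

class TeamCollection {
    friend class TeamCollectionImpl; // assumed: gives the impl access to private helpers
    void buildTeams() {}             // private helper used only by the run() path

public:
    static void run(TeamCollection& self);
};

// Implementation class holding the actual (in FoundationDB, ACTOR-generated) logic.
class TeamCollectionImpl {
public:
    static void run(TeamCollection* self) { self->buildTeams(); }
};

// The public wrapper simply forwards, keeping the impl class an internal detail.
void TeamCollection::run(TeamCollection& self) {
    TeamCollectionImpl::run(&self);
}

int main() {
    TeamCollection tc;
    TeamCollection::run(tc);
    return 0;
}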


@@ -383,6 +383,49 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
// Read storage metadata from database, and do necessary updates
Future<Void> readOrCreateStorageMetadata(TCServerInfo* server);
+ Future<Void> serverGetTeamRequests(TeamCollectionInterface tci);
+ Future<Void> removeBadTeams();
+ Future<Void> machineTeamRemover();
+ // Remove the server team whose members have the most number of process teams
+ // until the total number of server teams is no larger than the desired number
+ Future<Void> serverTeamRemover();
+ Future<Void> removeWrongStoreType();
+ // Check if the number of server (and machine teams) is larger than the maximum allowed number
+ void traceTeamCollectionInfo() const;
+ Future<Void> updateReplicasKey(Optional<Key> dcId);
+ Future<Void> storageRecruiter(Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
+ DDEnabledState const& ddEnabledState);
+ // Monitor whether or not storage servers are being recruited. If so, then a database cannot be considered quiet
+ Future<Void> monitorStorageServerRecruitment();
+ // The serverList system keyspace keeps the StorageServerInterface for each serverID. Storage server's storeType
+ // and serverID are decided by the server's filename. By parsing storage server file's filename on each disk,
+ // process on each machine creates the TCServer with the correct serverID and StorageServerInterface.
+ Future<Void> waitServerListChange(FutureStream<Void> serverRemoved, DDEnabledState const& ddEnabledState);
+ Future<Void> trackExcludedServers();
+ Future<Void> monitorHealthyTeams();
+ // This coroutine sets a watch to monitor the value change of `perpetualStorageWiggleKey` which is controlled by
+ // command `configure perpetual_storage_wiggle=$value` if the value is 1, this actor start 2 actors,
+ // `perpetualStorageWiggleIterator` and `perpetualStorageWiggler`. Otherwise, it sends stop signal to them.
+ Future<Void> monitorPerpetualStorageWiggle();
+ Future<Void> waitHealthyZoneChange();
+ int64_t getDebugTotalDataInFlight() const;
+ void noHealthyTeams() const;
public:
// clang-format off
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3};
@@ -517,8 +560,6 @@ public:
Future<Void> getTeam(GetTeamRequest);
- int64_t getDebugTotalDataInFlight() const;
Future<Void> init(Reference<InitialDataDistribution> initTeams, DDEnabledState const& ddEnabledState);
// Assume begin to end is sorted by std::sort
@@ -547,14 +588,17 @@ public:
void addTeam(std::set<UID> const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); }
+ // FIXME: Public for testing only
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
int constructMachinesFromServers();
+ // FIXME: Public for testing only
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) const;
+ // FIXME: Public for testing only
// Create machineTeamsToBuild number of machine teams
// No operation if machineTeamsToBuild is 0
// Note: The creation of machine teams should not depend on server teams:
@@ -566,6 +610,7 @@ public:
// return number of added machine teams
int addBestMachineTeams(int machineTeamsToBuild);
+ // FIXME: Public for testing only
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() const;
@@ -576,11 +621,6 @@ public:
// build an extra machine team and record the event in trace
int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams);
- // Check if the number of server (and machine teams) is larger than the maximum allowed number
- void traceTeamCollectionInfo() const;
- void noHealthyTeams() const;
void addServer(StorageServerInterface newServer,
ProcessClass processClass,
Promise<Void> errorOut,
@@ -589,6 +629,7 @@ public:
bool removeTeam(Reference<TCTeamInfo> team);
+ // FIXME: Public for testing only
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server);
@@ -597,50 +638,14 @@ public:
void removeServer(UID removedServer);
- Future<Void> removeWrongStoreType();
- Future<Void> removeBadTeams();
- Future<Void> machineTeamRemover();
- // Remove the server team whose members have the most number of process teams
- // until the total number of server teams is no larger than the desired number
- Future<Void> serverTeamRemover();
- Future<Void> trackExcludedServers();
- // This coroutine sets a watch to monitor the value change of `perpetualStorageWiggleKey` which is controlled by
- // command `configure perpetual_storage_wiggle=$value` if the value is 1, this actor start 2 actors,
- // `perpetualStorageWiggleIterator` and `perpetualStorageWiggler`. Otherwise, it sends stop signal to them.
- Future<Void> monitorPerpetualStorageWiggle();
- // The serverList system keyspace keeps the StorageServerInterface for each serverID. Storage server's storeType
- // and serverID are decided by the server's filename. By parsing storage server file's filename on each disk,
- // process on each machine creates the TCServer with the correct serverID and StorageServerInterface.
- Future<Void> waitServerListChange(FutureStream<Void> serverRemoved, DDEnabledState const& ddEnabledState);
- Future<Void> waitHealthyZoneChange();
- // Monitor whether or not storage servers are being recruited. If so, then a database cannot be considered quiet
- Future<Void> monitorStorageServerRecruitment();
- Future<Void> storageRecruiter(Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
- DDEnabledState const& ddEnabledState);
- Future<Void> updateReplicasKey(Optional<Key> dcId);
- Future<Void> serverGetTeamRequests(TeamCollectionInterface tci);
- Future<Void> monitorHealthyTeams();
// Find size of set intersection of excludeServerIDs and serverIDs on each team and see if the leftover team is
// valid
bool exclusionSafetyCheck(std::vector<UID>& excludeServerIDs);
// Keep track of servers and teams -- serves requests for getRandomTeam
static Future<Void> run(Reference<DDTeamCollection> teamCollection,
- Reference<InitialDataDistribution> initData,
- TeamCollectionInterface tci,
- Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
- DDEnabledState const &ddEnabledState);
+ Reference<InitialDataDistribution> initData,
+ TeamCollectionInterface tci,
+ Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
+ DDEnabledState const& ddEnabledState);
};


@@ -1020,22 +1020,20 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
- actors.push_back(
- reportErrorsExcept(DDTeamCollection::run(
- remoteTeamCollection, initData, tcis[1], recruitStorage, *ddEnabledState),
- "DDTeamCollectionSecondary",
- self->ddId,
- &normalDDQueueErrors()));
+ actors.push_back(reportErrorsExcept(
+ DDTeamCollection::run(remoteTeamCollection, initData, tcis[1], recruitStorage, *ddEnabledState),
+ "DDTeamCollectionSecondary",
+ self->ddId,
+ &normalDDQueueErrors()));
actors.push_back(printSnapshotTeamsInfo(remoteTeamCollection));
}
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
self->teamCollection = primaryTeamCollection.getPtr();
- actors.push_back(
- reportErrorsExcept(DDTeamCollection::run(
- primaryTeamCollection, initData, tcis[0], recruitStorage, *ddEnabledState),
- "DDTeamCollectionPrimary",
- self->ddId,
- &normalDDQueueErrors()));
+ actors.push_back(reportErrorsExcept(
+ DDTeamCollection::run(primaryTeamCollection, initData, tcis[0], recruitStorage, *ddEnabledState),
+ "DDTeamCollectionPrimary",
+ self->ddId,
+ &normalDDQueueErrors()));
actors.push_back(printSnapshotTeamsInfo(primaryTeamCollection));
actors.push_back(yieldPromiseStream(output.getFuture(), input));