Merge pull request #6379 from sfc-gh-tclinkenbeard/dd-refactor

Shrink public interface of `DDTeamCollection`
Trevor Clinkenbeard 2022-02-11 11:44:15 -08:00, committed by GitHub
commit b8d8eafdec
5 changed files with 874 additions and 790 deletions
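
Most of the churn below is one mechanical pattern: the public id member of TCServerInfo becomes private and every call site switches from server->id to server->getId(). Note that call sites such as tempMap->add(locality, &it->getId()) take the address of the result, so the accessor must return a stable reference rather than a copy. A minimal sketch of the pattern, using a hypothetical, simplified class (the real TCServerInfo has many more members):

#include <iostream>

struct UID {
    unsigned long long part1 = 0, part2 = 0;
};

class TCServerInfo {
    UID id; // previously public; now private, so callers can no longer mutate it
public:
    explicit TCServerInfo(UID uid) : id(uid) {}
    // Returns a const reference so that &server.getId() yields a stable pointer
    // to the member, which the locality-map call sites in this diff rely on.
    UID const& getId() const { return id; }
};

int main() {
    TCServerInfo server(UID{ 0xABCD, 0x1234 });
    UID const* stable = &server.getId(); // points at the member, not a temporary
    std::cout << std::hex << stable->part1 << "\n";
}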


@@ -355,7 +355,7 @@ public:
for (; idx < self->badTeams.size(); idx++) {
servers.clear();
for (const auto& server : self->badTeams[idx]->getServers()) {
-if (server->inDesiredDC && !self->server_status.get(server->id).isUnhealthy()) {
+if (server->inDesiredDC && !self->server_status.get(server->getId()).isUnhealthy()) {
servers.push_back(server);
}
}
@@ -374,7 +374,7 @@
for (int l = 0; l < testTeam.size(); l++) {
bool foundServer = false;
for (auto it : servers) {
-if (it->id == testTeam[l]) {
+if (it->getId() == testTeam[l]) {
foundServer = true;
break;
}
@@ -400,7 +400,7 @@
} else {
tempSet->clear();
for (auto it : servers) {
-tempMap->add(it->lastKnownInterface.locality, &it->id);
+tempMap->add(it->lastKnownInterface.locality, &it->getId());
}
std::vector<LocalityEntry> resultEntries, forcedEntries;
@@ -418,7 +418,7 @@
} else {
serverIds.clear();
for (auto it : servers) {
-serverIds.push_back(it->id);
+serverIds.push_back(it->getId());
}
TraceEvent(SevWarnAlways, "CannotAddSubset", self->distributorId)
.detail("Servers", describe(serverIds));
@@ -453,7 +453,7 @@
self->addActor.send(self->checkInvalidLocalities);
}
}
-self->addServer(server.first, server.second, self->serverTrackerErrorOut, 0, ddEnabledState);
+self->addServer(server.first, server.second, self->serverTrackerErrorOut, 0, *ddEnabledState);
}
}
@@ -969,7 +969,7 @@
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
state Future<Void> storeTypeTracker = (isTss) ? Never() : keyValueStoreTypeTracker(self, server);
-state bool hasWrongDC = !self->isCorrectDC(server);
+state bool hasWrongDC = !self->isCorrectDC(*server);
state bool hasInvalidLocality =
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
state int targetTeamNumPerServer =
@@ -980,7 +980,7 @@
status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get();
status.isWrongConfiguration = false;
status.isWiggling = false;
-hasWrongDC = !self->isCorrectDC(server);
+hasWrongDC = !self->isCorrectDC(*server);
hasInvalidLocality =
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
@@ -996,26 +996,26 @@
TraceEvent("SameAddress", self->distributorId)
.detail("Failed", statusInfo.isFailed)
.detail("Undesired", statusInfo.isUndesired)
-.detail("Server", server->id)
+.detail("Server", server->getId())
-.detail("OtherServer", i.second->id)
+.detail("OtherServer", i.second->getId())
.detail("Address", server->lastKnownInterface.address())
-.detail("NumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(server->id))
+.detail("NumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(server->getId()))
.detail("OtherNumShards",
-self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id))
+self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->getId()))
-.detail("OtherHealthy", !self->server_status.get(i.second->id).isUnhealthy());
+.detail("OtherHealthy", !self->server_status.get(i.second->getId()).isUnhealthy());
// wait for the server's ip to be changed
-otherChanges.push_back(self->server_status.onChange(i.second->id));
+otherChanges.push_back(self->server_status.onChange(i.second->getId()));
-if (!self->server_status.get(i.second->id).isUnhealthy()) {
+if (!self->server_status.get(i.second->getId()).isUnhealthy()) {
-if (self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id) >=
+if (self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->getId()) >=
-self->shardsAffectedByTeamFailure->getNumberOfShards(server->id)) {
+self->shardsAffectedByTeamFailure->getNumberOfShards(server->getId())) {
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("Address", server->lastKnownInterface.address())
-.detail("OtherServer", i.second->id)
+.detail("OtherServer", i.second->getId())
.detail("NumShards",
-self->shardsAffectedByTeamFailure->getNumberOfShards(server->id))
+self->shardsAffectedByTeamFailure->getNumberOfShards(server->getId()))
.detail("OtherNumShards",
-self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id));
+self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->getId()));
status.isUndesired = true;
} else
@@ -1035,7 +1035,7 @@
if (self->optimalTeamCount > 0) {
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("OptimalTeamCount", self->optimalTeamCount)
.detail("Fitness", server->lastKnownClass.machineClassFitness(ProcessClass::Storage));
status.isUndesired = true;
@@ -1047,7 +1047,7 @@
// replaced with a server having the correct type
if (hasWrongDC || hasInvalidLocality) {
TraceEvent(SevWarn, "UndesiredDCOrLocality", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("WrongDC", hasWrongDC)
.detail("InvalidLocality", hasInvalidLocality);
status.isUndesired = true;
@@ -1055,7 +1055,7 @@
}
if (server->wrongStoreTypeToRemove.get()) {
TraceEvent(SevWarn, "WrongStoreTypeToRemove", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("StoreType", "?");
status.isUndesired = true;
status.isWrongConfiguration = true;
@@ -1065,7 +1065,7 @@
// wiggler.
auto invalidWiggleServer =
[](const AddressExclusion& addr, const DDTeamCollection* tc, const TCServerInfo* server) {
-return !tc->wigglingId.present() || server->id != tc->wigglingId.get();
+return !tc->wigglingId.present() || server->getId() != tc->wigglingId.get();
};
// If the storage server is in the excluded servers list, it is undesired
NetworkAddress a = server->lastKnownInterface.address();
@@ -1115,7 +1115,7 @@
if (worstStatus != DDTeamCollection::Status::NONE) {
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("Excluded", worstAddr.toString());
status.isUndesired = true;
status.isWrongConfiguration = true;
@@ -1124,19 +1124,19 @@
status.isWiggling = true;
TraceEvent("PerpetualStorageWiggleSS", self->distributorId)
.detail("Primary", self->primary)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("Address", worstAddr.toString());
} else if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) {
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("Excluded", worstAddr.toString());
wait(delay(0.0)); // Do not throw an error while still inside trackExcludedServers
while (!ddEnabledState->isDDEnabled()) {
wait(delay(1.0));
}
if (self->removeFailedServer.canBeSet()) {
-self->removeFailedServer.send(server->id);
+self->removeFailedServer.send(server->getId());
}
throw movekeys_conflict();
}
@@ -1161,7 +1161,7 @@
when(wait(failureTracker || server->onTSSPairRemoved || server->killTss.getFuture())) {
// The server is failed AND all data has been removed from it, so permanently remove it.
TraceEvent("StatusMapChange", self->distributorId)
-.detail("ServerID", server->id)
+.detail("ServerID", server->getId())
.detail("Status", "Removing");
if (server->updated.canBeSet()) {
@@ -1171,18 +1171,18 @@
// Remove server from FF/serverList
storageMetadataTracker.cancel();
wait(removeStorageServer(
-cx, server->id, server->lastKnownInterface.tssPairID, self->lock, ddEnabledState));
+cx, server->getId(), server->lastKnownInterface.tssPairID, self->lock, ddEnabledState));
TraceEvent("StatusMapChange", self->distributorId)
-.detail("ServerID", server->id)
+.detail("ServerID", server->getId())
.detail("Status", "Removed");
// Sets removeSignal (alerting dataDistributionTeamCollection to remove the storage server from
// its own data structures)
server->removed.send(Void());
if (isTss) {
-self->removedTSS.send(server->id);
+self->removedTSS.send(server->getId());
} else {
-self->removedServers.send(server->id);
+self->removedServers.send(server->getId());
}
return Void();
}
@@ -1194,7 +1194,7 @@
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get();
TraceEvent("StorageServerInterfaceChanged", self->distributorId)
-.detail("ServerID", server->id)
+.detail("ServerID", server->getId())
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged)
@@ -1239,7 +1239,7 @@
// is; If the destination machine is new, create one; otherwise, add server to an
// existing one Update server's machine reference to the destination machine
Reference<TCMachineInfo> destMachine =
-self->checkAndCreateMachine(self->server_info[server->id]);
+self->checkAndCreateMachine(self->server_info[server->getId()]);
ASSERT(destMachine.isValid());
}
@@ -1301,7 +1301,7 @@
// keyValueStoreTypeTracker
storeTypeTracker = (isTss) ? Never() : keyValueStoreTypeTracker(self, server);
storageMetadataTracker = (isTss) ? Never() : readOrCreateStorageMetadata(self, server);
-hasWrongDC = !self->isCorrectDC(server);
+hasWrongDC = !self->isCorrectDC(*server);
hasInvalidLocality = !self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality);
self->restartTeamBuilder.trigger();
@@ -1310,11 +1310,11 @@
self->restartRecruiting.trigger();
}
when(wait(otherChanges.empty() ? Never() : quorum(otherChanges, 1))) {
-TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->id);
+TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->getId());
}
when(wait(server->wrongStoreTypeToRemove.onChange())) {
TraceEvent("UndesiredStorageServerTriggered", self->distributorId)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.detail("StoreType", server->storeType)
.detail("ConfigStoreType", self->configuration.storageServerStoreType)
.detail("WrongStoreTypeRemoved", server->wrongStoreTypeToRemove.get());
@@ -1334,7 +1334,7 @@
TraceEvent("StorageServerTrackerCancelled", self->distributorId)
.suppressFor(1.0)
.detail("Primary", self->primary)
-.detail("Server", server->id)
+.detail("Server", server->getId())
.error(e, /*includeCancelled*/ true);
if (e.code() != error_code_actor_cancelled && errorOut.canBeSet()) {
errorOut.sendError(e);
@@ -1355,7 +1355,7 @@
// Removing a server here when DD is not healthy may lead to rare failure scenarios, for example,
// the server with wrong storeType is shutting down while this actor marks it as to-be-removed.
// In addition, removing servers cause extra data movement, which should be done while a cluster is healthy
-wait(waitUntilHealthy(self));
+wait(self->waitUntilHealthy());
bool foundSSToRemove = false;
@@ -1388,7 +1388,7 @@
// NOTE: this actor returns when the cluster is healthy and stable (no server is expected to be removed in a period)
// processingWiggle and processingUnhealthy indicate that some servers are going to be removed.
-ACTOR static Future<Void> waitUntilHealthy(DDTeamCollection* self, double extraDelay = 0, bool waitWiggle = false) {
+ACTOR static Future<Void> waitUntilHealthy(DDTeamCollection const* self, double extraDelay, bool waitWiggle) {
state int waitCount = 0;
loop {
while (self->zeroHealthyTeams->get() || self->processingUnhealthy->get() ||
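
This hunk drops the default arguments from the static implementation of waitUntilHealthy; presumably the defaults now live on the public DDTeamCollection::waitUntilHealthy declaration in the header (not shown in this diff), whose definition appears further down. A sketch of that split, using hypothetical simplified names:

#include <iostream>

struct Collection; // forward declaration

struct CollectionImpl {
    // No defaults here: the wrapper always passes both values explicitly.
    static void waitUntilHealthy(Collection const* self, double extraDelay, bool waitWiggle);
};

struct Collection {
    // Defaults live at the public API boundary, so external callers may omit them.
    void waitUntilHealthy(double extraDelay = 0, bool waitWiggle = false) const {
        CollectionImpl::waitUntilHealthy(this, extraDelay, waitWiggle);
    }
};

void CollectionImpl::waitUntilHealthy(Collection const*, double extraDelay, bool waitWiggle) {
    std::cout << extraDelay << " " << waitWiggle << "\n";
}

int main() {
    Collection c;
    c.waitUntilHealthy();    // uses both defaults
    c.waitUntilHealthy(5.0); // overrides extraDelay only
}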
@@ -1424,7 +1424,7 @@
ACTOR static Future<Void> removeBadTeams(DDTeamCollection* self) {
wait(self->initialFailureReactionDelay);
-wait(waitUntilHealthy(self));
+wait(self->waitUntilHealthy());
wait(self->addSubsetComplete.getFuture());
TraceEvent("DDRemovingBadServerTeams", self->distributorId).detail("Primary", self->primary);
for (auto it : self->badTeams) {
@@ -1622,7 +1622,7 @@
// To avoid removing machine teams too fast, which is unlikely happen though
wait(delay(SERVER_KNOBS->TR_REMOVE_MACHINE_TEAM_DELAY, TaskPriority::DataDistribution));
-wait(waitUntilHealthy(self, SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
+wait(self->waitUntilHealthy(SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
// Wait for the badTeamRemover() to avoid the potential race between adding the bad team (add the team
// tracker) and remove bad team (cancel the team tracker).
@@ -1675,7 +1675,7 @@
for (auto& s : team->getServers()) {
if (s->teams.size() == 0) {
TraceEvent(SevError, "MachineTeamRemoverTooAggressive", self->distributorId)
-.detail("Server", s->id)
+.detail("Server", s->getId())
.detail("ServerTeam", team->getDesc());
self->traceAllInfo(true);
}
@@ -1750,7 +1750,7 @@
wait(self->pauseWiggle->onChange());
}
} else {
-wait(waitUntilHealthy(self, SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
+wait(self->waitUntilHealthy(SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
}
// Wait for the badTeamRemover() to avoid the potential race between
// adding the bad team (add the team tracker) and remove bad team (cancel the team tracker).
@@ -2026,7 +2026,7 @@
.detail("HealthyTeamCount", self->healthyTeamCount);
} else {
choose {
-when(wait(waitUntilHealthy(self))) {
+when(wait(self->waitUntilHealthy())) {
TEST(true); // start wiggling
wait(self->storageWiggler->startWiggle());
auto fv = self->excludeStorageServersForWiggle(id);
@@ -2114,9 +2114,9 @@
if (speed == 1 && stopWiggleSignal.get()) { // avoid duplicated start
stopWiggleSignal.set(false);
collection.add(teamCollection->perpetualStorageWiggleIterator(
-&stopWiggleSignal, finishStorageWiggleSignal.getFuture()));
+stopWiggleSignal, finishStorageWiggleSignal.getFuture()));
collection.add(
-teamCollection->perpetualStorageWiggler(&stopWiggleSignal, finishStorageWiggleSignal));
+teamCollection->perpetualStorageWiggler(stopWiggleSignal, finishStorageWiggleSignal));
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId)
.detail("Primary", teamCollection->primary);
} else if (speed == 0) {
@@ -2371,7 +2371,7 @@
candidateWorker.processClass,
self->serverTrackerErrorOut,
newServer.get().addedVersion,
-ddEnabledState);
+*ddEnabledState);
self->waitUntilRecruited.set(false);
// signal all done after adding tss to tracking info
tssState->markComplete();
@@ -2587,7 +2587,7 @@
} else if (tssToKill > 0) {
auto itr = self->tss_info_by_pair.begin();
for (int i = 0; i < tssToKill; i++, itr++) {
-UID tssId = itr->second->id;
+UID tssId = itr->second->getId();
StorageServerInterface tssi = itr->second->lastKnownInterface;
if (self->shouldHandleServer(tssi) && self->server_and_tss_info.count(tssId)) {
@@ -2637,7 +2637,7 @@
}
wait(self->initialFailureReactionDelay && waitForAll(serverUpdates));
-wait(waitUntilHealthy(self));
+wait(self->waitUntilHealthy());
TraceEvent("DDUpdatingReplicas", self->distributorId)
.detail("Primary", self->primary)
.detail("DcId", dcId)
@@ -2752,7 +2752,7 @@
processClass,
self->serverTrackerErrorOut,
tr.getReadVersion().get(),
-ddEnabledState);
+*ddEnabledState);
}
}
@@ -2862,18 +2862,18 @@
serverMetadataKeys.begin, IncludeVersion());
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
state StorageMetadataType data(timer_int());
-// printf("------ read metadata %s\n", server->id.toString().c_str());
+// printf("------ read metadata %s\n", server->getId().toString().c_str());
// read storage metadata
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-auto property = metadataMap.getProperty(server->id);
+auto property = metadataMap.getProperty(server->getId());
Optional<StorageMetadataType> metadata = wait(property.get(tr));
// NOTE: in upgrade testing, there may not be any metadata
if (metadata.present()) {
data = metadata.get();
} else {
-metadataMap.set(tr, server->id, data);
+metadataMap.set(tr, server->getId(), data);
}
wait(tr->commit());
break;
@@ -2883,15 +2883,344 @@
}
// add server to wiggler
-if (self->storageWiggler->contains(server->id)) {
+if (self->storageWiggler->contains(server->getId())) {
-self->storageWiggler->updateMetadata(server->id, data);
+self->storageWiggler->updateMetadata(server->getId(), data);
} else {
-self->storageWiggler->addServer(server->id, data);
+self->storageWiggler->addServer(server->getId(), data);
}
return Never();
}
-};
ACTOR static Future<Void> run(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
state Future<Void> error = actorCollection(self->addActor.getFuture());
try {
wait(self->init(initData, *ddEnabledState));
initData = Reference<InitialDataDistribution>();
self->addActor.send(self->serverGetTeamRequests(tci));
TraceEvent("DDTeamCollectionBegin", self->distributorId).detail("Primary", self->primary);
wait(self->readyToStart || error);
TraceEvent("DDTeamCollectionReadyToStart", self->distributorId).detail("Primary", self->primary);
// removeBadTeams() does not always run. We may need to restart the actor when needed.
// So we need the badTeamRemover variable to check if the actor is ready.
if (self->badTeamRemover.isReady()) {
self->badTeamRemover = self->removeBadTeams();
self->addActor.send(self->badTeamRemover);
}
self->addActor.send(self->machineTeamRemover());
self->addActor.send(self->serverTeamRemover());
if (self->wrongStoreTypeRemover.isReady()) {
self->wrongStoreTypeRemover = self->removeWrongStoreType();
self->addActor.send(self->wrongStoreTypeRemover);
}
self->traceTeamCollectionInfo();
if (self->includedDCs.size()) {
// start this actor before any potential recruitments can happen
self->addActor.send(self->updateReplicasKey(self->includedDCs[0]));
}
// The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
// they are always running.
self->addActor.send(self->storageRecruiter(recruitStorage, *ddEnabledState));
self->addActor.send(self->monitorStorageServerRecruitment());
self->addActor.send(self->waitServerListChange(serverRemoved.getFuture(), *ddEnabledState));
self->addActor.send(self->trackExcludedServers());
self->addActor.send(self->monitorHealthyTeams());
self->addActor.send(self->waitHealthyZoneChange());
self->addActor.send(self->monitorPerpetualStorageWiggle());
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
loop choose {
when(UID removedServer = waitNext(self->removedServers.getFuture())) {
TEST(true); // Storage server removed from database
self->removeServer(removedServer);
serverRemoved.send(Void());
self->restartRecruiting.trigger();
}
when(UID removedTSS = waitNext(self->removedTSS.getFuture())) {
TEST(true); // TSS removed from database
self->removeTSS(removedTSS);
serverRemoved.send(Void());
self->restartRecruiting.trigger();
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get()) {
self->restartRecruiting.trigger();
self->noHealthyTeams();
}
}
when(wait(loggingTrigger)) {
int highestPriority = 0;
for (auto it : self->priority_teams) {
if (it.second > 0) {
highestPriority = std::max(highestPriority, it.first);
}
}
TraceEvent("TotalDataInFlight", self->distributorId)
.detail("Primary", self->primary)
.detail("TotalBytes", self->getDebugTotalDataInFlight())
.detail("UnhealthyServers", self->unhealthyServers)
.detail("ServerCount", self->server_info.size())
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("HighestPriority", highestPriority)
.trackLatest(self->primary ? "TotalDataInFlight"
: "TotalDataInFlightRemote"); // This trace event's trackLatest
// lifetime is controlled by
// DataDistributorData::totalDataInFlightEventHolder or
// DataDistributorData::totalDataInFlightRemoteEventHolder.
// The track latest key we use here must match the key used in
// the holder.
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
}
when(wait(self->serverTrackerErrorOut.getFuture())) {} // Propagate errors from storageServerTracker
when(wait(error)) {}
}
} catch (Error& e) {
if (e.code() != error_code_movekeys_conflict)
TraceEvent(SevError, "DataDistributionTeamCollectionError", self->distributorId).error(e);
throw e;
}
}
// Take a snapshot of necessary data structures from `DDTeamCollection` and print them out with yields to avoid slow
// task on the run loop.
ACTOR static Future<Void> printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
state DatabaseConfiguration configuration;
state std::map<UID, Reference<TCServerInfo>> server_info;
state std::map<UID, ServerStatus> server_status;
state std::vector<Reference<TCTeamInfo>> teams;
state std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
state std::vector<Reference<TCMachineTeamInfo>> machineTeams;
// state std::vector<std::string> internedLocalityRecordKeyNameStrings;
// state int machineLocalityMapEntryArraySize;
// state std::vector<Reference<LocalityRecord>> machineLocalityMapRecordArray;
state int traceEventsPrinted = 0;
state std::vector<const UID*> serverIDs;
state double lastPrintTime = 0;
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Future<Void> watchFuture = tr.watch(triggerDDTeamInfoPrintKey);
wait(tr.commit());
wait(self->printDetailedTeamsInfo.onTrigger() || watchFuture);
tr.reset();
if (now() - lastPrintTime < SERVER_KNOBS->DD_TEAMS_INFO_PRINT_INTERVAL) {
continue;
}
lastPrintTime = now();
traceEventsPrinted = 0;
double snapshotStart = now();
configuration = self->configuration;
server_info = self->server_info;
teams = self->teams;
// Perform deep copy so we have a consistent snapshot, even if yields are performed
for (const auto& [machineId, info] : self->machine_info) {
machine_info.emplace(machineId, info->clone());
}
machineTeams = self->machineTeams;
// internedLocalityRecordKeyNameStrings = self->machineLocalityMap._keymap->_lookuparray;
// machineLocalityMapEntryArraySize = self->machineLocalityMap.size();
// machineLocalityMapRecordArray = self->machineLocalityMap.getRecordArray();
std::vector<const UID*> _uids = self->machineLocalityMap.getObjects();
serverIDs = _uids;
auto const& keys = self->server_status.getKeys();
for (auto const& key : keys) {
// Add to or update the local server_status map
server_status[key] = self->server_status.get(key);
}
TraceEvent("DDPrintSnapshotTeasmInfo", self->getDistributorId())
.detail("SnapshotSpeed", now() - snapshotStart)
.detail("Primary", self->isPrimary());
// Print to TraceEvents
TraceEvent("DDConfig", self->getDistributorId())
.detail("StorageTeamSize", configuration.storageTeamSize)
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER)
.detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER)
.detail("Primary", self->isPrimary());
TraceEvent("ServerInfo", self->getDistributorId())
.detail("Size", server_info.size())
.detail("Primary", self->isPrimary());
state int i;
state std::map<UID, Reference<TCServerInfo>>::iterator server = server_info.begin();
for (i = 0; i < server_info.size(); i++) {
TraceEvent("ServerInfo", self->getDistributorId())
.detail("ServerInfoIndex", i)
.detail("ServerID", server->first.toString())
.detail("ServerTeamOwned", server->second->teams.size())
.detail("MachineID", server->second->machine->machineID.contents().toString())
.detail("Primary", self->isPrimary());
server++;
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
server = server_info.begin();
for (i = 0; i < server_info.size(); i++) {
const UID& uid = server->first;
TraceEvent e("ServerStatus", self->getDistributorId());
e.detail("ServerUID", uid)
.detail("MachineIsValid", server_info[uid]->machine.isValid())
.detail("MachineTeamSize",
server_info[uid]->machine.isValid() ? server_info[uid]->machine->machineTeams.size()
: -1)
.detail("Primary", self->isPrimary());
// ServerStatus might not be known if server was very recently added and
// storageServerFailureTracker() has not yet updated self->server_status If the UID is not found, do
// not assume the server is healthy or unhealthy
auto it = server_status.find(uid);
if (it != server_status.end()) {
e.detail("Healthy", !it->second.isUnhealthy());
}
server++;
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
TraceEvent("ServerTeamInfo", self->getDistributorId())
.detail("Size", teams.size())
.detail("Primary", self->isPrimary());
for (i = 0; i < teams.size(); i++) {
const auto& team = teams[i];
TraceEvent("ServerTeamInfo", self->getDistributorId())
.detail("TeamIndex", i)
.detail("Healthy", team->isHealthy())
.detail("TeamSize", team->size())
.detail("MemberIDs", team->getServerIDsStr())
.detail("Primary", self->isPrimary());
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
TraceEvent("MachineInfo", self->getDistributorId())
.detail("Size", machine_info.size())
.detail("Primary", self->isPrimary());
state std::map<Standalone<StringRef>, Reference<TCMachineInfo>>::iterator machine =
machine_info.begin();
state bool isMachineHealthy = false;
for (i = 0; i < machine_info.size(); i++) {
Reference<TCMachineInfo> _machine = machine->second;
if (!_machine.isValid() || machine_info.find(_machine->machineID) == machine_info.end() ||
_machine->serversOnMachine.empty()) {
isMachineHealthy = false;
}
// Healthy machine has at least one healthy server
for (auto& server : _machine->serversOnMachine) {
// ServerStatus might not be known if server was very recently added and
// storageServerFailureTracker() has not yet updated self->server_status If the UID is not
// found, do not assume the server is healthy
auto it = server_status.find(server->getId());
if (it != server_status.end() && !it->second.isUnhealthy()) {
isMachineHealthy = true;
}
}
isMachineHealthy = false;
TraceEvent("MachineInfo", self->getDistributorId())
.detail("MachineInfoIndex", i)
.detail("Healthy", isMachineHealthy)
.detail("MachineID", machine->first.contents().toString())
.detail("MachineTeamOwned", machine->second->machineTeams.size())
.detail("ServerNumOnMachine", machine->second->serversOnMachine.size())
.detail("ServersID", machine->second->getServersIDStr())
.detail("Primary", self->isPrimary());
machine++;
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
TraceEvent("MachineTeamInfo", self->getDistributorId())
.detail("Size", machineTeams.size())
.detail("Primary", self->isPrimary());
for (i = 0; i < machineTeams.size(); i++) {
const auto& team = machineTeams[i];
TraceEvent("MachineTeamInfo", self->getDistributorId())
.detail("TeamIndex", i)
.detail("MachineIDs", team->getMachineIDsStr())
.detail("ServerTeams", team->serverTeams.size())
.detail("Primary", self->isPrimary());
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
// TODO: re-enable the following logging or remove them.
// TraceEvent("LocalityRecordKeyName", self->getDistributorId())
// .detail("Size", internedLocalityRecordKeyNameStrings.size())
// .detail("Primary", self->isPrimary());
// for (i = 0; i < internedLocalityRecordKeyNameStrings.size(); i++) {
// TraceEvent("LocalityRecordKeyIndexName", self->getDistributorId())
// .detail("KeyIndex", i)
// .detail("KeyName", internedLocalityRecordKeyNameStrings[i])
// .detail("Primary", self->isPrimary());
// if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
// wait(yield());
// }
// }
// TraceEvent("MachineLocalityMap", self->getDistributorId())
// .detail("Size", machineLocalityMapEntryArraySize)
// .detail("Primary", self->isPrimary());
// for (i = 0; i < serverIDs.size(); i++) {
// const auto& serverID = serverIDs[i];
// Reference<LocalityRecord> record = machineLocalityMapRecordArray[i];
// if (record.isValid()) {
// TraceEvent("MachineLocalityMap", self->getDistributorId())
// .detail("LocalityIndex", i)
// .detail("UID", serverID->toString())
// .detail("LocalityRecord", record->toString())
// .detail("Primary", self->isPrimary());
// } else {
// TraceEvent("MachineLocalityMap", self->getDistributorId())
// .detail("LocalityIndex", i)
// .detail("UID", serverID->toString())
// .detail("LocalityRecord", "[NotFound]")
// .detail("Primary", self->isPrimary());
// }
// if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
// wait(yield());
// }
// }
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
}; // class DDTeamCollectionImpl
Reference<TCMachineTeamInfo> DDTeamCollection::findMachineTeam(
std::vector<Standalone<StringRef>> const& machineIDs) const {
@@ -2973,7 +3302,7 @@ bool DDTeamCollection::isMachineHealthy(Reference<TCMachineInfo> const& machine)
// Healthy machine has at least one healthy server
for (auto& server : machine->serversOnMachine) {
-if (!server_status.get(server->id).isUnhealthy()) {
+if (!server_status.get(server->getId()).isUnhealthy()) {
return true;
}
}
@@ -3023,8 +3352,8 @@ Future<Void> DDTeamCollection::addSubsetOfEmergencyTeams() {
}
Future<Void> DDTeamCollection::init(Reference<InitialDataDistribution> initTeams,
-DDEnabledState const* ddEnabledState) {
+DDEnabledState const& ddEnabledState) {
-return DDTeamCollectionImpl::init(this, initTeams, ddEnabledState);
+return DDTeamCollectionImpl::init(this, initTeams, &ddEnabledState);
}
Future<Void> DDTeamCollection::buildTeams() {
@@ -3040,22 +3369,22 @@ Future<Void> DDTeamCollection::storageServerTracker(
TCServerInfo* server, // This actor is owned by this TCServerInfo, point to server_info[id]
Promise<Void> errorOut,
Version addedVersion,
-const DDEnabledState* ddEnabledState,
+DDEnabledState const& ddEnabledState,
bool isTss) {
-return DDTeamCollectionImpl::storageServerTracker(this, cx, server, errorOut, addedVersion, ddEnabledState, isTss);
+return DDTeamCollectionImpl::storageServerTracker(this, cx, server, errorOut, addedVersion, &ddEnabledState, isTss);
}
Future<Void> DDTeamCollection::removeWrongStoreType() {
return DDTeamCollectionImpl::removeWrongStoreType(this);
}
-Future<Void> DDTeamCollection::waitUntilHealthy(double extraDelay, bool waitWiggle) {
+Future<Void> DDTeamCollection::waitUntilHealthy(double extraDelay, bool waitWiggle) const {
return DDTeamCollectionImpl::waitUntilHealthy(this, extraDelay, waitWiggle);
}
-bool DDTeamCollection::isCorrectDC(TCServerInfo* server) const {
+bool DDTeamCollection::isCorrectDC(TCServerInfo const& server) const {
return (includedDCs.empty() ||
-std::find(includedDCs.begin(), includedDCs.end(), server->lastKnownInterface.locality.dcId()) !=
+std::find(includedDCs.begin(), includedDCs.end(), server.lastKnownInterface.locality.dcId()) !=
includedDCs.end());
}
@@ -3094,18 +3423,18 @@ Future<Void> DDTeamCollection::updateNextWigglingStorageID() {
return DDTeamCollectionImpl::updateNextWigglingStorageID(this);
}
-Future<Void> DDTeamCollection::perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
+Future<Void> DDTeamCollection::perpetualStorageWiggleIterator(AsyncVar<bool>& stopSignal,
FutureStream<Void> finishStorageWiggleSignal) {
-return DDTeamCollectionImpl::perpetualStorageWiggleIterator(this, stopSignal, finishStorageWiggleSignal);
+return DDTeamCollectionImpl::perpetualStorageWiggleIterator(this, &stopSignal, finishStorageWiggleSignal);
}
-Future<Void> DDTeamCollection::clusterHealthCheckForPerpetualWiggle(int* extraTeamCount) {
+Future<Void> DDTeamCollection::clusterHealthCheckForPerpetualWiggle(int& extraTeamCount) {
-return DDTeamCollectionImpl::clusterHealthCheckForPerpetualWiggle(this, extraTeamCount);
+return DDTeamCollectionImpl::clusterHealthCheckForPerpetualWiggle(this, &extraTeamCount);
}
-Future<Void> DDTeamCollection::perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
+Future<Void> DDTeamCollection::perpetualStorageWiggler(AsyncVar<bool>& stopSignal,
PromiseStream<Void> finishStorageWiggleSignal) {
-return DDTeamCollectionImpl::perpetualStorageWiggler(this, stopSignal, finishStorageWiggleSignal);
+return DDTeamCollectionImpl::perpetualStorageWiggler(this, &stopSignal, finishStorageWiggleSignal);
}
Future<Void> DDTeamCollection::monitorPerpetualStorageWiggle() {
@@ -3113,8 +3442,8 @@ Future<Void> DDTeamCollection::monitorPerpetualStorageWiggle() {
}
Future<Void> DDTeamCollection::waitServerListChange(FutureStream<Void> serverRemoved,
-DDEnabledState const* ddEnabledState) {
+DDEnabledState const& ddEnabledState) {
-return DDTeamCollectionImpl::waitServerListChange(this, serverRemoved, ddEnabledState);
+return DDTeamCollectionImpl::waitServerListChange(this, serverRemoved, &ddEnabledState);
}
Future<Void> DDTeamCollection::waitHealthyZoneChange() {
@@ -3126,16 +3455,16 @@ Future<Void> DDTeamCollection::monitorStorageServerRecruitment() {
}
Future<Void> DDTeamCollection::initializeStorage(RecruitStorageReply candidateWorker,
-DDEnabledState const* ddEnabledState,
+DDEnabledState const& ddEnabledState,
bool recruitTss,
Reference<TSSPairState> tssState) {
-return DDTeamCollectionImpl::initializeStorage(this, candidateWorker, ddEnabledState, recruitTss, tssState);
+return DDTeamCollectionImpl::initializeStorage(this, candidateWorker, &ddEnabledState, recruitTss, tssState);
}
Future<Void> DDTeamCollection::storageRecruiter(
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
-DDEnabledState const* ddEnabledState) {
+DDEnabledState const& ddEnabledState) {
-return DDTeamCollectionImpl::storageRecruiter(this, recruitStorage, ddEnabledState);
+return DDTeamCollectionImpl::storageRecruiter(this, recruitStorage, &ddEnabledState);
}
Future<Void> DDTeamCollection::updateReplicasKey(Optional<Key> dcId) {
@@ -3171,7 +3500,7 @@ void DDTeamCollection::resetLocalitySet() {
LocalityMap<UID>* storageServerMap = (LocalityMap<UID>*)storageServerSet.getPtr();
for (auto& it : server_info) {
-it.second->localityEntry = storageServerMap->add(it.second->lastKnownInterface.locality, &it.second->id);
+it.second->localityEntry = storageServerMap->add(it.second->lastKnownInterface.locality, &it.second->getId());
}
}
@@ -3206,26 +3535,26 @@ DDTeamCollection::DDTeamCollection(Database const& cx,
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer,
PromiseStream<Promise<int>> getUnhealthyRelocationCount)
-: cx(cx), distributorId(distributorId), configuration(configuration), doBuildTeams(true), lastBuildTeamsFailed(false),
-teamBuilder(Void()), lock(lock), output(output), unhealthyServers(0),
-storageWiggler(makeReference<StorageWiggler>(this)), processingWiggle(processingWiggle),
+: doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), lock(lock), output(output),
+unhealthyServers(0), storageWiggler(makeReference<StorageWiggler>(this)), processingWiggle(processingWiggle),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
initialFailureReactionDelay(
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay)), recruitingStream(0),
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), healthyTeamCount(0), zeroHealthyTeams(zeroHealthyTeams),
optimalTeamCount(0), zeroOptimalTeams(true), isTssRecruiting(false), includedDCs(includedDCs),
-otherTrackedDCs(otherTrackedDCs), primary(primary), processingUnhealthy(processingUnhealthy),
-readyToStart(readyToStart), checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)),
-badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()),
-storageServerSet(new LocalityMap<UID>()), clearHealthyZoneFuture(true),
+otherTrackedDCs(otherTrackedDCs), processingUnhealthy(processingUnhealthy), readyToStart(readyToStart),
+checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)), badTeamRemover(Void()),
+checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()), clearHealthyZoneFuture(true),
medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
lowestUtilizationTeam(0), highestUtilizationTeam(0), getShardMetrics(getShardMetrics),
getUnhealthyRelocationCount(getUnhealthyRelocationCount), removeFailedServer(removeFailedServer),
ddTrackerStartingEventHolder(makeReference<EventCacheHolder>("DDTrackerStarting")),
teamCollectionInfoEventHolder(makeReference<EventCacheHolder>("TeamCollectionInfo")),
storageServerRecruitmentEventHolder(
-makeReference<EventCacheHolder>("StorageServerRecruitment_" + distributorId.toString())) {
+makeReference<EventCacheHolder>("StorageServerRecruitment_" + distributorId.toString())),
+primary(primary), distributorId(distributorId), cx(cx), configuration(configuration),
+storageServerSet(new LocalityMap<UID>()) {
if (!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId)
.detail("State", "Inactive")
@@ -3691,7 +4020,7 @@ void DDTeamCollection::rebuildMachineLocalityMap() {
.detail("InvalidLocality", locality.toString());
continue;
}
-const LocalityEntry& localityEntry = machineLocalityMap.add(locality, &representativeServer->id);
+const LocalityEntry& localityEntry = machineLocalityMap.add(locality, &representativeServer->getId());
machine->second->localityEntry = localityEntry;
++numHealthyMachine;
}
@@ -3716,7 +4045,7 @@ int DDTeamCollection::addBestMachineTeams(int machineTeamsToBuild) {
int minTeamCount = std::numeric_limits<int>::max();
for (auto& machine : machine_info) {
// Skip invalid machine whose representative server is not in server_info
-ASSERT_WE_THINK(server_info.find(machine.second->serversOnMachine[0]->id) != server_info.end());
+ASSERT_WE_THINK(server_info.find(machine.second->serversOnMachine[0]->getId()) != server_info.end());
// Skip unhealthy machines
if (!isMachineHealthy(machine.second))
continue;
@@ -3888,7 +4217,7 @@ Reference<TCMachineTeamInfo> DDTeamCollection::findOneRandomMachineTeam(TCServer
// If we cannot find a healthy machine team
TraceEvent("NoHealthyMachineTeamForServer")
-.detail("ServerID", chosenServer.id)
+.detail("ServerID", chosenServer.getId())
.detail("MachineTeams", chosenServer.machine->machineTeams.size());
return Reference<TCMachineTeamInfo>();
}
@@ -4173,16 +4502,16 @@ int DDTeamCollection::addTeamsBestOf(int teamsToBuild, int desiredTeams, int max
for (auto& machine : chosenMachineTeam->machines) {
UID serverID;
if (machine == chosenServer->machine) {
-serverID = chosenServer->id;
+serverID = chosenServer->getId();
++chosenServerCount;
} else {
std::vector<Reference<TCServerInfo>> healthyProcesses;
for (auto it : machine->serversOnMachine) {
-if (!server_status.get(it->id).isUnhealthy()) {
+if (!server_status.get(it->getId()).isUnhealthy()) {
healthyProcesses.push_back(it);
}
}
-serverID = deterministicRandom()->randomChoice(healthyProcesses)->id;
+serverID = deterministicRandom()->randomChoice(healthyProcesses)->getId();
}
serverTeam.push_back(serverID);
}
@@ -4305,7 +4634,7 @@ void DDTeamCollection::noHealthyTeams() const {
std::set<UID> desiredServerSet;
std::string desc;
for (auto i = server_info.begin(); i != server_info.end(); ++i) {
-ASSERT(i->first == i->second->id);
+ASSERT(i->first == i->second->getId());
if (!server_status.get(i->first).isFailed) {
desiredServerSet.insert(i->first);
desc += i->first.shortString() + " (" + i->second->lastKnownInterface.toString() + "), ";
@@ -4330,7 +4659,7 @@ void DDTeamCollection::addServer(StorageServerInterface newServer,
ProcessClass processClass,
Promise<Void> errorOut,
Version addedVersion,
-const DDEnabledState* ddEnabledState) {
+DDEnabledState const& ddEnabledState) {
if (!shouldHandleServer(newServer)) {
return;
}
@@ -4429,7 +4758,7 @@ bool DDTeamCollection::removeTeam(Reference<TCTeamInfo> team) {
}
Reference<TCMachineInfo> DDTeamCollection::checkAndCreateMachine(Reference<TCServerInfo> server) {
-ASSERT(server.isValid() && server_info.find(server->id) != server_info.end());
+ASSERT(server.isValid() && server_info.find(server->getId()) != server_info.end());
auto& locality = server->lastKnownInterface.locality;
Standalone<StringRef> machine_id = locality.zoneId().get(); // locality to machine_id with std::string type
@@ -4438,7 +4767,7 @@ Reference<TCMachineInfo> DDTeamCollection::checkAndCreateMachine(Reference<TCSer
// uid is the first storage server process on the machine
TEST(true); // First storage server in process on the machine
// For each machine, store the first server's localityEntry into machineInfo for later use.
-LocalityEntry localityEntry = machineLocalityMap.add(locality, &server->id);
+LocalityEntry localityEntry = machineLocalityMap.add(locality, &server->getId());
machineInfo = makeReference<TCMachineInfo>(server, localityEntry);
machine_info.insert(std::make_pair(machine_id, machineInfo));
} else {
@@ -4746,3 +5075,15 @@ bool DDTeamCollection::exclusionSafetyCheck(std::vector<UID>& excludeServerIDs)
}
return true;
}
Future<Void> DDTeamCollection::run(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
DDEnabledState const& ddEnabledState) {
return DDTeamCollectionImpl::run(teamCollection, initData, tci, recruitStorage, &ddEnabledState);
}
Future<Void> DDTeamCollection::printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
return DDTeamCollectionImpl::printSnapshotTeamsInfo(self);
}
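
The wrappers above also show the second half of the interface cleanup: public methods now take DDEnabledState const& (likewise AsyncVar<bool>& and int&) where they previously took raw pointers, and forward &ddEnabledState to the ACTOR implementations, which keep pointer parameters. A sketch of the intent, under hypothetical simplified types:

#include <cassert>

struct DDEnabledState {
    bool ddEnabled = true;
    bool isDDEnabled() const { return ddEnabled; }
};

// Before: a pointer parameter admits nullptr, so correctness relies on a
// convention (or an assert) that callers never pass null.
void addServerOld(const DDEnabledState* state) {
    assert(state != nullptr);
    (void)state->isDDEnabled();
}

// After: a reference makes non-null a visible precondition at the call site.
void addServerNew(DDEnabledState const& state) {
    (void)state.isDDEnabled();
}

int main() {
    DDEnabledState state;
    addServerOld(&state);
    addServerNew(state); // the wrappers in this diff forward &state internally,
                         // since the ACTOR implementations still take pointers
}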


@@ -172,6 +172,101 @@ typedef AsyncMap<UID, ServerStatus> ServerStatusMap;
class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
friend class DDTeamCollectionImpl;
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3 };
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
// addActor is used to create the actorCollection when the dataDistributionTeamCollection is created
PromiseStream<Future<Void>> addActor;
bool doBuildTeams;
bool lastBuildTeamsFailed;
Future<Void> teamBuilder;
AsyncTrigger restartTeamBuilder;
AsyncVar<bool> waitUntilRecruited; // make teambuilder wait until one new SS is recruited
MoveKeysLock lock;
PromiseStream<RelocateShard> output;
std::vector<UID> allServers;
int64_t unhealthyServers;
std::map<int, int> priority_teams;
std::map<UID, Reference<TCServerInfo>> tss_info_by_pair;
std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a
// read-only concatenation of 2 data structures?
std::map<Key, int> lagging_zones; // zone to number of storage servers lagging
AsyncVar<bool> disableFailingLaggingServers;
// storage wiggle info
Reference<StorageWiggler> storageWiggler;
std::vector<AddressExclusion> wiggleAddresses; // collection of wiggling servers' address
Optional<UID> wigglingId; // Process id of current wiggling storage server;
Reference<AsyncVar<bool>> pauseWiggle;
Reference<AsyncVar<bool>> processingWiggle; // track whether wiggling relocation is being processed
PromiseStream<StorageWiggleValue> nextWiggleInfo;
std::vector<Reference<TCTeamInfo>> badTeams;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
PromiseStream<UID> removedServers;
PromiseStream<UID> removedTSS;
std::set<UID> recruitingIds; // The IDs of the SS/TSS which are being recruited
std::set<NetworkAddress> recruitingLocalities;
Future<Void> initialFailureReactionDelay;
Future<Void> initializationDoneActor;
Promise<Void> serverTrackerErrorOut;
AsyncVar<int> recruitingStream;
Debouncer restartRecruiting;
int healthyTeamCount;
Reference<AsyncVar<bool>> zeroHealthyTeams;
int optimalTeamCount;
AsyncVar<bool> zeroOptimalTeams;
int bestTeamKeepStuckCount = 0;
bool isTssRecruiting; // If tss recruiting is waiting on a pair, don't consider DD recruiting for the purposes of
// QuietDB
std::set<AddressExclusion>
invalidLocalityAddr; // These address have invalidLocality for the configured storagePolicy
std::vector<Optional<Key>> includedDCs;
Optional<std::vector<Optional<Key>>> otherTrackedDCs;
Reference<AsyncVar<bool>> processingUnhealthy;
Future<Void> readyToStart;
Future<Void> checkTeamDelay;
Promise<Void> addSubsetComplete;
Future<Void> badTeamRemover;
Future<Void> checkInvalidLocalities;
Future<Void> wrongStoreTypeRemover;
AsyncVar<Optional<Key>> healthyZone;
Future<bool> clearHealthyZoneFuture;
double medianAvailableSpace;
double lastMedianAvailableSpaceUpdate;
int lowestUtilizationTeam;
int highestUtilizationTeam;
PromiseStream<GetMetricsRequest> getShardMetrics;
PromiseStream<Promise<int>> getUnhealthyRelocationCount;
Promise<UID> removeFailedServer;
// WIGGLING if an address is under storage wiggling.
// EXCLUDED if an address is in the excluded list in the database.
// FAILED if an address is permanently failed.
// NONE by default. Updated asynchronously (eventually)
AsyncMap<AddressExclusion, Status> excludedServers;
Reference<EventCacheHolder> ddTrackerStartingEventHolder;
Reference<EventCacheHolder> teamCollectionInfoEventHolder;
Reference<EventCacheHolder> storageServerRecruitmentEventHolder;
bool primary;
UID distributorId;
LocalityMap<UID> machineLocalityMap; // locality info of machines
// Randomly choose one machine team that has chosenServer and has the correct size
// When configuration is changed, we may have machine teams with old storageTeamSize
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(TCServerInfo const& chosenServer) const;
@@ -315,16 +410,16 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
TCServerInfo* server,
Promise<Void> errorOut,
Version addedVersion,
-const DDEnabledState* ddEnabledState,
+DDEnabledState const& ddEnabledState,
bool isTss);
bool teamContainsFailedServer(Reference<TCTeamInfo> team) const;
// NOTE: this actor returns when the cluster is healthy and stable (no server is expected to be removed in a period)
// processingWiggle and processingUnhealthy indicate that some servers are going to be removed.
-Future<Void> waitUntilHealthy(double extraDelay = 0, bool waitWiggle = false);
+Future<Void> waitUntilHealthy(double extraDelay = 0, bool waitWiggle = false) const;
-bool isCorrectDC(TCServerInfo* server) const;
+bool isCorrectDC(TCServerInfo const& server) const;
// Set the server's storeType; Error is caught by the caller
Future<Void> keyValueStoreTypeTracker(TCServerInfo* server);
@@ -343,22 +438,22 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
// Iterate over each storage process to do storage wiggle. After initializing the first Process ID, it waits a
// signal from `perpetualStorageWiggler` indicating the wiggling of current process is finished. Then it writes the
// next Process ID to a system key: `perpetualStorageWiggleIDPrefix` to show the next process to wiggle.
-Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
+Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>& stopSignal,
FutureStream<Void> finishStorageWiggleSignal);
// periodically check whether the cluster is healthy if we continue perpetual wiggle
-Future<Void> clusterHealthCheckForPerpetualWiggle(int* extraTeamCount);
+Future<Void> clusterHealthCheckForPerpetualWiggle(int& extraTeamCount);
// Watches the value change of `perpetualStorageWiggleIDPrefix`, and adds the storage server into excludeServers
// which prevent recruiting the wiggling storage servers and let teamTracker start to move data off the affected
// teams. The wiggling process of current storage servers will be paused if the cluster is unhealthy and restarted
// once the cluster is healthy again.
-Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal, PromiseStream<Void> finishStorageWiggleSignal);
+Future<Void> perpetualStorageWiggler(AsyncVar<bool>& stopSignal, PromiseStream<Void> finishStorageWiggleSignal);
int numExistingSSOnAddr(const AddressExclusion& addr) const;
Future<Void> initializeStorage(RecruitStorageReply candidateWorker,
-DDEnabledState const* ddEnabledState,
+DDEnabledState const& ddEnabledState,
bool recruitTss,
Reference<TSSPairState> tssState);
@@ -383,111 +478,66 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
// Read storage metadata from database, and do necessary updates
Future<Void> readOrCreateStorageMetadata(TCServerInfo* server);
-public:
+Future<Void> serverGetTeamRequests(TeamCollectionInterface tci);
-// clang-format off
-enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3};
-// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
-// addActor is used to create the actorCollection when the dataDistributionTeamCollection is created
-PromiseStream<Future<Void>> addActor;
+Future<Void> removeBadTeams();
+Future<Void> machineTeamRemover();
+// Remove the server team whose members have the most number of process teams
+// until the total number of server teams is no larger than the desired number
+Future<Void> serverTeamRemover();
+Future<Void> removeWrongStoreType();
+// Check if the number of server (and machine teams) is larger than the maximum allowed number
+void traceTeamCollectionInfo() const;
+Future<Void> updateReplicasKey(Optional<Key> dcId);
+Future<Void> storageRecruiter(Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
+DDEnabledState const& ddEnabledState);
+// Monitor whether or not storage servers are being recruited. If so, then a database cannot be considered quiet
+Future<Void> monitorStorageServerRecruitment();
+// The serverList system keyspace keeps the StorageServerInterface for each serverID. Storage server's storeType
+// and serverID are decided by the server's filename. By parsing storage server file's filename on each disk,
+// process on each machine creates the TCServer with the correct serverID and StorageServerInterface.
+Future<Void> waitServerListChange(FutureStream<Void> serverRemoved, DDEnabledState const& ddEnabledState);
+Future<Void> trackExcludedServers();
+Future<Void> monitorHealthyTeams();
+// This coroutine sets a watch to monitor the value change of `perpetualStorageWiggleKey` which is controlled by
+// command `configure perpetual_storage_wiggle=$value` if the value is 1, this actor start 2 actors,
+// `perpetualStorageWiggleIterator` and `perpetualStorageWiggler`. Otherwise, it sends stop signal to them.
+Future<Void> monitorPerpetualStorageWiggle();
+Future<Void> waitHealthyZoneChange();
+int64_t getDebugTotalDataInFlight() const;
+void noHealthyTeams() const;
+public:
Database cx;
-UID distributorId;
DatabaseConfiguration configuration;
-bool doBuildTeams;
-bool lastBuildTeamsFailed;
-Future<Void> teamBuilder;
-AsyncTrigger restartTeamBuilder;
-AsyncVar<bool> waitUntilRecruited; // make teambuilder wait until one new SS is recruited
-MoveKeysLock lock;
-PromiseStream<RelocateShard> output;
-std::vector<UID> allServers;
ServerStatusMap server_status;
-int64_t unhealthyServers;
-std::map<int,int> priority_teams;
std::map<UID, Reference<TCServerInfo>> server_info;
-std::map<UID, Reference<TCServerInfo>> tss_info_by_pair;
-std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures?
-std::map<Key, int> lagging_zones; // zone to number of storage servers lagging
-AsyncVar<bool> disableFailingLaggingServers;
-// storage wiggle info
-Reference<StorageWiggler> storageWiggler;
-std::vector<AddressExclusion> wiggleAddresses; // collection of wiggling servers' address
-Optional<UID> wigglingId; // Process id of current wiggling storage server;
-Reference<AsyncVar<bool>> pauseWiggle;
-Reference<AsyncVar<bool>> processingWiggle; // track whether wiggling relocation is being processed
-PromiseStream<StorageWiggleValue> nextWiggleInfo;
// machine_info has all machines info; key must be unique across processes on the same machine
std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
std::vector<Reference<TCMachineTeamInfo>> machineTeams; // all machine teams
-LocalityMap<UID> machineLocalityMap; // locality info of machines
std::vector<Reference<TCTeamInfo>> teams;
-std::vector<Reference<TCTeamInfo>> badTeams;
-Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
-PromiseStream<UID> removedServers;
-PromiseStream<UID> removedTSS;
-std::set<UID> recruitingIds; // The IDs of the SS/TSS which are being recruited
-std::set<NetworkAddress> recruitingLocalities;
-Future<Void> initialFailureReactionDelay;
-Future<Void> initializationDoneActor;
-Promise<Void> serverTrackerErrorOut;
-AsyncVar<int> recruitingStream;
-Debouncer restartRecruiting;
-int healthyTeamCount;
-Reference<AsyncVar<bool>> zeroHealthyTeams;
-int optimalTeamCount;
-AsyncVar<bool> zeroOptimalTeams;
-int bestTeamKeepStuckCount = 0;
-bool isTssRecruiting; // If tss recruiting is waiting on a pair, don't consider DD recruiting for the purposes of QuietDB
-// WIGGLING if an address is under storage wiggling.
-// EXCLUDED if an address is in the excluded list in the database.
-// FAILED if an address is permanently failed.
-// NONE by default. Updated asynchronously (eventually)
-AsyncMap< AddressExclusion, Status > excludedServers;
-std::set<AddressExclusion> invalidLocalityAddr; // These address have invalidLocality for the configured storagePolicy
-std::vector<Optional<Key>> includedDCs;
-Optional<std::vector<Optional<Key>>> otherTrackedDCs;
-bool primary;
-Reference<AsyncVar<bool>> processingUnhealthy;
-Future<Void> readyToStart;
-Future<Void> checkTeamDelay;
-Promise<Void> addSubsetComplete;
-Future<Void> badTeamRemover;
-Future<Void> checkInvalidLocalities;
-Future<Void> wrongStoreTypeRemover;
-Reference<LocalitySet> storageServerSet;
std::vector<DDTeamCollection*> teamCollections;
-AsyncVar<Optional<Key>> healthyZone;
-Future<bool> clearHealthyZoneFuture;
-double medianAvailableSpace;
-double lastMedianAvailableSpaceUpdate;
-// clang-format on
-int lowestUtilizationTeam;
-int highestUtilizationTeam;
AsyncTrigger printDetailedTeamsInfo;
-PromiseStream<GetMetricsRequest> getShardMetrics;
-PromiseStream<Promise<int>> getUnhealthyRelocationCount;
-Promise<UID> removeFailedServer;
-Reference<EventCacheHolder> ddTrackerStartingEventHolder;
-Reference<EventCacheHolder> teamCollectionInfoEventHolder;
-Reference<EventCacheHolder> storageServerRecruitmentEventHolder;
+Reference<LocalitySet> storageServerSet;
DDTeamCollection(Database const& cx,
UID distributorId,
@@ -517,9 +567,7 @@ public:
Future<Void> getTeam(GetTeamRequest);
-int64_t getDebugTotalDataInFlight() const;
-Future<Void> init(Reference<InitialDataDistribution> initTeams, DDEnabledState const* ddEnabledState);
+Future<Void> init(Reference<InitialDataDistribution> initTeams, DDEnabledState const& ddEnabledState);
// Assume begin to end is sorted by std::sort
// Assume InputIt is iterator to UID
@@ -547,14 +595,17 @@ public:
void addTeam(std::set<UID> const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); }
+// FIXME: Public for testing only
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
int constructMachinesFromServers();
+// FIXME: Public for testing only
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) const;
+// FIXME: Public for testing only
// Create machineTeamsToBuild number of machine teams
// No operation if machineTeamsToBuild is 0
// Note: The creation of machine teams should not depend on server teams:
@@ -566,6 +617,7 @@ public:
// return number of added machine teams
int addBestMachineTeams(int machineTeamsToBuild);
+// FIXME: Public for testing only
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() const;
@@ -576,19 +628,15 @@ public:
// build an extra machine team and record the event in trace
int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams);
-// Check if the number of server (and machine teams) is larger than the maximum allowed number
-void traceTeamCollectionInfo() const;
-void noHealthyTeams() const;
void addServer(StorageServerInterface newServer,
ProcessClass processClass,
Promise<Void> errorOut,
Version addedVersion,
-const DDEnabledState* ddEnabledState);
+DDEnabledState const& ddEnabledState);
bool removeTeam(Reference<TCTeamInfo> team);
+// FIXME: Public for testing only
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server);
@@ -597,43 +645,22 @@ public:
void removeServer(UID removedServer);
-Future<Void> removeWrongStoreType();
-Future<Void> removeBadTeams();
-Future<Void> machineTeamRemover();
-// Remove the server team whose members have the most number of process teams
-// until the total number of server teams is no larger than the desired number
-Future<Void> serverTeamRemover();
-Future<Void> trackExcludedServers();
-// This coroutine sets a watch to monitor the value change of `perpetualStorageWiggleKey` which is controlled by
-// command `configure perpetual_storage_wiggle=$value` if the value is 1, this actor start 2 actors,
-// `perpetualStorageWiggleIterator` and `perpetualStorageWiggler`. Otherwise, it sends stop signal to them.
-Future<Void> monitorPerpetualStorageWiggle();
-// The serverList system keyspace keeps the StorageServerInterface for each serverID. Storage server's storeType
-// and serverID are decided by the server's filename. By parsing storage server file's filename on each disk,
-// process on each machine creates the TCServer with the correct serverID and StorageServerInterface.
-Future<Void> waitServerListChange(FutureStream<Void> serverRemoved, const DDEnabledState* ddEnabledState);
-Future<Void> waitHealthyZoneChange();
-// Monitor whether or not storage servers are being recruited. If so, then a database cannot be considered quiet
-Future<Void> monitorStorageServerRecruitment();
-Future<Void> storageRecruiter(Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
-DDEnabledState const* ddEnabledState);
-Future<Void> updateReplicasKey(Optional<Key> dcId);
-Future<Void> serverGetTeamRequests(TeamCollectionInterface tci);
-Future<Void> monitorHealthyTeams();
// Find size of set intersection of excludeServerIDs and serverIDs on each team and see if the leftover team is
// valid
bool exclusionSafetyCheck(std::vector<UID>& excludeServerIDs);
+bool isPrimary() const { return primary; }
+UID getDistributorId() const { return distributorId; }
+// Keep track of servers and teams -- serves requests for getRandomTeam
+static Future<Void> run(Reference<DDTeamCollection> teamCollection,
+Reference<InitialDataDistribution> initData,
+TeamCollectionInterface tci,
+Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
+DDEnabledState const& ddEnabledState);
+// Take a snapshot of necessary data structures from `DDTeamCollection` and print them out with yields to avoid slow
+// task on the run loop.
+static Future<Void> printSnapshotTeamsInfo(Reference<DDTeamCollection> self);
};
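// Illustrative sketch (not part of the patch): with the members above moved behind the
// class interface, code outside DDTeamCollection goes through the surviving accessors
// rather than touching fields directly, e.g.:
//
//   Reference<DDTeamCollection> tc = ...;
//   TraceEvent("TeamCollectionExample", tc->getDistributorId()).detail("Primary", tc->isPrimary());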


@@ -238,16 +238,16 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
// add server to wiggling queue
void StorageWiggler::addServer(const UID& serverId, const StorageMetadataType& metadata) {
-// std::cout << "size: " << pq_handles.size() << " add " << serverId.toString() << " DC: "<< teamCollection->primary
-// << std::endl;
+// std::cout << "size: " << pq_handles.size() << " add " << serverId.toString() << " DC: "
+// << teamCollection->isPrimary() << std::endl;
ASSERT(!pq_handles.count(serverId));
pq_handles[serverId] = wiggle_pq.emplace(metadata, serverId);
nonEmpty.set(true);
}
void StorageWiggler::removeServer(const UID& serverId) {
-// std::cout << "size: " << pq_handles.size() << " remove " << serverId.toString() << " DC: "<<
-// teamCollection->primary <<std::endl;
+// std::cout << "size: " << pq_handles.size() << " remove " << serverId.toString() << " DC: "
+// << teamCollection->isPrimary() << std::endl;
if (contains(serverId)) { // server haven't been popped
auto handle = pq_handles.at(serverId);
pq_handles.erase(serverId);
@@ -258,7 +258,7 @@ void StorageWiggler::removeServer(const UID& serverId) {
void StorageWiggler::updateMetadata(const UID& serverId, const StorageMetadataType& metadata) {
// std::cout << "size: " << pq_handles.size() << " update " << serverId.toString()
-// << " DC: " << teamCollection->primary << std::endl;
+// << " DC: " << teamCollection->isPrimary() << std::endl;
auto handle = pq_handles.at(serverId);
if ((*handle).first.createdTime == metadata.createdTime) {
return;
@@ -280,7 +280,7 @@ Future<Void> StorageWiggler::resetStats() {
auto newMetrics = StorageWiggleMetrics();
newMetrics.smoothed_round_duration = metrics.smoothed_round_duration;
newMetrics.smoothed_wiggle_duration = metrics.smoothed_wiggle_duration;
-return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->primary, newMetrics);
+return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), newMetrics);
}
Future<Void> StorageWiggler::restoreStats() {
@@ -291,7 +291,7 @@ Future<Void> StorageWiggler::restoreStats() {
}
return Void();
};
-auto readFuture = StorageWiggleMetrics::runGetTransaction(teamCollection->cx, teamCollection->primary);
+auto readFuture = StorageWiggleMetrics::runGetTransaction(teamCollection->cx, teamCollection->isPrimary());
return map(readFuture, assignFunc);
}
Future<Void> StorageWiggler::startWiggle() {
@@ -299,7 +299,7 @@ Future<Void> StorageWiggler::startWiggle() {
if (shouldStartNewRound()) {
metrics.last_round_start = metrics.last_wiggle_start;
}
-return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->primary, metrics);
+return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), metrics);
}
Future<Void> StorageWiggler::finishWiggle() {
@@ -314,225 +314,7 @@ Future<Void> StorageWiggler::finishWiggle() {
duration = metrics.last_round_finish - metrics.last_round_start;
metrics.smoothed_round_duration.setTotal((double)duration);
}
-return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->primary, metrics);
+return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), metrics);
}
// Take a snapshot of necessary data structures from `DDTeamCollection` and print them out with yields to avoid slow
// task on the run loop.
ACTOR Future<Void> printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
state DatabaseConfiguration configuration;
state std::map<UID, Reference<TCServerInfo>> server_info;
state std::map<UID, ServerStatus> server_status;
state std::vector<Reference<TCTeamInfo>> teams;
state std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
state std::vector<Reference<TCMachineTeamInfo>> machineTeams;
// state std::vector<std::string> internedLocalityRecordKeyNameStrings;
// state int machineLocalityMapEntryArraySize;
// state std::vector<Reference<LocalityRecord>> machineLocalityMapRecordArray;
state int traceEventsPrinted = 0;
state std::vector<const UID*> serverIDs;
state double lastPrintTime = 0;
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Future<Void> watchFuture = tr.watch(triggerDDTeamInfoPrintKey);
wait(tr.commit());
wait(self->printDetailedTeamsInfo.onTrigger() || watchFuture);
tr.reset();
if (now() - lastPrintTime < SERVER_KNOBS->DD_TEAMS_INFO_PRINT_INTERVAL) {
continue;
}
lastPrintTime = now();
traceEventsPrinted = 0;
double snapshotStart = now();
configuration = self->configuration;
server_info = self->server_info;
teams = self->teams;
// Perform deep copy so we have a consistent snapshot, even if yields are performed
for (const auto& [machineId, info] : self->machine_info) {
machine_info.emplace(machineId, info->clone());
}
machineTeams = self->machineTeams;
// internedLocalityRecordKeyNameStrings = self->machineLocalityMap._keymap->_lookuparray;
// machineLocalityMapEntryArraySize = self->machineLocalityMap.size();
// machineLocalityMapRecordArray = self->machineLocalityMap.getRecordArray();
std::vector<const UID*> _uids = self->machineLocalityMap.getObjects();
serverIDs = _uids;
auto const& keys = self->server_status.getKeys();
for (auto const& key : keys) {
// Add to or update the local server_status map
server_status[key] = self->server_status.get(key);
}
TraceEvent("DDPrintSnapshotTeasmInfo", self->distributorId)
.detail("SnapshotSpeed", now() - snapshotStart)
.detail("Primary", self->primary);
// Print to TraceEvents
TraceEvent("DDConfig", self->distributorId)
.detail("StorageTeamSize", configuration.storageTeamSize)
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER)
.detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER)
.detail("Primary", self->primary);
TraceEvent("ServerInfo", self->distributorId)
.detail("Size", server_info.size())
.detail("Primary", self->primary);
state int i;
state std::map<UID, Reference<TCServerInfo>>::iterator server = server_info.begin();
for (i = 0; i < server_info.size(); i++) {
TraceEvent("ServerInfo", self->distributorId)
.detail("ServerInfoIndex", i)
.detail("ServerID", server->first.toString())
.detail("ServerTeamOwned", server->second->teams.size())
.detail("MachineID", server->second->machine->machineID.contents().toString())
.detail("Primary", self->primary);
server++;
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
server = server_info.begin();
for (i = 0; i < server_info.size(); i++) {
const UID& uid = server->first;
TraceEvent e("ServerStatus", self->distributorId);
e.detail("ServerUID", uid)
.detail("MachineIsValid", server_info[uid]->machine.isValid())
.detail("MachineTeamSize",
server_info[uid]->machine.isValid() ? server_info[uid]->machine->machineTeams.size() : -1)
.detail("Primary", self->primary);
// ServerStatus might not be known if server was very recently added and storageServerFailureTracker()
// has not yet updated self->server_status
// If the UID is not found, do not assume the server is healthy or unhealthy
auto it = server_status.find(uid);
if (it != server_status.end()) {
e.detail("Healthy", !it->second.isUnhealthy());
}
server++;
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
TraceEvent("ServerTeamInfo", self->distributorId)
.detail("Size", teams.size())
.detail("Primary", self->primary);
for (i = 0; i < teams.size(); i++) {
const auto& team = teams[i];
TraceEvent("ServerTeamInfo", self->distributorId)
.detail("TeamIndex", i)
.detail("Healthy", team->isHealthy())
.detail("TeamSize", team->size())
.detail("MemberIDs", team->getServerIDsStr())
.detail("Primary", self->primary);
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
TraceEvent("MachineInfo", self->distributorId)
.detail("Size", machine_info.size())
.detail("Primary", self->primary);
state std::map<Standalone<StringRef>, Reference<TCMachineInfo>>::iterator machine = machine_info.begin();
state bool isMachineHealthy = false;
for (i = 0; i < machine_info.size(); i++) {
Reference<TCMachineInfo> _machine = machine->second;
if (!_machine.isValid() || machine_info.find(_machine->machineID) == machine_info.end() ||
_machine->serversOnMachine.empty()) {
isMachineHealthy = false;
}
// Healthy machine has at least one healthy server
for (auto& server : _machine->serversOnMachine) {
// ServerStatus might not be known if server was very recently added and
// storageServerFailureTracker() has not yet updated self->server_status If the UID is not found, do
// not assume the server is healthy
auto it = server_status.find(server->id);
if (it != server_status.end() && !it->second.isUnhealthy()) {
isMachineHealthy = true;
}
}
isMachineHealthy = false;
TraceEvent("MachineInfo", self->distributorId)
.detail("MachineInfoIndex", i)
.detail("Healthy", isMachineHealthy)
.detail("MachineID", machine->first.contents().toString())
.detail("MachineTeamOwned", machine->second->machineTeams.size())
.detail("ServerNumOnMachine", machine->second->serversOnMachine.size())
.detail("ServersID", machine->second->getServersIDStr())
.detail("Primary", self->primary);
machine++;
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
TraceEvent("MachineTeamInfo", self->distributorId)
.detail("Size", machineTeams.size())
.detail("Primary", self->primary);
for (i = 0; i < machineTeams.size(); i++) {
const auto& team = machineTeams[i];
TraceEvent("MachineTeamInfo", self->distributorId)
.detail("TeamIndex", i)
.detail("MachineIDs", team->getMachineIDsStr())
.detail("ServerTeams", team->serverTeams.size())
.detail("Primary", self->primary);
if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
wait(yield());
}
}
// TODO: re-enable the following logging or remove them.
// TraceEvent("LocalityRecordKeyName", self->distributorId)
// .detail("Size", internedLocalityRecordKeyNameStrings.size())
// .detail("Primary", self->primary);
// for (i = 0; i < internedLocalityRecordKeyNameStrings.size(); i++) {
// TraceEvent("LocalityRecordKeyIndexName", self->distributorId)
// .detail("KeyIndex", i)
// .detail("KeyName", internedLocalityRecordKeyNameStrings[i])
// .detail("Primary", self->primary);
// if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
// wait(yield());
// }
// }
// TraceEvent("MachineLocalityMap", self->distributorId)
// .detail("Size", machineLocalityMapEntryArraySize)
// .detail("Primary", self->primary);
// for (i = 0; i < serverIDs.size(); i++) {
// const auto& serverID = serverIDs[i];
// Reference<LocalityRecord> record = machineLocalityMapRecordArray[i];
// if (record.isValid()) {
// TraceEvent("MachineLocalityMap", self->distributorId)
// .detail("LocalityIndex", i)
// .detail("UID", serverID->toString())
// .detail("LocalityRecord", record->toString())
// .detail("Primary", self->primary);
// } else {
// TraceEvent("MachineLocalityMap", self->distributorId)
// .detail("LocalityIndex", i)
// .detail("UID", serverID->toString())
// .detail("LocalityRecord", "[NotFound]")
// .detail("Primary", self->primary);
// }
// if (++traceEventsPrinted % SERVER_KNOBS->DD_TEAMS_INFO_PRINT_YIELD_COUNT == 0) {
// wait(yield());
// }
// }
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
@@ -564,117 +346,6 @@ ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
return Void();
}
// Keep track of servers and teams -- serves requests for getRandomTeam
ACTOR Future<Void> dataDistributionTeamCollection(
Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
state Future<Void> error = actorCollection(self->addActor.getFuture());
try {
wait(self->init(initData, ddEnabledState));
initData = Reference<InitialDataDistribution>();
self->addActor.send(self->serverGetTeamRequests(tci));
TraceEvent("DDTeamCollectionBegin", self->distributorId).detail("Primary", self->primary);
wait(self->readyToStart || error);
TraceEvent("DDTeamCollectionReadyToStart", self->distributorId).detail("Primary", self->primary);
// removeBadTeams() does not always run. We may need to restart the actor when needed.
// So we need the badTeamRemover variable to check if the actor is ready.
if (self->badTeamRemover.isReady()) {
self->badTeamRemover = self->removeBadTeams();
self->addActor.send(self->badTeamRemover);
}
self->addActor.send(self->machineTeamRemover());
self->addActor.send(self->serverTeamRemover());
if (self->wrongStoreTypeRemover.isReady()) {
self->wrongStoreTypeRemover = self->removeWrongStoreType();
self->addActor.send(self->wrongStoreTypeRemover);
}
self->traceTeamCollectionInfo();
if (self->includedDCs.size()) {
// start this actor before any potential recruitments can happen
self->addActor.send(self->updateReplicasKey(self->includedDCs[0]));
}
// The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
// they are always running.
self->addActor.send(self->storageRecruiter(recruitStorage, ddEnabledState));
self->addActor.send(self->monitorStorageServerRecruitment());
self->addActor.send(self->waitServerListChange(serverRemoved.getFuture(), ddEnabledState));
self->addActor.send(self->trackExcludedServers());
self->addActor.send(self->monitorHealthyTeams());
self->addActor.send(self->waitHealthyZoneChange());
self->addActor.send(self->monitorPerpetualStorageWiggle());
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
loop choose {
when(UID removedServer = waitNext(self->removedServers.getFuture())) {
TEST(true); // Storage server removed from database
self->removeServer(removedServer);
serverRemoved.send(Void());
self->restartRecruiting.trigger();
}
when(UID removedTSS = waitNext(self->removedTSS.getFuture())) {
TEST(true); // TSS removed from database
self->removeTSS(removedTSS);
serverRemoved.send(Void());
self->restartRecruiting.trigger();
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get()) {
self->restartRecruiting.trigger();
self->noHealthyTeams();
}
}
when(wait(loggingTrigger)) {
int highestPriority = 0;
for (auto it : self->priority_teams) {
if (it.second > 0) {
highestPriority = std::max(highestPriority, it.first);
}
}
TraceEvent("TotalDataInFlight", self->distributorId)
.detail("Primary", self->primary)
.detail("TotalBytes", self->getDebugTotalDataInFlight())
.detail("UnhealthyServers", self->unhealthyServers)
.detail("ServerCount", self->server_info.size())
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("HighestPriority", highestPriority)
.trackLatest(
self->primary
? "TotalDataInFlight"
: "TotalDataInFlightRemote"); // This trace event's trackLatest lifetime is controlled by
// DataDistributorData::totalDataInFlightEventHolder or
// DataDistributorData::totalDataInFlightRemoteEventHolder.
// The track latest key we use here must match the key used in
// the holder.
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
}
when(wait(self->serverTrackerErrorOut.getFuture())) {} // Propagate errors from storageServerTracker
when(wait(error)) {}
}
} catch (Error& e) {
if (e.code() != error_code_movekeys_conflict)
TraceEvent(SevError, "DataDistributionTeamCollectionError", self->distributorId).error(e);
throw e;
}
}
ACTOR Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
state Transaction tr(cx);
loop {
@@ -1131,24 +802,22 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
-actors.push_back(
-reportErrorsExcept(dataDistributionTeamCollection(
-remoteTeamCollection, initData, tcis[1], recruitStorage, ddEnabledState),
-"DDTeamCollectionSecondary",
-self->ddId,
-&normalDDQueueErrors()));
-actors.push_back(printSnapshotTeamsInfo(remoteTeamCollection));
+actors.push_back(reportErrorsExcept(
+DDTeamCollection::run(remoteTeamCollection, initData, tcis[1], recruitStorage, *ddEnabledState),
+"DDTeamCollectionSecondary",
+self->ddId,
+&normalDDQueueErrors()));
+actors.push_back(DDTeamCollection::printSnapshotTeamsInfo(remoteTeamCollection));
}
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
self->teamCollection = primaryTeamCollection.getPtr();
-actors.push_back(
-reportErrorsExcept(dataDistributionTeamCollection(
-primaryTeamCollection, initData, tcis[0], recruitStorage, ddEnabledState),
-"DDTeamCollectionPrimary",
-self->ddId,
-&normalDDQueueErrors()));
-actors.push_back(printSnapshotTeamsInfo(primaryTeamCollection));
+actors.push_back(reportErrorsExcept(
+DDTeamCollection::run(primaryTeamCollection, initData, tcis[0], recruitStorage, *ddEnabledState),
+"DDTeamCollectionPrimary",
+self->ddId,
+&normalDDQueueErrors()));
+actors.push_back(DDTeamCollection::printSnapshotTeamsInfo(primaryTeamCollection));
actors.push_back(yieldPromiseStream(output.getFuture(), input));
wait(waitForAll(actors));


@@ -67,7 +67,7 @@ public:
if (server->serverMetrics.get().lastUpdate < now() - SERVER_KNOBS->DD_SS_STUCK_TIME_LIMIT) {
if (server->ssVersionTooFarBehind.get() == false) {
-TraceEvent("StorageServerStuck", server->collection->distributorId)
+TraceEvent("StorageServerStuck", server->collection->getDistributorId())
.detail("ServerId", server->id.toString())
.detail("LastUpdate", server->serverMetrics.get().lastUpdate);
server->ssVersionTooFarBehind.set(true);
@@ -75,7 +75,7 @@ public:
}
} else if (server->serverMetrics.get().versionLag > SERVER_KNOBS->DD_SS_FAILURE_VERSIONLAG) {
if (server->ssVersionTooFarBehind.get() == false) {
-TraceEvent(SevWarn, "SSVersionDiffLarge", server->collection->distributorId)
+TraceEvent(SevWarn, "SSVersionDiffLarge", server->collection->getDistributorId())
.detail("ServerId", server->id.toString())
.detail("VersionLag", server->serverMetrics.get().versionLag);
server->ssVersionTooFarBehind.set(true);
@@ -83,7 +83,7 @@ public:
}
} else if (server->serverMetrics.get().versionLag < SERVER_KNOBS->DD_SS_ALLOWED_VERSIONLAG) {
if (server->ssVersionTooFarBehind.get() == true) {
-TraceEvent("SSVersionDiffNormal", server->collection->distributorId)
+TraceEvent("SSVersionDiffNormal", server->collection->getDistributorId())
.detail("ServerId", server->id.toString())
.detail("VersionLag", server->serverMetrics.get().versionLag);
server->ssVersionTooFarBehind.set(false);
@@ -122,6 +122,22 @@ public:
}
};
TCServerInfo::TCServerInfo(StorageServerInterface ssi,
DDTeamCollection* collection,
ProcessClass processClass,
bool inDesiredDC,
Reference<LocalitySet> storageServerSet,
Version addedVersion)
: id(ssi.id()), addedVersion(addedVersion), collection(collection), lastKnownInterface(ssi),
lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()),
onRemoved(removed.getFuture()), onTSSPairRemoved(Never()), inDesiredDC(inDesiredDC),
storeType(KeyValueStoreType::END) {
if (!ssi.isTss()) {
localityEntry = ((LocalityMap<UID>*)storageServerSet.getPtr())->add(ssi.locality, &id);
}
}
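// Note on the constructor above: only non-TSS interfaces are registered in the
// storageServerSet locality map, so testing storage servers get no localityEntry and
// never take part in locality-based team building.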
Future<Void> TCServerInfo::updateServerMetrics() {
return TCServerInfoImpl::updateServerMetrics(this);
}
@@ -140,6 +156,208 @@ TCServerInfo::~TCServerInfo() {
}
}
Reference<TCMachineInfo> TCMachineInfo::clone() const {
auto result = Reference<TCMachineInfo>(new TCMachineInfo);
result->serversOnMachine = serversOnMachine;
result->machineID = machineID;
result->machineTeams = machineTeams;
result->localityEntry = localityEntry;
return result;
}
TCMachineInfo::TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
ASSERT(serversOnMachine.empty());
serversOnMachine.push_back(server);
LocalityData& locality = server->lastKnownInterface.locality;
ASSERT(locality.zoneId().present());
machineID = locality.zoneId().get();
}
std::string TCMachineInfo::getServersIDStr() const {
std::stringstream ss;
if (serversOnMachine.empty())
return "[unset]";
for (const auto& server : serversOnMachine) {
ss << server->getId().toString() << " ";
}
return std::move(ss).str();
}
TCMachineTeamInfo::TCMachineTeamInfo(std::vector<Reference<TCMachineInfo>> const& machines)
: machines(machines), id(deterministicRandom()->randomUniqueID()) {
machineIDs.reserve(machines.size());
for (int i = 0; i < machines.size(); i++) {
machineIDs.push_back(machines[i]->machineID);
}
sort(machineIDs.begin(), machineIDs.end());
}
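// Because machineIDs is sorted at construction, two machine teams built from the same
// machines in different orders compare equal through operator==, which compares the
// sorted machineIDs vectors.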
std::string TCMachineTeamInfo::getMachineIDsStr() const {
std::stringstream ss;
if (machineIDs.empty())
return "[unset]";
for (const auto& id : machineIDs) {
ss << id.contents().toString() << " ";
}
return std::move(ss).str();
}
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
id(deterministicRandom()->randomUniqueID()) {
if (servers.empty()) {
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
}
serverIDs.reserve(servers.size());
for (int i = 0; i < servers.size(); i++) {
serverIDs.push_back(servers[i]->getId());
}
}
std::vector<StorageServerInterface> TCTeamInfo::getLastKnownServerInterfaces() const {
std::vector<StorageServerInterface> v;
v.reserve(servers.size());
for (const auto& server : servers) {
v.push_back(server->lastKnownInterface);
}
return v;
}
std::string TCTeamInfo::getServerIDsStr() const {
std::stringstream ss;
if (serverIDs.empty())
return "[unset]";
for (const auto& id : serverIDs) {
ss << id.toString() << " ";
}
return std::move(ss).str();
}
void TCTeamInfo::addDataInFlightToTeam(int64_t delta) {
for (int i = 0; i < servers.size(); i++)
servers[i]->dataInFlightToServer += delta;
}
int64_t TCTeamInfo::getDataInFlightToTeam() const {
int64_t dataInFlight = 0.0;
for (int i = 0; i < servers.size(); i++)
dataInFlight += servers[i]->dataInFlightToServer;
return dataInFlight;
}
int64_t TCTeamInfo::getLoadBytes(bool includeInFlight, double inflightPenalty) const {
int64_t physicalBytes = getLoadAverage();
double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
double availableSpaceMultiplier =
SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF /
(std::max(std::min(SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio), 0.000001));
if (servers.size() > 2) {
// make sure in triple replication the penalty is high enough that you will always avoid a team with a
// member at 20% free space
availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier;
}
if (minAvailableSpaceRatio < SERVER_KNOBS->TARGET_AVAILABLE_SPACE_RATIO) {
TraceEvent(SevWarn, "DiskNearCapacity").suppressFor(1.0).detail("AvailableSpaceRatio", minAvailableSpaceRatio);
}
return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier;
}
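// Illustrative arithmetic for the multiplier above (knob values chosen for the example,
// not quoted from the shipped defaults): with AVAILABLE_SPACE_RATIO_CUTOFF = 0.05 and
// minAvailableSpaceRatio = 0.02, availableSpaceMultiplier = 0.05 / 0.02 = 2.5, squared
// to 6.25 for teams larger than two, so a team with one nearly-full member reports a
// load several times its physical bytes and is therefore much less likely to be chosen.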
int64_t TCTeamInfo::getMinAvailableSpace(bool includeInFlight) const {
int64_t minAvailableSpace = std::numeric_limits<int64_t>::max();
for (const auto& server : servers) {
if (server->serverMetrics.present()) {
auto& replyValue = server->serverMetrics.get();
ASSERT(replyValue.available.bytes >= 0);
ASSERT(replyValue.capacity.bytes >= 0);
int64_t bytesAvailable = replyValue.available.bytes;
if (includeInFlight) {
bytesAvailable -= server->dataInFlightToServer;
}
minAvailableSpace = std::min(bytesAvailable, minAvailableSpace);
}
}
return minAvailableSpace; // Could be negative
}
double TCTeamInfo::getMinAvailableSpaceRatio(bool includeInFlight) const {
double minRatio = 1.0;
for (const auto& server : servers) {
if (server->serverMetrics.present()) {
auto& replyValue = server->serverMetrics.get();
ASSERT(replyValue.available.bytes >= 0);
ASSERT(replyValue.capacity.bytes >= 0);
int64_t bytesAvailable = replyValue.available.bytes;
if (includeInFlight) {
bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer);
}
if (replyValue.capacity.bytes == 0)
minRatio = 0;
else
minRatio = std::min(minRatio, ((double)bytesAvailable) / replyValue.capacity.bytes);
}
}
return minRatio;
}
bool TCTeamInfo::hasHealthyAvailableSpace(double minRatio) const {
return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE;
}
bool TCTeamInfo::isOptimal() const {
for (const auto& server : servers) {
if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) {
return false;
}
}
return true;
}
bool TCTeamInfo::hasServer(const UID& server) const {
return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
}
void TCTeamInfo::addServers(const std::vector<UID>& servers) {
serverIDs.reserve(servers.size());
for (int i = 0; i < servers.size(); i++) {
serverIDs.push_back(servers[i]);
}
}
int64_t TCTeamInfo::getLoadAverage() const {
int64_t bytesSum = 0;
int added = 0;
for (int i = 0; i < servers.size(); i++)
if (servers[i]->serverMetrics.present()) {
added++;
bytesSum += servers[i]->serverMetrics.get().load.bytes;
}
if (added < servers.size())
bytesSum *= 2;
return added == 0 ? 0 : bytesSum / added;
}
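// Example for the doubling above: with 3 servers of which only 2 reported metrics
// (10 MB and 20 MB), bytesSum = 30 MB is doubled to 60 MB before dividing by the 2
// reporters, yielding a deliberately pessimistic average of 30 MB per server.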
Future<Void> TCTeamInfo::updateStorageMetrics() {
return TCTeamInfoImpl::updateStorageMetrics(this);
}


@@ -28,9 +28,9 @@ class TCMachineTeamInfo;
class TCServerInfo : public ReferenceCounted<TCServerInfo> {
friend class TCServerInfoImpl;
+UID id;
public:
-UID id;
Version addedVersion; // Read version when this Server is added
DDTeamCollection* collection;
StorageServerInterface lastKnownInterface;
@@ -61,16 +61,9 @@ public:
ProcessClass processClass,
bool inDesiredDC,
Reference<LocalitySet> storageServerSet,
-Version addedVersion = 0)
+Version addedVersion = 0);
-: id(ssi.id()), addedVersion(addedVersion), collection(collection), lastKnownInterface(ssi),
-lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()),
-onRemoved(removed.getFuture()), onTSSPairRemoved(Never()), inDesiredDC(inDesiredDC),
-storeType(KeyValueStoreType::END) {
-if (!ssi.isTss()) {
-localityEntry = ((LocalityMap<UID>*)storageServerSet.getPtr())->add(ssi.locality, &id);
-}
-}
+UID const& getId() const { return id; }
bool isCorrectStoreType(KeyValueStoreType configStoreType) const {
// A new storage server's store type may not be set immediately.
@@ -96,35 +89,11 @@ public:
std::vector<Reference<TCMachineTeamInfo>> machineTeams; // SOMEDAY: split good and bad machine teams.
LocalityEntry localityEntry;
-Reference<TCMachineInfo> clone() const {
-auto result = Reference<TCMachineInfo>(new TCMachineInfo);
-result->serversOnMachine = serversOnMachine;
-result->machineID = machineID;
-result->machineTeams = machineTeams;
-result->localityEntry = localityEntry;
-return result;
-}
-explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
-ASSERT(serversOnMachine.empty());
-serversOnMachine.push_back(server);
-LocalityData& locality = server->lastKnownInterface.locality;
-ASSERT(locality.zoneId().present());
-machineID = locality.zoneId().get();
-}
-std::string getServersIDStr() const {
-std::stringstream ss;
-if (serversOnMachine.empty())
-return "[unset]";
-for (const auto& server : serversOnMachine) {
-ss << server->id.toString() << " ";
-}
-return std::move(ss).str();
-}
+Reference<TCMachineInfo> clone() const;
+explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry);
+std::string getServersIDStr() const;
};
// TeamCollection's machine team information // TeamCollection's machine team information
@@ -135,32 +104,14 @@ public:
std::vector<Reference<TCTeamInfo>> serverTeams;
UID id;
-explicit TCMachineTeamInfo(std::vector<Reference<TCMachineInfo>> const& machines)
-: machines(machines), id(deterministicRandom()->randomUniqueID()) {
-machineIDs.reserve(machines.size());
-for (int i = 0; i < machines.size(); i++) {
-machineIDs.push_back(machines[i]->machineID);
-}
-sort(machineIDs.begin(), machineIDs.end());
-}
+explicit TCMachineTeamInfo(std::vector<Reference<TCMachineInfo>> const& machines);
int size() const {
ASSERT(machines.size() == machineIDs.size());
return machineIDs.size();
}
-std::string getMachineIDsStr() const {
-std::stringstream ss;
-if (machineIDs.empty())
-return "[unset]";
-for (const auto& id : machineIDs) {
-ss << id.contents().toString() << " ";
-}
-return std::move(ss).str();
-}
+std::string getMachineIDsStr() const;
bool operator==(TCMachineTeamInfo& rhs) const { return this->machineIDs == rhs.machineIDs; }
};
@@ -179,140 +130,38 @@ public:
Reference<TCMachineTeamInfo> machineTeam;
Future<Void> tracker;
-explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
-: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
-id(deterministicRandom()->randomUniqueID()) {
-if (servers.empty()) {
-TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
-}
-serverIDs.reserve(servers.size());
-for (int i = 0; i < servers.size(); i++) {
-serverIDs.push_back(servers[i]->id);
-}
-}
+explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers);
std::string getTeamID() const override { return id.shortString(); }
-std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
-std::vector<StorageServerInterface> v;
-v.reserve(servers.size());
-for (const auto& server : servers) {
-v.push_back(server->lastKnownInterface);
-}
-return v;
-}
+std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override;
int size() const override { int size() const override {
ASSERT(servers.size() == serverIDs.size()); ASSERT(servers.size() == serverIDs.size());
return servers.size(); return servers.size();
} }
std::vector<UID> const& getServerIDs() const override { return serverIDs; } std::vector<UID> const& getServerIDs() const override { return serverIDs; }
const std::vector<Reference<TCServerInfo>>& getServers() const { return servers; } const std::vector<Reference<TCServerInfo>>& getServers() const { return servers; }
-	std::string getServerIDsStr() const {
-		std::stringstream ss;
-		if (serverIDs.empty())
-			return "[unset]";
-		for (const auto& id : serverIDs) {
-			ss << id.toString() << " ";
-		}
-		return std::move(ss).str();
-	}
-
-	void addDataInFlightToTeam(int64_t delta) override {
-		for (int i = 0; i < servers.size(); i++)
-			servers[i]->dataInFlightToServer += delta;
-	}
-
-	int64_t getDataInFlightToTeam() const override {
-		int64_t dataInFlight = 0.0;
-		for (int i = 0; i < servers.size(); i++)
-			dataInFlight += servers[i]->dataInFlightToServer;
-		return dataInFlight;
-	}
-
-	int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override {
-		int64_t physicalBytes = getLoadAverage();
-		double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
-		int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
-		double availableSpaceMultiplier =
-		    SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF /
-		    (std::max(std::min(SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio), 0.000001));
-		if (servers.size() > 2) {
-			// make sure in triple replication the penalty is high enough that you will always avoid a team with a
-			// member at 20% free space
-			availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier;
-		}
-		if (minAvailableSpaceRatio < SERVER_KNOBS->TARGET_AVAILABLE_SPACE_RATIO) {
-			TraceEvent(SevWarn, "DiskNearCapacity")
-			    .suppressFor(1.0)
-			    .detail("AvailableSpaceRatio", minAvailableSpaceRatio);
-		}
-		return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier;
-	}
-
-	int64_t getMinAvailableSpace(bool includeInFlight = true) const override {
-		int64_t minAvailableSpace = std::numeric_limits<int64_t>::max();
-		for (const auto& server : servers) {
-			if (server->serverMetrics.present()) {
-				auto& replyValue = server->serverMetrics.get();
-				ASSERT(replyValue.available.bytes >= 0);
-				ASSERT(replyValue.capacity.bytes >= 0);
-				int64_t bytesAvailable = replyValue.available.bytes;
-				if (includeInFlight) {
-					bytesAvailable -= server->dataInFlightToServer;
-				}
-				minAvailableSpace = std::min(bytesAvailable, minAvailableSpace);
-			}
-		}
-		return minAvailableSpace; // Could be negative
-	}
-
-	double getMinAvailableSpaceRatio(bool includeInFlight = true) const override {
-		double minRatio = 1.0;
-		for (const auto& server : servers) {
-			if (server->serverMetrics.present()) {
-				auto& replyValue = server->serverMetrics.get();
-				ASSERT(replyValue.available.bytes >= 0);
-				ASSERT(replyValue.capacity.bytes >= 0);
-				int64_t bytesAvailable = replyValue.available.bytes;
-				if (includeInFlight) {
-					bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer);
-				}
-				if (replyValue.capacity.bytes == 0)
-					minRatio = 0;
-				else
-					minRatio = std::min(minRatio, ((double)bytesAvailable) / replyValue.capacity.bytes);
-			}
-		}
-		return minRatio;
-	}
-
-	bool hasHealthyAvailableSpace(double minRatio) const override {
-		return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE;
-	}
+	std::string getServerIDsStr() const;
+
+	void addDataInFlightToTeam(int64_t delta) override;
+
+	int64_t getDataInFlightToTeam() const override;
+
+	int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override;
+
+	int64_t getMinAvailableSpace(bool includeInFlight = true) const override;
+
+	double getMinAvailableSpaceRatio(bool includeInFlight = true) const override;
+
+	bool hasHealthyAvailableSpace(double minRatio) const override;
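The removed `getLoadBytes()` body is where the available-space penalty math lives, so a worked example helps: the multiplier is cutoff divided by the ratio clamped to [1e-6, cutoff], and it is squared for teams larger than two. The snippet below replicates that arithmetic with the knob values passed in as parameters, since the real `SERVER_KNOBS` defaults are configuration-dependent; the 0.05 cutoff used here is an assumption for illustration, not FDB's actual value.

#include <algorithm>
#include <cstdio>

// Replicates the penalty arithmetic from the removed getLoadBytes() body.
double availableSpaceMultiplier(double minAvailableSpaceRatio, double cutoff, size_t teamSize) {
	double m = cutoff / std::max(std::min(cutoff, minAvailableSpaceRatio), 0.000001);
	if (teamSize > 2) {
		// Squaring the penalty in triple replication ensures a team with one
		// low-space member is heavily avoided, per the removed comment.
		m *= m;
	}
	return m;
}

int main() {
	// One member at 1% free space in a size-3 team: (0.05 / 0.01)^2 = 25,
	// so the team's apparent load is inflated 25x and data is steered away.
	std::printf("%g\n", availableSpaceMultiplier(0.01, 0.05, 3));
	return 0;
}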
	Future<Void> updateStorageMetrics() override;
-	bool isOptimal() const override {
-		for (const auto& server : servers) {
-			if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) {
-				return false;
-			}
-		}
-		return true;
-	}
+	bool isOptimal() const override;
	bool isWrongConfiguration() const override { return wrongConfiguration; }
	void setWrongConfiguration(bool wrongConfiguration) override { this->wrongConfiguration = wrongConfiguration; }
@@ -323,32 +172,12 @@ public:
	void addref() override { ReferenceCounted<TCTeamInfo>::addref(); }
	void delref() override { ReferenceCounted<TCTeamInfo>::delref(); }
-	bool hasServer(const UID& server) {
-		return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
-	}
+	bool hasServer(const UID& server) const;

-	void addServers(const std::vector<UID>& servers) override {
-		serverIDs.reserve(servers.size());
-		for (int i = 0; i < servers.size(); i++) {
-			serverIDs.push_back(servers[i]);
-		}
-	}
+	void addServers(const std::vector<UID>& servers) override;
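Note that `hasServer` also gains a const qualifier here, not just an out-of-line move. A sketch of what the relocated definitions would presumably look like (the destination file is not shown in this hunk, and `override` is dropped at the definition as usual):

// Sketch only: relocated definitions, with hasServer now const-qualified so it
// can be called through const references to a team.
bool TCTeamInfo::hasServer(const UID& server) const {
	return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
}

void TCTeamInfo::addServers(const std::vector<UID>& servers) {
	serverIDs.reserve(servers.size());
	for (int i = 0; i < servers.size(); i++) {
		serverIDs.push_back(servers[i]);
	}
}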
private:
	// Calculate an "average" of the metrics replies that we received. Penalize teams from which we did not receive all
	// replies.
-	int64_t getLoadAverage() const {
-		int64_t bytesSum = 0;
-		int added = 0;
-		for (int i = 0; i < servers.size(); i++)
-			if (servers[i]->serverMetrics.present()) {
-				added++;
-				bytesSum += servers[i]->serverMetrics.get().load.bytes;
-			}
-		if (added < servers.size())
-			bytesSum *= 2;
-		return added == 0 ? 0 : bytesSum / added;
-	}
+	int64_t getLoadAverage() const;
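The comment above describes the penalty, but the arithmetic is easy to misread: the byte total is doubled whenever any team member failed to report, then averaged over only the servers that did reply. A self-contained sketch with sample numbers (the values are illustrative, not from the commit):

#include <cstdint>
#include <cstdio>

// Mirrors the removed getLoadAverage() logic: sum the replies received, double
// the sum if any server failed to report, then divide by the reply count.
int64_t loadAverage(const int64_t* loadBytes, int replied, int teamSize) {
	int64_t bytesSum = 0;
	for (int i = 0; i < replied; i++)
		bytesSum += loadBytes[i];
	if (replied < teamSize)
		bytesSum *= 2; // penalty for incomplete metrics
	return replied == 0 ? 0 : bytesSum / replied;
}

int main() {
	// Two of three servers replied with 8 GiB and 16 GiB of load: the sum is
	// doubled to 48 GiB, so the estimate is 24 GiB instead of the 12 GiB a
	// complete set of replies would have produced.
	int64_t loads[] = { int64_t(8) << 30, int64_t(16) << 30 };
	std::printf("%lld\n", (long long)loadAverage(loads, 2, 3));
	return 0;
}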
};