Merge pull request #5385 from sfc-gh-tclinkenbeard/debug-dd

Capture deep copy of `machine_info` in `printSnapshotTeamsInfo`
This commit is contained in:
Trevor Clinkenbeard 2021-08-20 13:25:50 -07:00 committed by GitHub
commit 66df75c570
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 98 deletions

View File

@ -787,6 +787,7 @@ public:
}
bool operator==(const RequestStream<T>& rhs) const { return queue == rhs.queue; }
bool operator!=(const RequestStream<T>& rhs) const { return !(*this == rhs); }
bool isEmpty() const { return !queue->isReady(); }
uint32_t size() const { return queue->size(); }

View File

@ -40,6 +40,7 @@
#include "fdbserver/WaitFailure.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/BooleanParam.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -49,6 +50,20 @@ class TCTeamInfo;
struct TCMachineInfo;
class TCMachineTeamInfo;
namespace {
// Helper function for STL containers, with flow-friendly error handling
template <class MapContainer, class K>
auto get(MapContainer& m, K const& k) -> decltype(m.at(k)) {
auto it = m.find(k);
ASSERT(it != m.end());
return it->second;
}
} // namespace
FDB_BOOLEAN_PARAM(IsPrimary);
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self);
ACTOR Future<Void> waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams);
@ -97,7 +112,7 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
}
}
bool isCorrectStoreType(KeyValueStoreType configStoreType) {
bool isCorrectStoreType(KeyValueStoreType configStoreType) const {
// A new storage server's store type may not be set immediately.
// If a storage server does not reply its storeType, it will be tracked by failure monitor and removed.
return (storeType == configStoreType || storeType == KeyValueStoreType::END);
@ -105,12 +120,24 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
~TCServerInfo();
};
struct TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
class TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
TCMachineInfo() = default;
public:
std::vector<Reference<TCServerInfo>> serversOnMachine; // SOMEDAY: change from vector to set
Standalone<StringRef> machineID;
std::vector<Reference<TCMachineTeamInfo>> machineTeams; // SOMEDAY: split good and bad machine teams.
LocalityEntry localityEntry;
Reference<TCMachineInfo> clone() const {
auto result = Reference<TCMachineInfo>(new TCMachineInfo);
result->serversOnMachine = serversOnMachine;
result->machineID = machineID;
result->machineTeams = machineTeams;
result->localityEntry = localityEntry;
return result;
}
explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
ASSERT(serversOnMachine.empty());
serversOnMachine.push_back(server);
@ -393,7 +420,7 @@ struct ServerStatus {
// If a process has reappeared without the storage server that was on it (isFailed == true), we don't need to
// exclude it We also don't need to exclude processes who are in the wrong configuration (since those servers will
// be removed)
bool excludeOnRecruit() { return !isFailed && !isWrongConfiguration; }
bool excludeOnRecruit() const { return !isFailed && !isWrongConfiguration; }
};
typedef AsyncMap<UID, ServerStatus> ServerStatusMap;
@ -601,7 +628,6 @@ Future<Void> teamTracker(struct DDTeamCollection* const& self,
struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// clang-format off
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3};
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
@ -681,7 +707,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Future<Void> wrongStoreTypeRemover;
Reference<LocalitySet> storageServerSet;
std::vector<LocalityEntry> forcedEntries, resultEntries;
std::vector<DDTeamCollection*> teamCollections;
AsyncVar<Optional<Key>> healthyZone;
@ -707,13 +732,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
bool satisfiesPolicy(const std::vector<Reference<TCServerInfo>>& team, int amount = -1) {
forcedEntries.clear();
resultEntries.clear();
bool satisfiesPolicy(const std::vector<Reference<TCServerInfo>>& team, int amount = -1) const {
std::vector<LocalityEntry> forcedEntries, resultEntries;
if (amount == -1) {
amount = team.size();
}
forcedEntries.reserve(amount);
for (int i = 0; i < amount; i++) {
forcedEntries.push_back(team[i]->localityEntry);
}
@ -732,7 +757,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
Future<Void> readyToStart,
Reference<AsyncVar<bool>> zeroHealthyTeams,
bool primary,
IsPrimary primary,
Reference<AsyncVar<bool>> processingUnhealthy,
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer,
@ -1071,7 +1096,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
int64_t getDebugTotalDataInFlight() {
int64_t getDebugTotalDataInFlight() const {
int64_t total = 0;
for (auto itr = server_info.begin(); itr != server_info.end(); ++itr)
total += itr->second->dataInFlightToServer;
@ -1136,14 +1161,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
tempMap->add(it->lastKnownInterface.locality, &it->id);
}
self->resultEntries.clear();
self->forcedEntries.clear();
std::vector<LocalityEntry> resultEntries, forcedEntries;
bool result = tempSet->selectReplicas(
self->configuration.storagePolicy, self->forcedEntries, self->resultEntries);
ASSERT(result && self->resultEntries.size() == self->configuration.storageTeamSize);
self->configuration.storagePolicy, forcedEntries, resultEntries);
ASSERT(result && resultEntries.size() == self->configuration.storageTeamSize);
serverIds.clear();
for (auto& it : self->resultEntries) {
for (auto& it : resultEntries) {
serverIds.push_back(*tempMap->getObject(it));
}
std::sort(serverIds.begin(), serverIds.end());
@ -1205,7 +1229,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Check if server or machine has a valid locality based on configured replication policy
bool isValidLocality(Reference<IReplicationPolicy> storagePolicy, const LocalityData& locality) {
bool isValidLocality(Reference<IReplicationPolicy> storagePolicy, const LocalityData& locality) const {
// Future: Once we add simulation test that misconfigure a cluster, such as not setting some locality entries,
// DD_VALIDATE_LOCALITY should always be true. Otherwise, simulation test may fail.
if (!SERVER_KNOBS->DD_VALIDATE_LOCALITY) {
@ -1223,7 +1247,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return true;
}
void evaluateTeamQuality() {
void evaluateTeamQuality() const {
int teamCount = teams.size(), serverCount = allServers.size();
double teamsPerServer = (double)teamCount * configuration.storageTeamSize / serverCount;
@ -1300,14 +1324,16 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return maxMatchingServers;
}
int overlappingMachineMembers(vector<Standalone<StringRef>>& team) {
int overlappingMachineMembers(vector<Standalone<StringRef>> const& team) const {
if (team.empty()) {
return 0;
}
int maxMatchingServers = 0;
Standalone<StringRef>& serverID = team[0];
for (auto& usedTeam : machine_info[serverID]->machineTeams) {
auto it = machine_info.find(team[0]);
ASSERT(it != machine_info.end());
auto const& machineTeams = it->second->machineTeams;
for (auto const& usedTeam : machineTeams) {
auto used = usedTeam->machineIDs;
int teamIdx = 0;
int usedIdx = 0;
@ -1333,13 +1359,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return maxMatchingServers;
}
Reference<TCMachineTeamInfo> findMachineTeam(vector<Standalone<StringRef>>& machineIDs) {
Reference<TCMachineTeamInfo> findMachineTeam(vector<Standalone<StringRef>> const& machineIDs) const {
if (machineIDs.empty()) {
return Reference<TCMachineTeamInfo>();
}
Standalone<StringRef> machineID = machineIDs[0];
for (auto& machineTeam : machine_info[machineID]->machineTeams) {
for (auto& machineTeam : get(machine_info, machineID)->machineTeams) {
if (machineTeam->machineIDs == machineIDs) {
return machineTeam;
}
@ -1473,7 +1499,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return totalServerIndex;
}
void traceConfigInfo() {
void traceConfigInfo() const {
TraceEvent("DDConfig", distributorId)
.detail("StorageTeamSize", configuration.storageTeamSize)
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER)
@ -1481,7 +1507,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("StoreType", configuration.storageServerStoreType);
}
void traceServerInfo() {
void traceServerInfo() const {
int i = 0;
TraceEvent("ServerInfo", distributorId).detail("Size", server_info.size());
@ -1499,13 +1525,14 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
TraceEvent("ServerStatus", distributorId)
.detail("ServerID", uid)
.detail("Healthy", !server_status.get(uid).isUnhealthy())
.detail("MachineIsValid", server_info[uid]->machine.isValid())
.detail("MachineIsValid", get(server_info, uid)->machine.isValid())
.detail("MachineTeamSize",
server_info[uid]->machine.isValid() ? server_info[uid]->machine->machineTeams.size() : -1);
get(server_info, uid)->machine.isValid() ? get(server_info, uid)->machine->machineTeams.size()
: -1);
}
}
void traceServerTeamInfo() {
void traceServerTeamInfo() const {
int i = 0;
TraceEvent("ServerTeamInfo", distributorId).detail("Size", teams.size());
@ -1519,7 +1546,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
void traceMachineInfo() {
void traceMachineInfo() const {
int i = 0;
TraceEvent("MachineInfo").detail("Size", machine_info.size());
@ -1534,7 +1561,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
void traceMachineTeamInfo() {
void traceMachineTeamInfo() const {
int i = 0;
TraceEvent("MachineTeamInfo", distributorId).detail("Size", machineTeams.size());
@ -1548,7 +1575,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Locality string is hashed into integer, used as KeyIndex
// For better understand which KeyIndex is used for locality, we print this info in trace.
void traceLocalityArrayIndexName() {
void traceLocalityArrayIndexName() const {
TraceEvent("LocalityRecordKeyName").detail("Size", machineLocalityMap._keymap->_lookuparray.size());
for (int i = 0; i < machineLocalityMap._keymap->_lookuparray.size(); ++i) {
TraceEvent("LocalityRecordKeyIndexName")
@ -1557,7 +1584,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
void traceMachineLocalityMap() {
void traceMachineLocalityMap() const {
int i = 0;
TraceEvent("MachineLocalityMap", distributorId).detail("Size", machineLocalityMap.size());
@ -1578,7 +1605,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) {
void traceAllInfo(bool shouldPrint = false) const {
if (!shouldPrint)
return;
@ -1783,7 +1810,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return addedMachineTeams;
}
bool isMachineTeamHealthy(vector<Standalone<StringRef>> const& machineIDs) {
bool isMachineTeamHealthy(vector<Standalone<StringRef>> const& machineIDs) const {
int healthyNum = 0;
// A healthy machine team should have the desired number of machines
@ -1791,7 +1818,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return false;
for (auto& id : machineIDs) {
auto& machine = machine_info[id];
auto& machine = get(machine_info, id);
if (isMachineHealthy(machine)) {
healthyNum++;
}
@ -1799,22 +1826,22 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return (healthyNum == machineIDs.size());
}
bool isMachineTeamHealthy(Reference<TCMachineTeamInfo> const& machineTeam) {
bool isMachineTeamHealthy(TCMachineTeamInfo const& machineTeam) const {
int healthyNum = 0;
// A healthy machine team should have the desired number of machines
if (machineTeam->size() != configuration.storageTeamSize)
if (machineTeam.size() != configuration.storageTeamSize)
return false;
for (auto& machine : machineTeam->machines) {
for (auto& machine : machineTeam.machines) {
if (isMachineHealthy(machine)) {
healthyNum++;
}
}
return (healthyNum == machineTeam->machines.size());
return (healthyNum == machineTeam.machines.size());
}
bool isMachineHealthy(Reference<TCMachineInfo> const& machine) {
bool isMachineHealthy(Reference<TCMachineInfo> const& machine) const {
if (!machine.isValid() || machine_info.find(machine->machineID) == machine_info.end() ||
machine->serversOnMachine.empty()) {
return false;
@ -1831,7 +1858,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Return the healthy server with the least number of correct-size server teams
Reference<TCServerInfo> findOneLeastUsedServer() {
Reference<TCServerInfo> findOneLeastUsedServer() const {
vector<Reference<TCServerInfo>> leastUsedServers;
int minTeams = std::numeric_limits<int>::max();
for (auto& server : server_info) {
@ -1864,11 +1891,11 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Randomly choose one machine team that has chosenServer and has the correct size
// When configuration is changed, we may have machine teams with old storageTeamSize
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(Reference<TCServerInfo> chosenServer) {
if (!chosenServer->machine->machineTeams.empty()) {
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(TCServerInfo const& chosenServer) const {
if (!chosenServer.machine->machineTeams.empty()) {
std::vector<Reference<TCMachineTeamInfo>> healthyMachineTeamsForChosenServer;
for (auto& mt : chosenServer->machine->machineTeams) {
if (isMachineTeamHealthy(mt)) {
for (auto& mt : chosenServer.machine->machineTeams) {
if (isMachineTeamHealthy(*mt)) {
healthyMachineTeamsForChosenServer.push_back(mt);
}
}
@ -1879,16 +1906,16 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// If we cannot find a healthy machine team
TraceEvent("NoHealthyMachineTeamForServer")
.detail("ServerID", chosenServer->id)
.detail("MachineTeams", chosenServer->machine->machineTeams.size());
.detail("ServerID", chosenServer.id)
.detail("MachineTeams", chosenServer.machine->machineTeams.size());
return Reference<TCMachineTeamInfo>();
}
// A server team should always come from servers on a machine team
// Check if it is true
bool isOnSameMachineTeam(Reference<TCTeamInfo>& team) {
bool isOnSameMachineTeam(TCTeamInfo const& team) const {
std::vector<Standalone<StringRef>> machineIDs;
for (const auto& server : team->getServers()) {
for (const auto& server : team.getServers()) {
if (!server->machine.isValid())
return false;
machineIDs.push_back(server->machine->machineID);
@ -1896,7 +1923,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
std::sort(machineIDs.begin(), machineIDs.end());
int numExistance = 0;
for (const auto& server : team->getServers()) {
for (const auto& server : team.getServers()) {
for (const auto& candidateMachineTeam : server->machine->machineTeams) {
std::sort(candidateMachineTeam->machineIDs.begin(), candidateMachineTeam->machineIDs.end());
if (machineIDs == candidateMachineTeam->machineIDs) {
@ -1905,14 +1932,14 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
}
return (numExistance == team->size());
return (numExistance == team.size());
}
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() {
bool sanityCheckTeams() const {
for (auto& team : teams) {
if (isOnSameMachineTeam(team) == false) {
if (isOnSameMachineTeam(*team) == false) {
return false;
}
}
@ -1920,7 +1947,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return true;
}
int calculateHealthyServerCount() {
int calculateHealthyServerCount() const {
int serverCount = 0;
for (auto i = server_info.begin(); i != server_info.end(); ++i) {
if (!server_status.get(i->first).isUnhealthy()) {
@ -1930,7 +1957,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return serverCount;
}
int calculateHealthyMachineCount() {
int calculateHealthyMachineCount() const {
int totalHealthyMachineCount = 0;
for (auto& m : machine_info) {
if (isMachineHealthy(m.second)) {
@ -1941,7 +1968,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return totalHealthyMachineCount;
}
std::pair<int64_t, int64_t> calculateMinMaxServerTeamsOnServer() {
std::pair<int64_t, int64_t> calculateMinMaxServerTeamsOnServer() const {
int64_t minTeams = std::numeric_limits<int64_t>::max();
int64_t maxTeams = 0;
for (auto& server : server_info) {
@ -1954,7 +1981,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return std::make_pair(minTeams, maxTeams);
}
std::pair<int64_t, int64_t> calculateMinMaxMachineTeamsOnMachine() {
std::pair<int64_t, int64_t> calculateMinMaxMachineTeamsOnMachine() const {
int64_t minTeams = std::numeric_limits<int64_t>::max();
int64_t maxTeams = 0;
for (auto& machine : machine_info) {
@ -1968,7 +1995,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Sanity check
bool isServerTeamCountCorrect(Reference<TCMachineTeamInfo>& mt) {
bool isServerTeamCountCorrect(Reference<TCMachineTeamInfo> const& mt) const {
int num = 0;
bool ret = true;
for (auto& team : teams) {
@ -1987,7 +2014,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Find the machine team with the least number of server teams
std::pair<Reference<TCMachineTeamInfo>, int> getMachineTeamWithLeastProcessTeams() {
std::pair<Reference<TCMachineTeamInfo>, int> getMachineTeamWithLeastProcessTeams() const {
Reference<TCMachineTeamInfo> retMT;
int minNumProcessTeams = std::numeric_limits<int>::max();
@ -2006,7 +2033,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Find the machine team whose members are on the most number of machine teams, same logic as serverTeamRemover
std::pair<Reference<TCMachineTeamInfo>, int> getMachineTeamWithMostMachineTeams() {
std::pair<Reference<TCMachineTeamInfo>, int> getMachineTeamWithMostMachineTeams() const {
Reference<TCMachineTeamInfo> retMT;
int maxNumMachineTeams = 0;
int targetMachineTeamNumPerMachine =
@ -2030,7 +2057,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Find the server team whose members are on the most number of server teams
std::pair<Reference<TCTeamInfo>, int> getServerTeamWithMostProcessTeams() {
std::pair<Reference<TCTeamInfo>, int> getServerTeamWithMostProcessTeams() const {
Reference<TCTeamInfo> retST;
int maxNumProcessTeams = 0;
int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2;
@ -2052,10 +2079,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return std::pair<Reference<TCTeamInfo>, int>(retST, maxNumProcessTeams);
}
int getHealthyMachineTeamCount() {
int getHealthyMachineTeamCount() const {
int healthyTeamCount = 0;
for (auto mt = machineTeams.begin(); mt != machineTeams.end(); ++mt) {
ASSERT((*mt)->machines.size() == configuration.storageTeamSize);
for (const auto& mt : machineTeams) {
ASSERT(mt->machines.size() == configuration.storageTeamSize);
if (isMachineTeamHealthy(*mt)) {
++healthyTeamCount;
@ -2067,7 +2094,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Each machine is expected to have targetMachineTeamNumPerMachine
// Return true if there exists a machine that does not have enough teams.
bool notEnoughMachineTeamsForAMachine() {
bool notEnoughMachineTeamsForAMachine() const {
// If we want to remove the machine team with most machine teams, we use the same logic as
// notEnoughTeamsForAServer
int targetMachineTeamNumPerMachine =
@ -2089,7 +2116,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Each server is expected to have targetTeamNumPerServer teams.
// Return true if there exists a server that does not have enough teams.
bool notEnoughTeamsForAServer() {
bool notEnoughTeamsForAServer() const {
// We build more teams than we finally want so that we can use serverTeamRemover() actor to remove the teams
// whose member belong to too many teams. This allows us to get a more balanced number of teams per server.
// We want to ensure every server has targetTeamNumPerServer teams.
@ -2161,7 +2188,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// instead of choosing the least used machine team.
// The correlation happens, for example, when we add two new machines, we may always choose the machine
// team with these two new machines because they are typically less used.
Reference<TCMachineTeamInfo> chosenMachineTeam = findOneRandomMachineTeam(chosenServer);
Reference<TCMachineTeamInfo> chosenMachineTeam = findOneRandomMachineTeam(*chosenServer);
if (!chosenMachineTeam.isValid()) {
// We may face the situation that temporarily we have no healthy machine.
@ -2263,7 +2290,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Check if the number of server (and machine teams) is larger than the maximum allowed number
void traceTeamCollectionInfo() {
void traceTeamCollectionInfo() const {
int totalHealthyServerCount = calculateHealthyServerCount();
int desiredServerTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyServerCount;
int maxServerTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyServerCount;
@ -2433,7 +2460,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return Void();
}
void noHealthyTeams() {
void noHealthyTeams() const {
std::set<UID> desiredServerSet;
std::string desc;
for (auto i = server_info.begin(); i != server_info.end(); ++i) {
@ -2450,7 +2477,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("NonFailedServerCount", desiredServerSet.size());
}
bool shouldHandleServer(const StorageServerInterface& newServer) {
bool shouldHandleServer(const StorageServerInterface& newServer) const {
return (includedDCs.empty() ||
std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end() ||
(otherTrackedDCs.present() &&
@ -3012,7 +3039,10 @@ ACTOR Future<Void> printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
configuration = self->configuration;
server_info = self->server_info;
teams = self->teams;
machine_info = self->machine_info;
// Perform deep copy so we have a consistent snapshot, even if yields are performed
for (const auto& [machineId, info] : self->machine_info) {
machine_info.emplace(machineId, info->clone());
}
machineTeams = self->machineTeams;
// internedLocalityRecordKeyNameStrings = self->machineLocalityMap._keymap->_lookuparray;
// machineLocalityMapEntryArraySize = self->machineLocalityMap.size();
@ -3059,7 +3089,7 @@ ACTOR Future<Void> printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
const UID& uid = server->first;
TraceEvent("ServerStatus", self->distributorId)
.detail("ServerUID", uid)
.detail("Healthy", !server_status.at(uid).isUnhealthy())
.detail("Healthy", !get(server_status, uid).isUnhealthy())
.detail("MachineIsValid", server_info[uid]->machine.isValid())
.detail("MachineTeamSize",
server_info[uid]->machine.isValid() ? server_info[uid]->machine->machineTeams.size() : -1)
@ -3100,7 +3130,7 @@ ACTOR Future<Void> printSnapshotTeamsInfo(Reference<DDTeamCollection> self) {
// Healthy machine has at least one healthy server
for (auto& server : _machine->serversOnMachine) {
if (!server_status.at(server->id).isUnhealthy()) {
if (!get(server_status, server->id).isUnhealthy()) {
isMachineHealthy = true;
}
}
@ -5219,7 +5249,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
}
ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
const DDEnabledState* ddEnabledState) {
state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest;
@ -5326,8 +5356,8 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
rsr.excludeAddresses != lastRequest.excludeAddresses ||
rsr.criticalRecruitment != lastRequest.criticalRecruitment) {
lastRequest = rsr;
fCandidateWorker = brokenPromiseToNever(
db->get().clusterInterface.recruitStorage.getReply(rsr, TaskPriority::DataDistribution));
fCandidateWorker =
brokenPromiseToNever(recruitStorage->get().getReply(rsr, TaskPriority::DataDistribution));
}
choose {
@ -5375,9 +5405,7 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
}
}
}
when(wait(db->onChange())) { // SOMEDAY: only if clusterInterface changes?
fCandidateWorker = Future<RecruitStorageReply>();
}
when(wait(recruitStorage->onChange())) { fCandidateWorker = Future<RecruitStorageReply>(); }
when(wait(self->zeroHealthyTeams->onChange())) {
if (!pendingTSSCheck && self->zeroHealthyTeams->get() &&
(self->isTssRecruiting || self->tss_info_by_pair.size() > 0)) {
@ -5519,11 +5547,12 @@ ACTOR Future<Void> monitorHealthyTeams(DDTeamCollection* self) {
}
// Keep track of servers and teams -- serves requests for getRandomTeam
ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState const* ddEnabledState) {
ACTOR Future<Void> dataDistributionTeamCollection(
Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage,
DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -5562,7 +5591,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> te
// The following actors (e.g. storageRecruiter) do not need to be assigned to a variable because
// they are always running.
self->addActor.send(storageRecruiter(self, db, ddEnabledState));
self->addActor.send(storageRecruiter(self, recruitStorage, ddEnabledState));
self->addActor.send(monitorStorageServerRecruitment(self));
self->addActor.send(waitServerListChange(self, serverRemoved.getFuture(), ddEnabledState));
self->addActor.send(trackExcludedServers(self));
@ -6040,12 +6069,14 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
readyToStart.getFuture(),
zeroHealthyTeams[0],
true,
IsPrimary::True,
processingUnhealthy,
getShardMetrics,
removeFailedServer,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
auto recruitStorage = IAsyncListener<RequestStream<RecruitStorageRequest>>::create(
self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
if (configuration.usableRegions > 1) {
remoteTeamCollection =
makeReference<DDTeamCollection>(cx,
@ -6058,7 +6089,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
Optional<std::vector<Optional<Key>>>(),
readyToStart.getFuture() && remoteRecovered(self->dbInfo),
zeroHealthyTeams[1],
false,
IsPrimary::False,
processingUnhealthy,
getShardMetrics,
removeFailedServer,
@ -6067,7 +6098,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back(
reportErrorsExcept(dataDistributionTeamCollection(
remoteTeamCollection, initData, tcis[1], self->dbInfo, ddEnabledState),
remoteTeamCollection, initData, tcis[1], recruitStorage, ddEnabledState),
"DDTeamCollectionSecondary",
self->ddId,
&normalDDQueueErrors()));
@ -6075,11 +6106,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
}
primaryTeamCollection->teamCollections = teamCollectionsPtrs;
self->teamCollection = primaryTeamCollection.getPtr();
actors.push_back(reportErrorsExcept(
dataDistributionTeamCollection(primaryTeamCollection, initData, tcis[0], self->dbInfo, ddEnabledState),
"DDTeamCollectionPrimary",
self->ddId,
&normalDDQueueErrors()));
actors.push_back(
reportErrorsExcept(dataDistributionTeamCollection(
primaryTeamCollection, initData, tcis[0], recruitStorage, ddEnabledState),
"DDTeamCollectionPrimary",
self->ddId,
&normalDDQueueErrors()));
actors.push_back(printSnapshotTeamsInfo(primaryTeamCollection));
actors.push_back(yieldPromiseStream(output.getFuture(), input));
@ -6538,7 +6570,7 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
{},
Future<Void>(Void()),
makeReference<AsyncVar<bool>>(true),
true,
IsPrimary::True,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>(),
Promise<UID>(),
@ -6581,7 +6613,7 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
{},
Future<Void>(Void()),
makeReference<AsyncVar<bool>>(true),
true,
IsPrimary::True,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>(),
Promise<UID>(),

View File

@ -547,14 +547,14 @@ public:
}
}
void clear(K const& k) { set(k, V()); }
V const& get(K const& k) {
V const& get(K const& k) const {
auto it = items.find(k);
if (it != items.end())
return it->second.value;
else
return defaultValue;
}
int count(K const& k) {
int count(K const& k) const {
auto it = items.find(k);
if (it != items.end())
return 1;
@ -566,7 +566,7 @@ public:
return destroyOnCancel(this, k, item.change.getFuture());
return item.change.getFuture();
}
std::vector<K> getKeys() {
std::vector<K> getKeys() const {
std::vector<K> keys;
keys.reserve(items.size());
for (auto i = items.begin(); i != items.end(); ++i)
@ -1985,6 +1985,8 @@ public:
virtual Output const& get() const = 0;
virtual Future<Void> onChange() const = 0;
template <class Input, class F>
static Reference<IAsyncListener> create(Reference<AsyncVar<Input> const> const& input, F const& f);
template <class Input, class F>
static Reference<IAsyncListener> create(Reference<AsyncVar<Input>> const& input, F const& f);
static Reference<IAsyncListener> create(Reference<AsyncVar<Output>> const& output);
};
@ -2014,10 +2016,17 @@ public:
template <class Output>
template <class Input, class F>
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input>> const& input, F const& f) {
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input> const> const& input,
F const& f) {
return makeReference<IAsyncListenerImpl::AsyncListener<Input, Output, F>>(input, f);
}
template <class Output>
template <class Input, class F>
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input>> const& input, F const& f) {
return create(Reference<AsyncVar<Input> const>(input), f);
}
template <class Output>
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Output>> const& input) {
auto identity = [](const auto& x) { return x; };