diff --git a/fdbserver/TCInfo.actor.cpp b/fdbserver/TCInfo.actor.cpp index d94392fdce..aa2f90c2fb 100644 --- a/fdbserver/TCInfo.actor.cpp +++ b/fdbserver/TCInfo.actor.cpp @@ -122,6 +122,22 @@ public: } }; +TCServerInfo::TCServerInfo(StorageServerInterface ssi, + DDTeamCollection* collection, + ProcessClass processClass, + bool inDesiredDC, + Reference storageServerSet, + Version addedVersion) + : id(ssi.id()), addedVersion(addedVersion), collection(collection), lastKnownInterface(ssi), + lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), + onRemoved(removed.getFuture()), onTSSPairRemoved(Never()), inDesiredDC(inDesiredDC), + storeType(KeyValueStoreType::END) { + + if (!ssi.isTss()) { + localityEntry = ((LocalityMap*)storageServerSet.getPtr())->add(ssi.locality, &id); + } +} + Future TCServerInfo::updateServerMetrics() { return TCServerInfoImpl::updateServerMetrics(this); } @@ -140,6 +156,208 @@ TCServerInfo::~TCServerInfo() { } } +Reference TCMachineInfo::clone() const { + auto result = Reference(new TCMachineInfo); + result->serversOnMachine = serversOnMachine; + result->machineID = machineID; + result->machineTeams = machineTeams; + result->localityEntry = localityEntry; + return result; +} + +TCMachineInfo::TCMachineInfo(Reference server, const LocalityEntry& entry) : localityEntry(entry) { + ASSERT(serversOnMachine.empty()); + serversOnMachine.push_back(server); + + LocalityData& locality = server->lastKnownInterface.locality; + ASSERT(locality.zoneId().present()); + machineID = locality.zoneId().get(); +} + +std::string TCMachineInfo::getServersIDStr() const { + std::stringstream ss; + if (serversOnMachine.empty()) + return "[unset]"; + + for (const auto& server : serversOnMachine) { + ss << server->id.toString() << " "; + } + + return std::move(ss).str(); +} + +TCMachineTeamInfo::TCMachineTeamInfo(std::vector> const& machines) + : machines(machines), id(deterministicRandom()->randomUniqueID()) { + machineIDs.reserve(machines.size()); + for (int i = 0; i < machines.size(); i++) { + machineIDs.push_back(machines[i]->machineID); + } + sort(machineIDs.begin(), machineIDs.end()); +} + +std::string TCMachineTeamInfo::getMachineIDsStr() const { + std::stringstream ss; + + if (machineIDs.empty()) + return "[unset]"; + + for (const auto& id : machineIDs) { + ss << id.contents().toString() << " "; + } + + return std::move(ss).str(); +} + +TCTeamInfo::TCTeamInfo(std::vector> const& servers) + : servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), + id(deterministicRandom()->randomUniqueID()) { + if (servers.empty()) { + TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log(); + } + serverIDs.reserve(servers.size()); + for (int i = 0; i < servers.size(); i++) { + serverIDs.push_back(servers[i]->id); + } +} + +std::vector TCTeamInfo::getLastKnownServerInterfaces() const { + std::vector v; + v.reserve(servers.size()); + for (const auto& server : servers) { + v.push_back(server->lastKnownInterface); + } + return v; +} + +std::string TCTeamInfo::getServerIDsStr() const { + std::stringstream ss; + + if (serverIDs.empty()) + return "[unset]"; + + for (const auto& id : serverIDs) { + ss << id.toString() << " "; + } + + return std::move(ss).str(); +} + +void TCTeamInfo::addDataInFlightToTeam(int64_t delta) { + for (int i = 0; i < servers.size(); i++) + servers[i]->dataInFlightToServer += delta; +} + +int64_t TCTeamInfo::getDataInFlightToTeam() const { + int64_t dataInFlight = 0.0; + for (int i = 0; i < servers.size(); i++) + dataInFlight += servers[i]->dataInFlightToServer; + return dataInFlight; +} + +int64_t TCTeamInfo::getLoadBytes(bool includeInFlight, double inflightPenalty) const { + int64_t physicalBytes = getLoadAverage(); + double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight); + int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0; + double availableSpaceMultiplier = + SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF / + (std::max(std::min(SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio), 0.000001)); + if (servers.size() > 2) { + // make sure in triple replication the penalty is high enough that you will always avoid a team with a + // member at 20% free space + availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier; + } + + if (minAvailableSpaceRatio < SERVER_KNOBS->TARGET_AVAILABLE_SPACE_RATIO) { + TraceEvent(SevWarn, "DiskNearCapacity").suppressFor(1.0).detail("AvailableSpaceRatio", minAvailableSpaceRatio); + } + + return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier; +} + +int64_t TCTeamInfo::getMinAvailableSpace(bool includeInFlight) const { + int64_t minAvailableSpace = std::numeric_limits::max(); + for (const auto& server : servers) { + if (server->serverMetrics.present()) { + auto& replyValue = server->serverMetrics.get(); + + ASSERT(replyValue.available.bytes >= 0); + ASSERT(replyValue.capacity.bytes >= 0); + + int64_t bytesAvailable = replyValue.available.bytes; + if (includeInFlight) { + bytesAvailable -= server->dataInFlightToServer; + } + + minAvailableSpace = std::min(bytesAvailable, minAvailableSpace); + } + } + + return minAvailableSpace; // Could be negative +} + +double TCTeamInfo::getMinAvailableSpaceRatio(bool includeInFlight) const { + double minRatio = 1.0; + for (const auto& server : servers) { + if (server->serverMetrics.present()) { + auto& replyValue = server->serverMetrics.get(); + + ASSERT(replyValue.available.bytes >= 0); + ASSERT(replyValue.capacity.bytes >= 0); + + int64_t bytesAvailable = replyValue.available.bytes; + if (includeInFlight) { + bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer); + } + + if (replyValue.capacity.bytes == 0) + minRatio = 0; + else + minRatio = std::min(minRatio, ((double)bytesAvailable) / replyValue.capacity.bytes); + } + } + + return minRatio; +} + +bool TCTeamInfo::hasHealthyAvailableSpace(double minRatio) const { + return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE; +} + +bool TCTeamInfo::isOptimal() const { + for (const auto& server : servers) { + if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) { + return false; + } + } + return true; +} + +bool TCTeamInfo::hasServer(const UID& server) const { + return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end(); +} + +void TCTeamInfo::addServers(const std::vector& servers) { + serverIDs.reserve(servers.size()); + for (int i = 0; i < servers.size(); i++) { + serverIDs.push_back(servers[i]); + } +} + +int64_t TCTeamInfo::getLoadAverage() const { + int64_t bytesSum = 0; + int added = 0; + for (int i = 0; i < servers.size(); i++) + if (servers[i]->serverMetrics.present()) { + added++; + bytesSum += servers[i]->serverMetrics.get().load.bytes; + } + + if (added < servers.size()) + bytesSum *= 2; + + return added == 0 ? 0 : bytesSum / added; +} + Future TCTeamInfo::updateStorageMetrics() { return TCTeamInfoImpl::updateStorageMetrics(this); } diff --git a/fdbserver/TCInfo.h b/fdbserver/TCInfo.h index a7188942e0..590c96f3c5 100644 --- a/fdbserver/TCInfo.h +++ b/fdbserver/TCInfo.h @@ -61,16 +61,7 @@ public: ProcessClass processClass, bool inDesiredDC, Reference storageServerSet, - Version addedVersion = 0) - : id(ssi.id()), addedVersion(addedVersion), collection(collection), lastKnownInterface(ssi), - lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), - onRemoved(removed.getFuture()), onTSSPairRemoved(Never()), inDesiredDC(inDesiredDC), - storeType(KeyValueStoreType::END) { - - if (!ssi.isTss()) { - localityEntry = ((LocalityMap*)storageServerSet.getPtr())->add(ssi.locality, &id); - } - } + Version addedVersion = 0); bool isCorrectStoreType(KeyValueStoreType configStoreType) const { // A new storage server's store type may not be set immediately. @@ -96,35 +87,11 @@ public: std::vector> machineTeams; // SOMEDAY: split good and bad machine teams. LocalityEntry localityEntry; - Reference clone() const { - auto result = Reference(new TCMachineInfo); - result->serversOnMachine = serversOnMachine; - result->machineID = machineID; - result->machineTeams = machineTeams; - result->localityEntry = localityEntry; - return result; - } + Reference clone() const; - explicit TCMachineInfo(Reference server, const LocalityEntry& entry) : localityEntry(entry) { - ASSERT(serversOnMachine.empty()); - serversOnMachine.push_back(server); + explicit TCMachineInfo(Reference server, const LocalityEntry& entry); - LocalityData& locality = server->lastKnownInterface.locality; - ASSERT(locality.zoneId().present()); - machineID = locality.zoneId().get(); - } - - std::string getServersIDStr() const { - std::stringstream ss; - if (serversOnMachine.empty()) - return "[unset]"; - - for (const auto& server : serversOnMachine) { - ss << server->id.toString() << " "; - } - - return std::move(ss).str(); - } + std::string getServersIDStr() const; }; // TeamCollection's machine team information @@ -135,32 +102,14 @@ public: std::vector> serverTeams; UID id; - explicit TCMachineTeamInfo(std::vector> const& machines) - : machines(machines), id(deterministicRandom()->randomUniqueID()) { - machineIDs.reserve(machines.size()); - for (int i = 0; i < machines.size(); i++) { - machineIDs.push_back(machines[i]->machineID); - } - sort(machineIDs.begin(), machineIDs.end()); - } + explicit TCMachineTeamInfo(std::vector> const& machines); int size() const { ASSERT(machines.size() == machineIDs.size()); return machineIDs.size(); } - std::string getMachineIDsStr() const { - std::stringstream ss; - - if (machineIDs.empty()) - return "[unset]"; - - for (const auto& id : machineIDs) { - ss << id.contents().toString() << " "; - } - - return std::move(ss).str(); - } + std::string getMachineIDsStr() const; bool operator==(TCMachineTeamInfo& rhs) const { return this->machineIDs == rhs.machineIDs; } }; @@ -179,140 +128,38 @@ public: Reference machineTeam; Future tracker; - explicit TCTeamInfo(std::vector> const& servers) - : servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), - id(deterministicRandom()->randomUniqueID()) { - if (servers.empty()) { - TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log(); - } - serverIDs.reserve(servers.size()); - for (int i = 0; i < servers.size(); i++) { - serverIDs.push_back(servers[i]->id); - } - } + explicit TCTeamInfo(std::vector> const& servers); std::string getTeamID() const override { return id.shortString(); } - std::vector getLastKnownServerInterfaces() const override { - std::vector v; - v.reserve(servers.size()); - for (const auto& server : servers) { - v.push_back(server->lastKnownInterface); - } - return v; - } + std::vector getLastKnownServerInterfaces() const override; + int size() const override { ASSERT(servers.size() == serverIDs.size()); return servers.size(); } + std::vector const& getServerIDs() const override { return serverIDs; } + const std::vector>& getServers() const { return servers; } - std::string getServerIDsStr() const { - std::stringstream ss; + std::string getServerIDsStr() const; - if (serverIDs.empty()) - return "[unset]"; + void addDataInFlightToTeam(int64_t delta) override; - for (const auto& id : serverIDs) { - ss << id.toString() << " "; - } + int64_t getDataInFlightToTeam() const override; - return std::move(ss).str(); - } + int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override; - void addDataInFlightToTeam(int64_t delta) override { - for (int i = 0; i < servers.size(); i++) - servers[i]->dataInFlightToServer += delta; - } - int64_t getDataInFlightToTeam() const override { - int64_t dataInFlight = 0.0; - for (int i = 0; i < servers.size(); i++) - dataInFlight += servers[i]->dataInFlightToServer; - return dataInFlight; - } + int64_t getMinAvailableSpace(bool includeInFlight = true) const override; - int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override { - int64_t physicalBytes = getLoadAverage(); - double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight); - int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0; - double availableSpaceMultiplier = - SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF / - (std::max(std::min(SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio), 0.000001)); - if (servers.size() > 2) { - // make sure in triple replication the penalty is high enough that you will always avoid a team with a - // member at 20% free space - availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier; - } + double getMinAvailableSpaceRatio(bool includeInFlight = true) const override; - if (minAvailableSpaceRatio < SERVER_KNOBS->TARGET_AVAILABLE_SPACE_RATIO) { - TraceEvent(SevWarn, "DiskNearCapacity") - .suppressFor(1.0) - .detail("AvailableSpaceRatio", minAvailableSpaceRatio); - } - - return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier; - } - - int64_t getMinAvailableSpace(bool includeInFlight = true) const override { - int64_t minAvailableSpace = std::numeric_limits::max(); - for (const auto& server : servers) { - if (server->serverMetrics.present()) { - auto& replyValue = server->serverMetrics.get(); - - ASSERT(replyValue.available.bytes >= 0); - ASSERT(replyValue.capacity.bytes >= 0); - - int64_t bytesAvailable = replyValue.available.bytes; - if (includeInFlight) { - bytesAvailable -= server->dataInFlightToServer; - } - - minAvailableSpace = std::min(bytesAvailable, minAvailableSpace); - } - } - - return minAvailableSpace; // Could be negative - } - - double getMinAvailableSpaceRatio(bool includeInFlight = true) const override { - double minRatio = 1.0; - for (const auto& server : servers) { - if (server->serverMetrics.present()) { - auto& replyValue = server->serverMetrics.get(); - - ASSERT(replyValue.available.bytes >= 0); - ASSERT(replyValue.capacity.bytes >= 0); - - int64_t bytesAvailable = replyValue.available.bytes; - if (includeInFlight) { - bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer); - } - - if (replyValue.capacity.bytes == 0) - minRatio = 0; - else - minRatio = std::min(minRatio, ((double)bytesAvailable) / replyValue.capacity.bytes); - } - } - - return minRatio; - } - - bool hasHealthyAvailableSpace(double minRatio) const override { - return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE; - } + bool hasHealthyAvailableSpace(double minRatio) const override; Future updateStorageMetrics() override; - bool isOptimal() const override { - for (const auto& server : servers) { - if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) { - return false; - } - } - return true; - } + bool isOptimal() const override; bool isWrongConfiguration() const override { return wrongConfiguration; } void setWrongConfiguration(bool wrongConfiguration) override { this->wrongConfiguration = wrongConfiguration; } @@ -323,32 +170,12 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } - bool hasServer(const UID& server) { - return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end(); - } + bool hasServer(const UID& server) const; - void addServers(const std::vector& servers) override { - serverIDs.reserve(servers.size()); - for (int i = 0; i < servers.size(); i++) { - serverIDs.push_back(servers[i]); - } - } + void addServers(const std::vector& servers) override; private: // Calculate an "average" of the metrics replies that we received. Penalize teams from which we did not receive all // replies. - int64_t getLoadAverage() const { - int64_t bytesSum = 0; - int added = 0; - for (int i = 0; i < servers.size(); i++) - if (servers[i]->serverMetrics.present()) { - added++; - bytesSum += servers[i]->serverMetrics.get().load.bytes; - } - - if (added < servers.size()) - bytesSum *= 2; - - return added == 0 ? 0 : bytesSum / added; - } + int64_t getLoadAverage() const; };