Move implementations of TC*Info methods to TCInfo.actor.cpp
This commit is contained in:
parent
2165635478
commit
6e87c01404
|
@ -122,6 +122,22 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
TCServerInfo::TCServerInfo(StorageServerInterface ssi,
|
||||
DDTeamCollection* collection,
|
||||
ProcessClass processClass,
|
||||
bool inDesiredDC,
|
||||
Reference<LocalitySet> storageServerSet,
|
||||
Version addedVersion)
|
||||
: id(ssi.id()), addedVersion(addedVersion), collection(collection), lastKnownInterface(ssi),
|
||||
lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()),
|
||||
onRemoved(removed.getFuture()), onTSSPairRemoved(Never()), inDesiredDC(inDesiredDC),
|
||||
storeType(KeyValueStoreType::END) {
|
||||
|
||||
if (!ssi.isTss()) {
|
||||
localityEntry = ((LocalityMap<UID>*)storageServerSet.getPtr())->add(ssi.locality, &id);
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> TCServerInfo::updateServerMetrics() {
|
||||
return TCServerInfoImpl::updateServerMetrics(this);
|
||||
}
|
||||
|
@ -140,6 +156,208 @@ TCServerInfo::~TCServerInfo() {
|
|||
}
|
||||
}
|
||||
|
||||
Reference<TCMachineInfo> TCMachineInfo::clone() const {
|
||||
auto result = Reference<TCMachineInfo>(new TCMachineInfo);
|
||||
result->serversOnMachine = serversOnMachine;
|
||||
result->machineID = machineID;
|
||||
result->machineTeams = machineTeams;
|
||||
result->localityEntry = localityEntry;
|
||||
return result;
|
||||
}
|
||||
|
||||
TCMachineInfo::TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
|
||||
ASSERT(serversOnMachine.empty());
|
||||
serversOnMachine.push_back(server);
|
||||
|
||||
LocalityData& locality = server->lastKnownInterface.locality;
|
||||
ASSERT(locality.zoneId().present());
|
||||
machineID = locality.zoneId().get();
|
||||
}
|
||||
|
||||
std::string TCMachineInfo::getServersIDStr() const {
|
||||
std::stringstream ss;
|
||||
if (serversOnMachine.empty())
|
||||
return "[unset]";
|
||||
|
||||
for (const auto& server : serversOnMachine) {
|
||||
ss << server->id.toString() << " ";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
|
||||
TCMachineTeamInfo::TCMachineTeamInfo(std::vector<Reference<TCMachineInfo>> const& machines)
|
||||
: machines(machines), id(deterministicRandom()->randomUniqueID()) {
|
||||
machineIDs.reserve(machines.size());
|
||||
for (int i = 0; i < machines.size(); i++) {
|
||||
machineIDs.push_back(machines[i]->machineID);
|
||||
}
|
||||
sort(machineIDs.begin(), machineIDs.end());
|
||||
}
|
||||
|
||||
std::string TCMachineTeamInfo::getMachineIDsStr() const {
|
||||
std::stringstream ss;
|
||||
|
||||
if (machineIDs.empty())
|
||||
return "[unset]";
|
||||
|
||||
for (const auto& id : machineIDs) {
|
||||
ss << id.contents().toString() << " ";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
|
||||
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
|
||||
: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
|
||||
id(deterministicRandom()->randomUniqueID()) {
|
||||
if (servers.empty()) {
|
||||
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
|
||||
}
|
||||
serverIDs.reserve(servers.size());
|
||||
for (int i = 0; i < servers.size(); i++) {
|
||||
serverIDs.push_back(servers[i]->id);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<StorageServerInterface> TCTeamInfo::getLastKnownServerInterfaces() const {
|
||||
std::vector<StorageServerInterface> v;
|
||||
v.reserve(servers.size());
|
||||
for (const auto& server : servers) {
|
||||
v.push_back(server->lastKnownInterface);
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
std::string TCTeamInfo::getServerIDsStr() const {
|
||||
std::stringstream ss;
|
||||
|
||||
if (serverIDs.empty())
|
||||
return "[unset]";
|
||||
|
||||
for (const auto& id : serverIDs) {
|
||||
ss << id.toString() << " ";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
|
||||
void TCTeamInfo::addDataInFlightToTeam(int64_t delta) {
|
||||
for (int i = 0; i < servers.size(); i++)
|
||||
servers[i]->dataInFlightToServer += delta;
|
||||
}
|
||||
|
||||
int64_t TCTeamInfo::getDataInFlightToTeam() const {
|
||||
int64_t dataInFlight = 0.0;
|
||||
for (int i = 0; i < servers.size(); i++)
|
||||
dataInFlight += servers[i]->dataInFlightToServer;
|
||||
return dataInFlight;
|
||||
}
|
||||
|
||||
int64_t TCTeamInfo::getLoadBytes(bool includeInFlight, double inflightPenalty) const {
|
||||
int64_t physicalBytes = getLoadAverage();
|
||||
double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
|
||||
int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
|
||||
double availableSpaceMultiplier =
|
||||
SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF /
|
||||
(std::max(std::min(SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio), 0.000001));
|
||||
if (servers.size() > 2) {
|
||||
// make sure in triple replication the penalty is high enough that you will always avoid a team with a
|
||||
// member at 20% free space
|
||||
availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier;
|
||||
}
|
||||
|
||||
if (minAvailableSpaceRatio < SERVER_KNOBS->TARGET_AVAILABLE_SPACE_RATIO) {
|
||||
TraceEvent(SevWarn, "DiskNearCapacity").suppressFor(1.0).detail("AvailableSpaceRatio", minAvailableSpaceRatio);
|
||||
}
|
||||
|
||||
return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier;
|
||||
}
|
||||
|
||||
int64_t TCTeamInfo::getMinAvailableSpace(bool includeInFlight) const {
|
||||
int64_t minAvailableSpace = std::numeric_limits<int64_t>::max();
|
||||
for (const auto& server : servers) {
|
||||
if (server->serverMetrics.present()) {
|
||||
auto& replyValue = server->serverMetrics.get();
|
||||
|
||||
ASSERT(replyValue.available.bytes >= 0);
|
||||
ASSERT(replyValue.capacity.bytes >= 0);
|
||||
|
||||
int64_t bytesAvailable = replyValue.available.bytes;
|
||||
if (includeInFlight) {
|
||||
bytesAvailable -= server->dataInFlightToServer;
|
||||
}
|
||||
|
||||
minAvailableSpace = std::min(bytesAvailable, minAvailableSpace);
|
||||
}
|
||||
}
|
||||
|
||||
return minAvailableSpace; // Could be negative
|
||||
}
|
||||
|
||||
double TCTeamInfo::getMinAvailableSpaceRatio(bool includeInFlight) const {
|
||||
double minRatio = 1.0;
|
||||
for (const auto& server : servers) {
|
||||
if (server->serverMetrics.present()) {
|
||||
auto& replyValue = server->serverMetrics.get();
|
||||
|
||||
ASSERT(replyValue.available.bytes >= 0);
|
||||
ASSERT(replyValue.capacity.bytes >= 0);
|
||||
|
||||
int64_t bytesAvailable = replyValue.available.bytes;
|
||||
if (includeInFlight) {
|
||||
bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer);
|
||||
}
|
||||
|
||||
if (replyValue.capacity.bytes == 0)
|
||||
minRatio = 0;
|
||||
else
|
||||
minRatio = std::min(minRatio, ((double)bytesAvailable) / replyValue.capacity.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
return minRatio;
|
||||
}
|
||||
|
||||
bool TCTeamInfo::hasHealthyAvailableSpace(double minRatio) const {
|
||||
return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE;
|
||||
}
|
||||
|
||||
bool TCTeamInfo::isOptimal() const {
|
||||
for (const auto& server : servers) {
|
||||
if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TCTeamInfo::hasServer(const UID& server) const {
|
||||
return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
|
||||
}
|
||||
|
||||
void TCTeamInfo::addServers(const std::vector<UID>& servers) {
|
||||
serverIDs.reserve(servers.size());
|
||||
for (int i = 0; i < servers.size(); i++) {
|
||||
serverIDs.push_back(servers[i]);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t TCTeamInfo::getLoadAverage() const {
|
||||
int64_t bytesSum = 0;
|
||||
int added = 0;
|
||||
for (int i = 0; i < servers.size(); i++)
|
||||
if (servers[i]->serverMetrics.present()) {
|
||||
added++;
|
||||
bytesSum += servers[i]->serverMetrics.get().load.bytes;
|
||||
}
|
||||
|
||||
if (added < servers.size())
|
||||
bytesSum *= 2;
|
||||
|
||||
return added == 0 ? 0 : bytesSum / added;
|
||||
}
|
||||
|
||||
Future<Void> TCTeamInfo::updateStorageMetrics() {
|
||||
return TCTeamInfoImpl::updateStorageMetrics(this);
|
||||
}
|
||||
|
|
|
@ -61,16 +61,7 @@ public:
|
|||
ProcessClass processClass,
|
||||
bool inDesiredDC,
|
||||
Reference<LocalitySet> storageServerSet,
|
||||
Version addedVersion = 0)
|
||||
: id(ssi.id()), addedVersion(addedVersion), collection(collection), lastKnownInterface(ssi),
|
||||
lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()),
|
||||
onRemoved(removed.getFuture()), onTSSPairRemoved(Never()), inDesiredDC(inDesiredDC),
|
||||
storeType(KeyValueStoreType::END) {
|
||||
|
||||
if (!ssi.isTss()) {
|
||||
localityEntry = ((LocalityMap<UID>*)storageServerSet.getPtr())->add(ssi.locality, &id);
|
||||
}
|
||||
}
|
||||
Version addedVersion = 0);
|
||||
|
||||
bool isCorrectStoreType(KeyValueStoreType configStoreType) const {
|
||||
// A new storage server's store type may not be set immediately.
|
||||
|
@ -96,35 +87,11 @@ public:
|
|||
std::vector<Reference<TCMachineTeamInfo>> machineTeams; // SOMEDAY: split good and bad machine teams.
|
||||
LocalityEntry localityEntry;
|
||||
|
||||
Reference<TCMachineInfo> clone() const {
|
||||
auto result = Reference<TCMachineInfo>(new TCMachineInfo);
|
||||
result->serversOnMachine = serversOnMachine;
|
||||
result->machineID = machineID;
|
||||
result->machineTeams = machineTeams;
|
||||
result->localityEntry = localityEntry;
|
||||
return result;
|
||||
}
|
||||
Reference<TCMachineInfo> clone() const;
|
||||
|
||||
explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry) : localityEntry(entry) {
|
||||
ASSERT(serversOnMachine.empty());
|
||||
serversOnMachine.push_back(server);
|
||||
explicit TCMachineInfo(Reference<TCServerInfo> server, const LocalityEntry& entry);
|
||||
|
||||
LocalityData& locality = server->lastKnownInterface.locality;
|
||||
ASSERT(locality.zoneId().present());
|
||||
machineID = locality.zoneId().get();
|
||||
}
|
||||
|
||||
std::string getServersIDStr() const {
|
||||
std::stringstream ss;
|
||||
if (serversOnMachine.empty())
|
||||
return "[unset]";
|
||||
|
||||
for (const auto& server : serversOnMachine) {
|
||||
ss << server->id.toString() << " ";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
std::string getServersIDStr() const;
|
||||
};
|
||||
|
||||
// TeamCollection's machine team information
|
||||
|
@ -135,32 +102,14 @@ public:
|
|||
std::vector<Reference<TCTeamInfo>> serverTeams;
|
||||
UID id;
|
||||
|
||||
explicit TCMachineTeamInfo(std::vector<Reference<TCMachineInfo>> const& machines)
|
||||
: machines(machines), id(deterministicRandom()->randomUniqueID()) {
|
||||
machineIDs.reserve(machines.size());
|
||||
for (int i = 0; i < machines.size(); i++) {
|
||||
machineIDs.push_back(machines[i]->machineID);
|
||||
}
|
||||
sort(machineIDs.begin(), machineIDs.end());
|
||||
}
|
||||
explicit TCMachineTeamInfo(std::vector<Reference<TCMachineInfo>> const& machines);
|
||||
|
||||
int size() const {
|
||||
ASSERT(machines.size() == machineIDs.size());
|
||||
return machineIDs.size();
|
||||
}
|
||||
|
||||
std::string getMachineIDsStr() const {
|
||||
std::stringstream ss;
|
||||
|
||||
if (machineIDs.empty())
|
||||
return "[unset]";
|
||||
|
||||
for (const auto& id : machineIDs) {
|
||||
ss << id.contents().toString() << " ";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
std::string getMachineIDsStr() const;
|
||||
|
||||
bool operator==(TCMachineTeamInfo& rhs) const { return this->machineIDs == rhs.machineIDs; }
|
||||
};
|
||||
|
@ -179,140 +128,38 @@ public:
|
|||
Reference<TCMachineTeamInfo> machineTeam;
|
||||
Future<Void> tracker;
|
||||
|
||||
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
|
||||
: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
|
||||
id(deterministicRandom()->randomUniqueID()) {
|
||||
if (servers.empty()) {
|
||||
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
|
||||
}
|
||||
serverIDs.reserve(servers.size());
|
||||
for (int i = 0; i < servers.size(); i++) {
|
||||
serverIDs.push_back(servers[i]->id);
|
||||
}
|
||||
}
|
||||
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers);
|
||||
|
||||
std::string getTeamID() const override { return id.shortString(); }
|
||||
|
||||
std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
|
||||
std::vector<StorageServerInterface> v;
|
||||
v.reserve(servers.size());
|
||||
for (const auto& server : servers) {
|
||||
v.push_back(server->lastKnownInterface);
|
||||
}
|
||||
return v;
|
||||
}
|
||||
std::vector<StorageServerInterface> getLastKnownServerInterfaces() const override;
|
||||
|
||||
int size() const override {
|
||||
ASSERT(servers.size() == serverIDs.size());
|
||||
return servers.size();
|
||||
}
|
||||
|
||||
std::vector<UID> const& getServerIDs() const override { return serverIDs; }
|
||||
|
||||
const std::vector<Reference<TCServerInfo>>& getServers() const { return servers; }
|
||||
|
||||
std::string getServerIDsStr() const {
|
||||
std::stringstream ss;
|
||||
std::string getServerIDsStr() const;
|
||||
|
||||
if (serverIDs.empty())
|
||||
return "[unset]";
|
||||
void addDataInFlightToTeam(int64_t delta) override;
|
||||
|
||||
for (const auto& id : serverIDs) {
|
||||
ss << id.toString() << " ";
|
||||
}
|
||||
int64_t getDataInFlightToTeam() const override;
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override;
|
||||
|
||||
void addDataInFlightToTeam(int64_t delta) override {
|
||||
for (int i = 0; i < servers.size(); i++)
|
||||
servers[i]->dataInFlightToServer += delta;
|
||||
}
|
||||
int64_t getDataInFlightToTeam() const override {
|
||||
int64_t dataInFlight = 0.0;
|
||||
for (int i = 0; i < servers.size(); i++)
|
||||
dataInFlight += servers[i]->dataInFlightToServer;
|
||||
return dataInFlight;
|
||||
}
|
||||
int64_t getMinAvailableSpace(bool includeInFlight = true) const override;
|
||||
|
||||
int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override {
|
||||
int64_t physicalBytes = getLoadAverage();
|
||||
double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
|
||||
int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
|
||||
double availableSpaceMultiplier =
|
||||
SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF /
|
||||
(std::max(std::min(SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio), 0.000001));
|
||||
if (servers.size() > 2) {
|
||||
// make sure in triple replication the penalty is high enough that you will always avoid a team with a
|
||||
// member at 20% free space
|
||||
availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier;
|
||||
}
|
||||
double getMinAvailableSpaceRatio(bool includeInFlight = true) const override;
|
||||
|
||||
if (minAvailableSpaceRatio < SERVER_KNOBS->TARGET_AVAILABLE_SPACE_RATIO) {
|
||||
TraceEvent(SevWarn, "DiskNearCapacity")
|
||||
.suppressFor(1.0)
|
||||
.detail("AvailableSpaceRatio", minAvailableSpaceRatio);
|
||||
}
|
||||
|
||||
return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier;
|
||||
}
|
||||
|
||||
int64_t getMinAvailableSpace(bool includeInFlight = true) const override {
|
||||
int64_t minAvailableSpace = std::numeric_limits<int64_t>::max();
|
||||
for (const auto& server : servers) {
|
||||
if (server->serverMetrics.present()) {
|
||||
auto& replyValue = server->serverMetrics.get();
|
||||
|
||||
ASSERT(replyValue.available.bytes >= 0);
|
||||
ASSERT(replyValue.capacity.bytes >= 0);
|
||||
|
||||
int64_t bytesAvailable = replyValue.available.bytes;
|
||||
if (includeInFlight) {
|
||||
bytesAvailable -= server->dataInFlightToServer;
|
||||
}
|
||||
|
||||
minAvailableSpace = std::min(bytesAvailable, minAvailableSpace);
|
||||
}
|
||||
}
|
||||
|
||||
return minAvailableSpace; // Could be negative
|
||||
}
|
||||
|
||||
double getMinAvailableSpaceRatio(bool includeInFlight = true) const override {
|
||||
double minRatio = 1.0;
|
||||
for (const auto& server : servers) {
|
||||
if (server->serverMetrics.present()) {
|
||||
auto& replyValue = server->serverMetrics.get();
|
||||
|
||||
ASSERT(replyValue.available.bytes >= 0);
|
||||
ASSERT(replyValue.capacity.bytes >= 0);
|
||||
|
||||
int64_t bytesAvailable = replyValue.available.bytes;
|
||||
if (includeInFlight) {
|
||||
bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer);
|
||||
}
|
||||
|
||||
if (replyValue.capacity.bytes == 0)
|
||||
minRatio = 0;
|
||||
else
|
||||
minRatio = std::min(minRatio, ((double)bytesAvailable) / replyValue.capacity.bytes);
|
||||
}
|
||||
}
|
||||
|
||||
return minRatio;
|
||||
}
|
||||
|
||||
bool hasHealthyAvailableSpace(double minRatio) const override {
|
||||
return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE;
|
||||
}
|
||||
bool hasHealthyAvailableSpace(double minRatio) const override;
|
||||
|
||||
Future<Void> updateStorageMetrics() override;
|
||||
|
||||
bool isOptimal() const override {
|
||||
for (const auto& server : servers) {
|
||||
if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
bool isOptimal() const override;
|
||||
|
||||
bool isWrongConfiguration() const override { return wrongConfiguration; }
|
||||
void setWrongConfiguration(bool wrongConfiguration) override { this->wrongConfiguration = wrongConfiguration; }
|
||||
|
@ -323,32 +170,12 @@ public:
|
|||
void addref() override { ReferenceCounted<TCTeamInfo>::addref(); }
|
||||
void delref() override { ReferenceCounted<TCTeamInfo>::delref(); }
|
||||
|
||||
bool hasServer(const UID& server) {
|
||||
return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
|
||||
}
|
||||
bool hasServer(const UID& server) const;
|
||||
|
||||
void addServers(const std::vector<UID>& servers) override {
|
||||
serverIDs.reserve(servers.size());
|
||||
for (int i = 0; i < servers.size(); i++) {
|
||||
serverIDs.push_back(servers[i]);
|
||||
}
|
||||
}
|
||||
void addServers(const std::vector<UID>& servers) override;
|
||||
|
||||
private:
|
||||
// Calculate an "average" of the metrics replies that we received. Penalize teams from which we did not receive all
|
||||
// replies.
|
||||
int64_t getLoadAverage() const {
|
||||
int64_t bytesSum = 0;
|
||||
int added = 0;
|
||||
for (int i = 0; i < servers.size(); i++)
|
||||
if (servers[i]->serverMetrics.present()) {
|
||||
added++;
|
||||
bytesSum += servers[i]->serverMetrics.get().load.bytes;
|
||||
}
|
||||
|
||||
if (added < servers.size())
|
||||
bytesSum *= 2;
|
||||
|
||||
return added == 0 ? 0 : bytesSum / added;
|
||||
}
|
||||
int64_t getLoadAverage() const;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue