Make IDataDistributionTeam const-correct
This commit is contained in:
parent
9416e9139e
commit
9a2ce4c981
|
@ -173,21 +173,19 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() {
|
||||
vector<StorageServerInterface> v;
|
||||
v.reserve(servers.size());
|
||||
for(int i=0; i<servers.size(); i++)
|
||||
v.push_back(servers[i]->lastKnownInterface);
|
||||
vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
|
||||
vector<StorageServerInterface> v(servers.size());
|
||||
for (const auto& server : servers) v.push_back(server->lastKnownInterface);
|
||||
return v;
|
||||
}
|
||||
virtual int size() {
|
||||
int size() const override {
|
||||
ASSERT(servers.size() == serverIDs.size());
|
||||
return servers.size();
|
||||
}
|
||||
virtual vector<UID> const& getServerIDs() { return serverIDs; }
|
||||
vector<UID> const& getServerIDs() const override { return serverIDs; }
|
||||
const vector<Reference<TCServerInfo>>& getServers() { return servers; }
|
||||
|
||||
virtual std::string getServerIDsStr() {
|
||||
std::string getServerIDsStr() const override {
|
||||
std::stringstream ss;
|
||||
|
||||
if (serverIDs.empty()) return "[unset]";
|
||||
|
@ -199,18 +197,18 @@ public:
|
|||
return ss.str();
|
||||
}
|
||||
|
||||
virtual void addDataInFlightToTeam( int64_t delta ) {
|
||||
void addDataInFlightToTeam(int64_t delta) override {
|
||||
for(int i=0; i<servers.size(); i++)
|
||||
servers[i]->dataInFlightToServer += delta;
|
||||
}
|
||||
virtual int64_t getDataInFlightToTeam() {
|
||||
int64_t getDataInFlightToTeam() const override {
|
||||
int64_t dataInFlight = 0.0;
|
||||
for(int i=0; i<servers.size(); i++)
|
||||
dataInFlight += servers[i]->dataInFlightToServer;
|
||||
return dataInFlight;
|
||||
}
|
||||
|
||||
virtual int64_t getLoadBytes( bool includeInFlight = true, double inflightPenalty = 1.0 ) {
|
||||
int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override {
|
||||
int64_t physicalBytes = getLoadAverage();
|
||||
double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
|
||||
int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
|
||||
|
@ -227,18 +225,18 @@ public:
|
|||
return (physicalBytes + (inflightPenalty*inFlightBytes)) * availableSpaceMultiplier;
|
||||
}
|
||||
|
||||
virtual int64_t getMinAvailableSpace( bool includeInFlight = true ) {
|
||||
int64_t getMinAvailableSpace(bool includeInFlight = true) const override {
|
||||
int64_t minAvailableSpace = std::numeric_limits<int64_t>::max();
|
||||
for(int i=0; i<servers.size(); i++) {
|
||||
if( servers[i]->serverMetrics.present() ) {
|
||||
auto& replyValue = servers[i]->serverMetrics.get();
|
||||
for (const auto& server : servers) {
|
||||
if (server->serverMetrics.present()) {
|
||||
auto& replyValue = server->serverMetrics.get();
|
||||
|
||||
ASSERT(replyValue.available.bytes >= 0);
|
||||
ASSERT(replyValue.capacity.bytes >= 0);
|
||||
|
||||
int64_t bytesAvailable = replyValue.available.bytes;
|
||||
if(includeInFlight) {
|
||||
bytesAvailable -= servers[i]->dataInFlightToServer;
|
||||
bytesAvailable -= server->dataInFlightToServer;
|
||||
}
|
||||
|
||||
minAvailableSpace = std::min(bytesAvailable, minAvailableSpace);
|
||||
|
@ -248,18 +246,18 @@ public:
|
|||
return minAvailableSpace; // Could be negative
|
||||
}
|
||||
|
||||
virtual double getMinAvailableSpaceRatio( bool includeInFlight = true ) {
|
||||
double getMinAvailableSpaceRatio(bool includeInFlight = true) const override {
|
||||
double minRatio = 1.0;
|
||||
for(int i=0; i<servers.size(); i++) {
|
||||
if( servers[i]->serverMetrics.present() ) {
|
||||
auto& replyValue = servers[i]->serverMetrics.get();
|
||||
for (const auto& server : servers) {
|
||||
if (server->serverMetrics.present()) {
|
||||
auto& replyValue = server->serverMetrics.get();
|
||||
|
||||
ASSERT(replyValue.available.bytes >= 0);
|
||||
ASSERT(replyValue.capacity.bytes >= 0);
|
||||
|
||||
int64_t bytesAvailable = replyValue.available.bytes;
|
||||
if(includeInFlight) {
|
||||
bytesAvailable = std::max((int64_t)0, bytesAvailable - servers[i]->dataInFlightToServer);
|
||||
bytesAvailable = std::max((int64_t)0, bytesAvailable - server->dataInFlightToServer);
|
||||
}
|
||||
|
||||
if(replyValue.capacity.bytes == 0)
|
||||
|
@ -272,29 +270,27 @@ public:
|
|||
return minRatio;
|
||||
}
|
||||
|
||||
virtual bool hasHealthyAvailableSpace(double minRatio) {
|
||||
bool hasHealthyAvailableSpace(double minRatio) const override {
|
||||
return getMinAvailableSpaceRatio() >= minRatio && getMinAvailableSpace() > SERVER_KNOBS->MIN_AVAILABLE_SPACE;
|
||||
}
|
||||
|
||||
virtual Future<Void> updateStorageMetrics() {
|
||||
return doUpdateStorageMetrics( this );
|
||||
}
|
||||
Future<Void> updateStorageMetrics() override { return doUpdateStorageMetrics(this); }
|
||||
|
||||
virtual bool isOptimal() {
|
||||
for(int i=0; i<servers.size(); i++) {
|
||||
if( servers[i]->lastKnownClass.machineClassFitness( ProcessClass::Storage ) > ProcessClass::UnsetFit ) {
|
||||
bool isOptimal() const override {
|
||||
for (const auto& server : servers) {
|
||||
if (server->lastKnownClass.machineClassFitness(ProcessClass::Storage) > ProcessClass::UnsetFit) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual bool isWrongConfiguration() { return wrongConfiguration; }
|
||||
virtual void setWrongConfiguration(bool wrongConfiguration) { this->wrongConfiguration = wrongConfiguration; }
|
||||
virtual bool isHealthy() { return healthy; }
|
||||
virtual void setHealthy(bool h) { healthy = h; }
|
||||
virtual int getPriority() { return priority; }
|
||||
virtual void setPriority(int p) { priority = p; }
|
||||
bool isWrongConfiguration() const override { return wrongConfiguration; }
|
||||
void setWrongConfiguration(bool wrongConfiguration) override { this->wrongConfiguration = wrongConfiguration; }
|
||||
bool isHealthy() const override { return healthy; }
|
||||
void setHealthy(bool h) override { healthy = h; }
|
||||
int getPriority() const override { return priority; }
|
||||
void setPriority(int p) override { priority = p; }
|
||||
virtual void addref() { ReferenceCounted<TCTeamInfo>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<TCTeamInfo>::delref(); }
|
||||
|
||||
|
@ -307,7 +303,7 @@ public:
|
|||
|
||||
private:
|
||||
// Calculate an "average" of the metrics replies that we received. Penalize teams from which we did not receive all replies.
|
||||
int64_t getLoadAverage() {
|
||||
int64_t getLoadAverage() const {
|
||||
int64_t bytesSum = 0;
|
||||
int added = 0;
|
||||
for(int i=0; i<servers.size(); i++)
|
||||
|
|
|
@ -38,28 +38,28 @@ struct RelocateShard {
|
|||
};
|
||||
|
||||
struct IDataDistributionTeam {
|
||||
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() = 0;
|
||||
virtual int size() = 0;
|
||||
virtual vector<UID> const& getServerIDs() = 0;
|
||||
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() const = 0;
|
||||
virtual int size() const = 0;
|
||||
virtual vector<UID> const& getServerIDs() const = 0;
|
||||
virtual void addDataInFlightToTeam( int64_t delta ) = 0;
|
||||
virtual int64_t getDataInFlightToTeam() = 0;
|
||||
virtual int64_t getLoadBytes( bool includeInFlight = true, double inflightPenalty = 1.0 ) = 0;
|
||||
virtual int64_t getMinAvailableSpace( bool includeInFlight = true ) = 0;
|
||||
virtual double getMinAvailableSpaceRatio( bool includeInFlight = true ) = 0;
|
||||
virtual bool hasHealthyAvailableSpace( double minRatio ) = 0;
|
||||
virtual int64_t getDataInFlightToTeam() const = 0;
|
||||
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
|
||||
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) const = 0;
|
||||
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) const = 0;
|
||||
virtual bool hasHealthyAvailableSpace(double minRatio) const = 0;
|
||||
virtual Future<Void> updateStorageMetrics() = 0;
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
virtual bool isHealthy() = 0;
|
||||
virtual bool isHealthy() const = 0;
|
||||
virtual void setHealthy(bool) = 0;
|
||||
virtual int getPriority() = 0;
|
||||
virtual int getPriority() const = 0;
|
||||
virtual void setPriority(int) = 0;
|
||||
virtual bool isOptimal() = 0;
|
||||
virtual bool isWrongConfiguration() = 0;
|
||||
virtual bool isOptimal() const = 0;
|
||||
virtual bool isWrongConfiguration() const = 0;
|
||||
virtual void setWrongConfiguration(bool) = 0;
|
||||
virtual void addServers(const vector<UID> &servers) = 0;
|
||||
|
||||
std::string getDesc() {
|
||||
std::string getDesc() const {
|
||||
const auto& servers = getLastKnownServerInterfaces();
|
||||
std::string s = format("Size %d; ", servers.size());
|
||||
for(int i=0; i<servers.size(); i++) {
|
||||
|
|
|
@ -84,7 +84,6 @@ struct RelocateData {
|
|||
class ParallelTCInfo : public ReferenceCounted<ParallelTCInfo>, public IDataDistributionTeam {
|
||||
public:
|
||||
vector<Reference<IDataDistributionTeam>> teams;
|
||||
vector<UID> tempServerIDs;
|
||||
|
||||
ParallelTCInfo() { }
|
||||
|
||||
|
@ -96,47 +95,44 @@ public:
|
|||
teams.clear();
|
||||
}
|
||||
|
||||
int64_t sum(std::function<int64_t(Reference<IDataDistributionTeam>)> func) {
|
||||
int64_t sum(std::function<int64_t(IDataDistributionTeam const&)> func) const {
|
||||
int64_t result = 0;
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
result += func(*it);
|
||||
for (const auto& team : teams) {
|
||||
result += func(*team);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
vector<T> collect(std::function < vector<T>(Reference<IDataDistributionTeam>)> func) {
|
||||
vector<T> result;
|
||||
template <class T>
|
||||
vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
|
||||
vector<T> result(teams.size());
|
||||
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
vector<T> newItems = func(*it);
|
||||
for (const auto& team : teams) {
|
||||
vector<T> newItems = func(*team);
|
||||
result.insert(result.end(), newItems.begin(), newItems.end());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool any(std::function<bool(Reference<IDataDistributionTeam>)> func) {
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
if (func(*it)) {
|
||||
bool any(std::function<bool(IDataDistributionTeam const&)> func) const {
|
||||
for (const auto& team : teams) {
|
||||
if (func(*team)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool all(std::function<bool(Reference<IDataDistributionTeam>)> func) {
|
||||
return !any([func](Reference<IDataDistributionTeam> team) {
|
||||
return !func(team);
|
||||
});
|
||||
bool all(std::function<bool(IDataDistributionTeam const&)> func) const {
|
||||
return !any([func](IDataDistributionTeam const& team) { return !func(team); });
|
||||
}
|
||||
|
||||
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() {
|
||||
return collect<StorageServerInterface>([](Reference<IDataDistributionTeam> team) {
|
||||
return team->getLastKnownServerInterfaces();
|
||||
});
|
||||
vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
|
||||
return collect<StorageServerInterface>(
|
||||
[](IDataDistributionTeam const& team) { return team.getLastKnownServerInterfaces(); });
|
||||
}
|
||||
|
||||
virtual int size() {
|
||||
int size() const override {
|
||||
int totalSize = 0;
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
totalSize += (*it)->size();
|
||||
|
@ -144,94 +140,85 @@ public:
|
|||
return totalSize;
|
||||
}
|
||||
|
||||
virtual vector<UID> const& getServerIDs() {
|
||||
vector<UID> const& getServerIDs() const override {
|
||||
static vector<UID> tempServerIDs;
|
||||
tempServerIDs.clear();
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
vector<UID> const& childIDs = (*it)->getServerIDs();
|
||||
for (const auto& team : teams) {
|
||||
vector<UID> const &childIDs = team->getServerIDs();
|
||||
tempServerIDs.insert(tempServerIDs.end(), childIDs.begin(), childIDs.end());
|
||||
}
|
||||
return tempServerIDs;
|
||||
}
|
||||
|
||||
virtual void addDataInFlightToTeam(int64_t delta) {
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
(*it)->addDataInFlightToTeam(delta);
|
||||
void addDataInFlightToTeam(int64_t delta) override {
|
||||
for (auto& team : teams) {
|
||||
team->addDataInFlightToTeam(delta);
|
||||
}
|
||||
}
|
||||
|
||||
virtual int64_t getDataInFlightToTeam() {
|
||||
return sum([](Reference<IDataDistributionTeam> team) {
|
||||
return team->getDataInFlightToTeam();
|
||||
int64_t getDataInFlightToTeam() const override {
|
||||
return sum([](IDataDistributionTeam const& team) { return team.getDataInFlightToTeam(); });
|
||||
}
|
||||
|
||||
int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override {
|
||||
return sum([includeInFlight, inflightPenalty](IDataDistributionTeam const& team) {
|
||||
return team.getLoadBytes(includeInFlight, inflightPenalty);
|
||||
});
|
||||
}
|
||||
|
||||
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0 ) {
|
||||
return sum([includeInFlight, inflightPenalty](Reference<IDataDistributionTeam> team) {
|
||||
return team->getLoadBytes(includeInFlight, inflightPenalty);
|
||||
});
|
||||
}
|
||||
|
||||
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) {
|
||||
int64_t getMinAvailableSpace(bool includeInFlight = true) const override {
|
||||
int64_t result = std::numeric_limits<int64_t>::max();
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
result = std::min(result, (*it)->getMinAvailableSpace(includeInFlight));
|
||||
for (const auto& team : teams) {
|
||||
result = std::min(result, team->getMinAvailableSpace(includeInFlight));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) {
|
||||
double getMinAvailableSpaceRatio(bool includeInFlight = true) const override {
|
||||
double result = std::numeric_limits<double>::max();
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
result = std::min(result, (*it)->getMinAvailableSpaceRatio(includeInFlight));
|
||||
for (const auto& team : teams) {
|
||||
result = std::min(result, team->getMinAvailableSpaceRatio(includeInFlight));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
virtual bool hasHealthyAvailableSpace(double minRatio) {
|
||||
return all([minRatio](Reference<IDataDistributionTeam> team) {
|
||||
return team->hasHealthyAvailableSpace(minRatio);
|
||||
});
|
||||
bool hasHealthyAvailableSpace(double minRatio) const {
|
||||
return all([minRatio](IDataDistributionTeam const& team) { return team.hasHealthyAvailableSpace(minRatio); });
|
||||
}
|
||||
|
||||
virtual Future<Void> updateStorageMetrics() {
|
||||
vector<Future<Void>> futures;
|
||||
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
futures.push_back((*it)->updateStorageMetrics());
|
||||
for (auto& team : teams) {
|
||||
futures.push_back(team->updateStorageMetrics());
|
||||
}
|
||||
return waitForAll(futures);
|
||||
}
|
||||
|
||||
virtual bool isOptimal() {
|
||||
return all([](Reference<IDataDistributionTeam> team) {
|
||||
return team->isOptimal();
|
||||
});
|
||||
bool isOptimal() const override {
|
||||
return all([](IDataDistributionTeam const& team) { return team.isOptimal(); });
|
||||
}
|
||||
|
||||
virtual bool isWrongConfiguration() {
|
||||
return any([](Reference<IDataDistributionTeam> team) {
|
||||
return team->isWrongConfiguration();
|
||||
});
|
||||
bool isWrongConfiguration() const override {
|
||||
return any([](IDataDistributionTeam const& team) { return team.isWrongConfiguration(); });
|
||||
}
|
||||
virtual void setWrongConfiguration(bool wrongConfiguration) {
|
||||
void setWrongConfiguration(bool wrongConfiguration) override {
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
(*it)->setWrongConfiguration(wrongConfiguration);
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool isHealthy() {
|
||||
return all([](Reference<IDataDistributionTeam> team) {
|
||||
return team->isHealthy();
|
||||
});
|
||||
bool isHealthy() const override {
|
||||
return all([](IDataDistributionTeam const& team) { return team.isHealthy(); });
|
||||
}
|
||||
|
||||
virtual void setHealthy(bool h) {
|
||||
void setHealthy(bool h) override {
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
(*it)->setHealthy(h);
|
||||
}
|
||||
}
|
||||
|
||||
virtual int getPriority() {
|
||||
int getPriority() const override {
|
||||
int priority = 0;
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
priority = std::max(priority, (*it)->getPriority());
|
||||
|
@ -239,7 +226,7 @@ public:
|
|||
return priority;
|
||||
}
|
||||
|
||||
virtual void setPriority(int p) {
|
||||
void setPriority(int p) override {
|
||||
for (auto it = teams.begin(); it != teams.end(); it++) {
|
||||
(*it)->setPriority(p);
|
||||
}
|
||||
|
@ -247,7 +234,7 @@ public:
|
|||
virtual void addref() { ReferenceCounted<ParallelTCInfo>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<ParallelTCInfo>::delref(); }
|
||||
|
||||
virtual void addServers(const std::vector<UID>& servers) {
|
||||
void addServers(const std::vector<UID>& servers) override {
|
||||
ASSERT(!teams.empty());
|
||||
teams[0]->addServers(servers);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue