fix uninitialized member

This commit is contained in:
Xiaoxi Wang 2022-02-28 10:22:32 -08:00
parent 40a1f562a7
commit 5e74b5006e
5 changed files with 82 additions and 47 deletions

View File

@ -245,7 +245,9 @@ public:
(!req.preferLowerUtilization ||
self->teams[currentIndex]->hasHealthyAvailableSpace(self->medianAvailableSpace))) {
int64_t loadBytes = self->teams[currentIndex]->getLoadBytes(true, req.inflightPenalty);
if ((!bestOption.present() || (req.preferLowerUtilization && loadBytes < bestLoadBytes) ||
if ((!bestOption.present() ||
((bool)req.teamSorter && req.teamSorter(bestOption.get(), self->teams[currentIndex])) ||
(req.preferLowerUtilization && loadBytes < bestLoadBytes) ||
(!req.preferLowerUtilization && loadBytes > bestLoadBytes)) &&
(!req.teamMustHaveShards ||
self->shardsAffectedByTeamFailure->hasShards(ShardsAffectedByTeamFailure::Team(

View File

@ -47,6 +47,7 @@ struct IDataDistributionTeam {
virtual void addDataInFlightToTeam(int64_t delta) = 0;
virtual int64_t getDataInFlightToTeam() const = 0;
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
virtual double getLoadReadBandwidth() const = 0;
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) const = 0;
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) const = 0;
virtual bool hasHealthyAvailableSpace(double minRatio) const = 0;
@ -81,29 +82,33 @@ struct GetTeamRequest {
bool wantsTrueBest;
bool preferLowerUtilization; // = false --> higher utilization team will be returned
bool teamMustHaveShards;
bool preferLowerReadTraffic;
double inflightPenalty;
std::vector<UID> completeSources;
std::vector<UID> src;
Promise<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> reply;
// optional
typedef Reference<IDataDistributionTeam> TeamRef;
std::function<bool(TeamRef)> hardConstraint;
std::function<bool(TeamRef, TeamRef)>
teamSorter; // => true if a.score < b.score, the reply will choose the largest one
GetTeamRequest() {}
GetTeamRequest(bool wantsNewServers,
bool wantsTrueBest,
bool preferLowerUtilization,
bool teamMustHaveShards,
bool preferLowerReadTraffic = false,
double inflightPenalty = 1.0)
: wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerUtilization(preferLowerUtilization),
teamMustHaveShards(teamMustHaveShards), preferLowerReadTraffic(preferLowerReadTraffic),
inflightPenalty(inflightPenalty) {}
teamMustHaveShards(teamMustHaveShards), inflightPenalty(inflightPenalty) {}
std::string getDesc() const {
std::stringstream ss;
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
<< " PreferLowerUtilization:" << preferLowerUtilization << " teamMustHaveShards:" << teamMustHaveShards
<< " PreferLowerReadTraffic" << preferLowerReadTraffic << " inflightPenalty:" << inflightPenalty << ";";
<< " inflightPenalty:" << inflightPenalty << " hardConstraint: " << (bool)hardConstraint
<< " teamSorter: " << (bool)teamSorter << ";";
ss << "CompleteSources:";
for (const auto& cs : completeSources) {
ss << cs.toString() << ",";

View File

@ -174,6 +174,10 @@ public:
});
}
double getLoadReadBandwidth() const override {
return sum([](IDataDistributionTeam const& team) { return team.getLoadReadBandwidth(); });
}
int64_t getMinAvailableSpace(bool includeInFlight = true) const override {
int64_t result = std::numeric_limits<int64_t>::max();
for (const auto& team : teams) {
@ -1021,7 +1025,6 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM,
true,
false,
false,
inflightPenalty);
req.src = rd.src;
req.completeSources = rd.completeSources;
@ -1427,17 +1430,22 @@ ACTOR Future<Void> getSrcDestTeams(DDQueueData* self,
return Void();
}
bool greaterReadLoad(Reference<IDataDistributionTeam> a, Reference<IDataDistributionTeam> b) {
return a->getLoadReadBandwidth() > b->getLoadReadBandwidth();
}
ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state int resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
state Transaction tr(self->cx);
state double lastRead = 0;
state bool skipCurrentLoop = false;
state Reference<IDataDistributionTeam> sourceTeam, destTeam;
loop {
state bool moved = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
state Reference<IDataDistributionTeam> sourceTeam;
state Reference<IDataDistributionTeam> destTeam;
state GetTeamRequest srcReq;
state GetTeamRequest destReq;
state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId);
@ -1457,8 +1465,7 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
// reset loop interval
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
} else if (val.present()) {
if(val.get().size() > 0) {
std::cout << val.get().toString() <<"\n";
if (val.get().size() > 0) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
if (ddIgnore & DDIgnore::REBALANCE_DISK) {
disableReadBalance = true;
@ -1488,12 +1495,10 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
// FIXME: read balance and disk balance shouldn't be mutual exclusive in the future
srcReq = GetTeamRequest(true, true, false, true);
destReq = GetTeamRequest(true, false, true, false);
if (!disableReadBalance) {
srcReq = GetTeamRequest(true, true, false, true, false);
destReq = GetTeamRequest(true, false, true, false, false);
} else {
srcReq = GetTeamRequest(true, true, false, true);
destReq = GetTeamRequest(true, false, true, false);
srcReq.teamSorter = greaterReadLoad;
}
// clang-format off
wait(getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, sourceTeam, destTeam,
@ -1548,8 +1553,13 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
state bool skipCurrentLoop = false;
loop {
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state bool disableReadBalance = false;
state bool disableDiskBalance = false;
state Reference<IDataDistributionTeam> sourceTeam;
state Reference<IDataDistributionTeam> destTeam;
state GetTeamRequest srcReq;
state GetTeamRequest destReq;
state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId);
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval);
@ -1566,8 +1576,20 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
if (skipCurrentLoop && !val.present()) {
// reset loop interval
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
} else if (val.present()) {
if (val.get().size() > 0) {
int ddIgnore = BinaryReader::fromStringRef<int>(val.get(), Unversioned());
if (ddIgnore & DDIgnore::REBALANCE_DISK) {
disableReadBalance = true;
}
if (ddIgnore & DDIgnore::REBALANCE_READ) {
disableDiskBalance = true;
}
skipCurrentLoop = disableReadBalance && disableDiskBalance;
} else {
skipCurrentLoop = true;
}
}
skipCurrentLoop = val.present();
}
traceEvent.detail("Enabled", !skipCurrentLoop);
@ -1584,39 +1606,29 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> _randomTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(true, false, false, true, false))));
randomTeam = _randomTeam;
traceEvent.detail("SourceTeam",
printable(randomTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
// FIXME: read balance and disk balance shouldn't be mutual exclusive in the future
srcReq = GetTeamRequest(true, false, false, true);
destReq = GetTeamRequest(true, true, true, false);
if (!disableReadBalance) {
destReq.teamSorter = greaterReadLoad;
}
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> unloadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(true, true, true, false, true))));
traceEvent.detail(
"DestTeam",
printable(unloadedTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (unloadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM,
randomTeam.first.get(),
unloadedTeam.first.get(),
teamCollectionIndex == 0,
&traceEvent));
moved = _moved;
if (moved) {
resetCount = 0;
} else {
resetCount++;
}
// clang-format off
wait(getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, sourceTeam, destTeam,
SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM,&traceEvent));
if (sourceTeam.isValid() && destTeam.isValid()) {
if (!disableReadBalance) {
wait(store(moved,rebalanceReadLoad(self,SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM,
sourceTeam, destTeam,teamCollectionIndex == 0,
&traceEvent)));
} else {
wait(store(moved,rebalanceTeams(self,SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM,
sourceTeam, destTeam,teamCollectionIndex == 0,
&traceEvent)));
}
}
// clang-format on
moved ? resetCount = 0 : resetCount++;
}
if (now() - (*self->lastLimited) < SERVER_KNOBS->BG_DD_SATURATION_DELAY) {

View File

@ -336,6 +336,20 @@ int64_t TCTeamInfo::getLoadBytes(bool includeInFlight, double inflightPenalty) c
return (physicalBytes + (inflightPenalty * inFlightBytes)) * availableSpaceMultiplier;
}
double TCTeamInfo::getLoadReadBandwidth() const {
double sum = 0;
int size = 0;
for (const auto& server : servers) {
if (server->serverMetricsPresent()) {
auto& replyValue = server->getServerMetrics();
ASSERT(replyValue.load.bytesReadPerKSecond >= 0);
sum += replyValue.load.bytesReadPerKSecond;
size += 1;
}
}
return size == 0 ? 0 : sum / size;
}
int64_t TCTeamInfo::getMinAvailableSpace(bool includeInFlight) const {
int64_t minAvailableSpace = std::numeric_limits<int64_t>::max();
for (const auto& server : servers) {

View File

@ -191,6 +191,8 @@ public:
int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const override;
double getLoadReadBandwidth() const override;
int64_t getMinAvailableSpace(bool includeInFlight = true) const override;
double getMinAvailableSpaceRatio(bool includeInFlight = true) const override;