Merge pull request #7994 from jzhou77/main
Fix missing localities for fdbserver
This commit is contained in:
commit
05e463f79f
|
@ -486,6 +486,11 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
||||||
// server count is within "LOAD_BALANCE_MAX_BAD_OPTIONS". We
|
// server count is within "LOAD_BALANCE_MAX_BAD_OPTIONS". We
|
||||||
// do not need to consider any remote servers.
|
// do not need to consider any remote servers.
|
||||||
break;
|
break;
|
||||||
|
} else if (badServers == alternatives->countBest() && i == badServers) {
|
||||||
|
TraceEvent("AllLocalAlternativesFailed")
|
||||||
|
.detail("Alternatives", alternatives->description())
|
||||||
|
.detail("Total", alternatives->size())
|
||||||
|
.detail("Best", alternatives->countBest());
|
||||||
}
|
}
|
||||||
|
|
||||||
RequestStream<Request, P> const* thisStream = &alternatives->get(i, channel);
|
RequestStream<Request, P> const* thisStream = &alternatives->get(i, channel);
|
||||||
|
@ -587,6 +592,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
||||||
// nextAlt. This logic matters only if model == nullptr. Otherwise, the
|
// nextAlt. This logic matters only if model == nullptr. Otherwise, the
|
||||||
// bestAlt and nextAlt have been decided.
|
// bestAlt and nextAlt have been decided.
|
||||||
state RequestStream<Request, P> const* stream = nullptr;
|
state RequestStream<Request, P> const* stream = nullptr;
|
||||||
|
state LBDistance::Type distance;
|
||||||
for (int alternativeNum = 0; alternativeNum < alternatives->size(); alternativeNum++) {
|
for (int alternativeNum = 0; alternativeNum < alternatives->size(); alternativeNum++) {
|
||||||
int useAlt = nextAlt;
|
int useAlt = nextAlt;
|
||||||
if (nextAlt == startAlt)
|
if (nextAlt == startAlt)
|
||||||
|
@ -595,6 +601,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
||||||
useAlt = (nextAlt + alternatives->size() - 1) % alternatives->size();
|
useAlt = (nextAlt + alternatives->size() - 1) % alternatives->size();
|
||||||
|
|
||||||
stream = &alternatives->get(useAlt, channel);
|
stream = &alternatives->get(useAlt, channel);
|
||||||
|
distance = alternatives->getDistance(useAlt);
|
||||||
if (!IFailureMonitor::failureMonitor().getState(stream->getEndpoint()).failed &&
|
if (!IFailureMonitor::failureMonitor().getState(stream->getEndpoint()).failed &&
|
||||||
(!firstRequestEndpoint.present() || stream->getEndpoint().token.first() != firstRequestEndpoint.get()))
|
(!firstRequestEndpoint.present() || stream->getEndpoint().token.first() != firstRequestEndpoint.get()))
|
||||||
break;
|
break;
|
||||||
|
@ -602,6 +609,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
||||||
if (nextAlt == startAlt)
|
if (nextAlt == startAlt)
|
||||||
triedAllOptions = TriedAllOptions::True;
|
triedAllOptions = TriedAllOptions::True;
|
||||||
stream = nullptr;
|
stream = nullptr;
|
||||||
|
distance = LBDistance::DISTANT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!stream && !firstRequestData.isValid()) {
|
if (!stream && !firstRequestData.isValid()) {
|
||||||
|
@ -637,6 +645,18 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
||||||
firstRequestEndpoint = Optional<uint64_t>();
|
firstRequestEndpoint = Optional<uint64_t>();
|
||||||
} else if (firstRequestData.isValid()) {
|
} else if (firstRequestData.isValid()) {
|
||||||
// Issue a second request, the first one is taking a long time.
|
// Issue a second request, the first one is taking a long time.
|
||||||
|
if (distance == LBDistance::DISTANT) {
|
||||||
|
TraceEvent("LBDistant2nd")
|
||||||
|
.suppressFor(0.1)
|
||||||
|
.detail("Distance", (int)distance)
|
||||||
|
.detail("BackOff", backoff)
|
||||||
|
.detail("TriedAllOptions", triedAllOptions)
|
||||||
|
.detail("Alternatives", alternatives->description())
|
||||||
|
.detail("Token", stream->getEndpoint().token)
|
||||||
|
.detail("Total", alternatives->size())
|
||||||
|
.detail("Best", alternatives->countBest())
|
||||||
|
.detail("Attempts", numAttempts);
|
||||||
|
}
|
||||||
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
|
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
|
||||||
|
|
||||||
loop choose {
|
loop choose {
|
||||||
|
@ -664,6 +684,18 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Issue a request, if it takes too long to get a reply, go around the loop
|
// Issue a request, if it takes too long to get a reply, go around the loop
|
||||||
|
if (distance == LBDistance::DISTANT) {
|
||||||
|
TraceEvent("LBDistant")
|
||||||
|
.suppressFor(0.1)
|
||||||
|
.detail("Distance", (int)distance)
|
||||||
|
.detail("BackOff", backoff)
|
||||||
|
.detail("TriedAllOptions", triedAllOptions)
|
||||||
|
.detail("Alternatives", alternatives->description())
|
||||||
|
.detail("Token", stream->getEndpoint().token)
|
||||||
|
.detail("Total", alternatives->size())
|
||||||
|
.detail("Best", alternatives->countBest())
|
||||||
|
.detail("Attempts", numAttempts);
|
||||||
|
}
|
||||||
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
|
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
|
||||||
firstRequestEndpoint = stream->getEndpoint().token.first();
|
firstRequestEndpoint = stream->getEndpoint().token.first();
|
||||||
|
|
||||||
|
|
|
@ -226,6 +226,7 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
T const& getInterface(int index) { return alternatives[index]->interf; }
|
T const& getInterface(int index) { return alternatives[index]->interf; }
|
||||||
|
LBDistance::Type getDistance(int index) const { return (LBDistance::Type)alternatives[index]->distance; }
|
||||||
UID getId(int index) const { return alternatives[index]->interf.id(); }
|
UID getId(int index) const { return alternatives[index]->interf.id(); }
|
||||||
bool hasInterface(UID id) const {
|
bool hasInterface(UID id) const {
|
||||||
for (const auto& ref : alternatives) {
|
for (const auto& ref : alternatives) {
|
||||||
|
|
|
@ -3358,7 +3358,9 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
|
||||||
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
|
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
|
||||||
auto serverDBInfo = ServerDBInfo();
|
auto serverDBInfo = ServerDBInfo();
|
||||||
serverDBInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
|
serverDBInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
|
||||||
|
serverDBInfo.myLocality = localities;
|
||||||
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>(serverDBInfo);
|
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>(serverDBInfo);
|
||||||
|
TraceEvent("MyLocality").detail("Locality", dbInfo->get().myLocality.toString());
|
||||||
|
|
||||||
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
|
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
|
||||||
"MonitorAndWriteCCPriorityInfo"));
|
"MonitorAndWriteCCPriorityInfo"));
|
||||||
|
|
Loading…
Reference in New Issue