Merge pull request #4583 from sfc-gh-mpilman/buildcop/2021/03/25/lowlatency1
Fix bug where betterMasterExist and recruitment disagree
commit 58c9158b73
@@ -339,7 +339,7 @@ public:
         bool checkStable = false,
         std::set<Optional<Key>> dcIds = std::set<Optional<Key>>(),
         std::vector<UID> exclusionWorkerIds = {}) {
-        std::map<std::pair<ProcessClass::Fitness, bool>, vector<WorkerDetails>> fitness_workers;
+        std::map<std::tuple<ProcessClass::Fitness, int, bool, bool>, vector<WorkerDetails>> fitness_workers;
         std::vector<WorkerDetails> results;
         std::vector<LocalityData> unavailableLocals;
         Reference<LocalitySet> logServerSet;
@@ -406,80 +406,94 @@ public:
             }

             // This worker is a candidate for TLog recruitment.
-            fitness_workers[std::make_pair(fitness, worker_details.degraded)].push_back(worker_details);
+            bool inCCDC = worker_details.interf.locality.dcId() == clusterControllerDcId;
+            fitness_workers[std::make_tuple(fitness, id_used[worker_process_id], worker_details.degraded, inCCDC)]
+                .push_back(worker_details);
         }

-        results.reserve(results.size() + id_worker.size());
-        for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign && !bCompleted; fitness++) {
-            auto fitnessEnum = (ProcessClass::Fitness)fitness;
-            for (int addingDegraded = 0; addingDegraded < 2; addingDegraded++) {
-                auto workerItr = fitness_workers.find(std::make_pair(fitnessEnum, (bool)addingDegraded));
-                if (workerItr != fitness_workers.end()) {
-                    for (auto& worker : workerItr->second) {
-                        logServerMap->add(worker.interf.locality, &worker);
-                    }
-                }
-
-                if (logServerSet->size() < (addingDegraded == 0 ? desired : required)) {
-                } else if (logServerSet->size() == required || logServerSet->size() <= desired) {
-                    if (logServerSet->validate(policy)) {
-                        for (auto& object : logServerMap->getObjects()) {
-                            results.push_back(*object);
-                        }
-                        bCompleted = true;
-                        break;
-                    }
-                    TraceEvent(SevWarn, "GWFTADNotAcceptable", id)
-                        .detail("DcIds", dcList)
-                        .detail("Fitness", fitness)
-                        .detail("Processes", logServerSet->size())
-                        .detail("Required", required)
-                        .detail("TLogPolicy", policy->info())
-                        .detail("DesiredLogs", desired)
-                        .detail("AddingDegraded", addingDegraded);
-                }
-                // Try to select the desired size, if larger
-                else {
-                    std::vector<LocalityEntry> bestSet;
-                    std::vector<LocalityData> tLocalities;
-
-                    // Try to find the best team of servers to fulfill the policy
-                    if (findBestPolicySet(bestSet,
-                                          logServerSet,
-                                          policy,
-                                          desired,
-                                          SERVER_KNOBS->POLICY_RATING_TESTS,
-                                          SERVER_KNOBS->POLICY_GENERATIONS)) {
-                        results.reserve(results.size() + bestSet.size());
-                        for (auto& entry : bestSet) {
-                            auto object = logServerMap->getObject(entry);
-                            ASSERT(object);
-                            results.push_back(*object);
-                            tLocalities.push_back(object->interf.locality);
-                        }
-                        TraceEvent("GWFTADBestResults", id)
-                            .detail("DcIds", dcList)
-                            .detail("Fitness", fitness)
-                            .detail("Processes", logServerSet->size())
-                            .detail("BestCount", bestSet.size())
-                            .detail("BestZones", ::describeZones(tLocalities))
-                            .detail("BestDataHalls", ::describeDataHalls(tLocalities))
-                            .detail("TLogPolicy", policy->info())
-                            .detail("TotalResults", results.size())
-                            .detail("DesiredLogs", desired)
-                            .detail("AddingDegraded", addingDegraded);
-                        bCompleted = true;
-                        break;
-                    }
-                    TraceEvent(SevWarn, "GWFTADNoBest", id)
-                        .detail("DcIds", dcList)
-                        .detail("Fitness", fitness)
-                        .detail("Processes", logServerSet->size())
-                        .detail("Required", required)
-                        .detail("BestCount", bestSet.size())
-                        .detail("BestZones", ::describeZones(tLocalities))
-                        .detail("BestDataHalls", ::describeDataHalls(tLocalities))
-                        .detail("TLogPolicy", policy->info())
-                        .detail("TotalResults", results.size())
-                        .detail("DesiredLogs", desired)
-                        .detail("AddingDegraded", addingDegraded);
-                }
-            }
-        }
+        // FIXME: it's not clear whether this is necessary.
+        for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign; fitness++) {
+            auto fitnessEnum = (ProcessClass::Fitness)fitness;
+            for (int addingDegraded = 0; addingDegraded < 2; addingDegraded++) {
+                fitness_workers[std::make_tuple(fitnessEnum, 0, addingDegraded, false)];
+            }
+        }
+        results.reserve(results.size() + id_worker.size());
+        for (auto workerIter = fitness_workers.begin(); workerIter != fitness_workers.end(); ++workerIter) {
+            auto fitness = std::get<0>(workerIter->first);
+            auto used = std::get<1>(workerIter->first);
+            auto addingDegraded = std::get<2>(workerIter->first);
+            ASSERT(fitness < ProcessClass::NeverAssign);
+            if (bCompleted) {
+                break;
+            }
+
+            for (auto& worker : workerIter->second) {
+                logServerMap->add(worker.interf.locality, &worker);
+            }
+
+            if (logServerSet->size() < (std::get<2>(workerIter->first) ? required : desired)) {
+            } else if (logServerSet->size() == required || logServerSet->size() <= desired) {
+                if (logServerSet->validate(policy)) {
+                    for (auto& object : logServerMap->getObjects()) {
+                        results.push_back(*object);
+                    }
+                    bCompleted = true;
+                    break;
+                }
+                TraceEvent(SevWarn, "GWFTADNotAcceptable", id)
+                    .detail("DcIds", dcList)
+                    .detail("Fitness", fitness)
+                    .detail("Processes", logServerSet->size())
+                    .detail("Required", required)
+                    .detail("TLogPolicy", policy->info())
+                    .detail("DesiredLogs", desired)
+                    .detail("Used", used)
+                    .detail("AddingDegraded", addingDegraded);
+            }
+            // Try to select the desired size, if larger
+            else {
+                std::vector<LocalityEntry> bestSet;
+                std::vector<LocalityData> tLocalities;
+
+                // Try to find the best team of servers to fulfill the policy
+                if (findBestPolicySet(bestSet,
+                                      logServerSet,
+                                      policy,
+                                      desired,
+                                      SERVER_KNOBS->POLICY_RATING_TESTS,
+                                      SERVER_KNOBS->POLICY_GENERATIONS)) {
+                    results.reserve(results.size() + bestSet.size());
+                    for (auto& entry : bestSet) {
+                        auto object = logServerMap->getObject(entry);
+                        ASSERT(object);
+                        results.push_back(*object);
+                        tLocalities.push_back(object->interf.locality);
+                    }
+                    TraceEvent("GWFTADBestResults", id)
+                        .detail("DcIds", dcList)
+                        .detail("Fitness", fitness)
+                        .detail("Used", used)
+                        .detail("Processes", logServerSet->size())
+                        .detail("BestCount", bestSet.size())
+                        .detail("BestZones", ::describeZones(tLocalities))
+                        .detail("BestDataHalls", ::describeDataHalls(tLocalities))
+                        .detail("TLogPolicy", policy->info())
+                        .detail("TotalResults", results.size())
+                        .detail("DesiredLogs", desired)
+                        .detail("AddingDegraded", addingDegraded);
+                    bCompleted = true;
+                    break;
+                }
+                TraceEvent(SevWarn, "GWFTADNoBest", id)
+                    .detail("DcIds", dcList)
+                    .detail("Fitness", fitness)
+                    .detail("Used", used)
+                    .detail("Processes", logServerSet->size())
+                    .detail("Required", required)
+                    .detail("TLogPolicy", policy->info())
+                    .detail("DesiredLogs", desired)
+                    .detail("AddingDegraded", addingDegraded);
+            }
+        }
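The core of the fix is in the two hunks above: candidate TLog workers are now bucketed by the tuple (fitness, roles already assigned per id_used, degraded flag, whether the worker sits in the cluster controller's DC) instead of just (fitness, degraded), and recruitment walks the std::map in ascending key order. Since betterMasterExists ranks processes along these same dimensions, recruitment and the "does a better master exist?" check can no longer reach different conclusions. A minimal, self-contained sketch (illustrative names only, not FDB code) of why a std::map with a tuple key produces exactly this preference order:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Stand-in for ProcessClass::Fitness: lower values are better fits.
enum Fitness { BestFit = 0, GoodFit = 1, NeverAssign = 3 };

int main() {
    // Key mirrors the patch: (fitness, roles already assigned, degraded, in CC's DC).
    // std::map keeps keys in lexicographic tuple order, so iteration visits the
    // most preferable recruitment candidates first.
    std::map<std::tuple<Fitness, int, bool, bool>, std::vector<std::string>> fitness_workers;

    fitness_workers[std::make_tuple(GoodFit, 0, false, false)].push_back("workerA");
    fitness_workers[std::make_tuple(BestFit, 1, false, false)].push_back("workerB");
    fitness_workers[std::make_tuple(BestFit, 0, true, false)].push_back("workerC");
    fitness_workers[std::make_tuple(BestFit, 0, false, true)].push_back("workerD");
    fitness_workers[std::make_tuple(BestFit, 0, false, false)].push_back("workerE");

    for (const auto& kv : fitness_workers) {
        for (const auto& w : kv.second) {
            std::cout << w << "\n";
        }
    }
    // Prints E, D, C, B, A: better fitness first, then fewer roles already
    // assigned, then non-degraded before degraded, then workers outside the
    // cluster controller's DC before those inside it.
}
```

Note also that the bare statement `fitness_workers[std::make_tuple(fitnessEnum, 0, addingDegraded, false)];` in the patch uses operator[] purely for its side effect: it default-constructs an empty bucket for every (fitness, degraded) combination so the loop below still visits each combination even when no worker landed in it, which is what the new FIXME is questioning.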
@@ -1157,12 +1171,14 @@ public:
                                                            req.configuration,
-                                                           used);
+                                                           used,
+                                                           first_commit_proxy);

        auto grv_proxies = getWorkersForRoleInDatacenter(dcId,
                                                         ProcessClass::GrvProxy,
                                                         req.configuration.getDesiredGrvProxies(),
                                                         req.configuration,
-                                                        used);
+                                                        used,
+                                                        first_grv_proxy);

        auto resolvers = getWorkersForRoleInDatacenter(dcId,
                                                       ProcessClass::Resolver,
                                                       req.configuration.getDesiredResolvers(),
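The same idea drives this hunk: the commit-proxy and GRV-proxy recruitments now pass the previously chosen first worker (first_commit_proxy, first_grv_proxy) into each call, while all of the calls continue to thread one shared `used` counter map, so every recruitment sees the role counts left behind by the ones before it. A hedged sketch of the general pattern (invented helper names, not the actual getWorkersForRoleInDatacenter signature):

```cpp
#include <climits>
#include <iostream>
#include <map>
#include <string>

// Shared usage map: assignments recorded by one recruitment are visible to
// every later recruitment (and comparison) that receives the same map.
using UsageMap = std::map<std::string, int>;

// Pick the process with the fewest roles so far, then record the assignment.
std::string recruitLeastUsed(const std::map<std::string, bool>& workers, UsageMap& used) {
    std::string best;
    int bestCount = INT_MAX;
    for (const auto& [id, healthy] : workers) {
        if (healthy && used[id] < bestCount) {
            best = id;
            bestCount = used[id];
        }
    }
    ++used[best]; // the next recruitment sees this count
    return best;
}

int main() {
    std::map<std::string, bool> workers{ { "p1", true }, { "p2", true } };
    UsageMap used;
    std::cout << recruitLeastUsed(workers, used) << "\n"; // p1 (both unused, p1 first)
    std::cout << recruitLeastUsed(workers, used) << "\n"; // p2 (p1 already has a role)
}
```

Keeping these counts identical on the recruitment path and the betterMasterExists path is precisely what stops the two from disagreeing.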
@@ -1216,6 +1232,7 @@ public:
        }

        if (bestDC != clusterControllerDcId) {
+           TraceEvent("BestDCIsNotClusterDC");
            vector<Optional<Key>> dcPriority;
            dcPriority.push_back(bestDC);
            desiredDcIds.set(dcPriority);
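Judging from the surrounding code, the new BestDCIsNotClusterDC event records the moment the cluster controller concludes that a different datacenter would be a better home: it then publishes that preference through desiredDcIds.set(dcPriority), and the event makes these deliberate migrations easy to find in trace logs.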
@@ -74,6 +74,7 @@ struct LowLatencyWorkload : TestWorkload {
            ++self->operations;
            loop {
                try {
+                   TraceEvent("StartLowLatencyTransaction");
                    tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                    tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                    if (doCommit) {
@@ -84,6 +85,7 @@ struct LowLatencyWorkload : TestWorkload {
                    }
                    break;
                } catch (Error& e) {
+                   TraceEvent("LowLatencyTransactionFailed").error(e, true);
                    wait(tr.onError(e));
                    ++self->retries;
                }
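The two events added to the LowLatency workload bracket each attempt of its retry loop: StartLowLatencyTransaction fires at the top of every try, and LowLatencyTransactionFailed fires for every error before tr.onError() backs off and the attempt is retried and counted in ++self->retries. The second argument in .error(e, true) appears to opt in to logging errors that TraceEvent would normally suppress, such as actor_cancelled, so even cancelled attempts show up in the trace.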
@@ -184,6 +184,12 @@ Reference<IRandom> deterministicRandom();
 // non-deterministic contexts.
 Reference<IRandom> nondeterministicRandom();

+// This returns a deterministic random number generator initialized with the same seed as the one returned by
+// deterministicRandom. The main use-case for this is to generate deterministic random numbers without changing the
+// determinism of the simulator. This is useful for things like generating random UIDs for debug transactions.
+// WARNING: This is not thread safe and must not be called from any other thread than the network thread!
+Reference<IRandom> debugRandom();
+
 // Populates a buffer with a random sequence of bytes
 void generateRandomData(uint8_t* buffer, int length);
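Given the contract documented above, the typical use is tagging a debug transaction with a random UID without consuming a value from deterministicRandom(), which would change the simulation's behavior. A short sketch, assuming an in-scope Transaction tr, a run on the network thread as the warning requires, and a hypothetical event name:

```cpp
// Draw the UID from debugRandom() rather than deterministicRandom() so the
// simulator's main random stream is left untouched by this debug-only path.
UID debugID = debugRandom()->randomUniqueID();
tr.debugTransaction(debugID); // attach the debug ID to the transaction
TraceEvent("MyDebugTransaction").detail("DebugID", debugID); // hypothetical event name
```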
@@ -47,11 +47,17 @@ INetwork* g_network = 0;

 FILE* randLog = 0;
 thread_local Reference<IRandom> seededRandom;
+Reference<IRandom> seededDebugRandom;
 uint64_t debug_lastLoadBalanceResultEndpointToken = 0;
 bool noUnseed = false;

 void setThreadLocalDeterministicRandomSeed(uint32_t seed) {
    seededRandom = Reference<IRandom>(new DeterministicRandom(seed, true));
+   seededDebugRandom = Reference<IRandom>(new DeterministicRandom(seed));
 }

+Reference<IRandom> debugRandom() {
+   return seededDebugRandom;
+}
+
 Reference<IRandom> deterministicRandom() {
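The implementation shows the design choice behind the header comment: seededDebugRandom is a second generator seeded identically to seededRandom, so debug-only draws never advance the simulator's main random stream, while two runs with the same seed still produce identical debugRandom() sequences. Note also that seededRandom is thread_local and seededDebugRandom is not, which is presumably why the header restricts debugRandom() to the network thread. A self-contained illustration of the two-generator idea (std::mt19937 standing in for DeterministicRandom):

```cpp
#include <iostream>
#include <random>

int main() {
    // Two generators seeded identically; drawing debug values from the second
    // never advances the first, so the "simulation" stream is unaffected.
    std::mt19937 simRandom(42);   // plays the role of seededRandom
    std::mt19937 dbgRandom(42);   // plays the role of seededDebugRandom

    auto sim1 = simRandom();      // simulation draw
    (void)dbgRandom();            // a debug-only draw...
    (void)dbgRandom();            // ...or several...
    auto sim2 = simRandom();      // ...does not change what the simulation sees next

    std::cout << sim1 << " " << sim2 << "\n"; // identical across runs with seed 42
}
```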