Merge branch 'main' into granule_merging

commit 236630855b
@@ -699,6 +699,15 @@ def tenants(logger):
     run_fdbcli_command('writemode on; clear tenant_test')
 
+def integer_options():
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fdbcli_env)
+    cmd_sequence = ['option on TIMEOUT 1000', 'writemode on', 'clear foo']
+    output, error_output = process.communicate(input='\n'.join(cmd_sequence).encode())
+
+    lines = output.decode().strip().split('\n')[-2:]
+    assert lines[0] == 'Option enabled for all transactions'
+    assert lines[1].startswith('Committed')
+    assert error_output == b''
 
 if __name__ == '__main__':
     parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
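The new `integer_options` test drives an interactive `fdbcli` session over stdin rather than a one-shot `--exec` invocation; `command_template[:-1]` presumably drops a trailing `--exec` argument for this purpose (the template's exact contents are an assumption here). A standalone sketch of the same pattern, assuming `fdbcli` is on `PATH` with a default cluster file:

```python
import subprocess

# Launch fdbcli with no --exec command, so it reads commands from stdin.
process = subprocess.Popen(['fdbcli'], stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# TIMEOUT now takes a plain integer (milliseconds); fdbcli converts it to
# the 8-byte parameter the client library expects.
cmds = ['option on TIMEOUT 1000', 'writemode on', 'clear foo']
output, error_output = process.communicate(input='\n'.join(cmds).encode())
print(output.decode())
```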
@@ -745,6 +754,7 @@ if __name__ == '__main__':
         triggerddteaminfolog()
         tenants()
         versionepoch()
+        integer_options()
     else:
         assert args.process_number > 1, "Process number should be positive"
         coordinators()
@@ -1,5 +1,3 @@
-.. _release-notes:
-
 #############
 Release Notes
 #############
@@ -0,0 +1,58 @@
+.. _release-notes:
+
+#############
+Release Notes
+#############
+
+7.2.0
+======
+
+Features
+--------
+
+Performance
+-----------
+
+Reliability
+-----------
+
+Fixes
+-----
+
+* In ``fdbcli``, integer options are now expressed as integers rather than byte strings (e.g. ``option on TIMEOUT 1000``). `(PR #7571) <https://github.com/apple/foundationdb/pull/7571>`_
+
+Status
+------
+
+Bindings
+--------
+
+Other Changes
+-------------
+
+Earlier release notes
+---------------------
+* :doc:`7.1 (API Version 710) </release-notes/release-notes-710>`
+* :doc:`7.0 (API Version 700) </release-notes/release-notes-700>`
+* :doc:`6.3 (API Version 630) </release-notes/release-notes-630>`
+* :doc:`6.2 (API Version 620) </release-notes/release-notes-620>`
+* :doc:`6.1 (API Version 610) </release-notes/release-notes-610>`
+* :doc:`6.0 (API Version 600) </release-notes/release-notes-600>`
+* :doc:`5.2 (API Version 520) </release-notes/release-notes-520>`
+* :doc:`5.1 (API Version 510) </release-notes/release-notes-510>`
+* :doc:`5.0 (API Version 500) </release-notes/release-notes-500>`
+* :doc:`4.6 (API Version 460) </release-notes/release-notes-460>`
+* :doc:`4.5 (API Version 450) </release-notes/release-notes-450>`
+* :doc:`4.4 (API Version 440) </release-notes/release-notes-440>`
+* :doc:`4.3 (API Version 430) </release-notes/release-notes-430>`
+* :doc:`4.2 (API Version 420) </release-notes/release-notes-420>`
+* :doc:`4.1 (API Version 410) </release-notes/release-notes-410>`
+* :doc:`4.0 (API Version 400) </release-notes/release-notes-400>`
+* :doc:`3.0 (API Version 300) </release-notes/release-notes-300>`
+* :doc:`2.0 (API Version 200) </release-notes/release-notes-200>`
+* :doc:`1.0 (API Version 100) </release-notes/release-notes-100>`
+* :doc:`Beta 3 (API Version 23) </release-notes/release-notes-023>`
+* :doc:`Beta 2 (API Version 22) </release-notes/release-notes-022>`
+* :doc:`Beta 1 (API Version 21) </release-notes/release-notes-021>`
+* :doc:`Alpha 6 (API Version 16) </release-notes/release-notes-016>`
+* :doc:`Alpha 5 (API Version 14) </release-notes/release-notes-014>`
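For comparison, the language bindings already take integer-valued options as plain integers; this change brings `fdbcli` in line with them. A minimal Python-binding example (assuming a reachable cluster and the default cluster file):

```python
import fdb

fdb.api_version(710)
db = fdb.open()
tr = db.create_transaction()
tr.options.set_timeout(1000)  # integer milliseconds, same as `option on TIMEOUT 1000` in fdbcli
```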
@@ -201,9 +201,31 @@ private:
                           bool enabled,
                           Optional<StringRef> arg,
                           bool intrans) {
-    if (enabled && arg.present() != FDBTransactionOptions::optionInfo.getMustExist(option).hasParameter) {
-        fprintf(stderr, "ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected");
-        throw invalid_option_value();
-    }
+    // If the parameter type is an int, we will extract into this variable and reference its memory with a StringRef
+    int64_t parsedInt = 0;
+
+    if (enabled) {
+        auto optionInfo = FDBTransactionOptions::optionInfo.getMustExist(option);
+        if (arg.present() != optionInfo.hasParameter) {
+            fprintf(stderr, "ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected");
+            throw invalid_option_value();
+        }
+        if (arg.present() && optionInfo.paramType == FDBOptionInfo::ParamType::Int) {
+            try {
+                size_t nextIdx;
+                std::string value = arg.get().toString();
+                parsedInt = std::stoll(value, &nextIdx);
+                if (nextIdx != value.length()) {
+                    fprintf(stderr, "ERROR: could not parse value `%s' as an integer\n", arg.get().toString().c_str());
+                    throw invalid_option_value();
+                }
+                arg = StringRef(reinterpret_cast<uint8_t*>(&parsedInt), 8);
+            } catch (const std::exception& e) {
+                fprintf(stderr, "ERROR: could not parse value `%s' as an integer\n", arg.get().toString().c_str());
+                throw invalid_option_value();
+            }
+        }
+    }
 
     if (intrans) {
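The subtle step above is the final assignment: once `std::stoll` succeeds, `arg` is re-pointed at the raw eight bytes of `parsedInt`, because integer options cross the client API as an 8-byte little-endian value. A standalone sketch of the equivalent conversion (hypothetical helper, not FDB code):

```python
import struct

def pack_option_value(text: str) -> bytes:
    """Parse a decimal option argument and pack it as the client expects."""
    value = int(text, 10)  # rejects trailing junk, like the nextIdx check above
    return struct.pack('<q', value)  # 8 bytes, little-endian signed 64-bit

assert pack_option_value('1000') == b'\xe8\x03\x00\x00\x00\x00\x00\x00'
```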
@@ -215,7 +215,6 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
     state Reference<ClusterRecoveryData> recoveryData;
     state PromiseStream<Future<Void>> addActor;
     state Future<Void> recoveryCore;
-    state bool recoveredDisk = false;
 
     // SOMEDAY: If there is already a non-failed master referenced by zkMasterInfo, use that one until it fails
     // When this someday is implemented, make sure forced failures still cause the master to be recruited again
@@ -258,18 +257,6 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
                 .detail("ChangeID", dbInfo.id);
             db->serverInfo->set(dbInfo);
 
-            if (SERVER_KNOBS->ENABLE_ENCRYPTION && !recoveredDisk) {
-                // EKP singleton recruitment waits for 'Master/Sequencer' recruitment, execute wait for
-                // 'recoveredDiskFiles' optimization once EKP recruitment is unblocked to avoid circular dependencies
-                // with StorageServer initialization. The waiting for recoveredDiskFiles is to make sure the worker
-                // server on the same process has been registered with the new CC before recruitment.
-
-                wait(recoveredDiskFiles);
-                TraceEvent("CCWDB_RecoveredDiskFiles", cluster->id).log();
-                // Need to be done for the first once in the lifetime of ClusterController
-                recoveredDisk = true;
-            }
-
             state Future<Void> spinDelay = delay(
                 SERVER_KNOBS
                     ->MASTER_SPIN_DELAY); // Don't retry cluster recovery more than once per second, but don't delay
@@ -1142,7 +1129,8 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
             .detail("ZoneId", w.locality.zoneId())
             .detail("DataHall", w.locality.dataHallId())
             .detail("PClass", req.processClass.toString())
-            .detail("Workers", self->id_worker.size());
+            .detail("Workers", self->id_worker.size())
+            .detail("RecoveredDiskFiles", req.recoveredDiskFiles);
         self->goodRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY);
         self->goodRemoteRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY);
     } else {
@@ -1154,7 +1142,8 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
             .detail("DataHall", w.locality.dataHallId())
             .detail("PClass", req.processClass.toString())
             .detail("Workers", self->id_worker.size())
-            .detail("Degraded", req.degraded);
+            .detail("Degraded", req.degraded)
+            .detail("RecoveredDiskFiles", req.recoveredDiskFiles);
     }
     if (w.address() == g_network->getLocalAddress()) {
         if (self->changingDcIds.get().first) {
@@ -1207,6 +1196,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
                           newProcessClass,
                           newPriorityInfo,
                           req.degraded,
+                          req.recoveredDiskFiles,
                           req.issues);
         if (!self->masterProcessId.present() &&
             w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
@@ -1232,6 +1222,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
         info->second.priorityInfo = newPriorityInfo;
         info->second.initialClass = req.initialClass;
         info->second.details.degraded = req.degraded;
+        info->second.details.recoveredDiskFiles = req.recoveredDiskFiles;
         info->second.gen = req.generation;
         info->second.issues = req.issues;
@@ -124,6 +124,37 @@ class DDTxnProcessorImpl {
         }
         return Void();
     }
+
+    ACTOR static Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
+        state Transaction tr(cx);
+        loop {
+            wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution));
+
+            try {
+                Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
+                if (!mode.present() && ddEnabledState->isDDEnabled()) {
+                    TraceEvent("WaitForDDEnabledSucceeded").log();
+                    return Void();
+                }
+                if (mode.present()) {
+                    BinaryReader rd(mode.get(), Unversioned());
+                    int m;
+                    rd >> m;
+                    TraceEvent(SevDebug, "WaitForDDEnabled")
+                        .detail("Mode", m)
+                        .detail("IsDDEnabled", ddEnabledState->isDDEnabled());
+                    if (m && ddEnabledState->isDDEnabled()) {
+                        TraceEvent("WaitForDDEnabledSucceeded").log();
+                        return Void();
+                    }
+                }
+
+                tr.reset();
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+    }
 };
 
 Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
@@ -147,4 +178,8 @@ Future<Void> DDTxnProcessor::updateReplicaKeys(const std::vector<Optional<Key>>&
                                                const std::vector<Optional<Key>>& remoteIds,
                                                const DatabaseConfiguration& configuration) const {
     return DDTxnProcessorImpl::updateReplicaKeys(cx, primaryIds, remoteIds, configuration);
 }
+
+Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
+    return DDTxnProcessorImpl::waitForDataDistributionEnabled(cx, ddEnabledState);
+}
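`waitForDataDistributionEnabled` polls the `dataDistributionModeKey` system key once per `DD_ENABLED_CHECK_DELAY`: an absent key counts as enabled, and a present key is decoded as a serialized integer where zero means disabled. A standalone sketch of the decode step (assuming, as the `rd >> m` read with `Unversioned()` implies, a little-endian 32-bit encoding):

```python
import struct
from typing import Optional

def dd_mode_enabled(mode_value: Optional[bytes]) -> bool:
    """Interpret the dataDistributionModeKey value the way the actor does."""
    if mode_value is None:  # key absent: data distribution is considered enabled
        return True
    (m,) = struct.unpack('<i', mode_value)  # mirrors BinaryReader's `rd >> m`
    return m != 0
```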
@@ -428,37 +428,6 @@ ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
     return Void();
 }
 
-ACTOR Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
-    state Transaction tr(cx);
-    loop {
-        wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution));
-
-        try {
-            Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
-            if (!mode.present() && ddEnabledState->isDDEnabled()) {
-                TraceEvent("WaitForDDEnabledSucceeded").log();
-                return Void();
-            }
-            if (mode.present()) {
-                BinaryReader rd(mode.get(), Unversioned());
-                int m;
-                rd >> m;
-                TraceEvent(SevDebug, "WaitForDDEnabled")
-                    .detail("Mode", m)
-                    .detail("IsDDEnabled", ddEnabledState->isDDEnabled());
-                if (m && ddEnabledState->isDDEnabled()) {
-                    TraceEvent("WaitForDDEnabledSucceeded").log();
-                    return Void();
-                }
-            }
-
-            tr.reset();
-        } catch (Error& e) {
-            wait(tr.onError(e));
-        }
-    }
-}
-
 ACTOR Future<bool> isDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
     state Transaction tr(cx);
     loop {
@@ -601,6 +570,10 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
             remoteDcIds.push_back(regions[1].dcId);
         }
     }
+
+    Future<Void> waitDataDistributorEnabled(const DDEnabledState* ddEnabledState) const {
+        return txnProcessor->waitForDataDistributionEnabled(ddEnabledState);
+    }
 };
 
 // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@@ -707,7 +680,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
             .detail("HighestPriority", self->configuration.usableRegions > 1 ? 0 : -1)
             .trackLatest(self->totalDataInFlightRemoteEventHolder->trackingKey);
 
-        wait(waitForDataDistributionEnabled(cx, ddEnabledState));
+        wait(self->waitDataDistributorEnabled(ddEnabledState));
         TraceEvent("DataDistributionEnabled").log();
     }
@@ -64,9 +64,10 @@ struct WorkerInfo : NonCopyable {
                ProcessClass processClass,
                ClusterControllerPriorityInfo priorityInfo,
                bool degraded,
+               bool recoveredDiskFiles,
                Standalone<VectorRef<StringRef>> issues)
       : watcher(watcher), reply(reply), gen(gen), reboots(0), initialClass(initialClass), priorityInfo(priorityInfo),
-        details(interf, processClass, degraded), issues(issues) {}
+        details(interf, processClass, degraded, recoveredDiskFiles), issues(issues) {}
 
     WorkerInfo(WorkerInfo&& r) noexcept
       : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots),
@@ -301,7 +302,7 @@ public:
     std::set<AddressExclusion> excludedAddresses(req.excludeAddresses.begin(), req.excludeAddresses.end());
 
     for (auto& it : id_worker)
-        if (workerAvailable(it.second, false) &&
+        if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
             !excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
             (includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())) &&
             !addressExcluded(excludedAddresses, it.second.details.interf.address()) &&
@@ -316,7 +317,7 @@ public:
     Optional<WorkerDetails> bestInfo;
     for (auto& it : id_worker) {
         ProcessClass::Fitness fit = it.second.details.processClass.machineClassFitness(ProcessClass::Storage);
-        if (workerAvailable(it.second, false) &&
+        if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
             !excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
             (includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())) &&
             !addressExcluded(excludedAddresses, it.second.details.interf.address()) && fit < bestFit) {
@@ -365,7 +366,8 @@ public:
 
     for (auto& it : id_worker) {
         auto fitness = it.second.details.processClass.machineClassFitness(ProcessClass::Storage);
-        if (workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.details.interf.addresses()) &&
+        if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
+            !conf.isExcludedServer(it.second.details.interf.addresses()) &&
             !isExcludedDegradedServer(it.second.details.interf.addresses()) &&
             fitness != ProcessClass::NeverAssign &&
             (!dcId.present() || it.second.details.interf.locality.dcId() == dcId.get())) {
@@ -597,6 +599,11 @@ public:
             logWorkerUnavailable(SevInfo, id, "complex", "Worker is not available", worker_details, fitness, dcIds);
             continue;
         }
+        if (!worker_details.recoveredDiskFiles) {
+            logWorkerUnavailable(
+                SevInfo, id, "complex", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
+            continue;
+        }
         if (conf.isExcludedServer(worker_details.interf.addresses())) {
             logWorkerUnavailable(SevInfo,
                                  id,
@@ -842,6 +849,11 @@ public:
             logWorkerUnavailable(SevInfo, id, "simple", "Worker is not available", worker_details, fitness, dcIds);
             continue;
         }
+        if (!worker_details.recoveredDiskFiles) {
+            logWorkerUnavailable(
+                SevInfo, id, "simple", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
+            continue;
+        }
         if (conf.isExcludedServer(worker_details.interf.addresses())) {
             logWorkerUnavailable(SevInfo,
                                  id,
@@ -989,6 +1001,11 @@ public:
                 SevInfo, id, "deprecated", "Worker is not available", worker_details, fitness, dcIds);
             continue;
         }
+        if (!worker_details.recoveredDiskFiles) {
+            logWorkerUnavailable(
+                SevInfo, id, "deprecated", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
+            continue;
+        }
         if (conf.isExcludedServer(worker_details.interf.addresses())) {
             logWorkerUnavailable(SevInfo,
                                  id,
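All three recruitment paths ("complex", "simple", and "deprecated") gain the same guard: a worker whose disk-file recovery has not finished is logged as unavailable and skipped for stateful roles. Condensed into a predicate (illustrative sketch with simplified field names, not FDB code):

```python
def recruitable_for_stateful_role(worker, conf) -> bool:
    if not worker.available:
        return False
    if not worker.recovered_disk_files:  # the guard added in this commit
        return False
    if conf.is_excluded_server(worker.addresses):
        return False
    return worker.fitness != 'NeverAssign'
```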
@@ -51,6 +51,8 @@ public:
                                  const DatabaseConfiguration& configuration) const {
         return Void();
     }
+
+    virtual Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const = 0;
 };
 
 class DDTxnProcessorImpl;
@@ -76,6 +78,8 @@ public:
     Future<Void> updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
                                    const std::vector<Optional<Key>>& remoteIds,
                                    const DatabaseConfiguration& configuration) const override;
+
+    Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const override;
 };
 
 // A mock transaction implementation for test usage.
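Moving `waitForDataDistributionEnabled` behind the `IDDTxnProcessor` interface, instead of leaving it a free actor in the data distribution code, means the distributor only ever polls through its transaction processor, so the mock implementation mentioned above can short-circuit the check in tests. A rough sketch of that seam (illustrative, not FDB code):

```python
class IDDTxnProcessor:
    """Interface the data distributor polls through."""
    def wait_for_data_distribution_enabled(self):
        raise NotImplementedError

class MockTxnProcessor(IDDTxnProcessor):
    def wait_for_data_distribution_enabled(self):
        return  # tests can treat DD as always enabled, no database round trip
```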
@@ -138,10 +138,11 @@ struct WorkerDetails {
     WorkerInterface interf;
     ProcessClass processClass;
     bool degraded;
+    bool recoveredDiskFiles;
 
     WorkerDetails() : degraded(false) {}
-    WorkerDetails(const WorkerInterface& interf, ProcessClass processClass, bool degraded)
-      : interf(interf), processClass(processClass), degraded(degraded) {}
+    WorkerDetails(const WorkerInterface& interf, ProcessClass processClass, bool degraded, bool recoveredDiskFiles)
+      : interf(interf), processClass(processClass), degraded(degraded), recoveredDiskFiles(recoveredDiskFiles) {}
 
     bool operator<(const WorkerDetails& r) const { return interf.id() < r.interf.id(); }
@@ -436,6 +437,7 @@ struct RegisterWorkerRequest {
     Version lastSeenKnobVersion;
     ConfigClassSet knobConfigClassSet;
     bool requestDbInfo;
+    bool recoveredDiskFiles;
 
     RegisterWorkerRequest()
       : priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}
@@ -450,11 +452,12 @@ struct RegisterWorkerRequest {
                           Optional<EncryptKeyProxyInterface> ekpInterf,
                           bool degraded,
                           Version lastSeenKnobVersion,
-                          ConfigClassSet knobConfigClassSet)
+                          ConfigClassSet knobConfigClassSet,
+                          bool recoveredDiskFiles)
       : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
         generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
         encryptKeyProxyInterf(ekpInterf), degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion),
-        knobConfigClassSet(knobConfigClassSet), requestDbInfo(false) {}
+        knobConfigClassSet(knobConfigClassSet), requestDbInfo(false), recoveredDiskFiles(recoveredDiskFiles) {}
 
     template <class Ar>
     void serialize(Ar& ar) {
@@ -474,7 +477,8 @@ struct RegisterWorkerRequest {
                    degraded,
                    lastSeenKnobVersion,
                    knobConfigClassSet,
-                   requestDbInfo);
+                   requestDbInfo,
+                   recoveredDiskFiles);
     }
 };
@@ -565,7 +565,8 @@ ACTOR Future<Void> registrationClient(
     Reference<AsyncVar<std::set<std::string>> const> issues,
     Reference<ConfigNode> configNode,
     Reference<LocalConfiguration> localConfig,
-    Reference<AsyncVar<ServerDBInfo>> dbInfo) {
+    Reference<AsyncVar<ServerDBInfo>> dbInfo,
+    Promise<Void> recoveredDiskFiles) {
     // Keeps the cluster controller (as it may be re-elected) informed that this worker exists
     // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
     // (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@@ -600,7 +601,8 @@ ACTOR Future<Void> registrationClient(
                               ekpInterf->get(),
                               degraded->get(),
                               localConfig->lastSeenVersion(),
-                              localConfig->configClassSet());
+                              localConfig->configClassSet(),
+                              recoveredDiskFiles.isSet());
 
         for (auto const& i : issues->get()) {
             request.issues.push_back_deep(request.issues.arena(), i);
@@ -637,10 +639,15 @@ ACTOR Future<Void> registrationClient(
                 request.requestDbInfo = true;
                 firstReg = false;
             }
+            TraceEvent("WorkerRegister")
+                .detail("CCID", ccInterface->get().get().id())
+                .detail("Generation", requestGeneration)
+                .detail("RecoveredDiskFiles", recoveredDiskFiles.isSet());
         }
         state Future<RegisterWorkerReply> registrationReply =
             ccInterfacePresent ? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request))
                                : Never();
+        state Future<Void> recovered = recoveredDiskFiles.isSet() ? Never() : recoveredDiskFiles.getFuture();
         state double startTime = now();
         loop choose {
             when(RegisterWorkerReply reply = wait(registrationReply)) {
@@ -664,6 +671,7 @@ ACTOR Future<Void> registrationClient(
             when(wait(degraded->onChange())) { break; }
             when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
             when(wait(issues->onChange())) { break; }
+            when(wait(recovered)) { break; }
         }
     }
 }
@@ -1852,8 +1860,31 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
         startRole(Role::WORKER, interf.id(), interf.id(), details);
         errorForwarders.add(traceRole(Role::WORKER, interf.id()));
 
-        wait(waitForAll(recoveries));
-        recoveredDiskFiles.send(Void());
+        // We want to avoid the worker being recruited as storage or TLog before recovering its local files,
+        // to make sure:
+        // (1) the worker can start serving requests once it is recruited as storage or TLog server, and
+        // (2) a slow recovering worker server wouldn't be recruited as TLog and make recovery slow.
+        // However, the worker server can still serve stateless roles, and if encryption is on, it is crucial to have
+        // some worker available to serve the EncryptKeyProxy role, before opening encrypted storage files.
+        //
+        // When encryption-at-rest is enabled, the following code allows a worker to first register with the
+        // cluster controller to be recruited only as a stateless process, i.e. it can't be recruited as a SS or TLog
+        // process; once the local disk recovery is complete (if applicable), the process re-registers with cluster
+        // controller as a stateful process role.
+        //
+        // TODO(yiwu): Unify behavior for encryption and non-encryption once the change is stable.
+        Future<Void> recoverDiskFiles = trigger(
+            [=]() {
+                TraceEvent("RecoveriesComplete", interf.id());
+                recoveredDiskFiles.send(Void());
+                return Void();
+            },
+            waitForAll(recoveries));
+        if (!SERVER_KNOBS->ENABLE_ENCRYPTION) {
+            wait(recoverDiskFiles);
+        } else {
+            errorForwarders.add(recoverDiskFiles);
+        }
 
         errorForwarders.add(registrationClient(ccInterface,
                                                interf,
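Taken together with the `registrationClient` changes: with encryption off, the worker blocks until disk recovery completes and registers once, already marked as recovered; with encryption on, it registers immediately with `recoveredDiskFiles = false` (so the cluster controller hands it only stateless roles), and the `recovered` future forces a re-registration once the promise fires. A rough model of that control flow (illustrative asyncio sketch, not FDB code):

```python
import asyncio

async def worker_startup(enable_encryption: bool, recover_disk_files, register):
    # recover_disk_files: coroutine that completes when local files are recovered
    # register: coroutine that sends a RegisterWorkerRequest with the given flag
    if not enable_encryption:
        await recover_disk_files()                   # block, then register once
        await register(recovered_disk_files=True)    # immediately recruitable as SS/TLog
    else:
        recovery = asyncio.create_task(recover_disk_files())
        await register(recovered_disk_files=False)   # stateless roles only for now
        await recovery                               # the promise fires...
        await register(recovered_disk_files=True)    # ...so re-register as stateful-capable
```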
@@ -1868,7 +1899,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                                                issues,
                                                configNode,
                                                localConfig,
-                                               dbInfo));
+                                               dbInfo,
+                                               recoveredDiskFiles));
 
         if (configNode.isValid()) {
             errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
@@ -1878,8 +1910,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
             errorForwarders.add(healthMonitor(ccInterface, interf, locality, dbInfo));
         }
 
-        TraceEvent("RecoveriesComplete", interf.id());
-
         loop choose {
             when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
                 ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(