configurationMonitor does not need to check watch reference count
This commit is contained in:
parent
639afbe62c
commit
ab0f827058
|
@ -567,106 +567,44 @@ ACTOR Future<Void> changeCoordinators(Reference<ClusterRecoveryData> self) {
|
|||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// NOTE: This vector may not be initialized here as the keys might be initialized *AFTER* this vector, causing all
|
||||
// keys empty. The reason is that the order of the initialization of keys and this vector might not be ordered as wished
|
||||
// so the vector might be initialized before the keys receives values; thus, all values inside the vector are copied
|
||||
// from uninitialized KeyRefs..
|
||||
// See C++11 standard 3.6.2 for more info.
|
||||
std::vector<KeyRef> configurationMonitorWatchKeys;
|
||||
|
||||
ACTOR Future<Void> configurationMonitorImpl(Reference<ClusterRecoveryData> self,
|
||||
Database cx,
|
||||
Optional<int64_t>* pTenantId) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues((VectorRef<KeyValueRef>)results);
|
||||
TraceEvent("ConfigurationMonitor", self->dbgid)
|
||||
.detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
|
||||
self->recoveryState);
|
||||
if (conf != self->configuration) {
|
||||
if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED &&
|
||||
self->recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
self->controllerData->shouldCommitSuicide = true;
|
||||
throw restart_cluster_controller();
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
DatabaseConfiguration conf;
|
||||
conf.fromKeyValues((VectorRef<KeyValueRef>)results);
|
||||
TraceEvent("ConfigurationMonitor", self->dbgid)
|
||||
.detail(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(),
|
||||
self->recoveryState);
|
||||
if (conf != self->configuration) {
|
||||
if (self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED &&
|
||||
self->recoveryState != RecoveryState::FULLY_RECOVERED) {
|
||||
self->controllerData->shouldCommitSuicide = true;
|
||||
throw restart_cluster_controller();
|
||||
}
|
||||
|
||||
self->configuration = conf;
|
||||
self->registrationTrigger.trigger();
|
||||
}
|
||||
|
||||
self->configuration = conf;
|
||||
self->registrationTrigger.trigger();
|
||||
state Future<Void> watchFuture =
|
||||
tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) ||
|
||||
tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) ||
|
||||
tr.watch(failedLocalityVersionKey);
|
||||
wait(tr.commit());
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> watchFutures;
|
||||
std::transform(std::begin(configurationMonitorWatchKeys),
|
||||
std::end(configurationMonitorWatchKeys),
|
||||
std::back_inserter(watchFutures),
|
||||
[this](KeyRef key) { return tr.watch(key); });
|
||||
// Only after this stage, where getKeyLocation is called indirectly, the tenant information is updated and
|
||||
// set to the transaction state.
|
||||
(*pTenantId) = tr.getTransactionState()->tenantId();
|
||||
state Future<Void> watchFuture = waitForAny(watchFutures);
|
||||
|
||||
wait(tr.commit());
|
||||
|
||||
wait(watchFuture);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
state Optional<int64_t> tenantId;
|
||||
|
||||
// The keys cannot be initialized where it is defined. see comments at the definition.
|
||||
if (configurationMonitorWatchKeys.empty()) {
|
||||
configurationMonitorWatchKeys = std::vector<KeyRef>{ moveKeysLockOwnerKey,
|
||||
excludedServersVersionKey,
|
||||
failedServersVersionKey,
|
||||
excludedLocalityVersionKey,
|
||||
failedLocalityVersionKey };
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
wait(configurationMonitorImpl(self, cx, &tenantId));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Cancel all watches created by configurationMonitorImpl. Due to a circular reference issue, if the watches are
|
||||
// not cancelled manually, the DatabaseContext object in cx will not be able to destructed properly, see
|
||||
//
|
||||
// https://github.com/apple/foundationdb/issues/8321
|
||||
//
|
||||
// for more detailed discussion.
|
||||
|
||||
if (!tenantId.present()) {
|
||||
// Tenant ID not set, no watches are created in this case, no cleanup required.
|
||||
return Void();
|
||||
}
|
||||
std::for_each(std::begin(configurationMonitorWatchKeys),
|
||||
std::end(configurationMonitorWatchKeys),
|
||||
[this](KeyRef key) { cx->decreaseWatchRefCount(tenantId.get(), key); });
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<Version>> getMinBackupVersion(Reference<ClusterRecoveryData> self, Database cx) {
|
||||
|
|
Loading…
Reference in New Issue