fix: forced recoveries now require a target dcid, which will become the new primary location. During the forced recovery, the configuration will be changed to make that location primary, and usable_regions will be set to 1. If the target dcid is already the primary location, the forced recovery will do nothing. This makes forced recoveries idempotent, so it is safe for the client to re-send forced recovery commands to the cluster controller.
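
For example, a client triggers the failover through the updated forceRecovery call shown in the diff below; in fdbcli the equivalent is force_recovery_with_data_loss <DCID> (first hunk). This is a minimal sketch: the caller name, the cluster-file variable, and the "dc2" value are illustrative, and only the forceRecovery( clusterFile, dcId ) signature comes from this commit.

// Hypothetical caller; only the forceRecovery signature is from this commit.
ACTOR Future<Void> exampleFailoverToDc2( Reference<ClusterConnectionFile> ccf ) {
	// Safe to retry: if "dc2" is already the primary, the forced recovery is a no-op.
	wait( forceRecovery( ccf, LiteralStringRef("dc2") ) );
	return Void();
}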

fix: before honoring a forced recovery, the cluster controller attempts a commit to determine whether the cluster is alive, since its own internal recoveryState might not be up to date.
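
This mirrors the doEmptyCommit and handleForcedRecoveries code added below: the controller issues a self-conflicting, lock-aware, system-priority commit and only proceeds with the forced recovery if that commit has not succeeded within a few seconds. A condensed sketch of that logic (not the literal diff):

state Future<Void> fCommit = doEmptyCommit( self->cx );  // PRIORITY_SYSTEM_IMMEDIATE, LOCK_AWARE, self-conflicting
wait( fCommit || delay( 5.0 ) );                         // give a live cluster a few seconds to commit
if( !fCommit.isReady() || fCommit.isError() ) {
	// The cluster cannot commit on its own, so a forced recovery is actually needed.
}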

fix: forceMasterFailure on the cluster controller did not always cause the current master to be re-recruited.
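
The underlying change in the cluster controller (see the diff below) replaces the single-shot Promise<Void> forceMasterFailure with an AsyncTrigger, and the trigger is now also raced against the outstanding master-recruitment reply, so a forced failure can no longer be lost while recruitment is in flight. Condensed from the diff:

AsyncTrigger forceMasterFailure;   // was Promise<Void>, which could only be set once
state Future<ErrorOr<MasterInterface>> fNewMaster = masterWorker.worker.first.master.tryGetReply( rmq );
wait( ready(fNewMaster) || db->forceMasterFailure.onTrigger() );   // previously only the reply was awaited
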
Author: Evan Tschannen
Date: 2019-02-18 14:54:28 -08:00
Parent: 4c35ebdcc6
Commit: 8f2af8bed1
8 changed files with 166 additions and 83 deletions

View File

@@ -523,10 +523,13 @@ void initHelp() {
"<type> <action> <ARGS>",
"namespace for all the profiling-related commands.",
"Different types support different actions. Run `profile` to get a list of types, and iteratively explore the help.\n");
helpMap["force_recovery_with_data_loss"] = CommandHelp(
"force_recovery_with_data_loss <DCID>",
"Force the database to recover into DCID",
"A forced recovery will cause the database to lose the most recently committed mutations. The amount of mutations that will be lost depends on how far behind the remote datacenter is. This command will change the region configuration to have a positive priority for the chosen DCID, and a negative priority for all other DCIDs. This command will set usable_regions to 1. If the database has already recovered, this command does nothing.\n");
hiddenCommands.insert("expensive_data_check");
hiddenCommands.insert("datadistribution");
hiddenCommands.insert("force_recovery_with_data_loss");
}
void printVersion() {
@@ -2778,11 +2781,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "force_recovery_with_data_loss")) {
if(tokens.size() != 1) {
if(tokens.size() != 2) {
printUsage(tokens[0]);
is_error = true;
continue;
}
wait( makeInterruptable( forceRecovery( ccf ) ) );
wait( makeInterruptable( forceRecovery( ccf, tokens[1] ) ) );
continue;
}

fdbclient/ClusterInterface.h Executable file → Normal file
View File

@@ -223,13 +223,15 @@ struct GetClientWorkersRequest {
};
struct ForceRecoveryRequest {
Key dcId;
ReplyPromise<Void> reply;
ForceRecoveryRequest() {}
explicit ForceRecoveryRequest(Key dcId) : dcId(dcId) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
serializer(ar, dcId, reply);
}
};

View File

@@ -268,55 +268,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["usable_regions"] = usableRegions;
if(regions.size()) {
StatusArray regionArr;
for(auto& r : regions) {
StatusObject regionObj;
StatusArray dcArr;
StatusObject dcObj;
dcObj["id"] = r.dcId.toString();
dcObj["priority"] = r.priority;
dcArr.push_back(dcObj);
if(r.satelliteTLogReplicationFactor == 1 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
regionObj["satellite_redundancy_mode"] = "one_satellite_single";
} else if(r.satelliteTLogReplicationFactor == 2 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
regionObj["satellite_redundancy_mode"] = "one_satellite_double";
} else if(r.satelliteTLogReplicationFactor == 3 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
regionObj["satellite_redundancy_mode"] = "one_satellite_triple";
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 1 && r.satelliteTLogReplicationFactorFallback == 2 && r.satelliteTLogWriteAntiQuorumFallback == 0) {
regionObj["satellite_redundancy_mode"] = "two_satellite_safe";
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 2 && r.satelliteTLogUsableDcsFallback == 1 && r.satelliteTLogReplicationFactorFallback == 2 && r.satelliteTLogWriteAntiQuorumFallback == 0) {
regionObj["satellite_redundancy_mode"] = "two_satellite_fast";
} else if(r.satelliteTLogReplicationFactor != 0) {
regionObj["satellite_log_replicas"] = r.satelliteTLogReplicationFactor;
regionObj["satellite_usable_dcs"] = r.satelliteTLogUsableDcs;
regionObj["satellite_anti_quorum"] = r.satelliteTLogWriteAntiQuorum;
if(r.satelliteTLogPolicy) regionObj["satellite_log_policy"] = r.satelliteTLogPolicy->info();
regionObj["satellite_log_replicas_fallback"] = r.satelliteTLogReplicationFactorFallback;
regionObj["satellite_usable_dcs_fallback"] = r.satelliteTLogUsableDcsFallback;
regionObj["satellite_anti_quorum_fallback"] = r.satelliteTLogWriteAntiQuorumFallback;
if(r.satelliteTLogPolicyFallback) regionObj["satellite_log_policy_fallback"] = r.satelliteTLogPolicyFallback->info();
}
if( r.satelliteDesiredTLogCount != -1 ) {
regionObj["satellite_logs"] = r.satelliteDesiredTLogCount;
}
if(r.satellites.size()) {
for(auto& s : r.satellites) {
StatusObject satObj;
satObj["id"] = s.dcId.toString();
satObj["priority"] = s.priority;
satObj["satellite"] = 1;
dcArr.push_back(satObj);
}
}
regionObj["datacenters"] = dcArr;
regionArr.push_back(regionObj);
}
result["regions"] = regionArr;
result["regions"] = getRegionJSON();
}
if( desiredTLogCount != -1 ) {
@@ -351,6 +303,58 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
return result;
}
StatusArray DatabaseConfiguration::getRegionJSON() const {
StatusArray regionArr;
for(auto& r : regions) {
StatusObject regionObj;
StatusArray dcArr;
StatusObject dcObj;
dcObj["id"] = r.dcId.toString();
dcObj["priority"] = r.priority;
dcArr.push_back(dcObj);
if(r.satelliteTLogReplicationFactor == 1 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
regionObj["satellite_redundancy_mode"] = "one_satellite_single";
} else if(r.satelliteTLogReplicationFactor == 2 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
regionObj["satellite_redundancy_mode"] = "one_satellite_double";
} else if(r.satelliteTLogReplicationFactor == 3 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
regionObj["satellite_redundancy_mode"] = "one_satellite_triple";
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 1 && r.satelliteTLogReplicationFactorFallback == 2 && r.satelliteTLogWriteAntiQuorumFallback == 0) {
regionObj["satellite_redundancy_mode"] = "two_satellite_safe";
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 2 && r.satelliteTLogUsableDcsFallback == 1 && r.satelliteTLogReplicationFactorFallback == 2 && r.satelliteTLogWriteAntiQuorumFallback == 0) {
regionObj["satellite_redundancy_mode"] = "two_satellite_fast";
} else if(r.satelliteTLogReplicationFactor != 0) {
regionObj["satellite_log_replicas"] = r.satelliteTLogReplicationFactor;
regionObj["satellite_usable_dcs"] = r.satelliteTLogUsableDcs;
regionObj["satellite_anti_quorum"] = r.satelliteTLogWriteAntiQuorum;
if(r.satelliteTLogPolicy) regionObj["satellite_log_policy"] = r.satelliteTLogPolicy->info();
regionObj["satellite_log_replicas_fallback"] = r.satelliteTLogReplicationFactorFallback;
regionObj["satellite_usable_dcs_fallback"] = r.satelliteTLogUsableDcsFallback;
regionObj["satellite_anti_quorum_fallback"] = r.satelliteTLogWriteAntiQuorumFallback;
if(r.satelliteTLogPolicyFallback) regionObj["satellite_log_policy_fallback"] = r.satelliteTLogPolicyFallback->info();
}
if( r.satelliteDesiredTLogCount != -1 ) {
regionObj["satellite_logs"] = r.satelliteDesiredTLogCount;
}
if(r.satellites.size()) {
for(auto& s : r.satellites) {
StatusObject satObj;
satObj["id"] = s.dcId.toString();
satObj["priority"] = s.priority;
satObj["satellite"] = 1;
dcArr.push_back(satObj);
}
}
regionObj["datacenters"] = dcArr;
regionArr.push_back(regionObj);
}
return regionArr;
}
std::string DatabaseConfiguration::toString() const {
return json_spirit::write_string(json_spirit::mValue(toJSON()), json_spirit::Output_options::none);
}

View File

@@ -90,6 +90,7 @@ struct DatabaseConfiguration {
std::string toString() const;
StatusObject toJSON(bool noPolicies = false) const;
StatusArray getRegionJSON() const;
RegionInfo getRegion( Optional<Key> dcId ) const {
if(!dcId.present()) {

View File

@@ -1514,16 +1514,18 @@ ACTOR Future<Void> checkDatabaseLock( Reference<ReadYourWritesTransaction> tr, U
return Void();
}
ACTOR Future<Void> forceRecovery (Reference<ClusterConnectionFile> clusterFile) {
ACTOR Future<Void> forceRecovery( Reference<ClusterConnectionFile> clusterFile, Key dcId ) {
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);
while(!clusterInterface->get().present()) {
wait(clusterInterface->onChange());
loop {
choose {
when ( wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().forceRecovery.getReply( ForceRecoveryRequest(dcId) ) ) : Never() ) ) {
return Void();
}
when ( wait( clusterInterface->onChange() )) {}
}
}
ErrorOr<Void> _ = wait(clusterInterface->get().get().forceRecovery.tryGetReply( ForceRecoveryRequest() ));
return Void();
}
ACTOR Future<Void> waitForPrimaryDC( Database cx, StringRef dcId ) {

View File

@@ -169,7 +169,7 @@ Future<Void> checkDatabaseLock( Reference<ReadYourWritesTransaction> const& tr,
Future<int> setDDMode( Database const& cx, int const& mode );
Future<Void> forceRecovery (Reference<ClusterConnectionFile> const& clusterFile);
Future<Void> forceRecovery( Reference<ClusterConnectionFile> const& clusterFile, Standalone<StringRef> const& dcId );
Future<Void> waitForPrimaryDC( Database const& cx, StringRef const& dcId );

View File

@@ -90,7 +90,7 @@ public:
std::map<NetworkAddress, double> incompatibleConnections;
ClientVersionMap clientVersionMap;
std::map<NetworkAddress, std::string> traceLogGroupMap;
Promise<Void> forceMasterFailure;
AsyncTrigger forceMasterFailure;
int64_t masterRegistrationCount;
bool recoveryStalled;
bool forceRecovery;
@@ -1077,21 +1077,20 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
cluster->masterProcessId = masterWorker.worker.first.locality.processId();
cluster->db.unfinishedRecoveries++;
ErrorOr<MasterInterface> newMaster = wait( masterWorker.worker.first.master.tryGetReply( rmq ) );
if (newMaster.present()) {
TraceEvent("CCWDB", cluster->id).detail("Recruited", newMaster.get().id());
state Future<ErrorOr<MasterInterface>> fNewMaster = masterWorker.worker.first.master.tryGetReply( rmq );
wait( ready(fNewMaster) || db->forceMasterFailure.onTrigger() );
if (fNewMaster.isReady() && fNewMaster.get().present()) {
TraceEvent("CCWDB", cluster->id).detail("Recruited", fNewMaster.get().get().id());
// for status tool
TraceEvent("RecruitedMasterWorker", cluster->id)
.detail("Address", newMaster.get().address())
.detail("Address", fNewMaster.get().get().address())
.trackLatest("RecruitedMasterWorker");
iMaster = newMaster.get();
iMaster = fNewMaster.get().get();
db->masterRegistrationCount = 0;
db->recoveryStalled = false;
db->forceRecovery = false;
db->forceMasterFailure = Promise<Void>();
auto dbInfo = ServerDBInfo();
dbInfo.master = iMaster;
@@ -1103,7 +1102,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->serverInfo->set( dbInfo );
wait( delay(SERVER_KNOBS->MASTER_SPIN_DELAY) ); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation
state Future<Void> spinDelay = delay(SERVER_KNOBS->MASTER_SPIN_DELAY); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation
TraceEvent("CCWDB", cluster->id).detail("Watching", iMaster.id());
@@ -1111,9 +1110,10 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
loop choose {
when (wait( waitFailureClient( iMaster.waitFailure, db->masterRegistrationCount ?
SERVER_KNOBS->MASTER_FAILURE_REACTION_TIME : (now() - recoveryStart) * SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY,
db->masterRegistrationCount ? -SERVER_KNOBS->MASTER_FAILURE_REACTION_TIME/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY : SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) || db->forceMasterFailure.getFuture() )) { break; }
db->masterRegistrationCount ? -SERVER_KNOBS->MASTER_FAILURE_REACTION_TIME/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY : SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) || db->forceMasterFailure.onTrigger() )) { break; }
when (wait( db->serverInfo->onChange() )) {}
}
wait(spinDelay);
TEST(true); // clusterWatchDatabase() master failed
TraceEvent(SevWarn,"DetectedFailedMaster", cluster->id).detail("OldMaster", iMaster.id());
@@ -1279,10 +1279,8 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
self->checkRecoveryStalled();
if (self->betterMasterExists()) {
if (!self->db.forceMasterFailure.isSet()) {
self->db.forceMasterFailure.send( Void() );
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
}
self->db.forceMasterFailure.trigger();
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
}
} catch( Error &e ) {
if(e.code() != error_code_operation_failed && e.code() != error_code_no_more_servers) {
@@ -2167,6 +2165,49 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
}
}
ACTOR Future<Void> doEmptyCommit(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.makeSelfConflicting();
wait(tr.commit());
return Void();
} catch( Error &e ) {
wait( tr.onError(e) );
}
}
}
ACTOR Future<Void> handleForcedRecoveries( ClusterControllerData *self, ClusterControllerFullInterface interf ) {
while(!self->clusterControllerProcessId.present()) {
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
loop {
state ForceRecoveryRequest req = waitNext( interf.clientInterface.forceRecovery.getFuture() );
TraceEvent("ForcedRecoveryStart", self->id).detail("ClusterControllerDcId", printable(self->clusterControllerDcId)).detail("DcId", req.dcId.printable());
state Future<Void> fCommit = doEmptyCommit(self->cx);
wait(fCommit || delay(5.0));
if(!fCommit.isReady() || fCommit.isError()) {
if(!self->clusterControllerDcId.present() || self->clusterControllerDcId != req.dcId) {
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.dcId);
dcPriority.push_back(self->clusterControllerDcId);
self->desiredDcIds.set(dcPriority);
} else {
self->db.forceRecovery = true;
self->db.forceMasterFailure.trigger();
}
wait(fCommit);
}
TraceEvent("ForcedRecoveryFinish", self->id);
self->db.forceRecovery = false;
req.reply.send(Void());
}
}
ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators, LocalityData locality ) {
state ClusterControllerData self( interf, locality );
state Future<Void> coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
@@ -2185,6 +2226,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
addActor.send( updatedChangingDatacenters(&self) );
addActor.send( updatedChangedDatacenters(&self) );
addActor.send( updateDatacenterVersionDifference(&self) );
addActor.send( handleForcedRecoveries(&self, interf) );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
loop choose {
@@ -2241,15 +2283,6 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
}
req.reply.send(workers);
}
when( ForceRecoveryRequest req = waitNext( interf.clientInterface.forceRecovery.getFuture() ) ) {
if(self.db.masterRegistrationCount == 0 || self.db.serverInfo->get().recoveryState <= RecoveryState::RECRUITING) {
if (!self.db.forceMasterFailure.isSet()) {
self.db.forceRecovery = true;
self.db.forceMasterFailure.send( Void() );
}
}
req.reply.send(Void());
}
when( wait( coordinationPingDelay ) ) {
CoordinationPingMessage message(self.id, step++);
for(auto& it : self.id_worker)

View File

@@ -256,6 +256,10 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
hasConfiguration(false),
recruitmentStalled( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) )
{
if(forceRecovery && !myInterface.locality.dcId().present()) {
TraceEvent(SevError, "ForcedRecoveryRequiresDcID");
forceRecovery = false;
}
}
~MasterData() { if(txnStateStore) txnStateStore->close(); }
};
@@ -767,6 +771,31 @@ ACTOR Future<Void> discardCommit(IKeyValueStore* store, LogSystemDiskQueueAdapte
return Void();
}
void updateConfigForForcedRecovery(Reference<MasterData> self, vector<Standalone<CommitTransactionRef>>* initialConfChanges) {
bool regionsChanged = false;
for(auto& it : self->configuration.regions) {
if(it.dcId == self->myInterface.locality.dcId().get() && it.priority < 0) {
it.priority = 1;
regionsChanged = true;
} else if(it.dcId != self->myInterface.locality.dcId().get() && it.priority >= 0) {
it.priority = -1;
regionsChanged = true;
}
}
Standalone<CommitTransactionRef> regionCommit;
regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "usable_regions", LiteralStringRef("1")));
self->configuration.applyMutation( regionCommit.mutations.back() );
if(regionsChanged) {
std::sort(self->configuration.regions.begin(), self->configuration.regions.end(), RegionInfo::sort_by_priority() );
StatusObject regionJSON;
regionJSON["regions"] = self->configuration.getRegionJSON();
regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion()).toString()));
self->configuration.applyMutation( regionCommit.mutations.back() );
TraceEvent("ForcedRecoveryConfigChange", self->dbgid).detail("Conf", self->configuration.toString());
}
initialConfChanges->push_back(regionCommit);
}
ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem, vector<StorageServerInterface>* seedServers, vector<Standalone<CommitTransactionRef>>* initialConfChanges ) {
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
@@ -784,6 +813,10 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
}
}
if(self->forceRecovery) {
updateConfigForForcedRecovery(self, initialConfChanges);
}
debug_checkMaxRestoredVersion( UID(), self->lastEpochEnd, "DBRecovery" );
// Ordinarily we pass through this loop once and recover. We go around the loop if recovery stalls for more than a second,
@@ -820,6 +853,9 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
} else {
initialConfChanges->push_back(req);
}
if(self->forceRecovery) {
updateConfigForForcedRecovery(self, initialConfChanges);
}
if(self->configuration != oldConf) { //confChange does not trigger when including servers
self->dcId_locality = originalLocalityMap;
@@ -1194,6 +1230,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_coordinated_state])
.detail("TLogs", self->cstate.prevDBState.tLogs.size())
.detail("MyRecoveryCount", self->cstate.prevDBState.recoveryCount+2)
.detail("ForceRecovery", self->forceRecovery)
.trackLatest("MasterRecoveryState");
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );