Merge pull request #565 from etschannen/feature-remote-logs

Simulation did not permanently kill machines in most tests
This commit is contained in:
Alex Miller 2018-07-05 15:07:28 -07:00 committed by GitHub
commit bb2eb2fe53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 86 additions and 45 deletions

View File

@ -284,6 +284,9 @@
"initializing_transaction_servers", "initializing_transaction_servers",
"recovery_transaction", "recovery_transaction",
"writing_coordinated_state", "writing_coordinated_state",
"accepting_commits",
"all_logs_recruited",
"storage_recovered",
"fully_recovered" "fully_recovered"
] ]
}, },

View File

@ -735,7 +735,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
std::string description; std::string description;
if (recoveryState.get("name", name) && if (recoveryState.get("name", name) &&
recoveryState.get("description", description) && recoveryState.get("description", description) &&
name != "fully_recovered" && name != "remote_recovered") name != "accepting_commits" && name != "all_logs_recruited" && name != "storage_recovered" && name != "fully_recovered")
{ {
fatalRecoveryState = true; fatalRecoveryState = true;

View File

@ -377,8 +377,9 @@ StatusObject getClientDatabaseStatus(StatusObjectReader client, StatusObjectRead
try { try {
// Lots of the JSON reads in this code could throw, and that's OK, isAvailable and isHealthy will be // Lots of the JSON reads in this code could throw, and that's OK, isAvailable and isHealthy will be
// at the states we want them to be in (currently) // at the states we want them to be in (currently)
std::string recoveryStateName = cluster.at("recovery_state.name").get_str();
isAvailable = client.at("coordinators.quorum_reachable").get_bool() isAvailable = client.at("coordinators.quorum_reachable").get_bool()
&& ( cluster.at("recovery_state.name") == "fully_recovered" || "remote_recovered" ) && ( recoveryStateName == "accepting_commits" || recoveryStateName == "all_logs_recruited" || recoveryStateName == "storage_recovered" || recoveryStateName == "fully_recovered" )
&& cluster.at("database_available").get_bool(); && cluster.at("database_available").get_bool();
if (isAvailable) if (isAvailable)

View File

@ -1117,7 +1117,7 @@ public:
bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicy, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicy); bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicy, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicy);
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy); notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
if(usableRegions > 1) { if(usableRegions > 1 && allowLogSetKills) {
tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryTLogsDead && remoteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) ); tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryTLogsDead && remoteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
} else { } else {
tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy); tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy);
@ -1266,7 +1266,7 @@ public:
processesDead.push_back(processInfo); processesDead.push_back(processInfo);
excluded++; excluded++;
} }
else if (!processInfo->isCleared()) { else if (processInfo->isCleared()) {
processesDead.push_back(processInfo); processesDead.push_back(processInfo);
cleared++; cleared++;
} }

View File

@ -34,7 +34,7 @@ enum ClogMode { ClogDefault, ClogAll, ClogSend, ClogReceive };
class ISimulator : public INetwork { class ISimulator : public INetwork {
public: public:
ISimulator() : desiredCoordinators(1), physicalDatacenters(1), processesPerMachine(0), isStopped(false), lastConnectionFailure(0), connectionFailuresDisableDuration(0), speedUpSimulation(false), allSwapsDisabled(false), backupAgents(WaitForType), drAgents(WaitForType), extraDB(NULL) {} ISimulator() : desiredCoordinators(1), physicalDatacenters(1), processesPerMachine(0), isStopped(false), lastConnectionFailure(0), connectionFailuresDisableDuration(0), speedUpSimulation(false), allSwapsDisabled(false), backupAgents(WaitForType), drAgents(WaitForType), extraDB(NULL), allowLogSetKills(true) {}
// Order matters! // Order matters!
enum KillType { KillInstantly, InjectFaults, RebootAndDelete, RebootProcessAndDelete, Reboot, RebootProcess, None }; enum KillType { KillInstantly, InjectFaults, RebootAndDelete, RebootProcessAndDelete, Reboot, RebootProcess, None };
@ -77,8 +77,8 @@ public:
bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; } bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; }
bool isAvailable() const { return !isExcluded() && isReliable(); } bool isAvailable() const { return !isExcluded() && isReliable(); }
bool isExcluded() const { return !excluded; } bool isExcluded() const { return excluded; }
bool isCleared() const { return !cleared; } bool isCleared() const { return cleared; }
// Returns true if the class represents an acceptable worker // Returns true if the class represents an acceptable worker
bool isAvailableClass() const { bool isAvailableClass() const {
@ -282,6 +282,7 @@ public:
Optional<Standalone<StringRef>> primaryDcId; Optional<Standalone<StringRef>> primaryDcId;
IRepPolicyRef remoteTLogPolicy; IRepPolicyRef remoteTLogPolicy;
int32_t usableRegions; int32_t usableRegions;
bool allowLogSetKills;
Optional<Standalone<StringRef>> remoteDcId; Optional<Standalone<StringRef>> remoteDcId;
bool hasSatelliteReplication; bool hasSatelliteReplication;
IRepPolicyRef satelliteTLogPolicy; IRepPolicyRef satelliteTLogPolicy;

View File

@ -791,7 +791,7 @@ public:
bool betterMasterExists() { bool betterMasterExists() {
ServerDBInfo dbi = db.serverInfo->get(); ServerDBInfo dbi = db.serverInfo->get();
if(dbi.recoveryState < RecoveryState::FULLY_RECOVERED) { if(dbi.recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return false; return false;
} }
@ -923,14 +923,14 @@ public:
return false; return false;
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog); RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
RoleFitness newRemoteTLogFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC) : remote_tlogs, ProcessClass::TLog); RoleFitness newRemoteTLogFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ? getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC) : remote_tlogs, ProcessClass::TLog);
if(oldRemoteTLogFit < newRemoteTLogFit) return false; if(oldRemoteTLogFit < newRemoteTLogFit) return false;
int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count)); int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count));
int newRouterCount = newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,newTLogFit.count)); int newRouterCount = newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,newTLogFit.count));
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter); RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter);
RoleFitness newLogRoutersFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, newRouterCount, db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter); RoleFitness newLogRoutersFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ? getWorkersForRoleInDatacenter( *remoteDC.begin(), ProcessClass::LogRouter, newRouterCount, db.config, id_used, Optional<WorkerFitnessInfo>(), true ) : log_routers, ProcessClass::LogRouter);
if(oldLogRoutersFit.count < oldRouterCount) { if(oldLogRoutersFit.count < oldRouterCount) {
oldLogRoutersFit.worstFit = ProcessClass::NeverAssign; oldLogRoutersFit.worstFit = ProcessClass::NeverAssign;
@ -1555,7 +1555,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
if ( req.configuration.present() ) { if ( req.configuration.present() ) {
db->config = req.configuration.get(); db->config = req.configuration.get();
if ( req.recoveryState >= RecoveryState::FULLY_RECOVERED ) { if ( req.recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
self->gotFullyRecoveredConfig = true; self->gotFullyRecoveredConfig = true;
db->fullyRecoveredConfig = req.configuration.get(); db->fullyRecoveredConfig = req.configuration.get();
for ( auto& it : self->id_worker ) { for ( auto& it : self->id_worker ) {
@ -2049,7 +2049,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state double lastLogTime = 0; state double lastLogTime = 0;
loop { loop {
self->versionDifferenceUpdated = false; self->versionDifferenceUpdated = false;
if(self->db.serverInfo->get().recoveryState >= RecoveryState::FULLY_RECOVERED && self->db.config.usableRegions == 1) { if(self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) {
self->versionDifferenceUpdated = true; self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = 0; self->datacenterVersionDifference = 0;
Void _ = wait(self->db.serverInfo->onChange()); Void _ = wait(self->db.serverInfo->onChange());
@ -2058,7 +2058,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state Optional<TLogInterface> primaryLog; state Optional<TLogInterface> primaryLog;
state Optional<TLogInterface> remoteLog; state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().recoveryState == RecoveryState::REMOTE_RECOVERED) { if(self->db.serverInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) { for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) { if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
for(auto& tLog : logSet.tLogs) { for(auto& tLog : logSet.tLogs) {
@ -2086,8 +2086,8 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state Future<Void> onChange = self->db.serverInfo->onChange(); state Future<Void> onChange = self->db.serverInfo->onChange();
loop { loop {
state Future<TLogQueuingMetricsReply> primaryMetrics = primaryLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ); state Future<TLogQueuingMetricsReply> primaryMetrics = brokenPromiseToNever( primaryLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ) );
state Future<TLogQueuingMetricsReply> remoteMetrics = remoteLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ); state Future<TLogQueuingMetricsReply> remoteMetrics = brokenPromiseToNever( remoteLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ) );
Void _ = wait( ( success(primaryMetrics) && success(remoteMetrics) ) || onChange ); Void _ = wait( ( success(primaryMetrics) && success(remoteMetrics) ) || onChange );
if(onChange.isReady()) { if(onChange.isReady()) {

View File

@ -366,7 +366,7 @@ ACTOR Future<Void> logRouterCore(
loop choose { loop choose {
when( Void _ = wait( dbInfoChange ) ) { when( Void _ = wait( dbInfoChange ) ) {
dbInfoChange = db->onChange(); dbInfoChange = db->onChange();
logRouterData.allowPops = db->get().recoveryState == RecoveryState::REMOTE_RECOVERED; logRouterData.allowPops = db->get().recoveryState == RecoveryState::FULLY_RECOVERED;
logRouterData.logSystem->set(ILogSystem::fromServerDBInfo( logRouterData.dbgid, db->get(), true )); logRouterData.logSystem->set(ILogSystem::fromServerDBInfo( logRouterData.dbgid, db->get(), true ));
} }
when( TLogPeekRequest req = waitNext( interf.peekMessages.getFuture() ) ) { when( TLogPeekRequest req = waitNext( interf.peekMessages.getFuture() ) ) {
@ -381,7 +381,7 @@ ACTOR Future<Void> logRouterCore(
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t recoveryCount, TLogInterface myInterface) { ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t recoveryCount, TLogInterface myInterface) {
loop{ loop{
bool isDisplaced = ( (db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED) || (db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::REMOTE_RECOVERED) ); bool isDisplaced = ( (db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED) || (db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::FULLY_RECOVERED) );
if(isDisplaced) { if(isDisplaced) {
for(auto& log : db->get().logSystemConfig.tLogs) { for(auto& log : db->get().logSystemConfig.tLogs) {
if( std::count( log.logRouters.begin(), log.logRouters.end(), myInterface.id() ) ) { if( std::count( log.logRouters.begin(), log.logRouters.end(), myInterface.id() ) ) {

View File

@ -161,7 +161,18 @@ public:
minUsed = std::min(minUsed, i); minUsed = std::min(minUsed, i);
maxUsed = std::max(maxUsed, i); maxUsed = std::max(maxUsed, i);
} }
TraceEvent(((maxUsed - minUsed > 1) || (maxUsedBest - minUsedBest > 1)) ? (g_network->isSimulated() ? SevError : SevWarnAlways) : SevInfo, "CheckSatelliteTagLocations").detail("MinUsed", minUsed).detail("MaxUsed", maxUsed).detail("MinUsedBest", minUsedBest).detail("MaxUsedBest", maxUsedBest);
bool foundDuplicate = false;
std::set<Optional<Key>> zones;
for(auto& loc : tLogLocalities) {
if(zones.count(loc.zoneId())) {
foundDuplicate = true;
break;
}
zones.insert(loc.zoneId());
}
TraceEvent(((maxUsed - minUsed > 1) || (maxUsedBest - minUsedBest > 1)) ? (g_network->isSimulated() && !foundDuplicate ? SevError : SevWarnAlways) : SevInfo, "CheckSatelliteTagLocations").detail("MinUsed", minUsed).detail("MaxUsed", maxUsed).detail("MinUsedBest", minUsedBest).detail("MaxUsedBest", maxUsedBest).detail("DuplicateZones", foundDuplicate);
} }
int bestLocationFor( Tag tag ) { int bestLocationFor( Tag tag ) {

View File

@ -974,7 +974,7 @@ ACTOR static Future<Void> transactionStarter(
otherProxies.push_back(mp); otherProxies.push_back(mp);
} }
ASSERT(db->get().recoveryState >= RecoveryState::FULLY_RECOVERED); // else potentially we could return uncommitted read versions (since self->committedVersion is only a committed version if this recovery succeeds) ASSERT(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS); // else potentially we could return uncommitted read versions (since self->committedVersion is only a committed version if this recovery succeeds)
TraceEvent("ProxyReadyForTxnStarts", proxy.id()); TraceEvent("ProxyReadyForTxnStarts", proxy.id());
@ -1247,7 +1247,7 @@ ACTOR Future<Void> masterProxyServerCore(
const vector<CommitTransactionRequest> &trs = batchedRequests.first; const vector<CommitTransactionRequest> &trs = batchedRequests.first;
int batchBytes = batchedRequests.second; int batchBytes = batchedRequests.second;
//TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount); //TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
if (trs.size() || (db->get().recoveryState >= RecoveryState::FULLY_RECOVERED && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) { if (trs.size() || (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
lastCommit = now(); lastCommit = now();
if (trs.size() || lastCommitComplete.isReady()) { if (trs.size() || lastCommitComplete.isReady()) {

View File

@ -286,11 +286,11 @@ ACTOR Future<bool> getStorageServersRecruiting( Database cx, Reference<AsyncVar<
ACTOR Future<Void> reconfigureAfter(Database cx, double time, Reference<AsyncVar<ServerDBInfo>> dbInfo) { ACTOR Future<Void> reconfigureAfter(Database cx, double time, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
Void _ = wait( delay(time) ); Void _ = wait( delay(time) );
if(g_network->isSimulated()) { if(g_network->isSimulated() && g_simulator.allowLogSetKills) {
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration"); TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration");
g_simulator.usableRegions = 1; g_simulator.usableRegions = 1;
ConfigurationResult::Type _ = wait( changeConfig( cx, "repopulate_anti_quorum=1" ) ); ConfigurationResult::Type _ = wait( changeConfig( cx, "repopulate_anti_quorum=1" ) );
while( dbInfo->get().recoveryState < RecoveryState::REMOTE_RECOVERED ) { while( dbInfo->get().recoveryState < RecoveryState::STORAGE_RECOVERED ) {
Void _ = wait( dbInfo->onChange() ); Void _ = wait( dbInfo->onChange() );
} }
ConfigurationResult::Type _ = wait( changeConfig( cx, "usable_regions=1" ) ); ConfigurationResult::Type _ = wait( changeConfig( cx, "usable_regions=1" ) );

View File

@ -27,7 +27,7 @@
// RecoveryState and RecoveryStatus should probably be merged. The former is passed through ServerDBInfo and used for "real" decisions in the system; the latter // RecoveryState and RecoveryStatus should probably be merged. The former is passed through ServerDBInfo and used for "real" decisions in the system; the latter
// is slightly more detailed and is used by the status infrastructure. But I'm scared to make changes to the former so close to 1.0 release, so I'm making the latter. // is slightly more detailed and is used by the status infrastructure. But I'm scared to make changes to the former so close to 1.0 release, so I'm making the latter.
enum class RecoveryState { UNINITIALIZED = 0, READING_CSTATE = 1, LOCKING_CSTATE = 2, RECRUITING = 3, RECOVERY_TRANSACTION = 4, WRITING_CSTATE = 5, FULLY_RECOVERED = 6, REMOTE_RECOVERED = 7 }; enum class RecoveryState { UNINITIALIZED = 0, READING_CSTATE = 1, LOCKING_CSTATE = 2, RECRUITING = 3, RECOVERY_TRANSACTION = 4, WRITING_CSTATE = 5, ACCEPTING_COMMITS = 6, ALL_LOGS_RECRUITED = 7, STORAGE_RECOVERED = 8, FULLY_RECOVERED = 9 };
BINARY_SERIALIZABLE( RecoveryState ); BINARY_SERIALIZABLE( RecoveryState );
namespace RecoveryStatus { namespace RecoveryStatus {
@ -43,8 +43,10 @@ namespace RecoveryStatus {
initializing_transaction_servers, initializing_transaction_servers,
recovery_transaction, recovery_transaction,
writing_coordinated_state, writing_coordinated_state,
accepting_commits,
all_logs_recruited,
storage_recovered,
fully_recovered, fully_recovered,
remote_recovered,
END END
}; };

View File

@ -37,7 +37,7 @@ const char* RecoveryStatus::names[] = {
"reading_coordinated_state", "locking_coordinated_state", "locking_old_transaction_servers", "reading_transaction_system_state", "reading_coordinated_state", "locking_coordinated_state", "locking_old_transaction_servers", "reading_transaction_system_state",
"configuration_missing", "configuration_never_created", "configuration_invalid", "configuration_missing", "configuration_never_created", "configuration_invalid",
"recruiting_transaction_servers", "initializing_transaction_servers", "recovery_transaction", "recruiting_transaction_servers", "initializing_transaction_servers", "recovery_transaction",
"writing_coordinated_state", "fully_recovered", "remote_recovered" "writing_coordinated_state", "accepting_commits", "all_logs_recruited", "storage_recovered", "fully_recovered"
}; };
static_assert( sizeof(RecoveryStatus::names) == sizeof(RecoveryStatus::names[0])*RecoveryStatus::END, "RecoveryStatus::names[] size" ); static_assert( sizeof(RecoveryStatus::names) == sizeof(RecoveryStatus::names[0])*RecoveryStatus::END, "RecoveryStatus::names[] size" );
const char* RecoveryStatus::descriptions[] = { const char* RecoveryStatus::descriptions[] = {
@ -63,10 +63,14 @@ const char* RecoveryStatus::descriptions[] = {
"Performing recovery transaction.", "Performing recovery transaction.",
// writing_coordinated_state // writing_coordinated_state
"Writing coordinated state. Verify that a majority of coordination server processes are active.", "Writing coordinated state. Verify that a majority of coordination server processes are active.",
// accepting_commits
"Accepting commits.",
// all_logs_recruited
"Accepting commits. All logs recruited.",
// storage_recovered
"Accepting commits. All storage servers are reading from the new logs.",
// fully_recovered // fully_recovered
"Recovery complete.", "Recovery complete."
// remote_recovered
"Remote recovery complete."
}; };
static_assert( sizeof(RecoveryStatus::descriptions) == sizeof(RecoveryStatus::descriptions[0])*RecoveryStatus::END, "RecoveryStatus::descriptions[] size" ); static_assert( sizeof(RecoveryStatus::descriptions) == sizeof(RecoveryStatus::descriptions[0])*RecoveryStatus::END, "RecoveryStatus::descriptions[] size" );
@ -1396,7 +1400,7 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
static StatusArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) { static StatusArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
StatusArray oldTlogsArray; StatusArray oldTlogsArray;
if(db->get().recoveryState >= RecoveryState::FULLY_RECOVERED) { if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
for(auto it : db->get().logSystemConfig.oldTLogs) { for(auto it : db->get().logSystemConfig.oldTLogs) {
StatusObject statusObj; StatusObject statusObj;
StatusArray logsObj; StatusArray logsObj;
@ -1780,7 +1784,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::vector<StatusObject> workerStatuses = wait(getAll(futures2)); state std::vector<StatusObject> workerStatuses = wait(getAll(futures2));
int oldLogFaultTolerance = 100; int oldLogFaultTolerance = 100;
if(db->get().recoveryState >= RecoveryState::FULLY_RECOVERED && db->get().logSystemConfig.oldTLogs.size() > 0) { if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().logSystemConfig.oldTLogs.size() > 0) {
statusObj["old_logs"] = oldTlogFetcher(&oldLogFaultTolerance, db, address_workers); statusObj["old_logs"] = oldTlogFetcher(&oldLogFaultTolerance, db, address_workers);
} }

View File

@ -1272,7 +1272,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
if(isPrimary) { if(isPrimary) {
isDisplaced = isDisplaced && inf.recoveryCount >= recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED; isDisplaced = isDisplaced && inf.recoveryCount >= recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED;
} else { } else {
isDisplaced = isDisplaced && ( ( inf.recoveryCount > recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED ) || ( inf.recoveryCount == recoveryCount && inf.recoveryState == RecoveryState::REMOTE_RECOVERED ) ); isDisplaced = isDisplaced && ( ( inf.recoveryCount > recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED ) || ( inf.recoveryCount == recoveryCount && inf.recoveryState == RecoveryState::FULLY_RECOVERED ) );
} }
if(isDisplaced) { if(isDisplaced) {
for(auto& log : inf.logSystemConfig.tLogs) { for(auto& log : inf.logSystemConfig.tLogs) {
@ -1383,7 +1383,7 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
when( Void _ = wait( dbInfoChange ) ) { when( Void _ = wait( dbInfoChange ) ) {
dbInfoChange = self->dbInfo->onChange(); dbInfoChange = self->dbInfo->onChange();
bool found = false; bool found = false;
if(self->dbInfo->get().recoveryState >= RecoveryState::FULLY_RECOVERED) { if(self->dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
for(auto& logs : self->dbInfo->get().logSystemConfig.tLogs) { for(auto& logs : self->dbInfo->get().logSystemConfig.tLogs) {
if( std::count( logs.tLogs.begin(), logs.tLogs.end(), logData->logId ) ) { if( std::count( logs.tLogs.begin(), logs.tLogs.end(), logData->logId ) ) {
found = true; found = true;
@ -1871,7 +1871,7 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, false, true )); logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, false, true ));
found = true; found = true;
} }
else if( self->dbInfo->get().recoveryState >= RecoveryState::FULLY_RECOVERED ) { else if( self->dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, true )); logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, true ));
found = true; found = true;
} }

View File

@ -1062,10 +1062,22 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
} }
if( finalUpdate ) { if( finalUpdate ) {
self->recoveryState = RecoveryState::REMOTE_RECOVERED; self->recoveryState = RecoveryState::FULLY_RECOVERED;
TraceEvent("MasterRecoveryState", self->dbgid) TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::remote_recovered) .detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::remote_recovered]) .detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
} else if( !newState.oldTLogData.size() && self->recoveryState < RecoveryState::STORAGE_RECOVERED ) {
self->recoveryState = RecoveryState::STORAGE_RECOVERED;
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::storage_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::storage_recovered])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
} else if( allLogs && self->recoveryState < RecoveryState::ALL_LOGS_RECRUITED ) {
self->recoveryState = RecoveryState::ALL_LOGS_RECRUITED;
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::all_logs_recruited)
.detail("Status", RecoveryStatus::names[RecoveryStatus::all_logs_recruited])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str()); .trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
} }
@ -1099,6 +1111,10 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
DatabaseConfiguration conf; DatabaseConfiguration conf;
conf.fromKeyValues((VectorRef<KeyValueRef>) results); conf.fromKeyValues((VectorRef<KeyValueRef>) results);
if(conf != self->configuration) { if(conf != self->configuration) {
if(self->recoveryState != RecoveryState::ALL_LOGS_RECRUITED && self->recoveryState != RecoveryState::FULLY_RECOVERED) {
throw master_recovery_failed();
}
self->configuration = conf; self->configuration = conf;
self->registrationTrigger.trigger(); self->registrationTrigger.trigger();
} }
@ -1268,7 +1284,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
TraceEvent(recoveryInterval.end(), self->dbgid).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion); TraceEvent(recoveryInterval.end(), self->dbgid).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
self->recoveryState = RecoveryState::FULLY_RECOVERED; self->recoveryState = RecoveryState::ACCEPTING_COMMITS;
double recoveryDuration = now() - recoverStartTime; double recoveryDuration = now() - recoverStartTime;
TraceEvent((recoveryDuration > 4 && !g_network->isSimulated()) ? SevWarnAlways : SevInfo, "MasterRecoveryDuration", self->dbgid) TraceEvent((recoveryDuration > 4 && !g_network->isSimulated()) ? SevWarnAlways : SevInfo, "MasterRecoveryDuration", self->dbgid)
@ -1276,8 +1292,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
.trackLatest("MasterRecoveryDuration"); .trackLatest("MasterRecoveryDuration");
TraceEvent("MasterRecoveryState", self->dbgid) TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::fully_recovered) .detail("StatusCode", RecoveryStatus::accepting_commits)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered]) .detail("Status", RecoveryStatus::names[RecoveryStatus::accepting_commits])
.detail("StoreType", self->configuration.storageServerStoreType) .detail("StoreType", self->configuration.storageServerStoreType)
.detail("RecoveryDuration", recoveryDuration) .detail("RecoveryDuration", recoveryDuration)
.trackLatest("MasterRecoveryState"); .trackLatest("MasterRecoveryState");

View File

@ -3188,7 +3188,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
when( Void _ = wait( dbInfoChange ) ) { when( Void _ = wait( dbInfoChange ) ) {
TEST( self->logSystem ); // shardServer dbInfo changed TEST( self->logSystem ); // shardServer dbInfo changed
dbInfoChange = self->db->onChange(); dbInfoChange = self->db->onChange();
if( self->db->get().recoveryState >= RecoveryState::FULLY_RECOVERED ) { if( self->db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
self->logSystem = ILogSystem::fromServerDBInfo( self->thisServerID, self->db->get() ); self->logSystem = ILogSystem::fromServerDBInfo( self->thisServerID, self->db->get() );
if (self->logSystem) { if (self->logSystem) {
if(self->db->get().logSystemConfig.recoveredAt.present()) { if(self->db->get().logSystemConfig.recoveredAt.present()) {

View File

@ -53,6 +53,9 @@ struct RemoveServersSafelyWorkload : TestWorkload {
kill1Timeout = getOption( options, LiteralStringRef("kill1Timeout"), 60.0 ); kill1Timeout = getOption( options, LiteralStringRef("kill1Timeout"), 60.0 );
kill2Timeout = getOption( options, LiteralStringRef("kill2Timeout"), 6000.0 ); kill2Timeout = getOption( options, LiteralStringRef("kill2Timeout"), 6000.0 );
killProcesses = g_random->random01() < 0.5; killProcesses = g_random->random01() < 0.5;
if(g_network->isSimulated()) {
g_simulator.allowLogSetKills = false;
}
} }
virtual std::string description() { return "RemoveServersSafelyWorkload"; } virtual std::string description() { return "RemoveServersSafelyWorkload"; }
@ -231,7 +234,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
for (auto processInfo : getServers()) { for (auto processInfo : getServers()) {
auto processNet = AddressExclusion(processInfo->address.ip, processInfo->address.port); auto processNet = AddressExclusion(processInfo->address.ip, processInfo->address.port);
// Mark all of the unavailable as dead // Mark all of the unavailable as dead
if (!processInfo->isAvailable()) if (!processInfo->isAvailable() || processInfo->isCleared())
processesDead.push_back(processInfo); processesDead.push_back(processInfo);
// Save all processes not specified within set // Save all processes not specified within set
else if (killAddrs.find(processNet) == killAddrs.end()) else if (killAddrs.find(processNet) == killAddrs.end())

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long