diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 21543f417f..71a5f0cc5a 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1106,17 +1106,23 @@ public: } if(!hasSatelliteReplication) { - notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy); if(usableRegions > 1) { tooManyDead = primaryTLogsDead || remoteTLogsDead || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) ); + notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy); } else { tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy); + notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy); } } else { bool primarySatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primarySatelliteProcessesDead, satelliteTLogPolicy, primarySatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : primarySatelliteProcessesDead.validate(satelliteTLogPolicy); bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicy, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicy); - notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy); + if(usableRegions > 1) { + notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy); + } else { + notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy); + } + if(usableRegions > 1 && allowLogSetKills) { tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryTLogsDead && remoteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) ); } else { @@ -1257,33 +1263,31 @@ public: std::vector processesLeft, processesDead; int protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0; - for (auto machineRec : machines) { - for (auto processInfo : machineRec.second.processes) { - // Add non-test processes (ie. datahall is not be set for test processes) - if (processInfo->isAvailableClass()) { - // Do not include any excluded machines - if (processInfo->isExcluded()) { - processesDead.push_back(processInfo); - excluded++; - } - else if (processInfo->isCleared()) { - processesDead.push_back(processInfo); - cleared++; - } - else if (!processInfo->isAvailable()) { - processesDead.push_back(processInfo); - unavailable++; - } - else if (protectedAddresses.count(processInfo->address)) { - processesLeft.push_back(processInfo); - protectedWorker++; - } - else if (machineRec.second.zoneId != zoneId) - processesLeft.push_back(processInfo); - // Add processes from dead machines and datacenter machines to dead group - else - processesDead.push_back(processInfo); + for (auto processInfo : getAllProcesses()) { + // Add non-test processes (ie. datahall is not be set for test processes) + if (processInfo->isAvailableClass()) { + // Do not include any excluded machines + if (processInfo->isExcluded()) { + processesDead.push_back(processInfo); + excluded++; } + else if (processInfo->isCleared()) { + processesDead.push_back(processInfo); + cleared++; + } + else if (!processInfo->isAvailable()) { + processesDead.push_back(processInfo); + unavailable++; + } + else if (protectedAddresses.count(processInfo->address)) { + processesLeft.push_back(processInfo); + protectedWorker++; + } + else if (processInfo->locality.zoneId() != zoneId) + processesLeft.push_back(processInfo); + // Add processes from dead machines and datacenter machines to dead group + else + processesDead.push_back(processInfo); } } if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) { @@ -1380,25 +1384,23 @@ public: if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) { std::vector processesLeft, processesDead; - for (auto machineRec : machines) { - for (auto processInfo : machineRec.second.processes) { - // Add non-test processes (ie. datahall is not be set for test processes) - if (processInfo->isAvailableClass()) { - // Mark all of the unavailable as dead - if (processInfo->isExcluded()) - processesDead.push_back(processInfo); - else if (processInfo->isCleared()) - processesDead.push_back(processInfo); - else if (!processInfo->isAvailable()) - processesDead.push_back(processInfo); - else if (protectedAddresses.count(processInfo->address)) - processesLeft.push_back(processInfo); - // Keep all not in the datacenter zones - else if (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end()) - processesLeft.push_back(processInfo); - else - processesDead.push_back(processInfo); - } + for (auto processInfo : getAllProcesses()) { + // Add non-test processes (ie. datahall is not be set for test processes) + if (processInfo->isAvailableClass()) { + // Mark all of the unavailable as dead + if (processInfo->isExcluded()) + processesDead.push_back(processInfo); + else if (processInfo->isCleared()) + processesDead.push_back(processInfo); + else if (!processInfo->isAvailable()) + processesDead.push_back(processInfo); + else if (protectedAddresses.count(processInfo->address)) + processesLeft.push_back(processInfo); + // Keep all not in the datacenter zones + else if (datacenterZones.find(processInfo->locality.zoneId()) == datacenterZones.end()) + processesLeft.push_back(processInfo); + else + processesDead.push_back(processInfo); } } @@ -1473,8 +1475,12 @@ public: } virtual std::vector getAllProcesses() const { std::vector processes; - for( auto c = machines.begin(); c != machines.end(); ++c ) - processes.insert( processes.end(), c->second.processes.begin(), c->second.processes.end() ); + for( auto& c : machines ) { + processes.insert( processes.end(), c.second.processes.begin(), c.second.processes.end() ); + } + for( auto& c : currentlyRebootingProcesses ) { + processes.push_back( c.second ); + } return processes; } virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) { diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index bed0ea6a42..959074a73e 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -102,8 +102,8 @@ public: inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; }; std::string toString() const { - return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s coord: %s data: %s excluded: %d cleared: %d", - name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), coordinationFolder, dataFolder, excluded, cleared); } + return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d", + name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), excluded, cleared); } // Members not for external use Promise shutdownSignal; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 3e6364aa94..1a9446deba 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -604,6 +604,11 @@ public: if(regions[0].priority == regions[1].priority && clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) { std::swap(regions[0], regions[1]); } + + if(clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get() && (!versionDifferenceUpdated || datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE)) { + std::swap(regions[0], regions[1]); + } + bool setPrimaryDesired = false; try { auto reply = findWorkersForConfiguration(req, regions[0].dcId); @@ -2050,8 +2055,14 @@ ACTOR Future updateDatacenterVersionDifference( ClusterControllerData *sel loop { self->versionDifferenceUpdated = false; if(self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) { + bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE; self->versionDifferenceUpdated = true; self->datacenterVersionDifference = 0; + + if(oldDifferenceTooLarge) { + checkOutstandingRequests(self); + } + Void _ = wait(self->db.serverInfo->onChange()); continue; } @@ -2094,8 +2105,14 @@ ACTOR Future updateDatacenterVersionDifference( ClusterControllerData *sel break; } + bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE; self->versionDifferenceUpdated = true; self->datacenterVersionDifference = primaryMetrics.get().v - remoteMetrics.get().v; + + if(oldDifferenceTooLarge && self->datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE) { + checkOutstandingRequests(self); + } + if(now() - lastLogTime > SERVER_KNOBS->CLUSTER_CONTROLLER_LOGGING_DELAY) { lastLogTime = now(); TraceEvent("DatacenterVersionDifference", self->id).detail("Difference", self->datacenterVersionDifference); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index e6fc79709e..fc920ed202 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -379,6 +379,9 @@ struct LogData : NonCopyable, public ReferenceCounted { //only callable after getTagData returns a null reference Reference createTagData(Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered) { + if(tag.locality != tagLocalityLogRouter && allTags.size() && !allTags.count(tag) && popped < recoveredAt) { + popped = recoveredAt; + } Reference newTagData = Reference( new TagData(tag, popped, nothingPersistent, poppedRecently, unpoppedRecovered) ); int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality); tag_data[idx][tag.id] = newTagData; @@ -406,10 +409,11 @@ struct LogData : NonCopyable, public ReferenceCounted { Version logRouterPoppedVersion, logRouterPopToVersion; int8_t locality; UID recruitmentID; + std::set allTags; - explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID) : tLogData(tLogData), knownCommittedVersion(1), logId(interf.id()), + explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, std::vector tags) : tLogData(tLogData), knownCommittedVersion(1), logId(interf.id()), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID), - logSystem(new AsyncVar>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), + logSystem(new AsyncVar>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), // These are initialized differently on init() or recovery recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0), logRouterPopToVersion(0), locality(tagLocalityInvalid) @@ -1242,7 +1246,7 @@ ACTOR Future tLogCommit( return Void(); } -ACTOR Future initPersistentState( TLogData* self, Reference logData, std::vector allTags ) { +ACTOR Future initPersistentState( TLogData* self, Reference logData ) { // PERSIST: Initial setup of persistentData for a brand new tLog for a new database IKeyValueStore *storage = self->persistentData; storage->set( persistFormat ); @@ -1252,7 +1256,7 @@ ACTOR Future initPersistentState( TLogData* self, Reference logDa storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistLogRouterTagsKeys.begin), BinaryWriter::toValue(logData->logRouterTags, Unversioned()) ) ); storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistRecoveryCountKeys.begin), BinaryWriter::toValue(logData->recoveryCount, Unversioned()) ) ); - for(auto tag : allTags) { + for(auto tag : logData->allTags) { ASSERT(!logData->getTagData(tag)); logData->createTagData(tag, 0, true, true, true); updatePersistentPopped( self, logData, logData->getTagData(tag) ); @@ -1733,7 +1737,7 @@ ACTOR Future restorePersistentState( TLogData* self, LocalityData locality DUMPTOKEN( recruited.confirmRunning ); //We do not need the remoteTag, because we will not be loading any additional data - logData = Reference( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], UID()) ); + logData = Reference( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], UID(), std::vector()) ); logData->locality = id_locality[id1]; logData->stopped = true; self->id_data[id1] = logData; @@ -1918,7 +1922,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit it.second->stopCommit.trigger(); } - state Reference logData = Reference( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.recruitmentID) ); + state Reference logData = Reference( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.recruitmentID, req.allTags) ); self->id_data[recruited.id()] = logData; logData->locality = req.locality; logData->recoveryCount = req.epoch; @@ -1942,7 +1946,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit logData->version.set( logData->unrecoveredBefore - 1 ); logData->unpoppedRecoveredTags = req.allTags.size(); - Void _ = wait( initPersistentState( self, logData, req.allTags ) || logData->removed ); + Void _ = wait( initPersistentState( self, logData ) || logData->removed ); TraceEvent("TLogRecover", self->dbgid).detail("LogId", logData->logId).detail("At", req.recoverAt).detail("Known", req.knownCommittedVersion).detail("Unrecovered", logData->unrecoveredBefore).detail("Tags", describe(req.recoverTags)).detail("Locality", req.locality).detail("LogRouterTags", logData->logRouterTags); @@ -1989,27 +1993,10 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit throw worker_removed(); } - //PullAsyncData will add tags that were popped in the previous generation, - //so we need to pop all tags that did not have data at the recovery version. - std::vector> popFutures; - std::set allTags(req.allTags.begin(), req.allTags.end()); - for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) { - for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) { - auto data = logData->tag_data[tagLocality][tagId]; - if(data && !allTags.count(data->tag) && data->tag.locality != tagLocalityLogRouter) { - TraceEvent("TLogPopOnRecover", self->dbgid).detail("LogId", logData->logId).detail("Tag", data->tag.toString()).detail("Ver", req.recoverAt); - popFutures.push_back(tLogPop(self, TLogPopRequest(req.recoverAt, 0, data->tag), logData)); - } - } - } - - Void _ = wait(waitForAll(popFutures)); - - TraceEvent("TLogPull2Complete", self->dbgid).detail("LogId", logData->logId); logData->addActor.send( respondToRecovered( recruited, logData->recoveryComplete ) ); } else { // Brand new tlog, initialization has already been done by caller - Void _ = wait( initPersistentState( self, logData, std::vector() ) || logData->removed ); + Void _ = wait( initPersistentState( self, logData ) || logData->removed ); if(logData->recoveryComplete.isSet()) { throw worker_removed(); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 7ec50f5a0e..4803cf6fb6 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -111,12 +111,14 @@ ACTOR Future forwardError( PromiseStream errors, } ACTOR Future handleIOErrors( Future actor, IClosable* store, UID id, Future onClosed = Void() ) { + state Future> storeError = actor.isReady() ? Never() : errorOr( store->getError() ); choose { when (state ErrorOr e = wait( errorOr(actor) )) { Void _ = wait(onClosed); + if(storeError.isReady()) throw storeError.getError(); if (e.isError()) throw e.getError(); else return e.get(); } - when (ErrorOr e = wait( actor.isReady() ? Never() : errorOr( store->getError() ) )) { + when (ErrorOr e = wait( storeError )) { TraceEvent("WorkerTerminatingByIOError", id).error(e.getError(), true); actor.cancel(); // file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be reported SevError. @@ -144,7 +146,7 @@ ACTOR Future workerHandleErrors(FutureStream errors) { endRole(err.id, err.context, "Error", ok, err.error); - if (err.error.code() == error_code_please_reboot || err.error.code() == error_code_io_timeout) throw err.error; + if (err.error.code() == error_code_please_reboot || err.error.code() == error_code_io_timeout || err.error.code() == error_code_io_error) throw err.error; } } }