Merge pull request #568 from etschannen/feature-remote-logs

Many bug fixes
This commit is contained in:
Evan Tschannen 2018-07-06 16:12:56 -07:00 committed by GitHub
commit 6dbddab530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 90 additions and 78 deletions

View File

@ -1106,17 +1106,23 @@ public:
}
if(!hasSatelliteReplication) {
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
if(usableRegions > 1) {
tooManyDead = primaryTLogsDead || remoteTLogsDead || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
} else {
tooManyDead = primaryTLogsDead || remoteTLogsDead || primaryProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(storagePolicy);
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy);
}
} else {
bool primarySatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primarySatelliteProcessesDead, satelliteTLogPolicy, primarySatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : primarySatelliteProcessesDead.validate(satelliteTLogPolicy);
bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicy, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicy);
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
if(usableRegions > 1) {
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(remoteTLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(remoteTLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
} else {
notEnoughLeft = !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) || !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy);
}
if(usableRegions > 1 && allowLogSetKills) {
tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryTLogsDead && remoteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
} else {
@ -1257,33 +1263,31 @@ public:
std::vector<ProcessInfo*> processesLeft, processesDead;
int protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0;
for (auto machineRec : machines) {
for (auto processInfo : machineRec.second.processes) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->isAvailableClass()) {
// Do not include any excluded machines
if (processInfo->isExcluded()) {
processesDead.push_back(processInfo);
excluded++;
}
else if (processInfo->isCleared()) {
processesDead.push_back(processInfo);
cleared++;
}
else if (!processInfo->isAvailable()) {
processesDead.push_back(processInfo);
unavailable++;
}
else if (protectedAddresses.count(processInfo->address)) {
processesLeft.push_back(processInfo);
protectedWorker++;
}
else if (machineRec.second.zoneId != zoneId)
processesLeft.push_back(processInfo);
// Add processes from dead machines and datacenter machines to dead group
else
processesDead.push_back(processInfo);
for (auto processInfo : getAllProcesses()) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->isAvailableClass()) {
// Do not include any excluded machines
if (processInfo->isExcluded()) {
processesDead.push_back(processInfo);
excluded++;
}
else if (processInfo->isCleared()) {
processesDead.push_back(processInfo);
cleared++;
}
else if (!processInfo->isAvailable()) {
processesDead.push_back(processInfo);
unavailable++;
}
else if (protectedAddresses.count(processInfo->address)) {
processesLeft.push_back(processInfo);
protectedWorker++;
}
else if (processInfo->locality.zoneId() != zoneId)
processesLeft.push_back(processInfo);
// Add processes from dead machines and datacenter machines to dead group
else
processesDead.push_back(processInfo);
}
}
if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
@ -1380,25 +1384,23 @@ public:
if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
{
std::vector<ProcessInfo*> processesLeft, processesDead;
for (auto machineRec : machines) {
for (auto processInfo : machineRec.second.processes) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->isAvailableClass()) {
// Mark all of the unavailable as dead
if (processInfo->isExcluded())
processesDead.push_back(processInfo);
else if (processInfo->isCleared())
processesDead.push_back(processInfo);
else if (!processInfo->isAvailable())
processesDead.push_back(processInfo);
else if (protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
// Keep all not in the datacenter zones
else if (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end())
processesLeft.push_back(processInfo);
else
processesDead.push_back(processInfo);
}
for (auto processInfo : getAllProcesses()) {
// Add non-test processes (ie. datahall is not be set for test processes)
if (processInfo->isAvailableClass()) {
// Mark all of the unavailable as dead
if (processInfo->isExcluded())
processesDead.push_back(processInfo);
else if (processInfo->isCleared())
processesDead.push_back(processInfo);
else if (!processInfo->isAvailable())
processesDead.push_back(processInfo);
else if (protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
// Keep all not in the datacenter zones
else if (datacenterZones.find(processInfo->locality.zoneId()) == datacenterZones.end())
processesLeft.push_back(processInfo);
else
processesDead.push_back(processInfo);
}
}
@ -1473,8 +1475,12 @@ public:
}
virtual std::vector<ProcessInfo*> getAllProcesses() const {
std::vector<ProcessInfo*> processes;
for( auto c = machines.begin(); c != machines.end(); ++c )
processes.insert( processes.end(), c->second.processes.begin(), c->second.processes.end() );
for( auto& c : machines ) {
processes.insert( processes.end(), c.second.processes.begin(), c.second.processes.end() );
}
for( auto& c : currentlyRebootingProcesses ) {
processes.push_back( c.second );
}
return processes;
}
virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) {

View File

@ -102,8 +102,8 @@ public:
inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; };
std::string toString() const {
return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s coord: %s data: %s excluded: %d cleared: %d",
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), coordinationFolder, dataFolder, excluded, cleared); }
return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d",
name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), excluded, cleared); }
// Members not for external use
Promise<KillType> shutdownSignal;

View File

@ -604,6 +604,11 @@ public:
if(regions[0].priority == regions[1].priority && clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) {
std::swap(regions[0], regions[1]);
}
if(clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get() && (!versionDifferenceUpdated || datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE)) {
std::swap(regions[0], regions[1]);
}
bool setPrimaryDesired = false;
try {
auto reply = findWorkersForConfiguration(req, regions[0].dcId);
@ -2050,8 +2055,14 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
loop {
self->versionDifferenceUpdated = false;
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) {
bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE;
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = 0;
if(oldDifferenceTooLarge) {
checkOutstandingRequests(self);
}
Void _ = wait(self->db.serverInfo->onChange());
continue;
}
@ -2094,8 +2105,14 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
break;
}
bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE;
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = primaryMetrics.get().v - remoteMetrics.get().v;
if(oldDifferenceTooLarge && self->datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE) {
checkOutstandingRequests(self);
}
if(now() - lastLogTime > SERVER_KNOBS->CLUSTER_CONTROLLER_LOGGING_DELAY) {
lastLogTime = now();
TraceEvent("DatacenterVersionDifference", self->id).detail("Difference", self->datacenterVersionDifference);

View File

@ -379,6 +379,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
//only callable after getTagData returns a null reference
Reference<TagData> createTagData(Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered) {
if(tag.locality != tagLocalityLogRouter && allTags.size() && !allTags.count(tag) && popped < recoveredAt) {
popped = recoveredAt;
}
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, nothingPersistent, poppedRecently, unpoppedRecovered) );
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
tag_data[idx][tag.id] = newTagData;
@ -406,10 +409,11 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version logRouterPoppedVersion, logRouterPopToVersion;
int8_t locality;
UID recruitmentID;
std::set<Tag> allTags;
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID) : tLogData(tLogData), knownCommittedVersion(1), logId(interf.id()),
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(1), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid)
@ -1242,7 +1246,7 @@ ACTOR Future<Void> tLogCommit(
return Void();
}
ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logData, std::vector<Tag> allTags ) {
ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logData ) {
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
IKeyValueStore *storage = self->persistentData;
storage->set( persistFormat );
@ -1252,7 +1256,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistLogRouterTagsKeys.begin), BinaryWriter::toValue(logData->logRouterTags, Unversioned()) ) );
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistRecoveryCountKeys.begin), BinaryWriter::toValue(logData->recoveryCount, Unversioned()) ) );
for(auto tag : allTags) {
for(auto tag : logData->allTags) {
ASSERT(!logData->getTagData(tag));
logData->createTagData(tag, 0, true, true, true);
updatePersistentPopped( self, logData, logData->getTagData(tag) );
@ -1733,7 +1737,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
DUMPTOKEN( recruited.confirmRunning );
//We do not need the remoteTag, because we will not be loading any additional data
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], UID()) );
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], UID(), std::vector<Tag>()) );
logData->locality = id_locality[id1];
logData->stopped = true;
self->id_data[id1] = logData;
@ -1918,7 +1922,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
it.second->stopCommit.trigger();
}
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.recruitmentID) );
state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.recruitmentID, req.allTags) );
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
@ -1942,7 +1946,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
logData->version.set( logData->unrecoveredBefore - 1 );
logData->unpoppedRecoveredTags = req.allTags.size();
Void _ = wait( initPersistentState( self, logData, req.allTags ) || logData->removed );
Void _ = wait( initPersistentState( self, logData ) || logData->removed );
TraceEvent("TLogRecover", self->dbgid).detail("LogId", logData->logId).detail("At", req.recoverAt).detail("Known", req.knownCommittedVersion).detail("Unrecovered", logData->unrecoveredBefore).detail("Tags", describe(req.recoverTags)).detail("Locality", req.locality).detail("LogRouterTags", logData->logRouterTags);
@ -1989,27 +1993,10 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
throw worker_removed();
}
//PullAsyncData will add tags that were popped in the previous generation,
//so we need to pop all tags that did not have data at the recovery version.
std::vector<Future<Void>> popFutures;
std::set<Tag> allTags(req.allTags.begin(), req.allTags.end());
for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
auto data = logData->tag_data[tagLocality][tagId];
if(data && !allTags.count(data->tag) && data->tag.locality != tagLocalityLogRouter) {
TraceEvent("TLogPopOnRecover", self->dbgid).detail("LogId", logData->logId).detail("Tag", data->tag.toString()).detail("Ver", req.recoverAt);
popFutures.push_back(tLogPop(self, TLogPopRequest(req.recoverAt, 0, data->tag), logData));
}
}
}
Void _ = wait(waitForAll(popFutures));
TraceEvent("TLogPull2Complete", self->dbgid).detail("LogId", logData->logId);
logData->addActor.send( respondToRecovered( recruited, logData->recoveryComplete ) );
} else {
// Brand new tlog, initialization has already been done by caller
Void _ = wait( initPersistentState( self, logData, std::vector<Tag>() ) || logData->removed );
Void _ = wait( initPersistentState( self, logData ) || logData->removed );
if(logData->recoveryComplete.isSet()) {
throw worker_removed();

View File

@ -111,12 +111,14 @@ ACTOR Future<Void> forwardError( PromiseStream<ErrorInfo> errors,
}
ACTOR Future<Void> handleIOErrors( Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void() ) {
state Future<ErrorOr<Void>> storeError = actor.isReady() ? Never() : errorOr( store->getError() );
choose {
when (state ErrorOr<Void> e = wait( errorOr(actor) )) {
Void _ = wait(onClosed);
if(storeError.isReady()) throw storeError.getError();
if (e.isError()) throw e.getError(); else return e.get();
}
when (ErrorOr<Void> e = wait( actor.isReady() ? Never() : errorOr( store->getError() ) )) {
when (ErrorOr<Void> e = wait( storeError )) {
TraceEvent("WorkerTerminatingByIOError", id).error(e.getError(), true);
actor.cancel();
// file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be reported SevError.
@ -144,7 +146,7 @@ ACTOR Future<Void> workerHandleErrors(FutureStream<ErrorInfo> errors) {
endRole(err.id, err.context, "Error", ok, err.error);
if (err.error.code() == error_code_please_reboot || err.error.code() == error_code_io_timeout) throw err.error;
if (err.error.code() == error_code_please_reboot || err.error.code() == error_code_io_timeout || err.error.code() == error_code_io_error) throw err.error;
}
}
}