diff --git a/fdbserver/Backup.actor.cpp b/fdbserver/Backup.actor.cpp index 508453de82..4a5b5d10da 100644 --- a/fdbserver/Backup.actor.cpp +++ b/fdbserver/Backup.actor.cpp @@ -197,9 +197,13 @@ ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest Reference ls = ILogSystem::fromServerDBInfo(self.myId, db->get(), true); if (ls && ls->hasPseudoLocality(tagLocalityBackup)) { self.logSystem.set(ls); - TraceEvent("BackupWorkerLogSystem", interf.id()).detail("HasBackupLocality", true); + TraceEvent("BackupWorkerLogSystem", interf.id()) + .detail("HasBackupLocality", true) + .detail("Tag", self.tag.toString()); } else { - TraceEvent("BackupWorkerLogSystem", interf.id()).detail("HasBackupLocality", false); + TraceEvent("BackupWorkerLogSystem", interf.id()) + .detail("HasBackupLocality", false) + .detail("Tag", self.tag.toString()); } } when(HaltBackupRequest req = waitNext(interf.haltBackup.getFuture())) { diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index b06034a8b4..54b4a6e803 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -757,7 +757,10 @@ struct ILogSystem { virtual bool hasPseudoLocality(int8_t locality) = 0; - virtual Version popPseudoLocalityTag(int8_t locality, Version upTo) = 0; + // Returns the actual version to be popped from the log router tag for the given pseudo tag. + // For instance, a pseudo tag (-8, 2) means the actual popping tag is (-2, 2). Assuming there + // are multiple pseudo tags, the returned version is the min(all pseudo tags' "upTo" versions). + virtual Version popPseudoLocalityTag(Tag tag, Version upTo) = 0; virtual void setBackupWorkers( std::vector>>> backupWorkers) = 0; diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 5c4259ea02..46f3b68fa2 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -697,7 +697,7 @@ ACTOR Future tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere int8_t tagLocality = inputTag.locality; if (isPseudoLocality(tagLocality)) { if (logData->logSystem->get().isValid()) { - upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to); + upTo = logData->logSystem->get()->popPseudoLocalityTag(inputTag, to); tagLocality = tagLocalityLogRouter; } else { TraceEvent("TLogPopNoLogSystem", self->dbgid).detail("Locality", tagLocality).detail("Version", upTo); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 9b5630d63e..03a44cd7a3 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -985,7 +985,7 @@ ACTOR Future tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere int8_t tagLocality = inputTag.locality; if (isPseudoLocality(tagLocality)) { if (logData->logSystem->get().isValid()) { - upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to); + upTo = logData->logSystem->get()->popPseudoLocalityTag(inputTag, to); tagLocality = tagLocalityLogRouter; } else { // TODO: if this happens, need to save the popped version? or discard the pop? @@ -995,6 +995,7 @@ ACTOR Future tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere } state Tag tag(tagLocality, inputTag.id); auto tagData = logData->getTagData(tag); + TraceEvent("TLogPop0", logData->logId).detail("Tag", tag.toString()).detail("To", to).detail("UpTo", upTo).detail("Popped", tagData ? tagData->popped : -1); if (!tagData) { tagData = logData->createTagData(tag, upTo, true, true, false); } else if (upTo > tagData->popped) { @@ -1012,7 +1013,7 @@ ACTOR Future tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere if (upTo > logData->persistentDataDurableVersion) wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop)); - //TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo); + TraceEvent("TLogPop", logData->logId).detail("Tag", tag.toString()).detail("To", upTo); } return Void(); } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index a1184732e7..d4bc8fc235 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -181,7 +181,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted pseudoLocalityPopVersion; + std::map pseudoLocalityPopVersion; Future rejoins; Future recoveryComplete; Future remoteRecovery; @@ -238,7 +238,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 0; } - Version popPseudoLocalityTag(int8_t locality, Version upTo) override { - ASSERT(hasPseudoLocality(locality)); - auto& localityVersion = pseudoLocalityPopVersion[locality]; + Version popPseudoLocalityTag(Tag tag, Version upTo) override { + ASSERT(isPseudoLocality(tag.locality) && hasPseudoLocality(tag.locality)); + + Version& localityVersion = pseudoLocalityPopVersion[tag]; localityVersion = std::max(localityVersion, upTo); Version minVersion = localityVersion; - for (const auto& it : pseudoLocalityPopVersion) { - minVersion = std::min(minVersion, it.second); + for (const int8_t locality : pseudoLocalities) { + minVersion = std::min(minVersion, pseudoLocalityPopVersion[Tag(locality, tag.id)]); } - TraceEvent("Pop").detail("L", locality).detail("Version", upTo).detail("PopVersion", minVersion); + TraceEvent("Pop", dbgid).detail("Tag", tag.toString()).detail("Version", upTo).detail("PopVersion", minVersion); return minVersion; } @@ -304,6 +307,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData.emplace_back(oldTlogConf); + //TraceEvent("BWFromLSConf") + // .detail("Epoch", logSystem->oldLogData.back().epoch) + // .detail("Version", logSystem->oldLogData.back().epochEnd); } logSystem->logSystemType = lsConf.logSystemType; @@ -326,6 +332,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData.emplace_back(lsConf.oldTLogs[i]); + //TraceEvent("BWFromOldLSConf") + // .detail("Epoch", logSystem->oldLogData.back().epoch) + // .detail("Version", logSystem->oldLogData.back().epochEnd); } } logSystem->logSystemType = lsConf.logSystemType; @@ -360,6 +369,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedoldLogData[0].epoch = oldLogSystem->epoch; } logSystem->oldLogData.insert(logSystem->oldLogData.end(), oldLogSystem->oldLogData.begin(), oldLogSystem->oldLogData.end()); + //for (const auto& old : logSystem->oldLogData) { + // TraceEvent("BWEndVersion").detail("Epoch", old.epoch).detail("Version", old.epochEnd); + //} logSystem->tLogs[0]->startVersion = oldLogSystem->knownCommittedVersion + 1; state int lockNum = 0; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 5e223f954a..de7ea25eff 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -128,7 +128,7 @@ private: if(finalWrite) { self->finalWriteStarted = true; } - + try { wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) ); } catch (Error& e) { @@ -1399,6 +1399,9 @@ ACTOR Future masterCore( Reference self ) { .detail("MyRecoveryCount", self->cstate.prevDBState.recoveryCount+2) .detail("ForceRecovery", self->forceRecovery) .trackLatest("MasterRecoveryState"); + //for (const auto& old : self->cstate.prevDBState.oldTLogData) { + // TraceEvent("BWReadCoreState", self->dbgid).detail("Epoch", old.epoch).detail("Version", old.epochEnd); + //} state Reference>> oldLogSystems( new AsyncVar> ); state Future recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality, &self->forceRecovery);