Add pseudo locality for log routers and tlogs

This changes the logic of pop operations from log routers (LG):
- LG pops tagLocalityLogRouterMapped from TLogs;
- TLog converts tagLocalityLogRouterMapped back to tagLocalityLogRouter before
  popping.

Later when we add more psuedo localities, the same pattern can be used.
This commit is contained in:
Jingyu Zhou 2019-04-23 15:39:26 -07:00
parent 439d5a3843
commit 5462f560e7
6 changed files with 91 additions and 22 deletions

View File

@ -33,7 +33,15 @@ typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalitySatellite = -5, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these number to be as compact as possible
enum {
tagLocalitySpecial = -1,
tagLocalityLogRouter = -2,
tagLocalityRemoteLog = -3,
tagLocalityUpgraded = -4,
tagLocalitySatellite = -5,
tagLocalityLogRouterMapped = -6,
tagLocalityInvalid = -99
}; //The TLog and LogRouter require these number to be as compact as possible
#pragma pack(push, 1)
struct Tag {

View File

@ -378,7 +378,8 @@ ACTOR Future<Void> logRouterPop( LogRouterData* self, TLogPopRequest req ) {
self->poppedVersion = std::min(minKnownCommittedVersion, self->minKnownCommittedVersion);
if(self->logSystem->get() && self->allowPops) {
self->logSystem->get()->pop(self->poppedVersion, self->routerTag);
const Tag popTag = self->logSystem->get()->getPseudoPopTag(self->routerTag, ProcessClass::LogRouterClass);
self->logSystem->get()->pop(self->poppedVersion, popTag);
}
req.reply.send(Void());
self->minPopped.set(std::max(minPopped, self->minPopped.get()));

View File

@ -697,6 +697,16 @@ struct ILogSystem {
virtual Tag getRandomRouterTag() = 0;
virtual void stopRejoins() = 0;
virtual void addPseudoLocality(int8_t locality) = 0;
// Returns the pseudo tag to be popped for the given process class. If the
// process class doesn't use pseudo tag, return the same tag.
virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) = 0;
virtual bool isPseudoLocality(int8_t locality) = 0;
virtual Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) = 0;
};
struct LengthPrefixedStringRef {

View File

@ -882,25 +882,32 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Re
};
ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
auto tagData = logData->getTagData(req.tag);
state Version upTo = req.to;
int8_t tagLocality = req.tag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to);
tagLocality = tagLocalityLogRouter;
}
state Tag tag(tagLocality, req.tag.id);
auto tagData = logData->getTagData(tag);
if (!tagData) {
tagData = logData->createTagData(req.tag, req.to, true, true, false);
} else if (req.to > tagData->popped) {
tagData->popped = req.to;
tagData = logData->createTagData(tag, upTo, true, true, false);
} else if (upTo > tagData->popped) {
tagData->popped = upTo;
tagData->poppedRecently = true;
if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) {
if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) {
tagData->unpoppedRecovered = false;
logData->unpoppedRecoveredTags--;
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", req.tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
if(logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) {
logData->recoveryComplete.send(Void());
}
}
if ( req.to > logData->persistentDataDurableVersion )
wait(tagData->eraseMessagesBefore( req.to, self, logData, TaskTLogPop ));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to);
if (upTo > logData->persistentDataDurableVersion)
wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskTLogPop));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo);
}
req.reply.send(Void());

View File

@ -1122,26 +1122,33 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Re
};
ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
auto tagData = logData->getTagData(req.tag);
state Version upTo = req.to;
int8_t tagLocality = req.tag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to);
tagLocality = tagLocalityLogRouter;
}
state Tag tag(tagLocality, req.tag.id);
auto tagData = logData->getTagData(tag);
if (!tagData) {
tagData = logData->createTagData(req.tag, req.to, true, true, false);
} else if (req.to > tagData->popped) {
tagData->popped = req.to;
tagData = logData->createTagData(tag, upTo, true, true, false);
} else if (upTo > tagData->popped) {
tagData->popped = upTo;
tagData->poppedRecently = true;
tagData->requiresPoppedLocationUpdate = true;
if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) {
if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) {
tagData->unpoppedRecovered = false;
logData->unpoppedRecoveredTags--;
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", req.tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
if(logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) {
logData->recoveryComplete.send(Void());
}
}
if ( req.to > logData->persistentDataDurableVersion )
wait(tagData->eraseMessagesBefore( req.to, self, logData, TaskTLogPop ));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to);
if (upTo > logData->persistentDataDurableVersion)
wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskTLogPop));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo);
}
req.reply.send(Void());

View File

@ -166,6 +166,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int repopulateRegionAntiQuorum;
bool stopped;
std::set<int8_t> pseudoLocalities;
std::map<int8_t, Version> pseudoLocalityPopVersion;
// new members
Future<Void> rejoins;
@ -216,6 +217,41 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return dbgid;
}
void addPseudoLocality(int8_t locality) override {
ASSERT(locality < 0);
pseudoLocalities.insert(locality);
pseudoLocalityPopVersion[locality] = 0;
}
Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) override {
switch (type) {
case ProcessClass::LogRouterClass:
if (tag.locality == tagLocalityLogRouter && pseudoLocalities.count(tag.locality) > 0) {
tag.locality = tagLocalityLogRouterMapped;
}
break;
default:
break;
}
return tag;
}
bool isPseudoLocality(int8_t locality) override {
return pseudoLocalities.count(locality) > 0;
}
Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) override {
ASSERT(isPseudoLocality(locality));
auto& localityVersion = pseudoLocalityPopVersion[locality];
localityVersion = std::max(localityVersion, upTo);
Version minVersion = localityVersion;
for (const auto& it : pseudoLocalityPopVersion) {
minVersion = std::min(minVersion, it.second);
}
return minVersion;
}
static Future<Void> recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool* forceRecovery) {
return epochEnd( outLogSystem, dbgid, oldState, rejoins, locality, forceRecovery );
}
@ -1083,7 +1119,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual bool hasRemoteLogs() {
return logRouterTags > 0;
return logRouterTags > 0 || pseudoLocalities.size() > 0;
}
virtual Tag getRandomRouterTag() {
@ -1750,7 +1786,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(configuration.usableRegions > 1) {
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1,recr.tLogs.size()));
logSystem->expectedLogSets++;
logSystem->pseudoLocalities.insert(tagLocalityLogRouter);
logSystem->addPseudoLocality(tagLocalityLogRouterMapped);
}
logSystem->tLogs.emplace_back(new LogSet());