Fix review comments for PR #1473

This commit is contained in:
Jingyu Zhou 2019-04-29 16:44:11 -07:00
parent 5462f560e7
commit 8b5449e608
5 changed files with 7 additions and 7 deletions

View File

@ -698,15 +698,13 @@ struct ILogSystem {
virtual void stopRejoins() = 0;
virtual void addPseudoLocality(int8_t locality) = 0;
// Returns the pseudo tag to be popped for the given process class. If the
// process class doesn't use pseudo tag, return the same tag.
virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) = 0;
virtual bool isPseudoLocality(int8_t locality) = 0;
virtual Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) = 0;
virtual Version popPseudoLocalityTag(int8_t locality, Version upTo) = 0;
};
struct LengthPrefixedStringRef {

View File

@ -885,7 +885,7 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
state Version upTo = req.to;
int8_t tagLocality = req.tag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to);
upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, req.to);
tagLocality = tagLocalityLogRouter;
}
state Tag tag(tagLocality, req.tag.id);

View File

@ -1125,7 +1125,7 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
state Version upTo = req.to;
int8_t tagLocality = req.tag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->getPseudoLocalityPopVersion(tagLocality, req.to);
upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, req.to);
tagLocality = tagLocalityLogRouter;
}
state Tag tag(tagLocality, req.tag.id);

View File

@ -217,7 +217,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return dbgid;
}
void addPseudoLocality(int8_t locality) override {
void addPseudoLocality(int8_t locality) {
ASSERT(locality < 0);
pseudoLocalities.insert(locality);
pseudoLocalityPopVersion[locality] = 0;
@ -241,7 +241,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return pseudoLocalities.count(locality) > 0;
}
Version getPseudoLocalityPopVersion(int8_t locality, Version upTo) override {
Version popPseudoLocalityTag(int8_t locality, Version upTo) override {
ASSERT(isPseudoLocality(locality));
auto& localityVersion = pseudoLocalityPopVersion[locality];
localityVersion = std::max(localityVersion, upTo);

View File

@ -326,10 +326,12 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())), exclusionWorkerIds) ) );
self->primaryLocality = self->dcId_locality[recr.dcId];
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
} else {
self->primaryLocality = tagLocalitySpecial;
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
}