Fix backup worker start version when logset start version is lower
The start version of tlog set can be smaller than the last epoch's end version. In this case, set backup worker's start version as last epoch's end version to avoid overlapping of version ranges among backup workers.
This commit is contained in:
parent
38def426f4
commit
12ed8ad536
|
@ -721,7 +721,8 @@ struct ILogSystem {
|
|||
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
|
||||
// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
|
||||
|
||||
virtual Version getStartVersion() const = 0; // Returns the start version of current epoch.
|
||||
// Returns the start version of current epoch for backup workers.
|
||||
virtual Version getBackupStartVersion() const = 0;
|
||||
|
||||
struct EpochTagsVersionsInfo {
|
||||
int32_t logRouterTags; // Number of log router tags.
|
||||
|
|
|
@ -191,6 +191,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
Optional<Version> recoverAt;
|
||||
Optional<Version> recoveredAt;
|
||||
Version knownCommittedVersion;
|
||||
Version backupStartVersion = invalidVersion; // max(tLogs[0].startVersion, previous epochEnd).
|
||||
LocalityData locality;
|
||||
std::map< std::pair<UID, Tag>, std::pair<Version, Version> > outstandingPops; // For each currently running popFromLog actor, (log server #, tag)->popped version
|
||||
Optional<PromiseStream<Future<Void>>> addActor;
|
||||
|
@ -1350,9 +1351,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
int getLogRouterTags() const override { return logRouterTags; }
|
||||
|
||||
Version getStartVersion() const override {
|
||||
Version getBackupStartVersion() const final {
|
||||
ASSERT(tLogs.size() > 0);
|
||||
return tLogs[0]->startVersion;
|
||||
return backupStartVersion;
|
||||
}
|
||||
|
||||
std::map<LogEpoch, ILogSystem::EpochTagsVersionsInfo> getOldEpochTagsVersionsInfo() const override {
|
||||
|
@ -2214,6 +2215,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->oldLogData.insert(logSystem->oldLogData.end(), oldLogSystem->oldLogData.begin(), oldLogSystem->oldLogData.end());
|
||||
|
||||
logSystem->tLogs[0]->startVersion = oldLogSystem->knownCommittedVersion + 1;
|
||||
logSystem->backupStartVersion = oldLogSystem->knownCommittedVersion + 1;
|
||||
state int lockNum = 0;
|
||||
while(lockNum < oldLogSystem->lockResults.size()) {
|
||||
if(oldLogSystem->lockResults[lockNum].logSet->locality == primaryLocality) {
|
||||
|
|
|
@ -1256,7 +1256,7 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self, Datab
|
|||
idsTags.emplace_back(deterministicRandom()->randomUniqueID(), Tag(tagLocalityLogRouter, i));
|
||||
}
|
||||
|
||||
const Version startVersion = self->logSystem->getStartVersion();
|
||||
const Version startVersion = self->logSystem->getBackupStartVersion();
|
||||
state int i = 0;
|
||||
for (; i < logRouterTags; i++) {
|
||||
const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
|
||||
|
|
Loading…
Reference in New Issue