Fix oldest backup epoch for backup workers
The oldest backup epoch is piggybacked in LogSystemConfig from the master to the cluster controller and then to all workers. Previously, this epoch was set to the current master epoch, which is wrong.
This commit is contained in:
parent fea6155714
commit 5b36dcaad5
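For context, the following is a minimal, self-contained C++ sketch of the behaviour this commit establishes; the type and function names (BackupWorkerSketch, onLogSystemChange) are invented for illustration and are not the actual FoundationDB classes. A worker tracks the oldest epoch that still has unfinished backup work, only ever moves that view forward (cf. the std::max update in the backupWorker() hunk below), and defers popping its backup tag while any older epoch is still backing up (cf. the fixed BackupData::pop()).

#include <algorithm>
#include <cstdint>
#include <iostream>

using LogEpoch = std::uint64_t;
using Version = std::int64_t;

struct BackupWorkerSketch {
    LogEpoch backupEpoch = 0;       // epoch whose mutations this worker saves
    LogEpoch oldestBackupEpoch = 0; // oldest epoch with unfinished backup workers
    Version savedVersion = 0;       // highest version already saved to backup files

    // Modeled on the fixed BackupData::pop(): only pop the backup tag once no
    // older epoch still needs its mutations.
    void pop() {
        if (backupEpoch > oldestBackupEpoch) {
            // Defer pop if an old epoch hasn't finished popping yet.
            std::cout << "pop deferred: backupEpoch=" << backupEpoch
                      << " oldestEpoch=" << oldestBackupEpoch
                      << " savedVersion=" << savedVersion << "\n";
            return;
        }
        std::cout << "popping backup tag up to version " << savedVersion << "\n";
    }

    // The oldest epoch published by the log system only ever moves the local
    // view forward; a pop is then attempted.
    void onLogSystemChange(LogEpoch publishedOldestEpoch) {
        oldestBackupEpoch = std::max(oldestBackupEpoch, publishedOldestEpoch);
        pop();
    }
};

int main() {
    BackupWorkerSketch worker;
    worker.backupEpoch = 5;
    worker.savedVersion = 100;
    worker.onLogSystemChange(3); // an older epoch is still backing up: defer
    worker.onLogSystemChange(5); // this worker's epoch is the oldest: pop proceeds
    return 0;
}

In the real code the published value comes from ILogSystem::getOldestBackupEpoch(), which after this commit is carried in LogSystemConfig::oldestBackupEpoch rather than defaulting to the current master epoch, as the hunks below show.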
@@ -72,6 +72,7 @@ struct BackupData {
     const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
     const LogEpoch recruitedEpoch;
     const LogEpoch backupEpoch;
+    LogEpoch oldestBackupEpoch = 0;
     Version minKnownCommittedVersion;
     Version savedVersion;
     AsyncVar<Reference<ILogSystem>> logSystem;

@@ -169,13 +170,12 @@ struct BackupData {
     }

     void pop() {
-        const LogEpoch oldest = logSystem.get()->getOldestBackupEpoch();
-        if (backupEpoch > oldest) {
+        if (backupEpoch > oldestBackupEpoch) {
             // Defer pop if old epoch hasn't finished popping yet.
             TraceEvent("BackupWorkerPopDeferred", myId)
                 .suppressFor(1.0)
                 .detail("BackupEpoch", backupEpoch)
-                .detail("OldestEpoch", oldest)
+                .detail("OldestEpoch", oldestBackupEpoch)
                 .detail("Version", savedVersion);
             return;
         }

@@ -552,6 +552,14 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
         MutationRef m;
         if (!message.isBackupMessage(&m)) continue;

+        if (debugMutation("addMutation", message.version.version, m)) {
+            TraceEvent("BackupWorkerDebug", self->myId)
+                .detail("Version", message.version.toString())
+                .detail("Mutation", m.toString())
+                .detail("KCV", self->minKnownCommittedVersion)
+                .detail("SavedVersion", self->savedVersion);
+        }
+
         std::vector<Future<Void>> adds;
         if (m.type != MutationRef::Type::ClearRange) {
             for (int index : keyRangeMap[m.param1]) {

@@ -804,15 +812,14 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
             dbInfoChange = db->onChange();
             Reference<ILogSystem> ls = ILogSystem::fromServerDBInfo(self.myId, db->get(), true);
             bool hasPseudoLocality = ls.isValid() && ls->hasPseudoLocality(tagLocalityBackup);
-            LogEpoch oldestBackupEpoch = 0;
             if (hasPseudoLocality) {
                 self.logSystem.set(ls);
                 self.pop();
-                oldestBackupEpoch = ls->getOldestBackupEpoch();
+                self.oldestBackupEpoch = std::max(self.oldestBackupEpoch, ls->getOldestBackupEpoch());
             }
             TraceEvent("BackupWorkerLogSystem", self.myId)
                 .detail("HasBackupLocality", hasPseudoLocality)
-                .detail("OldestBackupEpoch", oldestBackupEpoch)
+                .detail("OldestBackupEpoch", self.oldestBackupEpoch)
                 .detail("Tag", self.tag.toString());
         }
         when(wait(done)) {

@@ -2052,8 +2052,17 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
 void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest const& req ) {
     req.reply.send( Void() );

-    TraceEvent("MasterRegistrationReceived", self->id).detail("MasterId", req.id).detail("Master", req.mi.toString()).detail("Tlogs", describe(req.logSystemConfig.tLogs)).detail("Resolvers", req.resolvers.size())
-        .detail("RecoveryState", (int)req.recoveryState).detail("RegistrationCount", req.registrationCount).detail("Proxies", req.proxies.size()).detail("RecoveryCount", req.recoveryCount).detail("Stalled", req.recoveryStalled);
+    TraceEvent("MasterRegistrationReceived", self->id)
+        .detail("MasterId", req.id)
+        .detail("Master", req.mi.toString())
+        .detail("Tlogs", describe(req.logSystemConfig.tLogs))
+        .detail("Resolvers", req.resolvers.size())
+        .detail("RecoveryState", (int)req.recoveryState)
+        .detail("RegistrationCount", req.registrationCount)
+        .detail("Proxies", req.proxies.size())
+        .detail("RecoveryCount", req.recoveryCount)
+        .detail("Stalled", req.recoveryStalled)
+        .detail("OldestBackupEpoch", req.logSystemConfig.oldestBackupEpoch);

     //make sure the request comes from an active database
     auto db = &self->db;

@@ -202,7 +202,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
     TagPartitionedLogSystem(UID dbgid, LocalityData locality, LogEpoch e,
                             Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>())
       : dbgid(dbgid), logSystemType(LogSystemType::empty), expectedLogSets(0), logRouterTags(0), txsTags(0),
-        repopulateRegionAntiQuorum(0), epoch(e), oldestBackupEpoch(e), recoveryCompleteWrittenToCoreState(false),
+        repopulateRegionAntiQuorum(0), epoch(e), oldestBackupEpoch(0), recoveryCompleteWrittenToCoreState(false),
         locality(locality), remoteLogsWrittenToCoreState(false), hasRemoteServers(false), stopped(false),
         addActor(addActor), popActors(false) {}

@@ -309,6 +309,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
         }

         logSystem->logSystemType = lsConf.logSystemType;
+        logSystem->oldestBackupEpoch = lsConf.oldestBackupEpoch;
         return logSystem;
     }

@@ -1394,6 +1395,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
             }
             logset->backupWorkers.push_back(worker);
         }
+        TraceEvent("SetOldestBackupEpoch", dbgid).detail("Epoch", oldestBackupEpoch);
         backupWorkerChanged.trigger();
     }