New backup: Revise event name and explain code

Meng Xu 2020-03-20 18:39:51 -07:00
parent 8bdda0fe04
commit 3f31ebf659
8 changed files with 34 additions and 18 deletions

View File

@@ -695,6 +695,9 @@ struct TLogVersion {
    UNSET = 0,
    // Everything between BEGIN and END should be densely packed, so that we
    // can iterate over them easily.
+   // V3 was the introduction of spill by reference;
+   // V4 changed how data gets written to satellite TLogs so that we can peek from them;
+   // V5 merged reference and value spilling
    // V1 = 1, // 4.6 is dispatched to via 6.0
    V2 = 2, // 6.0
    V3 = 3, // 6.1
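The dense-packing comment above is load-bearing: code elsewhere can iterate the enum by walking raw integers between its endpoints. A minimal standalone sketch of that pattern (the MIN_SUPPORTED/MAX_SUPPORTED names and the V4/V5 members are illustrative here, not a copy of the real header):

#include <cstdint>
#include <iostream>

enum class TLogVersionSketch : uint32_t {
    UNSET = 0,
    V2 = 2, // 6.0
    V3 = 3, // 6.1: spill-by-reference introduced
    V4 = 4, // satellite TLog writes became peekable
    V5 = 5, // reference and value spilling merged
    MIN_SUPPORTED = V2,
    MAX_SUPPORTED = V5,
};

int main() {
    // Dense packing between the endpoints means a plain integer walk
    // visits every version exactly once.
    for (auto v = static_cast<uint32_t>(TLogVersionSketch::MIN_SUPPORTED);
         v <= static_cast<uint32_t>(TLogVersionSketch::MAX_SUPPORTED); ++v) {
        std::cout << "supported TLogVersion: " << v << "\n";
    }
    return 0;
}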

View File

@@ -128,7 +128,7 @@ ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupPro
    const WorkerBackupStatus status = decodeBackupProgressValue(it.value);
    bStatus->addBackupStatus(status);
    TraceEvent("GotBackupProgress", dbgid)
-       .detail("W", workerID)
+       .detail("BackupWorker", workerID)
        .detail("Epoch", status.epoch)
        .detail("Version", status.version)
        .detail("Tag", status.tag.toString())

View File

@@ -71,8 +71,8 @@ struct BackupData {
    const Version startVersion;
    const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
    const LogEpoch recruitedEpoch;
-   const LogEpoch backupEpoch;
-   LogEpoch oldestBackupEpoch = 0;
+   const LogEpoch backupEpoch; // most recent active epoch whose tLogs are receiving mutations
+   LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull
    Version minKnownCommittedVersion;
    Version savedVersion;
    AsyncVar<Reference<ILogSystem>> logSystem;
@@ -820,6 +820,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
    if (hasPseudoLocality) {
        self.logSystem.set(ls);
        self.pop();
+       // Q: When will self.oldestBackupEpoch > ls->getOldestBackupEpoch()
        self.oldestBackupEpoch = std::max(self.oldestBackupEpoch, ls->getOldestBackupEpoch());
    }
    TraceEvent("BackupWorkerLogSystem", self.myId)

View File

@@ -36,7 +36,7 @@ struct DBCoreState;
struct TLogSet;
struct CoreTLogSet;

-// The set of tLog servers and logRouters for a log tag
+// The set of tLog servers, logRouters and backupWorkers for a log tag
class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
public:
    std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logServers;

View File

@@ -97,6 +97,7 @@ struct StagingKey {
    }

    // Precompute the final value of the key.
+   // TODO: Look at the last LogMessageVersion, if it set or clear, we can ignore the rest of versions.
    void precomputeResult() {
        TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
            .detail("Key", key)

View File

@@ -41,6 +41,7 @@ ACTOR Future<Version> minVersionWhenReady(Future<Void> f, std::vector<Future<TLo
    return minVersion;
}

+// TagPartitionedLogSystem info in old epoch
struct OldLogData {
    std::vector<Reference<LogSet>> tLogs;
    int32_t logRouterTags;
@@ -165,7 +166,7 @@ OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData)
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
    const UID dbgid;
    LogSystemType logSystemType;
-   std::vector<Reference<LogSet>> tLogs;
+   std::vector<Reference<LogSet>> tLogs; // LogSets in different locations: primary, remote or satellite
    int expectedLogSets;
    int logRouterTags;
    int txsTags;
@@ -196,7 +197,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
    std::map< std::pair<UID, Tag>, std::pair<Version, Version> > outstandingPops; // For each currently running popFromLog actor, (log server #, tag)->popped version
    Optional<PromiseStream<Future<Void>>> addActor;
    ActorCollection popActors;
-   std::vector<OldLogData> oldLogData;
+   std::vector<OldLogData> oldLogData; // each element has the log info. in one old epoch.
    AsyncTrigger logSystemConfigChanged;

    TagPartitionedLogSystem(UID dbgid, LocalityData locality, LogEpoch e,
@@ -1059,24 +1060,32 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
    void pop(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality) override {
        if (upTo <= 0) return;
-       if( tag.locality == tagLocalityRemoteLog) {
+       if (tag.locality == tagLocalityRemoteLog) {
            popLogRouter(upTo, tag, durableKnownCommittedVersion, popLocality);
            return;
        }
-       for(auto& t : tLogs) {
-           if(t->locality == tagLocalitySpecial || t->locality == tag.locality || tag.locality == tagLocalityUpgraded || (tag.locality < 0 && ((popLocality == tagLocalityInvalid) == t->isLocal))) {
+       for (auto& t : tLogs) {
+           if (t->locality == tagLocalitySpecial || t->locality == tag.locality ||
+               tag.locality == tagLocalityUpgraded ||
+               (tag.locality < 0 && ((popLocality == tagLocalityInvalid) == t->isLocal))) {
                for(auto& log : t->logServers) {
                    Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
-                   if (prev < upTo)
+                   if (prev < upTo) {
+                       // update pop version for popFromLog actor
                        outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, durableKnownCommittedVersion);
-                   if (prev == 0)
+                   }
+                   if (prev == 0) {
+                       // pop tag from log upto version defined in outstandingPops[].first
                        popActors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
+                   }
                }
            }
        }
    }

-   ACTOR static Future<Void> popFromLog( TagPartitionedLogSystem* self, Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag, double time ) {
+   ACTOR static Future<Void> popFromLog(TagPartitionedLogSystem* self,
+                                        Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag,
+                                        double time) {
        state Version last = 0;
        loop {
            wait( delay(time, TaskPriority::TLogPop) );
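Worth spelling out what the two new comments summarize: pop() only records the newest requested version per (log server, tag) key and spawns popFromLog once per key (prev == 0), and that actor later flushes whatever the latest recorded version is, so bursts of pop() calls coalesce into few TLogPop requests. A single-threaded sketch of that coalescing pattern (names and the printed output are illustrative; the real code uses flow actors and RPCs):

#include <cstdint>
#include <iostream>
#include <map>
#include <utility>

using Version = int64_t;
using Key = std::pair<int, int>; // (logId, tag) stand-in

std::map<Key, Version> outstandingPops; // newest requested pop per key
std::map<Key, Version> lastSent;        // what the "actor" last flushed

void pop(Key key, Version upTo) {
    Version& target = outstandingPops[key];
    if (target < upTo) target = upTo; // coalesce: keep only the newest target
    // In the real code, target == 0 beforehand would also spawn popFromLog here.
}

// One iteration of the popFromLog loop: send a pop only if the target advanced.
void popFromLogOnce(Key key) {
    Version target = outstandingPops[key];
    if (lastSent[key] < target) {
        lastSent[key] = target;
        std::cout << "pop log " << key.first << " tag " << key.second
                  << " upTo " << target << "\n"; // stands in for the TLogPop RPC
    }
}

int main() {
    pop({1, 0}, 100);
    pop({1, 0}, 250);       // coalesced with the previous request
    popFromLogOnce({1, 0}); // sends a single pop upTo 250
    return 0;
}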
@@ -2363,7 +2372,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
            }
        }
    }

-   for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) {
+   for (int i = 0; i < recr.satelliteTLogs.size(); i++) {
        InitializeTLogRequest &req = sreqs[i];
        req.recruitmentID = logSystem->recruitmentID;

View File

@@ -1288,19 +1288,19 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self, Datab
    wait(gotProgress);
    std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit =
        backupProgress->getUnfinishedBackup();
-   for (const auto& [epochVersionCount, tagVersions] : toRecruit) {
+   for (const auto& [epochVersionTags, tagVersions] : toRecruit) {
        for (const auto& [tag, version] : tagVersions) {
            const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
            i++;
            InitializeBackupRequest req(deterministicRandom()->randomUniqueID());
            req.recruitedEpoch = epoch;
-           req.backupEpoch = std::get<0>(epochVersionCount);
+           req.backupEpoch = std::get<0>(epochVersionTags);
            req.routerTag = tag;
-           req.totalTags = std::get<2>(epochVersionCount);
+           req.totalTags = std::get<2>(epochVersionTags);
            req.startVersion = version; // savedVersion + 1
-           req.endVersion = std::get<1>(epochVersionCount) - 1;
+           req.endVersion = std::get<1>(epochVersionTags) - 1;
            TraceEvent("BackupRecruitment", self->dbgid)
-               .detail("BKID", req.reqId)
+               .detail("BackupWorker", req.reqId)
                .detail("Tag", req.routerTag.toString())
                .detail("Epoch", epoch)
                .detail("BackupEpoch", req.backupEpoch)

View File

@@ -440,6 +440,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
            } else if (deterministicRandom()->random01() < 0.1) {
                targetVersion = desc.maxRestorableVersion.get();
            } else if (deterministicRandom()->random01() < 0.5) {
+               ASSERT_WE_THINK(desc.minRestorableVersion.get() <= desc.contiguousLogEnd.get());
+               ASSERT_WE_THINK(desc.contiguousLogEnd.get() <= desc.maxRestorableVersion.get());
                targetVersion = deterministicRandom()->randomInt64(desc.minRestorableVersion.get(),
                                                                   desc.contiguousLogEnd.get());
            }
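The two ASSERT_WE_THINK lines make the sampling range explicit: minRestorableVersion <= contiguousLogEnd <= maxRestorableVersion, so drawing from [minRestorableVersion, contiguousLogEnd) always yields a restorable version. A standalone sketch of that branch, with assert() standing in for ASSERT_WE_THINK and a half-open upper bound assumed to match randomInt64 (everything else here is illustrative):

#include <cassert>
#include <cstdint>
#include <random>

int64_t pickTargetVersion(int64_t minRestorable, int64_t contiguousLogEnd,
                          int64_t maxRestorable, std::mt19937_64& rng) {
    std::uniform_real_distribution<double> coin(0.0, 1.0);
    if (coin(rng) < 0.1) return maxRestorable; // occasionally the newest version
    assert(minRestorable <= contiguousLogEnd);
    assert(contiguousLogEnd <= maxRestorable);
    if (minRestorable == contiguousLogEnd) return minRestorable; // degenerate range
    // Half-open range [minRestorable, contiguousLogEnd), like randomInt64.
    std::uniform_int_distribution<int64_t> pick(minRestorable, contiguousLogEnd - 1);
    return pick(rng);
}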