Add decode function for backup progress

This commit is contained in:
Jingyu Zhou 2019-05-31 09:50:16 -07:00
parent f245084bf3
commit 41f0cf2bb5
3 changed files with 37 additions and 3 deletions

View File

@ -502,9 +502,9 @@ const Key backupProgressKeyFor(UID workerID) {
return wr.toValue();
}
const Value backupProgressValue(int64_t recoveryCount, Version version) {
const Value backupProgressValue(LogEpoch epoch, Version version) {
BinaryWriter wr(IncludeVersion());
wr << recoveryCount << version;
wr << epoch << version;
return wr.toValue();
}
@ -515,6 +515,11 @@ UID decodeBackupProgressKey(const KeyRef& key) {
return serverID;
}
void decodeBackupProgressValue(const ValueRef& value, LogEpoch& epoch, Version& version) {
BinaryReader reader(value, IncludeVersion());
reader >> epoch >> version;
}
const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
const KeyRef logsKey = LiteralStringRef("\xff/logs");
const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCommitVersion");

View File

@ -178,8 +178,9 @@ ProcessData decodeWorkerListValue( ValueRef const& );
extern const KeyRangeRef backupProgressKeys;
extern const KeyRef backupProgressPrefix;
const Key backupProgressKeyFor(UID workerID);
const Value backupProgressValue(int64_t recoveryCount, Version version);
const Value backupProgressValue(LogEpoch epoch, Version version);
UID decodeBackupProgressKey(const KeyRef& key);
void decodeBackupProgressValue(const ValueRef& value, LogEpoch& epoch, Version& version);
extern const KeyRef coordinatorsKey;
extern const KeyRef logsKey;

View File

@ -1230,6 +1230,33 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
}
}
ACTOR Future<std::vector<std::tuple<UID, LogEpoch, Version>>> getBackupProgress(Reference<MasterData> self) {
state Database cx = openDBOnServer(self->dbInfo, TaskDefaultEndpoint, true, true);
state Transaction tr(cx);
loop {
try {
state std::vector<std::tuple<UID, LogEpoch, Version>> progress;
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Standalone<RangeResultRef> results = wait(tr.getRange(backupProgressKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
for (auto& it : results) {
UID workerID = decodeBackupProgressKey(it.key);
LogEpoch recoveryCount;
Version v;
decodeBackupProgressValue(it.value, recoveryCount, v);
progress.emplace_back(workerID, recoveryCount, v);
TraceEvent("GotBackupProgress", self->dbgid).detail("W", workerID).detail("Epoch", recoveryCount).detail("Version", v);
}
wait(tr.commit());
return progress;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
state TraceInterval recoveryInterval("MasterRecovery");
state double recoverStartTime = now();
@ -1257,6 +1284,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality, &self->forceRecovery);
state Future<std::vector<std::tuple<UID, LogEpoch, Version>>> backupProgress = getBackupProgress(self);
DBCoreState newState = self->cstate.myDBState;
newState.recoveryCount++;