Using BackupConfig from backup.actor.cpp to reduce intermediate
functions.
This commit is contained in:
parent
fe208d6adf
commit
c7df951f7c
|
@ -863,6 +863,7 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
totalBlobStats.create(p.first + ".$sum") = p.second;
|
||||
|
||||
state FileBackupAgent fba;
|
||||
state std::vector<KeyBackedTag> backupTags = wait(getAllBackupTags(tr));
|
||||
state Standalone<RangeResultRef> backupTagNames = wait( tr->getRange(fba.tagNames.range(), 10000));
|
||||
state std::vector<Future<Version>> tagLastRestorableVersions;
|
||||
state std::vector<Future<int>> tagStates;
|
||||
|
@ -871,14 +872,26 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
state std::vector<Future<int64_t>> tagLogBytes;
|
||||
state int i = 0;
|
||||
|
||||
for(i = 0; i < backupTagNames.size(); i++) {
|
||||
Standalone<KeyRef> tagName = fba.tagNames.unpack(backupTagNames[i].key).getString(0);
|
||||
UID tagUID = BinaryReader::fromStringRef<UID>(backupTagNames[i].value, Unversioned());
|
||||
tagLastRestorableVersions.push_back(fba.getLastRestorable(tr, tagName));
|
||||
tagStates.push_back(fba.getStateValue(tr, tagUID));
|
||||
tagContainers.push_back(fba.getLastBackupContainer(tr, tagUID));
|
||||
tagRangeBytes.push_back(fba.getRangeBytesWritten(tr, tagUID));
|
||||
tagLogBytes.push_back(fba.getLogBytesWritten(tr, tagUID));
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
for (KeyBackedTag eachTag : backupTags) {
|
||||
state KeyBackedTag tag = eachTag;
|
||||
UidAndAbortedFlagT uidAndAbortedFlag = wait(tag.getOrThrow(tr));
|
||||
state BackupConfig config(uidAndAbortedFlag.first);
|
||||
|
||||
EBackupState status = wait(config.stateEnum().getOrThrow(tr));
|
||||
tagStates.push_back(status);
|
||||
|
||||
int64_t rangeBytesWritten = wait(config.rangeBytesWritten().getD(tr, 0));
|
||||
tagRangeBytes.push_back(rangeBytesWritten);
|
||||
|
||||
int64_t logBytesWritten = wait(config.logBytesWritten().getD(tr, 0));
|
||||
tagLogBytes.push_back(logBytesWritten);
|
||||
|
||||
std::string backupContainer = wait(config.backupContainer().getOrThrow(tr));
|
||||
tagContainers.push_back(backupContainer);
|
||||
|
||||
tagLastRestorableVersions.push_back(fba.getLastRestorable(tr, StringRef(tag.tagName)));
|
||||
}
|
||||
|
||||
Void _ = wait( waitForAll(tagLastRestorableVersions) && waitForAll(tagStates) && waitForAll(tagContainers) && waitForAll(tagRangeBytes) && waitForAll(tagLogBytes));
|
||||
|
@ -886,16 +899,15 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
JSONDoc tagsRoot = layerRoot.subDoc("tags.$latest");
|
||||
layerRoot.create("tags.timestamp") = now();
|
||||
|
||||
for (int j = 0; j < backupTagNames.size(); j++) {
|
||||
std::string tagName = fba.tagNames.unpack(backupTagNames[j].key).getString(0).toString();
|
||||
|
||||
int j = 0;
|
||||
for (KeyBackedTag eachTag : backupTags) {
|
||||
Version last_restorable_version = tagLastRestorableVersions[j].get();
|
||||
double last_restorable_seconds_behind = ((double)readVer - last_restorable_version) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
|
||||
BackupAgentBase::enumState status = (BackupAgentBase::enumState)tagStates[j].get();
|
||||
const char *statusText = fba.getStateText(status);
|
||||
|
||||
// The object for this backup tag inside this instance's subdocument
|
||||
JSONDoc tagRoot = tagsRoot.subDoc(tagName);
|
||||
JSONDoc tagRoot = tagsRoot.subDoc(eachTag.tagName);
|
||||
tagRoot.create("current_container") = tagContainers[j].get();
|
||||
tagRoot.create("current_status") = statusText;
|
||||
tagRoot.create("last_restorable_version") = tagLastRestorableVersions[j].get();
|
||||
|
@ -904,6 +916,8 @@ ACTOR Future<std::string> getLayerStatus(Reference<ReadYourWritesTransaction> tr
|
|||
tagRoot.create("running_backup_is_restorable") = (status == BackupAgentBase::STATE_DIFFERENTIAL);
|
||||
tagRoot.create("range_bytes_written") = tagRangeBytes[j].get();
|
||||
tagRoot.create("mutation_log_bytes_written") = tagLogBytes[j].get();
|
||||
|
||||
j++;
|
||||
}
|
||||
}
|
||||
else if(exe == EXE_DR_AGENT) {
|
||||
|
@ -1237,12 +1251,12 @@ ACTOR Future<Void> submitBackup(Database db, std::string destinationDir, Standal
|
|||
}
|
||||
|
||||
else {
|
||||
Void _ = wait(backupAgent.submitBackup(db, KeyRef(destinationDir), KeyRef(tagName), backupRanges, stopWhenDone));
|
||||
Void _ = wait(backupAgent.submitBackup(db, KeyRef(destinationDir), tagName, backupRanges, stopWhenDone));
|
||||
|
||||
// Wait for the backup to complete, if requested
|
||||
if (waitForCompletion) {
|
||||
printf("Submitted and now waiting for the backup on tag `%s' to complete.\n", printable(StringRef(tagName)).c_str());
|
||||
int _ = wait(backupAgent.waitBackup(db, StringRef(tagName)));
|
||||
int _ = wait(backupAgent.waitBackup(db, tagName));
|
||||
}
|
||||
else {
|
||||
// Check if a backup agent is running
|
||||
|
@ -1421,7 +1435,7 @@ ACTOR Future<Void> waitBackup(Database db, std::string tagName, bool stopWhenDon
|
|||
{
|
||||
state FileBackupAgent backupAgent;
|
||||
|
||||
int status = wait(backupAgent.waitBackup(db, StringRef(tagName), stopWhenDone));
|
||||
int status = wait(backupAgent.waitBackup(db, tagName, stopWhenDone));
|
||||
|
||||
printf("The backup on tag `%s' %s.\n", printable(StringRef(tagName)).c_str(),
|
||||
BackupAgentBase::getStateText((BackupAgentBase::enumState) status));
|
||||
|
@ -1446,7 +1460,7 @@ ACTOR Future<Void> discontinueBackup(Database db, std::string tagName, bool wait
|
|||
// Wait for the backup to complete, if requested
|
||||
if (waitForCompletion) {
|
||||
printf("Discontinued and now waiting for the backup on tag `%s' to complete.\n", printable(StringRef(tagName)).c_str());
|
||||
int _ = wait(backupAgent.waitBackup(db, StringRef(tagName)));
|
||||
int _ = wait(backupAgent.waitBackup(db, tagName));
|
||||
}
|
||||
else {
|
||||
printf("The backup on tag `%s' was successfully discontinued.\n", printable(StringRef(tagName)).c_str());
|
||||
|
|
|
@ -307,21 +307,11 @@ public:
|
|||
}
|
||||
|
||||
Future<Version> getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
||||
Future<Version> getLastRestorable(Database cx, Key tagName) {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getLastRestorable(tr, tagName); });
|
||||
}
|
||||
|
||||
Future<int64_t> getRangeBytesWritten(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
||||
Future<int64_t> getLogBytesWritten(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
||||
|
||||
// stopWhenDone will return when the backup is stopped, if enabled. Otherwise, it
|
||||
// will return when the backup directory is restorable.
|
||||
Future<int> waitBackup(Database cx, std::string tagName, bool stopWhenDone = true);
|
||||
|
||||
Future<std::string> getLastBackupContainer(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
||||
Future<std::string> getLastBackupContainer(Database cx, UID logUid) {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getLastBackupContainer(tr, logUid); });
|
||||
}
|
||||
static Future<std::string> getBackupInfo(std::string backupContainer, Version* defaultVersion = NULL);
|
||||
|
||||
static std::string getTempFilename();
|
||||
|
|
|
@ -725,14 +725,6 @@ namespace fileBackup {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<std::string> getPath(Reference<ReadYourWritesTransaction> tr, UID uid) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state std::string dir = wait(BackupConfig(uid).backupContainer().getD(tr, ""));
|
||||
|
||||
return dir;
|
||||
}
|
||||
|
||||
ACTOR template <class Tr>
|
||||
Future<Void> checkTaskVersion(Tr tr, Reference<Task> task, StringRef name, uint32_t version) {
|
||||
uint32_t taskVersion = task->getVersion();
|
||||
|
@ -1514,7 +1506,6 @@ namespace fileBackup {
|
|||
state Key configPath = uidPrefixKey(logRangesRange.begin, uid);
|
||||
state Key logsPath = uidPrefixKey(backupLogKeys.begin, uid);
|
||||
|
||||
backup.clear(tr);
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
|
||||
backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
|
||||
|
@ -3269,11 +3260,9 @@ public:
|
|||
|
||||
state BackupConfig config(oldUidAndAborted.get().first);
|
||||
state EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
Optional<EBackupState> configState = wait(config.stateEnum().get(tr));
|
||||
ASSERT(configState.present() && configState.get() == status);
|
||||
|
||||
// Break, if no longer runnable
|
||||
if (!FileBackupAgent::isRunnable((BackupAgentBase::enumState)status)) {
|
||||
if (!FileBackupAgent::isRunnable(status)) {
|
||||
return status;
|
||||
}
|
||||
|
||||
|
@ -3488,12 +3477,13 @@ public:
|
|||
ACTOR static Future<Void> discontinueBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key tagName) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state UID uid = wait(backupAgent->getLogUid(tr, tagName));
|
||||
|
||||
state BackupConfig config(uid);
|
||||
state KeyBackedTag tag = makeBackupTag(tagName.toString());
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr));
|
||||
state BackupConfig config(current.first);
|
||||
state EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
|
||||
if (!FileBackupAgent::isRunnable((BackupAgentBase::enumState)status)) {
|
||||
if (!FileBackupAgent::isRunnable(status)) {
|
||||
throw backup_unneeded();
|
||||
}
|
||||
|
||||
|
@ -3503,7 +3493,6 @@ public:
|
|||
throw backup_duplicate();
|
||||
}
|
||||
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
config.stopWhenDone().set(tr, true);
|
||||
|
||||
return Void();
|
||||
|
@ -3514,12 +3503,9 @@ public:
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
state KeyBackedTag tag = makeBackupTag(tagName);
|
||||
state Optional<UidAndAbortedFlagT> current = wait(tag.get(tr));
|
||||
if (!current.present()) {
|
||||
throw backup_error();
|
||||
}
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr));
|
||||
|
||||
state BackupConfig config(current.get().first);
|
||||
state BackupConfig config(current.first);
|
||||
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
|
||||
if (!backupAgent->isRunnable((BackupAgentBase::enumState)status)) {
|
||||
|
@ -3558,7 +3544,7 @@ public:
|
|||
state Optional<UidAndAbortedFlagT> uidAndAbortedFlag = wait(tag.get(tr));
|
||||
if (uidAndAbortedFlag.present()) {
|
||||
config = BackupConfig(uidAndAbortedFlag.get().first);
|
||||
state EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
backupState = status;
|
||||
}
|
||||
|
||||
|
@ -3566,7 +3552,7 @@ public:
|
|||
statusText += "No previous backups found.\n";
|
||||
} else {
|
||||
state std::string backupStatus(BackupAgentBase::getStateText(backupState));
|
||||
state std::string outContainer = wait(backupAgent->getLastBackupContainer(tr, config.getUid()));
|
||||
state std::string outContainer = wait(config.backupContainer().getOrThrow(tr));
|
||||
state Version stopVersion = wait(config.stopVersion().getD(tr, -1));
|
||||
|
||||
switch (backupState) {
|
||||
|
@ -3744,16 +3730,17 @@ public:
|
|||
//the tagname of the backup must be the same as the restore.
|
||||
ACTOR static Future<Version> atomicRestore(FileBackupAgent* backupAgent, Database cx, Key tagName, KeyRange range, Key addPrefix, Key removePrefix) {
|
||||
state Reference<ReadYourWritesTransaction> ryw_tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
|
||||
state UID logUid;
|
||||
state BackupConfig backupConfig;
|
||||
loop {
|
||||
try {
|
||||
ryw_tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
ryw_tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
UID _logUid = wait(backupAgent->getLogUid(ryw_tr, tagName));
|
||||
logUid = _logUid;
|
||||
state int status = wait(backupAgent->getStateValue(ryw_tr, logUid));
|
||||
state KeyBackedTag tag = makeBackupTag(tagName.toString());
|
||||
UidAndAbortedFlagT uidFlag = wait(tag.getOrThrow(ryw_tr));
|
||||
backupConfig = BackupConfig(uidFlag.first);
|
||||
state EBackupState status = wait(backupConfig.stateEnum().getOrThrow(ryw_tr));
|
||||
|
||||
if ( (BackupAgentBase::enumState)status != BackupAgentBase::STATE_DIFFERENTIAL ) {
|
||||
if (status != BackupAgentBase::STATE_DIFFERENTIAL ) {
|
||||
throw backup_duplicate();
|
||||
}
|
||||
|
||||
|
@ -3808,8 +3795,8 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
std::string lastBackupContainer = wait(backupAgent->getLastBackupContainer(cx, logUid));
|
||||
|
||||
std::string lastBackupContainer = wait(backupConfig.backupContainer().getOrThrow(cx));
|
||||
|
||||
Version ver = wait( restore(backupAgent, cx, tagName, KeyRef(lastBackupContainer), true, -1, true, range, addPrefix, removePrefix, true, randomUid) );
|
||||
return ver;
|
||||
}
|
||||
|
@ -3859,14 +3846,6 @@ Future<int> FileBackupAgent::getStateValue(Reference<ReadYourWritesTransaction>
|
|||
return FileBackupAgentImpl::getStateValue(this, tr, logUid);
|
||||
}
|
||||
|
||||
Future<int64_t> FileBackupAgent::getRangeBytesWritten(Reference<ReadYourWritesTransaction> tr, UID logUid) {
|
||||
return BackupConfig(logUid).rangeBytesWritten().getD(tr);
|
||||
}
|
||||
|
||||
Future<int64_t> FileBackupAgent::getLogBytesWritten(Reference<ReadYourWritesTransaction> tr, UID logUid) {
|
||||
return BackupConfig(logUid).logBytesWritten().getD(tr);
|
||||
}
|
||||
|
||||
Future<Version> FileBackupAgent::getStateStopVersion(Reference<ReadYourWritesTransaction> tr, UID logUid) {
|
||||
return FileBackupAgentImpl::getStateStopVersion(this, tr, logUid);
|
||||
}
|
||||
|
@ -3898,7 +3877,3 @@ std::string FileBackupAgent::getLogFilename(Version beginVer, Version endVer, in
|
|||
Future<std::string> FileBackupAgent::getBackupInfo(std::string container, Version* defaultVersion) {
|
||||
return FileBackupAgentImpl::getBackupInfo(IBackupContainer::openContainer(container), defaultVersion);
|
||||
}
|
||||
|
||||
Future<std::string> FileBackupAgent::getLastBackupContainer(Reference<ReadYourWritesTransaction> tr, UID logUid) {
|
||||
return fileBackup::getPath(tr, logUid);
|
||||
}
|
||||
|
|
|
@ -119,14 +119,28 @@ public:
|
|||
}
|
||||
// Get property's value or throw error if it doesn't exist
|
||||
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr, bool snapshot = false, Error err = key_not_found()) const {
|
||||
auto keyCopy = key;
|
||||
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
|
||||
if (!val.present())
|
||||
if (!val.present()) {
|
||||
TraceEvent(SevError, "KeyBackedProperty keyNotFound")
|
||||
.detail("key", printable(keyCopy))
|
||||
.detail("err", err.code());
|
||||
throw err;
|
||||
}
|
||||
|
||||
return val.get();
|
||||
});
|
||||
}
|
||||
|
||||
Future<T> getOrThrow(Database cx, bool snapshot = false, Error err = key_not_found()) const {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return getOrThrow(tr, snapshot, err);
|
||||
});
|
||||
}
|
||||
|
||||
void set(Reference<ReadYourWritesTransaction> tr, T const &val) {
|
||||
return tr->set(key, Codec<T>::pack(val).pack());
|
||||
}
|
||||
|
|
|
@ -143,11 +143,13 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
try {
|
||||
if (BUGGIFY) {
|
||||
TraceEvent("BARW_doBackup waitForRestorable", randomID).detail("tag", printable(tag));
|
||||
state KeyBackedTag backupTag = makeBackupTag(tag.toString());
|
||||
TraceEvent("BARW_doBackup waitForRestorable", randomID).detail("tag", backupTag.tagName);
|
||||
// Wait until the backup is in a restorable state
|
||||
state int resultWait = wait(backupAgent->waitBackup(cx, tag.toString(), false));
|
||||
state UID logUid = wait(backupAgent->getLogUid(cx, tag));
|
||||
state std::string lastBackupContainer = wait(backupAgent->getLastBackupContainer(cx, logUid));
|
||||
state int resultWait = wait(backupAgent->waitBackup(cx, backupTag.tagName, false));
|
||||
UidAndAbortedFlagT uidFlag = wait(backupTag.getOrThrow(cx));
|
||||
state UID logUid = uidFlag.first;
|
||||
state std::string lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getOrThrow(cx));
|
||||
|
||||
state std::string restorableFile = joinPath(lastBackupContainer, "restorable");
|
||||
TraceEvent("BARW_lastBackupContainer", randomID).detail("backupTag", printable(tag)).detail("lastBackupContainer", lastBackupContainer)
|
||||
|
@ -291,8 +293,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
TraceEvent("BARW_doBackupDone", randomID).detail("backupTag", printable(self->backupTag)).detail("abortAndRestartAfter", self->abortAndRestartAfter);
|
||||
|
||||
state UID logUid = wait(backupAgent.getLogUid(cx, self->backupTag));
|
||||
state std::string lastBackupContainer = wait(backupAgent.getLastBackupContainer(cx, logUid));
|
||||
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
|
||||
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx));
|
||||
state UID logUid = uidFlag.first;
|
||||
state std::string lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getOrThrow(cx));
|
||||
|
||||
// Occasionally start yet another backup that might still be running when we restore
|
||||
if (!self->locked && BUGGIFY) {
|
||||
|
|
Loading…
Reference in New Issue