Merge pull request #186 from bmuppana/backup-joshua-fix
Backup joshua fixes
This commit is contained in:
commit
f6822a4f6b
|
@ -1145,6 +1145,7 @@ namespace fileBackup {
|
|||
// Enable the stop key
|
||||
state Version readVersion = wait(tr->getReadVersion());
|
||||
config.stopVersion().set(tr, readVersion);
|
||||
TraceEvent(SevInfo, "FBA_setStopVersion").detail("stopVersion", readVersion);
|
||||
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
|
||||
|
@ -1755,6 +1756,8 @@ namespace fileBackup {
|
|||
Void _ = wait(mf->sync());
|
||||
|
||||
std::string fileName = format("kvmanifest,%lld,%lld,%lld,%s", minVer, maxVer, totalBytes, g_random->randomUniqueID().toString().c_str());
|
||||
|
||||
TraceEvent(SevInfo, "FBA_KVManifest").detail("fileName", fileName.c_str());
|
||||
Void _ = wait(bc->renameFile(tempFile, fileName));
|
||||
|
||||
return Void();
|
||||
|
@ -3429,7 +3432,7 @@ public:
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
state KeyBackedTag tag = makeBackupTag(tagName.toString());
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr));
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr, false, backup_unneeded()));
|
||||
state BackupConfig config(current.first);
|
||||
state EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
|
||||
|
@ -3457,7 +3460,7 @@ public:
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
state KeyBackedTag tag = makeBackupTag(tagName);
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr));
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr, false, backup_unneeded()));
|
||||
|
||||
state BackupConfig config(current.first);
|
||||
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
|
|
|
@ -120,11 +120,13 @@ public:
|
|||
// Get property's value or throw error if it doesn't exist
|
||||
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr, bool snapshot = false, Error err = key_not_found()) const {
|
||||
auto keyCopy = key;
|
||||
auto backtrace = platform::get_backtrace();
|
||||
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
|
||||
if (!val.present()) {
|
||||
TraceEvent(SevError, "KeyBackedProperty keyNotFound")
|
||||
TraceEvent(SevInfo, "KeyBackedProperty_keyNotFound")
|
||||
.detail("key", printable(keyCopy))
|
||||
.detail("err", err.code());
|
||||
.detail("err", err.code())
|
||||
.detail("parentTrace", backtrace.c_str());
|
||||
throw err;
|
||||
}
|
||||
|
||||
|
|
|
@ -1373,22 +1373,18 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
|
|||
#define TIME_KEEPER_VERSION LiteralStringRef("1")
|
||||
|
||||
ACTOR Future<Void> timeKeeperSetVersion(ClusterControllerData *self) {
|
||||
try {
|
||||
loop {
|
||||
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(
|
||||
new ReadYourWritesTransaction(self->cx));
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->set(timeKeeperVersionKey, TIME_KEEPER_VERSION);
|
||||
Void _ = wait(tr->commit());
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
loop {
|
||||
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(
|
||||
new ReadYourWritesTransaction(self->cx));
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->set(timeKeeperVersionKey, TIME_KEEPER_VERSION);
|
||||
Void _ = wait(tr->commit());
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
} catch (Error & e) {
|
||||
TraceEvent(SevWarnAlways, "TimeKeeperSetupVersionFailed").detail("cause", e.what());
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -1405,36 +1401,31 @@ ACTOR Future<Void> timeKeeper(ClusterControllerData *self) {
|
|||
Void _ = wait(timeKeeperSetVersion(self));
|
||||
|
||||
loop {
|
||||
try {
|
||||
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(self->cx));
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state Reference<ReadYourWritesTransaction> tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(self->cx));
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
Optional<Value> disableValue = wait( tr->get(timeKeeperDisableKey) );
|
||||
if(disableValue.present()) {
|
||||
break;
|
||||
}
|
||||
|
||||
Version v = tr->getReadVersion().get();
|
||||
int64_t currentTime = (int64_t)now();
|
||||
versionMap.set(tr, currentTime, v);
|
||||
|
||||
int64_t ttl = currentTime - SERVER_KNOBS->TIME_KEEPER_DELAY * SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES;
|
||||
if (ttl > 0) {
|
||||
versionMap.erase(tr, 0, ttl);
|
||||
}
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
Optional<Value> disableValue = wait( tr->get(timeKeeperDisableKey) );
|
||||
if(disableValue.present()) {
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
|
||||
Version v = tr->getReadVersion().get();
|
||||
int64_t currentTime = (int64_t)now();
|
||||
versionMap.set(tr, currentTime, v);
|
||||
|
||||
int64_t ttl = currentTime - SERVER_KNOBS->TIME_KEEPER_DELAY * SERVER_KNOBS->TIME_KEEPER_MAX_ENTRIES;
|
||||
if (ttl > 0) {
|
||||
versionMap.erase(tr, 0, ttl);
|
||||
}
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
} catch (Error &e) {
|
||||
// Failed to update the time-version map even after retries; just ignore this iteration
|
||||
TraceEvent(SevWarn, "TimeKeeperFailed").detail("cause", e.what());
|
||||
}
|
||||
|
||||
Void _ = wait(delay(SERVER_KNOBS->TIME_KEEPER_DELAY));
|
||||
|
|
|
@ -122,26 +122,26 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
Void _ = wait( delay( startDelay ));
|
||||
|
||||
if (startDelay || BUGGIFY) {
|
||||
TraceEvent("BARW_doBackup abortBackup1", randomID).detail("tag", printable(tag)).detail("startDelay", startDelay);
|
||||
TraceEvent("BARW_doBackupAbortBackup1", randomID).detail("tag", printable(tag)).detail("startDelay", startDelay);
|
||||
|
||||
try {
|
||||
Void _ = wait(backupAgent->abortBackup(cx, tag.toString()));
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_doBackup abortBackup Exception", randomID).detail("tag", printable(tag)).error(e);
|
||||
TraceEvent("BARW_doBackupAbortBackupException", randomID).detail("tag", printable(tag)).error(e);
|
||||
if (e.code() != error_code_backup_unneeded)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("BARW_doBackup submitBackup", randomID).detail("tag", printable(tag)).detail("stopWhenDone", stopDifferentialDelay ? "False" : "True");
|
||||
TraceEvent("BARW_doBackupSubmitBackup", randomID).detail("tag", printable(tag)).detail("stopWhenDone", stopDifferentialDelay ? "False" : "True");
|
||||
|
||||
state std::string backupContainer = "file://simfdb/backups/";
|
||||
try {
|
||||
Void _ = wait(backupAgent->submitBackup(cx, StringRef(backupContainer), tag.toString(), backupRanges, stopDifferentialDelay ? false : true));
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_doBackup submitBackup Exception", randomID).detail("tag", printable(tag)).error(e);
|
||||
TraceEvent("BARW_doBackupSubmitBackupException", randomID).detail("tag", printable(tag)).error(e);
|
||||
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
|
||||
throw;
|
||||
}
|
||||
|
@ -152,17 +152,17 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
if (stopDifferentialDelay) {
|
||||
TEST(!stopDifferentialFuture.isReady()); //Restore starts at specified time
|
||||
Void _ = wait(stopDifferentialFuture);
|
||||
TraceEvent("BARW_doBackup waitToDiscontinue", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
|
||||
TraceEvent("BARW_doBackupWaitToDiscontinue", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
|
||||
|
||||
try {
|
||||
if (BUGGIFY) {
|
||||
state KeyBackedTag backupTag = makeBackupTag(tag.toString());
|
||||
TraceEvent("BARW_doBackup waitForRestorable", randomID).detail("tag", backupTag.tagName);
|
||||
TraceEvent("BARW_doBackupWaitForRestorable", randomID).detail("tag", backupTag.tagName);
|
||||
// Wait until the backup is in a restorable state
|
||||
state int resultWait = wait(backupAgent->waitBackup(cx, backupTag.tagName, false));
|
||||
UidAndAbortedFlagT uidFlag = wait(backupTag.getOrThrow(cx));
|
||||
state UID logUid = uidFlag.first;
|
||||
state std::string lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getOrThrow(cx));
|
||||
state std::string lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getOrThrow(cx, false, backup_unneeded()));
|
||||
|
||||
state std::string restorableFile = joinPath(lastBackupContainer, "restorable");
|
||||
TraceEvent("BARW_lastBackupContainer", randomID).detail("backupTag", printable(tag)).detail("lastBackupContainer", lastBackupContainer)
|
||||
|
@ -190,30 +190,30 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
// Abort the backup only if this is not the first backup, because the second backup may already have aborted it by now
|
||||
if (startDelay) {
|
||||
TraceEvent("BARW_doBackup abortBackup2", randomID).detail("tag", printable(tag))
|
||||
TraceEvent("BARW_doBackupAbortBackup2", randomID).detail("tag", printable(tag))
|
||||
.detail("waitStatus", resultWait).detail("lastBackupContainer", lastBackupContainer).detail("restorable", restorableFile);
|
||||
Void _ = wait(backupAgent->abortBackup(cx, tag.toString()));
|
||||
}
|
||||
else {
|
||||
TraceEvent("BARW_doBackup discontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
|
||||
TraceEvent("BARW_doBackupDiscontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
|
||||
Void _ = wait(backupAgent->discontinueBackup(cx, tag));
|
||||
}
|
||||
}
|
||||
|
||||
else {
|
||||
TraceEvent("BARW_doBackup discontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
|
||||
TraceEvent("BARW_doBackupDiscontinueBackup", randomID).detail("tag", printable(tag)).detail("differentialAfter", stopDifferentialDelay);
|
||||
Void _ = wait(backupAgent->discontinueBackup(cx, tag));
|
||||
}
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_doBackup discontinueBackup Exception", randomID).detail("tag", printable(tag)).error(e);
|
||||
TraceEvent("BARW_doBackupDiscontinueBackupException", randomID).detail("tag", printable(tag)).error(e);
|
||||
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the backup to complete
|
||||
TraceEvent("BARW_doBackup waitBackup", randomID).detail("tag", printable(tag));
|
||||
TraceEvent("BARW_doBackupWaitBackup", randomID).detail("tag", printable(tag));
|
||||
state int statusValue = wait(backupAgent->waitBackup(cx, tag.toString(), true));
|
||||
|
||||
state std::string statusText;
|
||||
|
@ -222,7 +222,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
statusText = _statusText;
|
||||
// Can we validate anything about status?
|
||||
|
||||
TraceEvent("BARW_doBackup complete", randomID).detail("tag", printable(tag))
|
||||
TraceEvent("BARW_doBackupComplete", randomID).detail("tag", printable(tag))
|
||||
.detail("status", statusText).detail("statusValue", statusValue);
|
||||
|
||||
return Void();
|
||||
|
@ -318,7 +318,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
extraBackup = backupAgent.submitBackup(cx, LiteralStringRef("file://simfdb/backups/"), self->backupTag.toString(), self->backupRanges, true);
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_submitBackup2 Exception", randomID).detail("backupTag", printable(self->backupTag)).error(e);
|
||||
TraceEvent("BARW_submitBackup2Exception", randomID).detail("backupTag", printable(self->backupTag)).error(e);
|
||||
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
|
||||
throw;
|
||||
}
|
||||
|
@ -376,23 +376,23 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
if (extraBackup.isValid()) {
|
||||
TraceEvent("BARW_wait extraBackup", randomID).detail("backupTag", printable(self->backupTag));
|
||||
TraceEvent("BARW_waitExtraBackup", randomID).detail("backupTag", printable(self->backupTag));
|
||||
extraTasks = true;
|
||||
try {
|
||||
Void _ = wait(extraBackup);
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_extraBackup Exception", randomID).detail("backupTag", printable(self->backupTag)).error(e);
|
||||
TraceEvent("BARW_extraBackupException", randomID).detail("backupTag", printable(self->backupTag)).error(e);
|
||||
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
|
||||
throw;
|
||||
}
|
||||
|
||||
TraceEvent("BARW_abortBackup extra", randomID).detail("backupTag", printable(self->backupTag));
|
||||
TraceEvent("BARW_abortBackupExtra", randomID).detail("backupTag", printable(self->backupTag));
|
||||
try {
|
||||
Void _ = wait(backupAgent.abortBackup(cx, self->backupTag.toString()));
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_abortBackup extra Exception", randomID).error(e);
|
||||
TraceEvent("BARW_abortBackupExtraException", randomID).error(e);
|
||||
if (e.code() != error_code_backup_unneeded)
|
||||
throw;
|
||||
}
|
||||
|
@ -484,7 +484,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
break;
|
||||
}
|
||||
catch (Error &e) {
|
||||
TraceEvent("BARW_check Exception", randomID).error(e);
|
||||
TraceEvent("BARW_checkException", randomID).error(e);
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue