sequantial restore: pass 1 test case

-r simulation --logsize 1024MiB -f foundationdb/tests/fast/ParallelRestoreCorrectness.txt -b off -s 95208406
This commit is contained in:
Meng Xu 2018-11-29 10:31:47 -08:00
parent c800ccde03
commit 1b085a9817
12 changed files with 2219 additions and 24 deletions

View File

@ -69,6 +69,14 @@ struct LogFile {
bool operator< (const LogFile &rhs) const {
return beginVersion == rhs.beginVersion ? endVersion < rhs.endVersion : beginVersion < rhs.beginVersion;
}
//return info
std::string toString() const {
std::string ret;
ret = "beginVersion:" + std::to_string(beginVersion) + " endVersion:" + std::to_string(endVersion)
+ " blockSize:" + std::to_string(blockSize) + " filename:" + fileName + " fileSize:" + std::to_string(fileSize);
return ret;
}
};
struct RangeFile {
@ -81,6 +89,14 @@ struct RangeFile {
bool operator< (const RangeFile &rhs) const {
return version == rhs.version ? fileName < rhs.fileName : version < rhs.version;
}
//return info
std::string toString() const {
std::string ret;
ret = "version:" + std::to_string(version) + " blockSize:" + std::to_string(blockSize) + " fileName:" + fileName
+ " fileSize:" + std::to_string(fileSize);
return ret;
}
};
struct KeyspaceSnapshotFile {

View File

@ -26,10 +26,19 @@
static const char * typeString[] = { "SetValue", "ClearRange", "AddValue", "DebugKeyRange", "DebugKey", "NoOp", "And", "Or", "Xor", "AppendIfFits", "AvailableForReuse", "Reserved_For_LogProtocolMessage", "Max", "Min", "SetVersionstampedKey", "SetVersionstampedValue", "ByteMin", "ByteMax", "MinV2", "AndV2" };
struct MutationRef;
std::string getHexString(StringRef input);
std::string getHexKey(StringRef input, int skip);
void printBackupMutationRefValueHex(Standalone<StringRef> val_input, std::string prefix);
struct MutationRef {
static const int OVERHEAD_BYTES = 12; //12 is the size of Header in MutationList entries
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or, Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue, ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP };
// This is stored this way for serialization purposes.
enum Type : uint8_t { SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or,
Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue,
ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP }; // This is stored this way for serialization purposes.
uint8_t type;
StringRef param1, param2;

View File

@ -206,6 +206,10 @@ struct KeyRangeRef {
return a.end < b.end;
}
};
std::string toString() const {
return "begin:" + begin.toString() + " end:" + end.toString();
}
};
inline KeyRangeRef operator & (const KeyRangeRef& lhs, const KeyRangeRef& rhs) {
@ -472,6 +476,11 @@ struct RangeResultRef : VectorRef<KeyValueRef> {
void serialize( Ar& ar ) {
ar & ((VectorRef<KeyValueRef>&)*this) & more & readThrough & readToBegin & readThroughEnd;
}
std::string toString() const {
return "more:" + std::to_string(more) + " readThrough:" + (readThrough.present() ? readThrough.get().toString() : "[unset]")
+ " readToBegin:" + std::to_string(readToBegin) + " readThroughEnd:" + std::to_string(readThroughEnd);
}
};
struct KeyValueStoreType {
@ -643,4 +652,6 @@ struct ClusterControllerPriorityInfo {
}
};
class Database;
#endif

View File

@ -1399,7 +1399,8 @@ ACTOR Future<Void> unlockDatabase( Transaction* tr, UID id ) {
return Void();
if(val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
//TraceEvent("DBA_UnlockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()));
TraceEvent("DBA_UnlockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()));
printf("DBA_CheckLocked Expecting:%s Lock:%s\n", id.toString().c_str(), BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()).toString().c_str());
throw database_locked();
}
@ -1416,7 +1417,8 @@ ACTOR Future<Void> unlockDatabase( Reference<ReadYourWritesTransaction> tr, UID
return Void();
if(val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
//TraceEvent("DBA_UnlockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()));
TraceEvent("DBA_UnlockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()));
printf("DBA_CheckLocked Expecting:%s Lock:%s\n", id.toString().c_str(), BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()).toString().c_str());
throw database_locked();
}
@ -1444,8 +1446,15 @@ ACTOR Future<Void> checkDatabaseLock( Transaction* tr, UID id ) {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait( tr->get(databaseLockedKey) );
if ( val.present() ) {
printf("DB is locked at uid:%s\n", id.toString().c_str());
} else {
printf("DB is not locked!\n");
}
if (val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
//TraceEvent("DBA_CheckLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned())).backtrace();
TraceEvent("DBA_CheckLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned())).backtrace();
printf("DBA_CheckLocked Expecting:%s Lock:%s\n", id.toString().c_str(), BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()).toString().c_str());
throw database_locked();
}

View File

@ -28,15 +28,21 @@
struct MutationListRef {
// Represents an ordered, but not random-access, list of mutations that can be O(1) deserialized and
// quickly serialized, (forward) iterated or appended to.
// MX: MutationListRef is a list of struct Blob
// MX: Each blob has a struct Header following by the mutation's param1 and param2 content. The Header has the mutation's type and the length of param1 and param2
private:
struct Blob {
//StringRef data Format: |type|p1len|p2len|p1_content|p2_content|
// |type|p1len|p2len| is the header; p1_content has p1len length; p2_content has p2len length
StringRef data;
Blob* next;
};
struct Header {
int type, p1len, p2len;
const uint8_t* p1begin() const { return (const uint8_t*)(this+1); }
const uint8_t* p1begin() const { return (const uint8_t*)(this+1); } //(this+1) moves the pointer by Header size and get to the beginning of p1_content
const uint8_t* p2begin() const { return (const uint8_t*)(this+1) + p1len; }
const uint8_t* end() const { return (const uint8_t*)(this+1) + p1len + p2len; }
};
@ -49,8 +55,8 @@ public:
const MutationRef* operator->() { return &item; }
void operator++() {
ASSERT(blob->data.size() > 0);
auto e = ptr->end();
if (e == blob->data.end()) {
auto e = ptr->end(); // e points to the end of the current blob
if (e == blob->data.end()) { // the condition sanity checks e is at the end of current blob
blob = blob->next;
e = blob ? blob->data.begin() : NULL;
}
@ -180,4 +186,6 @@ typedef Standalone<MutationListRef> MutationList;
template <class Ar> void load( Ar& ar, MutationListRef& r ) { r.serialize_load(ar); }
template <class Ar> void save( Ar& ar, MutationListRef const& r ) { r.serialize_save(ar); }
void printMutationListRefHex(MutationListRef m, std::string prefix);
#endif

View File

@ -26,7 +26,7 @@
#include "flow/TDMetric.actor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBOptions.g.h" //Must use the generated .h
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/ClientLogEvents.h"

View File

@ -523,6 +523,7 @@ Key uidPrefixKey(KeyRef keyPrefix, UID logUid) {
// Apply mutations constant variables
// \xff/applyMutationsEnd/[16-byte UID] := serialize( endVersion, Unversioned() )
// MX: This indicates what is the highest version the mutation log can be applied
const KeyRangeRef applyMutationsEndRange(LiteralStringRef("\xff/applyMutationsEnd/"), LiteralStringRef("\xff/applyMutationsEnd0"));
// \xff/applyMutationsBegin/[16-byte UID] := serialize( beginVersion, Unversioned() )
@ -596,9 +597,80 @@ const KeyRangeRef restoreWorkersKeys(
LiteralStringRef("\xff\x02/restoreWorkers0")
);
const KeyRef restoreRequestTriggerKey = LiteralStringRef("\xff\x02/restoreRequestTrigger");
const KeyRef restoreRequestDoneKey = LiteralStringRef("\xff\x02/restoreRequestDone");
const KeyRangeRef restoreRequestKeys(
LiteralStringRef("\xff\x02/restoreRequests/"),
LiteralStringRef("\xff\x02/restoreRequests0")
);
// Encode restore agent key for agentID
const Key restoreWorkerKeyFor( UID const& agentID ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( restoreWorkersKeys.begin );
wr << agentID;
return wr.toStringRef();
}
// Encode restore agent value
const Value restoreWorkerValue( RestoreInterface const& server ) {
BinaryWriter wr(IncludeVersion());
wr << server;
return wr.toStringRef();
}
RestoreInterface decodeRestoreWorkerValue( ValueRef const& value ) {
RestoreInterface s;
BinaryReader reader( value, IncludeVersion() );
reader >> s;
return s;
}
// Encode and decode restore request value
// restoreRequestTrigger key
const Value restoreRequestTriggerValue (int const numRequests) {
BinaryWriter wr(IncludeVersion());
wr << numRequests;
return wr.toStringRef();
}
const int decodeRestoreRequestTriggerValue( ValueRef const& value ) {
int s;
BinaryReader reader( value, IncludeVersion() );
reader >> s;
return s;
}
// restoreRequestDone key
const Value restoreRequestDoneValue (int const numRequests) {
BinaryWriter wr(IncludeVersion());
wr << numRequests;
return wr.toStringRef();
}
const int decodeRestoreRequestDoneValue( ValueRef const& value ) {
int s;
BinaryReader reader( value, IncludeVersion() );
reader >> s;
return s;
}
const Key restoreRequestKeyFor( int const& index ) {
BinaryWriter wr(Unversioned());
wr.serializeBytes( restoreRequestKeys.begin );
wr << index;
return wr.toStringRef();
}
const Value restoreRequestValue( RestoreRequest const& request ) {
BinaryWriter wr(IncludeVersion());
wr << request;
return wr.toStringRef();
}
RestoreRequest decodeRestoreRequestValue( ValueRef const& value ) {
RestoreRequest s;
BinaryReader reader( value, IncludeVersion() );
reader >> s;
return s;
}

View File

@ -26,6 +26,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/RestoreInterface.h"
extern const KeyRangeRef normalKeys; // '' to systemKeys.begin
extern const KeyRangeRef systemKeys; // [FF] to [FF][FF]
@ -266,6 +267,21 @@ extern const KeyRangeRef monitorConfKeys;
extern const KeyRef restoreLeaderKey;
extern const KeyRangeRef restoreWorkersKeys;
extern const KeyRef restoreRequestTriggerKey;
extern const KeyRef restoreRequestDoneKey;
extern const KeyRangeRef restoreRequestKeys;
const Key restoreWorkerKeyFor( UID const& agentID );
const Value restoreWorkerValue( RestoreInterface const& server );
RestoreInterface decodeRestoreWorkerValue( ValueRef const& value );
// MX: parallel restore
const Value restoreRequestTriggerValue (int const numRequests);
const int decodeRestoreRequestTriggerValue( ValueRef const& value );
const Value restoreRequestDoneValue (int const numRequests);
const int decodeRestoreRequestDoneValue( ValueRef const& value );
const Key restoreRequestKeyFor( int const& index );
const Value restoreRequestValue( RestoreRequest const& server );
RestoreRequest decodeRestoreRequestValue( ValueRef const& value );
#endif

File diff suppressed because it is too large Load Diff

View File

@ -23,13 +23,16 @@
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.h"
//#include "fdbclient/NativeAPI.h" //MX: Cannot have NativeAPI.h in this .h
#include "fdbrpc/fdbrpc.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbrpc/Locality.h"
class RestoreConfig;
struct RestoreInterface {
RequestStream< struct TestRequest > test;
RequestStream< struct RestoreRequest > request;
bool operator == (RestoreInterface const& r) const { return id() == r.id(); }
bool operator != (RestoreInterface const& r) const { return id() != r.id(); }
@ -42,7 +45,7 @@ struct RestoreInterface {
template <class Ar>
void serialize( Ar& ar ) {
ar & test;
ar & test & request;
}
};
@ -71,8 +74,67 @@ struct TestReply {
}
};
struct RestoreRequest {
//Database cx;
int index;
Key tagName;
Key url;
bool waitForComplete;
Version targetVersion;
bool verbose;
KeyRange range;
Key addPrefix;
Key removePrefix;
bool lockDB;
UID randomUid;
int testData;
std::vector<int> restoreRequests;
//Key restoreTag;
ReplyPromise< struct RestoreReply > reply;
RestoreRequest() : testData(0) {}
explicit RestoreRequest(int testData) : testData(testData) {}
explicit RestoreRequest(int testData, std::vector<int> &restoreRequests) : testData(testData), restoreRequests(restoreRequests) {}
explicit RestoreRequest(const int index, const Key &tagName, const Key &url, bool waitForComplete, Version targetVersion, bool verbose,
const KeyRange &range, const Key &addPrefix, const Key &removePrefix, bool lockDB,
const UID &randomUid) : index(index), tagName(tagName), url(url), waitForComplete(waitForComplete),
targetVersion(targetVersion), verbose(verbose), range(range),
addPrefix(addPrefix), removePrefix(removePrefix), lockDB(lockDB),
randomUid(randomUid) {}
template <class Ar>
void serialize(Ar& ar) {
ar & index & tagName & url & waitForComplete & targetVersion & verbose & range & addPrefix & removePrefix & lockDB & randomUid &
testData & restoreRequests & reply;
}
std::string toString() const {
return "index:" + std::to_string(index) + " tagName:" + tagName.contents().toString() + " url:" + url.contents().toString()
+ " waitForComplete:" + std::to_string(waitForComplete) + " targetVersion:" + std::to_string(targetVersion)
+ " verbose:" + std::to_string(verbose) + " range:" + range.toString() + " addPrefix:" + addPrefix.contents().toString()
+ " removePrefix:" + removePrefix.contents().toString() + " lockDB:" + std::to_string(lockDB) + " randomUid:" + randomUid.toString();
}
};
struct RestoreReply {
int replyData;
std::vector<int> restoreReplies;
RestoreReply() : replyData(0) {}
explicit RestoreReply(int replyData) : replyData(replyData) {}
explicit RestoreReply(int replyData, std::vector<int> restoreReplies) : replyData(replyData), restoreReplies(restoreReplies) {}
template <class Ar>
void serialize(Ar& ar) {
ar & replyData & restoreReplies;
}
};
Future<Void> _restoreWorker(Database const& cx, LocalityData const& locality);
Future<Void> restoreWorker(Reference<ClusterConnectionFile> const& ccf, LocalityData const& locality);
#endif

View File

@ -23,6 +23,7 @@
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbserver/RestoreInterface.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -39,6 +40,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
bool allowPauses;
bool shareLogRange;
std::map<Standalone<KeyRef>, Standalone<ValueRef>> dbKVs;
BackupAndParallelRestoreCorrectnessWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx) {
locked = sharedRandomNumber % 2;
@ -90,6 +93,123 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
}
static void compareDBKVs(Standalone<RangeResultRef> data, BackupAndParallelRestoreCorrectnessWorkload* self) {
bool hasDiff = false;
//Get the new KV pairs in the DB
std::map<Standalone<KeyRef>, Standalone<ValueRef>> newDbKVs;
for ( auto kvRef = data.contents().begin(); kvRef != data.contents().end(); kvRef++ ) {
newDbKVs.insert(std::make_pair(kvRef->key, kvRef->value));
}
if ( self->dbKVs.empty() ) {
printf("[CheckDB] set DB kv for the first time.\n");
self->dbKVs = newDbKVs;
return;
}
printf("[CheckDB] KV Number. Prev DB:%d Current DB:%d\n", self->dbKVs.size(), newDbKVs.size());
//compare the KV pairs in the DB
printf("---------------------Now print out the diff between the prev DB and current DB----------------------\n");
if ( self->dbKVs.size() >= newDbKVs.size() ) {
for ( auto kv = self->dbKVs.begin(); kv != self->dbKVs.end(); kv++ ) {
bool exist = (newDbKVs.find(kv->first) != newDbKVs.end());
if ( !exist ) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(kv->first).c_str(), getHexString(kv->second).c_str(),
"[Not Exist]");
hasDiff = true;
}
if ( exist && (newDbKVs[kv->first] != self->dbKVs[kv->first]) ) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(kv->first).c_str(), getHexString(kv->second).c_str(),
getHexString(newDbKVs[kv->first]).c_str());
hasDiff = true;
}
}
} else {
for ( auto newKV = newDbKVs.begin(); newKV != newDbKVs.end(); newKV++ ) {
bool exist = (self->dbKVs.find(newKV->first) != self->dbKVs.end());
if ( !exist ) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", "[Not Exist]",
getHexString(newKV->first).c_str(), getHexString(newKV->second).c_str());
hasDiff = true;
}
if ( exist && (newDbKVs[newKV->first] != self->dbKVs[newKV->first]) ) {
printf("\tPrevKey:%s PrevValue:%s newValue:%s\n", getHexString(newKV->first).c_str(), getHexString(self->dbKVs[newKV->first]).c_str(),
getHexString(newDbKVs[newKV->first]).c_str());
hasDiff = true;
}
}
}
int numEntries = 10;
int i = 0;
if ( hasDiff ) {
//print out the first and last 10 entries
printf("\t---Prev DB first and last %d entries\n", numEntries);
auto kv = self->dbKVs.begin();
for ( ; kv != self->dbKVs.end(); kv++ ) {
if ( i >= numEntries )
break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i++, getHexString(kv->first).c_str(), getHexString(kv->second).c_str());
}
i = self->dbKVs.size();
kv = self->dbKVs.end();
for ( --kv; kv != self->dbKVs.begin(); kv-- ) {
if ( i <= self->dbKVs.size() - numEntries )
break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i--, getHexString(kv->first).c_str(), getHexString(kv->second).c_str());
}
printf("\t---Current DB first and last %d entries\n", numEntries);
kv = newDbKVs.begin();
i = 0;
for ( ; kv != newDbKVs.end(); kv++ ) {
if ( i >= numEntries )
break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i++, getHexString(kv->first).c_str(), getHexString(kv->second).c_str());
}
i = newDbKVs.size();
kv = newDbKVs.end();
for ( --kv; kv != newDbKVs.begin(); kv-- ) {
if ( i <= newDbKVs.size() - numEntries )
break;
printf("\t[Entry:%d]Key:%s Value:%s\n", i--, getHexString(kv->first).c_str(), getHexString(kv->second).c_str());
}
}
self->dbKVs = newDbKVs; //update the dbKVs
}
ACTOR static Future<Void> checkDB(Database cx, std::string when, BackupAndParallelRestoreCorrectnessWorkload* self) {
state Key keyPrefix = LiteralStringRef("");
// int numPrint = 20; //number of entries in the front and end to print out.
state Transaction tr(cx);
state int retryCount = 0;
loop {
try {
state Version v = wait( tr.getReadVersion() );
state Standalone<RangeResultRef> data = wait(tr.getRange(firstGreaterOrEqual(doubleToTestKey(0.0, keyPrefix)), firstGreaterOrEqual(doubleToTestKey(1.0, keyPrefix)), std::numeric_limits<int>::max()));
printf("Check DB, at %s. retryCount:%d Data size:%d, rangeResultInfo:%s\n", when.c_str(), retryCount,
data.size(), data.contents().toString().c_str());
compareDBKVs(data, self);
break;
} catch (Error& e) {
retryCount++;
TraceEvent(retryCount > 20 ? SevWarnAlways : SevWarn, "CheckDBError").error(e);
wait(tr.onError(e));
}
}
return Void();
}
virtual std::string description() {
return "BackupAndParallelRestoreCorrectness";
}
@ -383,11 +503,12 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// restore database
TraceEvent("BARW_Restore", randomID).detail("LastBackupContainer", lastBackupContainer->getURL()).detail("RestoreAfter", self->restoreAfter).detail("BackupTag", printable(self->backupTag));
printf("MX:BARW_Restore, LastBackupContainer url:%s BackupTag:%s\n",lastBackupContainer->getURL().c_str(), printable(self->backupTag).c_str() );
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
BackupDescription desc = wait( container->describeBackup() );
Version targetVersion = -1;
state Version targetVersion = -1;
if(desc.maxRestorableVersion.present()) {
if( g_random->random01() < 0.1 ) {
targetVersion = desc.minRestorableVersion.get();
@ -405,12 +526,32 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state int restoreIndex;
// MX: Restore each range by calling backupAgent.restore()
for (restoreIndex = 0; restoreIndex < self->backupRanges.size(); restoreIndex++) {
auto range = self->backupRanges[restoreIndex];
Standalone<StringRef> restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex));
restoreTags.push_back(restoreTag);
restores.push_back(backupAgent.restore(cx, restoreTag, KeyRef(lastBackupContainer->getURL()), true, targetVersion, true, range, Key(), Key(), self->locked));
}
printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size());
state int numTry = 0;
loop {
state Transaction tr1(cx);
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr1.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
printf("Prepare for restore requests. Number of backupRanges:%d, numTry:%d\n", self->backupRanges.size(), numTry++);
for (restoreIndex = 0; restoreIndex < self->backupRanges.size(); restoreIndex++) {
auto range = self->backupRanges[restoreIndex];
Standalone<StringRef> restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex));
restoreTags.push_back(restoreTag);
// restores.push_back(backupAgent.restore(cx, restoreTag, KeyRef(lastBackupContainer->getURL()), true, targetVersion, true, range, Key(), Key(), self->locked));
//MX: restore the key range
struct RestoreRequest restoreRequest(restoreIndex, restoreTag, KeyRef(lastBackupContainer->getURL()), true, targetVersion, true, range, Key(), Key(), self->locked, g_random->randomUniqueID());
tr1.set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
}
tr1.set(restoreRequestTriggerKey, restoreRequestTriggerValue(self->backupRanges.size()));
wait(tr1.commit()); //Trigger MX restore
break;
} catch( Error &e ) {
TraceEvent("SetRestoreRequestError").detail("ErrorInfo", e.what());
wait( tr1.onError(e) );
}
};
printf("MX:Test workload triggers the restore\n");
// Sometimes kill and restart the restore
if(BUGGIFY) {
@ -429,7 +570,42 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
}
wait(waitForAll(restores));
// wait(waitForAll(restores)); //MX: Can be removed because we no longer reply on the Future event to mark the finish of restore
// MX: We should wait on all restore before proceeds
printf("Wait for restore to finish\n");
state int waitNum = 0;
loop {
state Transaction tr2(cx);
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr2.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
TraceEvent("CheckRestoreRequestDoneMX");
state Optional<Value> numFinished = wait(tr2.get(restoreRequestDoneKey));
if ( !numFinished.present() ) { // restore has not been finished yet
if ( waitNum++ % 10 == 0 ) {
TraceEvent("CheckRestoreRequestDone").detail("SecondsOfWait", 5);
printf("Still waiting for restore to finish, has wait for %d seconds\n", waitNum * 5);
}
wait( delay(5.0) );
continue;
}
int num = decodeRestoreRequestDoneValue(numFinished.get());
TraceEvent("RestoreRequestKeyDoneFinished").detail("NumFinished", num);
printf("RestoreRequestKeyDone, numFinished:%d\n", num);
tr2.clear(restoreRequestDoneKey);
wait( tr2.commit() );
break;
} catch( Error &e ) {
TraceEvent("CheckRestoreRequestDoneErrorMX").detail("ErrorInfo", e.what());
wait( tr2.onError(e) );
}
}
printf("MX: Restore is finished\n");
wait(checkDB(cx, "FinishRestore", self));
for (auto &restore : restores) {
assert(!restore.isError());
@ -587,6 +763,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
};
int BackupAndParallelRestoreCorrectnessWorkload::backupAgentRequests = 0;
WorkloadFactory<BackupAndParallelRestoreCorrectnessWorkload> BackupAndParallelRestoreCorrectnessWorkloadFactory("BackupAndParallelRestoreCorrectness");

View File

@ -6,7 +6,7 @@ testTitle=BackupAndRestore
expectedRate=0
clearAfterTest=false
; testName=RunRestoreWorkerWorkload
testName=RunRestoreWorkerWorkload
; Test case for parallel restore
testName=BackupAndParallelRestoreCorrectness