FastRestore: continue debugging the cyclic test failure error

This commit is contained in:
Meng Xu 2019-03-25 14:09:45 -07:00
parent 1859e684f2
commit e30f5ff70d
2 changed files with 109 additions and 28 deletions

View File

@ -55,7 +55,6 @@ Future<Void> registerMutationsToMasterApplier(Reference<RestoreData> const& rd);
Future<Void> sampleHandler(Reference<RestoreData> const& rd, RestoreCommandInterface const& interf, RestoreCommandInterface const& leaderInter);
Future<Void> receiveSampledMutations(Reference<RestoreData> const& rd, RestoreCommandInterface const& interf);
static Future<Void> finishRestore(Database const& cx, Standalone<VectorRef<RestoreRequest>> const& restoreRequests); // Forward declaration
void parseSerializedMutation(Reference<RestoreData> rd);
void sanityCheckMutationOps(Reference<RestoreData> rd);
void printRestorableFileSet(Optional<RestorableFileSet> files);
void parseSerializedMutation(Reference<RestoreData> rd, bool isSampling = false);
@ -104,7 +103,7 @@ struct StringRefReaderMX {
Error failure_error;
};
bool debug_verbose = false;
bool debug_verbose = true;
////-- Restore code declaration START
@ -796,6 +795,14 @@ struct RestoreData : NonCopyable, public ReferenceCounted<RestoreData> {
processedCmd.clear();
}
// Returns the applier ID stored as the mapped value of every range2Applier entry,
// in the container's iteration order. One ID is emitted per entry; no de-duplication is done.
vector<UID> getBusyAppliers() {
	vector<UID> assignedAppliers;
	for (auto entry = range2Applier.begin(); entry != range2Applier.end(); ++entry) {
		assignedAppliers.push_back(entry->second);
	}
	return assignedAppliers;
}
RestoreData() {
cmdID.initPhase(RestoreCommandEnum::Init);
localNodeStatus.role = RestoreRole::Invalid;
@ -813,13 +820,13 @@ typedef RestoreData::LoadingState LoadingState;
// Log a message when the received command is unexpected for the current phase.
// Output goes to stdout so that the correctness test won't report an error.
// The printed line contains: the node description (rd->describeNode()), the current command
// (numeric value and its RestoreCommandEnumStr name), the received command (same two forms),
// the received command UID, and getPreviousCmdStr(current), which the message labels "Expected cmd".
// NOTE(review): two fprintf format lines ("[ERROR]" and "[WARNING!]") appear below sharing one
// argument list -- this looks like an old/new pair from a diff view; confirm which one is live.
void logUnexpectedCmd(Reference<RestoreData> rd, RestoreCommandEnum current, RestoreCommandEnum received, CMDUID cmdID) {
fprintf(stdout, "[ERROR]Node:%s Log Unexpected Cmd: CurrentCmd:%d(%s), Received cmd:%d(%s), Received CmdUID:%s, Expected cmd:%s\n",
fprintf(stdout, "[WARNING!] Node:%s Log Unexpected Cmd: CurrentCmd:%d(%s), Received cmd:%d(%s), Received CmdUID:%s, Expected cmd:%s\n",
rd->describeNode().c_str(), current, RestoreCommandEnumStr[(int)current], received, RestoreCommandEnumStr[(int)received], cmdID.toString().c_str(), getPreviousCmdStr(current).c_str());
}
// Log a message when we receive a command from the old phase.
// Prints one warning line to stdout containing: the node description (rd->describeNode()),
// the current command (numeric value and its RestoreCommandEnumStr name), the received command
// (same two forms), the received command UID, and getPreviousCmdStr(current), which the
// message labels "Expected cmd".
// NOTE(review): two fprintf format lines ("[Warning]Node" and "[Warning] Node") appear below sharing
// one argument list -- this looks like an old/new pair from a diff view; confirm which one is live.
void logExpectedOldCmd(Reference<RestoreData> rd, RestoreCommandEnum current, RestoreCommandEnum received, CMDUID cmdID) {
fprintf(stdout, "[Warning]Node:%s Log Expected Old Cmd: CurrentCmd:%d(%s) Received cmd:%d(%s), Received CmdUID:%s, Expected cmd:%s\n",
fprintf(stdout, "[Warning] Node:%s Log Expected Old Cmd: CurrentCmd:%d(%s) Received cmd:%d(%s), Received CmdUID:%s, Expected cmd:%s\n",
rd->describeNode().c_str(), current, RestoreCommandEnumStr[(int)current], received, RestoreCommandEnumStr[(int)received], cmdID.toString().c_str(), getPreviousCmdStr(current).c_str());
}
@ -1148,6 +1155,13 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> rd, Datab
state Standalone<VectorRef<KeyValueRef>> blockData = wait(parallelFileRestore::decodeRangeFileBlock(inFile, readOffset, readLen));
printf("[VERBOSE_DEBUG] Parse range file and get mutations\n");
int tmpi = 0;
for (tmpi = 0; tmpi < blockData.size(); tmpi++) {
printf("\t[VERBOSE_DEBUG] mutation: key:%s value:%s\n", blockData[tmpi].key.toString().c_str(), blockData[tmpi].value.toString().c_str());
}
// First and last key are the range for this file
state KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
printf("[INFO] RangeFile:%s KeyRange:%s, restoreRange:%s\n",
@ -1161,15 +1175,25 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> rd, Datab
// We know the file range intersects the restore range but there could still be keys outside the restore range.
// Find the subvector of kv pairs that intersect the restore range. Note that the first and last keys are just the range endpoints for this file
int rangeStart = 1;
int rangeEnd = blockData.size() - 1;
// The blockData's first and last entries are metadata, not the real data
int rangeStart = 1; //1
int rangeEnd = blockData.size() -1; //blockData.size() - 1 // Q: the rangeStart and rangeEnd is [,)?
printf("[VERBOSE_DEBUG] Range file decoded blockData\n");
for (auto& data : blockData ) {
printf("\t[VERBOSE_DEBUG] data key:%s val:%s\n", data.key.toString().c_str(), data.value.toString().c_str());
}
// Slide start forward, stop if something in range is found
// Move rangeStart and rangeEnd until they are within restoreRange
while(rangeStart < rangeEnd && !restoreRange.contains(blockData[rangeStart].key))
++rangeStart;
while(rangeStart < rangeEnd && !restoreRange.contains(blockData[rangeStart].key)) {
printf("[VERBOSE_DEBUG] rangeStart:%d key:%s is not in the range:%s\n", rangeStart, blockData[rangeStart].key.toString().c_str(), restoreRange.toString().c_str());
++rangeStart;
}
// Slide end backward, stop if something in range is found
while(rangeEnd > rangeStart && !restoreRange.contains(blockData[rangeEnd - 1].key))
--rangeEnd;
while(rangeEnd > rangeStart && !restoreRange.contains(blockData[rangeEnd - 1].key)) {
printf("[VERBOSE_DEBUG] (rangeEnd:%d - 1) key:%s is not in the range:%s\n", rangeEnd, blockData[rangeStart].key.toString().c_str(), restoreRange.toString().c_str());
--rangeEnd;
}
// MX: now data only contains the kv mutation within restoreRange
state VectorRef<KeyValueRef> data = blockData.slice(rangeStart, rangeEnd);
@ -1209,7 +1233,7 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> rd, Datab
for(; i < iend; ++i) {
//MXX: print out the key value version, and operations.
// printf("RangeFile [key:%s, value:%s, version:%ld, op:set]\n", data[i].key.printable().c_str(), data[i].value.printable().c_str(), rangeFile.version);
printf("RangeFile [key:%s, value:%s, version:%ld, op:set]\n", data[i].key.printable().c_str(), data[i].value.printable().c_str(), version);
// TraceEvent("PrintRangeFile_MX").detail("Key", data[i].key.printable()).detail("Value", data[i].value.printable())
// .detail("Version", rangeFile.version).detail("Op", "set");
//// printf("PrintRangeFile_MX: mType:set param1:%s param2:%s param1_size:%d, param2_size:%d\n",
@ -1397,8 +1421,9 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> rd, Datab
printf("%s[PARSE ERROR]!!!! kLen:%d(0x%04x) vLen:%d(0x%04x)\n", prefix.c_str(), kLen, kLen, vLen, vLen);
}
if ( debug_verbose ) {
printf("%s---RegisterBackupMutation[%d]: Version:%016lx Type:%d K:%s V:%s k_size:%d v_size:%d\n", prefix.c_str(),
//if ( debug_verbose ) {
if ( true ) {
printf("%s---LogFile parsed mutations. Prefix:[%d]: Version:%016lx Type:%d K:%s V:%s k_size:%d v_size:%d\n", prefix.c_str(),
kvCount,
commitVersion, type, getHexString(KeyRef(k, kLen)).c_str(), getHexString(KeyRef(v, vLen)).c_str(), kLen, vLen);
}
@ -1445,14 +1470,14 @@ ACTOR static Future<Void> prepareRestoreFilesV2(Reference<RestoreData> rd, Datab
rd->describeNode().c_str(), count, it->first, it->second.size());
}
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// Mutation types SetValue=0, ClearRange, AddValue, DebugKeyRange, DebugKey, NoOp, And, Or,
// Xor, AppendIfFits, AvailableForReuse, Reserved_For_LogProtocolMessage /* See fdbserver/LogProtocolMessage.h */, Max, Min, SetVersionstampedKey, SetVersionstampedValue,
// ByteMin, ByteMax, MinV2, AndV2, MAX_ATOMIC_OP
printf("[VERBOSE_DEBUG] Node:%s apply mutation:%s\n", rd->describeNode().c_str(), m.toString().c_str());
loop {
try {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -1789,11 +1814,11 @@ ACTOR Future<Void> assignKeyRangeToAppliers(Reference<RestoreData> rd, Database
}
state std::vector<Future<RestoreCommandReply>> cmdReplies;
loop {
wait(delay(1.0));
try {
cmdReplies.clear();
rd->cmdID.initPhase(RestoreCommandEnum::Assign_Applier_KeyRange);
state std::vector<Future<RestoreCommandReply>> cmdReplies;
for (auto& applier : appliers) {
KeyRangeRef keyRange = applier.second;
UID nodeID = applier.first;
@ -1814,6 +1839,23 @@ ACTOR Future<Void> assignKeyRangeToAppliers(Reference<RestoreData> rd, Database
reps[i].toString().c_str());
}
break;
} catch (Error &e) {
// TODO: Handle the command reply timeout error
if (e.code() != error_code_io_timeout) {
fprintf(stderr, "[ERROR] Node:%s, Commands before cmdID:%s timeout\n", rd->describeNode().c_str(), rd->cmdID.toString().c_str());
} else {
fprintf(stderr, "[ERROR] Node:%s, Commands before cmdID:%s error. error code:%d, error message:%s\n", rd->describeNode().c_str(),
rd->cmdID.toString().c_str(), e.code(), e.what());
}
//fprintf(stderr, "[ERROR] WE STOP HERE FOR DEBUG\n");
//break;
}
}
loop {
//wait(delay(1.0));
try {
cmdReplies.clear();
rd->cmdID.initPhase(RestoreCommandEnum::Assign_Applier_KeyRange_Done);
for (auto& applier : appliers) {
@ -1825,7 +1867,7 @@ ACTOR Future<Void> assignKeyRangeToAppliers(Reference<RestoreData> rd, Database
cmdReplies.push_back( cmdInterf.cmd.getReply(RestoreCommand(RestoreCommandEnum::Assign_Applier_KeyRange_Done, rd->cmdID, nodeID)) );
}
std::vector<RestoreCommandReply> reps = wait( timeoutError(getAll(cmdReplies), FastRestore_Failure_Timeout) );
std::vector<RestoreCommandReply> reps = wait( timeoutError(getAll(cmdReplies), FastRestore_Failure_Timeout) );
for (int i = 0; i < reps.size(); ++i) {
printf("[INFO] Assign_Applier_KeyRange_Done: Get reply:%s\n",
reps[i].toString().c_str());
@ -1840,10 +1882,9 @@ ACTOR Future<Void> assignKeyRangeToAppliers(Reference<RestoreData> rd, Database
fprintf(stderr, "[ERROR] Node:%s, Commands before cmdID:%s error. error code:%d, error message:%s\n", rd->describeNode().c_str(),
rd->cmdID.toString().c_str(), e.code(), e.what());
}
fprintf(stderr, "[ERROR] WE STOP HERE FOR DEBUG\n");
break;
//fprintf(stderr, "[ERROR] WE STOP HERE FOR DEBUG\n");
//break;
}
}
return Void();
@ -2156,9 +2197,11 @@ ACTOR Future<Void> receiveMutations(Reference<RestoreData> rd, RestoreCommandInt
}
if ( req.cmd == RestoreCommandEnum::Loader_Send_Mutations_To_Applier ) {
ASSERT(req.cmd == (RestoreCommandEnum) req.cmdID.phase);
printf("[VERBOSE_DEBUG] Node:%s receive mutation:%s\n", rd->describeNode().c_str(), req.mutation.toString().c_str());
// Handle duplicate cmd
if ( rd->isCmdProcessed(req.cmdID) ) {
printf("[DEBUG] NODE:%s skip duplicate cmd:%s\n", rd->describeNode().c_str(), req.cmdID.toString().c_str());
printf("[DEBUG] Skipped mutation:%s\n", req.mutation.toString().c_str());
req.reply.send(RestoreCommandReply(interf.id(), req.cmdID));
continue;
}
@ -3045,7 +3088,9 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(RestoreCommandInterf
try {
// Notify appliers the end of the loading
printf("[INFO][Master] Notify appliers the end of loading\n");
applierIDs = getApplierIDs(rd);
//applierIDs = getApplierIDs(rd);
// Only the appliers that are responsible for a key range should be sent result
applierIDs = rd->getBusyAppliers();
cmdReplies.clear();
rd->cmdID.initPhase(RestoreCommandEnum::Loader_Send_Mutations_To_Applier_Done);
for (auto& id : applierIDs) {
@ -3263,6 +3308,8 @@ ACTOR Future<Void> loadingHandler(Reference<RestoreData> rd, RestoreCommandInter
return Void();
}
// Loader: sample's loading handler
ACTOR Future<Void> sampleHandler(Reference<RestoreData> rd, RestoreCommandInterface interf, RestoreCommandInterface leaderInter) {
printf("[sampleHandler] Worker Node:%s starts\n",
@ -3904,6 +3951,22 @@ ACTOR static Future<Version> processRestoreRequest(RestoreCommandInterface inter
// lock DB for restore
wait( _lockDB(cx, randomUid, lockDB) );
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->clear(normalKeys);
tr->commit();
break;
} catch(Error &e) {
printf("[ERROR] At clean up DB before restore. error code:%d message:%s. Retry...\n", e.code(), e.what());
if(e.code() != error_code_restore_duplicate_tag) {
wait(tr->onError(e));
}
}
}
// Step: Collect all backup files
loop {
try {
@ -3957,8 +4020,8 @@ ACTOR static Future<Version> processRestoreRequest(RestoreCommandInterface inter
printf("[DEBUG][Batch:%d] Calculate backup files for a version batch: endVersion:%lld isRange:%d validVersion:%d curWorkloadSize:%.2fB curBackupFilesBeginIndex:%d curBackupFilesEndIndex:%d, files.size:%d\n",
restoreBatchIndex, endVersion, isRange, validVersion, curWorkloadSize, curBackupFilesBeginIndex, curBackupFilesEndIndex, rd->allFiles.size());
}
if ( (validVersion && curWorkloadSize >= loadBatchSizeThresholdB) || curBackupFilesEndIndex > rd->allFiles.size()-1 ) {
if ( curBackupFilesEndIndex > rd->allFiles.size()-1 && curWorkloadSize <= 0 ) {
if ( (validVersion && curWorkloadSize >= loadBatchSizeThresholdB) || curBackupFilesEndIndex >= rd->allFiles.size() ) {
if ( curBackupFilesEndIndex >= rd->allFiles.size() && curWorkloadSize <= 0 ) {
printf("Restore finishes: curBackupFilesEndIndex:%d, allFiles.size:%d, curWorkloadSize:%.2f\n",
curBackupFilesEndIndex, rd->allFiles.size(), curWorkloadSize);
break;
@ -3966,12 +4029,10 @@ ACTOR static Future<Version> processRestoreRequest(RestoreCommandInterface inter
//TODO: Construct the files [curBackupFilesBeginIndex, curBackupFilesEndIndex]
rd->files.clear();
rd->resetPerVersionBatch();
if ( curBackupFilesBeginIndex != curBackupFilesEndIndex ) {
if ( curBackupFilesBeginIndex < rd->allFiles.size()) {
for (int fileIndex = curBackupFilesBeginIndex; fileIndex <= curBackupFilesEndIndex && fileIndex < rd->allFiles.size(); fileIndex++) {
rd->files.push_back(rd->allFiles[fileIndex]);
}
} else {
rd->files.push_back(rd->allFiles[curBackupFilesBeginIndex]);
}
printBackupFilesInfo(rd);
@ -4502,12 +4563,14 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreData> rd) {
state MutationRef kvm;
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
kvm = kvOp->second[mIndex];
printf("[VERBOSE_DEBUG] mutation to sent to applier, mutation:%s\n", kvm.toString().c_str());
// Send the mutation to applier
if (isRangeMutation(kvm)) {
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
state Standalone<VectorRef<MutationRef>> mvector;
state Standalone<VectorRef<UID>> nodeIDs;
// '' Bug may be here! The splitMutation() may be wrong!
splitMutation(rd, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
@ -4517,6 +4580,7 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreData> rd) {
applierCmdInterf = rd->workers_interface[applierID];
rd->cmdID.nextCmd();
printf("[VERBOSE_DEBUG] mutation:%s\n", mutation.toString().c_str());
cmdReplies.push_back(applierCmdInterf.cmd.getReply(
RestoreCommand(RestoreCommandEnum::Loader_Send_Mutations_To_Applier, rd->cmdID, applierID, commitVersion, mutation)));
@ -4612,6 +4676,7 @@ ACTOR Future<Void> registerMutationsToMasterApplier(Reference<RestoreData> rd) {
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
kvm = kvOp->second[mIndex];
rd->cmdID.nextCmd();
printf("[VERBOSE_DEBUG] send mutation to applier, mutation:%s\n", kvm.toString().c_str());
cmdReplies.push_back(applierCmdInterf.cmd.getReply(
RestoreCommand(RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier, rd->cmdID, applierID, commitVersion, kvm)));
packMutationNum++;
@ -4734,7 +4799,8 @@ ACTOR Future<Void> notifyApplierToApplyMutations(Reference<RestoreData> rd) {
state int packMutationThreshold = 1;
state int kvCount = 0;
state std::vector<Future<RestoreCommandReply>> cmdReplies;
state std::vector<UID> applierIDs = getApplierIDs(rd);
//state std::vector<UID> applierIDs = getApplierIDs(rd);
state std::vector<UID> applierIDs = rd->getBusyAppliers();
state int applierIndex = 0;
state UID applierID;
state RestoreCommandInterface applierCmdInterf;

View File

@ -115,6 +115,9 @@ struct CycleWorkload : TestWorkload {
tr.set( self->key(r), self->value(r3) );
tr.set( self->key(r2), self->value(r4) );
tr.set( self->key(r3), self->value(r2) );
TraceEvent("CyclicTestMX").detail("Key", self->key(r).toString()).detail("Value", self->value(r3).toString());
TraceEvent("CyclicTestMX").detail("Key", self->key(r2).toString()).detail("Value", self->value(r4).toString());
TraceEvent("CyclicTestMX").detail("Key", self->key(r3).toString()).detail("Value", self->value(r2).toString());
wait( tr.commit() );
//TraceEvent("CycleCommit");
@ -134,8 +137,19 @@ struct CycleWorkload : TestWorkload {
throw;
}
}
// Dump the given key-value entries to the trace log for failure diagnosis.
// Emits one "MXTestFailureDetail" marker event, then one "CurrentDataEntry" event per
// entry carrying its position in `data`, its key and its value.
void logTestData(const VectorRef<KeyValueRef>& data) {
	TraceEvent("MXTestFailureDetail");
	for (int pos = 0; pos < data.size(); ++pos) {
		const auto& kv = data[pos];
		TraceEvent("CurrentDataEntry")
			.detail("Index", pos)
			.detail("Key", kv.key.toString())
			.detail("Value", kv.value.toString());
	}
}
bool cycleCheckData( const VectorRef<KeyValueRef>& data, Version v ) {
if (data.size() != nodeCount) {
logTestData(data);
TraceEvent(SevError, "TestFailure").detail("Reason", "Node count changed").detail("Before", nodeCount).detail("After", data.size()).detail("Version", v).detail("KeyPrefix", keyPrefix.printable());
TraceEvent(SevError, "TestFailureInfo").detail("DataSize", data.size()).detail("NodeCount", nodeCount).detail("Workload", description());
return false;
@ -144,6 +158,7 @@ struct CycleWorkload : TestWorkload {
for(int c=0; c<nodeCount; c++) {
if (c && !i) {
TraceEvent(SevError, "TestFailure").detail("Reason", "Cycle got shorter").detail("Before", nodeCount).detail("After", c).detail("KeyPrefix", keyPrefix.printable());
logTestData(data);
return false;
}
if (data[i].key != key(i)) {