FastRestore: Fix splitMutation bug

This commit is contained in:
Meng Xu 2019-05-13 17:24:57 -07:00
parent c7cd758e01
commit 76dd8dc8a8
4 changed files with 82 additions and 24 deletions

View File

@ -255,10 +255,10 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorRequ
MutationRef mutation = mutations[mIndex];
self->kvOps[commitVersion].push_back_deep(self->kvOps[commitVersion].arena(), mutation);
numMutations++;
if ( numMutations % 100000 == 1 ) { // Should be different value in simulation and in real mode
//if ( numMutations % 100000 == 1 ) { // Should be different value in simulation and in real mode
printf("[INFO][Applier] Node:%s Receives %d mutations. cur_mutation:%s\n",
self->describeNode().c_str(), numMutations, mutation.toString().c_str());
}
//}
}
req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
@ -272,7 +272,7 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorRequ
ACTOR Future<Void> handleSendSampleMutationVectorRequest(RestoreSendMutationVectorRequest req, Reference<RestoreApplierData> self) {
state int numMutations = 0;
self->numSampledMutations = 0;
// NOTE: We have insert operation to self->kvOps. For the same worker, we should only allow one actor of this kind to run at any time!
// Otherwise, race condition may happen!
while (self->isInProgress(RestoreCommandEnum::Loader_Send_Sample_Mutation_To_Applier)) {

View File

@ -44,7 +44,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(Reference<RestoreLo
ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self);
void parseSerializedMutation(Reference<RestoreLoaderData> self, bool isSampling);
bool isRangeMutation(MutationRef m);
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef> mvector, Arena& nodeIDs_arena, VectorRef<UID> nodeIDs) ;
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) ;
ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreLoaderInterface loaderInterf, Database cx) {
@ -530,6 +530,8 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
//state double mutationVectorThreshold = 1;//1024 * 10; // Bytes.
state std::map<UID, Standalone<VectorRef<MutationRef>>> applierMutationsBuffer; // The mutation vector to be sent to each applier
state std::map<UID, double> applierMutationsSize; // buffered mutation vector size for each applier
state Standalone<VectorRef<MutationRef>> mvector;
state Standalone<VectorRef<UID>> nodeIDs;
// Initialize the above two maps
state std::vector<UID> applierIDs = self->getWorkingApplierIDs();
loop {
@ -556,18 +558,28 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
printf("[VERBOSE_DEBUG] mutation to sent to applier, mutation:%s\n", kvm.toString().c_str());
}
// Send the mutation to applier
if (isRangeMutation(kvm)) {
if (isRangeMutation(kvm) && false) { // MX: Use false to skip the range mutation handling
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
state Standalone<VectorRef<MutationRef>> mvector;
state Standalone<VectorRef<UID>> nodeIDs;
mvector.pop_front(mvector.size());
nodeIDs.pop_front(nodeIDs.size());
//state std::map<Standalone<MutationRef>, UID> m2appliers;
// '' Bug may be here! The splitMutation() may be wrong!
splitMutation(self, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
// m2appliers = splitMutationv2(self, kvm);
// // convert m2appliers to mvector and nodeIDs
// for (auto& m2applier : m2appliers) {
// mvector.push_back(m2applier.first);
// nodeIDs.push_back(m2applier.second);
// }
printf("SPLITMUTATION: mvector.size:%d\n", mvector.size());
ASSERT(mvector.size() == nodeIDs.size());
for (splitMutationIndex = 0; splitMutationIndex < mvector.size(); splitMutationIndex++ ) {
MutationRef mutation = mvector[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
printf("SPLITTED MUTATION: %d: mutation:%s\n", splitMutationIndex, mutation.toString().c_str());
applierCmdInterf = self->appliersInterf[applierID];
applierMutationsBuffer[applierID].push_back(applierMutationsBuffer[applierID].arena(), mutation); // Q: Maybe push_back_deep()?
applierMutationsSize[applierID] += mutation.expectedSize();
@ -577,13 +589,14 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
for (auto &applierID : applierIDs) {
if ( applierMutationsSize[applierID] >= mutationVectorThreshold ) {
state int tmpNumMutations = applierMutationsBuffer[applierID].size();
self->cmdID.nextCmd();
cmdReplies.push_back(applierCmdInterf.sendMutationVector.getReply(
RestoreSendMutationVectorRequest(self->cmdID, commitVersion, applierMutationsBuffer[applierID])));
applierMutationsBuffer[applierID].pop_front(applierMutationsBuffer[applierID].size());
applierMutationsSize[applierID] = 0;
printf("[INFO][Loader] Waits for applier to receive %ld range mutations\n", cmdReplies.size());
printf("[INFO][Loader] Waits for applier:%s to receive %ld range mutations\n", applierID.toString().c_str(), tmpNumMutations);
std::vector<RestoreCommonReply> reps = wait( timeoutError( getAll(cmdReplies), FastRestore_Failure_Timeout ) );
cmdReplies.clear();
}
@ -655,48 +668,88 @@ ACTOR Future<Void> registerMutationsToApplier(Reference<RestoreLoaderData> self)
return Void();
}
// std::map<Standalone<MutationRef>, UID> splitMutationv2(Reference<RestoreLoaderData> self, MutationRef m) {
// std::map<Standalone<MutationRef>, UID> m2appliers;
// // key range [m->param1, m->param2)
// //std::map<Standalone<KeyRef>, UID>;
// printf("SPLITMUTATION: mutation:%s\n", m.toString().c_str());
// std::map<Standalone<KeyRef>, UID>::iterator itlow, itup; //we will return [itlow, itup)
// itlow = self->range2Applier.lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
// itup = self->range2Applier.upper_bound(m.param2); // upper_bound returns the iterator that is > m.param2; return rmap::end if no keys are considered to go after m.param2.
// printf("SPLITMUTATION: itlow_key:%s itup_key:%s\n", itlow->first.toString().c_str(), itup->first.toString().c_str());
// ASSERT( itup == self->range2Applier.end() || itup->first >= m.param2 );
// while (itlow != itup) {
// MutationRef curm; //current mutation
// curm.type = m.type;
// curm.param1 = itlow->first;
// itlow++;
// if (itlow == self->range2Applier.end()) {
// curm.param2 = normalKeys.end;
// } else {
// curm.param2 = itlow->first;
// }
// printf("SPLITMUTATION: m2appliers.push_back:%s\n", curm.toString().c_str());
// m2appliers[curm] = itlow->second;
// }
// printf("SPLITMUTATION: m2appliers.size:%d\n", m2appliers.size());
// return m2appliers;
// }
// TODO: Add a unit test for this function
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef> mvector, Arena& nodeIDs_arena, VectorRef<UID> nodeIDs) {
void splitMutation(Reference<RestoreLoaderData> self, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
//std::map<Standalone<KeyRef>, UID>;
printf("SPLITMUTATION: orignal mutation:%s\n", m.toString().c_str());
std::map<Standalone<KeyRef>, UID>::iterator itlow, itup; //we will return [itlow, itup)
itlow = self->range2Applier.lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if ( itlow != self->range2Applier.begin()) { // m.param1 is not the smallest key \00
if ( itlow != self->range2Applier.begin() && itlow->first > m.param1 ) { // m.param1 is not the smallest key \00
// (itlow-1) is the node whose key range includes m.param1
--itlow;
} else {
if (m.param1 != LiteralStringRef("\00")) {
if ( m.param1 != LiteralStringRef("\00") || itlow->first != m.param1 ) { // MX: This is useless
printf("[ERROR] splitMutation has bug on range mutation:%s\n", m.toString().c_str());
}
}
itup = self->range2Applier.upper_bound(m.param2); // upper_bound returns the iterator that is > m.param2; return rmap::end if no keys are considered to go after m.param2.
printf("SPLITMUTATION: itlow_key:%s itup_key:%s\n", itlow->first.toString().c_str(), itup == self->range2Applier.end() ? "[end]" : itup->first.toString().c_str());
ASSERT( itup == self->range2Applier.end() || itup->first >= m.param2 );
// Now adjust for the case: example: mutation range is [a, d); we have applier's ranges' inclusive lower bound values are: a, b, c, d, e; upper_bound(d) returns itup to e, but we want itup to d.
--itup;
ASSERT( itup->first <= m.param2 );
if ( itup->first < m.param2 ) {
++itup; //make sure itup is >= m.param2, that is, itup is the next key range >= m.param2
}
//--itup;
//ASSERT( itup->first <= m.param2 );
// if ( itup->first < m.param2 ) {
// ++itup; //make sure itup is >= m.param2, that is, itup is the next key range >= m.param2
// }
while (itlow->first < itup->first) {
MutationRef curm; //current mutation
while (itlow != itup) {
Standalone<MutationRef> curm; //current mutation
curm.type = m.type;
curm.param1 = itlow->first;
itlow++;
if (itlow == self->range2Applier.end()) {
curm.param2 = normalKeys.end;
if (itlow == itup) {
ASSERT( m.param2 <= normalKeys.end );
curm.param2 = m.param2;
} else if ( m.param2 < itlow->first ) {
curm.param2 = m.param2;
} else {
curm.param2 = itlow->first;
}
mvector.push_back(mvector_arena, curm);
printf("SPLITMUTATION: mvector.push_back:%s\n", curm.toString().c_str());
ASSERT( curm.param1 <= curm.param2 );
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itlow->second);
}
printf("SPLITMUTATION: mvector.size:%d\n", mvector.size());
return;
}

View File

@ -132,11 +132,16 @@ public:
}
void printAppliersKeyRange() {
printf("[INFO] The mapping of KeyRange_start --> Applier ID\n");
printf("[INFO] The mapping of KeyRange_start --> Applier ID: getHexString\n");
// applier type: std::map<Standalone<KeyRef>, UID>
for (auto &applier : range2Applier) {
printf("\t[INFO]%s -> %s\n", getHexString(applier.first).c_str(), applier.second.toString().c_str());
}
printf("[INFO] The mapping of KeyRange_start --> Applier ID: toString\n");
// applier type: std::map<Standalone<KeyRef>, UID>
for (auto &applier : range2Applier) {
printf("\t[INFO]%s -> %s\n", applier.first.toString().c_str(), applier.second.toString().c_str());
}
}
};

View File

@ -290,7 +290,7 @@ struct RestoreLoadFileRequest : TimedRequest {
struct RestoreSendMutationVectorRequest : TimedRequest {
CMDUID cmdID;
uint64_t commitVersion;
VectorRef<MutationRef> mutations;
Standalone<VectorRef<MutationRef>> mutations;
ReplyPromise<RestoreCommonReply> reply;