FastRestore:Fix bug in finish restore
RestoreMaster may not receive all acks. for the last command, i.e., finishRestore, because RestoreLoaders and RestoreAppliers exit immediately after sending the ack. If the ack is lost, it will not be resent. This commit also removes some unneeded code. This commit passes 50k random tests without errors.
This commit is contained in:
parent
3fcb6ec0a1
commit
022b555b69
|
@ -648,18 +648,6 @@ const int decodeRestoreRequestTriggerValue( ValueRef const& value ) {
|
|||
}
|
||||
|
||||
// restoreRequestDone key
|
||||
const Value restoreRequestDoneValue (int const numRequests) {
|
||||
BinaryWriter wr(IncludeVersion());
|
||||
wr << numRequests;
|
||||
return wr.toValue();
|
||||
}
|
||||
const int decodeRestoreRequestDoneValue( ValueRef const& value ) {
|
||||
int s;
|
||||
BinaryReader reader( value, IncludeVersion() );
|
||||
reader >> s;
|
||||
return s;
|
||||
}
|
||||
|
||||
const Value restoreRequestDoneVersionValue (Version readVersion) {
|
||||
BinaryWriter wr(IncludeVersion());
|
||||
wr << readVersion;
|
||||
|
@ -672,7 +660,6 @@ Version decodeRestoreRequestDoneVersionValue( ValueRef const& value ) {
|
|||
return v;
|
||||
}
|
||||
|
||||
|
||||
const Key restoreRequestKeyFor( int const& index ) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
wr.serializeBytes( restoreRequestKeys.begin );
|
||||
|
|
|
@ -276,31 +276,23 @@ extern const KeyRef mustContainSystemMutationsKey;
|
|||
// Key range reserved for storing changes to monitor conf files
|
||||
extern const KeyRangeRef monitorConfKeys;
|
||||
|
||||
// Fast restore
|
||||
extern const KeyRef restoreLeaderKey;
|
||||
extern const KeyRangeRef restoreWorkersKeys;
|
||||
|
||||
extern const KeyRef restoreStatusKey;
|
||||
|
||||
extern const KeyRef restoreStatusKey; // To be used when we measure fast restore performance
|
||||
extern const KeyRef restoreRequestTriggerKey;
|
||||
extern const KeyRef restoreRequestDoneKey;
|
||||
extern const KeyRangeRef restoreRequestKeys;
|
||||
|
||||
const Key restoreWorkerKeyFor( UID const& workerID );
|
||||
|
||||
const Value restoreWorkerInterfaceValue(RestoreWorkerInterface const& server );
|
||||
RestoreWorkerInterface decodeRestoreWorkerInterfaceValue( ValueRef const& value );
|
||||
|
||||
// Fast restore
|
||||
const Value restoreRequestTriggerValue (int const numRequests);
|
||||
const int decodeRestoreRequestTriggerValue( ValueRef const& value );
|
||||
const Value restoreRequestDoneValue (int const numRequests);
|
||||
const int decodeRestoreRequestDoneValue( ValueRef const& value );
|
||||
const Value restoreRequestDoneVersionValue (Version readVersion);
|
||||
Version decodeRestoreRequestDoneVersionValue( ValueRef const& value );
|
||||
const Key restoreRequestKeyFor( int const& index );
|
||||
const Value restoreRequestValue( RestoreRequest const& server );
|
||||
RestoreRequest decodeRestoreRequestValue( ValueRef const& value );
|
||||
|
||||
const Key restoreStatusKeyFor( StringRef statusType);
|
||||
const Value restoreStatusValue( double const& val );
|
||||
|
||||
|
|
|
@ -451,7 +451,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
|
||||
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
|
||||
|
||||
|
||||
if(clientKnobs)
|
||||
clientKnobs->IS_ACCEPTABLE_DELAY = clientKnobs->IS_ACCEPTABLE_DELAY*std::min(MAX_READ_TRANSACTION_LIFE_VERSIONS, MAX_WRITE_TRANSACTION_LIFE_VERSIONS)/(5.0*VERSIONS_PER_SECOND);
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@
|
|||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/MutationList.h"
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
|
||||
#include "fdbserver/RestoreCommon.actor.h"
|
||||
#include "fdbserver/RestoreUtil.h"
|
||||
#include "fdbserver/RestoreRoleCommon.actor.h"
|
||||
|
@ -35,8 +34,8 @@
|
|||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self);
|
||||
ACTOR Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self, Database cx);
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self);
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self, Database cx);
|
||||
|
||||
ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, RestoreApplierInterface applierInterf, Database cx) {
|
||||
state ActorCollection actors(false);
|
||||
|
@ -72,7 +71,7 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
|
|||
}
|
||||
when ( RestoreVersionBatchRequest req = waitNext(applierInterf.finishRestore.getFuture()) ) {
|
||||
requestTypeStr = "finishRestore";
|
||||
exitRole = handleFinishRestoreRequest(req, self, cx);
|
||||
exitRole = handleFinishRestoreRequest(req, self);
|
||||
}
|
||||
when ( wait(exitRole) ) {
|
||||
TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole").detail("NodeID", self->id());
|
||||
|
@ -91,17 +90,12 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
|
|||
// The actor may be invovked multiple times and executed async.
|
||||
// No race condition as long as we do not wait or yield when operate the shared data, it should be fine,
|
||||
// because all actors run on 1 thread.
|
||||
ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self) {
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVersionedRequest req, Reference<RestoreApplierData> self) {
|
||||
state int numMutations = 0;
|
||||
|
||||
TraceEvent("FastRestore").detail("ApplierNode", self->id())
|
||||
.detail("LogVersion", self->logVersion.get()).detail("RangeVersion", self->rangeVersion.get())
|
||||
.detail("Request", req.toString());
|
||||
if ( debug_verbose ) {
|
||||
// NOTE: Print out the current version and received req is helpful in debugging
|
||||
printf("[VERBOSE_DEBUG] handleSendMutationVectorRequest Node:%s at rangeVersion:%ld logVersion:%ld receive mutation number:%d, req:%s\n",
|
||||
self->describeNode().c_str(), self->rangeVersion.get(), self->logVersion.get(), req.mutations.size(), req.toString().c_str());
|
||||
}
|
||||
|
||||
if ( req.isRangeFile ) {
|
||||
wait( self->rangeVersion.whenAtLeast(req.prevVersion) );
|
||||
|
@ -114,7 +108,6 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
|
||||
state Version commitVersion = req.version;
|
||||
VectorRef<MutationRef> mutations(req.mutations);
|
||||
// printf("[DEBUG] Node:%s receive %d mutations at version:%ld\n", self->describeNode().c_str(), mutations.size(), commitVersion);
|
||||
if ( self->kvOps.find(commitVersion) == self->kvOps.end() ) {
|
||||
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
|
||||
}
|
||||
|
@ -123,10 +116,6 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
MutationRef mutation = mutations[mIndex];
|
||||
self->kvOps[commitVersion].push_back_deep(self->kvOps[commitVersion].arena(), mutation);
|
||||
numMutations++;
|
||||
//if ( numMutations % 100000 == 1 ) { // Should be different value in simulation and in real mode
|
||||
// printf("[INFO][Applier] Node:%s Receives %d mutations. cur_mutation:%s\n",
|
||||
// self->describeNode().c_str(), numMutations, mutation.toString().c_str());
|
||||
//}
|
||||
}
|
||||
|
||||
// Notify the same actor and unblock the request at the next version
|
||||
|
@ -142,12 +131,10 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
}
|
||||
|
||||
ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
|
||||
state bool isPrint = false; //Debug message
|
||||
state std::string typeStr = "";
|
||||
|
||||
// Assume the process will not crash when it apply mutations to DB. The reply message can be lost though
|
||||
if (self->kvOps.empty()) {
|
||||
printf("Node:%s kvOps is empty. No-op for apply to DB\n", self->describeNode().c_str());
|
||||
TraceEvent("FastRestore").detail("ApplierApplyToDBEmpty", self->id());
|
||||
return Void();
|
||||
}
|
||||
|
@ -159,10 +146,6 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
|
||||
self->sanityCheckMutationOps();
|
||||
|
||||
if ( debug_verbose ) {
|
||||
TraceEvent("ApplyKVOPsToDB").detail("MapSize", self->kvOps.size());
|
||||
printf("ApplyKVOPsToDB num_of_version:%ld\n", self->kvOps.size());
|
||||
}
|
||||
state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator it = self->kvOps.begin();
|
||||
state std::map<Version, Standalone<VectorRef<MutationRef>>>::iterator prevIt = it;
|
||||
state int index = 0;
|
||||
|
@ -181,27 +164,13 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
for ( ; it != self->kvOps.end(); ++it ) {
|
||||
numVersion++;
|
||||
//TraceEvent("FastRestore").detail("Applier", self->id()).detail("ApplyKVsToDBVersion", it->first);
|
||||
if ( debug_verbose ) {
|
||||
TraceEvent("ApplyKVOPsToDB\t").detail("Version", it->first).detail("OpNum", it->second.size());
|
||||
}
|
||||
//printf("ApplyKVOPsToDB numVersion:%d Version:%08lx num_of_ops:%d, \n", numVersion, it->first, it->second.size());
|
||||
|
||||
state MutationRef m;
|
||||
for ( ; index < it->second.size(); ++index ) {
|
||||
m = it->second[index];
|
||||
if ( m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP )
|
||||
typeStr = typeString[m.type];
|
||||
else {
|
||||
printf("ApplyKVOPsToDB MutationType:%d is out of range\n", m.type);
|
||||
}
|
||||
|
||||
if ( debug_verbose && count % 1000 == 0 ) {
|
||||
printf("ApplyKVOPsToDB Node:%s num_mutation:%d Version:%08lx num_of_ops to apply:%d\n",
|
||||
self->describeNode().c_str(), count, it->first, it->second.size());
|
||||
}
|
||||
|
||||
if ( debug_verbose ) {
|
||||
printf("[VERBOSE_DEBUG] Node:%s apply mutation:%s\n", self->describeNode().c_str(), m.toString().c_str());
|
||||
TraceEvent(SevError, "FastRestore").detail("InvalidMutationType", m.type);
|
||||
}
|
||||
|
||||
if ( m.type == MutationRef::SetValue ) {
|
||||
|
@ -212,7 +181,7 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
} else if ( isAtomicOp((MutationRef::Type) m.type) ) {
|
||||
tr->atomicOp(m.param1, m.param2, m.type);
|
||||
} else {
|
||||
printf("[WARNING] mtype:%d (%s) unhandled\n", m.type, typeStr.c_str());
|
||||
TraceEvent(SevError, "FastRestore").detail("UnhandledMutationType", m.type).detail("TypeName", typeStr);
|
||||
}
|
||||
++count;
|
||||
transactionSize += m.expectedSize();
|
||||
|
@ -226,17 +195,6 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
prevIndex = index;
|
||||
transactionSize = 0;
|
||||
}
|
||||
|
||||
if ( isPrint ) {
|
||||
printf("\tApplyKVOPsToDB Version:%016lx MType:%s K:%s, V:%s K_size:%d V_size:%d\n", it->first, typeStr.c_str(),
|
||||
getHexString(m.param1).c_str(), getHexString(m.param2).c_str(), m.param1.size(), m.param2.size());
|
||||
|
||||
TraceEvent("ApplyKVOPsToDB\t\t").detail("Version", it->first)
|
||||
.detail("MType", m.type).detail("MTypeStr", typeStr)
|
||||
.detail("MKey", getHexString(m.param1))
|
||||
.detail("MValueSize", m.param2.size())
|
||||
.detail("MValue", getHexString(m.param2));
|
||||
}
|
||||
}
|
||||
|
||||
if ( transactionSize > 0 ) { // the commit batch should NOT across versions
|
||||
|
@ -256,7 +214,6 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
}
|
||||
break;
|
||||
} catch(Error &e) {
|
||||
printf("ApplyKVOPsToDB transaction error:%s.\n", e.what());
|
||||
wait(tr->onError(e));
|
||||
it = prevIt;
|
||||
index = prevIndex;
|
||||
|
@ -265,15 +222,13 @@ ACTOR Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVectorVers
|
|||
}
|
||||
|
||||
self->kvOps.clear();
|
||||
printf("Node:%s ApplyKVOPsToDB number of kv mutations:%d\n", self->describeNode().c_str(), count);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self, Database cx) {
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self, Database cx) {
|
||||
TraceEvent("FastRestore").detail("ApplierApplyToDB", self->id()).detail("DBApplierPresent", self->dbApplier.present());
|
||||
if ( !self->dbApplier.present() ) {
|
||||
//self->dbApplier = Never();
|
||||
self->dbApplier = applyToDB(self, cx);
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,6 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
|
|||
// range2Applier is in master and loader node. Loader node uses this to determine which applier a mutation should be sent
|
||||
std::map<Standalone<KeyRef>, UID> range2Applier; // KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
|
||||
std::map<Standalone<KeyRef>, int> keyOpsCount; // The number of operations per key which is used to determine the key-range boundary for appliers
|
||||
int numSampledMutations; // The total number of mutations received from sampled data.
|
||||
|
||||
// For master applier to hold the lower bound of key ranges for each appliers
|
||||
std::vector<Standalone<KeyRef>> keyRangeLowerBounds;
|
||||
|
@ -93,17 +92,8 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
|
|||
if (kvOps.empty())
|
||||
return;
|
||||
|
||||
if ( isKVOpsSorted() ) {
|
||||
printf("[CORRECT] KVOps is sorted by version\n");
|
||||
} else {
|
||||
printf("[ERROR]!!! KVOps is NOT sorted by version\n");
|
||||
}
|
||||
|
||||
if ( allOpsAreKnown() ) {
|
||||
printf("[CORRECT] KVOps all operations are known.\n");
|
||||
} else {
|
||||
printf("[ERROR]!!! KVOps has unknown mutation op. Exit...\n");
|
||||
}
|
||||
ASSERT_WE_THINK( isKVOpsSorted() );
|
||||
ASSERT_WE_THINK( allOpsAreKnown() );
|
||||
}
|
||||
|
||||
bool isKVOpsSorted() {
|
||||
|
@ -127,61 +117,13 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
|
|||
|| isAtomicOp((MutationRef::Type) m->type) )
|
||||
continue;
|
||||
else {
|
||||
printf("[ERROR] Unknown mutation type:%d\n", m->type);
|
||||
TraceEvent(SevError, "FastRestore").detail("UnknownMutationType", m->type);
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
std::vector<Standalone<KeyRef>> calculateAppliersKeyRanges(int numAppliers) {
|
||||
ASSERT(numAppliers > 0);
|
||||
std::vector<Standalone<KeyRef>> lowerBounds;
|
||||
int numSampledMutations = 0;
|
||||
for (auto &count : keyOpsCount) {
|
||||
numSampledMutations += count.second;
|
||||
}
|
||||
|
||||
//intervalLength = (numSampledMutations - remainder) / (numApplier - 1)
|
||||
int intervalLength = std::max(numSampledMutations / numAppliers, 1); // minimal length is 1
|
||||
int curCount = 0;
|
||||
int curInterval = 0;
|
||||
|
||||
printf("[INFO] Node:%s calculateAppliersKeyRanges(): numSampledMutations:%d numAppliers:%d intervalLength:%d\n",
|
||||
describeNode().c_str(),
|
||||
numSampledMutations, numAppliers, intervalLength);
|
||||
for (auto &count : keyOpsCount) {
|
||||
if (curCount >= curInterval * intervalLength) {
|
||||
printf("[INFO] Node:%s calculateAppliersKeyRanges(): Add a new key range [%d]:%s: curCount:%d\n",
|
||||
describeNode().c_str(), curInterval, count.first.toString().c_str(), curCount);
|
||||
lowerBounds.push_back(count.first); // The lower bound of the current key range
|
||||
curInterval++;
|
||||
}
|
||||
curCount += count.second;
|
||||
}
|
||||
|
||||
if ( lowerBounds.size() != numAppliers ) {
|
||||
printf("[WARNING] calculateAppliersKeyRanges() WE MAY NOT USE ALL APPLIERS efficiently! num_keyRanges:%ld numAppliers:%d\n",
|
||||
lowerBounds.size(), numAppliers);
|
||||
printLowerBounds(lowerBounds);
|
||||
}
|
||||
|
||||
//ASSERT(lowerBounds.size() <= numAppliers + 1); // We may have at most numAppliers + 1 key ranges
|
||||
if ( lowerBounds.size() > numAppliers ) {
|
||||
printf("[WARNING] Key ranges number:%ld > numAppliers:%d. Merge the last ones\n", lowerBounds.size(), numAppliers);
|
||||
}
|
||||
|
||||
while ( lowerBounds.size() > numAppliers ) {
|
||||
printf("[WARNING] Key ranges number:%ld > numAppliers:%d. Merge the last ones\n", lowerBounds.size(), numAppliers);
|
||||
lowerBounds.pop_back();
|
||||
}
|
||||
|
||||
return lowerBounds;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
|
|||
}
|
||||
when ( RestoreVersionBatchRequest req = waitNext(loaderInterf.finishRestore.getFuture()) ) {
|
||||
requestTypeStr = "finishRestore";
|
||||
exitRole = handleFinishRestoreRequest(req, self, cx);
|
||||
exitRole = handleFinishRestoreRequest(req, self);
|
||||
}
|
||||
when ( wait(exitRole) ) {
|
||||
TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole").detail("NodeID", self->id());
|
||||
|
|
|
@ -34,7 +34,6 @@
|
|||
#include "fdbrpc/fdbrpc.h"
|
||||
#include "fdbserver/CoordinationInterface.h"
|
||||
#include "fdbrpc/Locality.h"
|
||||
|
||||
#include "fdbserver/RestoreUtil.h"
|
||||
#include "fdbserver/RestoreCommon.actor.h"
|
||||
#include "fdbserver/RestoreRoleCommon.actor.h"
|
||||
|
@ -54,9 +53,6 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
|
|||
Reference<IBackupContainer> bc; // Backup container is used to read backup files
|
||||
Key bcUrl; // The url used to get the bc
|
||||
|
||||
// Performance statistics
|
||||
double curWorkloadSize;
|
||||
|
||||
void addref() { return ReferenceCounted<RestoreLoaderData>::addref(); }
|
||||
void delref() { return ReferenceCounted<RestoreLoaderData>::delref(); }
|
||||
|
||||
|
@ -76,26 +72,15 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
|
|||
}
|
||||
|
||||
void resetPerVersionBatch() {
|
||||
printf("[INFO]Node:%s resetPerVersionBatch\n", nodeID.toString().c_str());
|
||||
TraceEvent("FastRestore").detail("ResetPerVersionBatchOnLoader", nodeID);
|
||||
RestoreRoleData::resetPerVersionBatch();
|
||||
|
||||
range2Applier.clear();
|
||||
keyOpsCount.clear();
|
||||
numSampledMutations = 0;
|
||||
|
||||
processedFileParams.clear();
|
||||
|
||||
curWorkloadSize = 0;
|
||||
}
|
||||
|
||||
vector<UID> getBusyAppliers() {
|
||||
vector<UID> busyAppliers;
|
||||
for (auto &app : range2Applier) {
|
||||
busyAppliers.push_back(app.second);
|
||||
}
|
||||
return busyAppliers;
|
||||
}
|
||||
|
||||
// Only get the appliers that are responsible for a range
|
||||
std::vector<UID> getWorkingApplierIDs() {
|
||||
std::vector<UID> applierIDs;
|
||||
for ( auto &applier : range2Applier ) {
|
||||
|
@ -110,7 +95,6 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
|
|||
if ( bcUrl == url && bc.isValid() ) {
|
||||
return;
|
||||
}
|
||||
printf("initBackupContainer, url:%s\n", url.toString().c_str());
|
||||
bcUrl = url;
|
||||
bc = IBackupContainer::openContainer(url.toString());
|
||||
}
|
||||
|
|
|
@ -100,7 +100,7 @@ ACTOR static Future<Version> processRestoreRequest(RestoreRequest request, Refer
|
|||
wait( distributeWorkloadPerVersionBatch(self, cx, request, versionBatch->second) );
|
||||
}
|
||||
|
||||
TraceEvent("FastRestore").detail("RestoreCompleted", request.randomUid);
|
||||
TraceEvent("FastRestore").detail("RestoreToVersion", request.targetVersion);
|
||||
return request.targetVersion;
|
||||
}
|
||||
|
||||
|
@ -335,13 +335,16 @@ ACTOR static Future<Void> notifyRestoreCompleted(Reference<RestoreMasterData> se
|
|||
for ( auto &loader : self->loadersInterf ) {
|
||||
requests.push_back( std::make_pair(loader.first, RestoreVersionBatchRequest(self->batchIndex)) );
|
||||
}
|
||||
wait( sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, requests) );
|
||||
// A loader exits immediately after it receives the request. Master may not receive acks.
|
||||
Future<Void> endLoaders = sendBatchRequests(&RestoreLoaderInterface::finishRestore, self->loadersInterf, requests);
|
||||
|
||||
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
|
||||
requests.clear();
|
||||
for ( auto &applier : self->appliersInterf ) {
|
||||
requests.push_back( std::make_pair(applier.first, RestoreVersionBatchRequest(self->batchIndex)) );
|
||||
}
|
||||
wait( sendBatchRequests(&RestoreApplierInterface::finishRestore, self->appliersInterf, requests) );
|
||||
Future<Void> endApplier = sendBatchRequests(&RestoreApplierInterface::finishRestore, self->appliersInterf, requests);
|
||||
|
||||
wait( delay(5.0) ); // Give some time for loaders and appliers to exit
|
||||
|
||||
// Notify tester that the restore has finished
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
|
|
|
@ -55,15 +55,12 @@ struct VersionBatch {
|
|||
struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMasterData> {
|
||||
// range2Applier is in master and loader node. Loader node uses this to determine which applier a mutation should be sent
|
||||
std::map<Standalone<KeyRef>, UID> range2Applier; // KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
|
||||
|
||||
std::map<Version, VersionBatch> versionBatches; // key is the beginVersion of the version batch
|
||||
|
||||
// Temporary variables to hold files and data to restore
|
||||
std::vector<RestoreFileFR> allFiles; // All backup files to be processed in all version batches
|
||||
std::vector<RestoreFileFR> files; // Backup files to be parsed and applied: range and log files in 1 version batch
|
||||
|
||||
double totalWorkloadSize;
|
||||
double curWorkloadSize;
|
||||
int batchIndex;
|
||||
|
||||
Reference<IBackupContainer> bc; // Backup container is used to read backup files
|
||||
|
@ -72,21 +69,10 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMast
|
|||
void addref() { return ReferenceCounted<RestoreMasterData>::addref(); }
|
||||
void delref() { return ReferenceCounted<RestoreMasterData>::delref(); }
|
||||
|
||||
void printAllBackupFilesInfo() {
|
||||
printf("[INFO] All backup files: num:%ld\n", allFiles.size());
|
||||
for (int i = 0; i < allFiles.size(); ++i) {
|
||||
printf("\t[INFO][File %d] %s\n", i, allFiles[i].toString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
RestoreMasterData() {
|
||||
role = RestoreRole::Master;
|
||||
nodeID = UID();
|
||||
|
||||
batchIndex = 0;
|
||||
curWorkloadSize = 0;
|
||||
totalWorkloadSize = 0;
|
||||
curWorkloadSize = 0;
|
||||
}
|
||||
|
||||
std::string describeNode() {
|
||||
|
@ -151,7 +137,6 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMast
|
|||
printf("[INFO] constructFilesWithVersionRange for num_files:%ld\n", files.size());
|
||||
allFiles.clear();
|
||||
for (int i = 0; i < files.size(); i++) {
|
||||
printf("\t[File:%d] Start %s\n", i, files[i].toString().c_str());
|
||||
Version beginVersion = 0;
|
||||
Version endVersion = 0;
|
||||
if ( files[i].isRange) {
|
||||
|
@ -163,25 +148,17 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMast
|
|||
long blockSize, len;
|
||||
int pos = files[i].fileName.find_last_of("/");
|
||||
std::string fileName = files[i].fileName.substr(pos);
|
||||
printf("\t[File:%d] Log filename:%s, pos:%d\n", i, fileName.c_str(), pos);
|
||||
//printf("\t[File:%d] Log filename:%s, pos:%d\n", i, fileName.c_str(), pos);
|
||||
sscanf(fileName.c_str(), "/log,%ld,%ld,%*[^,],%lu%ln", &beginVersion, &endVersion, &blockSize, &len);
|
||||
printf("\t[File:%d] Log filename:%s produces beginVersion:%ld endVersion:%ld\n",i, fileName.c_str(), beginVersion, endVersion);
|
||||
//printf("\t[File:%d] Log filename:%s produces beginVersion:%ld endVersion:%ld\n",i, fileName.c_str(), beginVersion, endVersion);
|
||||
}
|
||||
files[i].beginVersion = beginVersion;
|
||||
files[i].endVersion = endVersion;
|
||||
printf("\t[File:%d] End %s\n", i, files[i].toString().c_str());
|
||||
ASSERT(beginVersion <= endVersion);
|
||||
allFiles.push_back( files[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void printBackupFilesInfo() {
|
||||
printf("[INFO] The backup files for current batch to load and apply: num:%ld\n", files.size());
|
||||
for (int i = 0; i < files.size(); ++i) {
|
||||
printf("\t[INFO][File %d] %s\n", i, files[i].toString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void logApplierKeyRange() {
|
||||
TraceEvent("FastRestore").detail("ApplierKeyRangeNum", range2Applier.size());
|
||||
for (auto &applier : range2Applier) {
|
||||
|
@ -189,16 +166,6 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMast
|
|||
}
|
||||
}
|
||||
|
||||
bool isBackupEmpty() {
|
||||
for (int i = 0; i < files.size(); ++i) {
|
||||
if (files[i].fileSize > 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void initBackupContainer(Key url) {
|
||||
if ( bcUrl == url && bc.isValid() ) {
|
||||
return;
|
||||
|
@ -206,12 +173,9 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMast
|
|||
printf("initBackupContainer, url:%s\n", url.toString().c_str());
|
||||
bcUrl = url;
|
||||
bc = IBackupContainer::openContainer(url.toString());
|
||||
//state BackupDescription desc = wait(self->bc->describeBackup());
|
||||
//return Void();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
ACTOR Future<Void> startRestoreMaster(Reference<RestoreMasterData> self, Database cx);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
|
@ -43,7 +43,7 @@ ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleFinishRestoreRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self, Database cx) {
|
||||
ACTOR Future<Void> handleFinishRestoreRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self) {
|
||||
if ( self->versionBatchStart ) {
|
||||
self->versionBatchStart = false;
|
||||
}
|
||||
|
@ -180,16 +180,4 @@ void printBackupLogKeyHex(Standalone<StringRef> key_input, std::string prefix) {
|
|||
type, getHexString(KeyRef(k, kLen)).c_str(), getHexString(KeyRef(v, vLen)).c_str(), kLen, vLen);
|
||||
|
||||
}
|
||||
printf("----------------------------------------------------------\n");
|
||||
}
|
||||
|
||||
void printLowerBounds(std::vector<Standalone<KeyRef>> lowerBounds) {
|
||||
if ( debug_verbose == false )
|
||||
return;
|
||||
|
||||
printf("[INFO] Print out %ld keys in the lowerbounds\n", lowerBounds.size());
|
||||
for (int i = 0; i < lowerBounds.size(); i++) {
|
||||
printf("\t[INFO][%d] %s\n", i, getHexString(lowerBounds[i]).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ typedef std::map<Version, Standalone<VectorRef<MutationRef>>> VersionedMutations
|
|||
|
||||
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
|
||||
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
|
||||
ACTOR Future<Void> handleFinishRestoreRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self, Database cx);
|
||||
ACTOR Future<Void> handleFinishRestoreRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
|
||||
|
||||
|
||||
// Helper class for reading restore data from a buffer and throwing the right errors.
|
||||
|
@ -134,58 +134,8 @@ public:
|
|||
appliersInterf.clear();
|
||||
}
|
||||
|
||||
std::string describeNode() {
|
||||
std::stringstream ss;
|
||||
ss << "RestoreRoleData role:" << getRoleStr(role) << " nodeID:%s" << nodeID.toString();
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
void printRestoreRoleInterfaces() {
|
||||
printf("Dump restore loaders and appliers info:\n");
|
||||
for (auto &loader : loadersInterf) {
|
||||
printf("Loader:%s\n", loader.first.toString().c_str());
|
||||
}
|
||||
|
||||
for (auto &applier : appliersInterf) {
|
||||
printf("Applier:%s\n", applier.first.toString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: To remove this function
|
||||
std::vector<UID> getApplierIDs() {
|
||||
std::vector<UID> applierIDs;
|
||||
for (auto &applier : appliersInterf) {
|
||||
applierIDs.push_back(applier.first);
|
||||
}
|
||||
return applierIDs;
|
||||
}
|
||||
|
||||
// TODO: To remove this function
|
||||
std::vector<UID> getLoaderIDs() {
|
||||
std::vector<UID> loaderIDs;
|
||||
for (auto &loader : loadersInterf) {
|
||||
loaderIDs.push_back(loader.first);
|
||||
}
|
||||
|
||||
return loaderIDs;
|
||||
}
|
||||
|
||||
// TODO: To remove this function
|
||||
std::vector<UID> getWorkerIDs() {
|
||||
std::vector<UID> workerIDs;
|
||||
for (auto &loader : loadersInterf) {
|
||||
workerIDs.push_back(loader.first);
|
||||
}
|
||||
for (auto &applier : appliersInterf) {
|
||||
workerIDs.push_back(applier.first);
|
||||
}
|
||||
|
||||
return workerIDs;
|
||||
}
|
||||
|
||||
virtual std::string describeNode() = 0;
|
||||
};
|
||||
|
||||
void printLowerBounds(std::vector<Standalone<KeyRef>> lowerBounds);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
|
@ -36,7 +36,6 @@
|
|||
enum class RestoreRole {Invalid = 0, Master = 1, Loader, Applier};
|
||||
BINARY_SERIALIZABLE( RestoreRole );
|
||||
std::string getRoleStr(RestoreRole role);
|
||||
|
||||
extern const std::vector<std::string> RestoreRoleStr;
|
||||
extern int numRoles;
|
||||
|
||||
|
@ -50,7 +49,6 @@ struct FastRestoreOpConfig {
|
|||
};
|
||||
extern FastRestoreOpConfig opConfig;
|
||||
|
||||
|
||||
struct RestoreCommonReply {
|
||||
UID id; // unique ID of the server who sends the reply
|
||||
|
||||
|
|
Loading…
Reference in New Issue