FastRestore:Sampling:Resolve review comments
This commit is contained in:
parent
b7db2dec7e
commit
c6b36dbffb
|
@ -337,7 +337,7 @@ struct RestoreLoadFileReply : TimedRequest {
|
|||
|
||||
std::string toString() {
|
||||
std::stringstream ss;
|
||||
ss << "LoadingParam:" << param.toString() << "samples.size:" << samples.size();
|
||||
ss << "LoadingParam:" << param.toString() << " samples.size:" << samples.size();
|
||||
return ss.str();
|
||||
}
|
||||
};
|
||||
|
@ -396,7 +396,7 @@ struct RestoreSendMutationVectorVersionedRequest : TimedRequest {
|
|||
Version prevVersion, version; // version is the commitVersion of the mutation vector.
|
||||
int fileIndex; // Unique index for a backup file
|
||||
bool isRangeFile;
|
||||
MutationsVec mutations; // All mutations are at version
|
||||
MutationsVec mutations; // All mutations at the same version parsed by one loader
|
||||
|
||||
ReplyPromise<RestoreCommonReply> reply;
|
||||
|
||||
|
|
|
@ -531,7 +531,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
// Fast Restore
|
||||
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
|
||||
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
|
||||
init( FASTRESTORE_SAMPLING_RATE, 0.01 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_RATE = deterministicRandom()->random01(); } // smallest granularity is 0.01%
|
||||
init( FASTRESTORE_SAMPLING_PERCENT, 0.01 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01(); } // smallest granularity is 0.01%
|
||||
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -471,7 +471,7 @@ public:
|
|||
// Fast Restore
|
||||
int64_t FASTRESTORE_FAILURE_TIMEOUT;
|
||||
int64_t FASTRESTORE_HEARTBEAT_INTERVAL;
|
||||
double FASTRESTORE_SAMPLING_RATE;
|
||||
double FASTRESTORE_SAMPLING_PERCENT;
|
||||
|
||||
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL);
|
||||
};
|
||||
|
|
|
@ -97,9 +97,6 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVec
|
|||
// Assume: self->processedFileState[req.fileIndex] will not be erased while the actor is active.
|
||||
// Note: Insert new items into processedFileState will not invalidate the reference.
|
||||
state NotifiedVersion& curFilePos = self->processedFileState[req.fileIndex];
|
||||
// Applier will cache the mutations at each version. Once receive all mutations, applier will apply them to DB
|
||||
state Version commitVersion = req.version;
|
||||
state int mIndex = 0;
|
||||
|
||||
TraceEvent("FastRestore")
|
||||
.detail("ApplierNode", self->id())
|
||||
|
@ -110,7 +107,8 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendMutationVec
|
|||
wait(curFilePos.whenAtLeast(req.prevVersion));
|
||||
|
||||
if (curFilePos.get() == req.prevVersion) {
|
||||
|
||||
Version commitVersion = req.version;
|
||||
int mIndex = 0;
|
||||
VectorRef<MutationRef> mutations(req.mutations);
|
||||
if (self->kvOps.find(commitVersion) == self->kvOps.end()) {
|
||||
self->kvOps.insert(std::make_pair(commitVersion, VectorRef<MutationRef>()));
|
||||
|
|
|
@ -252,8 +252,8 @@ ACTOR Future<Void> sendMutationsToApplier(Reference<RestoreLoaderData> self, Ver
|
|||
applierMutationsSize[applierID] = 0.0;
|
||||
}
|
||||
state Version commitVersion = kvOp->first;
|
||||
state int mIndex = 0;
|
||||
state MutationRef kvm;
|
||||
int mIndex = 0;
|
||||
MutationRef kvm;
|
||||
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
|
||||
kvm = kvOp->second[mIndex];
|
||||
// Send the mutation to applier
|
||||
|
@ -478,8 +478,8 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
|
|||
.detail("CommitVersion", commitVersion)
|
||||
.detail("ParsedMutation", mutation.toString());
|
||||
kvOps[commitVersion].push_back_deep(kvOps[commitVersion].arena(), mutation);
|
||||
// Sampling (FASTRESTORE_SAMPLING_RATE * 100 %) data
|
||||
if (deterministicRandom()->randomInt(0, 10000) < 10000 * SERVER_KNOBS->FASTRESTORE_SAMPLING_RATE) {
|
||||
// Sampling (FASTRESTORE_SAMPLING_PERCENT * 100 %) data
|
||||
if (deterministicRandom()->randomInt(0, 10000) < 10000 * SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
|
||||
samples.push_back_deep(samples.arena(), mutation);
|
||||
}
|
||||
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
|
||||
|
@ -549,8 +549,8 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
|
|||
|
||||
ASSERT_WE_THINK(kvOps.find(version) != kvOps.end());
|
||||
kvOps[version].push_back_deep(kvOps[version].arena(), m);
|
||||
// Sampling (FASTRESTORE_SAMPLING_RATE * 100 %) data
|
||||
if (deterministicRandom()->randomInt(0, 10000) < 10000 * SERVER_KNOBS->FASTRESTORE_SAMPLING_RATE) {
|
||||
// Sampling (FASTRESTORE_SAMPLING_PERCENT * 100 %) data
|
||||
if (deterministicRandom()->randomInt(0, 10000) < 10000 * SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
|
||||
sampleMutations.push_back_deep(sampleMutations.arena(), m);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,8 @@ struct RestoreMasterData;
|
|||
|
||||
struct RestoreSimpleRequest;
|
||||
|
||||
typedef std::map<Version, MutationsVec> VersionedMutationsMap;
|
||||
//typedef std::map<Version, MutationsVec> VersionedMutationsMap;
|
||||
using VersionedMutationsMap = std::map<Version, MutationsVec>;
|
||||
|
||||
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
|
||||
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
|
||||
|
|
|
@ -38,7 +38,8 @@
|
|||
//#define SevFRMutationInfo SevVerbose
|
||||
#define SevFRMutationInfo SevInfo
|
||||
|
||||
typedef Standalone<VectorRef<MutationRef>> MutationsVec;
|
||||
//typedef Standalone<VectorRef<MutationRef>> MutationsVec;
|
||||
using MutationsVec = Standalone<VectorRef<MutationRef>>;
|
||||
|
||||
enum class RestoreRole { Invalid = 0, Master = 1, Loader, Applier };
|
||||
BINARY_SERIALIZABLE(RestoreRole);
|
||||
|
|
|
@ -615,11 +615,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
state std::vector<Future<Version>> restores;
|
||||
state std::vector<Standalone<StringRef>> restoreTags;
|
||||
// state int restoreIndex = 0;
|
||||
|
||||
// Restore each range by calling backupAgent.restore()
|
||||
printf("Prepare for restore requests. Number of backupRanges:%d\n", self->backupRanges.size());
|
||||
// state Transaction tr1(cx);
|
||||
loop {
|
||||
tr1.reset();
|
||||
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
|
|
@ -3,7 +3,6 @@ testTitle=BackupAndRestore
|
|||
; nodeCount=30000
|
||||
nodeCount=1000
|
||||
transactionsPerSecond=500.0
|
||||
; transactionsPerSecond=50.0
|
||||
; transactionsPerSecond=2500.0
|
||||
testDuration=30.0
|
||||
expectedRate=0
|
||||
|
|
Loading…
Reference in New Issue