update restore status
This commit is contained in:
parent
550f2e2682
commit
a56ba2faf6
|
@ -596,7 +596,7 @@ const KeyRangeRef restoreWorkersKeys(
|
|||
LiteralStringRef("\xff\x02/restoreWorkers/"),
|
||||
LiteralStringRef("\xff\x02/restoreWorkers0")
|
||||
);
|
||||
const KeyRef restoreStatusKey = LiteralStringRef("\xff\x02/restoreStatus");
|
||||
const KeyRef restoreStatusKey = LiteralStringRef("\xff\x02/restoreStatus/");
|
||||
|
||||
|
||||
const KeyRef restoreRequestTriggerKey = LiteralStringRef("\xff\x02/restoreRequestTrigger");
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <algorithm>
|
||||
|
||||
const int min_num_workers = 5; //10; // TODO: This can become a configuration param later
|
||||
const int min_num_workers = 50; //10; // TODO: This can become a configuration param later
|
||||
|
||||
class RestoreConfig;
|
||||
struct RestoreData; // Only declare the struct exist but we cannot use its field
|
||||
|
@ -1981,7 +1981,7 @@ ACTOR Future<Void> receiveMutations(Reference<RestoreData> rd, RestoreCommandInt
|
|||
}
|
||||
rd->kvOps[commitVersion].push_back_deep(rd->kvOps[commitVersion].arena(), mutation);
|
||||
numMutations++;
|
||||
if ( numMutations % 1000 == 1 ) {
|
||||
if ( numMutations % 100000 == 1 ) { // Should be different value in simulation and in real mode
|
||||
printf("[INFO][Applier] Node:%s Receives %d mutations. cur_mutation:%s\n",
|
||||
rd->getNodeID().c_str(), numMutations, mutation.toString().c_str());
|
||||
}
|
||||
|
@ -2304,13 +2304,13 @@ ACTOR static Future<Void> sampleWorkload(Reference<RestoreData> rd, RestoreReque
|
|||
state int64_t loadingCmdIndex = 0;
|
||||
state int64_t sampleIndex = 0;
|
||||
state double totalBackupSizeB = 0;
|
||||
state double samplePercent = 0.01;
|
||||
state double samplePercent = 0.05; // sample 1 data block per samplePercent (0.01) of data. num_sample = 1 / samplePercent
|
||||
|
||||
// We should sample 1% data
|
||||
for (int i = 0; i < rd->files.size(); i++) {
|
||||
totalBackupSizeB += rd->files[i].fileSize;
|
||||
}
|
||||
sampleB = std::max((int) (samplePercent * totalBackupSizeB), 1024 * 1024); // The minimal sample size is 1MB
|
||||
sampleB = std::max((int) (samplePercent * totalBackupSizeB), 10 * 1024 * 1024); // The minimal sample size is 10MB
|
||||
printf("[INFO] totalBackupSizeB:%.1fB (%.1fMB) samplePercent:%.2f, sampleB:%d\n",
|
||||
totalBackupSizeB, totalBackupSizeB / 1024 / 1024, samplePercent, sampleB);
|
||||
|
||||
|
@ -2567,8 +2567,8 @@ ACTOR static Future<Void> distributeWorkload(RestoreCommandInterface interf, Ref
|
|||
ASSERT( numLoaders > 0 );
|
||||
ASSERT( numAppliers > 0 );
|
||||
|
||||
state int loadingSizeMB = numLoaders * 1000; //NOTE: We want to load the entire file in the first version, so we want to make this as large as possible
|
||||
int64_t sampleSizeMB = loadingSizeMB / 100;
|
||||
state int loadingSizeMB = 0; //numLoaders * 1000; //NOTE: We want to load the entire file in the first version, so we want to make this as large as possible
|
||||
int64_t sampleSizeMB = 0; loadingSizeMB / 100; // Will be overwritten. The sampleSizeMB will be calculated based on the batch size
|
||||
|
||||
// TODO: WiP Sample backup files to determine the key range for appliers
|
||||
wait( sampleWorkload(restoreData, request, restoreConfig, sampleSizeMB) );
|
||||
|
@ -3511,6 +3511,7 @@ ACTOR static Future<Void> _finishMX(Reference<ReadYourWritesTransaction> tr, Re
|
|||
double totalSpeed;
|
||||
};
|
||||
|
||||
int restoreStatusIndex = 0;
|
||||
ACTOR static Future<Void> registerStatus(Reference<ReadYourWritesTransaction> tr, struct FastRestoreStatus status) {
|
||||
loop {
|
||||
try {
|
||||
|
@ -3518,15 +3519,18 @@ ACTOR static Future<Void> _finishMX(Reference<ReadYourWritesTransaction> tr, Re
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
tr->set(restoreStatusKeyFor("/curWorkload"), restoreStatusValue(status.curWorkloadSize));
|
||||
tr->set(restoreStatusKeyFor("/curRunningTime"), restoreStatusValue(status.curRunningTime));
|
||||
tr->set(restoreStatusKeyFor("/curSpeed"), restoreStatusValue(status.curSpeed));
|
||||
tr->set(restoreStatusKeyFor("curWorkload" + restoreStatusIndex), restoreStatusValue(status.curWorkloadSize));
|
||||
tr->set(restoreStatusKeyFor("curRunningTime" + restoreStatusIndex), restoreStatusValue(status.curRunningTime));
|
||||
tr->set(restoreStatusKeyFor("curSpeed" + restoreStatusIndex), restoreStatusValue(status.curSpeed));
|
||||
|
||||
tr->set(restoreStatusKeyFor("/totalWorkload"), restoreStatusValue(status.totalWorkloadSize));
|
||||
tr->set(restoreStatusKeyFor("/totalRunningTime"), restoreStatusValue(status.totalRunningTime));
|
||||
tr->set(restoreStatusKeyFor("/totalSpeed"), restoreStatusValue(status.totalSpeed));
|
||||
tr->set(restoreStatusKeyFor("totalWorkload"), restoreStatusValue(status.totalWorkloadSize));
|
||||
tr->set(restoreStatusKeyFor("totalRunningTime"), restoreStatusValue(status.totalRunningTime));
|
||||
tr->set(restoreStatusKeyFor("totalSpeed"), restoreStatusValue(status.totalSpeed));
|
||||
|
||||
wait( tr->commit() );
|
||||
restoreStatusIndex++;
|
||||
printf("[Restore Status][%d] curWorkload:%.2f curRunningtime:%.2f curSpeed:%.2f totalWorkload:%.2f totalRunningTime:%.2f totalSpeed:%.2f\n",
|
||||
restoreStatusIndex, status.curWorkloadSize, status.curRunningTime, status.curSpeed, status.totalWorkloadSize, status.totalRunningTime, status.totalSpeed);
|
||||
|
||||
break;
|
||||
} catch( Error &e ) {
|
||||
|
|
Loading…
Reference in New Issue