FastRestore:Loader:Add metrics counter

This commit is contained in:
Meng Xu 2020-02-09 19:19:32 -08:00
parent fd5b4af05a
commit 1fc793d6a7
5 changed files with 33 additions and 11 deletions

View File

@ -550,6 +550,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( FASTRESTORE_VB_PARALLELISM, 3 ); if( randomize ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; } init( FASTRESTORE_VB_PARALLELISM, 3 ); if( randomize ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; } init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
// clang-format on // clang-format on

View File

@ -492,6 +492,7 @@ public:
int64_t FASTRESTORE_VB_PARALLELISM; int64_t FASTRESTORE_VB_PARALLELISM;
int64_t FASTRESTORE_VB_MONITOR_DELAY; // How quickly monitor finished version batch int64_t FASTRESTORE_VB_MONITOR_DELAY; // How quickly monitor finished version batch
int64_t FASTRESTORE_VB_LAUNCH_DELAY; int64_t FASTRESTORE_VB_LAUNCH_DELAY;
int64_t FASTRESTORE_ROLE_LOGGING_DELAY;
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false); ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);
}; };

View File

@ -53,8 +53,8 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
Reference<IBackupContainer> bc, RestoreAsset asset); Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader( ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version, std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
RestoreAsset asset); Version version, RestoreAsset asset);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) { ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) {
state Reference<RestoreLoaderData> self = state Reference<RestoreLoaderData> self =
@ -62,6 +62,9 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
state ActorCollection actors(false); state ActorCollection actors(false);
state Future<Void> exitRole = Never(); state Future<Void> exitRole = Never();
actors.add(traceCounters("RestoreLoaderMetrics", self->id(), SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->counters.cc, self->nodeId.toString() + "/RestoreLoaderMetrics"));
loop { loop {
state std::string requestTypeStr = "[Init]"; state std::string requestTypeStr = "[Init]";
@ -157,8 +160,8 @@ ACTOR Future<Void> _processLoadingParam(LoadingParam param, Reference<LoaderBatc
subAsset.offset = j; subAsset.offset = j;
subAsset.len = std::min<int64_t>(param.blockSize, param.asset.len - j); subAsset.len = std::min<int64_t>(param.blockSize, param.asset.len - j);
if (param.isRangeFile) { if (param.isRangeFile) {
fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(kvOpsPerLPIter, samplesIter, bc, fileParserFutures.push_back(_parseRangeFileToMutationsOnLoader(
param.rangeVersion.get(), subAsset)); kvOpsPerLPIter, samplesIter, &batchData->counters, bc, param.rangeVersion.get(), subAsset));
} else { } else {
// TODO: Sanity check the log file's range is overlapped with the restored version range // TODO: Sanity check the log file's range is overlapped with the restored version range
fileParserFutures.push_back( fileParserFutures.push_back(
@ -589,8 +592,8 @@ void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::ite
// Parsing the data blocks in a range file // Parsing the data blocks in a range file
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader( ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, Reference<IBackupContainer> bc, Version version, std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
RestoreAsset asset) { Version version, RestoreAsset asset) {
state VersionedMutationsMap& kvOps = kvOpsIter->second; state VersionedMutationsMap& kvOps = kvOpsIter->second;
state MutationsVec& sampleMutations = samplesIter->second; state MutationsVec& sampleMutations = samplesIter->second;
@ -646,6 +649,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// Should NOT add prefix or remove surfix for the backup data! // Should NOT add prefix or remove surfix for the backup data!
MutationRef m(MutationRef::Type::SetValue, data[i].key, MutationRef m(MutationRef::Type::SetValue, data[i].key,
data[i].value); // ASSUME: all operation in range file is set. data[i].value); // ASSUME: all operation in range file is set.
cc->loadedRangeBytes += m.totalSize();
// We cache all kv operations into kvOps, and apply all kv operations later in one place // We cache all kv operations into kvOps, and apply all kv operations later in one place
kvOps.insert(std::make_pair(version, MutationsVec())); kvOps.insert(std::make_pair(version, MutationsVec()));
@ -657,6 +661,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
kvOps[version].push_back_deep(kvOps[version].arena(), m); kvOps[version].push_back_deep(kvOps[version].arena(), m);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data // Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) { if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
cc->sampledBytes += m.totalSize();
sampleMutations.push_back_deep(sampleMutations.arena(), m); sampleMutations.push_back_deep(sampleMutations.arena(), m);
} }
} }

View File

@ -54,6 +54,20 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
std::map<LoadingParam, MutationsVec> sampleMutations; std::map<LoadingParam, MutationsVec> sampleMutations;
int numSampledMutations; // The total number of mutations received from sampled data. int numSampledMutations; // The total number of mutations received from sampled data.
// Status counters
struct Counters {
CounterCollection cc;
Counter loadedRangeBytes, loadedLogBytes, sentBytes;
Counter sampledBytes;
Counters(LoaderBatchData* self, UID loaderInterfID, int batchIndex)
: cc("LoaderBatch", loaderInterfID.toString() + ":" + std::to_string(batchIndex)),
loadedRangeBytes("LoadedRangeBytes", cc), loadedLogBytes("LoadedLogBytes", cc), sentBytes("SentBytes", cc),
sampledBytes("SampledBytes", cc) {}
} counters;
explicit LoaderBatchData(UID loaderInterfID, int batchIndex) : counters(this, loaderInterfID, batchIndex) {}
void reset() { void reset() {
processedFileParams.clear(); processedFileParams.clear();
kvOpsPerLP.clear(); kvOpsPerLP.clear();
@ -63,6 +77,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
} }
}; };
using LoaderCounters = LoaderBatchData::Counters;
struct LoaderBatchStatus : public ReferenceCounted<LoaderBatchStatus> { struct LoaderBatchStatus : public ReferenceCounted<LoaderBatchStatus> {
Optional<Future<Void>> sendAllRanges; Optional<Future<Void>> sendAllRanges;
Optional<Future<Void>> sendAllLogs; Optional<Future<Void>> sendAllLogs;
@ -108,7 +124,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
void initVersionBatch(int batchIndex) { void initVersionBatch(int batchIndex) {
TraceEvent("FastRestore").detail("InitVersionBatchOnLoader", nodeID); TraceEvent("FastRestore").detail("InitVersionBatchOnLoader", nodeID);
batch[batchIndex] = Reference<LoaderBatchData>(new LoaderBatchData()); batch[batchIndex] = Reference<LoaderBatchData>(new LoaderBatchData(nodeID, batchIndex));
status[batchIndex] = Reference<LoaderBatchStatus>(new LoaderBatchStatus()); status[batchIndex] = Reference<LoaderBatchStatus>(new LoaderBatchStatus());
} }

View File

@ -427,8 +427,7 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<MasterBatchData> batchDat
.detail("BatchIndex", batchIndex) .detail("BatchIndex", batchIndex)
.detail("FileTypeLoadedInVersionBatch", isRangeFile) .detail("FileTypeLoadedInVersionBatch", isRangeFile)
.detail("BeginVersion", versionBatch.beginVersion) .detail("BeginVersion", versionBatch.beginVersion)
.detail("EndVersion", versionBatch.endVersion) .detail("EndVersion", versionBatch.endVersion);
.detail("Files", (files != nullptr ? files->size() : -1));
return Void(); return Void();
} }
@ -687,8 +686,8 @@ ACTOR static Future<Void> collectBackupFiles(Reference<IBackupContainer> bc, std
TraceEvent("FastRestoreMasterPhaseCollectBackupFilesDone") TraceEvent("FastRestoreMasterPhaseCollectBackupFilesDone")
.detail("BackupDesc", desc.toString()) .detail("BackupDesc", desc.toString())
.detail("RangeFiles", rangeFiles.size()) .detail("RangeFiles", rangeFiles->size())
.detail("LogFiles", logFiles.size()); .detail("LogFiles", logFiles->size());
return Void(); return Void();
} }