Merge pull request #3600 from xumengpanda/mengxu/fr-fix-large-pkg-PR

Fast Restore: Fix the sampling packet size at sampling phase
This commit is contained in:
Meng Xu 2020-08-10 13:53:24 -07:00 committed by GitHub
commit c5c6906a3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 233 additions and 66 deletions

View File

@ -53,6 +53,7 @@ struct RestoreSendVersionedMutationsRequest;
struct RestoreSysInfo;
struct RestoreApplierInterface;
struct RestoreFinishRequest;
struct RestoreSamplesRequest;
// RestoreSysInfo includes information each (type of) restore roles should know.
// At this moment, it only include appliers. We keep the name for future extension.
@ -203,6 +204,31 @@ struct RestoreApplierInterface : RestoreRoleInterface {
std::string toString() { return nodeID.toString(); }
};
struct RestoreControllerInterface : RestoreRoleInterface {
constexpr static FileIdentifier file_identifier = 54253047;
RequestStream<RestoreSamplesRequest> samples;
bool operator==(RestoreWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(RestoreWorkerInterface const& r) const { return id() != r.id(); }
RestoreControllerInterface() {
role = RestoreRole::Controller;
nodeID = deterministicRandom()->randomUniqueID();
}
NetworkAddress address() const { return samples.getEndpoint().addresses.address; }
void initEndpoints() { samples.getEndpoint(TaskPriority::LoadBalancedEndpoint); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(RestoreRoleInterface*)this, samples);
}
std::string toString() { return nodeID.toString(); }
};
// RestoreAsset uniquely identifies the work unit done by restore roles;
// It is used to ensure exact-once processing on restore loader and applier;
// By combining all RestoreAssets across all verstion batches, restore should process all mutations in
@ -361,22 +387,25 @@ struct RestoreRecruitRoleReply : TimedRequest {
struct RestoreRecruitRoleRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 87022360;
RestoreControllerInterface ci;
RestoreRole role;
int nodeIndex; // Each role is a node
ReplyPromise<RestoreRecruitRoleReply> reply;
RestoreRecruitRoleRequest() : role(RestoreRole::Invalid) {}
explicit RestoreRecruitRoleRequest(RestoreRole role, int nodeIndex) : role(role), nodeIndex(nodeIndex) {}
explicit RestoreRecruitRoleRequest(RestoreControllerInterface ci, RestoreRole role, int nodeIndex)
: ci(ci), role(role), nodeIndex(nodeIndex) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, role, nodeIndex, reply);
serializer(ar, ci, role, nodeIndex, reply);
}
std::string printable() {
std::stringstream ss;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex
<< " RestoreController:" << ci.id().toString();
return ss.str();
}
@ -410,26 +439,47 @@ struct RestoreSysInfoRequest : TimedRequest {
}
};
struct RestoreLoadFileReply : TimedRequest {
constexpr static FileIdentifier file_identifier = 34077902;
struct RestoreSamplesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 34077901;
UID id; // deduplicate data
int batchIndex;
SampledMutationsVec samples; // sampled mutations
LoadingParam param;
MutationsVec samples; // sampled mutations
bool isDuplicated; // true if loader thinks the request is a duplicated one
ReplyPromise<RestoreCommonReply> reply;
RestoreLoadFileReply() = default;
explicit RestoreLoadFileReply(LoadingParam param, MutationsVec samples, bool isDuplicated)
: param(param), samples(samples), isDuplicated(isDuplicated) {}
RestoreSamplesRequest() = default;
explicit RestoreSamplesRequest(UID id, int batchIndex, SampledMutationsVec samples)
: id(id), batchIndex(batchIndex), samples(samples) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, param, samples, isDuplicated);
serializer(ar, id, batchIndex, samples, reply);
}
std::string toString() {
std::stringstream ss;
ss << "LoadingParam:" << param.toString() << " samples.size:" << samples.size()
<< " isDuplicated:" << isDuplicated;
ss << "ID:" << id.toString() << " BatchIndex:" << batchIndex << " samples:" << samples.size();
return ss.str();
}
};
struct RestoreLoadFileReply : TimedRequest {
constexpr static FileIdentifier file_identifier = 34077902;
LoadingParam param;
bool isDuplicated; // true if loader thinks the request is a duplicated one
RestoreLoadFileReply() = default;
explicit RestoreLoadFileReply(LoadingParam param, bool isDuplicated) : param(param), isDuplicated(isDuplicated) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, param, isDuplicated);
}
std::string toString() {
std::stringstream ss;
ss << "LoadingParam:" << param.toString() << " isDuplicated:" << isDuplicated;
return ss.str();
}
};

View File

@ -611,7 +611,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_NUM_LOADERS, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 2.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; } // too small value may increase chance of TooManyFile error
init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 30 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
@ -638,6 +638,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_NOT_WRITE_DB, false ); // Perf test only: set it to true will cause simulation failure
init( FASTRESTORE_USE_RANGE_FILE, true ); // Perf test only: set it to false will cause simulation failure
init( FASTRESTORE_USE_LOG_FILE, true ); // Perf test only: set it to false will cause simulation failure
init( FASTRESTORE_SAMPLE_MSG_BYTES, 1048576 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLE_MSG_BYTES = deterministicRandom()->random01() * 2048;}
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );

View File

@ -570,6 +570,7 @@ public:
bool FASTRESTORE_NOT_WRITE_DB; // do not write result to DB. Only for dev testing
bool FASTRESTORE_USE_RANGE_FILE; // use range file in backup
bool FASTRESTORE_USE_LOG_FILE; // use log file in backup
int64_t FASTRESTORE_SAMPLE_MSG_BYTES; // sample message desired size
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.

View File

@ -185,7 +185,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
state int retries = 0;
state double numOps = 0;
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
TraceEvent("FastRestoreApplierClearRangeMutationsStart", applierID)
TraceEvent(delayTime > 5 ? SevWarnAlways : SevInfo, "FastRestoreApplierClearRangeMutationsStart", applierID)
.detail("BatchIndex", batchIndex)
.detail("Ranges", ranges.size())
.detail("DelayTime", delayTime);
@ -558,7 +558,10 @@ ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Referen
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex);
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID)
.detail("BatchIndex", batchIndex)
.detail("AppliedBytes", batchData->appliedBytes)
.detail("ReceivedBytes", batchData->receivedBytes);
return Void();
}

View File

@ -256,8 +256,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
long receiveMutationReqs;
// Stats
double receivedBytes;
double appliedBytes;
long receivedBytes;
long appliedBytes;
// Status counters
struct Counters {
@ -284,7 +284,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
explicit ApplierBatchData(UID nodeID, int batchIndex)
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM),
vbState(ApplierVersionBatchState::NOT_INIT) {
vbState(ApplierVersionBatchState::NOT_INIT), receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {
pollMetrics = traceCounters(format("FastRestoreApplierMetrics%d", batchIndex), nodeID,
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));

View File

@ -73,9 +73,43 @@ ACTOR static Future<Void> checkRolesLiveness(Reference<RestoreControllerData> se
void splitKeyRangeForAppliers(Reference<ControllerBatchData> batchData,
std::map<UID, RestoreApplierInterface> appliersInterf, int batchIndex);
ACTOR Future<Void> sampleBackups(Reference<RestoreControllerData> self, RestoreControllerInterface ci) {
loop {
try {
RestoreSamplesRequest req = waitNext(ci.samples.getFuture());
TraceEvent(SevDebug, "FastRestoreControllerSampleBackups")
.detail("SampleID", req.id)
.detail("BatchIndex", req.batchIndex)
.detail("Samples", req.samples.size());
ASSERT(req.batchIndex < self->batch.size());
Reference<ControllerBatchData> batch = self->batch[req.batchIndex];
if (batch->sampleMsgs.find(req.id) != batch->sampleMsgs.end()) {
req.reply.send(RestoreCommonReply(req.id));
continue;
}
batch->sampleMsgs.insert(req.id);
for (auto& m : req.samples) {
batch->samples.addMetric(m.key, m.size);
batch->samplesSize += m.size;
}
req.reply.send(RestoreCommonReply(req.id));
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestoreControllerSampleBackupsError", self->id()).error(e);
break;
}
}
return Void();
}
ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controllerWorker, Database cx) {
state Reference<RestoreControllerData> self = Reference<RestoreControllerData>(new RestoreControllerData());
state ActorCollectionNoErrors actors;
state ActorCollection actors(false);
ASSERT(controllerWorker.isValid());
ASSERT(controllerWorker->controllerInterf.present());
state Reference<RestoreControllerData> self =
Reference<RestoreControllerData>(new RestoreControllerData(controllerWorker->controllerInterf.get().id()));
try {
// recruitRestoreRoles must come after controllerWorker has finished collectWorkerInterface
@ -85,6 +119,7 @@ ACTOR Future<Void> startRestoreController(Reference<RestoreWorkerData> controlle
actors.add(checkRolesLiveness(self));
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreController"));
actors.add(sampleBackups(self, controllerWorker->controllerInterf.get()));
wait(startProcessRestoreRequests(self, cx));
} catch (Error& e) {
@ -107,6 +142,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
.detail("NumLoaders", SERVER_KNOBS->FASTRESTORE_NUM_LOADERS)
.detail("NumAppliers", SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS);
ASSERT(controllerData->loadersInterf.empty() && controllerData->appliersInterf.empty());
ASSERT(controllerWorker->controllerInterf.present());
ASSERT(controllerData.isValid());
ASSERT(SERVER_KNOBS->FASTRESTORE_NUM_LOADERS > 0 && SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS > 0);
@ -129,7 +165,8 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
}
TraceEvent("FastRestoreController", controllerData->id()).detail("WorkerNode", workerInterf.first);
requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex));
requests.emplace_back(workerInterf.first,
RestoreRecruitRoleRequest(controllerWorker->controllerInterf.get(), role, nodeIndex));
nodeIndex++;
}
@ -146,6 +183,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
TraceEvent(SevError, "FastRestoreController").detail("RecruitRestoreRolesInvalidRole", reply.role);
}
}
controllerData->recruitedRoles.send(Void());
TraceEvent("FastRestoreRecruitRestoreRolesDone", controllerData->id())
.detail("Workers", controllerWorker->workerInterfaces.size())
.detail("RecruitedRoles", replies.size());
@ -229,13 +267,13 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreControllerData>
} catch (Error& e) {
if (restoreIndex < restoreRequests.size()) {
TraceEvent(SevError, "FastRestoreControllerProcessRestoreRequestsFailed", self->id())
.detail("RestoreRequest", restoreRequests[restoreIndex].toString())
.error(e);
.error(e)
.detail("RestoreRequest", restoreRequests[restoreIndex].toString());
} else {
TraceEvent(SevError, "FastRestoreControllerProcessRestoreRequestsFailed", self->id())
.error(e)
.detail("RestoreRequests", restoreRequests.size())
.detail("RestoreIndex", restoreIndex)
.error(e);
.detail("RestoreIndex", restoreIndex);
}
}
@ -270,6 +308,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
state Version targetVersion =
wait(collectBackupFiles(self->bc, &rangeFiles, &logFiles, &minRangeVersion, cx, request));
ASSERT(targetVersion > 0);
ASSERT(minRangeVersion != MAX_VERSION); // otherwise, all mutations will be skipped
std::sort(rangeFiles.begin(), rangeFiles.end());
std::sort(logFiles.begin(), logFiles.end(), [](RestoreFileFR const& f1, RestoreFileFR const& f2) -> bool {
@ -453,12 +492,6 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<ControllerBatchData> batc
.detail("RestoreAsset", reply.param.asset.toString())
.detail("UnexpectedReply", reply.toString());
}
// Update sampled data
for (int i = 0; i < reply.samples.size(); ++i) {
MutationRef mutation = reply.samples[i];
batchData->samples.addMetric(mutation.param1, mutation.weightedTotalSize());
batchData->samplesSize += mutation.weightedTotalSize();
}
}
// Sanity check: all restore assets status should be Loaded
@ -737,6 +770,9 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
*minRangeVersion = std::min(*minRangeVersion, file.version);
}
}
if (MAX_VERSION == *minRangeVersion) {
*minRangeVersion = 0; // If no range file, range version must be 0 so that we apply all mutations
}
if (SERVER_KNOBS->FASTRESTORE_USE_LOG_FILE) {
for (const LogFile& f : restorable.get().logs) {
@ -1007,6 +1043,8 @@ ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreControllerData
// Update the most recent time when controller receives hearbeat from each loader and applier
ACTOR static Future<Void> updateHeartbeatTime(Reference<RestoreControllerData> self) {
wait(self->recruitedRoles.getFuture());
int numRoles = self->loadersInterf.size() + self->appliersInterf.size();
state std::map<UID, RestoreLoaderInterface>::iterator loader = self->loadersInterf.begin();
state std::map<UID, RestoreApplierInterface>::iterator applier = self->appliersInterf.begin();

View File

@ -74,9 +74,11 @@ struct ControllerBatchData : public ReferenceCounted<ControllerBatchData> {
// sent.
// KeyRef is the inclusive lower bound of the key range the applier (UID) is responsible for
std::map<Key, UID> rangeToApplier;
Optional<Future<Void>> applyToDB;
IndexedSet<Key, int64_t> samples; // sample of range and log files
double samplesSize; // sum of the metric of all samples
Optional<Future<Void>> applyToDB;
std::set<UID> sampleMsgs; // deduplicate sample messages
ControllerBatchData() = default;
~ControllerBatchData() = default;
@ -150,9 +152,9 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
void addref() { return ReferenceCounted<RestoreControllerData>::addref(); }
void delref() { return ReferenceCounted<RestoreControllerData>::delref(); }
RestoreControllerData() {
RestoreControllerData(UID interfId) {
role = RestoreRole::Controller;
nodeID = UID();
nodeID = interfId;
runningVersionBatches.set(0);
}

View File

@ -26,6 +26,7 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/StorageMetrics.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -41,7 +42,7 @@ void splitMutation(const KeyRangeMap<UID>& krMap, MutationRef m, Arena& mvector_
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
@ -56,13 +57,14 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
Version version, RestoreAsset asset);
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, Version version, RestoreAsset asset);
ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx) {
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx,
RestoreControllerInterface ci) {
state Reference<RestoreLoaderData> self =
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex));
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex, ci));
state ActorCollection actors(false);
state Future<Void> exitRole = Never();
@ -113,7 +115,8 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
}
}
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestoreLoaderError", self->id())
TraceEvent(e.code() == error_code_broken_promise ? SevError : SevWarnAlways, "FastRestoreLoaderError",
self->id())
.detail("RequestType", requestTypeStr)
.error(e, true);
actors.clear(false);
@ -125,11 +128,13 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
}
static inline bool _logMutationTooOld(KeyRangeMap<Version>* pRangeVersions, KeyRangeRef keyRange, Version v) {
ASSERT(pRangeVersions != nullptr);
auto ranges = pRangeVersions->intersectingRanges(keyRange);
Version minVersion = MAX_VERSION;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
minVersion = std::min(minVersion, r->value());
}
ASSERT(minVersion != MAX_VERSION); // pRangeVersions is initialized as entired keyspace, ranges cannot be empty
return minVersion >= v;
}
@ -177,8 +182,8 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
KeyRangeMap<Version>* pRangeVersions, NotifiedVersion* processedFileOffset,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
RestoreAsset asset) {
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, RestoreAsset asset) {
state Standalone<StringRef> buf = makeString(asset.len);
state Reference<IAsyncFile> file = wait(bc->readFile(asset.filename));
int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
@ -262,9 +267,13 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
.detail("CommitVersion", msgVersion.toString())
.detail("ParsedMutation", mutation.toString());
it->second.push_back_deep(it->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
samplesIter->second.push_back_deep(samplesIter->second.arena(), mutation);
cc->loadedLogBytes += mutation.totalSize();
// Sampling data similar to SS sample kvs
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samplesIter->second.push_back_deep(samplesIter->second.arena(),
SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
}
@ -294,7 +303,7 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = batchData->kvOpsPerLP.end();
state std::map<LoadingParam, MutationsVec>::iterator samplesIter = batchData->sampleMutations.end();
state std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter = batchData->sampleMutations.end();
// Q: How to record the param's fields inside LoadingParam Refer to storageMetrics
TraceEvent("FastRestoreLoaderProcessLoadingParam", loaderID).detail("LoadingParam", param.toString());
@ -306,7 +315,7 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
bool inserted;
std::tie(kvOpsPerLPIter, inserted) = batchData->kvOpsPerLP.emplace(param, VersionedMutationsMap());
ASSERT(inserted);
std::tie(samplesIter, inserted) = batchData->sampleMutations.emplace(param, MutationsVec());
std::tie(samplesIter, inserted) = batchData->sampleMutations.emplace(param, SampledMutationsVec());
ASSERT(inserted);
for (int64_t j = param.asset.offset; j < param.asset.len; j += param.blockSize) {
@ -380,7 +389,41 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ASSERT(it != batchData->processedFileParams.end());
wait(it->second); // wait on the processing of the req.param.
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
// Send sampled mutations back to controller: batchData->sampleMutations[req.param]
std::vector<Future<RestoreCommonReply>> fSendSamples;
SampledMutationsVec& samples = batchData->sampleMutations[req.param];
SampledMutationsVec sampleBatch = SampledMutationsVec(); // sampleBatch: Standalone pointer to the created object
long sampleBatchSize = 0;
for (int i = 0; i < samples.size(); ++i) {
sampleBatchSize += samples[i].totalSize();
sampleBatch.push_back_deep(sampleBatch.arena(), samples[i]); // TODO: may not need deep copy
if (sampleBatchSize >= SERVER_KNOBS->FASTRESTORE_SAMPLE_MSG_BYTES) {
fSendSamples.push_back(self->ci.samples.getReply(
RestoreSamplesRequest(deterministicRandom()->randomUniqueID(), req.batchIndex, sampleBatch)));
sampleBatchSize = 0;
sampleBatch = SampledMutationsVec();
}
}
if (sampleBatchSize > 0) {
fSendSamples.push_back(self->ci.samples.getReply(
RestoreSamplesRequest(deterministicRandom()->randomUniqueID(), req.batchIndex, sampleBatch)));
sampleBatchSize = 0;
}
try {
state int samplesMessages = fSendSamples.size();
wait(waitForAll(fSendSamples));
} catch (Error& e) { // In case ci.samples throws broken_promise due to unstable network
if (e.code() == error_code_broken_promise) {
TraceEvent(SevWarnAlways, "FastRestoreLoaderPhaseLoadFileSendSamples")
.detail("SamplesMessages", samplesMessages);
} else {
TraceEvent(SevError, "FastRestoreLoaderPhaseLoadFileSendSamplesUnexpectedError").error(e, true);
}
}
// Ack restore controller the param is processed
req.reply.send(RestoreLoadFileReply(req.param, isDuplicated));
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("ProcessLoadParam", req.param.toString());
@ -730,10 +773,10 @@ bool concatenateBackupMutationForLogFile(SerializedMutationListMap* pMutationMap
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset) {
VersionedMutationsMap& kvOps = kvOpsIter->second;
MutationsVec& samples = samplesIter->second;
SampledMutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap;
TraceEvent(SevFRMutationInfo, "FastRestoreLoaderParseSerializedLogMutation")
@ -813,10 +856,11 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
ASSERT(sub < std::numeric_limits<int32_t>::max()); // range file mutation uses int32_max as subversion
it.first->second.push_back_deep(it.first->second.arena(), mutation);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
cc->sampledLogBytes += mutation.totalSize();
samples.push_back_deep(samples.arena(), mutation);
// Sampling data similar to how SS sample bytes
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(mutation.param1, mutation.param2));
if (sampleInfo.inSample) {
cc->sampledLogBytes += sampleInfo.sampledSize;
samples.push_back_deep(samples.arena(), SampledMutation(mutation.param1, sampleInfo.sampledSize));
}
ASSERT_WE_THINK(kLen >= 0 && kLen < val.size());
ASSERT_WE_THINK(vLen >= 0 && vLen < val.size());
@ -832,10 +876,10 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
// asset: RestoreAsset about which backup data should be parsed
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, Reference<IBackupContainer> bc,
Version version, RestoreAsset asset) {
std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter, LoaderCounters* cc,
Reference<IBackupContainer> bc, Version version, RestoreAsset asset) {
state VersionedMutationsMap& kvOps = kvOpsIter->second;
state MutationsVec& sampleMutations = samplesIter->second;
state SampledMutationsVec& sampleMutations = samplesIter->second;
TraceEvent(SevFRDebugInfo, "FastRestoreDecodedRangeFile")
.detail("Filename", asset.filename)
@ -913,9 +957,10 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
it.first->second.push_back_deep(it.first->second.arena(), m);
// Sampling (FASTRESTORE_SAMPLING_PERCENT%) data
if (deterministicRandom()->random01() * 100 < SERVER_KNOBS->FASTRESTORE_SAMPLING_PERCENT) {
cc->sampledRangeBytes += m.totalSize();
sampleMutations.push_back_deep(sampleMutations.arena(), m);
ByteSampleInfo sampleInfo = isKeyValueInSample(KeyValueRef(m.param1, m.param2));
if (sampleInfo.inSample) {
cc->sampledRangeBytes += sampleInfo.sampledSize;
sampleMutations.push_back_deep(sampleMutations.arena(), SampledMutation(m.param1, sampleInfo.sampledSize));
}
}

View File

@ -70,7 +70,7 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
std::map<Key, UID> rangeToApplier;
// Sampled mutations to be sent back to restore controller
std::map<LoadingParam, MutationsVec> sampleMutations;
std::map<LoadingParam, SampledMutationsVec> sampleMutations;
int numSampledMutations; // The total number of mutations received from sampled data.
Future<Void> pollMetrics;
@ -132,6 +132,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
// buffered data per version batch
std::map<int, Reference<LoaderBatchData>> batch;
std::map<int, Reference<LoaderBatchStatus>> status;
RestoreControllerInterface ci;
KeyRangeMap<Version> rangeVersions;
@ -141,7 +142,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
void addref() { return ReferenceCounted<RestoreLoaderData>::addref(); }
void delref() { return ReferenceCounted<RestoreLoaderData>::delref(); }
explicit RestoreLoaderData(UID loaderInterfID, int assignedIndex) {
explicit RestoreLoaderData(UID loaderInterfID, int assignedIndex, RestoreControllerInterface ci) : ci(ci) {
nodeID = loaderInterfID;
nodeIndex = assignedIndex;
role = RestoreRole::Loader;
@ -191,7 +192,8 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
}
};
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx);
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx,
RestoreControllerInterface ci);
#include "flow/unactorcompiler.h"
#endif

View File

@ -100,6 +100,7 @@ public:
std::map<UID, RestoreLoaderInterface> loadersInterf; // UID: loaderInterf's id
std::map<UID, RestoreApplierInterface> appliersInterf; // UID: applierInterf's id
Promise<Void> recruitedRoles; // sent when loaders and appliers are recruited
NotifiedVersion versionBatchId; // The index of the version batch that has been initialized and put into pipeline
NotifiedVersion finishedBatch; // The highest batch index all appliers have applied mutations

View File

@ -58,9 +58,26 @@ struct VersionedMutation {
}
};
struct SampledMutation {
KeyRef key;
long size;
explicit SampledMutation(KeyRef key, long size) : key(key), size(size) {}
explicit SampledMutation(Arena& arena, const SampledMutation& sm) : key(arena, sm.key), size(sm.size) {}
SampledMutation() = default;
int totalSize() { return key.size() + sizeof(size); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, size);
}
};
using MutationsVec = Standalone<VectorRef<MutationRef>>;
using LogMessageVersionVec = Standalone<VectorRef<LogMessageVersion>>;
using VersionedMutationsVec = Standalone<VectorRef<VersionedMutation>>;
using SampledMutationsVec = Standalone<VectorRef<SampledMutation>>;
enum class RestoreRole { Invalid = 0, Controller = 1, Loader, Applier };
BINARY_SERIALIZABLE(RestoreRole);

View File

@ -88,6 +88,7 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
if (req.role == RestoreRole::Loader) {
ASSERT(!self->loaderInterf.present());
self->controllerInterf = req.ci;
self->loaderInterf = RestoreLoaderInterface();
self->loaderInterf.get().initEndpoints();
RestoreLoaderInterface& recruited = self->loaderInterf.get();
@ -100,12 +101,13 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
DUMPTOKEN(recruited.finishVersionBatch);
DUMPTOKEN(recruited.collectRestoreRoleInterfaces);
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx));
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx, req.ci));
TraceEvent("FastRestoreWorker").detail("RecruitedLoaderNodeIndex", req.nodeIndex);
req.reply.send(
RestoreRecruitRoleReply(self->loaderInterf.get().id(), RestoreRole::Loader, self->loaderInterf.get()));
} else if (req.role == RestoreRole::Applier) {
ASSERT(!self->applierInterf.present());
self->controllerInterf = req.ci;
self->applierInterf = RestoreApplierInterface();
self->applierInterf.get().initEndpoints();
RestoreApplierInterface& recruited = self->applierInterf.get();
@ -202,6 +204,10 @@ ACTOR Future<Void> startRestoreWorkerLeader(Reference<RestoreWorkerData> self, R
// TODO: Needs to keep this monitor's future. May use actorCollection
state Future<Void> workersFailureMonitor = monitorWorkerLiveness(self);
RestoreControllerInterface recruited;
DUMPTOKEN(recruited.samples);
self->controllerInterf = recruited;
wait(startRestoreController(self, cx) || workersFailureMonitor);
return Void();

View File

@ -49,6 +49,7 @@ struct RestoreWorkerData : NonCopyable, public ReferenceCounted<RestoreWorkerDa
std::map<UID, RestoreWorkerInterface> workerInterfaces; // UID is worker's node id, RestoreWorkerInterface is worker's communication workerInterface
// Restore Roles
Optional<RestoreControllerInterface> controllerInterf;
Optional<RestoreLoaderInterface> loaderInterf;
Optional<RestoreApplierInterface> applierInterf;