FastRestore:Improve code style and fix typos

This commit is contained in:
Meng Xu 2020-01-27 18:13:14 -08:00
parent 75dc34f775
commit 141609e80a
12 changed files with 37 additions and 42 deletions

View File

@ -424,13 +424,13 @@ struct RestoreSendMutationsToAppliersRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, rangeToApplier, useRangeFile, batchIndex, reply);
serializer(ar, batchIndex, rangeToApplier, useRangeFile, reply);
}
std::string toString() {
std::stringstream ss;
ss << "RestoreSendMutationsToAppliersRequest keyToAppliers.size:" << rangeToApplier.size()
<< " batchIndex:" << batchIndex << " useRangeFile:" << useRangeFile;
ss << "RestoreSendMutationsToAppliersRequest batchIndex:" << batchIndex
<< " keyToAppliers.size:" << rangeToApplier.size() << " useRangeFile:" << useRangeFile;
return ss.str();
}
};
@ -492,7 +492,7 @@ struct RestoreVersionBatchRequest : TimedRequest {
struct RestoreFinishRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 13018413;
bool terminate;
bool terminate; // Should role exit?
ReplyPromise<RestoreCommonReply> reply;

View File

@ -737,7 +737,8 @@ const KeyRangeRef restoreApplierKeys(LiteralStringRef("\xff\x02/restoreApplier/"
const KeyRef restoreApplierTxnValue = LiteralStringRef("1");
// restoreApplierKeys: track atomic transaction progress to ensure applying atomicOp exactly once
// Version is passed in as LittleEndian, it must be converted to BigEndian to maintain ordering in lexical order
// Version and batchIndex is passed in as LittleEndian,
// they must be converted to BigEndian to maintain ordering in lexical order
const Key restoreApplierKeyFor(UID const& applierID, int64_t batchIndex, Version version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreApplierKeys.begin);

View File

@ -542,10 +542,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
init( FASTRESTORE_SAMPLING_PERCENT, 1 ); if( randomize ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 512.0 ); if( randomize ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0); if( randomize ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
// clang-format on

View File

@ -484,7 +484,7 @@ public:
double FASTRESTORE_SAMPLING_PERCENT;
int64_t FASTRESTORE_NUM_LOADERS;
int64_t FASTRESTORE_NUM_APPLIERS;
// FASTRESTORE_TXN_BATCH_MAX_BYTES is used when applier applies multiple mutations in a transaction to DB
// FASTRESTORE_TXN_BATCH_MAX_BYTES is txn size appliers apply mutations
double FASTRESTORE_TXN_BATCH_MAX_BYTES;
// FASTRESTORE_VERSIONBATCH_MAX_BYTES is the maximum data size in each version batch
double FASTRESTORE_VERSIONBATCH_MAX_BYTES;

View File

@ -147,11 +147,6 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
curFilePos.set(req.version);
}
TraceEvent("FastRestoreApplierPhaseReceiveMutationsDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedFileVersion", curFilePos.get())
.detail("Request", req.toString());
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
return Void();
}
@ -226,7 +221,7 @@ struct DBApplyProgress {
// Rollback to the starting point of the uncommitted-and-failed transaction to
// re-execute uncommitted txn
void rollback() {
TraceEvent(SevWarn, "FastRestore_ApplyTxnError")
TraceEvent(SevWarn, "FastRestoreApplyTxnError")
.detail("TxnStatusFailed", curTxnId)
.detail("ApplierApplyToDB", applierId)
.detail("UncommittedTxnId", uncommittedTxnId)
@ -254,7 +249,7 @@ struct DBApplyProgress {
bool hasError() { return lastTxnHasError; }
void setTxnError(Error& e) {
TraceEvent(SevWarnAlways, "FastRestore_ApplyTxnError")
TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError")
.detail("TxnStatus", "?")
.detail("ApplierApplyToDB", applierId)
.detail("TxnId", curTxnId)
@ -306,13 +301,13 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
Key end = restoreApplierKeyFor(applierID, batchIndex, std::numeric_limits<int64_t>::max());
Standalone<RangeResultRef> txnIds = wait(tr->getRange(KeyRangeRef(begin, end), CLIENT_KNOBS->TOO_MANY));
if (txnIds.size() > 0) {
TraceEvent(SevError, "FastRestore_ApplyTxnStateNotClean").detail("TxnIds", txnIds.size());
TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean").detail("TxnIds", txnIds.size());
for (auto& kv : txnIds) {
UID id;
int64_t index;
Version txnId;
std::tie(id, index, txnId) = decodeRestoreApplierKey(kv.key);
TraceEvent(SevError, "FastRestore_ApplyTxnStateNotClean")
TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean")
.detail("Applier", id)
.detail("BatchIndex", index)
.detail("ResidueTxnID", txnId);
@ -337,7 +332,7 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
progress.rollback();
continue;
} else {
TraceEvent(SevWarn, "FastRestore_ApplyTxnError")
TraceEvent(SevWarn, "FastRestoreApplyTxnError")
.detail("TxnStatusSucceeded", progress.curTxnId)
.detail("ApplierApplyToDB", applierID)
.detail("CurIteratorVersion", progress.curItInCurTxn->first)
@ -410,7 +405,7 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
}
progress.nextTxn();
} catch (Error& e) {
TraceEvent(SevWarnAlways, "FastRestore_ApplyTxnError")
TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError")
.detail("TxnStatus", "?")
.detail("ApplierApplyToDB", applierID)
.detail("TxnId", progress.curTxnId)
@ -452,7 +447,7 @@ ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<Applie
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
Database cx) {
// Ensure batch i is applied before batch (i+1)
// Ensure batch (i-1) is applied before batch i
wait(self->finishedBatch.whenAtLeast(req.batchIndex-1));
state bool isDuplicated = true;

View File

@ -114,7 +114,8 @@ struct RestoreApplierData : RestoreRoleData, public ReferenceCounted<RestoreAppl
~RestoreApplierData() = default;
void resetPerVersionBatch(int batchIndex) {
void initVersionBatch(int batchIndex) {
TraceEvent("FastRestoreApplierInitVersionBatch", id()).detail("BatchIndex", batchIndex);
batch[batchIndex] = Reference<ApplierBatchData>(new ApplierBatchData());
}

View File

@ -222,26 +222,26 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
.detail("LoaderSendStatus", batchStatus->toString());
if (!req.useRangeFile) {
if (!batchStatus->sendAllLogs.present()) {
if (!batchStatus->sendAllLogs.present()) { // Has not sent
batchStatus->sendAllLogs = Never();
isDuplicated = false;
TraceEvent(SevInfo, "FastRestoreSendMutationsProcessLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
ASSERT(!batchStatus->sendAllRanges.present());
} else if (!batchStatus->sendAllLogs.get().isReady()) {
} else if (!batchStatus->sendAllLogs.get().isReady()) { // In the process of sending
TraceEvent(SevDebug, "FastRestoreSendMutationsWaitDuplicateLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
wait(batchStatus->sendAllLogs.get());
} else {
} else { // Already sent
TraceEvent(SevDebug, "FastRestoreSendMutationsSkipDuplicateLogRequest", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile);
}
} else {
if (!batchStatus->sendAllRanges.present()) {
batchStatus->sendAllRanges = Never(); // Signal the loader is sending kv parsed from range files
batchStatus->sendAllRanges = Never();
isDuplicated = false;
TraceEvent(SevInfo, "FastRestoreSendMutationsProcessRangeRequest", self->id())
.detail("BatchIndex", req.batchIndex)
@ -285,8 +285,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
return Void();
}
// TODO: This function can be revised better
// Assume: kvOps data are from the same file.
// Assume: kvOps data are from the same RestoreAsset.
// Input: pkvOps: versioned kv mutation for the asset in the version batch (batchIndex)
// isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation;
// pRangeToApplier: range to applierID mapping, deciding which applier is responsible for which range

View File

@ -106,8 +106,8 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
return ss.str();
}
void resetPerVersionBatch(int batchIndex) {
TraceEvent("FastRestore").detail("ResetPerVersionBatchOnLoader", nodeID);
void initVersionBatch(int batchIndex) {
TraceEvent("FastRestore").detail("InitVersionBatchOnLoader", nodeID);
batch[batchIndex] = Reference<LoaderBatchData>(new LoaderBatchData());
status[batchIndex] = Reference<LoaderBatchStatus>(new LoaderBatchStatus());
}

View File

@ -266,7 +266,6 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
self->batchStatus[batchIndex] = Reference<MasterBatchStatus>(new MasterBatchStatus());
fBatches.push_back(
distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
// wait(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
batchIndex++;
}
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("VersionBatchEnd", batchIndex);
@ -278,7 +277,6 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
self->batchStatus[batchIndex] = Reference<MasterBatchStatus>(new MasterBatchStatus());
fBatches.push_back(
distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
// wait(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, versionBatch->second));
batchIndex--;
}
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("VersionBatchEnd", batchIndex);
@ -356,7 +354,7 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<MasterBatchData> batchDat
}
batchStatus->raStatus[param.asset] = RestoreAssetStatus::Loading;
assets.push_back(param.asset);
loader++;
++loader;
++paramIdx;
}
TraceEvent(files->size() != paramIdx ? SevError : SevInfo, "FastRestoreLoadFiles").detail("Files", files->size()).detail("LoadParams", paramIdx);
@ -426,6 +424,7 @@ ACTOR static Future<Void> sendMutationsFromLoaders(Reference<MasterBatchData> ba
state std::vector<RestoreCommonReply> replies;
wait(getBatchReplies(&RestoreLoaderInterface::sendMutations, loadersInterf, requests, &replies));
// Update status and sanity check
for (auto& reply : replies) {
RestoreSendStatus status = batchStatus->loadStatus[reply.id];
if ((status == RestoreSendStatus::SendingRanges || status == RestoreSendStatus::SendingLogs)) {
@ -707,7 +706,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
.detail("BatchIndex", batchIndex)
.detail("Attention", "Actor should not be invoked twice for the same batch index");
}
// wait(getBatchReplies(&RestoreApplierInterface::applyToDB, appliersInterf, requests, &replies));
ASSERT(batchData->applyToDB.present());
wait(batchData->applyToDB.get());

View File

@ -142,9 +142,8 @@ struct RestoreMasterData : RestoreRoleData, public ReferenceCounted<RestoreMaste
~RestoreMasterData() = default;
void resetPerVersionBatch(int batchIndex) {
TraceEvent("FastRestore")
.detail("RestoreMaster", "ResetPerVersionBatch")
void initVersionBatch(int batchIndex) {
TraceEvent("FastRestoreMasterInitVersionBatch", id())
.detail("VersionBatchIndex", batchIndex);
}

View File

@ -51,18 +51,19 @@ void handleFinishRestoreRequest(const RestoreFinishRequest& req, Reference<Resto
req.reply.send(RestoreCommonReply(self->id()));
}
// Multiple version batches may execute in parallel and init their version batches
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self) {
TraceEvent("FastRestoreRolePhaseInitVersionBatch", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("Role", getRoleStr(self->role))
.detail("VersionBatchId", self->versionBatchId.get());
.detail("VersionBatchNotifiedVersion", self->versionBatchId.get());
// batchId is continuous. (req.batchIndex-1) is the id of the just finished batch.
wait(self->versionBatchId.whenAtLeast(req.batchIndex - 1));
if (self->versionBatchId.get() == req.batchIndex - 1) {
self->resetPerVersionBatch(req.batchIndex);
TraceEvent("FastRestore")
.detail("InitVersionBatch", req.batchIndex)
self->initVersionBatch(req.batchIndex);
TraceEvent("FastRestoreInitVersionBatch")
.detail("BatchIndex", req.batchIndex)
.detail("Role", getRoleStr(self->role))
.detail("Node", self->id());
self->versionBatchId.set(req.batchIndex);

View File

@ -122,7 +122,7 @@ public:
UID id() const { return nodeID; }
virtual void resetPerVersionBatch(int batchIndex) = 0;
virtual void initVersionBatch(int batchIndex) = 0;
virtual void resetPerRestoreRequest() = 0;