FastRestore:Apply clang-format against master

This commit is contained in:
Meng Xu 2020-02-18 16:41:59 -08:00
parent 132f5aa9ba
commit 94d799552e
8 changed files with 44 additions and 32 deletions

View File

@ -105,9 +105,7 @@ struct MutationRef {
} }
} }
bool isAtomicOp() const { bool isAtomicOp() const { return (ATOMIC_MASK & (1 << type)) != 0; }
return (ATOMIC_MASK & (1<<type)) != 0;
}
template <class Ar> template <class Ar>
void serialize( Ar& ar ) { void serialize( Ar& ar ) {

View File

@ -142,7 +142,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
batchData->counters.receivedBytes += mutation.totalSize(); batchData->counters.receivedBytes += mutation.totalSize();
batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified
batchData->counters.receivedMutations += 1; batchData->counters.receivedMutations += 1;
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0; batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0;
// Sanity check // Sanity check
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
if (isRangeMutation(mutation)) { if (isRangeMutation(mutation)) {

View File

@ -58,8 +58,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex) Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex)
: cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)), : cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)),
receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc), receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc),
receivedAtomicOps("ReceivedAtomicOps", cc), receivedAtomicOps("ReceivedAtomicOps", cc), receivedWeightedBytes("ReceivedWeightedMutations", cc),
receivedWeightedBytes("ReceivedWeightedMutations", cc),
appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc), appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc),
appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc) {} appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc) {}
} counters; } counters;
@ -69,8 +68,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) { explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) {
pollMetrics = pollMetrics =
traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY,
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex)); &counters.cc, nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
TraceEvent("FastRestoreApplierMetricsCreated").detail("Node", nodeID); TraceEvent("FastRestoreApplierMetricsCreated").detail("Node", nodeID);
} }
~ApplierBatchData() = default; ~ApplierBatchData() = default;

View File

@ -38,7 +38,8 @@ void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mv
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs); VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap, SerializedMutationListMap* mutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, const RestoreAsset& asset); std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset);
void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self); void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self); ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self);
@ -521,7 +522,7 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
// Parse the kv pair (version, serialized_mutation), which are the results parsed from log file, into // Parse the kv pair (version, serialized_mutation), which are the results parsed from log file, into
// (version, <K, V, mutationType>) pair; // (version, <K, V, mutationType>) pair;
// Put the parsed versioned mutations into *pkvOps. // Put the parsed versioned mutations into *pkvOps.
// //
// Input key: [commitVersion_of_the_mutation_batch:uint64_t]; // Input key: [commitVersion_of_the_mutation_batch:uint64_t];
// Input value: [includeVersion:uint64_t][val_length:uint32_t][encoded_list_of_mutations], where // Input value: [includeVersion:uint64_t][val_length:uint32_t][encoded_list_of_mutations], where
// includeVersion is the serialized version in the batch commit. It is not the commitVersion in Input key. // includeVersion is the serialized version in the batch commit. It is not the commitVersion in Input key.
@ -532,7 +533,8 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
// a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent] // a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent]
void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter, void _parseSerializedMutation(std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* pmutationMap, SerializedMutationListMap* pmutationMap,
std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc, const RestoreAsset& asset) { std::map<LoadingParam, MutationsVec>::iterator samplesIter, LoaderCounters* cc,
const RestoreAsset& asset) {
VersionedMutationsMap& kvOps = kvOpsIter->second; VersionedMutationsMap& kvOps = kvOpsIter->second;
MutationsVec& samples = samplesIter->second; MutationsVec& samples = samplesIter->second;
SerializedMutationListMap& mutationMap = *pmutationMap; SerializedMutationListMap& mutationMap = *pmutationMap;

View File

@ -70,8 +70,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) { explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) {
pollMetrics = pollMetrics =
traceCounters("FastRestoreLoaderMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, traceCounters("FastRestoreLoaderMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY,
nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex)); &counters.cc, nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex));
TraceEvent("FastRestoreLoaderMetricsCreated").detail("Node", nodeID); TraceEvent("FastRestoreLoaderMetricsCreated").detail("Node", nodeID);
} }

View File

@ -115,8 +115,7 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
break; break;
} }
TraceEvent("FastRestoreMaster", masterData->id()) TraceEvent("FastRestoreMaster", masterData->id()).detail("WorkerNode", workerInterf.first);
.detail("WorkerNode", workerInterf.first);
requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex)); requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex));
nodeIndex++; nodeIndex++;
} }
@ -134,7 +133,9 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> masterWorker
TraceEvent(SevError, "FastRestoreMaster").detail("RecruitRestoreRolesInvalidRole", reply.role); TraceEvent(SevError, "FastRestoreMaster").detail("RecruitRestoreRolesInvalidRole", reply.role);
} }
} }
TraceEvent("FastRestoreRecruitRestoreRolesDone", masterData->id()).detail("Workers", masterWorker->workerInterfaces.size()).detail("RecruitedRoles", replies.size()); TraceEvent("FastRestoreRecruitRestoreRolesDone", masterData->id())
.detail("Workers", masterWorker->workerInterfaces.size())
.detail("RecruitedRoles", replies.size());
return Void(); return Void();
} }
@ -149,9 +150,11 @@ ACTOR Future<Void> distributeRestoreSysInfo(Reference<RestoreWorkerData> masterW
requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo)); requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo));
} }
TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", masterData->id()).detail("Loaders", masterData->loadersInterf.size()); TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", masterData->id())
.detail("Loaders", masterData->loadersInterf.size());
wait(sendBatchRequests(&RestoreLoaderInterface::updateRestoreSysInfo, masterData->loadersInterf, requests)); wait(sendBatchRequests(&RestoreLoaderInterface::updateRestoreSysInfo, masterData->loadersInterf, requests));
TraceEvent("FastRestoreDistributeRestoreSysInfoToLoadersDone", masterData->id()).detail("Loaders", masterData->loadersInterf.size()); TraceEvent("FastRestoreDistributeRestoreSysInfoToLoadersDone", masterData->id())
.detail("Loaders", masterData->loadersInterf.size());
return Void(); return Void();
} }
@ -171,7 +174,7 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
state int numTries = 0; state int numTries = 0;
state int restoreIndex = 0; state int restoreIndex = 0;
TraceEvent("FastRestoreMasterWaitOnRestoreRequests", masterData->id()); TraceEvent("FastRestoreMasterWaitOnRestoreRequests", masterData->id());
// lock DB for restore // lock DB for restore
numTries = 0; numTries = 0;
@ -184,10 +187,10 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkDatabaseLock(tr, randomUID)); wait(checkDatabaseLock(tr, randomUID));
TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("DBIsLocked", randomUID); TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("DBIsLocked", randomUID);
break; break;
} catch (Error& e) { } catch (Error& e) {
TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("CheckLockError", e.what()); TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("CheckLockError", e.what());
TraceEvent(numTries > 50 ? SevError : SevWarnAlways, "FastRestoreMayFail") TraceEvent(numTries > 50 ? SevError : SevWarnAlways, "FastRestoreMayFail")
.detail("Reason", "DB is not properly locked") .detail("Reason", "DB is not properly locked")
.detail("ExpectedLockID", randomUID); .detail("ExpectedLockID", randomUID);
@ -202,7 +205,8 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
try { try {
for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) { for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) {
RestoreRequest& request = restoreRequests[restoreIndex]; RestoreRequest& request = restoreRequests[restoreIndex];
TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("RestoreRequestInfo", request.toString()); TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id())
.detail("RestoreRequestInfo", request.toString());
// TODO: Initialize MasterData and all loaders and appliers' data for each restore request! // TODO: Initialize MasterData and all loaders and appliers' data for each restore request!
self->resetPerRestoreRequest(); self->resetPerRestoreRequest();
wait(success(processRestoreRequest(self, cx, request))); wait(success(processRestoreRequest(self, cx, request)));
@ -229,7 +233,6 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
ASSERT_WE_THINK(false); // This unlockDatabase should always succeed, we think. ASSERT_WE_THINK(false); // This unlockDatabase should always succeed, we think.
} }
TraceEvent("FastRestoreMasterRestoreCompleted", self->id()); TraceEvent("FastRestoreMasterRestoreCompleted", self->id());
return Void(); return Void();
@ -237,7 +240,10 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
ACTOR static Future<Void> monitorFinishedVersion(Reference<RestoreMasterData> self, RestoreRequest request) { ACTOR static Future<Void> monitorFinishedVersion(Reference<RestoreMasterData> self, RestoreRequest request) {
loop { loop {
TraceEvent("FastRestoreMonitorFinishedVersion", self->id()).detail("RestoreRequest", request.toString()).detail("BatchIndex", self->finishedBatch.get()).detail("Now", now()); TraceEvent("FastRestoreMonitorFinishedVersion", self->id())
.detail("RestoreRequest", request.toString())
.detail("BatchIndex", self->finishedBatch.get())
.detail("Now", now());
wait(delay(SERVER_KNOBS->FASTRESTORE_VB_MONITOR_DELAY)); wait(delay(SERVER_KNOBS->FASTRESTORE_VB_MONITOR_DELAY));
} }
} }
@ -287,7 +293,11 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
wait(self->runningVersionBatches.onChange()); wait(self->runningVersionBatches.onChange());
} }
int batchIndex = versionBatch->batchIndex; int batchIndex = versionBatch->batchIndex;
TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("BatchIndex", batchIndex).detail("BatchSize", versionBatch->size).detail("RunningVersionBatches", self->runningVersionBatches.get()).detail("Start", now()); TraceEvent("FastRestoreMasterDispatchVersionBatches")
.detail("BatchIndex", batchIndex)
.detail("BatchSize", versionBatch->size)
.detail("RunningVersionBatches", self->runningVersionBatches.get())
.detail("Start", now());
self->batch[batchIndex] = Reference<MasterBatchData>(new MasterBatchData()); self->batch[batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
self->batchStatus[batchIndex] = Reference<MasterBatchStatus>(new MasterBatchStatus()); self->batchStatus[batchIndex] = Reference<MasterBatchStatus>(new MasterBatchStatus());
fBatches.push_back(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, *versionBatch)); fBatches.push_back(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, *versionBatch));

View File

@ -250,7 +250,7 @@ ACTOR Future<Void> startRestoreWorker(Reference<RestoreWorkerData> self, Restore
// RestoreMaster is the leader // RestoreMaster is the leader
ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> leader, Database cx, ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> leader, Database cx,
RestoreWorkerInterface myWorkerInterf) { RestoreWorkerInterface myWorkerInterf) {
wait(delay(5.0)); wait(delay(5.0));
TraceEvent("FastRestoreWorker", myWorkerInterf.id()).detail("MonitorLeader", "StartLeaderElection"); TraceEvent("FastRestoreWorker", myWorkerInterf.id()).detail("MonitorLeader", "StartLeaderElection");
state int count = 0; state int count = 0;
loop { loop {
@ -285,7 +285,10 @@ ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> lea
} }
} }
TraceEvent("FastRestoreWorker", myWorkerInterf.id()).detail("MonitorLeader", "FinishLeaderElection").detail("Leader", leaderInterf.id()).detail("IamLeader", leaderInterf == myWorkerInterf); TraceEvent("FastRestoreWorker", myWorkerInterf.id())
.detail("MonitorLeader", "FinishLeaderElection")
.detail("Leader", leaderInterf.id())
.detail("IamLeader", leaderInterf == myWorkerInterf);
return Void(); return Void();
} }
@ -328,7 +331,7 @@ ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> connFile, Loca
try { try {
Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST, true, locality); Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST, true, locality);
wait(reportErrors(_restoreWorker(cx, locality), "RestoreWorker")); wait(reportErrors(_restoreWorker(cx, locality), "RestoreWorker"));
} catch (Error& e) { } catch (Error& e) {
TraceEvent("FastRestoreWorker").detail("Error", e.what()); TraceEvent("FastRestoreWorker").detail("Error", e.what());
throw e; throw e;
} }

View File

@ -1516,10 +1516,10 @@ ACTOR Future<Void> fdbd(
// SOMEDAY: start the services on the machine in a staggered fashion in simulation? // SOMEDAY: start the services on the machine in a staggered fashion in simulation?
// Endpoints should be registered first before any process trying to connect to it. // Endpoints should be registered first before any process trying to connect to it.
// So coordinationServer actor should be the first one executed before any other. // So coordinationServer actor should be the first one executed before any other.
if ( coordFolder.size() ) { if (coordFolder.size()) {
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files // SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
actors.push_back(fileNotFoundToNever( // their files
coordinationServer(coordFolder))); actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder)));
} }
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder)); state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));