diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index ecbb7ac678..be5821cfa4 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -105,9 +105,7 @@ struct MutationRef { } } - bool isAtomicOp() const { - return (ATOMIC_MASK & (1< void serialize( Ar& ar ) { diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index c084767569..0a714bd488 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -142,7 +142,7 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMu batchData->counters.receivedBytes += mutation.totalSize(); batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified batchData->counters.receivedMutations += 1; - batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0; + batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0; // Sanity check if (g_network->isSimulated()) { if (isRangeMutation(mutation)) { diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 82bff8506e..e06e772dca 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -58,8 +58,7 @@ struct ApplierBatchData : public ReferenceCounted { Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex) : cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)), receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc), - receivedAtomicOps("ReceivedAtomicOps", cc), - receivedWeightedBytes("ReceivedWeightedMutations", cc), + receivedAtomicOps("ReceivedAtomicOps", cc), receivedWeightedBytes("ReceivedWeightedMutations", cc), appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc), appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc) {} } counters; @@ -69,8 +68,8 @@ struct ApplierBatchData : public ReferenceCounted { explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) { pollMetrics = - traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, - nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex)); + traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, + &counters.cc, nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex)); TraceEvent("FastRestoreApplierMetricsCreated").detail("Node", nodeID); } ~ApplierBatchData() = default; diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index b449e0e0b4..8eb9bc2e4b 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -38,7 +38,8 @@ void splitMutation(std::map* pRangeToApplier, MutationRef m, Arena& mv VectorRef& mvector, Arena& nodeIDs_arena, VectorRef& nodeIDs); void _parseSerializedMutation(std::map::iterator kvOpsIter, SerializedMutationListMap* mutationMap, - std::map::iterator samplesIter, LoaderCounters* cc, const RestoreAsset& asset); + std::map::iterator samplesIter, LoaderCounters* cc, + const RestoreAsset& asset); void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference self); ACTOR Future handleLoadFileRequest(RestoreLoadFileRequest req, Reference self); @@ -521,7 +522,7 @@ bool concatenateBackupMutationForLogFile(std::map, Standal // Parse the kv pair (version, serialized_mutation), which are the results parsed from log file, into // (version, ) pair; // Put the parsed versioned mutations into *pkvOps. -// +// // Input key: [commitVersion_of_the_mutation_batch:uint64_t]; // Input value: [includeVersion:uint64_t][val_length:uint32_t][encoded_list_of_mutations], where // includeVersion is the serialized version in the batch commit. It is not the commitVersion in Input key. @@ -532,7 +533,8 @@ bool concatenateBackupMutationForLogFile(std::map, Standal // a mutation is encoded as [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][keyContent][valueContent] void _parseSerializedMutation(std::map::iterator kvOpsIter, SerializedMutationListMap* pmutationMap, - std::map::iterator samplesIter, LoaderCounters* cc, const RestoreAsset& asset) { + std::map::iterator samplesIter, LoaderCounters* cc, + const RestoreAsset& asset) { VersionedMutationsMap& kvOps = kvOpsIter->second; MutationsVec& samples = samplesIter->second; SerializedMutationListMap& mutationMap = *pmutationMap; diff --git a/fdbserver/RestoreLoader.actor.h b/fdbserver/RestoreLoader.actor.h index 051d30dff1..d53bbf4412 100644 --- a/fdbserver/RestoreLoader.actor.h +++ b/fdbserver/RestoreLoader.actor.h @@ -70,8 +70,8 @@ struct LoaderBatchData : public ReferenceCounted { explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) { pollMetrics = - traceCounters("FastRestoreLoaderMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, - nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex)); + traceCounters("FastRestoreLoaderMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, + &counters.cc, nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex)); TraceEvent("FastRestoreLoaderMetricsCreated").detail("Node", nodeID); } diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index 59ade604c5..9c1f828211 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -115,8 +115,7 @@ ACTOR Future recruitRestoreRoles(Reference masterWorker break; } - TraceEvent("FastRestoreMaster", masterData->id()) - .detail("WorkerNode", workerInterf.first); + TraceEvent("FastRestoreMaster", masterData->id()).detail("WorkerNode", workerInterf.first); requests.emplace_back(workerInterf.first, RestoreRecruitRoleRequest(role, nodeIndex)); nodeIndex++; } @@ -134,7 +133,9 @@ ACTOR Future recruitRestoreRoles(Reference masterWorker TraceEvent(SevError, "FastRestoreMaster").detail("RecruitRestoreRolesInvalidRole", reply.role); } } - TraceEvent("FastRestoreRecruitRestoreRolesDone", masterData->id()).detail("Workers", masterWorker->workerInterfaces.size()).detail("RecruitedRoles", replies.size()); + TraceEvent("FastRestoreRecruitRestoreRolesDone", masterData->id()) + .detail("Workers", masterWorker->workerInterfaces.size()) + .detail("RecruitedRoles", replies.size()); return Void(); } @@ -149,9 +150,11 @@ ACTOR Future distributeRestoreSysInfo(Reference masterW requests.emplace_back(loader.first, RestoreSysInfoRequest(sysInfo)); } - TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", masterData->id()).detail("Loaders", masterData->loadersInterf.size()); + TraceEvent("FastRestoreDistributeRestoreSysInfoToLoaders", masterData->id()) + .detail("Loaders", masterData->loadersInterf.size()); wait(sendBatchRequests(&RestoreLoaderInterface::updateRestoreSysInfo, masterData->loadersInterf, requests)); - TraceEvent("FastRestoreDistributeRestoreSysInfoToLoadersDone", masterData->id()).detail("Loaders", masterData->loadersInterf.size()); + TraceEvent("FastRestoreDistributeRestoreSysInfoToLoadersDone", masterData->id()) + .detail("Loaders", masterData->loadersInterf.size()); return Void(); } @@ -171,7 +174,7 @@ ACTOR Future startProcessRestoreRequests(Reference self state int numTries = 0; state int restoreIndex = 0; - TraceEvent("FastRestoreMasterWaitOnRestoreRequests", masterData->id()); + TraceEvent("FastRestoreMasterWaitOnRestoreRequests", masterData->id()); // lock DB for restore numTries = 0; @@ -184,10 +187,10 @@ ACTOR Future startProcessRestoreRequests(Reference self tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); wait(checkDatabaseLock(tr, randomUID)); - TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("DBIsLocked", randomUID); + TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("DBIsLocked", randomUID); break; } catch (Error& e) { - TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("CheckLockError", e.what()); + TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("CheckLockError", e.what()); TraceEvent(numTries > 50 ? SevError : SevWarnAlways, "FastRestoreMayFail") .detail("Reason", "DB is not properly locked") .detail("ExpectedLockID", randomUID); @@ -202,7 +205,8 @@ ACTOR Future startProcessRestoreRequests(Reference self try { for (restoreIndex = 0; restoreIndex < restoreRequests.size(); restoreIndex++) { RestoreRequest& request = restoreRequests[restoreIndex]; - TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()).detail("RestoreRequestInfo", request.toString()); + TraceEvent("FastRestoreMasterProcessRestoreRequests", masterData->id()) + .detail("RestoreRequestInfo", request.toString()); // TODO: Initialize MasterData and all loaders and appliers' data for each restore request! self->resetPerRestoreRequest(); wait(success(processRestoreRequest(self, cx, request))); @@ -229,7 +233,6 @@ ACTOR Future startProcessRestoreRequests(Reference self ASSERT_WE_THINK(false); // This unlockDatabase should always succeed, we think. } - TraceEvent("FastRestoreMasterRestoreCompleted", self->id()); return Void(); @@ -237,7 +240,10 @@ ACTOR Future startProcessRestoreRequests(Reference self ACTOR static Future monitorFinishedVersion(Reference self, RestoreRequest request) { loop { - TraceEvent("FastRestoreMonitorFinishedVersion", self->id()).detail("RestoreRequest", request.toString()).detail("BatchIndex", self->finishedBatch.get()).detail("Now", now()); + TraceEvent("FastRestoreMonitorFinishedVersion", self->id()) + .detail("RestoreRequest", request.toString()) + .detail("BatchIndex", self->finishedBatch.get()) + .detail("Now", now()); wait(delay(SERVER_KNOBS->FASTRESTORE_VB_MONITOR_DELAY)); } } @@ -287,7 +293,11 @@ ACTOR static Future processRestoreRequest(Reference wait(self->runningVersionBatches.onChange()); } int batchIndex = versionBatch->batchIndex; - TraceEvent("FastRestoreMasterDispatchVersionBatches").detail("BatchIndex", batchIndex).detail("BatchSize", versionBatch->size).detail("RunningVersionBatches", self->runningVersionBatches.get()).detail("Start", now()); + TraceEvent("FastRestoreMasterDispatchVersionBatches") + .detail("BatchIndex", batchIndex) + .detail("BatchSize", versionBatch->size) + .detail("RunningVersionBatches", self->runningVersionBatches.get()) + .detail("Start", now()); self->batch[batchIndex] = Reference(new MasterBatchData()); self->batchStatus[batchIndex] = Reference(new MasterBatchStatus()); fBatches.push_back(distributeWorkloadPerVersionBatch(self, batchIndex, cx, request, *versionBatch)); diff --git a/fdbserver/RestoreWorker.actor.cpp b/fdbserver/RestoreWorker.actor.cpp index d8cc31a685..d6d2a4e08f 100644 --- a/fdbserver/RestoreWorker.actor.cpp +++ b/fdbserver/RestoreWorker.actor.cpp @@ -250,7 +250,7 @@ ACTOR Future startRestoreWorker(Reference self, Restore // RestoreMaster is the leader ACTOR Future monitorleader(Reference> leader, Database cx, RestoreWorkerInterface myWorkerInterf) { - wait(delay(5.0)); + wait(delay(5.0)); TraceEvent("FastRestoreWorker", myWorkerInterf.id()).detail("MonitorLeader", "StartLeaderElection"); state int count = 0; loop { @@ -285,7 +285,10 @@ ACTOR Future monitorleader(Reference> lea } } - TraceEvent("FastRestoreWorker", myWorkerInterf.id()).detail("MonitorLeader", "FinishLeaderElection").detail("Leader", leaderInterf.id()).detail("IamLeader", leaderInterf == myWorkerInterf); + TraceEvent("FastRestoreWorker", myWorkerInterf.id()) + .detail("MonitorLeader", "FinishLeaderElection") + .detail("Leader", leaderInterf.id()) + .detail("IamLeader", leaderInterf == myWorkerInterf); return Void(); } @@ -328,7 +331,7 @@ ACTOR Future restoreWorker(Reference connFile, Loca try { Database cx = Database::createDatabase(connFile, Database::API_VERSION_LATEST, true, locality); wait(reportErrors(_restoreWorker(cx, locality), "RestoreWorker")); - } catch (Error& e) { + } catch (Error& e) { TraceEvent("FastRestoreWorker").detail("Error", e.what()); throw e; } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 9bb469ee06..aee3995695 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1516,10 +1516,10 @@ ACTOR Future fdbd( // SOMEDAY: start the services on the machine in a staggered fashion in simulation? // Endpoints should be registered first before any process trying to connect to it. // So coordinationServer actor should be the first one executed before any other. - if ( coordFolder.size() ) { - // SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up their files - actors.push_back(fileNotFoundToNever( - coordinationServer(coordFolder))); + if (coordFolder.size()) { + // SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up + // their files + actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder))); } state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));