FastRestore:Apply clang-format
This commit is contained in:
parent
03f699f2f9
commit
fe75a4cafb
|
@ -37,7 +37,7 @@
|
|||
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self);
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self);
|
||||
Reference<RestoreApplierData> self);
|
||||
ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference<RestoreApplierData> self,
|
||||
Database cx);
|
||||
|
||||
|
@ -104,7 +104,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions;
|
||||
// Only one actor can process mutations from the same file
|
||||
ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req,
|
||||
Reference<RestoreApplierData> self) {
|
||||
Reference<RestoreApplierData> self) {
|
||||
state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
|
||||
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
|
||||
// Note: Insert new items into processedFileState will not invalidate the reference.
|
||||
|
@ -142,7 +142,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersioned
|
|||
batchData->counters.receivedBytes += mutation.totalSize();
|
||||
batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified
|
||||
batchData->counters.receivedMutations += 1;
|
||||
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0;
|
||||
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0;
|
||||
// Sanity check
|
||||
if (g_network->isSimulated()) {
|
||||
if (isRangeMutation(mutation)) {
|
||||
|
@ -197,7 +197,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
|||
ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
//state std::vector<std::pair<Key, Future<Optional<Value>>>> fKVs;
|
||||
// state std::vector<std::pair<Key, Future<Optional<Value>>>> fKVs;
|
||||
state std::vector<Future<Optional<Value>>> fValues;
|
||||
state std::vector<Optional<Value>> values;
|
||||
state int i = 0;
|
||||
|
@ -209,8 +209,8 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
for (auto& key : imcompleteStagingKeys) {
|
||||
//fKVs.push_back(std::make_pair(key.first, tr->get(key.first)));
|
||||
//fValues.push_back(fKVs.back().second);
|
||||
// fKVs.push_back(std::make_pair(key.first, tr->get(key.first)));
|
||||
// fValues.push_back(fKVs.back().second);
|
||||
fValues.push_back(tr->get(key.first));
|
||||
}
|
||||
for (i = 0; i < fValues.size(); i++) {
|
||||
|
@ -220,9 +220,9 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
|
||||
.detail("GetKeys", imcompleteStagingKeys.size())
|
||||
.detail("Error", e.what())
|
||||
.detail("ErrorCode", e.code());
|
||||
.detail("GetKeys", imcompleteStagingKeys.size())
|
||||
.detail("Error", e.what())
|
||||
.detail("ErrorCode", e.code());
|
||||
wait(tr->onError(e));
|
||||
fValues.clear();
|
||||
}
|
||||
|
@ -231,7 +231,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
ASSERT(values.size() == imcompleteStagingKeys.size());
|
||||
// TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map
|
||||
int i = 0;
|
||||
for(auto& key : imcompleteStagingKeys) {
|
||||
for (auto& key : imcompleteStagingKeys) {
|
||||
if (!values[i].present()) {
|
||||
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
|
||||
.detail("Key", key.first)
|
||||
|
@ -264,14 +264,13 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
return Void();
|
||||
}
|
||||
|
||||
|
||||
ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData> batchData, UID applierID,
|
||||
int64_t batchIndex, Database cx) {
|
||||
// Apply range mutations (i.e., clearRange) to database cx
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Applying clear range mutations to DB")
|
||||
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
||||
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
||||
state std::vector<Future<Void>> fClearRanges;
|
||||
std::vector<Standalone<VectorRef<KeyRangeRef>>> clearBuf;
|
||||
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
|
||||
|
@ -296,7 +295,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Applying clear range mutations to staging keys")
|
||||
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
||||
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
||||
for (auto& rangeMutation : batchData->stagingKeyRanges) {
|
||||
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
|
||||
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2);
|
||||
|
@ -310,7 +309,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Getting and computing staging keys")
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
|
||||
// Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value
|
||||
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys;
|
||||
|
@ -326,7 +325,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Compute the other staging keys")
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
// Pre-compute pendingMutations to other keys in stagingKeys that has base value
|
||||
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
|
||||
stagingKeyIter++) {
|
||||
|
@ -374,10 +373,16 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
}
|
||||
iter++;
|
||||
if (sets > 10000000 || clears > 10000000) {
|
||||
TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears);
|
||||
TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID)
|
||||
.detail("Begin", begin->first)
|
||||
.detail("Sets", sets)
|
||||
.detail("Clears", clears);
|
||||
}
|
||||
}
|
||||
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears);
|
||||
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID)
|
||||
.detail("Begin", begin->first)
|
||||
.detail("Sets", sets)
|
||||
.detail("Clears", clears);
|
||||
wait(tr->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -394,7 +399,9 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
std::map<Key, StagingKey>::iterator cur = begin;
|
||||
double txnSize = 0;
|
||||
std::vector<Future<Void>> fBatches;
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size());
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
while (cur != batchData->stagingKeys.end()) {
|
||||
txnSize += cur->second.expectedMutationSize();
|
||||
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
|
@ -410,7 +417,9 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
|
||||
wait(waitForAll(fBatches));
|
||||
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size());
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue