FastRestoreApplier:Calculate conflict range in applyStagingKeysBatch
This commit is contained in:
parent
2febbe74ce
commit
5c5abd7afa
|
@ -467,7 +467,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
|
||||
std::map<Key, StagingKey>::iterator end, Database cx,
|
||||
FlowLock* applyStagingKeysBatchLock, UID applierID,
|
||||
ApplierBatchData::Counters* cc, KeyRangeRef range) {
|
||||
ApplierBatchData::Counters* cc) {
|
||||
if (SERVER_KNOBS->FASTRESTORE_NOT_WRITE_DB) {
|
||||
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchSkipped", applierID).detail("Begin", begin->first);
|
||||
ASSERT(!g_network->isSimulated());
|
||||
|
@ -484,7 +484,6 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->addWriteConflictRange(range); // Reduce resolver load
|
||||
std::map<Key, StagingKey>::iterator iter = begin;
|
||||
while (iter != end) {
|
||||
if (iter->second.type == MutationRef::SetValue) {
|
||||
|
@ -523,6 +522,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
.detail("End", endKey)
|
||||
.detail("Sets", sets)
|
||||
.detail("Clears", clears);
|
||||
tr->addWriteConflictRange(KeyRangeRef(begin->first, keyAfter(endKey))); // Reduce resolver load
|
||||
wait(tr->commit());
|
||||
cc->appliedTxns += 1;
|
||||
break;
|
||||
|
@ -549,7 +549,7 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
txnSize += cur->second.expectedMutationSize();
|
||||
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters, KeyRangeRef(begin->first, cur->first)));
|
||||
&batchData->counters));
|
||||
batchData->counters.appliedBytes += txnSize;
|
||||
batchData->appliedBytes += txnSize;
|
||||
begin = cur;
|
||||
|
@ -559,9 +559,8 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
cur++;
|
||||
}
|
||||
if (begin != batchData->stagingKeys.end()) {
|
||||
KeyRangeRef range(begin->first, keyAfter(batchData->stagingKeys.rbegin()->first));
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters, range));
|
||||
&batchData->counters));
|
||||
batchData->counters.appliedBytes += txnSize;
|
||||
batchData->appliedBytes += txnSize;
|
||||
txnBatches++;
|
||||
|
|
|
@ -39,8 +39,8 @@
|
|||
#define SevFRMutationInfo SevVerbose
|
||||
//#define SevFRMutationInfo SevInfo
|
||||
|
||||
#define SevFRDebugInfo SevVerbose
|
||||
//#define SevFRDebugInfo SevInfo
|
||||
//#define SevFRDebugInfo SevVerbose
|
||||
#define SevFRDebugInfo SevInfo
|
||||
|
||||
struct VersionedMutation {
|
||||
MutationRef mutation;
|
||||
|
|
Loading…
Reference in New Issue