FastRestore: Add counters for applier and disable FlowLock on applyStagingKeysBatch
This commit is contained in:
parent
8f842a66e4
commit
ecd2d8b239
|
@ -214,13 +214,13 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
|||
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
|
||||
ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
|
||||
UID applierID, int batchIndex) {
|
||||
UID applierID, int batchIndex, ApplierBatchData::Counters* cc) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state std::vector<Future<Optional<Value>>> fValues(incompleteStagingKeys.size(), Never());
|
||||
state int retries = 0;
|
||||
state UID randomID = deterministicRandom()->randomUniqueID();
|
||||
|
||||
wait(delay(delayTime + deterministicRandom()->random01() * delayTime));
|
||||
wait(delay(deterministicRandom()->random01() * delayTime));
|
||||
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
|
||||
.detail("RandomUID", randomID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
|
@ -234,10 +234,13 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
for (auto& key : incompleteStagingKeys) {
|
||||
fValues[i++] = tr->get(key.first);
|
||||
cc->fetchKeys += 1;
|
||||
}
|
||||
wait(waitForAll(fValues));
|
||||
cc->fetchTxns += 1;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
cc->fetchTxnRetries += 1;
|
||||
if (retries++ > incompleteStagingKeys.size()) {
|
||||
TraceEvent(SevWarnAlways, "GetAndComputeStagingKeys", applierID)
|
||||
.suppressFor(1.0)
|
||||
|
@ -354,24 +357,23 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
|
||||
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
|
||||
int numKeysInBatch = 0;
|
||||
double delayTime = 0; // Start transactions at different time to avoid overwelming FDB.
|
||||
double delayTime = 0; // Start transactions at different time to avoid overwhelming FDB.
|
||||
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
|
||||
if (!stagingKeyIter->second.hasBaseValue()) {
|
||||
incompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter);
|
||||
batchData->counters.fetchKeys += 1;
|
||||
numKeysInBatch++;
|
||||
}
|
||||
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
|
||||
fGetAndComputeKeys.push_back(
|
||||
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
|
||||
delayTime += 0.1;
|
||||
getAndComputeStagingKeys(incompleteStagingKeys, 0.1, cx, applierID, batchIndex, &batchData->counters));
|
||||
delayTime += 0.1; // TODO: Delete this because we may not need this
|
||||
numKeysInBatch = 0;
|
||||
incompleteStagingKeys.clear();
|
||||
}
|
||||
}
|
||||
if (numKeysInBatch > 0) {
|
||||
fGetAndComputeKeys.push_back(
|
||||
getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex));
|
||||
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
|
||||
batchIndex, &batchData->counters));
|
||||
}
|
||||
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
|
@ -400,9 +402,10 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
// Apply mutations in batchData->stagingKeys [begin, end).
|
||||
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
|
||||
std::map<Key, StagingKey>::iterator end, Database cx,
|
||||
FlowLock* applyStagingKeysBatchLock, UID applierID) {
|
||||
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB));
|
||||
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
|
||||
FlowLock* applyStagingKeysBatchLock, UID applierID,
|
||||
ApplierBatchData::Counters* cc) {
|
||||
// wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock?
|
||||
// state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state int sets = 0;
|
||||
state int clears = 0;
|
||||
|
@ -416,6 +419,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
while (iter != end) {
|
||||
if (iter->second.type == MutationRef::SetValue) {
|
||||
tr->set(iter->second.key, iter->second.val);
|
||||
cc->appliedMutations += 1;
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
|
||||
.detail("SetKey", iter->second.key);
|
||||
sets++;
|
||||
|
@ -428,6 +432,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
.detail("SubVersion", iter->second.version.sub);
|
||||
}
|
||||
tr->clear(singleKeyRange(iter->second.key));
|
||||
cc->appliedMutations += 1;
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID)
|
||||
.detail("ClearKey", iter->second.key);
|
||||
clears++;
|
||||
|
@ -449,8 +454,10 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
.detail("Sets", sets)
|
||||
.detail("Clears", clears);
|
||||
wait(tr->commit());
|
||||
cc->appliedTxns += 1;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
cc->appliedTxnRetries += 1;
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
@ -470,14 +477,16 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
while (cur != batchData->stagingKeys.end()) {
|
||||
txnSize += cur->second.expectedMutationSize();
|
||||
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters));
|
||||
begin = cur;
|
||||
txnSize = 0;
|
||||
}
|
||||
cur++;
|
||||
}
|
||||
if (begin != batchData->stagingKeys.end()) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters));
|
||||
}
|
||||
|
||||
wait(waitForAll(fBatches));
|
||||
|
|
|
@ -258,15 +258,18 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
CounterCollection cc;
|
||||
Counter receivedBytes, receivedWeightedBytes, receivedMutations, receivedAtomicOps;
|
||||
Counter appliedWeightedBytes, appliedMutations, appliedAtomicOps;
|
||||
Counter appliedTxns;
|
||||
Counter fetchKeys; // number of keys to fetch from dest. FDB cluster.
|
||||
Counter appliedTxns, appliedTxnRetries;
|
||||
Counter fetchKeys, fetchTxns, fetchTxnRetries; // number of keys to fetch from dest. FDB cluster.
|
||||
// TODO: Add the counter in applying phase
|
||||
|
||||
Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex)
|
||||
: cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)),
|
||||
receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc),
|
||||
receivedAtomicOps("ReceivedAtomicOps", cc), receivedWeightedBytes("ReceivedWeightedMutations", cc),
|
||||
appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc),
|
||||
appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc), fetchKeys("FetchKeys", cc) {}
|
||||
appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc),
|
||||
appliedTxnRetries("AppliedTxnRetries", cc), fetchKeys("FetchKeys", cc), fetchTxns("FetchTxns", cc),
|
||||
fetchTxnRetries("FetchTxnRetries", cc) {}
|
||||
} counters;
|
||||
|
||||
void addref() { return ReferenceCounted<ApplierBatchData>::addref(); }
|
||||
|
|
|
@ -139,6 +139,7 @@ ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatch
|
|||
|
||||
ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::string role) {
|
||||
loop {
|
||||
// TODO: Add node ID and batchIndex
|
||||
TraceEvent("FastRestoreTraceProcessMetrics")
|
||||
.detail("Role", role)
|
||||
.detail("Node", self->nodeID)
|
||||
|
|
Loading…
Reference in New Issue