FastRestoreApplier:Add applierID and batchIndex for precompute stage
This commit is contained in:
parent
abda13e9df
commit
d22af629cd
|
@ -188,12 +188,14 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
||||||
|
|
||||||
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
|
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
|
||||||
ACTOR static Future<Void> getAndComputeStagingKeys(
|
ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||||
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, Database cx, UID applierID) {
|
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, Database cx, UID applierID,
|
||||||
|
int batchIndex) {
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||||
state std::vector<Future<Optional<Value>>> fValues;
|
state std::vector<Future<Optional<Value>>> fValues;
|
||||||
state int retries = 0;
|
state int retries = 0;
|
||||||
|
|
||||||
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
|
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
.detail("GetKeys", incompleteStagingKeys.size());
|
.detail("GetKeys", incompleteStagingKeys.size());
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
|
@ -207,7 +209,8 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||||
break;
|
break;
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
if (retries++ > 10) {
|
if (retries++ > 10) {
|
||||||
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck")
|
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysGetKeysStuck", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
.detail("GetKeys", incompleteStagingKeys.size())
|
.detail("GetKeys", incompleteStagingKeys.size())
|
||||||
.error(e);
|
.error(e);
|
||||||
break;
|
break;
|
||||||
|
@ -221,7 +224,8 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (auto& key : incompleteStagingKeys) {
|
for (auto& key : incompleteStagingKeys) {
|
||||||
if (!fValues[i].get().present()) { // Debug info to understand which key does not exist in DB
|
if (!fValues[i].get().present()) { // Debug info to understand which key does not exist in DB
|
||||||
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB")
|
TraceEvent(SevWarn, "FastRestoreApplierGetAndComputeStagingKeysNoBaseValueInDB", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
.detail("Key", key.first)
|
.detail("Key", key.first)
|
||||||
.detail("Reason", "Not found in DB")
|
.detail("Reason", "Not found in DB")
|
||||||
.detail("PendingMutations", key.second->second.pendingMutations.size())
|
.detail("PendingMutations", key.second->second.pendingMutations.size())
|
||||||
|
@ -231,7 +235,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||||
.detail("PendingMutationVersion", vm.first.toString())
|
.detail("PendingMutationVersion", vm.first.toString())
|
||||||
.detail("PendingMutation", vm.second.toString());
|
.detail("PendingMutation", vm.second.toString());
|
||||||
}
|
}
|
||||||
key.second->second.precomputeResult("GetAndComputeStagingKeysNoBaseValueInDB");
|
key.second->second.precomputeResult("GetAndComputeStagingKeysNoBaseValueInDB", applierID, batchIndex);
|
||||||
i++;
|
i++;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -239,12 +243,13 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
||||||
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
|
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
|
||||||
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
|
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
|
||||||
key.second->second.add(m, LogMessageVersion(1));
|
key.second->second.add(m, LogMessageVersion(1));
|
||||||
key.second->second.precomputeResult("GetAndComputeStagingKeys");
|
key.second->second.precomputeResult("GetAndComputeStagingKeys", applierID, batchIndex);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
|
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
.detail("GetKeys", incompleteStagingKeys.size());
|
.detail("GetKeys", incompleteStagingKeys.size());
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
|
@ -289,7 +294,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
||||||
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param2);
|
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param2);
|
||||||
while (lb != ub) {
|
while (lb != ub) {
|
||||||
if (lb->first >= rangeMutation.mutation.param2) {
|
if (lb->first >= rangeMutation.mutation.param2) {
|
||||||
TraceEvent(SevError, "FastRestoreApplerPhasePrecomputeMutationsResult_IncorrectUpperBound")
|
TraceEvent(SevError, "FastRestoreApplerPhasePrecomputeMutationsResultIncorrectUpperBound")
|
||||||
.detail("Key", lb->first)
|
.detail("Key", lb->first)
|
||||||
.detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
|
.detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
|
||||||
.detail("UsedUpperBound", ub->first);
|
.detail("UsedUpperBound", ub->first);
|
||||||
|
@ -320,13 +325,13 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
||||||
numKeysInBatch++;
|
numKeysInBatch++;
|
||||||
}
|
}
|
||||||
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
|
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
|
||||||
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID));
|
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID, batchIndex));
|
||||||
numKeysInBatch = 0;
|
numKeysInBatch = 0;
|
||||||
incompleteStagingKeys.clear();
|
incompleteStagingKeys.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (numKeysInBatch > 0) {
|
if (numKeysInBatch > 0) {
|
||||||
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID));
|
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID, batchIndex));
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||||
|
@ -337,7 +342,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
||||||
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
|
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
|
||||||
stagingKeyIter++) {
|
stagingKeyIter++) {
|
||||||
if (stagingKeyIter->second.hasBaseValue()) {
|
if (stagingKeyIter->second.hasBaseValue()) {
|
||||||
stagingKeyIter->second.precomputeResult("HasBaseValue");
|
stagingKeyIter->second.precomputeResult("HasBaseValue", applierID, batchIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -117,8 +117,9 @@ struct StagingKey {
|
||||||
|
|
||||||
// Precompute the final value of the key.
|
// Precompute the final value of the key.
|
||||||
// TODO: Look at the last LogMessageVersion, if it set or clear, we can ignore the rest of versions.
|
// TODO: Look at the last LogMessageVersion, if it set or clear, we can ignore the rest of versions.
|
||||||
void precomputeResult(const char* context) {
|
void precomputeResult(const char* context, UID applierID, int batchIndex) {
|
||||||
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
|
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
.detail("Context", context)
|
.detail("Context", context)
|
||||||
.detail("Version", version.toString())
|
.detail("Version", version.toString())
|
||||||
.detail("Key", key)
|
.detail("Key", key)
|
||||||
|
@ -136,7 +137,9 @@ struct StagingKey {
|
||||||
MutationRef m = lb->second;
|
MutationRef m = lb->second;
|
||||||
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
|
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
|
||||||
if (std::tie(type, key, val) != std::tie(m.type, m.param1, m.param2)) {
|
if (std::tie(type, key, val) != std::tie(m.type, m.param1, m.param2)) {
|
||||||
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnhandledSituation")
|
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnhandledSituation", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
|
.detail("Context", context)
|
||||||
.detail("BufferedType", getTypeString(type))
|
.detail("BufferedType", getTypeString(type))
|
||||||
.detail("PendingType", getTypeString(m.type))
|
.detail("PendingType", getTypeString(m.type))
|
||||||
.detail("BufferedVal", val.toString())
|
.detail("BufferedVal", val.toString())
|
||||||
|
@ -167,11 +170,15 @@ struct StagingKey {
|
||||||
type = MutationRef::SetValue; // Precomputed result should be set to DB.
|
type = MutationRef::SetValue; // Precomputed result should be set to DB.
|
||||||
} else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) {
|
} else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) {
|
||||||
type = MutationRef::SetValue; // Precomputed result should be set to DB.
|
type = MutationRef::SetValue; // Precomputed result should be set to DB.
|
||||||
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet")
|
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
|
.detail("Context", context)
|
||||||
.detail("MutationType", getTypeString(mutation.type))
|
.detail("MutationType", getTypeString(mutation.type))
|
||||||
.detail("Version", lb->first.toString());
|
.detail("Version", lb->first.toString());
|
||||||
} else {
|
} else {
|
||||||
TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation")
|
TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation", applierID)
|
||||||
|
.detail("BatchIndex", batchIndex)
|
||||||
|
.detail("Context", context)
|
||||||
.detail("MutationType", getTypeString(mutation.type))
|
.detail("MutationType", getTypeString(mutation.type))
|
||||||
.detail("Version", lb->first.toString());
|
.detail("Version", lb->first.toString());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue