FastRestore:Resolve review comments
This commit is contained in:
parent
898a1ea3ed
commit
6bd4703a9f
|
@ -126,8 +126,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
// mutations in log file is in [beginVersion, endVersion], both inclusive.
|
||||
ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion);
|
||||
// Loader sends the endVersion to ensure all useful versions are sent
|
||||
ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) ||
|
||||
(!req.isRangeFile && commitVersion <= req.asset.endVersion));
|
||||
ASSERT_WE_THINK(commitVersion <= req.asset.endVersion);
|
||||
|
||||
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
|
||||
MutationRef mutation = mutations[mIndex];
|
||||
|
@ -196,7 +195,6 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state std::vector<Future<Optional<Value>>> fValues;
|
||||
state std::vector<Optional<Value>> values;
|
||||
state int i = 0;
|
||||
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
|
||||
.detail("GetKeys", imcompleteStagingKeys.size());
|
||||
|
@ -208,10 +206,7 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
for (auto& key : imcompleteStagingKeys) {
|
||||
fValues.push_back(tr->get(key.first));
|
||||
}
|
||||
for (i = 0; i < fValues.size(); i++) {
|
||||
Optional<Value> val = wait(fValues[i]);
|
||||
values.push_back(val);
|
||||
}
|
||||
wait(waitForAll(fValues));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
|
||||
|
@ -223,10 +218,10 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(values.size() == imcompleteStagingKeys.size());
|
||||
ASSERT(fValues.size() == imcompleteStagingKeys.size());
|
||||
int i = 0;
|
||||
for (auto& key : imcompleteStagingKeys) {
|
||||
if (!values[i].present()) {
|
||||
if (!fValues[i].get().present()) {
|
||||
TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
|
||||
.detail("Key", key.first)
|
||||
.detail("Reason", "Not found in DB")
|
||||
|
|
|
@ -29,8 +29,8 @@
|
|||
|
||||
#include <sstream>
|
||||
#include "flow/Stats.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Atomic.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
#include "fdbrpc/Locality.h"
|
||||
|
@ -241,10 +241,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
|
||||
void addMutation(MutationRef m, Version ver) {
|
||||
if (!isRangeMutation(m)) {
|
||||
if (stagingKeys.find(m.param1) == stagingKeys.end()) {
|
||||
stagingKeys.emplace(m.param1, StagingKey());
|
||||
}
|
||||
stagingKeys[m.param1].add(m, ver);
|
||||
auto item = stagingKeys.emplace(m.param1, StagingKey());
|
||||
item.first->second.add(m, ver);
|
||||
} else {
|
||||
stagingKeyRanges.insert(StagingKeyRange(m, ver));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue