FastRestore:resolve review comments
1) wait on whenAtLeast; 2) Put BigEndian64 into the function call and the decoder to prevent future people from making the same mistake.
This commit is contained in:
parent
eb67886b75
commit
630c29d160
|
@ -641,11 +641,11 @@ const KeyRangeRef restoreApplierKeys(LiteralStringRef("\xff\x02/restoreApplier/"
|
|||
const KeyRef restoreApplierTxnValue = LiteralStringRef("1");
|
||||
|
||||
// restoreApplierKeys: track atomic transaction progress to ensure applying atomicOp exactly once
|
||||
// Version integer must be BigEndian to maintain ordering in lexical order
|
||||
// Version is passed in as LittleEndian, it must be converted to BigEndian to maintain ordering in lexical order
|
||||
const Key restoreApplierKeyFor(UID const& applierID, Version version) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
wr.serializeBytes(restoreApplierKeys.begin);
|
||||
wr << applierID << version;
|
||||
wr << applierID << bigEndian64(version);
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
|
@ -654,7 +654,7 @@ std::pair<UID, Version> decodeRestoreApplierKey(ValueRef const& key) {
|
|||
UID applierID;
|
||||
Version version;
|
||||
rd >> applierID >> version;
|
||||
return std::make_pair(applierID, version);
|
||||
return std::make_pair(applierID, bigEndian64(version));
|
||||
}
|
||||
|
||||
// Encode restore worker key for workerID
|
||||
|
|
|
@ -65,7 +65,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture())) {
|
||||
requestTypeStr = "initVersionBatch";
|
||||
handleInitVersionBatchRequest(req, self);
|
||||
wait(handleInitVersionBatchRequest(req, self));
|
||||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(applierInterf.finishRestore.getFuture())) {
|
||||
requestTypeStr = "finishRestore";
|
||||
|
@ -277,9 +277,8 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
|
|||
tr->reset();
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
// Version integer must be BigEndian to maintain ordering in lexical order
|
||||
Key begin = restoreApplierKeyFor(self->id(), bigEndian64(0));
|
||||
Key end = restoreApplierKeyFor(self->id(), bigEndian64(std::numeric_limits<int64_t>::max()));
|
||||
Key begin = restoreApplierKeyFor(self->id(), 0);
|
||||
Key end = restoreApplierKeyFor(self->id(), std::numeric_limits<int64_t>::max());
|
||||
Standalone<RangeResultRef> txnIds = wait(tr->getRange(KeyRangeRef(begin, end), CLIENT_KNOBS->TOO_MANY));
|
||||
if (txnIds.size() > 0) {
|
||||
TraceEvent(SevError, "FastRestore_ApplyTxnStateNotClean").detail("TxnIds", txnIds.size());
|
||||
|
@ -287,7 +286,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
|
|||
std::pair<UID, Version> applierInfo = decodeRestoreApplierKey(kv.key);
|
||||
TraceEvent(SevError, "FastRestore_ApplyTxnStateNotClean")
|
||||
.detail("Applier", applierInfo.first)
|
||||
.detail("ResidueTxnID", bigEndian64(applierInfo.second));
|
||||
.detail("ResidueTxnID", applierInfo.second);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -303,8 +302,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
|
|||
tr->reset();
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> txnSucceeded =
|
||||
wait(tr->get(restoreApplierKeyFor(self->id(), bigEndian64(progress.curTxnId))));
|
||||
Optional<Value> txnSucceeded = wait(tr->get(restoreApplierKeyFor(self->id(), progress.curTxnId)));
|
||||
if (!txnSucceeded.present()) {
|
||||
progress.rollback();
|
||||
continue;
|
||||
|
@ -330,7 +328,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
|
|||
.detail("Version", progress.curItInCurTxn->first);
|
||||
|
||||
// restoreApplierKeyFor(self->id(), curTxnId) to tell if txn succeeds at an unknown error
|
||||
tr->set(restoreApplierKeyFor(self->id(), bigEndian64(progress.curTxnId)), restoreApplierTxnValue);
|
||||
tr->set(restoreApplierKeyFor(self->id(), progress.curTxnId), restoreApplierTxnValue);
|
||||
|
||||
while (1) { // Loop: Accumulate mutations in a transaction
|
||||
MutationRef m = progress.getCurrentMutation();
|
||||
|
@ -409,8 +407,8 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
// Clear txnIds in [0, progress.curTxnId). We add 100 to curTxnId just to be safe.
|
||||
tr->clear(KeyRangeRef(restoreApplierKeyFor(self->id(), bigEndian64(0)),
|
||||
restoreApplierKeyFor(self->id(), bigEndian64(progress.curTxnId + 100))));
|
||||
tr->clear(KeyRangeRef(restoreApplierKeyFor(self->id(), 0),
|
||||
restoreApplierKeyFor(self->id(), progress.curTxnId + 100)));
|
||||
wait(tr->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
|
|
@ -86,7 +86,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
|
|||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(loaderInterf.initVersionBatch.getFuture())) {
|
||||
requestTypeStr = "initVersionBatch";
|
||||
handleInitVersionBatchRequest(req, self);
|
||||
wait(handleInitVersionBatchRequest(req, self));
|
||||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(loaderInterf.finishRestore.getFuture())) {
|
||||
requestTypeStr = "finishRestore";
|
||||
|
|
|
@ -55,9 +55,9 @@ void handleFinishRestoreRequest(const RestoreVersionBatchRequest& req, Reference
|
|||
req.reply.send(RestoreCommonReply(self->id()));
|
||||
}
|
||||
|
||||
void handleInitVersionBatchRequest(const RestoreVersionBatchRequest& req, Reference<RestoreRoleData> self) {
|
||||
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self) {
|
||||
// batchId is continuous. (req.batchID-1) is the id of the just finished batch.
|
||||
self->versionBatchId.whenAtLeast(req.batchID - 1);
|
||||
wait(self->versionBatchId.whenAtLeast(req.batchID - 1));
|
||||
|
||||
if (self->versionBatchId.get() == req.batchID - 1) {
|
||||
self->resetPerVersionBatch();
|
||||
|
@ -69,6 +69,7 @@ void handleInitVersionBatchRequest(const RestoreVersionBatchRequest& req, Refere
|
|||
}
|
||||
|
||||
req.reply.send(RestoreCommonReply(self->id()));
|
||||
return Void();
|
||||
}
|
||||
|
||||
//-------Helper functions
|
||||
|
|
|
@ -55,7 +55,7 @@ struct RestoreSimpleRequest;
|
|||
typedef std::map<Version, Standalone<VectorRef<MutationRef>>> VersionedMutationsMap;
|
||||
|
||||
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
|
||||
void handleInitVersionBatchRequest(const RestoreVersionBatchRequest& req, Reference<RestoreRoleData> self);
|
||||
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
|
||||
void handleFinishRestoreRequest(const RestoreVersionBatchRequest& req, Reference<RestoreRoleData> self);
|
||||
|
||||
// Helper class for reading restore data from a buffer and throwing the right errors.
|
||||
|
|
Loading…
Reference in New Issue