FastRestore:ApplyToDB:BugFix: Serialize integers as big-endian so keys sort in lexicographic order

This commit is contained in:
Meng Xu 2019-11-03 17:16:21 -08:00
parent b326d26a47
commit 58aa6711e4
3 changed files with 40 additions and 5 deletions

View File

@ -643,11 +643,19 @@ const KeyRef restoreApplierTxnValue = LiteralStringRef("1");
// restoreApplierKeys: track atomic transaction progress to ensure applying atomicOp exactly once
// Builds the key that records one applier transaction.
// Layout: restoreApplierKeys.begin | applierID | version
//
// Exactly one prefix is written, and it must be restoreApplierKeys.begin: the key has to
// fall inside the restoreApplierKeys range and must match what decodeRestoreApplierKey
// strips off. `version` is serialized as given; callers that need the keys ordered by
// transaction id pass bigEndian64(txnId).
const Key restoreApplierKeyFor(UID const& applierID, Version version) {
	BinaryWriter wr(Unversioned());
	wr.serializeBytes(restoreApplierKeys.begin);
	wr << applierID << version;
	return wr.toValue();
}

// Inverse of restoreApplierKeyFor: recovers (applierID, version) from a key it produced.
// The restoreApplierKeys.begin prefix is removed first; reading from byte 0 of the full key
// would decode prefix bytes as the UID.
// NOTE(review): the returned Version is the value exactly as it was serialized. If the caller
// encoded bigEndian64(v), the result presumably still needs to be swapped back - confirm at call sites.
std::pair<UID, Version> decodeRestoreApplierKey(ValueRef const& key) {
	BinaryReader rd(key.removePrefix(restoreApplierKeys.begin), Unversioned());
	UID applierID;
	Version version;
	rd >> applierID >> version;
	return std::make_pair(applierID, version);
}
// Encode restore worker key for workerID
const Key restoreWorkerKeyFor(UID const& workerID) {
BinaryWriter wr(Unversioned());

View File

@ -298,6 +298,7 @@ extern const KeyRangeRef restoreApplierKeys;
// Value stored under each restoreApplierKeyFor(...) key; its presence lets the applier tell
// whether a transaction succeeded after an unknown error.
extern const KeyRef restoreApplierTxnValue;
// Builds the key for (applierID, version). The version is serialized as passed in;
// callers pass bigEndian64(txnId) so keys stay ordered by transaction id.
const Key restoreApplierKeyFor(UID const& applierID, Version version);
// Decodes a key built by restoreApplierKeyFor back into (applierID, version).
std::pair<UID, Version> decodeRestoreApplierKey(ValueRef const& key);
// Encodes the restore worker key for workerID.
const Key restoreWorkerKeyFor(UID const& workerID);
// Presumably serializes a RestoreWorkerInterface into the value stored under
// restoreWorkerKeyFor(...) - definition not visible here, confirm.
const Value restoreWorkerInterfaceValue(RestoreWorkerInterface const& server);
// Presumably the inverse of restoreWorkerInterfaceValue - definition not visible here, confirm.
RestoreWorkerInterface decodeRestoreWorkerInterfaceValue(ValueRef const& value);

View File

@ -271,6 +271,30 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
}
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// Sanity check the restoreApplierKeys, which should be empty at this point
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Key begin = restoreApplierKeyFor(
self->id(), bigEndian64(0)); // Integer must be BigEndian to maintain ordering in lexical order
Key end = restoreApplierKeyFor(self->id(), bigEndian64(std::numeric_limits<int64_t>::max()));
Standalone<RangeResultRef> txnIds = wait(tr->getRange(KeyRangeRef(begin, end), CLIENT_KNOBS->TOO_MANY));
if (txnIds.size() > 0) {
TraceEvent(SevError, "FastRestore_ApplyTxnStateNotClean").detail("TxnIds", txnIds.size());
for (auto& kv : txnIds) {
std::pair<UID, Version> applierInfo = decodeRestoreApplierKey(kv.key);
TraceEvent(SevError, "FastRestore_ApplyTxnStateNotClean")
.detail("Applier", applierInfo.first)
.detail("ResidueTxnID", applierInfo.second);
}
}
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
loop { // Transaction retry loop
try {
@ -279,7 +303,8 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> txnSucceeded = wait(tr->get(restoreApplierKeyFor(self->id(), progress.curTxnId)));
Optional<Value> txnSucceeded =
wait(tr->get(restoreApplierKeyFor(self->id(), bigEndian64(progress.curTxnId))));
if (!txnSucceeded.present()) {
progress.rollback();
continue;
@ -305,7 +330,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
.detail("Version", progress.curItInCurTxn->first);
// restoreApplierKeyFor(self->id(), curTxnId) to tell if txn succeeds at an unknown error
tr->set(restoreApplierKeyFor(self->id(), progress.curTxnId), restoreApplierTxnValue);
tr->set(restoreApplierKeyFor(self->id(), bigEndian64(progress.curTxnId)), restoreApplierTxnValue);
while (1) { // Loop: Accumulate mutations in a transaction
MutationRef m = progress.getCurrentMutation();
@ -383,8 +408,9 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->clear(KeyRangeRef(restoreApplierKeyFor(self->id(), 0),
restoreApplierKeyFor(self->id(), progress.curTxnId + 1)));
// Clear txnIds in [0, progress.curTxnId). We add 100 to curTxnId just to be safe.
tr->clear(KeyRangeRef(restoreApplierKeyFor(self->id(), bigEndian64(0)),
restoreApplierKeyFor(self->id(), bigEndian64(progress.curTxnId + 100))));
wait(tr->commit());
break;
} catch (Error& e) {