parent
09db06c476
commit
e277ce81d5
|
@ -1064,5 +1064,3 @@ const KeyRangeRef testOnlyTxnStateStorePrefixRange(
|
|||
const KeyRef writeRecoveryKey = LiteralStringRef("\xff/writeRecovery");
|
||||
const ValueRef writeRecoveryKeyTrue = LiteralStringRef("1");
|
||||
const KeyRef snapshotEndVersionKey = LiteralStringRef("\xff/snapshotEndVersion");
|
||||
const KeyRef snapSourceClusterKey = LiteralStringRef("\xff/snapSourceCluster");
|
||||
const ValueRef snapSourceClusterKeyTrue = LiteralStringRef("1");
|
||||
|
|
|
@ -400,8 +400,6 @@ extern const KeyRangeRef testOnlyTxnStateStorePrefixRange;
|
|||
extern const KeyRef writeRecoveryKey;
|
||||
extern const ValueRef writeRecoveryKeyTrue;
|
||||
extern const KeyRef snapshotEndVersionKey;
|
||||
extern const KeyRef snapSourceClusterKey;
|
||||
extern const ValueRef snapSourceClusterKeyTrue;
|
||||
|
||||
#pragma clang diagnostic pop
|
||||
|
||||
|
|
|
@ -308,10 +308,6 @@ void applyMetadataMutations(SpanID const& spanContext, UID const& dbgid, Arena&
|
|||
TraceEvent("WriteRecoveryKeySet", dbgid);
|
||||
if (!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore
|
||||
} else if (m.param1 == snapSourceClusterKey) {
|
||||
TraceEvent("SnapSourceClusterKeySet", dbgid);
|
||||
if (!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
TEST(true); // Snapshot created, setting snapSourceKey in txnStateStore
|
||||
}
|
||||
} else if (m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] && m.type == MutationRef::ClearRange) {
|
||||
KeyRangeRef range(m.param1, m.param2);
|
||||
|
@ -398,9 +394,6 @@ void applyMetadataMutations(SpanID const& spanContext, UID const& dbgid, Arena&
|
|||
if (range.contains(writeRecoveryKey)) {
|
||||
if(!initialCommit) txnStateStore->clear(singleKeyRange(writeRecoveryKey));
|
||||
}
|
||||
if (range.contains(snapSourceClusterKey)) {
|
||||
if(!initialCommit) txnStateStore->clear(singleKeyRange(snapSourceClusterKey));
|
||||
}
|
||||
if (range.intersects(testOnlyTxnStateStorePrefixRange)) {
|
||||
if(!initialCommit) txnStateStore->clear(range & testOnlyTxnStateStorePrefixRange);
|
||||
}
|
||||
|
|
|
@ -4736,7 +4736,6 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
TraceEvent("SnapDataDistributor_WriteFlagAttempt")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
tr.clear(singleKeyRange(snapSourceClusterKey));
|
||||
tr.set(writeRecoveryKey, writeRecoveryKeyTrue);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
|
@ -4817,21 +4816,6 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
TraceEvent("SnapDataDistributor_AfterSnapCoords")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
TraceEvent("SnapDataDistributor_SourceClusterAttempt")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
tr.set(snapSourceClusterKey, snapSourceClusterKeyTrue);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SnapDataDistributor_SourceClusterError").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
} catch (Error& err) {
|
||||
state Error e = err;
|
||||
TraceEvent("SnapDataDistributor_SnapReqExit")
|
||||
|
|
|
@ -1571,12 +1571,10 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
int mmApplied = 0; // The number of mutations in tr.mutations that have been applied to the txnStateStore so far
|
||||
if (self->lastEpochEnd != 0) {
|
||||
Optional<Value> snapRecoveryFlag = self->txnStateStore->readValue(writeRecoveryKey).get();
|
||||
Optional<Value> snapSourceCluster = self->txnStateStore->readValue(snapSourceClusterKey).get();
|
||||
TraceEvent("MasterRecoverySnap")
|
||||
.detail("SnapRecoveryFlag", snapRecoveryFlag.present() ? snapRecoveryFlag.get().toString() : "N/A")
|
||||
.detail("SnapSourceCluster", snapSourceCluster.present() ? snapSourceCluster.get().toString() : "N/A")
|
||||
.detail("LastEpochEnd", self->lastEpochEnd);
|
||||
if (snapRecoveryFlag.present() && !snapSourceCluster.present()) {
|
||||
if (snapRecoveryFlag.present()) {
|
||||
TEST(true); // Recovering from snapshot, writing to snapShotEndVersionKey
|
||||
BinaryWriter bw(Unversioned());
|
||||
tr.set(recoveryCommitRequest.arena, snapshotEndVersionKey, (bw << self->lastEpochEnd).toValue());
|
||||
|
|
Loading…
Reference in New Issue