Make atomicSwitchover preserve an ever-increasing commit version.
This commit is contained in:
parent
69523ce151
commit
8f4c45418b
|
@ -1410,6 +1410,7 @@ public:
|
|||
|
||||
TraceEvent("DBA_switchover_locked").detail("version", commitVersion);
|
||||
|
||||
// Wait for the destination to apply mutations up to the lock commit before switching over.
|
||||
state ReadYourWritesTransaction tr2(dest);
|
||||
loop {
|
||||
try {
|
||||
|
@ -1459,6 +1460,27 @@ public:
|
|||
|
||||
TraceEvent("DBA_switchover_started");
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr2.reset();
|
||||
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr2.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Version destVersion = wait(tr2.getReadVersion());
|
||||
if (destVersion <= commitVersion) {
|
||||
TraceEvent("DBA_switchover_version_upgrade").detail("src", commitVersion).detail("dest", destVersion);
|
||||
TEST(true); // Forcing dest backup cluster to higher version
|
||||
tr2.set(minRequiredCommitVersionKey, BinaryWriter::toValue(commitVersion+1, Unversioned()));
|
||||
Void _ = wait(tr2.commit());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
Void _ = wait(tr2.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("DBA_switchover_version_upgraded");
|
||||
|
||||
Void _ = wait( backupAgent->unlockBackup(dest, tagName) );
|
||||
|
||||
TraceEvent("DBA_switchover_unlocked");
|
||||
|
|
|
@ -290,6 +290,7 @@ ProcessData decodeWorkerListValue( ValueRef const& value ) {
|
|||
|
||||
const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
|
||||
const KeyRef logsKey = LiteralStringRef("\xff/logs");
|
||||
const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCommitVersion");
|
||||
|
||||
const KeyRef globalKeysPrefix = LiteralStringRef("\xff/globals");
|
||||
const KeyRef lastEpochEndKey = LiteralStringRef("\xff/globals/lastEpochEnd");
|
||||
|
|
|
@ -117,6 +117,7 @@ ProcessData decodeWorkerListValue( ValueRef const& );
|
|||
|
||||
extern const KeyRef coordinatorsKey;
|
||||
extern const KeyRef logsKey;
|
||||
extern const KeyRef minRequiredCommitVersionKey;
|
||||
|
||||
const Value logsValue( const vector<std::pair<UID, NetworkAddress>>& logs, const vector<std::pair<UID, NetworkAddress>>& oldLogs );
|
||||
std::pair<vector<std::pair<UID, NetworkAddress>>,vector<std::pair<UID, NetworkAddress>>> decodeLogsValue( const ValueRef& value );
|
||||
|
|
|
@ -214,6 +214,14 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
|
|||
toCommit->addTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
else if (m.param1 == minRequiredCommitVersionKey) {
|
||||
Version requested = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
|
||||
TraceEvent("MinRequiredCommitVersion", dbgid).detail("min", requested).detail("current", popVersion).detail("hasConf", !!confChange);
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
if (popVersion != 0 && popVersion < requested) {
|
||||
if (confChange) *confChange = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (m.param2.size() && m.param2[0] == systemKeys.begin[0] && m.type == MutationRef::ClearRange) {
|
||||
KeyRangeRef range(m.param1, m.param2);
|
||||
|
|
|
@ -526,6 +526,13 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
|
|||
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, txsTag );
|
||||
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false );
|
||||
|
||||
// Fetch minRequiredCommitVersion from txnStateStore
|
||||
Optional<Standalone<StringRef>> requiredCommitVersion = wait(self->txnStateStore->readValue( minRequiredCommitVersionKey ));
|
||||
Version minRequiredCommitVersion = -1;
|
||||
if (requiredCommitVersion.present()) {
|
||||
minRequiredCommitVersion = BinaryReader::fromStringRef<Version>(requiredCommitVersion.get(), Unversioned());
|
||||
}
|
||||
|
||||
// Recover version info
|
||||
self->lastEpochEnd = oldLogSystem->getEnd() - 1;
|
||||
if (self->lastEpochEnd == 0) {
|
||||
|
@ -535,6 +542,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
|
|||
if(BUGGIFY) {
|
||||
self->recoveryTransactionVersion += g_random->randomInt64(0, SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);
|
||||
}
|
||||
if ( self->recoveryTransactionVersion < minRequiredCommitVersion ) self->recoveryTransactionVersion = minRequiredCommitVersion;
|
||||
}
|
||||
|
||||
TraceEvent("MasterRecovering", self->dbgid).detail("lastEpochEnd", self->lastEpochEnd).detail("recoveryTransactionVersion", self->recoveryTransactionVersion);
|
||||
|
|
Loading…
Reference in New Issue