Merge pull request #1220 from bnamasivayam/dr-switchover-checks
Add some basic checks before doing an atomic switchover.
This commit is contained in:
commit
6d8aa20ae3
|
@ -657,6 +657,7 @@ CSimpleOpt::SOption g_rgDBSwitchOptions[] = {
|
||||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||||
{ OPT_VERSION, "--version", SO_NONE },
|
{ OPT_VERSION, "--version", SO_NONE },
|
||||||
{ OPT_VERSION, "-v", SO_NONE },
|
{ OPT_VERSION, "-v", SO_NONE },
|
||||||
|
{ OPT_FORCE, "-f", SO_NONE },
|
||||||
{ OPT_CRASHONERROR, "--crash", SO_NONE },
|
{ OPT_CRASHONERROR, "--crash", SO_NONE },
|
||||||
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
|
{ OPT_MEMLIMIT, "-m", SO_REQ_SEP },
|
||||||
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
|
{ OPT_MEMLIMIT, "--memory", SO_REQ_SEP },
|
||||||
|
@ -1394,8 +1395,8 @@ ACTOR Future<Void> updateAgentPollRate(Database src, std::string rootKey, std::s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest, std::string name, enumProgramExe exe, double *pollDelay, Database taskDest = Database() ) {
|
ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest, std::string name, enumProgramExe exe, double *pollDelay, Database taskDest = Database(),
|
||||||
state std::string id = g_nondeterministic_random->randomUniqueID().toString();
|
std::string id = g_nondeterministic_random->randomUniqueID().toString()) {
|
||||||
state std::string metaKey = layerStatusMetaPrefixRange.begin.toString() + "json/" + name;
|
state std::string metaKey = layerStatusMetaPrefixRange.begin.toString() + "json/" + name;
|
||||||
state std::string rootKey = backupStatusPrefixRange.begin.toString() + name + "/json";
|
state std::string rootKey = backupStatusPrefixRange.begin.toString() + name + "/json";
|
||||||
state std::string instanceKey = rootKey + "/" + "agent-" + id;
|
state std::string instanceKey = rootKey + "/" + "agent-" + id;
|
||||||
|
@ -1451,8 +1452,9 @@ ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest, std::string name
|
||||||
|
|
||||||
ACTOR Future<Void> runDBAgent(Database src, Database dest) {
|
ACTOR Future<Void> runDBAgent(Database src, Database dest) {
|
||||||
state double pollDelay = 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE;
|
state double pollDelay = 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE;
|
||||||
state Future<Void> status = statusUpdateActor(src, "dr_backup", EXE_DR_AGENT, &pollDelay, dest);
|
std::string id = g_nondeterministic_random->randomUniqueID().toString();
|
||||||
state Future<Void> status_other = statusUpdateActor(dest, "dr_backup_dest", EXE_DR_AGENT, &pollDelay, dest);
|
state Future<Void> status = statusUpdateActor(src, "dr_backup", EXE_DR_AGENT, &pollDelay, dest, id);
|
||||||
|
state Future<Void> status_other = statusUpdateActor(dest, "dr_backup_dest", EXE_DR_AGENT, &pollDelay, dest, id);
|
||||||
|
|
||||||
state DatabaseBackupAgent backupAgent(src);
|
state DatabaseBackupAgent backupAgent(src);
|
||||||
|
|
||||||
|
@ -1639,7 +1641,7 @@ ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotInterv
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> switchDBBackup(Database src, Database dest, Standalone<VectorRef<KeyRangeRef>> backupRanges, std::string tagName) {
|
ACTOR Future<Void> switchDBBackup(Database src, Database dest, Standalone<VectorRef<KeyRangeRef>> backupRanges, std::string tagName, bool forceAction) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
state DatabaseBackupAgent backupAgent(src);
|
state DatabaseBackupAgent backupAgent(src);
|
||||||
|
@ -1650,7 +1652,7 @@ ACTOR Future<Void> switchDBBackup(Database src, Database dest, Standalone<Vector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
wait(backupAgent.atomicSwitchover(dest, KeyRef(tagName), backupRanges, StringRef(), StringRef()));
|
wait(backupAgent.atomicSwitchover(dest, KeyRef(tagName), backupRanges, StringRef(), StringRef(), forceAction));
|
||||||
printf("The DR on tag `%s' was successfully switched.\n", printable(StringRef(tagName)).c_str());
|
printf("The DR on tag `%s' was successfully switched.\n", printable(StringRef(tagName)).c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3060,7 +3062,7 @@ int main(int argc, char* argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
sourceDb = Database::createDatabase(ccf, -1, localities);
|
sourceDb = Database::createDatabase(sourceCcf, -1, localities);
|
||||||
}
|
}
|
||||||
catch (Error& e) {
|
catch (Error& e) {
|
||||||
fprintf(stderr, "ERROR: %s\n", e.what());
|
fprintf(stderr, "ERROR: %s\n", e.what());
|
||||||
|
@ -3221,7 +3223,7 @@ int main(int argc, char* argv[]) {
|
||||||
f = stopAfter( statusDBBackup(sourceDb, db, tagName, maxErrors) );
|
f = stopAfter( statusDBBackup(sourceDb, db, tagName, maxErrors) );
|
||||||
break;
|
break;
|
||||||
case DB_SWITCH:
|
case DB_SWITCH:
|
||||||
f = stopAfter( switchDBBackup(sourceDb, db, backupKeys, tagName) );
|
f = stopAfter( switchDBBackup(sourceDb, db, backupKeys, tagName, forceAction) );
|
||||||
break;
|
break;
|
||||||
case DB_ABORT:
|
case DB_ABORT:
|
||||||
f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial) );
|
f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial) );
|
||||||
|
|
|
@ -335,7 +335,7 @@ public:
|
||||||
return taskBucket->run(cx, futureBucket, pollDelay, maxConcurrentTasks);
|
return taskBucket->run(cx, futureBucket, pollDelay, maxConcurrentTasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> atomicSwitchover(Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix);
|
Future<Void> atomicSwitchover(Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix, bool forceAction=false);
|
||||||
|
|
||||||
Future<Void> unlockBackup(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
Future<Void> unlockBackup(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
||||||
Future<Void> unlockBackup(Database cx, Key tagName) {
|
Future<Void> unlockBackup(Database cx, Key tagName) {
|
||||||
|
|
|
@ -18,7 +18,12 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <iterator>
|
||||||
#include "fdbclient/BackupAgent.actor.h"
|
#include "fdbclient/BackupAgent.actor.h"
|
||||||
|
#include "fdbclient/Status.h"
|
||||||
|
#include "fdbclient/StatusClient.h"
|
||||||
|
#include "fdbclient/DatabaseContext.h"
|
||||||
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
#include <ctime>
|
#include <ctime>
|
||||||
#include <climits>
|
#include <climits>
|
||||||
#include "fdbrpc/IAsyncFile.h"
|
#include "fdbrpc/IAsyncFile.h"
|
||||||
|
@ -1626,6 +1631,96 @@ namespace dbBackup {
|
||||||
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
|
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Collects the ids of the DR agents registered under a layer status section.
//
// statusObj: a status document; agents are looked up under
//            "cluster.layers.<context>.instances".
// context:   name of the layer status section to inspect (callers in this file
//            pass "dr_backup" and "dr_backup_dest").
//
// Returns the key of every entry of the instances object. The set is empty when
// tryGet() does not yield an instances section.
// Throws backup_error if reading the status document raises std::runtime_error.
std::set<std::string> getDRAgentsIds(StatusObjectReader statusObj, const char *context) {
	std::set<std::string> drBackupAgents;
	try {
		StatusObjectReader statusObjLayers;
		statusObj.get("cluster.layers", statusObjLayers);
		StatusObjectReader instances;
		std::string path = format("%s.instances", context);
		if (statusObjLayers.tryGet(path, instances)) {
			// Bind by const reference: only the key is used, so there is no need
			// to copy each instance's status value on every iteration.
			for (const auto &instance : instances.obj()) {
				drBackupAgents.insert(instance.first);
			}
		}
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_GetDRAgentsIdsFail").detail("Error", e.what());
		throw backup_error();
	}
	return drBackupAgents;
}
|
||||||
|
|
||||||
|
// Looks up the "mutation_stream_id" recorded for a DR tag in a status document.
//
// statusObj: a status document; tags are looked up under
//            "cluster.layers.<context>.tags".
// context:   name of the layer status section to inspect (callers in this file
//            pass "dr_backup" and "dr_backup_dest").
// tagName:   the tag whose mutation stream id is wanted.
//
// Returns the string stored under "mutation_stream_id" for the matching tag.
// Throws backup_error when no entry matches tagName (after logging
// DBA_TagNotPresentInStatus) or when reading the document raises
// std::runtime_error.
std::string getDRMutationStreamId(StatusObjectReader statusObj, const char *context, Key tagName) {
	try {
		StatusObjectReader statusObjLayers;
		statusObj.get("cluster.layers", statusObjLayers);
		StatusObjectReader tags;
		std::string path = format("%s.tags", context);
		// Convert the tag name once rather than on every loop iteration.
		const std::string wantedTag = tagName.toString();
		if (statusObjLayers.tryGet(path, tags)) {
			// Iterate by const reference so non-matching tags are not copied.
			for (const auto &entry : tags.obj()) {
				if (entry.first == wantedTag) {
					// Copy only the matching value so the JSONDoc is still built
					// from a mutable local, as the by-value loop variable was before.
					auto tagValue = entry.second;
					JSONDoc tag(tagValue);
					return tag["mutation_stream_id"].get_str();
				}
			}
		}
		TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", wantedTag).detail("Context", context);
		throw backup_error();
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_GetDRMutationStreamIdFail").detail("Error", e.what());
		throw backup_error();
	}
}
|
||||||
|
|
||||||
|
// Reads the "database_locked" flag from the "cluster" section of a status
// document.
//
// Returns the boolean stored at cluster.database_locked.
// Throws backup_error (after logging DBA_GetLockedStatusFail) if reading the
// document raises std::runtime_error.
bool getLockedStatus(StatusObjectReader statusObj) {
	try {
		StatusObjectReader clusterSection = statusObj["cluster"].get_obj();
		bool isLocked = clusterSection["database_locked"].get_bool();
		return isLocked;
	}
	catch (std::runtime_error &err) {
		TraceEvent(SevWarn, "DBA_GetLockedStatusFail").detail("Error", err.what());
		throw backup_error();
	}
}
|
||||||
|
|
||||||
|
// Sanity-checks the status of both clusters before an atomic switchover.
//
// srcStatus:  status document of the current source cluster.
// destStatus: status document of the current destination cluster.
// tagName:    the DR tag being switched.
//
// Verifies, in order:
//   1. the source reports database_locked == false and the destination reports
//      database_locked == true;
//   2. the mutation stream id of the tag under "dr_backup" on the source equals
//      the one under "dr_backup_dest" on the destination;
//   3. at least one agent id appears both under "dr_backup_dest" on the source
//      and under "dr_backup" on the destination.
//
// Returns normally when every check passes. Throws backup_error (after logging
// a SevWarn trace event naming the failed check) otherwise, or when a
// std::runtime_error is raised while the checks run.
void checkAtomicSwitchOverConfig(StatusObjectReader srcStatus, StatusObjectReader destStatus, Key tagName) {
	try {
		// Check if src is unlocked and dest is locked
		if (getLockedStatus(srcStatus)) {
			TraceEvent(SevWarn, "DBA_AtomicSwitchOverSrcLocked");
			throw backup_error();
		}
		if (!getLockedStatus(destStatus)) {
			TraceEvent(SevWarn, "DBA_AtomicSwitchOverDestUnlocked");
			throw backup_error();
		}

		// Check if mutation-stream-id matches. Each id is fetched once and reused
		// for the trace event, so the logged values are exactly the ones compared.
		// The destination was previously re-read under the misspelled context
		// "dr_back_dest", so that lookup failed and the id was never logged.
		const std::string srcMutationId = getDRMutationStreamId(srcStatus, "dr_backup", tagName);
		const std::string destMutationId = getDRMutationStreamId(destStatus, "dr_backup_dest", tagName);
		if (srcMutationId != destMutationId) {
			TraceEvent(SevWarn, "DBA_AtomicSwitchOverMutationIdMismatch").detail("SourceMutationId", srcMutationId)
				.detail("DestMutationId", destMutationId);
			throw backup_error();
		}

		// Check if there are agents set up with src as its destination cluster and dest as its source cluster
		auto srcDRAgents = getDRAgentsIds(srcStatus, "dr_backup_dest");
		auto destDRAgents = getDRAgentsIds(destStatus, "dr_backup");
		std::set<std::string> intersectingAgents;
		std::set_intersection(srcDRAgents.begin(), srcDRAgents.end(), destDRAgents.begin(), destDRAgents.end(),
			std::inserter(intersectingAgents, intersectingAgents.begin()));
		if (intersectingAgents.empty()) {
			TraceEvent(SevWarn, "DBA_SwitchOverPossibleDRAgentsIncorrectSetup");
			throw backup_error();
		}
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_UnableToCheckAtomicSwitchOverConfig").detail("RunTimeError", e.what());
		throw backup_error();
	}
}
|
||||||
|
|
||||||
class DatabaseBackupAgentImpl {
|
class DatabaseBackupAgentImpl {
|
||||||
public:
|
public:
|
||||||
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
|
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
|
||||||
|
@ -1838,7 +1933,7 @@ public:
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> atomicSwitchover(DatabaseBackupAgent* backupAgent, Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix) {
|
ACTOR static Future<Void> atomicSwitchover(DatabaseBackupAgent* backupAgent, Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix, bool forceAction) {
|
||||||
state DatabaseBackupAgent drAgent(dest);
|
state DatabaseBackupAgent drAgent(dest);
|
||||||
state UID destlogUid = wait(backupAgent->getLogUid(dest, tagName));
|
state UID destlogUid = wait(backupAgent->getLogUid(dest, tagName));
|
||||||
state int status = wait(backupAgent->getStateValue(dest, destlogUid));
|
state int status = wait(backupAgent->getStateValue(dest, destlogUid));
|
||||||
|
@ -1848,6 +1943,12 @@ public:
|
||||||
throw backup_duplicate();
|
throw backup_duplicate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!g_network->isSimulated() && !forceAction) {
|
||||||
|
state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src->getConnectionFile()));
|
||||||
|
StatusObject destStatus = wait(StatusClient::statusFetcher(dest->getConnectionFile()));
|
||||||
|
checkAtomicSwitchOverConfig(srcStatus, destStatus, tagName);
|
||||||
|
}
|
||||||
|
|
||||||
state UID logUid = g_random->randomUniqueID();
|
state UID logUid = g_random->randomUniqueID();
|
||||||
state Key logUidValue = BinaryWriter::toValue(logUid, Unversioned());
|
state Key logUidValue = BinaryWriter::toValue(logUid, Unversioned());
|
||||||
state UID logUidCurrent = wait(drAgent.getLogUid(backupAgent->taskBucket->src, tagName));
|
state UID logUidCurrent = wait(drAgent.getLogUid(backupAgent->taskBucket->src, tagName));
|
||||||
|
@ -2296,8 +2397,8 @@ Future<Void> DatabaseBackupAgent::unlockBackup(Reference<ReadYourWritesTransacti
|
||||||
return DatabaseBackupAgentImpl::unlockBackup(this, tr, tagName);
|
return DatabaseBackupAgentImpl::unlockBackup(this, tr, tagName);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> DatabaseBackupAgent::atomicSwitchover(Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix) {
|
Future<Void> DatabaseBackupAgent::atomicSwitchover(Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix, bool forceAction) {
|
||||||
return DatabaseBackupAgentImpl::atomicSwitchover(this, dest, tagName, backupRanges, addPrefix, removePrefix);
|
return DatabaseBackupAgentImpl::atomicSwitchover(this, dest, tagName, backupRanges, addPrefix, removePrefix, forceAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> DatabaseBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone, Key addPrefix, Key removePrefix, bool lockDatabase, bool databasesInSync) {
|
Future<Void> DatabaseBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone, Key addPrefix, Key removePrefix, bool lockDatabase, bool databasesInSync) {
|
||||||
|
|
Loading…
Reference in New Issue