From 7cf71b0931b11ae8011ca3c1f053320974c43aab Mon Sep 17 00:00:00 2001 From: Balachandar Namasivayam Date: Fri, 1 Mar 2019 14:49:04 -0800 Subject: [PATCH] Add some basic checks before doing an atomic switchover. --- fdbbackup/backup.actor.cpp | 11 +-- fdbclient/DatabaseBackupAgent.actor.cpp | 101 ++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 5 deletions(-) diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index b20c7ad210..e2b40d4963 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -1394,8 +1394,8 @@ ACTOR Future updateAgentPollRate(Database src, std::string rootKey, std::s } } -ACTOR Future statusUpdateActor(Database statusUpdateDest, std::string name, enumProgramExe exe, double *pollDelay, Database taskDest = Database() ) { - state std::string id = g_nondeterministic_random->randomUniqueID().toString(); +ACTOR Future statusUpdateActor(Database statusUpdateDest, std::string name, enumProgramExe exe, double *pollDelay, Database taskDest = Database(), + std::string id = g_nondeterministic_random->randomUniqueID().toString()) { state std::string metaKey = layerStatusMetaPrefixRange.begin.toString() + "json/" + name; state std::string rootKey = backupStatusPrefixRange.begin.toString() + name + "/json"; state std::string instanceKey = rootKey + "/" + "agent-" + id; @@ -1451,8 +1451,9 @@ ACTOR Future statusUpdateActor(Database statusUpdateDest, std::string name ACTOR Future runDBAgent(Database src, Database dest) { state double pollDelay = 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE; - state Future status = statusUpdateActor(src, "dr_backup", EXE_DR_AGENT, &pollDelay, dest); - state Future status_other = statusUpdateActor(dest, "dr_backup_dest", EXE_DR_AGENT, &pollDelay, dest); + std::string id = g_nondeterministic_random->randomUniqueID().toString(); + state Future status = statusUpdateActor(src, "dr_backup", EXE_DR_AGENT, &pollDelay, dest, id); + state Future status_other = statusUpdateActor(dest, "dr_backup_dest", EXE_DR_AGENT, &pollDelay, dest, id); state DatabaseBackupAgent backupAgent(src); @@ -3060,7 +3061,7 @@ int main(int argc, char* argv[]) { } try { - sourceDb = Database::createDatabase(ccf, -1, localities); + sourceDb = Database::createDatabase(sourceCcf, -1, localities); } catch (Error& e) { fprintf(stderr, "ERROR: %s\n", e.what()); diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 42389466b4..dfb9d5027e 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -18,7 +18,12 @@ * limitations under the License. */ +#include #include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/Status.h" +#include "fdbclient/StatusClient.h" +#include "fdbclient/DatabaseContext.h" +#include "fdbclient/NativeAPI.actor.h" #include #include #include "fdbrpc/IAsyncFile.h" @@ -1626,6 +1631,96 @@ namespace dbBackup { REGISTER_TASKFUNC(StartFullBackupTaskFunc); } +std::set getDRAgentsIds(StatusObjectReader statusObj, const char *context) { + std::set drBackupAgents; + try { + StatusObjectReader statusObjLayers; + statusObj.get("cluster.layers", statusObjLayers); + StatusObjectReader instances; + std::string path = format("%s.instances", context); + if (statusObjLayers.tryGet(path, instances)) { + for (auto itr : instances.obj()) { + drBackupAgents.insert(itr.first); + } + } + } + catch (std::runtime_error &e) { + TraceEvent(SevWarn, "DBA_GetDRAgentsIdsFail").detail("Error", e.what()); + throw backup_error(); + } + return drBackupAgents; +} + +std::string getDRMutationStreamId(StatusObjectReader statusObj, const char *context, Key tagName) { + try { + StatusObjectReader statusObjLayers; + statusObj.get("cluster.layers", statusObjLayers); + StatusObjectReader tags; + std::string path = format("%s.tags", context); + if (statusObjLayers.tryGet(path, tags)) { + for (auto itr : tags.obj()) { + if (itr.first == tagName.toString()) { + JSONDoc tag(itr.second); + return tag["mutation_stream_id"].get_str(); + } + } + } + TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", tagName.toString()).detail("Context", context); + throw backup_error(); + } + catch (std::runtime_error &e) { + TraceEvent(SevWarn, "DBA_GetDRMutationStreamIdFail").detail("Error", e.what()); + throw backup_error(); + } +} + +bool getLockedStatus(StatusObjectReader statusObj) { + try { + StatusObjectReader statusObjCluster = statusObj["cluster"].get_obj(); + return statusObjCluster["database_locked"].get_bool(); + } + catch (std::runtime_error &e) { + TraceEvent(SevWarn, "DBA_GetLockedStatusFail").detail("Error", e.what()); + throw backup_error(); + } +} + +void checkAtomicSwitchOverConfig(StatusObjectReader srcStatus, StatusObjectReader destStatus, Key tagName) { + + try { + // Check if src is unlocked and dest is locked + if (getLockedStatus(srcStatus) != false) { + TraceEvent(SevWarn, "DBA_AtomicSwitchOverSrcLocked"); + throw backup_error(); + } + if (getLockedStatus(destStatus) != true) { + TraceEvent(SevWarn, "DBA_AtomicSwitchOverDestUnlocked"); + throw backup_error(); + } + // Check if mutation-stream-id matches + if (getDRMutationStreamId(srcStatus, "dr_backup", tagName) != getDRMutationStreamId(destStatus, "dr_backup_dest", tagName)) { + TraceEvent(SevWarn, "DBA_AtomicSwitchOverMutationIdMismatch").detail("SourceMutationId", getDRMutationStreamId(srcStatus, "dr_backup", tagName)) + .detail("DestMutationId", getDRMutationStreamId(destStatus, "dr_back_dest", tagName)); + throw backup_error(); + } + // Check if there are agents set up with src as its destination cluster and dest as its source cluster + auto srcDRAgents = getDRAgentsIds(srcStatus, "dr_backup_dest"); + auto destDRAgents = getDRAgentsIds(destStatus, "dr_backup"); + std::set intersectingAgents; + std::set_intersection(srcDRAgents.begin(), srcDRAgents.end(), destDRAgents.begin(), destDRAgents.end(), + std::inserter(intersectingAgents, intersectingAgents.begin())); + if (intersectingAgents.empty()) { + TraceEvent(SevWarn, "DBA_SwitchOverPossibleDRAgentsIncorrectSetup"); + throw backup_error(); + } + } + catch (std::runtime_error &e) { + TraceEvent(SevWarn, "DBA_UnableToCheckAtomicSwitchOverConfig").detail("RunTimeError", e.what()); + throw backup_error(); + } + return; +} + class DatabaseBackupAgentImpl { public: static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8; @@ -1848,6 +1943,12 @@ public: throw backup_duplicate(); } + if (!g_network->isSimulated()) { + state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src->getConnectionFile())); + StatusObject destStatus = wait(StatusClient::statusFetcher(dest->getConnectionFile())); + checkAtomicSwitchOverConfig(srcStatus, destStatus, tagName); + } + state UID logUid = g_random->randomUniqueID(); state Key logUidValue = BinaryWriter::toValue(logUid, Unversioned()); state UID logUidCurrent = wait(drAgent.getLogUid(backupAgent->taskBucket->src, tagName));