Add some basic checks before doing an atomic switchover.

This commit is contained in:
Balachandar Namasivayam 2019-03-01 14:49:04 -08:00
parent 9698775b76
commit 7cf71b0931
2 changed files with 107 additions and 5 deletions

View File

@ -1394,8 +1394,8 @@ ACTOR Future<Void> updateAgentPollRate(Database src, std::string rootKey, std::s
}
}
ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest, std::string name, enumProgramExe exe, double *pollDelay, Database taskDest = Database() ) {
state std::string id = g_nondeterministic_random->randomUniqueID().toString();
ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest, std::string name, enumProgramExe exe, double *pollDelay, Database taskDest = Database(),
std::string id = g_nondeterministic_random->randomUniqueID().toString()) {
state std::string metaKey = layerStatusMetaPrefixRange.begin.toString() + "json/" + name;
state std::string rootKey = backupStatusPrefixRange.begin.toString() + name + "/json";
state std::string instanceKey = rootKey + "/" + "agent-" + id;
@ -1451,8 +1451,9 @@ ACTOR Future<Void> statusUpdateActor(Database statusUpdateDest, std::string name
ACTOR Future<Void> runDBAgent(Database src, Database dest) {
state double pollDelay = 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE;
state Future<Void> status = statusUpdateActor(src, "dr_backup", EXE_DR_AGENT, &pollDelay, dest);
state Future<Void> status_other = statusUpdateActor(dest, "dr_backup_dest", EXE_DR_AGENT, &pollDelay, dest);
std::string id = g_nondeterministic_random->randomUniqueID().toString();
state Future<Void> status = statusUpdateActor(src, "dr_backup", EXE_DR_AGENT, &pollDelay, dest, id);
state Future<Void> status_other = statusUpdateActor(dest, "dr_backup_dest", EXE_DR_AGENT, &pollDelay, dest, id);
state DatabaseBackupAgent backupAgent(src);
@ -3060,7 +3061,7 @@ int main(int argc, char* argv[]) {
}
try {
sourceDb = Database::createDatabase(ccf, -1, localities);
sourceDb = Database::createDatabase(sourceCcf, -1, localities);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());

View File

@ -18,7 +18,12 @@
* limitations under the License.
*/
#include <iterator>
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/NativeAPI.actor.h"
#include <ctime>
#include <climits>
#include "fdbrpc/IAsyncFile.h"
@ -1626,6 +1631,96 @@ namespace dbBackup {
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
}
// Collects the keys listed under "cluster.layers.<context>.instances" in a status document.
//
// statusObj - reader over a fetched status document.
// context   - layer name used to build the "<context>.instances" path
//             (callers in this file pass "dr_backup" or "dr_backup_dest").
//
// Returns the set of instance keys; the set is empty when the instances path is not present.
// Throws backup_error (after logging DBA_GetDRAgentsIdsFail) if reading the status raises
// a std::runtime_error.
std::set<std::string> getDRAgentsIds(StatusObjectReader statusObj, const char *context) {
	std::set<std::string> agentIds;
	try {
		StatusObjectReader layers;
		statusObj.get("cluster.layers", layers);

		StatusObjectReader instanceSection;
		std::string instancesPath = format("%s.instances", context);
		if (!layers.tryGet(instancesPath, instanceSection)) {
			// Nothing registered under this context: report no agents.
			return agentIds;
		}

		// Only the key of each entry is of interest here.
		for (const auto &instance : instanceSection.obj()) {
			agentIds.insert(instance.first);
		}
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_GetDRAgentsIdsFail").detail("Error", e.what());
		throw backup_error();
	}
	return agentIds;
}
// Looks up the "mutation_stream_id" string recorded for a tag under
// "cluster.layers.<context>.tags" in a status document.
//
// statusObj - reader over a fetched status document.
// context   - layer name used to build the "<context>.tags" path.
// tagName   - tag whose entry is searched for (compared via tagName.toString()).
//
// Returns the mutation stream id of the matching tag.
// Throws backup_error when the tag is not found (logged as DBA_TagNotPresentInStatus) or
// when reading the status raises a std::runtime_error (logged as DBA_GetDRMutationStreamIdFail).
std::string getDRMutationStreamId(StatusObjectReader statusObj, const char *context, Key tagName) {
	try {
		const std::string wantedTag = tagName.toString();

		StatusObjectReader layers;
		statusObj.get("cluster.layers", layers);

		StatusObjectReader tagSection;
		std::string tagsPath = format("%s.tags", context);
		const bool tagsPresent = layers.tryGet(tagsPath, tagSection);

		if (tagsPresent) {
			// Entries are taken by value so the JSONDoc below wraps a local copy of the entry.
			for (auto entry : tagSection.obj()) {
				if (entry.first != wantedTag) {
					continue;
				}
				JSONDoc tagDoc(entry.second);
				return tagDoc["mutation_stream_id"].get_str();
			}
		}

		// Either the tags section is absent or it has no entry for this tag.
		TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", wantedTag).detail("Context", context);
		throw backup_error();
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_GetDRMutationStreamIdFail").detail("Error", e.what());
		throw backup_error();
	}
}
// Reads the boolean "cluster.database_locked" field from a status document.
//
// statusObj - reader over a fetched status document.
//
// Returns the value of "database_locked".
// Throws backup_error (after logging DBA_GetLockedStatusFail) if reading the field raises
// a std::runtime_error.
bool getLockedStatus(StatusObjectReader statusObj) {
	try {
		StatusObjectReader clusterSection = statusObj["cluster"].get_obj();
		const bool isLocked = clusterSection["database_locked"].get_bool();
		return isLocked;
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_GetLockedStatusFail").detail("Error", e.what());
		throw backup_error();
	}
}
// Sanity-checks two status documents before an atomic switchover is attempted.
//
// srcStatus  - status document of the source cluster.
// destStatus - status document of the destination cluster.
// tagName    - DR tag whose mutation stream id is compared between the two clusters.
//
// The checks, in order:
//   1. the source reports database_locked == false and the destination reports true;
//   2. the tag's mutation stream id under "dr_backup" on the source equals the one under
//      "dr_backup_dest" on the destination;
//   3. at least one agent id appears both under "dr_backup_dest" on the source and under
//      "dr_backup" on the destination, i.e. some agent is set up for the reverse direction.
//
// Returns normally when every check passes. Throws backup_error (after logging a SevWarn
// trace event naming the failed check) otherwise, including when a std::runtime_error is
// raised while evaluating the checks.
void checkAtomicSwitchOverConfig(StatusObjectReader srcStatus, StatusObjectReader destStatus, Key tagName) {
	try {
		// Check if src is unlocked and dest is locked
		if (getLockedStatus(srcStatus)) {
			TraceEvent(SevWarn, "DBA_AtomicSwitchOverSrcLocked");
			throw backup_error();
		}
		if (!getLockedStatus(destStatus)) {
			TraceEvent(SevWarn, "DBA_AtomicSwitchOverDestUnlocked");
			throw backup_error();
		}

		// Check if mutation-stream-id matches.
		// Each id is fetched once and reused for both the comparison and the trace event, so the
		// logged values are exactly the ones that were compared. (Previously the trace re-queried
		// the destination with the misspelled context "dr_back_dest", so that lookup threw before
		// the mismatch event could be completed.)
		const std::string srcMutationId = getDRMutationStreamId(srcStatus, "dr_backup", tagName);
		const std::string destMutationId = getDRMutationStreamId(destStatus, "dr_backup_dest", tagName);
		if (srcMutationId != destMutationId) {
			TraceEvent(SevWarn, "DBA_AtomicSwitchOverMutationIdMismatch")
				.detail("SourceMutationId", srcMutationId)
				.detail("DestMutationId", destMutationId);
			throw backup_error();
		}

		// Check if there are agents set up with src as its destination cluster and dest as its source cluster
		const std::set<std::string> srcDRAgents = getDRAgentsIds(srcStatus, "dr_backup_dest");
		const std::set<std::string> destDRAgents = getDRAgentsIds(destStatus, "dr_backup");
		std::set<std::string> intersectingAgents;
		std::set_intersection(srcDRAgents.begin(), srcDRAgents.end(), destDRAgents.begin(), destDRAgents.end(),
		                      std::inserter(intersectingAgents, intersectingAgents.begin()));
		if (intersectingAgents.empty()) {
			TraceEvent(SevWarn, "DBA_SwitchOverPossibleDRAgentsIncorrectSetup");
			throw backup_error();
		}
	}
	catch (const std::runtime_error &e) {
		TraceEvent(SevWarn, "DBA_UnableToCheckAtomicSwitchOverConfig").detail("RunTimeError", e.what());
		throw backup_error();
	}
}
class DatabaseBackupAgentImpl {
public:
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
@ -1848,6 +1943,12 @@ public:
throw backup_duplicate();
}
if (!g_network->isSimulated()) {
state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src->getConnectionFile()));
StatusObject destStatus = wait(StatusClient::statusFetcher(dest->getConnectionFile()));
checkAtomicSwitchOverConfig(srcStatus, destStatus, tagName);
}
state UID logUid = g_random->randomUniqueID();
state Key logUidValue = BinaryWriter::toValue(logUid, Unversioned());
state UID logUidCurrent = wait(drAgent.getLogUid(backupAgent->taskBucket->src, tagName));