Add a DB configuration option for backup workers
Right now, the default is to keep the old backup behavior, i.e., do NOT use backup workers. Specifically, if BackupType is not set (or is set to default), the master will not recruit backup workers and will not add pseudo locality for backup workers. The StartFullBackupTaskFunc is updated to check if backup worker is enabled. Only when it is not enabled, starting a backup will wait on all backup workers to be started.
This commit is contained in:
parent
f7956cfbfc
commit
38aa1903fd
|
@ -41,6 +41,7 @@ void DatabaseConfiguration::resetInternal() {
|
|||
tLogPolicy = storagePolicy = remoteTLogPolicy = Reference<IReplicationPolicy>();
|
||||
remoteDesiredTLogCount = -1;
|
||||
remoteTLogReplicationFactor = repopulateRegionAntiQuorum = 0;
|
||||
backupType = BackupType::DEFAULT;
|
||||
}
|
||||
|
||||
void parse( int* i, ValueRef const& v ) {
|
||||
|
@ -183,6 +184,8 @@ bool DatabaseConfiguration::isValid() const {
|
|||
tLogPolicy &&
|
||||
getDesiredRemoteLogs() >= 1 &&
|
||||
remoteTLogReplicationFactor >= 0 &&
|
||||
backupType >= BackupType::DEFAULT &&
|
||||
backupType < BackupType::END &&
|
||||
repopulateRegionAntiQuorum >= 0 &&
|
||||
repopulateRegionAntiQuorum <= 1 &&
|
||||
usableRegions >= 1 &&
|
||||
|
@ -322,6 +325,10 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
if (autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS) {
|
||||
result["auto_logs"] = autoDesiredTLogCount;
|
||||
}
|
||||
|
||||
if (backupType > BackupType::DEFAULT) {
|
||||
result["backup_type"] = (int)backupType;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -434,6 +441,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
|
|||
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value);
|
||||
else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value);
|
||||
else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
|
||||
else if (ck == LiteralStringRef("backup_type")) { parse((&type), value); backupType = (BackupType::MutationLogType)type; }
|
||||
else if (ck == LiteralStringRef("usable_regions")) parse(&usableRegions, value);
|
||||
else if (ck == LiteralStringRef("repopulate_anti_quorum")) parse(&repopulateRegionAntiQuorum, value);
|
||||
else if (ck == LiteralStringRef("regions")) parse(®ions, value);
|
||||
|
|
|
@ -178,6 +178,9 @@ struct DatabaseConfiguration {
|
|||
int32_t remoteTLogReplicationFactor;
|
||||
Reference<IReplicationPolicy> remoteTLogPolicy;
|
||||
|
||||
// Backup Workers
|
||||
BackupType backupType;
|
||||
|
||||
//Data centers
|
||||
int32_t usableRegions;
|
||||
int32_t repopulateRegionAntiQuorum;
|
||||
|
|
|
@ -987,4 +987,61 @@ struct WorkerBackupStatus {
|
|||
}
|
||||
};
|
||||
|
||||
struct BackupType {
|
||||
// These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones just before END.
|
||||
enum MutationLogType {
|
||||
// Use backup mutations generated at Proxies (befor 7.0).
|
||||
DEFAULT = 0,
|
||||
|
||||
// Use tagged mutations pulled from TLogs (7.0 and afterwards)
|
||||
TAGGED = 1,
|
||||
|
||||
// Use both of the above two for backup transition from default mechanism
|
||||
// to the tagged mutation logging.
|
||||
DEFAULT_AND_TAGGED = 2,
|
||||
|
||||
END = 3,
|
||||
};
|
||||
|
||||
BackupType() : type(DEFAULT) {}
|
||||
BackupType(MutationLogType t) : type(t) {
|
||||
if ((uint32_t)t >= END) {
|
||||
this->type = DEFAULT;
|
||||
}
|
||||
}
|
||||
|
||||
operator MutationLogType() const { return MutationLogType(type); }
|
||||
|
||||
bool isBackupWorkerEnabled() const {
|
||||
return type == TAGGED || type == DEFAULT_AND_TAGGED;
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, type);
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
switch (type) {
|
||||
case DEFAULT:
|
||||
return "default";
|
||||
case TAGGED:
|
||||
return "tagged";
|
||||
case DEFAULT_AND_TAGGED:
|
||||
return "default+tagged";
|
||||
default:
|
||||
ASSERT(false);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
static ErrorOr<BackupType> FromStringRef(StringRef s) {
|
||||
if (s == LiteralStringRef("1")) return TAGGED;
|
||||
if (s == LiteralStringRef("2")) return DEFAULT_AND_TAGGED;
|
||||
return default_error_or();
|
||||
}
|
||||
|
||||
uint32_t type;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -2338,6 +2338,7 @@ namespace fileBackup {
|
|||
|
||||
// Clears the backup ID from "backupStartedKey" to pause backup workers.
|
||||
ACTOR static Future<Void> clearBackupStartID(Reference<ReadYourWritesTransaction> tr, UID backupUid) {
|
||||
// If backup worker is not enabled, exit early.
|
||||
Optional<Value> started = wait(tr->get(backupStartedKey));
|
||||
std::vector<std::pair<UID, Version>> ids;
|
||||
if (started.present()) {
|
||||
|
@ -2384,6 +2385,12 @@ namespace fileBackup {
|
|||
}
|
||||
}
|
||||
|
||||
// Check if backup worker is enabled
|
||||
DatabaseConfiguration dbConfig = wait(getDatabaseConfiguration(cx));
|
||||
if (!dbConfig.backupType.isBackupWorkerEnabled()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Set the "backupStartedKey" and wait for all backup worker started
|
||||
tr->reset();
|
||||
state BackupConfig config(task);
|
||||
|
@ -2410,9 +2417,6 @@ namespace fileBackup {
|
|||
} else {
|
||||
Params.beginVersion().set(task, it->second);
|
||||
}
|
||||
for (auto p : ids) {
|
||||
std::cout << "setBackupStartedKey UID: " << p.first.toString() << " Version: " << p.second << "\n";
|
||||
}
|
||||
|
||||
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
|
||||
|
||||
|
@ -3839,6 +3843,8 @@ public:
|
|||
state BackupConfig config(current.first);
|
||||
state EBackupState status = wait(config.stateEnum().getD(tr, false, EBackupState::STATE_NEVERRAN));
|
||||
|
||||
// Call clearBackupStartID().
|
||||
|
||||
if (!FileBackupAgent::isRunnable(status)) {
|
||||
throw backup_unneeded();
|
||||
}
|
||||
|
|
|
@ -780,13 +780,15 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
const int nBackup = std::max<int>(
|
||||
(req.configuration.desiredLogRouterCount > 0 ? req.configuration.desiredLogRouterCount : tlogs.size()),
|
||||
req.maxOldLogRouters);
|
||||
auto backupWorkers =
|
||||
getWorkersForRoleInDatacenter(dcId, ProcessClass::Backup, nBackup, req.configuration, id_used);
|
||||
std::transform(backupWorkers.begin(), backupWorkers.end(), std::back_inserter(result.backupWorkers),
|
||||
[](const WorkerDetails& w) { return w.interf; });
|
||||
if (req.configuration.backupType.isBackupWorkerEnabled()) {
|
||||
const int nBackup = std::max<int>(
|
||||
(req.configuration.desiredLogRouterCount > 0 ? req.configuration.desiredLogRouterCount : tlogs.size()),
|
||||
req.maxOldLogRouters);
|
||||
auto backupWorkers =
|
||||
getWorkersForRoleInDatacenter(dcId, ProcessClass::Backup, nBackup, req.configuration, id_used);
|
||||
std::transform(backupWorkers.begin(), backupWorkers.end(), std::back_inserter(result.backupWorkers),
|
||||
[](const WorkerDetails& w) { return w.interf; });
|
||||
}
|
||||
|
||||
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
||||
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
||||
|
@ -914,12 +916,14 @@ public:
|
|||
for(int i = 0; i < proxies.size(); i++)
|
||||
result.proxies.push_back(proxies[i].interf);
|
||||
|
||||
const int nBackup = std::max<int>(tlogs.size(), req.maxOldLogRouters);
|
||||
auto backupWorkers = getWorkersForRoleInDatacenter(dcId, ProcessClass::Backup, nBackup,
|
||||
req.configuration, id_used);
|
||||
std::transform(backupWorkers.begin(), backupWorkers.end(),
|
||||
std::back_inserter(result.backupWorkers),
|
||||
[](const WorkerDetails& w) { return w.interf; });
|
||||
if (req.configuration.backupType.isBackupWorkerEnabled()) {
|
||||
const int nBackup = std::max<int>(tlogs.size(), req.maxOldLogRouters);
|
||||
auto backupWorkers = getWorkersForRoleInDatacenter(dcId, ProcessClass::Backup, nBackup,
|
||||
req.configuration, id_used);
|
||||
std::transform(backupWorkers.begin(), backupWorkers.end(),
|
||||
std::back_inserter(result.backupWorkers),
|
||||
[](const WorkerDetails& w) { return w.interf; });
|
||||
}
|
||||
|
||||
break;
|
||||
} else {
|
||||
|
|
|
@ -2138,16 +2138,19 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSystem->txsTags = configuration.tLogVersion >= TLogVersion::V4 ? recr.tLogs.size() : 0;
|
||||
oldLogSystem->recruitmentID = logSystem->recruitmentID;
|
||||
|
||||
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size()));
|
||||
if(configuration.usableRegions > 1) {
|
||||
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size()));
|
||||
logSystem->expectedLogSets++;
|
||||
logSystem->addPseudoLocality(tagLocalityLogRouterMapped);
|
||||
logSystem->addPseudoLocality(tagLocalityBackup);
|
||||
TraceEvent("AddPseudoLocality", logSystem->getDebugID())
|
||||
.detail("Locality1", "LogRouterMapped")
|
||||
.detail("Locality2", "Backup");
|
||||
} else {
|
||||
TraceEvent e("AddPseudoLocality", logSystem->getDebugID());
|
||||
e.detail("Locality1", "LogRouterMapped");
|
||||
if (configuration.backupType.isBackupWorkerEnabled()) {
|
||||
logSystem->addPseudoLocality(tagLocalityBackup);
|
||||
e.detail("Locality2", "Backup");
|
||||
}
|
||||
} else if (configuration.backupType.isBackupWorkerEnabled()) {
|
||||
// Single region uses log router tag for backup workers.
|
||||
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size()));
|
||||
logSystem->addPseudoLocality(tagLocalityBackup);
|
||||
TraceEvent("AddPseudoLocality", logSystem->getDebugID()).detail("Locality", "Backup");
|
||||
}
|
||||
|
|
|
@ -1525,7 +1525,9 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
self->addActor.send( changeCoordinators(self) );
|
||||
Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, true, true);
|
||||
self->addActor.send(configurationMonitor(self, cx));
|
||||
self->addActor.send(recruitBackupWorkers(self, cx));
|
||||
if (self->configuration.backupType.isBackupWorkerEnabled()) {
|
||||
self->addActor.send(recruitBackupWorkers(self, cx));
|
||||
}
|
||||
|
||||
wait( Future<Void>(Never()) );
|
||||
throw internal_error();
|
||||
|
|
Loading…
Reference in New Issue