Fix a few minor restore bugs and add a dry-run mode. Some improvements to the fdbcli output.
This commit is contained in:
parent
f3b58a063f
commit
7284e691fb
|
@ -179,9 +179,15 @@ ACTOR Future<bool> metaclusterRemoveCommand(Reference<IDatabase> db, std::vector
|
|||
}));
|
||||
|
||||
if (clusterType == ClusterType::METACLUSTER_DATA && !force) {
|
||||
fmt::print("ERROR: cannot remove a data cluster directly. To remove a data cluster,\n"
|
||||
"use the `remove' command on the management cluster. To force a data cluster\n"
|
||||
"to forget its metacluster association without fully removing it, use FORCE.\n");
|
||||
if (tokens[2] == "FORCE"_sr) {
|
||||
fmt::print("ERROR: a cluster name must be specified.\n");
|
||||
} else {
|
||||
fmt::print("ERROR: cannot remove a data cluster directly. To remove a data cluster,\n"
|
||||
"use the `remove' command on the management cluster. To force a data cluster\n"
|
||||
"to forget its metacluster association without fully removing it, use FORCE.\n");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool updatedDataCluster =
|
||||
|
@ -189,9 +195,11 @@ ACTOR Future<bool> metaclusterRemoveCommand(Reference<IDatabase> db, std::vector
|
|||
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
fmt::print("The cluster `{}' has been removed\n", printable(clusterName).c_str());
|
||||
fmt::print("WARNING: the data cluster could not be updated and still contains its\n"
|
||||
"metacluster registration info. To finish removing it, FORCE remove the\n"
|
||||
"data cluster directly.");
|
||||
if (!updatedDataCluster) {
|
||||
fmt::print("WARNING: the data cluster could not be updated and may still contains its\n"
|
||||
"metacluster registration info. To finish removing it, FORCE remove the\n"
|
||||
"data cluster directly.\n");
|
||||
}
|
||||
} else {
|
||||
ASSERT(updatedDataCluster);
|
||||
fmt::print("The cluster `{}' has removed its association with its metacluster.\n"
|
||||
|
@ -201,34 +209,54 @@ ACTOR Future<bool> metaclusterRemoveCommand(Reference<IDatabase> db, std::vector
|
|||
return true;
|
||||
}
|
||||
|
||||
void printRestoreUsage() {
|
||||
fmt::print("Usage: metacluster restore <NAME> [dryrun] connection_string=<CONNECTION_STRING>\n"
|
||||
"<restore_known_data_cluster|repopulate_from_data_cluster> [force_join_new_metacluster]\n\n");
|
||||
|
||||
fmt::print("Add a restored data cluster back to a metacluster.\n\n");
|
||||
|
||||
fmt::print("Use `dryrun' to report what changes a restore would make and whether any\n");
|
||||
fmt::print("failures would occur. Without `dryrun', the restore will modify the metacluster\n");
|
||||
fmt::print("with the changes required to perform the restore.\n\n");
|
||||
|
||||
fmt::print("Use `restore_known_data_cluster' to add back a restored copy of a data cluster\n");
|
||||
fmt::print("that the metacluster is already tracking. This mode should be used if only data\n");
|
||||
fmt::print("clusters are being restored, and any discrepancies between the management and\n");
|
||||
fmt::print("data clusters will be resolved using the management cluster metadata.\n");
|
||||
fmt::print("If `force_join_new_metacluster' is specified, the cluster will try to restore\n");
|
||||
fmt::print("to a different metacluster than it was originally registered to.\n\n");
|
||||
|
||||
fmt::print("Use `repopulate_from_data_cluster' to rebuild a lost management cluster from the\n");
|
||||
fmt::print("data clusters in a metacluster. This mode should be used if the management\n");
|
||||
fmt::print("cluster is being restored. If any data clusters are also being restored, the\n");
|
||||
fmt::print("oldest data clusters should be added first before any non-recovered data\n");
|
||||
fmt::print("clusters. Any conflicts arising between the added data cluster and existing data\n");
|
||||
fmt::print("will cause the restore to fail. Before repopulating a metacluster from a data\n");
|
||||
fmt::print("cluster, that data cluster needs to be detached from its prior metacluster using\n");
|
||||
fmt::print("the `metacluster remove' command.\n");
|
||||
}
|
||||
|
||||
// metacluster restore command
|
||||
ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
|
||||
if (tokens.size() != 5) {
|
||||
fmt::print("Usage: metacluster restore <NAME> connection_string=<CONNECTION_STRING>\n"
|
||||
"<restore_known_data_cluster|repopulate_from_data_cluster>\n\n");
|
||||
|
||||
fmt::print("Add a restored data cluster back to a metacluster.\n\n");
|
||||
|
||||
fmt::print("Use `restore_known_data_cluster' to add back a restored copy of a data cluster\n");
|
||||
fmt::print("that the metacluster is already tracking. This mode should be used if only data\n");
|
||||
fmt::print("clusters are being restored, and any discrepancies between the management and\n");
|
||||
fmt::print("data clusters will be resolved using the management cluster metadata.\n\n");
|
||||
|
||||
fmt::print("Use `repopulate_from_data_cluster' to rebuild a lost management cluster from the\n");
|
||||
fmt::print("data clusters in a metacluster. This mode should be used if the management\n");
|
||||
fmt::print("cluster is being restored. If any data clusters are also being restored, the\n");
|
||||
fmt::print("oldest data clusters should be added first before any non-recovered data\n");
|
||||
fmt::print("clusters. Any conflicts arising between the added data cluster and existing data\n");
|
||||
fmt::print("will cause the restore to fail. Before repopulating a metacluster from a data\n");
|
||||
fmt::print("cluster, that data cluster needs to be detached from its prior metacluster using\n");
|
||||
fmt::print("the `metacluster remove' command.\n");
|
||||
|
||||
if (tokens.size() < 5 || tokens.size() > 7) {
|
||||
printRestoreUsage();
|
||||
return false;
|
||||
}
|
||||
|
||||
state bool dryRun = tokens[3] == "dryrun"_sr;
|
||||
state bool forceJoin = tokens[tokens.size() - 1] == "force_join_new_metacluster"_sr;
|
||||
|
||||
if (tokens.size() < 5 + (int)dryRun + (int)forceJoin) {
|
||||
printRestoreUsage();
|
||||
return false;
|
||||
}
|
||||
|
||||
state ClusterName clusterName = tokens[2];
|
||||
state StringRef restoreType = tokens[tokens.size() - 1 - (int)forceJoin];
|
||||
|
||||
// connection string
|
||||
DataClusterEntry defaultEntry;
|
||||
auto config = parseClusterConfiguration(tokens, defaultEntry, 3, 4);
|
||||
auto config = parseClusterConfiguration(tokens, defaultEntry, 3 + (int)dryRun, 3 + (int)dryRun + 1);
|
||||
if (!config.present()) {
|
||||
return false;
|
||||
} else if (!config.get().first.present()) {
|
||||
|
@ -240,15 +268,24 @@ ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vecto
|
|||
state bool success = true;
|
||||
|
||||
try {
|
||||
if (tokens[4] == "restore_known_data_cluster"_sr) {
|
||||
wait(MetaclusterAPI::restoreCluster(
|
||||
db, tokens[2], config.get().first.get(), ApplyManagementClusterUpdates::True, &messages));
|
||||
|
||||
} else if (tokens[4] == "repopulate_from_data_cluster"_sr) {
|
||||
wait(MetaclusterAPI::restoreCluster(
|
||||
db, tokens[2], config.get().first.get(), ApplyManagementClusterUpdates::False, &messages));
|
||||
if (restoreType == "restore_known_data_cluster"_sr) {
|
||||
wait(MetaclusterAPI::restoreCluster(db,
|
||||
clusterName,
|
||||
config.get().first.get(),
|
||||
ApplyManagementClusterUpdates::True,
|
||||
RestoreDryRun(dryRun),
|
||||
ForceJoinNewMetacluster(forceJoin),
|
||||
&messages));
|
||||
} else if (restoreType == "repopulate_from_data_cluster"_sr) {
|
||||
wait(MetaclusterAPI::restoreCluster(db,
|
||||
clusterName,
|
||||
config.get().first.get(),
|
||||
ApplyManagementClusterUpdates::False,
|
||||
RestoreDryRun(dryRun),
|
||||
ForceJoinNewMetacluster(forceJoin),
|
||||
&messages));
|
||||
} else {
|
||||
fmt::print(stderr, "ERROR: unrecognized restore mode `{}'\n", printable(tokens[4]));
|
||||
fmt::print(stderr, "ERROR: unrecognized restore mode `{}'\n", printable(restoreType));
|
||||
success = false;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -257,11 +294,7 @@ ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vecto
|
|||
}
|
||||
|
||||
if (!messages.empty()) {
|
||||
if (!success) {
|
||||
fmt::print(stderr, "\n");
|
||||
}
|
||||
|
||||
fmt::print(success ? stdout : stderr, "The restore reported the following messages:\n");
|
||||
fmt::print(success ? stdout : stderr, "\nThe restore reported the following messages:\n\n");
|
||||
for (int i = 0; i < messages.size(); ++i) {
|
||||
fmt::print(success ? stdout : stderr, " {}. {}\n", i + 1, messages[i]);
|
||||
}
|
||||
|
@ -272,7 +305,12 @@ ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vecto
|
|||
}
|
||||
|
||||
if (success) {
|
||||
fmt::print("The cluster `{}' has been restored\n", printable(tokens[2]).c_str());
|
||||
if (dryRun) {
|
||||
fmt::print("The restore dry run completed successfully. To perform the restore, run the same command\n");
|
||||
fmt::print("without the `dryrun' argument.\n");
|
||||
} else {
|
||||
fmt::print("The cluster `{}' has been restored\n", printable(clusterName).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
|
@ -376,6 +414,7 @@ ACTOR Future<bool> metaclusterGetCommand(Reference<IDatabase> db, std::vector<St
|
|||
obj[msgClusterKey] = metadata.toJson();
|
||||
fmt::print("{}\n", json_spirit::write_string(json_spirit::mValue(obj), json_spirit::pretty_print).c_str());
|
||||
} else {
|
||||
fmt::print(" id: {}\n", metadata.entry.id.toString().c_str());
|
||||
fmt::print(" connection string: {}\n", metadata.connectionString.toString().c_str());
|
||||
fmt::print(" cluster state: {}\n", DataClusterEntry::clusterStateToString(metadata.entry.clusterState));
|
||||
fmt::print(" tenant group capacity: {}\n", metadata.entry.capacity.numTenantGroups);
|
||||
|
@ -534,14 +573,35 @@ void metaclusterGenerator(const char* text,
|
|||
(tokens.size() == 3 && tokencmp(tokens[1], "get"))) {
|
||||
const char* opts[] = { "JSON", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
} else if (tokens.size() == 2 && tokencmp(tokens[1], "remove")) {
|
||||
const char* opts[] = { "FORCE", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
} else if (tokens.size() > 1 && tokencmp(tokens[1], "restore")) {
|
||||
if (tokens.size() == 3) {
|
||||
const char* opts[] = { "dryrun", "connection_string=", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
} else {
|
||||
bool dryrun = tokens[3] == "dryrun"_sr;
|
||||
if (tokens.size() == 3 + (int)dryrun) {
|
||||
const char* opts[] = { "connection_string=", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
} else if (tokens.size() == 4 + (int)dryrun) {
|
||||
const char* opts[] = { "restore_known_data_cluster", "repopulate_from_data_cluster", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
} else if (tokens.size() == 5 + (int)dryrun) {
|
||||
const char* opts[] = { "force_join_new_metacluster", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<const char*> metaclusterHintGenerator(std::vector<StringRef> const& tokens, bool inArgument) {
|
||||
if (tokens.size() == 1) {
|
||||
return { "<create_experimental|decommission|register|remove|restore|configure|list|get|status>", "[ARGS]" };
|
||||
} else if (tokencmp(tokens[1], "create_experimental")) {
|
||||
return { "<NAME> <TENANT_ID_PREFIX>" };
|
||||
} else if (tokencmp(tokens[1], "create_experimental") && tokens.size() < 4) {
|
||||
static std::vector<const char*> opts = { "<NAME>", "<TENANT_ID_PREFIX>" };
|
||||
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
|
||||
} else if (tokencmp(tokens[1], "decommission")) {
|
||||
return {};
|
||||
} else if (tokencmp(tokens[1], "register") && tokens.size() < 5) {
|
||||
|
@ -559,11 +619,19 @@ std::vector<const char*> metaclusterHintGenerator(std::vector<StringRef> const&
|
|||
} else {
|
||||
return {};
|
||||
}
|
||||
} else if (tokencmp(tokens[1], "restore") && tokens.size() < 5) {
|
||||
} else if (tokencmp(tokens[1], "restore") && tokens.size() < 7) {
|
||||
static std::vector<const char*> opts = { "<NAME>",
|
||||
"connection_string=<CONNECTION_STRING> ",
|
||||
"<restore_known_data_cluster|repopulate_from_data_cluster>" };
|
||||
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
|
||||
"[dryrun]",
|
||||
"connection_string=<CONNECTION_STRING>",
|
||||
"<restore_known_data_cluster|repopulate_from_data_cluster>",
|
||||
"[force_join_new_metacluster]" };
|
||||
if (tokens.size() < 4 || (tokens[3].size() <= 6 && "dryrun"_sr.startsWith(tokens[3]))) {
|
||||
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
|
||||
} else if (tokens.size() < 6) {
|
||||
return std::vector<const char*>(opts.begin() + tokens.size() - 1, opts.end());
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
} else if (tokencmp(tokens[1], "configure")) {
|
||||
static std::vector<const char*> opts = {
|
||||
"<NAME>", "<max_tenant_groups=<NUM_GROUPS>|connection_string=<CONNECTION_STRING>>"
|
||||
|
|
|
@ -463,13 +463,15 @@ ACTOR Future<bool> tenantGetCommand(Reference<IDatabase> db, std::vector<StringR
|
|||
std::string tenantState;
|
||||
std::string tenantGroup;
|
||||
std::string assignedCluster;
|
||||
std::string error;
|
||||
|
||||
doc.get("id", id);
|
||||
doc.get("prefix.printable", prefix);
|
||||
|
||||
doc.get("tenant_state", tenantState);
|
||||
bool hasTenantGroup = doc.tryGet("tenant_group.printable", tenantGroup);
|
||||
bool hasAssignedCluster = doc.tryGet("assigned_cluster", assignedCluster);
|
||||
bool hasAssignedCluster = doc.tryGet("assigned_cluster.printable", assignedCluster);
|
||||
bool hasError = doc.tryGet("error", error);
|
||||
|
||||
fmt::print(" id: {}\n", id);
|
||||
fmt::print(" prefix: {}\n", printable(prefix).c_str());
|
||||
|
@ -480,6 +482,9 @@ ACTOR Future<bool> tenantGetCommand(Reference<IDatabase> db, std::vector<StringR
|
|||
if (hasAssignedCluster) {
|
||||
fmt::print(" assigned cluster: {}\n", printable(assignedCluster).c_str());
|
||||
}
|
||||
if (hasError) {
|
||||
fmt::print(" error: {}\n", error);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
|
|
|
@ -27,6 +27,9 @@ FDB_DEFINE_BOOLEAN_PARAM(AssignClusterAutomatically);
|
|||
FDB_DEFINE_BOOLEAN_PARAM(GroupAlreadyExists);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(IsRestoring);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(RunOnDisconnectedCluster);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(RunOnMismatchedCluster);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(RestoreDryRun);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(ForceJoinNewMetacluster);
|
||||
|
||||
std::string clusterTypeToString(const ClusterType& clusterType) {
|
||||
switch (clusterType) {
|
||||
|
@ -72,6 +75,7 @@ DataClusterState DataClusterEntry::stringToClusterState(std::string stateStr) {
|
|||
|
||||
json_spirit::mObject DataClusterEntry::toJson() const {
|
||||
json_spirit::mObject obj;
|
||||
obj["id"] = id.toString();
|
||||
obj["capacity"] = capacity.toJson();
|
||||
obj["allocated"] = allocated.toJson();
|
||||
obj["cluster_state"] = DataClusterEntry::clusterStateToString(clusterState);
|
||||
|
|
|
@ -174,6 +174,10 @@ std::string TenantMapEntry::toJson() const {
|
|||
tenantEntry["tenant_group"] = binaryToJson(tenantGroup.get());
|
||||
}
|
||||
|
||||
if (tenantState == TenantState::ERROR && error.size()) {
|
||||
tenantEntry["error"] = error;
|
||||
}
|
||||
|
||||
return json_spirit::write_string(json_spirit::mValue(tenantEntry));
|
||||
}
|
||||
|
||||
|
|
|
@ -92,6 +92,9 @@ FDB_DECLARE_BOOLEAN_PARAM(AssignClusterAutomatically);
|
|||
FDB_DECLARE_BOOLEAN_PARAM(GroupAlreadyExists);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(IsRestoring);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(RunOnDisconnectedCluster);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(RunOnMismatchedCluster);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(RestoreDryRun);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(ForceJoinNewMetacluster);
|
||||
|
||||
namespace MetaclusterAPI {
|
||||
|
||||
|
@ -282,7 +285,7 @@ struct MetaclusterOperationContext {
|
|||
throw invalid_metacluster_operation();
|
||||
} else if (self->metaclusterRegistration.present() &&
|
||||
!self->metaclusterRegistration.get().matches(currentMetaclusterRegistration.get())) {
|
||||
throw invalid_metacluster_operation();
|
||||
throw metacluster_mismatch();
|
||||
}
|
||||
|
||||
// If a cluster was specified, check that the cluster metadata is present. If so, load it and store
|
||||
|
@ -341,11 +344,13 @@ struct MetaclusterOperationContext {
|
|||
static Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())>
|
||||
runDataClusterTransaction(MetaclusterOperationContext* self,
|
||||
Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster) {
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster) {
|
||||
ASSERT(self->dataClusterDb);
|
||||
ASSERT(self->dataClusterMetadata.present());
|
||||
ASSERT(runOnDisconnectedCluster || self->dataClusterMetadata.present());
|
||||
ASSERT(self->metaclusterRegistration.present() &&
|
||||
self->metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA);
|
||||
(runOnDisconnectedCluster ||
|
||||
self->metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA));
|
||||
|
||||
self->checkClusterState();
|
||||
|
||||
|
@ -365,7 +370,9 @@ struct MetaclusterOperationContext {
|
|||
} else if (currentMetaclusterRegistration.get().clusterType != ClusterType::METACLUSTER_DATA) {
|
||||
throw cluster_removed();
|
||||
} else if (!self->metaclusterRegistration.get().matches(currentMetaclusterRegistration.get())) {
|
||||
throw cluster_removed();
|
||||
if (!runOnMismatchedCluster) {
|
||||
throw cluster_removed();
|
||||
}
|
||||
}
|
||||
|
||||
self->dataClusterIsRegistered = currentMetaclusterRegistration.present();
|
||||
|
@ -383,8 +390,9 @@ struct MetaclusterOperationContext {
|
|||
template <class Function>
|
||||
Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())>
|
||||
runDataClusterTransaction(Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster = RunOnDisconnectedCluster::False) {
|
||||
return runDataClusterTransaction(this, func, runOnDisconnectedCluster);
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster = RunOnDisconnectedCluster::False,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster = RunOnMismatchedCluster::False) {
|
||||
return runDataClusterTransaction(this, func, runOnDisconnectedCluster, runOnMismatchedCluster);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> updateClusterName(MetaclusterOperationContext* self,
|
||||
|
@ -649,13 +657,14 @@ ACTOR template <class Transaction>
|
|||
static Future<Void> registerInManagementCluster(Transaction tr,
|
||||
ClusterName clusterName,
|
||||
DataClusterEntry clusterEntry,
|
||||
ClusterConnectionString connectionString) {
|
||||
ClusterConnectionString connectionString,
|
||||
RestoreDryRun restoreDryRun) {
|
||||
state Optional<DataClusterMetadata> dataClusterMetadata = wait(tryGetClusterTransaction(tr, clusterName));
|
||||
if (dataClusterMetadata.present() &&
|
||||
!dataClusterMetadata.get().matchesConfiguration(DataClusterMetadata(clusterEntry, connectionString))) {
|
||||
TraceEvent("RegisterClusterAlreadyExists").detail("ClusterName", clusterName);
|
||||
throw cluster_already_exists();
|
||||
} else if (!dataClusterMetadata.present()) {
|
||||
} else if (!restoreDryRun && !dataClusterMetadata.present()) {
|
||||
clusterEntry.allocated = ClusterUsage();
|
||||
|
||||
if (clusterEntry.hasCapacity()) {
|
||||
|
@ -664,14 +673,14 @@ static Future<Void> registerInManagementCluster(Transaction tr,
|
|||
}
|
||||
ManagementClusterMetadata::dataClusters().set(tr, clusterName, clusterEntry);
|
||||
ManagementClusterMetadata::dataClusterConnectionRecords.set(tr, clusterName, connectionString);
|
||||
}
|
||||
|
||||
TraceEvent("RegisteredDataCluster")
|
||||
.detail("ClusterName", clusterName)
|
||||
.detail("ClusterID", clusterEntry.id)
|
||||
.detail("Capacity", clusterEntry.capacity)
|
||||
.detail("Version", tr->getCommittedVersion())
|
||||
.detail("ConnectionString", connectionString.toString());
|
||||
TraceEvent("RegisteredDataCluster")
|
||||
.detail("ClusterName", clusterName)
|
||||
.detail("ClusterID", clusterEntry.id)
|
||||
.detail("Capacity", clusterEntry.capacity)
|
||||
.detail("Version", tr->getCommittedVersion())
|
||||
.detail("ConnectionString", connectionString.toString());
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -869,8 +878,10 @@ struct RemoveClusterImpl {
|
|||
ClusterType clusterType,
|
||||
bool forceRemove,
|
||||
double dataClusterTimeout)
|
||||
: ctx(db, Optional<ClusterName>(), { DataClusterState::REMOVING, DataClusterState::RESTORING }), db(db),
|
||||
clusterType(clusterType), clusterName(clusterName), forceRemove(forceRemove),
|
||||
: ctx(db,
|
||||
Optional<ClusterName>(),
|
||||
{ DataClusterState::REGISTERING, DataClusterState::REMOVING, DataClusterState::RESTORING }),
|
||||
db(db), clusterType(clusterType), clusterName(clusterName), forceRemove(forceRemove),
|
||||
dataClusterTimeout(dataClusterTimeout) {}
|
||||
|
||||
// Returns false if the cluster is no longer present, or true if it is present and the removal should proceed.
|
||||
|
@ -1098,7 +1109,7 @@ struct RemoveClusterImpl {
|
|||
.detail("ExpectedName", self->clusterName)
|
||||
.detail("MetaclusterRegistration",
|
||||
metaclusterRegistrationEntry.map(&MetaclusterRegistrationEntry::toString));
|
||||
throw invalid_metacluster_operation();
|
||||
throw metacluster_mismatch();
|
||||
}
|
||||
|
||||
wait(updateDataCluster(self, tr, metaclusterRegistrationEntry.get().id));
|
||||
|
@ -1318,6 +1329,8 @@ struct RestoreClusterImpl {
|
|||
ClusterName clusterName;
|
||||
ClusterConnectionString connectionString;
|
||||
ApplyManagementClusterUpdates applyManagementClusterUpdates;
|
||||
RestoreDryRun restoreDryRun;
|
||||
ForceJoinNewMetacluster forceJoinNewMetacluster;
|
||||
std::vector<std::string>& messages;
|
||||
|
||||
// Loaded from the data cluster
|
||||
|
@ -1333,32 +1346,57 @@ struct RestoreClusterImpl {
|
|||
ClusterName clusterName,
|
||||
ClusterConnectionString connectionString,
|
||||
ApplyManagementClusterUpdates applyManagementClusterUpdates,
|
||||
RestoreDryRun restoreDryRun,
|
||||
ForceJoinNewMetacluster forceJoinNewMetacluster,
|
||||
std::vector<std::string>& messages)
|
||||
: ctx(managementDb, {}, { DataClusterState::RESTORING }), clusterName(clusterName),
|
||||
connectionString(connectionString), applyManagementClusterUpdates(applyManagementClusterUpdates),
|
||||
messages(messages) {}
|
||||
restoreDryRun(restoreDryRun), forceJoinNewMetacluster(forceJoinNewMetacluster), messages(messages) {}
|
||||
|
||||
// If restoring a data cluster, verify that it has a matching registration entry
|
||||
ACTOR static Future<Void> loadDataClusterRegistration(RestoreClusterImpl* self) {
|
||||
state Reference<IDatabase> db = wait(openDatabase(self->connectionString));
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
|
||||
Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
|
||||
wait(MetaclusterMetadata::metaclusterRegistration().get(db));
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
|
||||
wait(MetaclusterMetadata::metaclusterRegistration().get(tr));
|
||||
|
||||
if (!metaclusterRegistration.present()) {
|
||||
throw invalid_data_cluster();
|
||||
} else if (!metaclusterRegistration.get().matches(self->ctx.metaclusterRegistration.get()) ||
|
||||
metaclusterRegistration.get().name != self->clusterName) {
|
||||
TraceEvent(SevWarn, "MetaclusterRestoreClusterMismatch")
|
||||
.detail("ExistingRegistration", metaclusterRegistration.get())
|
||||
.detail("ManagementClusterRegistration", self->ctx.metaclusterRegistration.get());
|
||||
throw cluster_already_exists();
|
||||
if (!metaclusterRegistration.present()) {
|
||||
throw invalid_data_cluster();
|
||||
} else if (!metaclusterRegistration.get().matches(self->ctx.metaclusterRegistration.get())) {
|
||||
if (!self->forceJoinNewMetacluster) {
|
||||
TraceEvent(SevWarn, "MetaclusterRestoreClusterMismatch")
|
||||
.detail("ExistingRegistration", metaclusterRegistration.get())
|
||||
.detail("ManagementClusterRegistration", self->ctx.metaclusterRegistration.get());
|
||||
throw cluster_already_registered();
|
||||
} else if (!self->restoreDryRun) {
|
||||
ASSERT(self->ctx.metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA);
|
||||
MetaclusterMetadata::metaclusterRegistration().set(tr, self->ctx.metaclusterRegistration.get());
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
} else {
|
||||
self->messages.push_back(fmt::format("Move data cluster to new metacluster\n"
|
||||
" original: {}\n"
|
||||
" updated: {}",
|
||||
metaclusterRegistration.get().toString(),
|
||||
self->ctx.metaclusterRegistration.get().toString()));
|
||||
}
|
||||
} else if (metaclusterRegistration.get().name != self->clusterName) {
|
||||
TraceEvent(SevWarn, "MetaclusterRestoreClusterNameMismatch")
|
||||
.detail("ExistingName", metaclusterRegistration.get().name)
|
||||
.detail("ManagementClusterRegistration", self->clusterName);
|
||||
throw cluster_already_registered();
|
||||
}
|
||||
|
||||
self->dataClusterId = metaclusterRegistration.get().id;
|
||||
self->ctx.dataClusterDb = db;
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
|
||||
self->dataClusterId = metaclusterRegistration.get().id;
|
||||
self->ctx.dataClusterDb = db;
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// If adding a data cluster to a restored management cluster, write a metacluster registration entry
|
||||
|
@ -1387,8 +1425,11 @@ struct RestoreClusterImpl {
|
|||
throw cluster_already_registered();
|
||||
}
|
||||
|
||||
MetaclusterMetadata::metaclusterRegistration().set(tr, dataClusterEntry);
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
if (!self->restoreDryRun) {
|
||||
MetaclusterMetadata::metaclusterRegistration().set(tr, dataClusterEntry);
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
}
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
|
@ -1403,15 +1444,14 @@ struct RestoreClusterImpl {
|
|||
DataClusterEntry updatedEntry = ctx.dataClusterMetadata.get().entry;
|
||||
updatedEntry.clusterState = DataClusterState::RESTORING;
|
||||
|
||||
updateClusterMetadata(
|
||||
tr, ctx.clusterName.get(), ctx.dataClusterMetadata.get(), connectionString, updatedEntry);
|
||||
updateClusterMetadata(tr, clusterName, ctx.dataClusterMetadata.get(), connectionString, updatedEntry);
|
||||
|
||||
// Remove this cluster from the cluster capacity index, but leave its configured capacity intact in the
|
||||
// cluster entry. This allows us to retain the configured capacity while preventing the cluster from
|
||||
// being used to allocate new tenant groups.
|
||||
DataClusterEntry noCapacityEntry = updatedEntry;
|
||||
noCapacityEntry.capacity.numTenantGroups = 0;
|
||||
updateClusterCapacityIndex(tr, ctx.clusterName.get(), updatedEntry, noCapacityEntry);
|
||||
updateClusterCapacityIndex(tr, clusterName, updatedEntry, noCapacityEntry);
|
||||
}
|
||||
|
||||
TraceEvent("MarkedDataClusterRestoring").detail("Name", clusterName);
|
||||
|
@ -1422,17 +1462,15 @@ struct RestoreClusterImpl {
|
|||
DataClusterEntry updatedEntry = ctx.dataClusterMetadata.get().entry;
|
||||
updatedEntry.clusterState = DataClusterState::READY;
|
||||
|
||||
updateClusterMetadata(tr, ctx.clusterName.get(), ctx.dataClusterMetadata.get(), {}, updatedEntry);
|
||||
updateClusterMetadata(tr, clusterName, ctx.dataClusterMetadata.get(), {}, updatedEntry);
|
||||
|
||||
// Add this cluster back to the cluster capacity index so that it can be assigned to again.
|
||||
DataClusterEntry noCapacityEntry = updatedEntry;
|
||||
noCapacityEntry.capacity.numTenantGroups = 0;
|
||||
updateClusterCapacityIndex(tr, ctx.clusterName.get(), noCapacityEntry, updatedEntry);
|
||||
updateClusterCapacityIndex(tr, clusterName, noCapacityEntry, updatedEntry);
|
||||
}
|
||||
|
||||
TraceEvent("MarkedDataClusterReady")
|
||||
.detail("Name", ctx.clusterName.get())
|
||||
.detail("Version", tr->getCommittedVersion());
|
||||
TraceEvent("MarkedDataClusterReady").detail("Name", clusterName).detail("Version", tr->getCommittedVersion());
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> markManagementTenantsAsError(RestoreClusterImpl* self,
|
||||
|
@ -1479,7 +1517,7 @@ struct RestoreClusterImpl {
|
|||
|
||||
for (auto const& t : tenants.results) {
|
||||
self->mgmtClusterTenantMap.emplace(t.first, t.second);
|
||||
if (t.second.assignedCluster.present() && self->ctx.clusterName.get() == t.second.assignedCluster.get()) {
|
||||
if (t.second.assignedCluster.present() && self->clusterName == t.second.assignedCluster.get()) {
|
||||
self->mgmtClusterTenantSetForCurrentDataCluster.emplace(t.first);
|
||||
}
|
||||
}
|
||||
|
@ -1582,10 +1620,24 @@ struct RestoreClusterImpl {
|
|||
|
||||
// A data cluster tenant is not present on the management cluster
|
||||
if (managementEntry == self->mgmtClusterTenantMap.end() ||
|
||||
managementEntry->second.assignedCluster.get() != self->ctx.clusterName.get()) {
|
||||
wait(self->ctx.runDataClusterTransaction([tenantEntry = tenantEntry](Reference<ITransaction> tr) {
|
||||
return TenantAPI::deleteTenantTransaction(tr, tenantEntry.id, ClusterType::METACLUSTER_DATA);
|
||||
}));
|
||||
managementEntry->second.assignedCluster.get() != self->clusterName) {
|
||||
if (self->restoreDryRun) {
|
||||
if (managementEntry == self->mgmtClusterTenantMap.end()) {
|
||||
self->messages.push_back(fmt::format("Delete missing tenant `{}' with ID {} on data cluster",
|
||||
printable(tenantEntry.tenantName),
|
||||
tenantEntry.id));
|
||||
} else {
|
||||
self->messages.push_back(fmt::format(
|
||||
"Delete tenant `{}' with ID {} on data cluster because it is now located on the cluster `{}'",
|
||||
printable(tenantEntry.tenantName),
|
||||
tenantEntry.id,
|
||||
printable(managementEntry->second.assignedCluster)));
|
||||
}
|
||||
} else {
|
||||
wait(self->ctx.runDataClusterTransaction([tenantEntry = tenantEntry](Reference<ITransaction> tr) {
|
||||
return TenantAPI::deleteTenantTransaction(tr, tenantEntry.id, ClusterType::METACLUSTER_DATA);
|
||||
}));
|
||||
}
|
||||
|
||||
return Optional<std::pair<TenantName, TenantMapEntry>>();
|
||||
} else {
|
||||
|
@ -1593,38 +1645,66 @@ struct RestoreClusterImpl {
|
|||
state TenantMapEntry managementTenant = managementEntry->second;
|
||||
|
||||
// Rename
|
||||
if (tenantName != managementTenant.tenantName) {
|
||||
state bool renamed = tenantName != managementTenant.tenantName;
|
||||
if (renamed) {
|
||||
state TenantName temporaryName;
|
||||
if (self->dataClusterTenantNames.count(managementTenant.tenantName) > 0) {
|
||||
state bool usingTemporaryName = self->dataClusterTenantNames.count(managementTenant.tenantName) > 0;
|
||||
if (usingTemporaryName) {
|
||||
temporaryName = metaclusterTemporaryRenamePrefix.withSuffix(managementTenant.tenantName);
|
||||
} else {
|
||||
temporaryName = managementTenant.tenantName;
|
||||
}
|
||||
|
||||
wait(self->ctx.runDataClusterTransaction([self = self,
|
||||
tenantName = tenantName,
|
||||
temporaryName = temporaryName,
|
||||
tenantEntry = tenantEntry,
|
||||
managementTenant =
|
||||
managementTenant](Reference<ITransaction> tr) {
|
||||
return renameTenant(
|
||||
self, tr, tenantEntry.id, tenantName, temporaryName, managementTenant.configurationSequenceNum);
|
||||
}));
|
||||
if (self->restoreDryRun) {
|
||||
self->messages.push_back(fmt::format("Rename tenant `{}' with ID {} to `{}' on data cluster{}",
|
||||
printable(tenantEntry.tenantName),
|
||||
tenantEntry.id,
|
||||
printable(managementTenant.tenantName),
|
||||
usingTemporaryName ? " via temporary name" : ""));
|
||||
} else {
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self,
|
||||
tenantName = tenantName,
|
||||
temporaryName = temporaryName,
|
||||
tenantEntry = tenantEntry,
|
||||
managementTenant = managementTenant](Reference<ITransaction> tr) {
|
||||
return renameTenant(self,
|
||||
tr,
|
||||
tenantEntry.id,
|
||||
tenantName,
|
||||
temporaryName,
|
||||
managementTenant.configurationSequenceNum);
|
||||
}));
|
||||
// SOMEDAY: we could mark the tenant in the management cluster as READY if it is in the RENAMING
|
||||
// state
|
||||
}
|
||||
tenantName = temporaryName;
|
||||
// SOMEDAY: we could mark the tenant in the management cluster as READY if it is in the RENAMING
|
||||
// state
|
||||
}
|
||||
|
||||
// Update configuration
|
||||
if (!managementTenant.matchesConfiguration(tenantEntry) ||
|
||||
bool configurationChanged = !managementTenant.matchesConfiguration(tenantEntry);
|
||||
if (configurationChanged ||
|
||||
managementTenant.configurationSequenceNum != tenantEntry.configurationSequenceNum) {
|
||||
ASSERT(managementTenant.configurationSequenceNum >= tenantEntry.configurationSequenceNum);
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self, managementTenant = managementTenant](Reference<ITransaction> tr) {
|
||||
return updateTenantConfiguration(self, tr, managementTenant.id, managementTenant);
|
||||
}));
|
||||
// SOMEDAY: we could mark the tenant in the management cluster as READY if it is in the
|
||||
// UPDATING_CONFIGURATION state
|
||||
if (self->restoreDryRun) {
|
||||
// If this is an update to the internal sequence number only and we are also renaming the tenant,
|
||||
// we don't need to report anything. The internal metadata update is (at least partially) caused
|
||||
// by the rename in that case
|
||||
if (configurationChanged || !renamed) {
|
||||
self->messages.push_back(
|
||||
fmt::format("Update tenant configuration for tenant `{}' with ID {} on data cluster{}",
|
||||
printable(tenantEntry.tenantName),
|
||||
tenantEntry.id,
|
||||
configurationChanged ? "" : " (internal metadata only)"));
|
||||
}
|
||||
} else {
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self, managementTenant = managementTenant](Reference<ITransaction> tr) {
|
||||
return updateTenantConfiguration(self, tr, managementTenant.id, managementTenant);
|
||||
}));
|
||||
// SOMEDAY: we could mark the tenant in the management cluster as READY if it is in the
|
||||
// UPDATING_CONFIGURATION state
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_pair(tenantName, managementTenant);
|
||||
|
@ -1642,22 +1722,79 @@ struct RestoreClusterImpl {
|
|||
++itr;
|
||||
}
|
||||
|
||||
state std::unordered_map<TenantName, TenantMapEntry>::iterator renameItr = partiallyRenamedTenants.begin();
|
||||
while (renameItr != partiallyRenamedTenants.end()) {
|
||||
wait(self->ctx.runDataClusterTransaction([self = self, renameItr = renameItr](Reference<ITransaction> tr) {
|
||||
return renameTenant(self,
|
||||
tr,
|
||||
renameItr->second.id,
|
||||
renameItr->first,
|
||||
renameItr->first.removePrefix(metaclusterTemporaryRenamePrefix),
|
||||
renameItr->second.configurationSequenceNum);
|
||||
}));
|
||||
++renameItr;
|
||||
if (!self->restoreDryRun) {
|
||||
state std::unordered_map<TenantName, TenantMapEntry>::iterator renameItr = partiallyRenamedTenants.begin();
|
||||
while (renameItr != partiallyRenamedTenants.end()) {
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self, renameItr = renameItr](Reference<ITransaction> tr) {
|
||||
return renameTenant(self,
|
||||
tr,
|
||||
renameItr->second.id,
|
||||
renameItr->first,
|
||||
renameItr->first.removePrefix(metaclusterTemporaryRenamePrefix),
|
||||
renameItr->second.configurationSequenceNum);
|
||||
}));
|
||||
++renameItr;
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> processMissingTenants(RestoreClusterImpl* self) {
|
||||
state std::unordered_set<int64_t>::iterator setItr = self->mgmtClusterTenantSetForCurrentDataCluster.begin();
|
||||
state std::vector<int64_t> missingTenants;
|
||||
state int64_t missingTenantCount = 0;
|
||||
while (setItr != self->mgmtClusterTenantSetForCurrentDataCluster.end()) {
|
||||
int64_t tenantId = *setItr;
|
||||
TenantMapEntry const& managementTenant = self->mgmtClusterTenantMap[tenantId];
|
||||
|
||||
// If a tenant is present on the management cluster and not on the data cluster, mark it in an error
|
||||
// state unless it is already in certain states (e.g. REGISTERING, REMOVING) that allow the tenant to be
|
||||
// missing on the data cluster
|
||||
//
|
||||
// SOMEDAY: this could optionally complete the partial operations (e.g. finish creating or removing the
|
||||
// tenant)
|
||||
if (self->dataClusterTenantMap.find(tenantId) == self->dataClusterTenantMap.end() &&
|
||||
managementTenant.tenantState != TenantState::REGISTERING &&
|
||||
managementTenant.tenantState != TenantState::REMOVING &&
|
||||
managementTenant.tenantState != TenantState::ERROR) {
|
||||
if (self->restoreDryRun) {
|
||||
self->messages.push_back(fmt::format("The tenant `{}' with ID {} is missing on the data cluster",
|
||||
printable(managementTenant.tenantName),
|
||||
tenantId));
|
||||
} else {
|
||||
missingTenants.push_back(tenantId);
|
||||
++missingTenantCount;
|
||||
if (missingTenants.size() == CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
[self = self, missingTenants = missingTenants](Reference<typename DB::TransactionT> tr) {
|
||||
return markManagementTenantsAsError(self, tr, missingTenants);
|
||||
}));
|
||||
missingTenants.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
++setItr;
|
||||
}
|
||||
|
||||
if (!self->restoreDryRun && missingTenants.size() > 0) {
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
[self = self, missingTenants = missingTenants](Reference<typename DB::TransactionT> tr) {
|
||||
return markManagementTenantsAsError(self, tr, missingTenants);
|
||||
}));
|
||||
}
|
||||
|
||||
// This is a best effort attempt to communicate the number of missing tenants. If a restore needs to be run
|
||||
// twice and is interrupted in the middle of the first attempt to process missing tenants, we may not report
|
||||
// a full count.
|
||||
if (missingTenantCount > 0) {
|
||||
self->messages.push_back(fmt::format(
|
||||
"The metacluster has {} tenants that are missing in the restored data cluster", missingTenantCount));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Returns true if the group needs to be created
|
||||
ACTOR static Future<bool> addTenantToManagementCluster(RestoreClusterImpl* self,
|
||||
Reference<ITransaction> tr,
|
||||
|
@ -1670,7 +1807,7 @@ struct RestoreClusterImpl {
|
|||
|
||||
Optional<TenantMapEntry> existingEntry = wait(tryGetTenantTransaction(tr, tenantEntry.tenantName));
|
||||
if (existingEntry.present()) {
|
||||
if (existingEntry.get().assignedCluster == self->ctx.clusterName) {
|
||||
if (existingEntry.get().assignedCluster == self->clusterName) {
|
||||
ASSERT(existingEntry.get().matchesConfiguration(tenantEntry));
|
||||
// This is a retry, so return success
|
||||
return false;
|
||||
|
@ -1682,22 +1819,24 @@ struct RestoreClusterImpl {
|
|||
}
|
||||
}
|
||||
|
||||
tenantEntry.tenantState = TenantState::READY;
|
||||
tenantEntry.assignedCluster = self->ctx.clusterName;
|
||||
ManagementClusterMetadata::tenantMetadata().tenantMap.set(tr, tenantEntry.id, tenantEntry);
|
||||
ManagementClusterMetadata::tenantMetadata().tenantNameIndex.set(tr, tenantEntry.tenantName, tenantEntry.id);
|
||||
if (!self->restoreDryRun) {
|
||||
tenantEntry.tenantState = TenantState::READY;
|
||||
tenantEntry.assignedCluster = self->clusterName;
|
||||
ManagementClusterMetadata::tenantMetadata().tenantMap.set(tr, tenantEntry.id, tenantEntry);
|
||||
ManagementClusterMetadata::tenantMetadata().tenantNameIndex.set(tr, tenantEntry.tenantName, tenantEntry.id);
|
||||
|
||||
ManagementClusterMetadata::tenantMetadata().tenantCount.atomicOp(tr, 1, MutationRef::AddValue);
|
||||
ManagementClusterMetadata::clusterTenantCount.atomicOp(
|
||||
tr, tenantEntry.assignedCluster.get(), 1, MutationRef::AddValue);
|
||||
ManagementClusterMetadata::tenantMetadata().tenantCount.atomicOp(tr, 1, MutationRef::AddValue);
|
||||
ManagementClusterMetadata::clusterTenantCount.atomicOp(
|
||||
tr, tenantEntry.assignedCluster.get(), 1, MutationRef::AddValue);
|
||||
|
||||
// Updated indexes to include the new tenant
|
||||
ManagementClusterMetadata::clusterTenantIndex.insert(
|
||||
tr, Tuple::makeTuple(tenantEntry.assignedCluster.get(), tenantEntry.tenantName, tenantEntry.id));
|
||||
// Updated indexes to include the new tenant
|
||||
ManagementClusterMetadata::clusterTenantIndex.insert(
|
||||
tr, Tuple::makeTuple(tenantEntry.assignedCluster.get(), tenantEntry.tenantName, tenantEntry.id));
|
||||
}
|
||||
|
||||
wait(success(tenantGroupEntry));
|
||||
|
||||
if (tenantGroupEntry.get().present() && tenantGroupEntry.get().get().assignedCluster != self->ctx.clusterName) {
|
||||
if (tenantGroupEntry.get().present() && tenantGroupEntry.get().get().assignedCluster != self->clusterName) {
|
||||
self->messages.push_back(
|
||||
fmt::format("The tenant `{}' is part of a tenant group `{}' that already exists on cluster `{}'",
|
||||
printable(tenantEntry.tenantName),
|
||||
|
@ -1706,11 +1845,13 @@ struct RestoreClusterImpl {
|
|||
throw invalid_tenant_configuration();
|
||||
}
|
||||
|
||||
managementClusterAddTenantToGroup(tr,
|
||||
tenantEntry,
|
||||
&self->ctx.dataClusterMetadata.get(),
|
||||
GroupAlreadyExists(tenantGroupEntry.get().present()),
|
||||
IsRestoring::True);
|
||||
if (!self->restoreDryRun) {
|
||||
managementClusterAddTenantToGroup(tr,
|
||||
tenantEntry,
|
||||
&self->ctx.dataClusterMetadata.get(),
|
||||
GroupAlreadyExists(tenantGroupEntry.get().present()),
|
||||
IsRestoring::True);
|
||||
}
|
||||
|
||||
return !tenantGroupEntry.get().present();
|
||||
}
|
||||
|
@ -1746,39 +1887,43 @@ struct RestoreClusterImpl {
|
|||
|
||||
numGroupsCreated += groupsCreated.size();
|
||||
|
||||
if (numGroupsCreated > 0) {
|
||||
state DataClusterMetadata clusterMetadata = wait(getClusterTransaction(tr, self->ctx.clusterName.get()));
|
||||
if (!self->restoreDryRun) {
|
||||
if (numGroupsCreated > 0) {
|
||||
state DataClusterMetadata clusterMetadata = wait(getClusterTransaction(tr, self->clusterName));
|
||||
|
||||
DataClusterEntry updatedEntry = clusterMetadata.entry;
|
||||
updatedEntry.allocated.numTenantGroups += numGroupsCreated;
|
||||
updateClusterMetadata(tr,
|
||||
self->ctx.clusterName.get(),
|
||||
clusterMetadata,
|
||||
Optional<ClusterConnectionString>(),
|
||||
updatedEntry,
|
||||
IsRestoring::True);
|
||||
DataClusterEntry updatedEntry = clusterMetadata.entry;
|
||||
updatedEntry.allocated.numTenantGroups += numGroupsCreated;
|
||||
updateClusterMetadata(tr,
|
||||
self->clusterName,
|
||||
clusterMetadata,
|
||||
Optional<ClusterConnectionString>(),
|
||||
updatedEntry,
|
||||
IsRestoring::True);
|
||||
}
|
||||
|
||||
int64_t lastTenantId =
|
||||
wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.getD(tr, Snapshot::False, 0));
|
||||
|
||||
ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, std::max(lastTenantId, maxId));
|
||||
ManagementClusterMetadata::tenantMetadata().lastTenantModification.setVersionstamp(tr, Versionstamp(), 0);
|
||||
}
|
||||
|
||||
int64_t lastTenantId =
|
||||
wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.getD(tr, Snapshot::False, 0));
|
||||
|
||||
ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, std::max(lastTenantId, maxId));
|
||||
ManagementClusterMetadata::tenantMetadata().lastTenantModification.setVersionstamp(tr, Versionstamp(), 0);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> addTenantsToManagementCluster(RestoreClusterImpl* self) {
|
||||
state std::unordered_map<int64_t, TenantMapEntry>::iterator itr;
|
||||
state std::vector<TenantMapEntry> tenantBatch;
|
||||
state int64_t tenantsToAdd = 0;
|
||||
|
||||
for (itr = self->dataClusterTenantMap.begin(); itr != self->dataClusterTenantMap.end(); ++itr) {
|
||||
state std::unordered_map<int64_t, TenantMapEntry>::iterator managementEntry =
|
||||
self->mgmtClusterTenantMap.find(itr->second.id);
|
||||
if (managementEntry == self->mgmtClusterTenantMap.end()) {
|
||||
++tenantsToAdd;
|
||||
tenantBatch.push_back(itr->second);
|
||||
} else if (managementEntry->second.tenantName != itr->second.tenantName ||
|
||||
managementEntry->second.assignedCluster.get() != self->ctx.clusterName.get() ||
|
||||
managementEntry->second.assignedCluster.get() != self->clusterName ||
|
||||
!managementEntry->second.matchesConfiguration(itr->second)) {
|
||||
self->messages.push_back(
|
||||
fmt::format("The tenant `{}' has the same ID {} as an existing tenant `{}' on cluster `{}'",
|
||||
|
@ -1807,54 +1952,13 @@ struct RestoreClusterImpl {
|
|||
}));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> processMissingTenants(RestoreClusterImpl* self) {
|
||||
state std::unordered_set<int64_t>::iterator setItr = self->mgmtClusterTenantSetForCurrentDataCluster.begin();
|
||||
state std::vector<int64_t> missingTenants;
|
||||
state int64_t missingTenantCount = 0;
|
||||
while (setItr != self->mgmtClusterTenantSetForCurrentDataCluster.end()) {
|
||||
int64_t tenantId = *setItr;
|
||||
TenantMapEntry const& managementTenant = self->mgmtClusterTenantMap[tenantId];
|
||||
|
||||
// If a tenant is present on the management cluster and not on the data cluster, mark it in an error
|
||||
// state unless it is already in certain states (e.g. REGISTERING, REMOVING) that allow the tenant to be
|
||||
// missing on the data cluster
|
||||
//
|
||||
// SOMEDAY: this could optionally complete the partial operations (e.g. finish creating or removing the
|
||||
// tenant)
|
||||
if (self->dataClusterTenantMap.find(tenantId) == self->dataClusterTenantMap.end() &&
|
||||
managementTenant.tenantState != TenantState::REGISTERING &&
|
||||
managementTenant.tenantState != TenantState::REMOVING &&
|
||||
managementTenant.tenantState != TenantState::ERROR) {
|
||||
missingTenants.push_back(tenantId);
|
||||
++missingTenantCount;
|
||||
if (missingTenants.size() == CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
[self = self, missingTenants = missingTenants](Reference<typename DB::TransactionT> tr) {
|
||||
return markManagementTenantsAsError(self, tr, missingTenants);
|
||||
}));
|
||||
missingTenants.clear();
|
||||
}
|
||||
}
|
||||
++setItr;
|
||||
if (self->restoreDryRun) {
|
||||
self->messages.push_back(
|
||||
fmt::format("Restore will add {} tenant(s) to the management cluster from the data cluster `{}'",
|
||||
tenantsToAdd,
|
||||
printable(self->clusterName)));
|
||||
}
|
||||
|
||||
if (missingTenants.size() > 0) {
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
[self = self, missingTenants = missingTenants](Reference<typename DB::TransactionT> tr) {
|
||||
return markManagementTenantsAsError(self, tr, missingTenants);
|
||||
}));
|
||||
}
|
||||
|
||||
// This is a best effort attempt to communicate the number of missing tenants. If a restore needs to be run
|
||||
// twice and is interrupted in the middle of the first attempt to process missing tenants, we may not report
|
||||
// a full count.
|
||||
if (missingTenantCount > 0) {
|
||||
self->messages.push_back(fmt::format(
|
||||
"The metacluster has {} tenants that are missing in the restored data cluster.", missingTenantCount));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -1869,16 +1973,18 @@ struct RestoreClusterImpl {
|
|||
wait(loadDataClusterRegistration(self));
|
||||
|
||||
// set state to restoring
|
||||
try {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterRestoring(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
} catch (Error& e) {
|
||||
// If the transaction retries after success or if we are trying a second time to restore the cluster, it
|
||||
// will throw an error indicating that the restore has already started
|
||||
if (e.code() != error_code_cluster_restoring) {
|
||||
throw;
|
||||
if (!self->restoreDryRun) {
|
||||
try {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterRestoring(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
} catch (Error& e) {
|
||||
// If the transaction retries after success or if we are trying a second time to restore the cluster, it
|
||||
// will throw an error indicating that the restore has already started
|
||||
if (e.code() != error_code_cluster_restoring) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1887,7 +1993,9 @@ struct RestoreClusterImpl {
|
|||
|
||||
// get all the tenant information from the newly registered data cluster
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self](Reference<ITransaction> tr) { return getTenantsFromDataCluster(self, tr); }));
|
||||
[self = self](Reference<ITransaction> tr) { return getTenantsFromDataCluster(self, tr); },
|
||||
RunOnDisconnectedCluster::False,
|
||||
RunOnMismatchedCluster(self->restoreDryRun && self->forceJoinNewMetacluster)));
|
||||
|
||||
// Fix any differences between the data cluster and the management cluster
|
||||
wait(reconcileTenants(self));
|
||||
|
@ -1896,10 +2004,12 @@ struct RestoreClusterImpl {
|
|||
wait(processMissingTenants(self));
|
||||
|
||||
// set restored cluster to ready state
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterAsReady(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
if (!self->restoreDryRun) {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterAsReady(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -1912,31 +2022,41 @@ struct RestoreClusterImpl {
|
|||
DataClusterEntry entry;
|
||||
entry.id = self->dataClusterId;
|
||||
entry.clusterState = DataClusterState::RESTORING;
|
||||
return registerInManagementCluster(tr, self->clusterName, entry, self->connectionString);
|
||||
return registerInManagementCluster(
|
||||
tr, self->clusterName, entry, self->connectionString, self->restoreDryRun);
|
||||
}));
|
||||
|
||||
// Write a metacluster registration entry in the data cluster
|
||||
wait(writeDataClusterRegistration(self));
|
||||
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
return self->ctx.setCluster(tr, self->clusterName);
|
||||
}));
|
||||
if (!self->restoreDryRun) {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
return self->ctx.setCluster(tr, self->clusterName);
|
||||
}));
|
||||
}
|
||||
|
||||
// get all the tenants in the metacluster
|
||||
wait(getAllTenantsFromManagementCluster(self));
|
||||
|
||||
if (self->restoreDryRun) {
|
||||
wait(store(self->ctx.dataClusterDb, openDatabase(self->connectionString)));
|
||||
}
|
||||
|
||||
// get all the tenant information from the newly registered data cluster
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self](Reference<ITransaction> tr) { return getTenantsFromDataCluster(self, tr); }));
|
||||
[self = self](Reference<ITransaction> tr) { return getTenantsFromDataCluster(self, tr); },
|
||||
RunOnDisconnectedCluster(self->restoreDryRun)));
|
||||
|
||||
// Add all tenants from the data cluster to the management cluster
|
||||
wait(addTenantsToManagementCluster(self));
|
||||
|
||||
// set restored cluster to ready state
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterAsReady(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
if (!self->restoreDryRun) {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterAsReady(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -1955,8 +2075,11 @@ Future<Void> restoreCluster(Reference<DB> db,
|
|||
ClusterName name,
|
||||
ClusterConnectionString connectionString,
|
||||
ApplyManagementClusterUpdates applyManagementClusterUpdates,
|
||||
RestoreDryRun restoreDryRun,
|
||||
ForceJoinNewMetacluster forceJoinNewMetacluster,
|
||||
std::vector<std::string>* messages) {
|
||||
state RestoreClusterImpl<DB> impl(db, name, connectionString, applyManagementClusterUpdates, *messages);
|
||||
state RestoreClusterImpl<DB> impl(
|
||||
db, name, connectionString, applyManagementClusterUpdates, restoreDryRun, forceJoinNewMetacluster, *messages);
|
||||
wait(impl.run());
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -275,6 +275,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
|||
ACTOR static Future<Void> restoreCluster(MetaclusterManagementWorkload* self) {
|
||||
state ClusterName clusterName = self->chooseClusterName();
|
||||
state DataClusterData* dataDb = &self->dataDbs[clusterName];
|
||||
state bool dryRun = deterministicRandom()->coinflip();
|
||||
state bool forceJoin = deterministicRandom()->coinflip();
|
||||
|
||||
state std::vector<std::string> messages;
|
||||
try {
|
||||
|
@ -284,6 +286,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
|||
clusterName,
|
||||
dataDb->db->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::True,
|
||||
RestoreDryRun(dryRun),
|
||||
ForceJoinNewMetacluster(forceJoin),
|
||||
&messages);
|
||||
Optional<Void> result = wait(timeout(restoreFuture, deterministicRandom()->randomInt(1, 30)));
|
||||
if (result.present()) {
|
||||
|
@ -292,7 +296,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ASSERT(dataDb->registered);
|
||||
dataDb->detached = false;
|
||||
if (!dryRun) {
|
||||
dataDb->detached = false;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_cluster_not_found) {
|
||||
ASSERT(!dataDb->registered);
|
||||
|
|
|
@ -238,6 +238,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
Database dataDb,
|
||||
std::string backupUrl,
|
||||
bool addToMetacluster,
|
||||
ForceJoinNewMetacluster forceJoinNewMetacluster,
|
||||
MetaclusterRestoreWorkload* self) {
|
||||
state FileBackupAgent backupAgent;
|
||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges;
|
||||
|
@ -259,10 +260,27 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
state std::vector<std::string> messages;
|
||||
if (addToMetacluster) {
|
||||
TraceEvent("MetaclusterRestoreWorkloadAddClusterToMetacluster").detail("ClusterName", clusterName);
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
TraceEvent("MetaclusterRestoreWorkloadAddClusterToMetaclusterDryRun")
|
||||
.detail("ClusterName", clusterName);
|
||||
wait(MetaclusterAPI::restoreCluster(self->managementDb,
|
||||
clusterName,
|
||||
dataDb->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::True,
|
||||
RestoreDryRun::True,
|
||||
forceJoinNewMetacluster,
|
||||
&messages));
|
||||
TraceEvent("MetaclusterRestoreWorkloadAddClusterToMetaclusterDryRunDone")
|
||||
.detail("ClusterName", clusterName);
|
||||
messages.clear();
|
||||
}
|
||||
|
||||
wait(MetaclusterAPI::restoreCluster(self->managementDb,
|
||||
clusterName,
|
||||
dataDb->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::True,
|
||||
RestoreDryRun::False,
|
||||
forceJoinNewMetacluster,
|
||||
&messages));
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoreComplete").detail("ClusterName", clusterName);
|
||||
}
|
||||
|
@ -497,11 +515,34 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
.detail("FromCluster", clusterItr->first)
|
||||
.detail("TenantCollisions", collisions.first.size());
|
||||
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoreManagementClusterDryRun")
|
||||
.detail("FromCluster", clusterItr->first)
|
||||
.detail("TenantCollisions", collisions.first.size());
|
||||
|
||||
wait(MetaclusterAPI::restoreCluster(
|
||||
self->managementDb,
|
||||
clusterItr->first,
|
||||
clusterItr->second.db->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::False,
|
||||
RestoreDryRun::True,
|
||||
ForceJoinNewMetacluster(deterministicRandom()->coinflip()),
|
||||
&messages));
|
||||
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoreManagementClusterDryRunDone")
|
||||
.detail("FromCluster", clusterItr->first)
|
||||
.detail("TenantCollisions", collisions.first.size());
|
||||
|
||||
messages.clear();
|
||||
}
|
||||
|
||||
wait(MetaclusterAPI::restoreCluster(
|
||||
self->managementDb,
|
||||
clusterItr->first,
|
||||
clusterItr->second.db->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::False,
|
||||
RestoreDryRun::False,
|
||||
ForceJoinNewMetacluster(deterministicRandom()->coinflip()),
|
||||
&messages));
|
||||
|
||||
ASSERT(collisions.first.empty() && collisions.second.empty());
|
||||
|
@ -842,14 +883,31 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
|
||||
std::vector<Future<Void>> restores;
|
||||
for (auto [cluster, backupUrl] : backups) {
|
||||
restores.push_back(restoreDataCluster(
|
||||
cluster, self->dataDbs[cluster].db, backupUrl.get(), !self->recoverManagementCluster, self));
|
||||
restores.push_back(restoreDataCluster(cluster,
|
||||
self->dataDbs[cluster].db,
|
||||
backupUrl.get(),
|
||||
!self->recoverManagementCluster,
|
||||
ForceJoinNewMetacluster(deterministicRandom()->coinflip()),
|
||||
self));
|
||||
}
|
||||
|
||||
wait(waitForAll(restores));
|
||||
|
||||
if (self->recoverManagementCluster) {
|
||||
wait(restoreManagementCluster(self));
|
||||
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
std::vector<Future<Void>> secondRestores;
|
||||
for (auto [cluster, backupUrl] : backups) {
|
||||
secondRestores.push_back(restoreDataCluster(cluster,
|
||||
self->dataDbs[cluster].db,
|
||||
backupUrl.get(),
|
||||
true,
|
||||
ForceJoinNewMetacluster::True,
|
||||
self));
|
||||
}
|
||||
wait(waitForAll(secondRestores));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -270,6 +270,7 @@ ERROR( tenant_creation_permanently_failed, 2168, "The tenant creation did not co
|
|||
ERROR( cluster_removed, 2169, "The cluster is being removed from the metacluster" )
|
||||
ERROR( cluster_restoring, 2170, "The cluster is being restored to the metacluster" )
|
||||
ERROR( invalid_data_cluster, 2171, "The data cluster being restored has no record of its metacluster" )
|
||||
ERROR( metacluster_mismatch, 2172, "The cluster does not have the expected name or is associated with a different metacluster" )
|
||||
|
||||
// 2200 - errors from bindings and official APIs
|
||||
ERROR( api_version_unset, 2200, "API version is not set" )
|
||||
|
|
Loading…
Reference in New Issue