Prevent the same cluster from being restored concurrently
This commit is contained in:
parent
a3af2b7663
commit
f2ff385018
|
@ -99,3 +99,8 @@ KeyBackedSet<UID>& MetaclusterMetadata::registrationTombstones() {
|
|||
static KeyBackedSet<UID> instance("\xff/metacluster/registrationTombstones"_sr);
|
||||
return instance;
|
||||
}
|
||||
|
||||
KeyBackedMap<ClusterName, UID>& MetaclusterMetadata::activeRestoreIds() {
|
||||
static KeyBackedMap<ClusterName, UID> instance("\xff/metacluster/activeRestoreIds"_sr);
|
||||
return instance;
|
||||
}
|
|
@ -188,6 +188,7 @@ struct MetaclusterMetadata {
|
|||
// Registration information for a metacluster, stored on both management and data clusters
|
||||
static KeyBackedObjectProperty<MetaclusterRegistrationEntry, decltype(IncludeVersion())>& metaclusterRegistration();
|
||||
static KeyBackedSet<UID>& registrationTombstones();
|
||||
static KeyBackedMap<ClusterName, UID>& activeRestoreIds();
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -341,11 +341,11 @@ struct MetaclusterOperationContext {
|
|||
// already been run on the management cluster to populate the needed metadata. This verifies that the data
|
||||
// cluster has the expected ID and is part of the metacluster that previous transactions have run on.
|
||||
ACTOR template <class Function>
|
||||
static Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())>
|
||||
runDataClusterTransaction(MetaclusterOperationContext* self,
|
||||
Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster) {
|
||||
static Future<decltype(std::declval<Function>()(Reference<ITransaction>()).getValue())> runDataClusterTransaction(
|
||||
MetaclusterOperationContext* self,
|
||||
Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster) {
|
||||
ASSERT(self->dataClusterDb);
|
||||
ASSERT(runOnDisconnectedCluster || self->dataClusterMetadata.present());
|
||||
ASSERT(self->metaclusterRegistration.present() &&
|
||||
|
@ -388,10 +388,10 @@ struct MetaclusterOperationContext {
|
|||
}
|
||||
|
||||
template <class Function>
|
||||
Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())>
|
||||
runDataClusterTransaction(Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster = RunOnDisconnectedCluster::False,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster = RunOnMismatchedCluster::False) {
|
||||
Future<decltype(std::declval<Function>()(Reference<ITransaction>()).getValue())> runDataClusterTransaction(
|
||||
Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster = RunOnDisconnectedCluster::False,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster = RunOnMismatchedCluster::False) {
|
||||
return runDataClusterTransaction(this, func, runOnDisconnectedCluster, runOnMismatchedCluster);
|
||||
}
|
||||
|
||||
|
@ -1333,6 +1333,9 @@ struct RestoreClusterImpl {
|
|||
ForceJoinNewMetacluster forceJoinNewMetacluster;
|
||||
std::vector<std::string>& messages;
|
||||
|
||||
// Unique ID generated for this restore. Used to avoid concurrent restores
|
||||
UID restoreId;
|
||||
|
||||
// Loaded from the data cluster
|
||||
UID dataClusterId;
|
||||
|
||||
|
@ -1353,6 +1356,52 @@ struct RestoreClusterImpl {
|
|||
connectionString(connectionString), applyManagementClusterUpdates(applyManagementClusterUpdates),
|
||||
restoreDryRun(restoreDryRun), forceJoinNewMetacluster(forceJoinNewMetacluster), messages(messages) {}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
static Future<Void> checkRestoreId(RestoreClusterImpl* self, Transaction tr) {
|
||||
if (!self->restoreDryRun) {
|
||||
Optional<UID> activeRestoreId = wait(MetaclusterMetadata::activeRestoreIds().get(tr, self->clusterName));
|
||||
if (!activeRestoreId.present() || activeRestoreId.get() != self->restoreId) {
|
||||
throw conflicting_restore();
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
static Future<Void> eraseRestoreId(RestoreClusterImpl* self, Transaction tr) {
|
||||
Optional<UID> transactionId = wait(MetaclusterMetadata::activeRestoreIds().get(tr, self->clusterName));
|
||||
if (!transactionId.present()) {
|
||||
return Void();
|
||||
} else if (transactionId.get() != self->restoreId) {
|
||||
throw conflicting_restore();
|
||||
} else {
|
||||
MetaclusterMetadata::activeRestoreIds().erase(tr, self->clusterName);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
template <class Function>
|
||||
Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())>
|
||||
runRestoreManagementTransaction(Function func) {
|
||||
return ctx.runManagementTransaction([this, func](Reference<typename DB::TransactionT> tr) {
|
||||
return joinWith(func(tr), checkRestoreId(this, tr));
|
||||
});
|
||||
}
|
||||
|
||||
template <class Function>
|
||||
Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())>
|
||||
runRestoreDataClusterTransaction(
|
||||
Function func,
|
||||
RunOnDisconnectedCluster runOnDisconnectedCluster = RunOnDisconnectedCluster::False,
|
||||
RunOnMismatchedCluster runOnMismatchedCluster = RunOnMismatchedCluster::False) {
|
||||
return ctx.runDataClusterTransaction(
|
||||
[this, func](Reference<ITransaction> tr) { return joinWith(func(tr), checkRestoreId(this, tr)); },
|
||||
runOnDisconnectedCluster,
|
||||
runOnMismatchedCluster);
|
||||
}
|
||||
|
||||
// If restoring a data cluster, verify that it has a matching registration entry
|
||||
ACTOR static Future<Void> loadDataClusterRegistration(RestoreClusterImpl* self) {
|
||||
state Reference<IDatabase> db = wait(openDatabase(self->connectionString));
|
||||
|
@ -1364,6 +1413,10 @@ struct RestoreClusterImpl {
|
|||
state Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
|
||||
wait(MetaclusterMetadata::metaclusterRegistration().get(tr));
|
||||
|
||||
if (!self->restoreDryRun) {
|
||||
MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
|
||||
}
|
||||
|
||||
if (!metaclusterRegistration.present()) {
|
||||
throw invalid_data_cluster();
|
||||
} else if (!metaclusterRegistration.get().matches(self->ctx.metaclusterRegistration.get())) {
|
||||
|
@ -1375,7 +1428,6 @@ struct RestoreClusterImpl {
|
|||
} else if (!self->restoreDryRun) {
|
||||
ASSERT(self->ctx.metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA);
|
||||
MetaclusterMetadata::metaclusterRegistration().set(tr, self->ctx.metaclusterRegistration.get());
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
} else {
|
||||
self->messages.push_back(fmt::format("Move data cluster to new metacluster\n"
|
||||
" original: {}\n"
|
||||
|
@ -1392,6 +1444,11 @@ struct RestoreClusterImpl {
|
|||
|
||||
self->dataClusterId = metaclusterRegistration.get().id;
|
||||
self->ctx.dataClusterDb = db;
|
||||
|
||||
if (!self->restoreDryRun) {
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
}
|
||||
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
|
@ -1427,6 +1484,7 @@ struct RestoreClusterImpl {
|
|||
|
||||
if (!self->restoreDryRun) {
|
||||
MetaclusterMetadata::metaclusterRegistration().set(tr, dataClusterEntry);
|
||||
MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
}
|
||||
|
||||
|
@ -1445,6 +1503,7 @@ struct RestoreClusterImpl {
|
|||
updatedEntry.clusterState = DataClusterState::RESTORING;
|
||||
|
||||
updateClusterMetadata(tr, clusterName, ctx.dataClusterMetadata.get(), connectionString, updatedEntry);
|
||||
MetaclusterMetadata::activeRestoreIds().set(tr, clusterName, restoreId);
|
||||
|
||||
// Remove this cluster from the cluster capacity index, but leave its configured capacity intact in the
|
||||
// cluster entry. This allows us to retain the configured capacity while preventing the cluster from
|
||||
|
@ -1457,7 +1516,7 @@ struct RestoreClusterImpl {
|
|||
TraceEvent("MarkedDataClusterRestoring").detail("Name", clusterName);
|
||||
}
|
||||
|
||||
void markClusterAsReady(Reference<typename DB::TransactionT> tr) {
|
||||
Future<Void> markClusterAsReady(Reference<typename DB::TransactionT> tr) {
|
||||
if (ctx.dataClusterMetadata.get().entry.clusterState == DataClusterState::RESTORING) {
|
||||
DataClusterEntry updatedEntry = ctx.dataClusterMetadata.get().entry;
|
||||
updatedEntry.clusterState = DataClusterState::READY;
|
||||
|
@ -1468,9 +1527,11 @@ struct RestoreClusterImpl {
|
|||
DataClusterEntry noCapacityEntry = updatedEntry;
|
||||
noCapacityEntry.capacity.numTenantGroups = 0;
|
||||
updateClusterCapacityIndex(tr, clusterName, noCapacityEntry, updatedEntry);
|
||||
|
||||
return eraseRestoreId(this, tr);
|
||||
}
|
||||
|
||||
TraceEvent("MarkedDataClusterReady").detail("Name", clusterName).detail("Version", tr->getCommittedVersion());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> markManagementTenantsAsError(RestoreClusterImpl* self,
|
||||
|
@ -1480,6 +1541,7 @@ struct RestoreClusterImpl {
|
|||
for (auto tenantId : tenants) {
|
||||
getFutures.push_back(tryGetTenantTransaction(tr, tenantId));
|
||||
}
|
||||
|
||||
wait(waitForAll(getFutures));
|
||||
|
||||
for (auto const& f : getFutures) {
|
||||
|
@ -1530,7 +1592,7 @@ struct RestoreClusterImpl {
|
|||
state Optional<int64_t> beginTenant = 0;
|
||||
while (beginTenant.present()) {
|
||||
wait(store(beginTenant,
|
||||
self->ctx.runManagementTransaction(
|
||||
self->runRestoreManagementTransaction(
|
||||
[self = self, beginTenant = beginTenant](Reference<typename DB::TransactionT> tr) {
|
||||
return getTenantsFromManagementCluster(self, tr, beginTenant.get());
|
||||
})));
|
||||
|
@ -1634,7 +1696,7 @@ struct RestoreClusterImpl {
|
|||
printable(managementEntry->second.assignedCluster)));
|
||||
}
|
||||
} else {
|
||||
wait(self->ctx.runDataClusterTransaction([tenantEntry = tenantEntry](Reference<ITransaction> tr) {
|
||||
wait(self->runRestoreDataClusterTransaction([tenantEntry = tenantEntry](Reference<ITransaction> tr) {
|
||||
return TenantAPI::deleteTenantTransaction(tr, tenantEntry.id, ClusterType::METACLUSTER_DATA);
|
||||
}));
|
||||
}
|
||||
|
@ -1662,7 +1724,7 @@ struct RestoreClusterImpl {
|
|||
printable(managementTenant.tenantName),
|
||||
usingTemporaryName ? " via temporary name" : ""));
|
||||
} else {
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
wait(self->runRestoreDataClusterTransaction(
|
||||
[self = self,
|
||||
tenantName = tenantName,
|
||||
temporaryName = temporaryName,
|
||||
|
@ -1698,7 +1760,7 @@ struct RestoreClusterImpl {
|
|||
configurationChanged ? "" : " (internal metadata only)"));
|
||||
}
|
||||
} else {
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
wait(self->runRestoreDataClusterTransaction(
|
||||
[self = self, managementTenant = managementTenant](Reference<ITransaction> tr) {
|
||||
return updateTenantConfiguration(self, tr, managementTenant.id, managementTenant);
|
||||
}));
|
||||
|
@ -1712,7 +1774,7 @@ struct RestoreClusterImpl {
|
|||
}
|
||||
|
||||
Future<Void> renameTenantBatch(std::vector<std::pair<TenantName, TenantMapEntry>> tenantsToRename) {
|
||||
return ctx.runDataClusterTransaction([this, tenantsToRename](Reference<ITransaction> tr) {
|
||||
return runRestoreDataClusterTransaction([this, tenantsToRename](Reference<ITransaction> tr) {
|
||||
std::vector<Future<Void>> renameFutures;
|
||||
for (auto t : tenantsToRename) {
|
||||
renameFutures.push_back(renameTenant(
|
||||
|
@ -1781,8 +1843,8 @@ struct RestoreClusterImpl {
|
|||
if (managementTenant.tenantState != TenantState::ERROR) {
|
||||
missingTenants.push_back(tenantId);
|
||||
if (missingTenants.size() == CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
|
||||
wait(self->ctx.runManagementTransaction([self = self, missingTenants = missingTenants](
|
||||
Reference<typename DB::TransactionT> tr) {
|
||||
wait(self->runRestoreManagementTransaction([self = self, missingTenants = missingTenants](
|
||||
Reference<typename DB::TransactionT> tr) {
|
||||
return markManagementTenantsAsError(self, tr, missingTenants);
|
||||
}));
|
||||
missingTenants.clear();
|
||||
|
@ -1794,7 +1856,7 @@ struct RestoreClusterImpl {
|
|||
}
|
||||
|
||||
if (!self->restoreDryRun && missingTenants.size() > 0) {
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
wait(self->runRestoreManagementTransaction(
|
||||
[self = self, missingTenants = missingTenants](Reference<typename DB::TransactionT> tr) {
|
||||
return markManagementTenantsAsError(self, tr, missingTenants);
|
||||
}));
|
||||
|
@ -2004,7 +2066,7 @@ struct RestoreClusterImpl {
|
|||
wait(getAllTenantsFromManagementCluster(self));
|
||||
|
||||
// get all the tenant information from the newly registered data cluster
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
wait(self->runRestoreDataClusterTransaction(
|
||||
[self = self](Reference<ITransaction> tr) { return getTenantsFromDataCluster(self, tr); },
|
||||
RunOnDisconnectedCluster::False,
|
||||
RunOnMismatchedCluster(self->restoreDryRun && self->forceJoinNewMetacluster)));
|
||||
|
@ -2015,12 +2077,15 @@ struct RestoreClusterImpl {
|
|||
// Mark tenants that are missing from the data cluster in an error state on the management cluster
|
||||
wait(processMissingTenants(self));
|
||||
|
||||
// set restored cluster to ready state
|
||||
if (!self->restoreDryRun) {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterAsReady(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
// Remove the active restore ID from the data cluster
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self](Reference<ITransaction> tr) { return eraseRestoreId(self, tr); }));
|
||||
|
||||
// set restored cluster to ready state
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
[self = self](Reference<typename DB::TransactionT> tr) { return self->markClusterAsReady(tr); }));
|
||||
TraceEvent("MarkedDataClusterReady").detail("Name", self->clusterName);
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -2031,6 +2096,8 @@ struct RestoreClusterImpl {
|
|||
|
||||
// Record the data cluster in the management cluster
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
|
||||
|
||||
DataClusterEntry entry;
|
||||
entry.id = self->dataClusterId;
|
||||
entry.clusterState = DataClusterState::RESTORING;
|
||||
|
@ -2042,7 +2109,7 @@ struct RestoreClusterImpl {
|
|||
wait(writeDataClusterRegistration(self));
|
||||
|
||||
if (!self->restoreDryRun) {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
wait(self->runRestoreManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
return self->ctx.setCluster(tr, self->clusterName);
|
||||
}));
|
||||
}
|
||||
|
@ -2055,19 +2122,22 @@ struct RestoreClusterImpl {
|
|||
}
|
||||
|
||||
// get all the tenant information from the newly registered data cluster
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
wait(self->runRestoreDataClusterTransaction(
|
||||
[self = self](Reference<ITransaction> tr) { return getTenantsFromDataCluster(self, tr); },
|
||||
RunOnDisconnectedCluster(self->restoreDryRun)));
|
||||
|
||||
// Add all tenants from the data cluster to the management cluster
|
||||
wait(addTenantsToManagementCluster(self));
|
||||
|
||||
// set restored cluster to ready state
|
||||
if (!self->restoreDryRun) {
|
||||
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
|
||||
self->markClusterAsReady(tr);
|
||||
return Future<Void>(Void());
|
||||
}));
|
||||
// Remove the active restore ID from the data cluster
|
||||
wait(self->ctx.runDataClusterTransaction(
|
||||
[self = self](Reference<ITransaction> tr) { return eraseRestoreId(self, tr); }));
|
||||
|
||||
// set restored cluster to ready state
|
||||
wait(self->ctx.runManagementTransaction(
|
||||
[self = self](Reference<typename DB::TransactionT> tr) { return self->markClusterAsReady(tr); }));
|
||||
TraceEvent("MarkedDataClusterReady").detail("Name", self->clusterName);
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -271,6 +271,7 @@ ERROR( cluster_removed, 2169, "The cluster is being removed from the metacluster
|
|||
ERROR( cluster_restoring, 2170, "The cluster is being restored to the metacluster" )
|
||||
ERROR( invalid_data_cluster, 2171, "The data cluster being restored has no record of its metacluster" )
|
||||
ERROR( metacluster_mismatch, 2172, "The cluster does not have the expected name or is associated with a different metacluster" )
|
||||
ERROR( conflicting_restore, 2173, "Another restore is running for the same data cluster" )
|
||||
|
||||
// 2200 - errors from bindings and official APIs
|
||||
ERROR( api_version_unset, 2200, "API version is not set" )
|
||||
|
|
|
@ -1309,6 +1309,13 @@ inline Future<Void> operator||(Future<Void> const& lhs, Future<Void> const& rhs)
|
|||
return chooseActor(lhs, rhs);
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> joinWith(Future<T> f, Future<Void> other) {
|
||||
wait(other);
|
||||
T t = wait(f);
|
||||
return t;
|
||||
}
|
||||
|
||||
// wait <interval> then call what() in a loop forever
|
||||
ACTOR template <class Func>
|
||||
Future<Void> recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
||||
|
|
Loading…
Reference in New Issue