Add concurrent restore testing to the metacluster restore workload
This commit is contained in:
parent
e151a2d363
commit
dcae48cbbd
|
@ -293,13 +293,38 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
messages.clear();
|
||||
}
|
||||
|
||||
wait(MetaclusterAPI::restoreCluster(self->managementDb,
|
||||
clusterName,
|
||||
dataDb->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::True,
|
||||
RestoreDryRun::False,
|
||||
forceJoin,
|
||||
&messages));
|
||||
state int numRestores = deterministicRandom()->randomInt(1, 3);
|
||||
state bool successfulRestore = false;
|
||||
while (!successfulRestore) {
|
||||
state std::vector<Future<ErrorOr<Void>>> restoreFutures;
|
||||
for (; numRestores > 0; numRestores--) {
|
||||
restoreFutures.push_back(
|
||||
errorOr(MetaclusterAPI::restoreCluster(self->managementDb,
|
||||
clusterName,
|
||||
dataDb->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::True,
|
||||
RestoreDryRun::False,
|
||||
forceJoin,
|
||||
&messages)));
|
||||
wait(delay(deterministicRandom()->random01() * 5));
|
||||
}
|
||||
|
||||
wait(waitForAll(restoreFutures));
|
||||
|
||||
for (auto const& f : restoreFutures) {
|
||||
if (!f.get().isError()) {
|
||||
successfulRestore = true;
|
||||
} else {
|
||||
ASSERT(f.get().getError().code() == error_code_conflicting_restore);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(successfulRestore || restoreFutures.size() > 1);
|
||||
numRestores = 1;
|
||||
}
|
||||
|
||||
DataClusterMetadata clusterMetadata = wait(MetaclusterAPI::getCluster(self->managementDb, clusterName));
|
||||
ASSERT_EQ(clusterMetadata.entry.clusterState, DataClusterState::READY);
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoreComplete").detail("ClusterName", clusterName);
|
||||
}
|
||||
|
||||
|
@ -578,14 +603,69 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
messages.clear();
|
||||
}
|
||||
|
||||
wait(MetaclusterAPI::restoreCluster(
|
||||
self->managementDb,
|
||||
clusterItr->first,
|
||||
clusterItr->second.db->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::False,
|
||||
RestoreDryRun::False,
|
||||
ForceJoin(deterministicRandom()->coinflip()),
|
||||
&messages));
|
||||
state int numRestores = deterministicRandom()->randomInt(1, 3);
|
||||
state bool successfulRestore = false;
|
||||
loop {
|
||||
state std::vector<std::vector<std::string>> messagesList(numRestores);
|
||||
state std::vector<Future<ErrorOr<Void>>> restoreFutures;
|
||||
for (; numRestores > 0; numRestores--) {
|
||||
restoreFutures.push_back(errorOr(MetaclusterAPI::restoreCluster(
|
||||
self->managementDb,
|
||||
clusterItr->first,
|
||||
clusterItr->second.db->getConnectionRecord()->getConnectionString(),
|
||||
ApplyManagementClusterUpdates::False,
|
||||
RestoreDryRun::False,
|
||||
ForceJoin(deterministicRandom()->coinflip()),
|
||||
&messagesList[restoreFutures.size()])));
|
||||
wait(delay(deterministicRandom()->random01() * 5));
|
||||
}
|
||||
|
||||
wait(waitForAll(restoreFutures));
|
||||
|
||||
Optional<Error> nonConflictError;
|
||||
for (int i = 0; i < restoreFutures.size(); ++i) {
|
||||
ErrorOr<Void> result = restoreFutures[i].get();
|
||||
fmt::print("Restore result for {}: {}\n",
|
||||
printable(clusterItr->first),
|
||||
result.isError() ? result.getError().what() : "success");
|
||||
if (!result.isError()) {
|
||||
ASSERT(!successfulRestore);
|
||||
successfulRestore = true;
|
||||
messages = messagesList[i];
|
||||
} else if (result.getError().code() != error_code_conflicting_restore &&
|
||||
result.getError().code() != error_code_cluster_already_registered &&
|
||||
result.getError().code() != error_code_cluster_already_exists &&
|
||||
!nonConflictError.present()) {
|
||||
nonConflictError = result.getError().code();
|
||||
messages = messagesList[i];
|
||||
}
|
||||
}
|
||||
|
||||
if (nonConflictError.present()) {
|
||||
ASSERT(!successfulRestore);
|
||||
throw nonConflictError.get();
|
||||
} else if (successfulRestore) {
|
||||
break;
|
||||
}
|
||||
|
||||
ASSERT_GT(restoreFutures.size(), 1);
|
||||
|
||||
numRestores = 1;
|
||||
wait(success(MetaclusterAPI::removeCluster(clusterItr->second.db.getReference(),
|
||||
clusterItr->first,
|
||||
ClusterType::METACLUSTER_DATA,
|
||||
ForceRemove::True)));
|
||||
wait(success(MetaclusterAPI::removeCluster(self->managementDb,
|
||||
clusterItr->first,
|
||||
ClusterType::METACLUSTER_MANAGEMENT,
|
||||
ForceRemove::True)));
|
||||
TraceEvent("MetaclusterRestoreWorkloadRemovedFailedCluster")
|
||||
.detail("ClusterName", clusterItr->first);
|
||||
}
|
||||
|
||||
DataClusterMetadata clusterMetadata =
|
||||
wait(MetaclusterAPI::getCluster(self->managementDb, clusterItr->first));
|
||||
ASSERT_EQ(clusterMetadata.entry.clusterState, DataClusterState::READY);
|
||||
|
||||
ASSERT(collisions.first.empty() && collisions.second.empty());
|
||||
completed = true;
|
||||
|
@ -594,7 +674,11 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
(e.code() == error_code_tenant_already_exists && !collisions.first.empty()) ||
|
||||
(e.code() == error_code_invalid_tenant_configuration && !collisions.second.empty());
|
||||
if (!failedDueToCollision) {
|
||||
throw;
|
||||
TraceEvent(SevError, "MetaclusterRestoreWorkloadRestoreManagementClusterFailed")
|
||||
.error(e)
|
||||
.detail("FromCluster", clusterItr->first)
|
||||
.detail("TenantCollisions", collisions.first.size());
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
// If the restore did not succeed, remove the partially restored cluster
|
||||
|
@ -603,7 +687,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
|||
clusterItr->first,
|
||||
ClusterType::METACLUSTER_MANAGEMENT,
|
||||
ForceRemove::True)));
|
||||
TraceEvent("MetaclusterRestoreWorkloadRemoveFailedCluster")
|
||||
TraceEvent("MetaclusterRestoreWorkloadRemovedFailedCluster")
|
||||
.detail("ClusterName", clusterItr->first);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_cluster_not_found) {
|
||||
|
|
Loading…
Reference in New Issue