Add a conflict range on the active restore ID when setting it

This commit is contained in:
A.J. Beamon 2023-02-23 13:22:45 -08:00
parent 537834ef00
commit 6adccdafa9
2 changed files with 28 additions and 0 deletions

View File

@ -435,6 +435,30 @@ public:
tr->clear(subspace);
}
template <class Transaction>
void addReadConflictKey(Transaction tr, KeyType const& key) {
Key k = subspace.begin.withSuffix(KeyCodec::pack(key));
tr->addReadConflictRange(singleKeyRange(k));
}
template <class Transaction>
void addReadConflictRange(Transaction tr, KeyType const& begin, KeyType const& end) {
tr->addReadConflictRange(subspace.begin.withSuffix(KeyCodec::pack(begin)),
subspace.begin.withSuffix(KeyCodec::pack(end)));
}
template <class Transaction>
void addWriteConflictKey(Transaction tr, KeyType const& key) {
Key k = subspace.begin.withSuffix(KeyCodec::pack(key));
tr->addWriteConflictRange(singleKeyRange(k));
}
template <class Transaction>
void addWriteConflictRange(Transaction tr, KeyType const& begin, KeyType const& end) {
tr->addWriteConflictRange(subspace.begin.withSuffix(KeyCodec::pack(begin)),
subspace.begin.withSuffix(KeyCodec::pack(end)));
}
KeyRange subspace;
};

View File

@ -1409,6 +1409,7 @@ struct RestoreClusterImpl {
} else if (transactionId.get() != self->restoreId) {
throw conflicting_restore();
} else {
MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, self->clusterName);
MetaclusterMetadata::activeRestoreIds().erase(tr, self->clusterName);
}
@ -1504,6 +1505,7 @@ struct RestoreClusterImpl {
TraceEvent("RestoredClusterAlreadyExists").detail("ClusterName", self->clusterName);
throw cluster_already_exists();
} else if (!self->restoreDryRun) {
MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, self->clusterName);
MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
ManagementClusterMetadata::dataClusters().set(tr, self->clusterName, clusterEntry);
@ -1558,6 +1560,7 @@ struct RestoreClusterImpl {
if (!self->restoreDryRun) {
MetaclusterMetadata::metaclusterRegistration().set(tr, dataClusterEntry);
MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, self->clusterName);
MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
}
@ -1572,6 +1575,7 @@ struct RestoreClusterImpl {
}
void markClusterRestoring(Reference<typename DB::TransactionT> tr) {
MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, clusterName);
MetaclusterMetadata::activeRestoreIds().set(tr, clusterName, restoreId);
if (ctx.dataClusterMetadata.get().entry.clusterState != DataClusterState::RESTORING) {
DataClusterEntry updatedEntry = ctx.dataClusterMetadata.get().entry;