Add MetaclusterOperationContext to make it easier to run multi-transaction operations

A.J. Beamon 2022-07-28 16:44:20 -07:00
parent cd19f2cdc2
commit 5db1135ac6
5 changed files with 873 additions and 859 deletions


@@ -120,7 +120,31 @@ struct MetaclusterRegistrationEntry {
ASSERT(metaclusterName != name && metaclusterId != id);
}
// Returns true if this entry is associated with the same cluster as the passed in entry. If one entry is from the
// management cluster and the other is from a data cluster, this checks whether they are part of the same
// metacluster.
bool matches(MetaclusterRegistrationEntry const& other) const {
if (metaclusterName != other.metaclusterName || metaclusterId != other.metaclusterId) {
return false;
} else if (clusterType == ClusterType::METACLUSTER_DATA && other.clusterType == ClusterType::METACLUSTER_DATA &&
(name != other.name || id != other.id)) {
return false;
}
return true;
}
MetaclusterRegistrationEntry toManagementClusterRegistration() const {
ASSERT(clusterType == ClusterType::METACLUSTER_DATA);
return MetaclusterRegistrationEntry(metaclusterName, metaclusterId);
}
MetaclusterRegistrationEntry toDataClusterRegistration(ClusterName name, UID id) const {
ASSERT(clusterType == ClusterType::METACLUSTER_MANAGEMENT);
return MetaclusterRegistrationEntry(metaclusterName, name, metaclusterId, id);
}
Value encode() const { return ObjectWriter::toValue(*this, IncludeVersion()); }
static MetaclusterRegistrationEntry decode(ValueRef const& value) {
MetaclusterRegistrationEntry entry;
ObjectReader reader(value.begin(), IncludeVersion());
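
To make the pairing rules concrete, a minimal sketch (invented values, not part of the commit) of how a data-cluster registration derived with toDataClusterRegistration() compares to its management-cluster counterpart:

MetaclusterRegistrationEntry mgmt("mc"_sr, UID(1, 2));
MetaclusterRegistrationEntry data = mgmt.toDataClusterRegistration("dc"_sr, UID(3, 4));
ASSERT(mgmt.matches(data)); // management vs. data: only metacluster name/id are compared
ASSERT(data.matches(mgmt)); // the check is symmetric
MetaclusterRegistrationEntry other = mgmt.toDataClusterRegistration("dc2"_sr, UID(5, 6));
ASSERT(!data.matches(other)); // two data-cluster entries must also agree on cluster name/id
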
@@ -132,12 +156,30 @@ struct MetaclusterRegistrationEntry {
[](ValueRef const& v) { return MetaclusterRegistrationEntry::decode(v); });
}
std::string toString() const {
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
return fmt::format(
"metacluster name: {}, metacluster id: {}\n", printable(metaclusterName), metaclusterId.shortString());
} else {
return fmt::format("metacluster name: {}, metacluster id: {}, data cluster name: {}, data cluster id: {}\n",
printable(metaclusterName),
metaclusterId.shortString(),
printable(name),
id.shortString());
}
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clusterType, metaclusterName, name, metaclusterId, id);
}
};
template <>
struct Traceable<MetaclusterRegistrationEntry> : std::true_type {
static std::string toString(MetaclusterRegistrationEntry const& entry) { return entry.toString(); }
};
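
The Traceable specialization lets a registration entry be logged directly as a trace event detail, e.g. (event and detail names are illustrative):

TraceEvent("MetaclusterRegistration").detail("Registration", entry); // rendered via entry.toString()
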
struct MetaclusterMetadata {
// Registration information for a metacluster, stored on both management and data clusters
static inline KeyBackedObjectProperty<MetaclusterRegistrationEntry, decltype(IncludeVersion())>
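
The declaration above is truncated by the diff view. Assuming the property is named metaclusterRegistration, a typical read-and-verify inside a transaction might look like the following sketch (expectedRegistration and the error choice are hypothetical):

Optional<MetaclusterRegistrationEntry> registration = wait(MetaclusterMetadata::metaclusterRegistration.get(tr));
if (registration.present() && !registration.get().matches(expectedRegistration)) {
throw invalid_metacluster_operation(); // hypothetical: reject a mismatched cluster
}
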

File diff suppressed because it is too large


@@ -313,47 +313,47 @@ Future<Void> deleteTenantTransaction(Transaction tr,
TenantMetadata::tenantGroupMap.erase(tr, tenantEntry.get().tenantGroup.get());
}
}
}
if (clusterType == ClusterType::METACLUSTER_DATA) {
// In data clusters, we store a tombstone
state Future<KeyBackedRangeResult<int64_t>> latestTombstoneFuture =
TenantMetadata::tenantTombstones.getRange(tr, {}, {}, 1, Snapshot::False, Reverse::True);
state Optional<TenantTombstoneCleanupData> cleanupData = wait(TenantMetadata::tombstoneCleanupData.get(tr));
state Version transactionReadVersion = wait(safeThreadFutureToFuture(tr->getReadVersion()));
// If it has been long enough since we last cleaned up the tenant tombstones, we do that first
if (!cleanupData.present() || cleanupData.get().nextTombstoneEraseVersion <= transactionReadVersion) {
state int64_t deleteThroughId = cleanupData.present() ? cleanupData.get().nextTombstoneEraseId : -1;
// Delete all tombstones up through the one currently marked in the cleanup data
if (deleteThroughId >= 0) {
TenantMetadata::tenantTombstones.erase(tr, 0, deleteThroughId + 1);
}
KeyBackedRangeResult<int64_t> latestTombstone = wait(latestTombstoneFuture);
int64_t nextDeleteThroughId = std::max(deleteThroughId, tenantId.get());
if (!latestTombstone.results.empty()) {
nextDeleteThroughId = std::max(nextDeleteThroughId, latestTombstone.results[0]);
}
// The next cleanup will happen at or after TENANT_TOMBSTONE_CLEANUP_INTERVAL seconds have elapsed and
// will clean up tombstones through the most recently allocated ID.
TenantTombstoneCleanupData updatedCleanupData;
updatedCleanupData.tombstonesErasedThrough = deleteThroughId;
updatedCleanupData.nextTombstoneEraseId = nextDeleteThroughId;
updatedCleanupData.nextTombstoneEraseVersion =
transactionReadVersion +
CLIENT_KNOBS->TENANT_TOMBSTONE_CLEANUP_INTERVAL * CLIENT_KNOBS->VERSIONS_PER_SECOND;
TenantMetadata::tombstoneCleanupData.set(tr, updatedCleanupData);
// If the tenant being deleted is within the tombstone window, record the tombstone
if (tenantId.get() > updatedCleanupData.tombstonesErasedThrough) {
TenantMetadata::tenantTombstones.insert(tr, tenantId.get());
}
} else if (tenantId.get() > cleanupData.get().tombstonesErasedThrough) {
// If the tenant being deleted is within the tombstone window, record the tombstone
TenantMetadata::tenantTombstones.insert(tr, tenantId.get());
}
}
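
The cleanup bookkeeping amounts to three fields. An illustrative walk-through, assuming a 60-second TENANT_TOMBSTONE_CLEANUP_INTERVAL and the usual 1,000,000 versions per second (V and deletedTenantId are placeholders):

// First deletion at read version V with no prior cleanup data (deleteThroughId == -1):
TenantTombstoneCleanupData cleanup;
cleanup.tombstonesErasedThrough = -1; // nothing has been erased yet
cleanup.nextTombstoneEraseId = deletedTenantId; // the next sweep erases tombstones through this id
cleanup.nextTombstoneEraseVersion = V + 60 * 1000000; // earliest version at which that sweep may run
// A later deletion whose read version reaches nextTombstoneEraseVersion performs the erase
// and advances all three fields.
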


@@ -169,34 +169,22 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
try {
loop {
Future<Void> createFuture =
self->useMetacluster ? MetaclusterAPI::createTenant(self->mvDb, tenant, entry)
: success(TenantAPI::createTenant(self->dataDb.getReference(), tenant, entry));
Optional<Void> result = wait(timeout(createFuture, 30));
if (result.present()) {
break;
}
}
return Void();
} catch (Error& e) {
if (e.code() == error_code_tenant_removed) {
ASSERT(self->useMetacluster);
} else if (e.code() != error_code_tenant_already_exists && e.code() != error_code_cluster_no_capacity) {
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
ASSERT(false);
}
return Void();
@@ -208,24 +196,13 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
try {
loop {
Future<Void> deleteFuture = self->useMetacluster
? MetaclusterAPI::deleteTenant(self->mvDb, tenant)
: TenantAPI::deleteTenant(self->dataDb.getReference(), tenant);
Optional<Void> result = wait(timeout(deleteFuture, 30));
if (result.present()) {
break;
}
}
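
Both loops now rely on the same idea: a timed-out attempt is ambiguous, so it is retried, and the "operation already happened" errors a retry can produce (tenant_already_exists on create, tenant_not_found on delete) are classified once in the outer catch instead of an inner one. Reduced to a generic sketch (op() is a placeholder):

loop {
Optional<Void> result = wait(timeout(op(), 30)); // 30-second budget per attempt
if (result.present()) {
break; // attempt finished; real errors propagate to the caller's catch
}
// On timeout the previous attempt may or may not have taken effect; retry.
}
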


@@ -250,6 +250,7 @@ ERROR( metacluster_no_capacity, 2157, "Metacluster does not have capacity to cre
ERROR( management_cluster_invalid_access, 2158, "Standard transactions cannot be run against the management cluster" )
ERROR( tenant_cannot_be_moved, 2159, "The tenant cannot be moved between clusters" )
ERROR( tenant_creation_permanently_failed, 2160, "The tenant creation did not complete in a timely manner and has permanently failed" )
ERROR( cluster_locked, 2161, "The cluster has been locked" )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )
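
Each ERROR definition also generates a matching error_code_ constant, so callers can test for the new code in the usual way (the operation name here is hypothetical):

try {
wait(runMetaclusterOperation());
} catch (Error& e) {
if (e.code() == error_code_cluster_locked) {
// the target cluster is locked; back off and retry later
} else {
throw;
}
}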