All metacluster operations to the management cluster go through the management cluster special key space
This commit is contained in:
parent
d6ac3e57a4
commit
80a3c8dd50
|
@ -33,10 +33,10 @@
|
|||
|
||||
namespace fdb_cli {
|
||||
|
||||
Optional<std::pair<Optional<std::string>, Optional<DataClusterEntry>>>
|
||||
Optional<std::pair<Optional<ClusterConnectionString>, Optional<DataClusterEntry>>>
|
||||
parseClusterConfiguration(std::vector<StringRef> const& tokens, DataClusterEntry const& defaults, int startIndex) {
|
||||
Optional<DataClusterEntry> entry;
|
||||
Optional<std::string> connectionString;
|
||||
Optional<ClusterConnectionString> connectionString;
|
||||
|
||||
for (int tokenNum = startIndex; tokenNum < tokens.size(); ++tokenNum) {
|
||||
StringRef token = tokens[tokenNum];
|
||||
|
@ -49,13 +49,13 @@ parseClusterConfiguration(std::vector<StringRef> const& tokens, DataClusterEntry
|
|||
if (sscanf(value.c_str(), "%d%n", &entry.get().capacity.numTenantGroups, &n) != 1 || n != value.size() ||
|
||||
entry.get().capacity.numTenantGroups < 0) {
|
||||
fprintf(stderr, "ERROR: invalid number of tenant groups %s\n", value.c_str());
|
||||
return Optional<std::pair<Optional<std::string>, Optional<DataClusterEntry>>>();
|
||||
return Optional<std::pair<Optional<ClusterConnectionString>, Optional<DataClusterEntry>>>();
|
||||
}
|
||||
} else if (tokencmp(param, "connection_string")) {
|
||||
connectionString = value;
|
||||
connectionString = ClusterConnectionString(value);
|
||||
} else {
|
||||
fprintf(stderr, "ERROR: unrecognized configuration parameter %s\n", param.toString().c_str());
|
||||
return Optional<std::pair<Optional<std::string>, Optional<DataClusterEntry>>>();
|
||||
return Optional<std::pair<Optional<ClusterConnectionString>, Optional<DataClusterEntry>>>();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,6 +180,7 @@ ACTOR Future<bool> metaclusterConfigureCommand(Reference<IDatabase> db, std::vec
|
|||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
Optional<DataClusterMetadata> metadata = wait(MetaclusterAPI::tryGetClusterTransaction(tr, tokens[2]));
|
||||
if (!metadata.present()) {
|
||||
throw cluster_not_found();
|
||||
|
@ -190,7 +191,7 @@ ACTOR Future<bool> metaclusterConfigureCommand(Reference<IDatabase> db, std::vec
|
|||
return false;
|
||||
}
|
||||
|
||||
MetaclusterAPI::updateClusterMetadataTransaction(tr, tokens[2], config.get().first, config.get().second);
|
||||
MetaclusterAPI::updateClusterMetadata(tr, tokens[2], config.get().first, config.get().second);
|
||||
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
break;
|
||||
|
|
|
@ -61,6 +61,8 @@ struct ClientLeaderRegInterface {
|
|||
// - There is no address present more than once
|
||||
class ClusterConnectionString {
|
||||
public:
|
||||
constexpr static FileIdentifier file_identifier = 13602011;
|
||||
|
||||
ClusterConnectionString() {}
|
||||
ClusterConnectionString(const std::string& connectionString);
|
||||
ClusterConnectionString(const std::vector<NetworkAddress>& coordinators, Key key);
|
||||
|
@ -86,6 +88,12 @@ public:
|
|||
private:
|
||||
void parseConnString();
|
||||
Key key, keyDesc;
|
||||
|
||||
public:
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, coords, hostnames, key, keyDesc);
|
||||
}
|
||||
};
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(ConnectionStringNeedsPersisted);
|
||||
|
|
|
@ -36,12 +36,6 @@ const KeyRangeRef MetaclusterInternalManagementClusterImpl::submoduleRange =
|
|||
MetaclusterInternalManagementClusterImpl::MetaclusterInternalManagementClusterImpl(KeyRangeRef kr)
|
||||
: SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<RangeResult> MetaclusterInternalManagementClusterImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
KeyRef extractCommand(ReadYourWritesTransaction* ryw,
|
||||
KeyRef& beginKey,
|
||||
KeyRef& endKey,
|
||||
|
@ -71,6 +65,49 @@ KeyRef extractCommand(ReadYourWritesTransaction* ryw,
|
|||
return command;
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> getDataClusterList(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) {
|
||||
std::map<ClusterName, DataClusterMetadata> clusters =
|
||||
wait(MetaclusterAPI::managementClusterListClusters(&ryw->getTransaction(), kr.begin, kr.end, limitsHint.rows));
|
||||
|
||||
RangeResult results;
|
||||
for (auto cluster : clusters) {
|
||||
ValueRef value =
|
||||
ObjectWriter::toValue(cluster.second, IncludeVersion(ProtocolVersion::withMetacluster()), results.arena());
|
||||
results.push_back(
|
||||
results.arena(),
|
||||
KeyValueRef(cluster.first.withPrefix(
|
||||
"\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr, results.arena()),
|
||||
value));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
Future<RangeResult> MetaclusterInternalManagementClusterImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
|
||||
KeyRangeRef subRange =
|
||||
kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER_INTERNAL).begin)
|
||||
.removePrefix(MetaclusterInternalManagementClusterImpl::submoduleRange.begin);
|
||||
|
||||
KeyRef begin = subRange.begin;
|
||||
KeyRef end = subRange.end;
|
||||
|
||||
KeyRef command = extractCommand(ryw, begin, end, kr);
|
||||
|
||||
if (command == "data_cluster"_sr) {
|
||||
command = extractCommand(ryw, begin, end, kr, "\xff\xff"_sr);
|
||||
if (command == "map"_sr) {
|
||||
return getDataClusterList(ryw, KeyRangeRef(begin, end), limitsHint);
|
||||
}
|
||||
}
|
||||
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
void applyDataClusterConfig(ReadYourWritesTransaction* ryw,
|
||||
ClusterNameRef clusterName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries,
|
||||
|
@ -140,7 +177,7 @@ ACTOR Future<Void> changeDataClusterConfig(ReadYourWritesTransaction* ryw,
|
|||
ClusterNameRef clusterName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries) {
|
||||
state Optional<DataClusterMetadata> clusterMetadata =
|
||||
wait(MetaclusterAPI::tryGetClusterTransaction(&ryw->getTransaction(), clusterName));
|
||||
wait(MetaclusterAPI::managementClusterTryGetCluster(&ryw->getTransaction(), clusterName));
|
||||
if (!clusterMetadata.present()) {
|
||||
TraceEvent(SevWarn, "ConfigureUnknownDataCluster").detail("ClusterName", clusterName);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
|
@ -152,8 +189,7 @@ ACTOR Future<Void> changeDataClusterConfig(ReadYourWritesTransaction* ryw,
|
|||
|
||||
DataClusterMetadata& metadata = clusterMetadata.get();
|
||||
applyDataClusterConfig(ryw, clusterName, configEntries, &metadata);
|
||||
MetaclusterAPI::updateClusterMetadataTransaction(
|
||||
&ryw->getTransaction(), clusterName, metadata.connectionString.toString(), metadata.entry);
|
||||
wait(MetaclusterAPI::managementClusterUpdateClusterMetadata(&ryw->getTransaction(), clusterName, metadata));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -161,7 +197,7 @@ ACTOR Future<Void> changeDataClusterConfig(ReadYourWritesTransaction* ryw,
|
|||
ACTOR Future<Void> removeClusterRange(ReadYourWritesTransaction* ryw,
|
||||
ClusterName beginCluster,
|
||||
ClusterName endCluster) {
|
||||
std::map<ClusterName, DataClusterMetadata> clusters = wait(MetaclusterAPI::listClustersTransaction(
|
||||
std::map<ClusterName, DataClusterMetadata> clusters = wait(MetaclusterAPI::managementClusterListClusters(
|
||||
&ryw->getTransaction(), beginCluster, endCluster, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (clusters.size() == CLIENT_KNOBS->TOO_MANY) {
|
||||
|
@ -175,7 +211,7 @@ ACTOR Future<Void> removeClusterRange(ReadYourWritesTransaction* ryw,
|
|||
|
||||
std::vector<Future<Void>> removeFutures;
|
||||
for (auto cluster : clusters) {
|
||||
removeFutures.push_back(MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), cluster.first));
|
||||
removeFutures.push_back(MetaclusterAPI::managementClusterRemove(&ryw->getTransaction(), cluster.first));
|
||||
}
|
||||
|
||||
wait(waitForAll(removeFutures));
|
||||
|
@ -230,7 +266,7 @@ Future<Void> processDataClusterCommandCommit(ReadYourWritesTransaction* ryw) {
|
|||
// For a single key clear, just issue the delete
|
||||
if (mapMutation.first.singleKeyRange()) {
|
||||
clusterManagementFutures.push_back(
|
||||
MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), mapMutation.first.begin));
|
||||
MetaclusterAPI::managementClusterRemove(&ryw->getTransaction(), mapMutation.first.begin));
|
||||
} else {
|
||||
clusterManagementFutures.push_back(
|
||||
removeClusterRange(ryw, mapMutation.first.begin, mapMutation.first.end));
|
||||
|
|
|
@ -37,12 +37,19 @@
|
|||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
struct DataClusterMetadata {
|
||||
constexpr static FileIdentifier file_identifier = 5573993;
|
||||
|
||||
DataClusterEntry entry;
|
||||
ClusterConnectionString connectionString;
|
||||
|
||||
DataClusterMetadata() = default;
|
||||
DataClusterMetadata(DataClusterEntry const& entry, ClusterConnectionString const& connectionString)
|
||||
: entry(entry), connectionString(connectionString) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, connectionString, entry);
|
||||
}
|
||||
};
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(AddNewTenants);
|
||||
|
@ -51,7 +58,7 @@ FDB_DECLARE_BOOLEAN_PARAM(RemoveMissingTenants);
|
|||
namespace MetaclusterAPI {
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, ClusterNameRef name) {
|
||||
Future<Optional<DataClusterMetadata>> managementClusterTryGetCluster(Transaction tr, ClusterNameRef name) {
|
||||
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
|
||||
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
|
||||
|
||||
|
@ -74,6 +81,25 @@ Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, C
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, ClusterNameRef name) {
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
||||
state typename transaction_future_type<Transaction, Optional<Value>>::type clusterEntryFuture =
|
||||
tr->get("\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr.withSuffix(name));
|
||||
|
||||
state Optional<Value> clusterEntry = wait(safeThreadFutureToFuture(clusterEntryFuture));
|
||||
|
||||
if (clusterEntry.present()) {
|
||||
DataClusterMetadata metadata;
|
||||
ObjectReader reader(clusterEntry.get().begin(), IncludeVersion());
|
||||
reader.deserialize(metadata);
|
||||
return metadata;
|
||||
} else {
|
||||
return Optional<DataClusterMetadata>();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class DB>
|
||||
Future<Optional<DataClusterMetadata>> tryGetCluster(Reference<DB> db, ClusterName name) {
|
||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||
|
@ -109,21 +135,40 @@ Future<DataClusterMetadata> getCluster(Reference<DB> db, ClusterName name) {
|
|||
return metadata.get();
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> managementClusterUpdateClusterMetadata(Transaction tr, ClusterNameRef name, DataClusterMetadata metadata) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
// Check that the tenant exists
|
||||
Optional<DataClusterMetadata> existingMetadata = wait(managementClusterTryGetCluster(tr, name));
|
||||
if (!existingMetadata.present()) {
|
||||
throw cluster_not_found();
|
||||
}
|
||||
|
||||
tr->set(name.withPrefix(dataClusterMetadataPrefix), encodeDataClusterEntry(metadata.entry));
|
||||
tr->set(name.withPrefix(dataClusterConnectionRecordPrefix), metadata.connectionString.toString());
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// This should only be called from a transaction that has already confirmed that the cluster entry
|
||||
// is present. The updatedEntry should use the existing entry and modify only those fields that need
|
||||
// to be changed.
|
||||
template <class Transaction>
|
||||
void updateClusterMetadataTransaction(Transaction tr,
|
||||
ClusterNameRef name,
|
||||
Optional<std::string> updatedConnectionString,
|
||||
Optional<DataClusterEntry> updatedEntry) {
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
||||
void updateClusterMetadata(Transaction tr,
|
||||
ClusterNameRef name,
|
||||
Optional<ClusterConnectionString> updatedConnectionString,
|
||||
Optional<DataClusterEntry> updatedEntry) {
|
||||
if (updatedEntry.present()) {
|
||||
tr->set(name.withPrefix(dataClusterMetadataPrefix), encodeDataClusterEntry(updatedEntry.get()));
|
||||
tr->set(
|
||||
name.withPrefix(
|
||||
"\xff\xff/metacluster_internal/management_cluster/data_cluster/configure/capacity.num_tenant_groups/"_sr),
|
||||
format("%d", updatedEntry.get().capacity.numTenantGroups));
|
||||
}
|
||||
if (updatedConnectionString.present()) {
|
||||
tr->set(name.withPrefix(dataClusterConnectionRecordPrefix), updatedConnectionString.get());
|
||||
tr->set(name.withPrefix(
|
||||
"\xff\xff/metacluster_internal/management_cluster/data_cluster/configure/connection_string/"_sr),
|
||||
updatedConnectionString.get().toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,7 +184,7 @@ Future<Void> managementClusterRegister(Transaction tr,
|
|||
throw invalid_cluster_name();
|
||||
}
|
||||
|
||||
state Future<Optional<DataClusterMetadata>> dataClusterMetadataFuture = tryGetClusterTransaction(tr, name);
|
||||
state Future<Optional<DataClusterMetadata>> dataClusterMetadataFuture = managementClusterTryGetCluster(tr, name);
|
||||
state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture =
|
||||
tr->get(dataClusterLastIdKey);
|
||||
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
|
||||
|
@ -177,69 +222,78 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name) {
|
|||
std::map<TenantName, TenantMapEntry> existingTenants = wait(safeThreadFutureToFuture(existingTenantsFuture));
|
||||
if (!existingTenants.empty()) {
|
||||
TraceEvent(SevWarn, "CannotRegisterClusterWithTenants").detail("ClusterName", name);
|
||||
throw register_nonempty_cluster();
|
||||
throw cluster_not_empty();
|
||||
}
|
||||
|
||||
RangeResult existingData = wait(safeThreadFutureToFuture(existingDataFuture));
|
||||
if (!existingData.empty()) {
|
||||
TraceEvent(SevWarn, "CannotRegisterClusterWithData").detail("ClusterName", name);
|
||||
throw register_nonempty_cluster();
|
||||
throw cluster_not_empty();
|
||||
}
|
||||
|
||||
// TODO: change config to subordinate cluster
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> registerClusterTransaction(Transaction tr,
|
||||
ClusterNameRef name,
|
||||
std::string connectionString,
|
||||
DataClusterEntry entry) {
|
||||
ACTOR template <class DB>
|
||||
Future<Void> registerCluster(Reference<DB> db,
|
||||
ClusterName name,
|
||||
ClusterConnectionString connectionString,
|
||||
DataClusterEntry entry) {
|
||||
if (name.startsWith("\xff"_sr)) {
|
||||
throw invalid_cluster_name();
|
||||
}
|
||||
|
||||
state Reference<IDatabase> dbToRegister =
|
||||
// Step 1: Configure the data cluster as a subordinate cluster
|
||||
state Reference<IDatabase> dataClusterDb =
|
||||
MultiVersionApi::api->createDatabase(makeReference<ClusterConnectionMemoryRecord>(connectionString));
|
||||
|
||||
Reference<ITransaction> registerTr = dbToRegister->createTransaction();
|
||||
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
|
||||
|
||||
registerTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
registerTr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// TODO: use the special key-space rather than running the logic ourselves
|
||||
loop {
|
||||
try {
|
||||
dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
dataClusterTr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
// TODO: use the special key-space rather than running the logic ourselves
|
||||
|
||||
// registerTr->set("\xff\xff/metacluster/management/data_cluster/register"_sr, ""_sr);
|
||||
wait(dataClusterRegister(registerTr, name));
|
||||
// registerTr->set("\xff\xff/metacluster/management/data_cluster/register"_sr, ""_sr);
|
||||
wait(dataClusterRegister(dataClusterTr, name));
|
||||
|
||||
// Once the data cluster is configured, we can add it to the metacluster
|
||||
tr->set(name.withPrefix("\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr), connectionString);
|
||||
tr->set(
|
||||
name.withPrefix(
|
||||
"\xff\xff/metacluster_internal/management_cluster/data_cluster/configure/capacity.num_tenant_groups/"_sr),
|
||||
format("%d", entry.capacity.numTenantGroups));
|
||||
return Void();
|
||||
}
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
ACTOR template <class DB>
|
||||
Future<Void> registerCluster(Reference<DB> db, ClusterName name, std::string connectionString, DataClusterEntry entry) {
|
||||
wait(safeThreadFutureToFuture(dataClusterTr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
TraceEvent("ConfiguredDataCluster")
|
||||
.detail("ClusterName", name)
|
||||
.detail("Capacity", entry.capacity)
|
||||
.detail("Version", dataClusterTr->getCommittedVersion())
|
||||
.detail("ConnectionString", connectionString.toString());
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(dataClusterTr->onError(e)));
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Register the data cluster in the management cluster
|
||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||
|
||||
state bool firstTry = true;
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
|
||||
if (firstTry) {
|
||||
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
|
||||
if (metadata.present()) {
|
||||
throw cluster_already_exists();
|
||||
}
|
||||
|
||||
firstTry = false;
|
||||
}
|
||||
|
||||
wait(registerClusterTransaction(tr, name, connectionString, entry));
|
||||
tr->set(name.withPrefix("\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr),
|
||||
connectionString.toString());
|
||||
tr->set(
|
||||
name.withPrefix(
|
||||
"\xff\xff/metacluster_internal/management_cluster/data_cluster/configure/capacity.num_tenant_groups/"_sr),
|
||||
format("%d", entry.capacity.numTenantGroups));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
|
@ -255,13 +309,15 @@ Future<Void> registerCluster(Reference<DB> db, ClusterName name, std::string con
|
|||
.detail("ClusterName", name)
|
||||
.detail("Capacity", entry.capacity)
|
||||
.detail("Version", tr->getCommittedVersion())
|
||||
.detail("ConnectionString", connectionString);
|
||||
.detail("ConnectionString", connectionString.toString());
|
||||
|
||||
return Void();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
|
@ -314,18 +370,18 @@ Future<Void> restoreCluster(Reference<DB> db,
|
|||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> removeClusterTransaction(Transaction tr, ClusterNameRef name) {
|
||||
Future<Void> managementClusterRemove(Transaction tr, ClusterNameRef name) {
|
||||
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
|
||||
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
|
||||
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
||||
state Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
|
||||
state Optional<DataClusterMetadata> metadata = wait(managementClusterTryGetCluster(tr, name));
|
||||
if (!metadata.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// TODO: verify tenant map for cluster is empty
|
||||
// TODO: validate that no tenants are assigned to the target cluster
|
||||
|
||||
tr->clear(dataClusterMetadataKey);
|
||||
tr->clear(dataClusterConnectionRecordKey);
|
||||
|
@ -333,25 +389,29 @@ Future<Void> removeClusterTransaction(Transaction tr, ClusterNameRef name) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> dataClusterRemove(Transaction tr, ClusterNameRef name) {
|
||||
// TODO
|
||||
wait(delay(0.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class DB>
|
||||
Future<Void> removeCluster(Reference<DB> db, ClusterName name) {
|
||||
// Step 1: Remove the data cluster from the metacluster
|
||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||
state Optional<DataClusterMetadata> metadata;
|
||||
|
||||
state bool firstTry = true;
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
if (firstTry) {
|
||||
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
|
||||
if (!metadata.present()) {
|
||||
throw cluster_not_found();
|
||||
}
|
||||
|
||||
firstTry = false;
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
if (!metadata.present()) {
|
||||
DataClusterMetadata _metadata = wait(getClusterTransaction(tr, name));
|
||||
metadata = _metadata;
|
||||
}
|
||||
|
||||
wait(removeClusterTransaction(tr, name));
|
||||
tr->clear(name.withPrefix("\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
|
@ -364,11 +424,73 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name) {
|
|||
}
|
||||
|
||||
TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
|
||||
return Void();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Reconfigure data cluster and remove metadata.
|
||||
// Note that this is best effort; if it fails the cluster will still have been removed.
|
||||
state Reference<IDatabase> dataClusterDb = MultiVersionApi::api->createDatabase(
|
||||
makeReference<ClusterConnectionMemoryRecord>(metadata.get().connectionString));
|
||||
|
||||
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
|
||||
// TODO: use special keys
|
||||
wait(dataClusterRemove(dataClusterTr, name));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(dataClusterTr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
TraceEvent("ReconfiguredDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(dataClusterTr->onError(e)));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<std::map<ClusterName, DataClusterMetadata>> managementClusterListClusters(Transaction tr,
|
||||
ClusterNameRef begin,
|
||||
ClusterNameRef end,
|
||||
int limit) {
|
||||
state KeyRange metadataRange = KeyRangeRef(begin, end).withPrefix(dataClusterMetadataPrefix);
|
||||
state KeyRange connectionStringRange = KeyRangeRef(begin, end).withPrefix(dataClusterConnectionRecordPrefix);
|
||||
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
||||
state typename transaction_future_type<Transaction, RangeResult>::type metadataFuture =
|
||||
tr->getRange(firstGreaterOrEqual(metadataRange.begin), firstGreaterOrEqual(metadataRange.end), limit);
|
||||
state typename transaction_future_type<Transaction, RangeResult>::type connectionStringFuture = tr->getRange(
|
||||
firstGreaterOrEqual(connectionStringRange.begin), firstGreaterOrEqual(connectionStringRange.end), limit);
|
||||
|
||||
state RangeResult metadata = wait(safeThreadFutureToFuture(metadataFuture));
|
||||
RangeResult connectionStrings = wait(safeThreadFutureToFuture(connectionStringFuture));
|
||||
|
||||
ASSERT(metadata.size() == connectionStrings.size());
|
||||
|
||||
std::map<ClusterName, DataClusterMetadata> clusters;
|
||||
for (int i = 0; i < metadata.size(); ++i) {
|
||||
clusters[metadata[i].key.removePrefix(dataClusterMetadataPrefix)] = DataClusterMetadata(
|
||||
decodeDataClusterEntry(metadata[i].value), ClusterConnectionString(connectionStrings[i].value.toString()));
|
||||
}
|
||||
|
||||
return clusters;
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
|
@ -406,18 +528,30 @@ Future<std::map<ClusterName, DataClusterMetadata>> listClusters(Reference<DB> db
|
|||
ClusterName end,
|
||||
int limit) {
|
||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||
state KeyRef prefix = "\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
std::map<ClusterName, DataClusterMetadata> clusters = wait(listClustersTransaction(tr, begin, end, limit));
|
||||
state typename transaction_future_type<typename DB::TransactionT, RangeResult>::type listFuture =
|
||||
tr->getRange(KeyRangeRef(begin, end).withPrefix(prefix), limit);
|
||||
|
||||
RangeResult results = wait(safeThreadFutureToFuture(listFuture));
|
||||
std::map<ClusterName, DataClusterMetadata> clusters;
|
||||
|
||||
for (auto result : results) {
|
||||
DataClusterMetadata metadata;
|
||||
ObjectReader reader(result.value.begin(), IncludeVersion());
|
||||
reader.deserialize(metadata);
|
||||
clusters[result.key.removePrefix(prefix)] = metadata;
|
||||
}
|
||||
|
||||
return clusters;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}; // namespace MetaclusterAPI
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
|
@ -247,6 +247,11 @@ public:
|
|||
return Standalone<StringRef>(toStringRef(), arena);
|
||||
}
|
||||
|
||||
StringRef toString(Arena& arena) const {
|
||||
ASSERT(!customAllocator);
|
||||
return StringRef(arena, toStringRef());
|
||||
}
|
||||
|
||||
template <class Item, class VersionOptions>
|
||||
static Standalone<StringRef> toValue(Item const& item, VersionOptions vo) {
|
||||
ObjectWriter writer(vo);
|
||||
|
@ -254,6 +259,13 @@ public:
|
|||
return writer.toString();
|
||||
}
|
||||
|
||||
template <class Item, class VersionOptions>
|
||||
static StringRef toValue(Item const& item, VersionOptions vo, Arena& arena) {
|
||||
ObjectWriter writer(vo);
|
||||
writer.serialize(item);
|
||||
return writer.toString(arena);
|
||||
}
|
||||
|
||||
ProtocolVersion protocolVersion() const { return mProtocolVersion; }
|
||||
void setProtocolVersion(ProtocolVersion v) {
|
||||
mProtocolVersion = v;
|
||||
|
|
|
@ -172,6 +172,8 @@ public: // introduced features
|
|||
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, Metacluster);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, TenantGroups);
|
||||
};
|
||||
|
||||
template <>
|
||||
|
|
|
@ -236,7 +236,7 @@ ERROR( invalid_cluster_name, 2150, "Data cluster name cannot begin with \\xff" )
|
|||
ERROR( invalid_metacluster_operation, 2151, "Metacluster operation performed on non-metacluster" )
|
||||
ERROR( cluster_already_exists, 2152, "A data cluster with the given name already exists" )
|
||||
ERROR( cluster_not_found, 2153, "Data cluster does not exist" )
|
||||
ERROR( register_nonempty_cluster, 2154, "Cannot register non-empty data cluster" )
|
||||
ERROR( cluster_not_empty, 2154, "Data cluster must be empty" )
|
||||
|
||||
// 2200 - errors from bindings and official APIs
|
||||
ERROR( api_version_unset, 2200, "API version is not set" )
|
||||
|
|
Loading…
Reference in New Issue