Add internal metacluster special key-space modules
This commit is contained in:
parent
7c3b99f5bd
commit
d6ac3e57a4
|
@ -85,6 +85,8 @@ set(FDBCLIENT_SRCS
|
|||
Metacluster.cpp
|
||||
Metacluster.h
|
||||
MetaclusterManagement.actor.h
|
||||
MetaclusterInternalDataClusterSpecialKeys.actor.cpp
|
||||
MetaclusterInternalManagementClusterSpecialKeys.actor.cpp
|
||||
MonitorLeader.actor.cpp
|
||||
MonitorLeader.h
|
||||
MultiVersionAssignmentVars.h
|
||||
|
@ -143,6 +145,7 @@ set(FDBCLIENT_SRCS
|
|||
TaskBucket.h
|
||||
Tenant.cpp
|
||||
Tenant.h
|
||||
TenantSpecialKeys.actor.cpp
|
||||
TestKnobCollection.cpp
|
||||
TestKnobCollection.h
|
||||
ThreadSafeTransaction.cpp
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* MetaclusterInternalDataClusterSpecialKeys.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/ActorLineageProfiler.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/MetaclusterManagement.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const KeyRangeRef MetaclusterInternalDataClusterImpl::submoduleRange =
|
||||
KeyRangeRef("data_cluster/"_sr, "data_cluster0"_sr);
|
||||
|
||||
MetaclusterInternalDataClusterImpl::MetaclusterInternalDataClusterImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<RangeResult> MetaclusterInternalDataClusterImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> MetaclusterInternalDataClusterImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
return Optional<std::string>();
|
||||
}
|
|
@ -0,0 +1,255 @@
|
|||
/*
|
||||
* MetaclusterInternalManagementClusterSpecialKeys.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/ActorLineageProfiler.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/Metacluster.h"
|
||||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/MetaclusterManagement.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const KeyRangeRef MetaclusterInternalManagementClusterImpl::submoduleRange =
|
||||
KeyRangeRef("management_cluster/"_sr, "management_cluster0"_sr);
|
||||
|
||||
MetaclusterInternalManagementClusterImpl::MetaclusterInternalManagementClusterImpl(KeyRangeRef kr)
|
||||
: SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<RangeResult> MetaclusterInternalManagementClusterImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
return RangeResult();
|
||||
}
|
||||
|
||||
KeyRef extractCommand(ReadYourWritesTransaction* ryw,
|
||||
KeyRef& beginKey,
|
||||
KeyRef& endKey,
|
||||
KeyRangeRef fullRange,
|
||||
Optional<KeyRef> endRangeReplacement = Optional<KeyRef>()) {
|
||||
|
||||
KeyRef command = beginKey.eat("/");
|
||||
KeyRef endCommand = endKey.substr(0, std::min(endKey.size(), command.size()));
|
||||
endKey = endKey.substr(endCommand.size());
|
||||
|
||||
if (command != endCommand && !endKey.empty() && endKey[0] != '/' &&
|
||||
(endKey[0] != '0' || !endRangeReplacement.present())) {
|
||||
TraceEvent(SevWarn, "InvalidMetaclusterInternalCommand")
|
||||
.detail("Reason", "Clear spans multiple submodules")
|
||||
.detail("Range", fullRange);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false, "metacluster internal management cluster", "clear spans multiple submodules"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
if (endKey.size() > 0 && endKey[0] == '/') {
|
||||
endKey = endKey.substr(1);
|
||||
} else {
|
||||
endKey = endRangeReplacement.get();
|
||||
}
|
||||
|
||||
return command;
|
||||
}
|
||||
|
||||
void applyDataClusterConfig(ReadYourWritesTransaction* ryw,
|
||||
ClusterNameRef clusterName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries,
|
||||
DataClusterMetadata* clusterMetadata) {
|
||||
for (auto config : configEntries) {
|
||||
if (config.first == "connection_string"_sr && config.second.present()) {
|
||||
clusterMetadata->connectionString = ClusterConnectionString(config.second.get().toString());
|
||||
} else if (config.first == "capacity.num_tenant_groups"_sr) {
|
||||
if (config.second.present()) {
|
||||
int n;
|
||||
if (sscanf(config.second.get().toString().c_str(),
|
||||
"%d%n",
|
||||
&clusterMetadata->entry.capacity.numTenantGroups,
|
||||
&n) != 1 ||
|
||||
n != config.second.get().size() || clusterMetadata->entry.capacity.numTenantGroups < 0) {
|
||||
|
||||
TraceEvent(SevWarn, "InvalidDataClusterTenantGroupConfig")
|
||||
.detail("ClusterName", clusterName)
|
||||
.detail("Value", config.second);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set data cluster tenant group capacity",
|
||||
format("invalid data cluster tenant group capacity `%s' for cluster `%s'",
|
||||
config.second.get().toString().c_str(),
|
||||
clusterName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
} else {
|
||||
clusterMetadata->entry.capacity.numTenantGroups = 0;
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevWarn, "InvalidDataClusterConfig")
|
||||
.detail("ClusterName", clusterName)
|
||||
.detail("ConfigName", config.first)
|
||||
.detail("Value", config.second);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set data cluster configuration",
|
||||
format("invalid data cluster configuration option `%s' for cluster `%s'",
|
||||
config.first.toString().c_str(),
|
||||
clusterName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> registerDataCluster(ReadYourWritesTransaction* ryw,
|
||||
ClusterNameRef clusterName,
|
||||
std::string connectionString,
|
||||
Optional<std::vector<std::pair<StringRef, Optional<Value>>>> configMutations) {
|
||||
state DataClusterMetadata metadata;
|
||||
|
||||
// TODO: what happens if connection string is invalid?
|
||||
metadata.connectionString = ClusterConnectionString(connectionString);
|
||||
|
||||
if (configMutations.present()) {
|
||||
applyDataClusterConfig(ryw, clusterName, configMutations.get(), &metadata);
|
||||
}
|
||||
|
||||
wait(MetaclusterAPI::managementClusterRegister(
|
||||
&ryw->getTransaction(), clusterName, metadata.connectionString.toString(), metadata.entry));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeDataClusterConfig(ReadYourWritesTransaction* ryw,
|
||||
ClusterNameRef clusterName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries) {
|
||||
state Optional<DataClusterMetadata> clusterMetadata =
|
||||
wait(MetaclusterAPI::tryGetClusterTransaction(&ryw->getTransaction(), clusterName));
|
||||
if (!clusterMetadata.present()) {
|
||||
TraceEvent(SevWarn, "ConfigureUnknownDataCluster").detail("ClusterName", clusterName);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set data cluster configuration",
|
||||
format("cannot configure data cluster `%s': cluster not found", clusterName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
DataClusterMetadata& metadata = clusterMetadata.get();
|
||||
applyDataClusterConfig(ryw, clusterName, configEntries, &metadata);
|
||||
MetaclusterAPI::updateClusterMetadataTransaction(
|
||||
&ryw->getTransaction(), clusterName, metadata.connectionString.toString(), metadata.entry);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> removeClusterRange(ReadYourWritesTransaction* ryw,
|
||||
ClusterName beginCluster,
|
||||
ClusterName endCluster) {
|
||||
std::map<ClusterName, DataClusterMetadata> clusters = wait(MetaclusterAPI::listClustersTransaction(
|
||||
&ryw->getTransaction(), beginCluster, endCluster, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (clusters.size() == CLIENT_KNOBS->TOO_MANY) {
|
||||
TraceEvent(SevWarn, "RemoveClustersRangeTooLange")
|
||||
.detail("BeginCluster", beginCluster)
|
||||
.detail("EndCluster", endCluster);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false, "remove cluster", "too many cluster to range remove"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> removeFutures;
|
||||
for (auto cluster : clusters) {
|
||||
removeFutures.push_back(MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), cluster.first));
|
||||
}
|
||||
|
||||
wait(waitForAll(removeFutures));
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> processDataClusterCommandCommit(ReadYourWritesTransaction* ryw) {
|
||||
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(
|
||||
KeyRangeRef("management_cluster/data_cluster/"_sr, "management_cluster/data_cluster0"_sr)
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER_INTERNAL).begin));
|
||||
|
||||
std::vector<Future<Void>> clusterManagementFutures;
|
||||
|
||||
std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
|
||||
std::map<ClusterNameRef, std::vector<std::pair<KeyRef, Optional<Value>>>> configMutations;
|
||||
|
||||
for (auto itr : ranges) {
|
||||
if (!itr.value().first) {
|
||||
continue;
|
||||
}
|
||||
|
||||
KeyRangeRef range =
|
||||
itr.range()
|
||||
.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER_INTERNAL).begin)
|
||||
.removePrefix(MetaclusterInternalManagementClusterImpl::submoduleRange.begin)
|
||||
.removePrefix("data_cluster/"_sr);
|
||||
|
||||
KeyRef begin = range.begin;
|
||||
KeyRef end = range.end;
|
||||
KeyRef command = extractCommand(ryw, begin, end, itr.range(), "\xff\xff"_sr);
|
||||
|
||||
if (command == "map"_sr) {
|
||||
mapMutations.push_back(std::make_pair(KeyRangeRef(begin, end), itr.value().second));
|
||||
} else if (command == "configure"_sr && KeyRangeRef(begin, end).singleKeyRange()) {
|
||||
// TODO: handle capacity/num_tenant_groups
|
||||
KeyRef param = begin.eat("/");
|
||||
configMutations[begin].push_back(std::make_pair(param, itr.value().second));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto mapMutation : mapMutations) {
|
||||
if (mapMutation.second.present()) {
|
||||
Optional<std::vector<std::pair<StringRef, Optional<Value>>>> registerMutations;
|
||||
auto itr = configMutations.find(mapMutation.first.begin);
|
||||
if (itr != configMutations.end()) {
|
||||
registerMutations = itr->second;
|
||||
configMutations.erase(itr);
|
||||
}
|
||||
clusterManagementFutures.push_back(registerDataCluster(
|
||||
ryw, mapMutation.first.begin, mapMutation.second.get().toString(), registerMutations));
|
||||
} else {
|
||||
// For a single key clear, just issue the delete
|
||||
if (mapMutation.first.singleKeyRange()) {
|
||||
clusterManagementFutures.push_back(
|
||||
MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), mapMutation.first.begin));
|
||||
} else {
|
||||
clusterManagementFutures.push_back(
|
||||
removeClusterRange(ryw, mapMutation.first.begin, mapMutation.first.end));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto configMutation : configMutations) {
|
||||
clusterManagementFutures.push_back(changeDataClusterConfig(ryw, configMutation.first, configMutation.second));
|
||||
}
|
||||
|
||||
return waitForAll(clusterManagementFutures);
|
||||
}
|
||||
|
||||
Future<Void> processTenantCommandCommit(ReadYourWritesTransaction* ryw) {
|
||||
// TODO
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> MetaclusterInternalManagementClusterImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
return tag(processDataClusterCommandCommit(ryw) && processTenantCommandCommit(ryw), Optional<std::string>());
|
||||
}
|
|
@ -128,10 +128,10 @@ void updateClusterMetadataTransaction(Transaction tr,
|
|||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> managementClusterRegisterClusterTransaction(Transaction tr,
|
||||
ClusterNameRef name,
|
||||
std::string connectionString,
|
||||
DataClusterEntry entry) {
|
||||
Future<Void> managementClusterRegister(Transaction tr,
|
||||
ClusterNameRef name,
|
||||
std::string connectionString,
|
||||
DataClusterEntry entry) {
|
||||
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
|
||||
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
|
||||
|
||||
|
@ -169,7 +169,7 @@ Future<Void> managementClusterRegisterClusterTransaction(Transaction tr,
|
|||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> dataClusterRegisterClusterTransaction(Transaction tr, ClusterNameRef name) {
|
||||
Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name) {
|
||||
state Future<std::map<TenantName, TenantMapEntry>> existingTenantsFuture =
|
||||
ManagementAPI::listTenantsTransaction(tr, ""_sr, "\xff\xff"_sr, 1);
|
||||
state ThreadFuture<RangeResult> existingDataFuture = tr->getRange(normalKeys, 1);
|
||||
|
@ -209,12 +209,14 @@ Future<Void> registerClusterTransaction(Transaction tr,
|
|||
// TODO: use the special key-space rather than running the logic ourselves
|
||||
|
||||
// registerTr->set("\xff\xff/metacluster/management/data_cluster/register"_sr, ""_sr);
|
||||
wait(dataClusterRegisterClusterTransaction(registerTr, name));
|
||||
wait(dataClusterRegister(registerTr, name));
|
||||
|
||||
// Once the data cluster is configured, we can add it to the metacluster
|
||||
// tr->set(name.withPrefix("\xff\xff/metacluster/map/"_sr), connectionString);
|
||||
// wait(safeThreadFutureToFuture(tr->commit()));
|
||||
wait(managementClusterRegisterClusterTransaction(tr, name, connectionString, entry));
|
||||
tr->set(name.withPrefix("\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr), connectionString);
|
||||
tr->set(
|
||||
name.withPrefix(
|
||||
"\xff\xff/metacluster_internal/management_cluster/data_cluster/configure/capacity.num_tenant_groups/"_sr),
|
||||
format("%d", entry.capacity.numTenantGroups));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -226,6 +228,7 @@ Future<Void> registerCluster(Reference<DB> db, ClusterName name, std::string con
|
|||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
|
||||
if (firstTry) {
|
||||
Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
|
||||
|
|
|
@ -1480,10 +1480,14 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
|
||||
|
||||
if (apiVersionAtLeast(720)) {
|
||||
registerSpecialKeysImpl(SpecialKeySpace::MODULE::METACLUSTER,
|
||||
registerSpecialKeysImpl(SpecialKeySpace::MODULE::METACLUSTER_INTERNAL,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<DataClusterMapRangeImpl>(
|
||||
SpecialKeySpace::getMetaclusterApiCommandRange("dataclustermap")));
|
||||
std::make_unique<MetaclusterInternalManagementClusterImpl>(
|
||||
SpecialKeySpace::getMetaclusterInternalApiCommandRange("management_cluster")));
|
||||
registerSpecialKeysImpl(SpecialKeySpace::MODULE::METACLUSTER_INTERNAL,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
std::make_unique<MetaclusterInternalDataClusterImpl>(
|
||||
SpecialKeySpace::getMetaclusterInternalApiCommandRange("data_cluster")));
|
||||
registerSpecialKeysImpl(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT,
|
||||
SpecialKeySpace::IMPLTYPE::READWRITE,
|
||||
|
|
|
@ -56,9 +56,6 @@ static bool isAlphaNumeric(const std::string& key) {
|
|||
}
|
||||
} // namespace
|
||||
|
||||
const KeyRangeRef TenantRangeImpl::submoduleRange = KeyRangeRef("tenant/"_sr, "tenant0"_sr);
|
||||
const KeyRangeRef DataClusterMapRangeImpl::submoduleRange = KeyRangeRef("data_cluster/"_sr, "data_cluster0"_sr);
|
||||
|
||||
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
|
||||
{ SpecialKeySpace::MODULE::TRANSACTION,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")) },
|
||||
|
@ -83,8 +80,9 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
|
|||
{ SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"),
|
||||
LiteralStringRef("\xff\xff/actor_profiler_conf0")) },
|
||||
{ SpecialKeySpace::MODULE::METACLUSTER,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/metacluster/"), LiteralStringRef("\xff\xff/metacluster0")) }
|
||||
{ SpecialKeySpace::MODULE::METACLUSTER_INTERNAL,
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/metacluster_internal/"),
|
||||
LiteralStringRef("\xff\xff/metacluster_internal0")) }
|
||||
};
|
||||
|
||||
std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
|
||||
|
@ -121,7 +119,7 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
|
|||
{ "datadistribution",
|
||||
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
|
||||
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
|
||||
{ "tenant", TenantRangeImpl::submoduleRange.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
{ "tenant", KeyRangeRef("tenant/"_sr, "tenant0"_sr).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
|
||||
};
|
||||
|
||||
std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiCommandToRange = {
|
||||
|
@ -133,9 +131,13 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiComman
|
|||
.withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) }
|
||||
};
|
||||
|
||||
std::unordered_map<std::string, KeyRange> SpecialKeySpace::metaclusterApiCommandToRange = {
|
||||
{ "dataclustermap",
|
||||
DataClusterMapRangeImpl::submoduleRange.withPrefix(moduleToBoundary[MODULE::METACLUSTER].begin) }
|
||||
std::unordered_map<std::string, KeyRange> SpecialKeySpace::metaclusterInternalApiCommandToRange = {
|
||||
{ "management_cluster",
|
||||
MetaclusterInternalManagementClusterImpl::submoduleRange.withPrefix(
|
||||
moduleToBoundary[MODULE::METACLUSTER_INTERNAL].begin) },
|
||||
{ "data_cluster",
|
||||
MetaclusterInternalDataClusterImpl::submoduleRange.withPrefix(
|
||||
moduleToBoundary[MODULE::METACLUSTER_INTERNAL].begin) }
|
||||
};
|
||||
|
||||
std::set<std::string> SpecialKeySpace::options = { "excluded/force",
|
||||
|
@ -2753,377 +2755,3 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
|
|||
// exclude locality with failed option as true.
|
||||
return excludeLocalityCommitActor(ryw, true);
|
||||
}
|
||||
|
||||
TenantRangeImpl::TenantRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
const KeyRangeRef TenantRangeImpl::mapSubRange = KeyRangeRef("map/"_sr, "map0"_sr);
|
||||
const KeyRangeRef TenantRangeImpl::configureSubRange = KeyRangeRef("configure/"_sr, "configure0"_sr);
|
||||
|
||||
KeyRangeRef removePrefix(KeyRangeRef range, KeyRef prefix, KeyRef defaultEnd) {
|
||||
KeyRef begin = range.begin.removePrefix(prefix);
|
||||
KeyRef end;
|
||||
if (range.end.startsWith(prefix)) {
|
||||
end = range.end.removePrefix(TenantRangeImpl::mapSubRange.begin);
|
||||
} else {
|
||||
end = defaultEnd;
|
||||
}
|
||||
|
||||
return KeyRangeRef(begin, end);
|
||||
}
|
||||
|
||||
KeyRef withTenantMapPrefix(Database db, KeyRef key, Arena& ar) {
|
||||
int keySize = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
|
||||
TenantRangeImpl::submoduleRange.begin.size() + key.size();
|
||||
if (db->apiVersionAtLeast(720)) {
|
||||
keySize += TenantRangeImpl::mapSubRange.begin.size();
|
||||
}
|
||||
|
||||
KeyRef prefixedKey = makeString(keySize, ar);
|
||||
uint8_t* mutableKey = mutateString(prefixedKey);
|
||||
|
||||
mutableKey = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.copyTo(mutableKey);
|
||||
mutableKey = TenantRangeImpl::submoduleRange.begin.copyTo(mutableKey);
|
||||
if (db->apiVersionAtLeast(720)) {
|
||||
mutableKey = TenantRangeImpl::mapSubRange.begin.copyTo(mutableKey);
|
||||
}
|
||||
key.copyTo(mutableKey);
|
||||
return prefixedKey;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getTenantList(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
RangeResult* results,
|
||||
GetRangeLimits limitsHint) {
|
||||
std::map<TenantName, TenantMapEntry> tenants =
|
||||
wait(ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), kr.begin, kr.end, limitsHint.rows));
|
||||
|
||||
for (auto tenant : tenants) {
|
||||
json_spirit::mObject tenantEntry;
|
||||
tenantEntry["id"] = tenant.second.id;
|
||||
tenantEntry["prefix"] = tenant.second.prefix.toString();
|
||||
if (tenant.second.tenantGroup.present()) {
|
||||
tenantEntry["tenant_group"] = tenant.second.tenantGroup.get().toString();
|
||||
}
|
||||
std::string tenantEntryString = json_spirit::write_string(json_spirit::mValue(tenantEntry));
|
||||
ValueRef tenantEntryBytes(results->arena(), tenantEntryString);
|
||||
results->push_back(
|
||||
results->arena(),
|
||||
KeyValueRef(withTenantMapPrefix(ryw->getDatabase(), tenant.first, results->arena()), tenantEntryBytes));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> getTenantRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
|
||||
|
||||
state RangeResult results;
|
||||
kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
kr = kr.removePrefix(TenantRangeImpl::submoduleRange.begin);
|
||||
if (kr.intersects(TenantRangeImpl::mapSubRange)) {
|
||||
GetRangeLimits limits = limitsHint;
|
||||
limits.decrement(results);
|
||||
wait(getTenantList(
|
||||
ryw,
|
||||
removePrefix(kr & TenantRangeImpl::mapSubRange, TenantRangeImpl::mapSubRange.begin, "\xff"_sr),
|
||||
&results,
|
||||
limits));
|
||||
}
|
||||
} else {
|
||||
wait(getTenantList(
|
||||
ryw, removePrefix(kr, TenantRangeImpl::submoduleRange.begin, "\xff"_sr), &results, limitsHint));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
Future<RangeResult> TenantRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
return getTenantRange(ryw, kr, limitsHint);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName beginTenant, TenantName endTenant) {
|
||||
std::map<TenantName, TenantMapEntry> tenants = wait(
|
||||
ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (tenants.size() == CLIENT_KNOBS->TOO_MANY) {
|
||||
TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
|
||||
.detail("BeginTenant", beginTenant)
|
||||
.detail("EndTenant", endTenant);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false, "delete tenants", "too many tenants to range delete"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> deleteFutures;
|
||||
for (auto tenant : tenants) {
|
||||
deleteFutures.push_back(ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenant.first));
|
||||
}
|
||||
|
||||
wait(waitForAll(deleteFutures));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<bool> checkTenantGroup(ReadYourWritesTransaction* ryw,
|
||||
Optional<TenantGroupName> currentGroup,
|
||||
Optional<TenantGroupName> desiredGroup) {
|
||||
if (!desiredGroup.present() || currentGroup == desiredGroup) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: check where desired group is assigned and allow if the cluster is the same
|
||||
// SOMEDAY: It should also be possible to change the tenant group when we support tenant movement.
|
||||
wait(delay(0));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> applyTenantConfig(ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries,
|
||||
TenantMapEntry* tenantEntry,
|
||||
bool creatingTenant) {
|
||||
|
||||
state std::vector<std::pair<StringRef, Optional<Value>>>::iterator configItr;
|
||||
for (configItr = configEntries.begin(); configItr != configEntries.end(); ++configItr) {
|
||||
if (configItr->first == "tenant_group"_sr) {
|
||||
state bool isValidTenantGroup = true;
|
||||
if (!creatingTenant) {
|
||||
bool result = wait(checkTenantGroup(ryw, tenantEntry->tenantGroup, configItr->second));
|
||||
isValidTenantGroup = result;
|
||||
}
|
||||
if (isValidTenantGroup) {
|
||||
tenantEntry->tenantGroup = configItr->second;
|
||||
} else {
|
||||
TraceEvent(SevWarn, "CannotChangeTenantGroup")
|
||||
.detail("TenantName", tenantName)
|
||||
.detail("CurrentTenantGroup", tenantEntry->tenantGroup)
|
||||
.detail("DesiredTenantGroup", configItr->second);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set tenant configuration",
|
||||
format("cannot change tenant group for tenant `%s'", tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevWarn, "InvalidTenantConfig")
|
||||
.detail("TenantName", tenantName)
|
||||
.detail("ConfigName", configItr->first);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false,
|
||||
"set tenant configuration",
|
||||
format("invalid tenant configuration option `%s' for tenant `%s'",
|
||||
configItr->first.toString().c_str(),
|
||||
tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createTenant(ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
Optional<std::vector<std::pair<StringRef, Optional<Value>>>> configMutations) {
|
||||
state TenantMapEntry tenantEntry;
|
||||
|
||||
if (configMutations.present()) {
|
||||
wait(applyTenantConfig(ryw, tenantName, configMutations.get(), &tenantEntry, true));
|
||||
}
|
||||
|
||||
Optional<TenantMapEntry> entry =
|
||||
wait(ManagementAPI::createTenantTransaction(&ryw->getTransaction(), tenantName, tenantEntry));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeTenantConfig(ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries) {
|
||||
state Optional<TenantMapEntry> tenantEntry = wait(ManagementAPI::tryGetTenantTransaction(ryw, tenantName));
|
||||
if (!tenantEntry.present()) {
|
||||
TraceEvent(SevWarn, "ConfigureUnknownTenant").detail("TenantName", tenantName);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set tenant configuration",
|
||||
format("cannot configure tenant `%s': tenant not found", tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
TenantMapEntry& entry = tenantEntry.get();
|
||||
wait(applyTenantConfig(ryw, tenantName, configEntries, &entry, false));
|
||||
wait(ManagementAPI::configureTenantTransaction(ryw, tenantName, tenantEntry.get()));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> TenantRangeImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
|
||||
std::vector<Future<Void>> tenantManagementFutures;
|
||||
|
||||
std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
|
||||
std::map<TenantNameRef, std::vector<std::pair<StringRef, Optional<Value>>>> configMutations;
|
||||
|
||||
for (auto range : ranges) {
|
||||
if (!range.value().first) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
KeyRangeRef adjustedRange =
|
||||
range.range().removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
|
||||
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
adjustedRange = adjustedRange.removePrefix(TenantRangeImpl::submoduleRange.begin);
|
||||
}
|
||||
|
||||
if (TenantRangeImpl::mapSubRange.intersects(adjustedRange) || !ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
adjustedRange = TenantRangeImpl::mapSubRange & adjustedRange;
|
||||
adjustedRange = removePrefix(adjustedRange, TenantRangeImpl::mapSubRange.begin, "\xff"_sr);
|
||||
} else {
|
||||
adjustedRange = removePrefix(adjustedRange, TenantRangeImpl::submoduleRange.begin, "\xff"_sr);
|
||||
}
|
||||
mapMutations.push_back(std::make_pair(adjustedRange, range.value().second));
|
||||
} else if (TenantRangeImpl::configureSubRange.contains(adjustedRange) && adjustedRange.singleKeyRange()) {
|
||||
StringRef tenantName = adjustedRange.begin.removePrefix(TenantRangeImpl::configureSubRange.begin);
|
||||
StringRef configName = tenantName.eat("/");
|
||||
configMutations[tenantName].push_back(std::make_pair(configName, range.value().second));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto mapMutation : mapMutations) {
|
||||
TenantNameRef tenantName = mapMutation.first.begin;
|
||||
if (mapMutation.second.present()) {
|
||||
Optional<std::vector<std::pair<StringRef, Optional<Value>>>> createMutations;
|
||||
auto itr = configMutations.find(tenantName);
|
||||
if (itr != configMutations.end()) {
|
||||
createMutations = itr->second;
|
||||
configMutations.erase(itr);
|
||||
}
|
||||
tenantManagementFutures.push_back(createTenant(ryw, tenantName, createMutations));
|
||||
} else {
|
||||
// For a single key clear, just issue the delete
|
||||
if (mapMutation.first.singleKeyRange()) {
|
||||
tenantManagementFutures.push_back(
|
||||
ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
|
||||
} else {
|
||||
tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, mapMutation.first.end));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto configMutation : configMutations) {
|
||||
tenantManagementFutures.push_back(changeTenantConfig(ryw, configMutation.first, configMutation.second));
|
||||
}
|
||||
|
||||
return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
|
||||
}
|
||||
|
||||
DataClusterMapRangeImpl::DataClusterMapRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
ACTOR Future<RangeResult> getDataClusterList(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) {
|
||||
state KeyRef metaclusterPrefix =
|
||||
kr.begin.substr(0,
|
||||
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin.size() +
|
||||
DataClusterMapRangeImpl::submoduleRange.begin.size());
|
||||
|
||||
kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin);
|
||||
ClusterNameRef beginCluster = kr.begin.removePrefix(DataClusterMapRangeImpl::submoduleRange.begin);
|
||||
|
||||
ClusterNameRef endCluster = kr.end;
|
||||
if (endCluster.startsWith(DataClusterMapRangeImpl::submoduleRange.begin)) {
|
||||
endCluster = endCluster.removePrefix(DataClusterMapRangeImpl::submoduleRange.begin);
|
||||
} else {
|
||||
endCluster = "\xff"_sr;
|
||||
}
|
||||
|
||||
std::map<ClusterName, DataClusterMetadata> clusters = wait(
|
||||
MetaclusterAPI::listClustersTransaction(&ryw->getTransaction(), beginCluster, endCluster, limitsHint.rows));
|
||||
|
||||
RangeResult results;
|
||||
for (auto cluster : clusters) {
|
||||
json_spirit::mObject clusterEntry;
|
||||
clusterEntry["id"] = cluster.second.entry.id;
|
||||
clusterEntry["connection_string"] = cluster.second.connectionString.toString();
|
||||
clusterEntry["capacity"] = cluster.second.entry.capacity.toJson();
|
||||
clusterEntry["allocation"] = cluster.second.entry.allocated.toJson();
|
||||
std::string clusterEntryString = json_spirit::write_string(json_spirit::mValue(clusterEntry));
|
||||
ValueRef clusterEntryBytes(results.arena(), clusterEntryString);
|
||||
results.push_back(results.arena(),
|
||||
KeyValueRef(cluster.first.withPrefix(metaclusterPrefix, results.arena()), clusterEntryBytes));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
Future<RangeResult> DataClusterMapRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
return getDataClusterList(ryw, kr, limitsHint);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> removeClusterRange(ReadYourWritesTransaction* ryw,
|
||||
ClusterName beginCluster,
|
||||
ClusterName endCluster) {
|
||||
std::map<ClusterName, DataClusterMetadata> clusters = wait(MetaclusterAPI::listClustersTransaction(
|
||||
&ryw->getTransaction(), beginCluster, endCluster, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (clusters.size() == CLIENT_KNOBS->TOO_MANY) {
|
||||
TraceEvent(SevWarn, "RemoveClustersRangeTooLange")
|
||||
.detail("BeginCluster", beginCluster)
|
||||
.detail("EndCluster", endCluster);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false, "remove cluster", "too many cluster to range remove"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> removeFutures;
|
||||
for (auto cluster : clusters) {
|
||||
removeFutures.push_back(MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), cluster.first));
|
||||
}
|
||||
|
||||
wait(waitForAll(removeFutures));
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> DataClusterMapRangeImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
|
||||
std::vector<Future<Void>> clusterManagementFutures;
|
||||
for (auto range : ranges) {
|
||||
if (!range.value().first) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ClusterNameRef clusterName =
|
||||
range.begin()
|
||||
.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin)
|
||||
.removePrefix(DataClusterMapRangeImpl::submoduleRange.begin);
|
||||
|
||||
if (range.value().second.present()) {
|
||||
DataClusterEntry entry;
|
||||
clusterManagementFutures.push_back(success(MetaclusterAPI::registerClusterTransaction(
|
||||
&ryw->getTransaction(), clusterName, range.value().second.get().toString(), entry)));
|
||||
} else {
|
||||
// For a single key clear, just issue the delete
|
||||
if (KeyRangeRef(range.begin(), range.end()).singleKeyRange()) {
|
||||
clusterManagementFutures.push_back(
|
||||
MetaclusterAPI::removeClusterTransaction(&ryw->getTransaction(), clusterName));
|
||||
} else {
|
||||
ClusterNameRef endCluster = range.end().removePrefix(
|
||||
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METACLUSTER).begin);
|
||||
if (endCluster.startsWith(submoduleRange.begin)) {
|
||||
endCluster = endCluster.removePrefix(submoduleRange.begin);
|
||||
} else {
|
||||
endCluster = "\xff"_sr;
|
||||
}
|
||||
clusterManagementFutures.push_back(removeClusterRange(ryw, clusterName, endCluster));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return tag(waitForAll(clusterManagementFutures), Optional<std::string>());
|
||||
}
|
||||
|
|
|
@ -171,7 +171,7 @@ public:
|
|||
ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space
|
||||
GLOBALCONFIG, // Global configuration options synchronized to all nodes
|
||||
MANAGEMENT, // Management-API
|
||||
METACLUSTER, // Configuration for a metacluster
|
||||
METACLUSTER_INTERNAL, // Internal APIs for a metacluster
|
||||
METRICS, // data-distribution metrics
|
||||
TESTONLY, // only used by correctness tests
|
||||
TRACING, // Distributed tracing options
|
||||
|
@ -229,8 +229,8 @@ public:
|
|||
static KeyRef getActorLineageApiCommandPrefix(const std::string& command) {
|
||||
return actorLineageApiCommandToRange.at(command).begin;
|
||||
}
|
||||
static KeyRangeRef getMetaclusterApiCommandRange(const std::string& command) {
|
||||
return metaclusterApiCommandToRange.at(command);
|
||||
static KeyRangeRef getMetaclusterInternalApiCommandRange(const std::string& command) {
|
||||
return metaclusterInternalApiCommandToRange.at(command);
|
||||
}
|
||||
static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option);
|
||||
static const std::set<std::string>& getManagementApiOptionsSet() { return options; }
|
||||
|
@ -264,7 +264,7 @@ private:
|
|||
// module command to special keys range
|
||||
static std::unordered_map<std::string, KeyRange> managementApiCommandToRange;
|
||||
static std::unordered_map<std::string, KeyRange> actorLineageApiCommandToRange;
|
||||
static std::unordered_map<std::string, KeyRange> metaclusterApiCommandToRange;
|
||||
static std::unordered_map<std::string, KeyRange> metaclusterInternalApiCommandToRange;
|
||||
|
||||
// "<command>/<option>"
|
||||
static std::set<std::string> options;
|
||||
|
@ -556,11 +556,22 @@ public:
|
|||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
class DataClusterMapRangeImpl : public SpecialKeyRangeRWImpl {
|
||||
class MetaclusterInternalManagementClusterImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
const static KeyRangeRef submoduleRange;
|
||||
|
||||
explicit DataClusterMapRangeImpl(KeyRangeRef kr);
|
||||
explicit MetaclusterInternalManagementClusterImpl(KeyRangeRef kr);
|
||||
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const override;
|
||||
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
class MetaclusterInternalDataClusterImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
const static KeyRangeRef submoduleRange;
|
||||
|
||||
explicit MetaclusterInternalDataClusterImpl(KeyRangeRef kr);
|
||||
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const override;
|
||||
|
|
|
@ -0,0 +1,297 @@
|
|||
/*
|
||||
* TenantSpecialKeys.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/ActorLineageProfiler.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const KeyRangeRef TenantRangeImpl::submoduleRange = KeyRangeRef("tenant/"_sr, "tenant0"_sr);
|
||||
|
||||
TenantRangeImpl::TenantRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
const KeyRangeRef TenantRangeImpl::mapSubRange = KeyRangeRef("map/"_sr, "map0"_sr);
|
||||
const KeyRangeRef TenantRangeImpl::configureSubRange = KeyRangeRef("configure/"_sr, "configure0"_sr);
|
||||
|
||||
KeyRangeRef removePrefix(KeyRangeRef range, KeyRef prefix, KeyRef defaultEnd) {
|
||||
KeyRef begin = range.begin.removePrefix(prefix);
|
||||
KeyRef end;
|
||||
if (range.end.startsWith(prefix)) {
|
||||
end = range.end.removePrefix(TenantRangeImpl::mapSubRange.begin);
|
||||
} else {
|
||||
end = defaultEnd;
|
||||
}
|
||||
|
||||
return KeyRangeRef(begin, end);
|
||||
}
|
||||
|
||||
KeyRef withTenantMapPrefix(Database db, KeyRef key, Arena& ar) {
|
||||
int keySize = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
|
||||
TenantRangeImpl::submoduleRange.begin.size() + key.size();
|
||||
if (db->apiVersionAtLeast(720)) {
|
||||
keySize += TenantRangeImpl::mapSubRange.begin.size();
|
||||
}
|
||||
|
||||
KeyRef prefixedKey = makeString(keySize, ar);
|
||||
uint8_t* mutableKey = mutateString(prefixedKey);
|
||||
|
||||
mutableKey = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.copyTo(mutableKey);
|
||||
mutableKey = TenantRangeImpl::submoduleRange.begin.copyTo(mutableKey);
|
||||
if (db->apiVersionAtLeast(720)) {
|
||||
mutableKey = TenantRangeImpl::mapSubRange.begin.copyTo(mutableKey);
|
||||
}
|
||||
key.copyTo(mutableKey);
|
||||
return prefixedKey;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getTenantList(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
RangeResult* results,
|
||||
GetRangeLimits limitsHint) {
|
||||
std::map<TenantName, TenantMapEntry> tenants =
|
||||
wait(ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), kr.begin, kr.end, limitsHint.rows));
|
||||
|
||||
for (auto tenant : tenants) {
|
||||
json_spirit::mObject tenantEntry;
|
||||
tenantEntry["id"] = tenant.second.id;
|
||||
tenantEntry["prefix"] = tenant.second.prefix.toString();
|
||||
if (tenant.second.tenantGroup.present()) {
|
||||
tenantEntry["tenant_group"] = tenant.second.tenantGroup.get().toString();
|
||||
}
|
||||
std::string tenantEntryString = json_spirit::write_string(json_spirit::mValue(tenantEntry));
|
||||
ValueRef tenantEntryBytes(results->arena(), tenantEntryString);
|
||||
results->push_back(
|
||||
results->arena(),
|
||||
KeyValueRef(withTenantMapPrefix(ryw->getDatabase(), tenant.first, results->arena()), tenantEntryBytes));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<RangeResult> getTenantRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
|
||||
|
||||
state RangeResult results;
|
||||
kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
kr = kr.removePrefix(TenantRangeImpl::submoduleRange.begin);
|
||||
if (kr.intersects(TenantRangeImpl::mapSubRange)) {
|
||||
GetRangeLimits limits = limitsHint;
|
||||
limits.decrement(results);
|
||||
wait(getTenantList(
|
||||
ryw,
|
||||
removePrefix(kr & TenantRangeImpl::mapSubRange, TenantRangeImpl::mapSubRange.begin, "\xff"_sr),
|
||||
&results,
|
||||
limits));
|
||||
}
|
||||
} else {
|
||||
wait(getTenantList(
|
||||
ryw, removePrefix(kr, TenantRangeImpl::submoduleRange.begin, "\xff"_sr), &results, limitsHint));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
Future<RangeResult> TenantRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr,
|
||||
GetRangeLimits limitsHint) const {
|
||||
return getTenantRange(ryw, kr, limitsHint);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName beginTenant, TenantName endTenant) {
|
||||
std::map<TenantName, TenantMapEntry> tenants = wait(
|
||||
ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (tenants.size() == CLIENT_KNOBS->TOO_MANY) {
|
||||
TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
|
||||
.detail("BeginTenant", beginTenant)
|
||||
.detail("EndTenant", endTenant);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false, "delete tenants", "too many tenants to range delete"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> deleteFutures;
|
||||
for (auto tenant : tenants) {
|
||||
deleteFutures.push_back(ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenant.first));
|
||||
}
|
||||
|
||||
wait(waitForAll(deleteFutures));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<bool> checkTenantGroup(ReadYourWritesTransaction* ryw,
|
||||
Optional<TenantGroupName> currentGroup,
|
||||
Optional<TenantGroupName> desiredGroup) {
|
||||
if (!desiredGroup.present() || currentGroup == desiredGroup) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: check where desired group is assigned and allow if the cluster is the same
|
||||
// SOMEDAY: It should also be possible to change the tenant group when we support tenant movement.
|
||||
wait(delay(0));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> applyTenantConfig(ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries,
|
||||
TenantMapEntry* tenantEntry,
|
||||
bool creatingTenant) {
|
||||
|
||||
state std::vector<std::pair<StringRef, Optional<Value>>>::iterator configItr;
|
||||
for (configItr = configEntries.begin(); configItr != configEntries.end(); ++configItr) {
|
||||
if (configItr->first == "tenant_group"_sr) {
|
||||
state bool isValidTenantGroup = true;
|
||||
if (!creatingTenant) {
|
||||
bool result = wait(checkTenantGroup(ryw, tenantEntry->tenantGroup, configItr->second));
|
||||
isValidTenantGroup = result;
|
||||
}
|
||||
if (isValidTenantGroup) {
|
||||
tenantEntry->tenantGroup = configItr->second;
|
||||
} else {
|
||||
TraceEvent(SevWarn, "CannotChangeTenantGroup")
|
||||
.detail("TenantName", tenantName)
|
||||
.detail("CurrentTenantGroup", tenantEntry->tenantGroup)
|
||||
.detail("DesiredTenantGroup", configItr->second);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set tenant configuration",
|
||||
format("cannot change tenant group for tenant `%s'", tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevWarn, "InvalidTenantConfig")
|
||||
.detail("TenantName", tenantName)
|
||||
.detail("ConfigName", configItr->first);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false,
|
||||
"set tenant configuration",
|
||||
format("invalid tenant configuration option `%s' for tenant `%s'",
|
||||
configItr->first.toString().c_str(),
|
||||
tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createTenant(ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
Optional<std::vector<std::pair<StringRef, Optional<Value>>>> configMutations) {
|
||||
state TenantMapEntry tenantEntry;
|
||||
|
||||
if (configMutations.present()) {
|
||||
wait(applyTenantConfig(ryw, tenantName, configMutations.get(), &tenantEntry, true));
|
||||
}
|
||||
|
||||
Optional<TenantMapEntry> entry =
|
||||
wait(ManagementAPI::createTenantTransaction(&ryw->getTransaction(), tenantName, tenantEntry));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeTenantConfig(ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
std::vector<std::pair<StringRef, Optional<Value>>> configEntries) {
|
||||
state Optional<TenantMapEntry> tenantEntry = wait(ManagementAPI::tryGetTenantTransaction(ryw, tenantName));
|
||||
if (!tenantEntry.present()) {
|
||||
TraceEvent(SevWarn, "ConfigureUnknownTenant").detail("TenantName", tenantName);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set tenant configuration",
|
||||
format("cannot configure tenant `%s': tenant not found", tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
TenantMapEntry& entry = tenantEntry.get();
|
||||
wait(applyTenantConfig(ryw, tenantName, configEntries, &entry, false));
|
||||
wait(ManagementAPI::configureTenantTransaction(ryw, tenantName, tenantEntry.get()));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Optional<std::string>> TenantRangeImpl::commit(ReadYourWritesTransaction* ryw) {
|
||||
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
|
||||
std::vector<Future<Void>> tenantManagementFutures;
|
||||
|
||||
std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
|
||||
std::map<TenantNameRef, std::vector<std::pair<StringRef, Optional<Value>>>> configMutations;
|
||||
|
||||
for (auto range : ranges) {
|
||||
if (!range.value().first) {
|
||||
continue;
|
||||
}
|
||||
|
||||
KeyRangeRef adjustedRange =
|
||||
range.range().removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
|
||||
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
adjustedRange = adjustedRange.removePrefix(submoduleRange.begin);
|
||||
}
|
||||
|
||||
if (mapSubRange.intersects(adjustedRange) || !ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
adjustedRange = mapSubRange & adjustedRange;
|
||||
adjustedRange = removePrefix(adjustedRange, mapSubRange.begin, "\xff"_sr);
|
||||
} else {
|
||||
adjustedRange = removePrefix(adjustedRange, submoduleRange.begin, "\xff"_sr);
|
||||
}
|
||||
mapMutations.push_back(std::make_pair(adjustedRange, range.value().second));
|
||||
} else if (configureSubRange.contains(adjustedRange) && adjustedRange.singleKeyRange()) {
|
||||
StringRef tenantName = adjustedRange.begin.removePrefix(configureSubRange.begin);
|
||||
StringRef configName = tenantName.eat("/");
|
||||
configMutations[tenantName].push_back(std::make_pair(configName, range.value().second));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto mapMutation : mapMutations) {
|
||||
TenantNameRef tenantName = mapMutation.first.begin;
|
||||
if (mapMutation.second.present()) {
|
||||
Optional<std::vector<std::pair<StringRef, Optional<Value>>>> createMutations;
|
||||
auto itr = configMutations.find(tenantName);
|
||||
if (itr != configMutations.end()) {
|
||||
createMutations = itr->second;
|
||||
configMutations.erase(itr);
|
||||
}
|
||||
tenantManagementFutures.push_back(createTenant(ryw, tenantName, createMutations));
|
||||
} else {
|
||||
// For a single key clear, just issue the delete
|
||||
if (mapMutation.first.singleKeyRange()) {
|
||||
tenantManagementFutures.push_back(
|
||||
ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
|
||||
} else {
|
||||
tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, mapMutation.first.end));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto configMutation : configMutations) {
|
||||
tenantManagementFutures.push_back(changeTenantConfig(ryw, configMutation.first, configMutation.second));
|
||||
}
|
||||
|
||||
return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
|
||||
}
|
Loading…
Reference in New Issue