Merge pull request #6428 from sfc-gh-ajbeamon/fdb-tenant-add-interfaces

Add server-side and protocol support for tenants
A.J. Beamon 2022-03-08 15:33:18 -08:00 committed by GitHub
commit 402fa4dd9e
29 changed files with 1090 additions and 194 deletions

View File

@ -76,7 +76,8 @@ extern "C" DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_erro
return code == error_code_not_committed || code == error_code_transaction_too_old ||
code == error_code_future_version || code == error_code_database_locked ||
code == error_code_proxy_memory_limit_exceeded || code == error_code_batch_transaction_throttled ||
code == error_code_process_behind || code == error_code_tag_throttled;
code == error_code_process_behind || code == error_code_tag_throttled ||
code == error_code_unknown_tenant;
}
return false;
}
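
For context, a minimal sketch of how this change is visible through the C API: unknown_tenant (2137) now satisfies the retryable-not-committed predicate, so generic retry logic built on fdb_error_predicate (or fdb_transaction_on_error) will treat it like not_committed or transaction_too_old. The header path, API version, and setup below are assumptions; only fdb_error_predicate and the FDB_ERROR_PREDICATE_RETRYABLE_NOT_COMMITTED option come from fdb_c.

// Illustrative sketch only (not part of this change): query the predicate from client code.
#define FDB_API_VERSION 710 // assumed; use the version your client targets
#include <foundationdb/fdb_c.h>
#include <cstdio>

int main() {
    fdb_select_api_version(FDB_API_VERSION); // usual one-time client setup
    fdb_error_t err = 2137; // unknown_tenant
    // Non-zero after this change: the error is safe to retry and the commit definitely did not happen.
    std::printf("retryable_not_committed(unknown_tenant) = %d\n",
                fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE_NOT_COMMITTED, err));
    return 0;
}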

View File

@ -160,6 +160,22 @@ FoundationDB may return the following error codes from API functions. If you nee
| special_keys_api_failure | 2117| Api call through special keys failed. For more information, read the |
| | | ``0xff0xff/error_message`` key |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenant_name_required | 2130| Tenant name must be specified to access data in the cluster |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenant_not_found | 2131| Tenant does not exist |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenant_already_exists | 2132| A tenant with the given name already exists |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenant_not_empty | 2133| Cannot delete a non-empty tenant |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| invalid_tenant_name | 2134| Tenant name cannot begin with \xff |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenant_prefix_allocator_conflict | 2135| The database already has keys stored at the prefix allocated for the tenant |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenants_disabled | 2136| Tenants have been disabled in the cluster |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| unknown_tenant | 2137| Tenant is not available from this server |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| api_version_unset | 2200| API version is not set |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| api_version_already_set | 2201| API version may be set only once |

View File

@ -742,6 +742,12 @@
"disabled",
"gradual",
"aggressive"
]},
"tenant_mode": {
"$enum":[
"disabled",
"optional_experimental",
"required_experimental"
]}
},
"data":{

View File

@ -138,6 +138,7 @@ set(FDBCLIENT_SRCS
TagThrottle.actor.h
TaskBucket.actor.cpp
TaskBucket.h
Tenant.h
TestKnobCollection.cpp
TestKnobCollection.h
ThreadSafeTransaction.cpp

View File

@ -116,6 +116,8 @@ struct ClientDBInfo {
Optional<Value> forward;
std::vector<VersionHistory> history;
TenantMode tenantMode;
ClientDBInfo() {}
bool operator==(ClientDBInfo const& r) const { return id == r.id; }
@ -126,7 +128,7 @@ struct ClientDBInfo {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, grvProxies, commitProxies, id, forward, history);
serializer(ar, grvProxies, commitProxies, id, forward, history, tenantMode);
}
};
@ -167,12 +169,16 @@ struct CommitTransactionRequest : TimedRequest {
Optional<ClientTrCommitCostEstimation> commitCostEstimation;
Optional<TagSet> tagSet;
CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {}
CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {}
TenantInfo tenantInfo;
CommitTransactionRequest() : CommitTransactionRequest(TenantInfo(), SpanID()) {}
CommitTransactionRequest(TenantInfo const& tenantInfo, SpanID const& context)
: spanContext(context), flags(0), tenantInfo(tenantInfo) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet, spanContext);
serializer(
ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet, spanContext, tenantInfo);
}
};
@ -289,6 +295,7 @@ struct GetReadVersionRequest : TimedRequest {
struct GetKeyServerLocationsReply {
constexpr static FileIdentifier file_identifier = 10636023;
Arena arena;
TenantMapEntry tenantEntry;
std::vector<std::pair<KeyRangeRef, std::vector<StorageServerInterface>>> results;
// if any storage servers in results have a TSS pair, that mapping is in here
@ -296,7 +303,7 @@ struct GetKeyServerLocationsReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, results, resultsTssMapping, arena);
serializer(ar, results, resultsTssMapping, tenantEntry, arena);
}
};
@ -304,24 +311,34 @@ struct GetKeyServerLocationsRequest {
constexpr static FileIdentifier file_identifier = 9144680;
Arena arena;
SpanID spanContext;
Optional<TenantNameRef> tenant;
KeyRef begin;
Optional<KeyRef> end;
int limit;
bool reverse;
ReplyPromise<GetKeyServerLocationsReply> reply;
GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
// This version is used to specify the minimum metadata version a proxy must have in order to declare that
// a tenant is not present. If the metadata version is lower, the proxy must wait in case the tenant gets
// created. If latestVersion is specified, then the proxy will wait until it is sure that it has received
// updates from other proxies before answering.
Version minTenantVersion;
GetKeyServerLocationsRequest() : limit(0), reverse(false), minTenantVersion(latestVersion) {}
GetKeyServerLocationsRequest(SpanID spanContext,
Optional<TenantNameRef> const& tenant,
KeyRef const& begin,
Optional<KeyRef> const& end,
int limit,
bool reverse,
Version minTenantVersion,
Arena const& arena)
: arena(arena), spanContext(spanContext), begin(begin), end(end), limit(limit), reverse(reverse) {}
: arena(arena), spanContext(spanContext), tenant(tenant), begin(begin), end(end), limit(limit), reverse(reverse),
minTenantVersion(minTenantVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, limit, reverse, reply, spanContext, arena);
serializer(ar, begin, end, limit, reverse, reply, spanContext, tenant, minTenantVersion, arena);
}
};
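
As a rough illustration of the new request shape, here is how a tenant-aware location lookup might be constructed, mirroring the NativeAPI call sites later in this diff; the tenant and key names are hypothetical and this is a sketch, not the client implementation.

#include "fdbclient/CommitProxyInterface.h"

// Sketch: single-key location lookup scoped to a (hypothetical) tenant. Passing
// latestVersion as minTenantVersion asks the proxy to answer only once it has
// caught up with tenant metadata as of the time it receives the request.
GetKeyServerLocationsRequest makeTenantLocationRequest(SpanID spanContext, Arena& arena) {
    return GetKeyServerLocationsRequest(spanContext,
                                        Optional<TenantNameRef>("myTenant"_sr), // tenant to resolve
                                        "some_key"_sr,                          // begin key, tenant-relative
                                        Optional<KeyRef>(),                     // no end key: single-key lookup
                                        100,                                    // limit
                                        false,                                  // reverse
                                        latestVersion,
                                        arena);
}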

View File

@ -50,6 +50,7 @@ void DatabaseConfiguration::resetInternal() {
perpetualStorageWiggleSpeed = 0;
perpetualStorageWiggleLocality = "0";
storageMigrationType = StorageMigrationType::DEFAULT;
tenantMode = TenantMode::DISABLED;
}
int toInt(ValueRef const& v) {
@ -209,7 +210,8 @@ bool DatabaseConfiguration::isValid() const {
// We cannot specify regions with three_datacenter replication
(perpetualStorageWiggleSpeed == 0 || perpetualStorageWiggleSpeed == 1) &&
isValidPerpetualStorageWiggleLocality(perpetualStorageWiggleLocality) &&
storageMigrationType != StorageMigrationType::UNSET)) {
storageMigrationType != StorageMigrationType::UNSET && tenantMode >= TenantMode::DISABLED &&
tenantMode < TenantMode::END)) {
return false;
}
std::set<Key> dcIds;
@ -402,6 +404,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed;
result["perpetual_storage_wiggle_locality"] = perpetualStorageWiggleLocality;
result["storage_migration_type"] = storageMigrationType.toString();
result["tenant_mode"] = tenantMode.toString();
return result;
}
@ -625,6 +628,9 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
} else if (ck == LiteralStringRef("storage_migration_type")) {
parse((&type), value);
storageMigrationType = (StorageMigrationType::MigrationType)type;
} else if (ck == LiteralStringRef("tenant_mode")) {
parse((&type), value);
tenantMode = (TenantMode::Mode)type;
} else if (ck == LiteralStringRef("proxies")) {
overwriteProxiesCount();
} else {

View File

@ -250,6 +250,8 @@ struct DatabaseConfiguration {
// Storage Migration Type
StorageMigrationType storageMigrationType;
TenantMode tenantMode;
// Excluded servers (no state should be here)
bool isExcludedServer(NetworkAddressList) const;
bool isExcludedLocality(const LocalityData& locality) const;

View File

@ -52,6 +52,10 @@ void KeySelectorRef::setKey(KeyRef const& key) {
this->key = key;
}
void KeySelectorRef::setKeyUnlimited(KeyRef const& key) {
this->key = key;
}
std::string KeySelectorRef::toString() const {
if (offset > 0) {
if (orEqual)

View File

@ -546,6 +546,7 @@ public:
KeyRef getKey() const { return key; }
void setKey(KeyRef const& key);
void setKeyUnlimited(KeyRef const& key);
std::string toString() const;
@ -1180,6 +1181,42 @@ struct StorageMigrationType {
uint32_t type;
};
struct TenantMode {
// These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones
// just before END.
// Note: OPTIONAL_TENANT is not named OPTIONAL because of a collision with a Windows macro.
enum Mode { DISABLED = 0, OPTIONAL_TENANT = 1, REQUIRED = 2, END = 3 };
TenantMode() : mode(DISABLED) {}
TenantMode(Mode mode) : mode(mode) {
if ((uint32_t)mode >= END) {
this->mode = DISABLED;
}
}
operator Mode() const { return Mode(mode); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mode);
}
std::string toString() const {
switch (mode) {
case DISABLED:
return "disabled";
case OPTIONAL_TENANT:
return "optional_experimental";
case REQUIRED:
return "required_experimental";
default:
ASSERT(false);
}
return "";
}
uint32_t mode;
};
inline bool isValidPerpetualStorageWiggleLocality(std::string locality) {
int pos = locality.find(':');
// locality should be either 0 or in the format '<non_empty_string>:<non_empty_string>'

View File

@ -174,6 +174,20 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
}
out[p + key] = format("%d", type);
}
if (key == "tenant_mode") {
TenantMode tenantMode;
if (value == "disabled") {
tenantMode = TenantMode::DISABLED;
} else if (value == "optional_experimental") {
tenantMode = TenantMode::OPTIONAL_TENANT;
} else if (value == "required_experimental") {
tenantMode = TenantMode::REQUIRED;
} else {
printf("Error: Only disabled|optional_experimental|required_experimental are valid for tenant_mode.\n");
return out;
}
out[p + key] = format("%d", tenantMode);
}
return out;
}
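
For reference, a minimal sketch of the resulting mapping, assuming the TenantMode struct added to FDBTypes.h earlier in this diff. Presumably the token is driven by something like configure tenant_mode=optional_experimental in fdbcli, though that usage is not shown here.

#include "fdbclient/FDBTypes.h"
#include <cstdio>

// Sketch: the configure token stores TenantMode's integer value, while
// TenantMode::toString() yields the same strings reported in status JSON.
void printTenantModeMapping() {
    const TenantMode modes[] = { TenantMode::DISABLED, TenantMode::OPTIONAL_TENANT, TenantMode::REQUIRED };
    for (const TenantMode& m : modes) {
        std::printf("%-22s -> %u\n", m.toString().c_str(), m.mode); // "disabled" -> 0, etc.
    }
}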

View File

@ -2566,10 +2566,17 @@ ACTOR Future<std::pair<KeyRange, Reference<LocationInfo>>> getKeyLocation_intern
++cx->transactionKeyServerLocationRequests;
choose {
when(wait(cx->onProxiesChanged())) {}
when(GetKeyServerLocationsReply rep = wait(basicLoadBalance(
cx->getCommitProxies(useProvisionalProxies),
when(GetKeyServerLocationsReply rep =
wait(basicLoadBalance(cx->getCommitProxies(useProvisionalProxies),
&CommitProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span.context, key, Optional<KeyRef>(), 100, isBackward, key.arena()),
GetKeyServerLocationsRequest(span.context,
Optional<TenantNameRef>(),
key,
Optional<KeyRef>(),
100,
isBackward,
latestVersion,
key.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
if (debugID.present())
@ -2668,10 +2675,17 @@ ACTOR Future<std::vector<std::pair<KeyRange, Reference<LocationInfo>>>> getKeyRa
++cx->transactionKeyServerLocationRequests;
choose {
when(wait(cx->onProxiesChanged())) {}
when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
cx->getCommitProxies(useProvisionalProxies),
when(GetKeyServerLocationsReply _rep =
wait(basicLoadBalance(cx->getCommitProxies(useProvisionalProxies),
&CommitProxyInterface::getKeyServersLocations,
GetKeyServerLocationsRequest(span.context, keys.begin, keys.end, limit, reverse, keys.arena()),
GetKeyServerLocationsRequest(span.context,
Optional<TenantNameRef>(),
keys.begin,
keys.end,
limit,
reverse,
latestVersion,
keys.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
state GetKeyServerLocationsReply rep = _rep;
@ -2880,6 +2894,7 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
ssi.second,
&StorageServerInterface::getValue,
GetValueRequest(span.context,
TenantInfo(),
key,
ver,
trState->cx->sampleReadTags() ? trState->options.readTags
@ -2985,6 +3000,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState, KeySelector k, Fut
++trState->cx->transactionPhysicalReads;
GetKeyRequest req(span.context,
TenantInfo(),
k,
version.get(),
trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>(),
@ -3117,6 +3133,7 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
ssi.second,
&StorageServerInterface::watchValue,
WatchValueRequest(span.context,
TenantInfo(),
parameters->key,
parameters->value,
ver,
@ -4484,7 +4501,8 @@ Transaction::Transaction(Database const& cx)
cx->taskID,
generateSpanID(cx->transactionTracingSample),
createTrLogInfoProbabilistically(cx))),
span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) {
span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF),
tr(TenantInfo(), trState->spanID) {
if (DatabaseContext::debugUseTags) {
debugAddTags(trState);
}
@ -5113,7 +5131,7 @@ void TransactionOptions::reset(Database const& cx) {
void Transaction::resetImpl(bool generateNewSpan) {
flushTrLogsIfEnabled();
trState = trState->cloneAndReset(createTrLogInfoProbabilistically(trState->cx), generateNewSpan);
tr = CommitTransactionRequest(trState->spanID);
tr = CommitTransactionRequest(TenantInfo(), trState->spanID);
readVersion = Future<Version>();
metadataVersion = Promise<Optional<Key>>();
extraConflictRanges.clear();
@ -6684,7 +6702,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<Transa
for (int i = 0; i < nLocs; i++) {
partBegin = (i == 0) ? keys.begin : locations[i].first.begin;
partEnd = (i == nLocs - 1) ? keys.end : locations[i].first.end;
SplitRangeRequest req(KeyRangeRef(partBegin, partEnd), chunkSize);
SplitRangeRequest req(TenantInfo(), KeyRangeRef(partBegin, partEnd), chunkSize);
fReplies[i] = loadBalance(locations[i].second->locations(),
&StorageServerInterface::getRangeSplitPoints,
req,

View File

@ -809,6 +809,12 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"disabled",
"aggressive",
"gradual"
]},
"tenant_mode": {
"$enum":[
"disabled",
"optional_experimental",
"required_experimental"
]}
},
"data":{

View File

@ -106,6 +106,7 @@ void TSS_traceMismatch(TraceEvent& event, const GetKeyRequest& req, const GetKey
event
.detail("KeySelector",
format("%s%s:%d", req.sel.orEqual ? "=" : "", req.sel.getKey().printable().c_str(), req.sel.offset))
.detail("Tenant", req.tenantInfo.name)
.detail("Version", req.version)
.detail("SSReply",
format("%s%s:%d", src.sel.orEqual ? "=" : "", src.sel.getKey().printable().c_str(), src.sel.offset))
@ -144,6 +145,7 @@ void TSS_traceMismatch(TraceEvent& event,
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
.detail("End",
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
.detail("Tenant", req.tenantInfo.name)
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)
@ -183,6 +185,7 @@ void TSS_traceMismatch(TraceEvent& event,
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
.detail("End",
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
.detail("Tenant", req.tenantInfo.name)
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)
@ -223,6 +226,7 @@ void TSS_traceMismatch(TraceEvent& event,
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
.detail("End",
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
.detail("Tenant", req.tenantInfo.name)
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)

View File

@ -33,6 +33,7 @@
#include "fdbrpc/TSSComparison.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/Tenant.h"
#include "flow/UnitTest.h"
// Dead code, removed in the next protocol version
@ -212,6 +213,21 @@ struct ServerCacheInfo {
}
};
struct TenantInfo {
static const int64_t INVALID_TENANT = -1;
Optional<TenantName> name;
int64_t tenantId;
TenantInfo() : tenantId(INVALID_TENANT) {}
TenantInfo(TenantName name, int64_t tenantId) : name(name), tenantId(tenantId) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, name, tenantId);
}
};
struct GetValueReply : public LoadBalancedReply {
constexpr static FileIdentifier file_identifier = 1378929;
Optional<Value> value;
@ -229,6 +245,7 @@ struct GetValueReply : public LoadBalancedReply {
struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
SpanID spanContext;
TenantInfo tenantInfo;
Key key;
Version version;
Optional<TagSet> tags;
@ -236,12 +253,17 @@ struct GetValueRequest : TimedRequest {
ReplyPromise<GetValueReply> reply;
GetValueRequest() {}
GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
GetValueRequest(SpanID spanContext,
const TenantInfo& tenantInfo,
const Key& key,
Version ver,
Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), tenantInfo(tenantInfo), key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, version, tags, debugID, reply, spanContext);
serializer(ar, key, version, tags, debugID, reply, spanContext, tenantInfo);
}
};
@ -262,6 +284,7 @@ struct WatchValueReply {
struct WatchValueRequest {
constexpr static FileIdentifier file_identifier = 14747733;
SpanID spanContext;
TenantInfo tenantInfo;
Key key;
Optional<Value> value;
Version version;
@ -270,17 +293,20 @@ struct WatchValueRequest {
ReplyPromise<WatchValueReply> reply;
WatchValueRequest() {}
WatchValueRequest(SpanID spanContext,
TenantInfo tenantInfo,
const Key& key,
Optional<Value> value,
Version ver,
Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
: spanContext(spanContext), tenantInfo(tenantInfo), key(key), value(value), version(ver), tags(tags),
debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, value, version, tags, debugID, reply, spanContext);
serializer(ar, key, value, version, tags, debugID, reply, spanContext, tenantInfo);
}
};
@ -304,6 +330,7 @@ struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
// This is a dummy field that has never been used.
// TODO: Get rid of this by constexpr or other template magic in getRange
@ -316,9 +343,22 @@ struct GetKeyValuesRequest : TimedRequest {
ReplyPromise<GetKeyValuesReply> reply;
GetKeyValuesRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
serializer(ar,
begin,
end,
version,
limit,
limitBytes,
isFetchKeys,
tags,
debugID,
reply,
spanContext,
tenantInfo,
arena);
}
};
@ -342,6 +382,7 @@ struct GetKeyValuesAndFlatMapRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795747;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
KeyRef mapper;
Version version; // or latestVersion
@ -352,10 +393,23 @@ struct GetKeyValuesAndFlatMapRequest : TimedRequest {
ReplyPromise<GetKeyValuesAndFlatMapReply> reply;
GetKeyValuesAndFlatMapRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(
ar, begin, end, mapper, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
serializer(ar,
begin,
end,
mapper,
version,
limit,
limitBytes,
isFetchKeys,
tags,
debugID,
reply,
spanContext,
tenantInfo,
arena);
}
};
@ -390,6 +444,7 @@ struct GetKeyValuesStreamRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef begin, end;
Version version; // or latestVersion
int limit, limitBytes;
@ -399,9 +454,22 @@ struct GetKeyValuesStreamRequest {
ReplyPromiseStream<GetKeyValuesStreamReply> reply;
GetKeyValuesStreamRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
serializer(ar,
begin,
end,
version,
limit,
limitBytes,
isFetchKeys,
tags,
debugID,
reply,
spanContext,
tenantInfo,
arena);
}
};
@ -423,6 +491,7 @@ struct GetKeyRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 10457870;
SpanID spanContext;
Arena arena;
TenantInfo tenantInfo;
KeySelectorRef sel;
Version version; // or latestVersion
Optional<TagSet> tags;
@ -430,16 +499,18 @@ struct GetKeyRequest : TimedRequest {
ReplyPromise<GetKeyReply> reply;
GetKeyRequest() {}
GetKeyRequest(SpanID spanContext,
TenantInfo tenantInfo,
KeySelectorRef const& sel,
Version version,
Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
: spanContext(spanContext), tenantInfo(tenantInfo), sel(sel), version(version), debugID(debugID) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
serializer(ar, sel, version, tags, debugID, reply, spanContext, tenantInfo, arena);
}
};
@ -668,19 +739,22 @@ struct SplitRangeReply {
serializer(ar, splitPoints);
}
};
struct SplitRangeRequest {
constexpr static FileIdentifier file_identifier = 10725174;
Arena arena;
TenantInfo tenantInfo;
KeyRangeRef keys;
int64_t chunkSize;
ReplyPromise<SplitRangeReply> reply;
SplitRangeRequest() {}
SplitRangeRequest(KeyRangeRef const& keys, int64_t chunkSize) : keys(arena, keys), chunkSize(chunkSize) {}
SplitRangeRequest(TenantInfo tenantInfo, KeyRangeRef const& keys, int64_t chunkSize)
: tenantInfo(tenantInfo), keys(arena, keys), chunkSize(chunkSize) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keys, chunkSize, reply, arena);
serializer(ar, keys, chunkSize, reply, tenantInfo, arena);
}
};

View File

@ -1322,6 +1322,21 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
return interf;
}
Value encodeTenantEntry(TenantMapEntry const& tenantEntry) {
return ObjectWriter::toValue(tenantEntry, IncludeVersion());
}
TenantMapEntry decodeTenantEntry(ValueRef const& value) {
TenantMapEntry entry;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(entry);
return entry;
}
const KeyRangeRef tenantMapKeys("\xff/tenantMap/"_sr, "\xff/tenantMap0"_sr);
const KeyRef tenantMapPrefix = tenantMapKeys.begin;
const KeyRef tenantMapPrivatePrefix = "\xff\xff/tenantMap/"_sr;
// for tests
void testSSISerdes(StorageServerInterface const& ssi, bool useFB) {
printf("ssi=\nid=%s\nlocality=%s\nisTss=%s\ntssId=%s\naddress=%s\ngetValue=%s\n\n\n",

View File

@ -27,6 +27,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h to remove this depdendency
#include "fdbclient/StorageServerInterface.h"
#include "Tenant.h"
// Don't warn on constants being defined in this file.
#pragma clang diagnostic push
@ -593,6 +594,14 @@ UID decodeBlobWorkerListKey(KeyRef const& key);
const Value blobWorkerListValue(BlobWorkerInterface const& interface);
BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
// State for the tenant map
extern const KeyRangeRef tenantMapKeys;
extern const KeyRef tenantMapPrefix;
extern const KeyRef tenantMapPrivatePrefix;
Value encodeTenantEntry(TenantMapEntry const& tenantEntry);
TenantMapEntry decodeTenantEntry(ValueRef const& value);
#pragma clang diagnostic pop
#endif
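
A hedged sketch of how these helpers fit together: a tenant's metadata lives at \xff/tenantMap/<name> with an encoded TenantMapEntry as its value, and the privatized copy the commit proxy broadcasts to storage servers uses the \xff\xff/tenantMap/ prefix. The example tenant name in the comment below is made up.

#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include <utility>

// Sketch: build the system key/value pair under which a tenant's entry is stored.
std::pair<Key, Value> tenantMapKeyValueFor(TenantName name, TenantMapEntry const& entry) {
    Key key = name.withPrefix(tenantMapPrefix); // e.g. "\xff/tenantMap/bluebird"
    Value value = encodeTenantEntry(entry);     // decodeTenantEntry(value) round-trips
    return { key, value };
}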

fdbclient/Tenant.h (new file, 86 lines)
View File

@ -0,0 +1,86 @@
/*
* Tenant.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_TENANT_H
#define FDBCLIENT_TENANT_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbclient/VersionedMap.h"
#include "flow/flat_buffers.h"
typedef StringRef TenantNameRef;
typedef Standalone<TenantNameRef> TenantName;
struct TenantMapEntry {
constexpr static FileIdentifier file_identifier = 12247338;
static Key idToPrefix(int64_t id) {
int64_t swapped = bigEndian64(id);
return StringRef(reinterpret_cast<const uint8_t*>(&swapped), 8);
}
static int64_t prefixToId(KeyRef prefix) {
ASSERT(prefix.size() == 8);
int64_t id = *reinterpret_cast<const int64_t*>(prefix.begin());
id = bigEndian64(id);
ASSERT(id >= 0);
return id;
}
int64_t id;
Key prefix;
private:
void initPrefix(KeyRef subspace) {
ASSERT(id >= 0);
prefix = makeString(8 + subspace.size());
uint8_t* data = mutateString(prefix);
memcpy(data, subspace.begin(), subspace.size());
int64_t swapped = bigEndian64(id);
memcpy(data + subspace.size(), &swapped, 8);
}
public:
TenantMapEntry() : id(-1) {}
TenantMapEntry(int64_t id, KeyRef subspace) : id(id) { initPrefix(subspace); }
template <class Ar>
void serialize(Ar& ar) {
KeyRef subspace;
if (ar.isDeserializing) {
serializer(ar, id, subspace);
if (id >= 0) {
initPrefix(subspace);
}
} else {
ASSERT(prefix.size() >= 8 || (prefix.empty() && id == -1));
if (!prefix.empty()) {
subspace = prefix.substr(0, prefix.size() - 8);
}
serializer(ar, id, subspace);
}
}
};
typedef VersionedMap<TenantName, TenantMapEntry> TenantMap;
typedef VersionedMap<Key, TenantName> TenantPrefixIndex;
#endif
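
A small worked example of the prefix encoding above (a sketch; the subspace and id are arbitrary). The id is appended big-endian after the subspace, which keeps the id portion a fixed 8 bytes and makes prefixes sort in id order.

#include "fdbclient/Tenant.h"

// Sketch: round-tripping the tenant prefix encoding defined above.
void tenantPrefixExample() {
    // Id 5 under an empty subspace yields the 8-byte big-endian prefix
    // "\x00\x00\x00\x00\x00\x00\x00\x05".
    TenantMapEntry entry(5, ""_sr);
    ASSERT(entry.prefix == TenantMapEntry::idToPrefix(5));
    ASSERT(TenantMapEntry::prefixToId(entry.prefix) == 5);

    // With a non-empty subspace the id is appended after it; the last 8 bytes
    // still decode back to the id.
    TenantMapEntry prefixed(5, "tenants/"_sr);
    ASSERT(prefixed.prefix.startsWith("tenants/"_sr));
    ASSERT(TenantMapEntry::prefixToId(prefixed.prefix.substr(prefixed.prefix.size() - 8)) == 5);
}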

View File

@ -877,7 +877,13 @@ public:
Version at;
};
ViewAtVersion at(Version v) const { return ViewAtVersion(getRoot(v), v); }
ViewAtVersion at(Version v) const {
if (v == ::latestVersion) {
return atLatest();
}
return ViewAtVersion(getRoot(v), v);
}
ViewAtVersion atLatest() const { return ViewAtVersion(roots.back().second, latestVersion); }
bool isClearContaining(ViewAtVersion const& view, KeyRef key) {

View File

@ -70,16 +70,19 @@ public:
Reference<ILogSystem> logSystem_,
LogPushData* toCommit_,
bool& confChange_,
Version version,
Version popVersion_,
bool initialCommit_)
: spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_),
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), confChange(confChange_),
logSystem(logSystem_), popVersion(popVersion_), vecBackupKeys(&proxyCommitData_.vecBackupKeys),
keyInfo(&proxyCommitData_.keyInfo), cacheInfo(&proxyCommitData_.cacheInfo),
logSystem(logSystem_), version(version), popVersion(popVersion_),
vecBackupKeys(&proxyCommitData_.vecBackupKeys), keyInfo(&proxyCommitData_.keyInfo),
cacheInfo(&proxyCommitData_.cacheInfo),
uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr),
commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), commitVersion(&proxyCommitData_.committedVersion),
commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), committedVersion(&proxyCommitData_.committedVersion),
storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped),
tssMapping(&proxyCommitData_.tssMapping), initialCommit(initialCommit_) {}
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
initialCommit(initialCommit_) {}
private:
// The following variables are incoming parameters
@ -102,6 +105,7 @@ private:
bool& confChange;
Reference<ILogSystem> logSystem = Reference<ILogSystem>();
Version version = invalidVersion;
Version popVersion = 0;
KeyRangeMap<std::set<Key>>* vecBackupKeys = nullptr;
KeyRangeMap<ServerCacheInfo>* keyInfo = nullptr;
@ -109,11 +113,13 @@ private:
std::map<Key, ApplyMutationsData>* uid_applyMutationsData = nullptr;
RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>();
Database cx = Database();
NotifiedVersion* commitVersion = nullptr;
NotifiedVersion* committedVersion = nullptr;
std::map<UID, Reference<StorageInfo>>* storageCache = nullptr;
std::map<Tag, Version>* tag_popped = nullptr;
std::unordered_map<UID, StorageServerInterface>* tssMapping = nullptr;
std::map<TenantName, TenantMapEntry>* tenantMap = nullptr;
// true if the mutations were already written to the txnStateStore as part of recovery
bool initialCommit = false;
@ -445,7 +451,7 @@ private:
beginValue.present() ? BinaryReader::fromStringRef<Version>(beginValue.get(), Unversioned()) : 0,
&p.endVersion,
commit,
commitVersion,
committedVersion,
p.keyVersion);
}
@ -571,6 +577,41 @@ private:
TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore
}
void checkSetTenantMapPrefix(MutationRef m) {
if (m.param1.startsWith(tenantMapPrefix)) {
if (tenantMap) {
ASSERT(version != invalidVersion);
Standalone<StringRef> tenantName = m.param1.removePrefix(tenantMapPrefix);
TenantMapEntry tenantEntry = decodeTenantEntry(m.param2);
TraceEvent("CommitProxyInsertTenant", dbgid).detail("Tenant", tenantName).detail("Version", version);
(*tenantMap)[tenantName] = tenantEntry;
}
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
// For now, this goes to all storage servers.
// Eventually, we can have each SS store tenants that apply only to the data stored on it.
if (toCommit) {
std::set<Tag> allTags;
auto allServers = txnStateStore->readRange(serverTagKeys).get();
for (auto& kv : allServers) {
allTags.insert(decodeServerTagValue(kv.value));
}
toCommit->addTags(allTags);
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
toCommit->writeTypedMessage(privatized);
}
TEST(true); // Tenant added to map
}
}
void checkClearKeyServerKeys(KeyRangeRef range) {
if (!keyServersKeys.intersects(range)) {
return;
@ -893,6 +934,51 @@ private:
}
}
void checkClearTenantMapPrefix(KeyRangeRef range) {
if (tenantMapKeys.intersects(range)) {
if (tenantMap) {
ASSERT(version != invalidVersion);
StringRef startTenant = std::max(range.begin, tenantMapPrefix).removePrefix(tenantMapPrefix);
StringRef endTenant = (range.end.startsWith(tenantMapPrefix) ? range.end : tenantMapKeys.end)
.removePrefix(tenantMapPrefix);
TraceEvent("CommitProxyEraseTenants", dbgid)
.detail("BeginTenant", startTenant)
.detail("EndTenant", endTenant)
.detail("Version", version);
auto startItr = tenantMap->lower_bound(startTenant);
auto endItr = tenantMap->lower_bound(endTenant);
tenantMap->erase(startItr, endItr);
}
if (!initialCommit) {
txnStateStore->clear(range);
}
// For now, this goes to all storage servers.
// Eventually, we can have each SS store tenants that apply only to the data stored on it.
if (toCommit) {
std::set<Tag> allTags;
auto allServers = txnStateStore->readRange(serverTagKeys).get();
for (auto& kv : allServers) {
allTags.insert(decodeServerTagValue(kv.value));
}
toCommit->addTags(allTags);
MutationRef privatized;
privatized.type = MutationRef::ClearRange;
privatized.param1 = range.begin.withPrefix(systemKeys.begin, arena);
privatized.param2 = range.end.withPrefix(systemKeys.begin, arena);
toCommit->writeTypedMessage(privatized);
}
TEST(true); // Tenant cleared from map
}
}
void checkClearMiscRangeKeys(KeyRangeRef range) {
if (initialCommit) {
return;
@ -1009,6 +1095,7 @@ public:
checkSetGlobalKeys(m);
checkSetWriteRecoverKey(m);
checkSetMinRequiredCommitVersionKey(m);
checkSetTenantMapPrefix(m);
checkSetOtherKeys(m);
} else if (m.type == MutationRef::ClearRange && isSystemKey(m.param2)) {
KeyRangeRef range(m.param1, m.param2);
@ -1024,6 +1111,7 @@ public:
checkClearLogRangesRange(range);
checkClearTssMappingKeys(m, range);
checkClearTssQuarantineKeys(m, range);
checkClearTenantMapPrefix(range);
checkClearMiscRangeKeys(range);
}
}
@ -1052,11 +1140,20 @@ void applyMetadataMutations(SpanID const& spanContext,
const VectorRef<MutationRef>& mutations,
LogPushData* toCommit,
bool& confChange,
Version version,
Version popVersion,
bool initialCommit) {
ApplyMetadataMutationsImpl(
spanContext, arena, mutations, proxyCommitData, logSystem, toCommit, confChange, popVersion, initialCommit)
ApplyMetadataMutationsImpl(spanContext,
arena,
mutations,
proxyCommitData,
logSystem,
toCommit,
confChange,
version,
popVersion,
initialCommit)
.apply();
}

View File

@ -56,6 +56,7 @@ void applyMetadataMutations(SpanID const& spanContext,
const VectorRef<MutationRef>& mutations,
LogPushData* pToCommit,
bool& confChange,
Version version,
Version popVersion,
bool initialCommit);
void applyMetadataMutations(SpanID const& spanContext,

View File

@ -226,6 +226,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig;
dbInfo.myLocality = db->serverInfo->get().myLocality;
dbInfo.client = ClientDBInfo();
dbInfo.client.tenantMode = db->config.tenantMode;
TraceEvent("CCWDB", cluster->id)
.detail("NewMaster", dbInfo.master.id().toString())
@ -980,19 +981,23 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
// Construct the client information
if (db->clientInfo->get().commitProxies != req.commitProxies ||
db->clientInfo->get().grvProxies != req.grvProxies) {
db->clientInfo->get().grvProxies != req.grvProxies ||
db->clientInfo->get().tenantMode != db->config.tenantMode) {
TraceEvent("PublishNewClientInfo", self->id)
.detail("Master", dbInfo.master.id())
.detail("GrvProxies", db->clientInfo->get().grvProxies)
.detail("ReqGrvProxies", req.grvProxies)
.detail("CommitProxies", db->clientInfo->get().commitProxies)
.detail("ReqCPs", req.commitProxies);
.detail("ReqCPs", req.commitProxies)
.detail("TenantMode", db->clientInfo->get().tenantMode.toString())
.detail("ReqTenantMode", db->config.tenantMode.toString());
isChanged = true;
// TODO why construct a new one and not just copy the old one and change proxies + id?
ClientDBInfo clientInfo;
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.commitProxies = req.commitProxies;
clientInfo.grvProxies = req.grvProxies;
clientInfo.tenantMode = db->config.tenantMode;
db->clientInfo->set(clientInfo);
dbInfo.client = db->clientInfo->get();
}

View File

@ -410,6 +410,35 @@ ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Referen
return reply;
}
ErrorOr<Optional<TenantMapEntry>> getTenantEntry(ProxyCommitData* commitData,
Optional<TenantNameRef> tenant,
Optional<int64_t> tenantId,
bool logOnFailure) {
if (tenant.present()) {
auto itr = commitData->tenantMap.find(tenant.get());
if (itr == commitData->tenantMap.end()) {
if (logOnFailure) {
TraceEvent(SevWarn, "CommitProxyUnknownTenant", commitData->dbgid).detail("Tenant", tenant.get());
}
return unknown_tenant();
} else if (tenantId.present() && tenantId.get() != itr->second.id) {
if (logOnFailure) {
TraceEvent(SevWarn, "CommitProxyTenantIdMismatch", commitData->dbgid)
.detail("Tenant", tenant.get())
.detail("TenantId", tenantId)
.detail("ExistingId", itr->second.id);
}
return unknown_tenant();
}
return ErrorOr<Optional<TenantMapEntry>>(Optional<TenantMapEntry>(itr->second));
}
return Optional<TenantMapEntry>();
}
namespace CommitBatch {
struct CommitBatchContext {
@ -558,8 +587,8 @@ bool canReject(const std::vector<CommitTransactionRequest>& trs) {
for (const auto& tr : trs) {
if (tr.transaction.mutations.empty())
continue;
if (tr.transaction.mutations[0].param1.startsWith(LiteralStringRef("\xff")) ||
tr.transaction.read_conflict_ranges.empty()) {
if (!tr.tenantInfo.name.present() && (tr.transaction.mutations[0].param1.startsWith(LiteralStringRef("\xff")) ||
tr.transaction.read_conflict_ranges.empty())) {
return false;
}
}
@ -670,7 +699,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
state Span span("MP:getResolution"_loc, self->span.context);
ResolutionRequestBuilder requests(
pProxyCommitData, self->commitVersion, self->prevVersion, pProxyCommitData->version, span);
pProxyCommitData, self->commitVersion, self->prevVersion, pProxyCommitData->version.get(), span);
int conflictRangeCount = 0;
self->maxTransactionBytes = 0;
for (int t = 0; t < trs.size(); t++) {
@ -762,6 +791,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
/* pToCommit= */ nullptr,
self->forceRecovery,
/* version= */ self->commitVersion,
/* popVersion= */ 0,
/* initialCommit */ false);
}
@ -845,6 +875,13 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
int t;
for (t = 0; t < trs.size() && !self->forceRecovery; t++) {
if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) {
ErrorOr<Optional<TenantMapEntry>> result = getTenantEntry(
pProxyCommitData, trs[t].tenantInfo.name.castTo<TenantNameRef>(), trs[t].tenantInfo.tenantId, true);
if (result.isError()) {
self->committed[t] = ConflictBatch::TransactionTenantFailure;
trs[t].reply.sendError(result.getError());
} else {
self->commitCount++;
applyMetadataMutations(trs[t].spanContext,
*pProxyCommitData,
@ -853,9 +890,11 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
trs[t].transaction.mutations,
&self->toCommit,
self->forceRecovery,
self->commitVersion,
self->commitVersion + 1,
/* initialCommit= */ false);
}
}
if (self->firstStateMutations) {
ASSERT(self->committed[t] == ConflictBatch::TransactionCommitted);
self->firstStateMutations = false;
@ -875,7 +914,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
auto fcm = pProxyCommitData->logAdapter->getCommitMessage();
self->storeCommits.emplace_back(fcm, pProxyCommitData->txnStateStore->commit());
pProxyCommitData->version = self->commitVersion;
pProxyCommitData->version.set(self->commitVersion);
if (!pProxyCommitData->validState.isSet())
pProxyCommitData->validState.send(Void());
ASSERT(self->commitVersion);
@ -1090,7 +1129,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
"CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ProcessingMutations");
}
self->isMyFirstBatch = !pProxyCommitData->version;
self->isMyFirstBatch = !pProxyCommitData->version.get();
self->oldCoordinators = pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get();
assertResolutionStateMutationsSizeConsistent(self->resolution);
@ -1346,6 +1385,9 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
tr.reply.send(CommitID(self->commitVersion, t, self->metadataVersionAfter));
} else if (self->committed[t] == ConflictBatch::TransactionTooOld) {
tr.reply.sendError(transaction_too_old());
} else if (self->committed[t] == ConflictBatch::TransactionTenantFailure) {
// We already sent the error
ASSERT(tr.reply.isSet());
} else {
// If enable the option to report conflicting keys from resolvers, we send back all keyranges' indices
// through CommitID
@ -1491,8 +1533,51 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
state ErrorOr<Optional<TenantMapEntry>> tenantEntry;
state Version minTenantVersion =
req.minTenantVersion == latestVersion ? commitData->stats.lastCommitVersionAssigned + 1 : req.minTenantVersion;
// If a large minTenantVersion is specified, we limit how long we wait for it to be available
state Future<Void> futureVersionDelay = minTenantVersion > commitData->stats.lastCommitVersionAssigned + 1
? delay(SERVER_KNOBS->FUTURE_VERSION_DELAY)
: Never();
while (tenantEntry.isError()) {
bool finalQuery = commitData->version.get() >= minTenantVersion;
ErrorOr<Optional<TenantMapEntry>> _tenantEntry =
getTenantEntry(commitData, req.tenant, Optional<int64_t>(), finalQuery);
tenantEntry = _tenantEntry;
if (tenantEntry.isError()) {
if (finalQuery) {
req.reply.sendError(tenant_not_found());
return Void();
} else {
choose {
// Wait until we are sure that we've received metadata updates through minTenantVersion
// If latestVersion is specified, this will wait until we have definitely received
// updates through the version at the time we received the request
when(wait(commitData->version.whenAtLeast(minTenantVersion))) {}
when(wait(futureVersionDelay)) {
req.reply.sendError(future_version());
return Void();
}
}
}
}
}
std::unordered_set<UID> tssMappingsIncluded;
GetKeyServerLocationsReply rep;
if (tenantEntry.get().present()) {
rep.tenantEntry = tenantEntry.get().get();
req.begin = req.begin.withPrefix(rep.tenantEntry.prefix, req.arena);
if (req.end.present()) {
req.end = req.end.get().withPrefix(rep.tenantEntry.prefix, req.arena);
}
}
if (!req.end.present()) {
auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin)
: commitData->keyInfo.rangeContaining(req.begin);
@ -1569,7 +1654,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
GetStorageServerRejoinInfoReply rep;
rep.version = commitData->version;
rep.version = commitData->version.get();
rep.tag = decodeServerTagValue(commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get());
RangeResult history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
for (int i = history.size() - 1; i >= 0; i--) {
@ -1958,6 +2043,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
mutations,
/* pToCommit= */ nullptr,
confChanges,
/* version= */ 0,
/* popVersion= */ 0,
/* initialCommit= */ true);
} // loop

View File

@ -41,6 +41,7 @@ struct ConflictBatch {
enum TransactionCommitResult {
TransactionConflict = 0,
TransactionTooOld,
TransactionTenantFailure,
TransactionCommitted,
};

View File

@ -26,6 +26,7 @@
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_H
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
@ -96,7 +97,7 @@ struct ProxyStats {
}
explicit ProxyStats(UID id,
Version* pVersion,
NotifiedVersion* pVersion,
NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnCommitIn("TxnCommitIn", cc),
@ -145,7 +146,7 @@ struct ProxyStats {
LiteralStringRef("ReplyCommit"),
Histogram::Unit::microseconds)) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "Version", [pVersion]() { return pVersion->get(); });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() {
return *commitBatchesMemBytesCountPtr;
@ -169,7 +170,7 @@ struct ProxyCommitData {
// fully committed (durable)
Version minKnownCommittedVersion; // No version smaller than this one will be used as the known committed version
// during recovery
Version version; // The version at which txnStateStore is up to date
NotifiedVersion version; // The version at which txnStateStore is up to date
Promise<Void> validState; // Set once txnStateStore and version are valid
double lastVersionTime;
KeyRangeMap<std::set<Key>> vecBackupKeys;
@ -214,6 +215,8 @@ struct ProxyCommitData {
double lastMasterReset;
double lastResolverReset;
std::map<TenantName, TenantMapEntry> tenantMap;
// The tag related to a storage server rarely changes, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task.

View File

@ -1811,7 +1811,7 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
if (kv.second.type() == json_spirit::int_type) {
startingConfigString += kv.first + ":=" + format("%d", kv.second.get_int());
} else if (kv.second.type() == json_spirit::str_type) {
if ("storage_migration_type" == kv.first) {
if ("storage_migration_type" == kv.first || "tenant_mode" == kv.first) {
startingConfigString += kv.first + "=" + kv.second.get_str();
} else {
startingConfigString += kv.second.get_str();

View File

@ -555,7 +555,7 @@ struct StorageServerMetrics {
req.reply.send(reply);
}
std::vector<KeyRef> getSplitPoints(KeyRangeRef range, int64_t chunkSize) {
std::vector<KeyRef> getSplitPoints(KeyRangeRef range, int64_t chunkSize, Optional<Key> prefixToRemove) {
std::vector<KeyRef> toReturn;
KeyRef beginKey = range.begin;
IndexedSet<Key, int64_t>::iterator endKey =
@ -568,7 +568,11 @@ struct StorageServerMetrics {
++endKey;
continue;
}
toReturn.push_back(*endKey);
KeyRef splitPoint = *endKey;
if (prefixToRemove.present()) {
splitPoint = splitPoint.removePrefix(prefixToRemove.get());
}
toReturn.push_back(splitPoint);
beginKey = *endKey;
endKey =
byteSample.sample.index(byteSample.sample.sumTo(byteSample.sample.lower_bound(beginKey)) + chunkSize);
@ -576,9 +580,13 @@ struct StorageServerMetrics {
return toReturn;
}
void getSplitPoints(SplitRangeRequest req) {
void getSplitPoints(SplitRangeRequest req, Optional<Key> prefix) {
SplitRangeReply reply;
std::vector<KeyRef> points = getSplitPoints(req.keys, req.chunkSize);
KeyRangeRef range = req.keys;
if (prefix.present()) {
range = range.withPrefix(prefix.get(), req.arena);
}
std::vector<KeyRef> points = getSplitPoints(range, req.chunkSize, prefix);
reply.splitPoints.append_deep(reply.splitPoints.arena(), points.data(), points.size());
req.reply.send(reply);
@ -621,8 +629,8 @@ TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/simple") {
ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
std::vector<KeyRef> t =
ssm.getSplitPoints(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 2000 * sampleUnit);
std::vector<KeyRef> t = ssm.getSplitPoints(
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 2000 * sampleUnit, Optional<Key>());
ASSERT(t.size() == 1 && t[0] == LiteralStringRef("Bah"));
@ -643,8 +651,8 @@ TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/multipleReturnedPoint
ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
std::vector<KeyRef> t =
ssm.getSplitPoints(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 600 * sampleUnit);
std::vector<KeyRef> t = ssm.getSplitPoints(
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 600 * sampleUnit, Optional<Key>());
ASSERT(t.size() == 3 && t[0] == LiteralStringRef("Absolute") && t[1] == LiteralStringRef("Apple") &&
t[2] == LiteralStringRef("Bah"));
@ -666,8 +674,8 @@ TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/noneSplitable") {
ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
std::vector<KeyRef> t =
ssm.getSplitPoints(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 10000 * sampleUnit);
std::vector<KeyRef> t = ssm.getSplitPoints(
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 10000 * sampleUnit, Optional<Key>());
ASSERT(t.size() == 0);
@ -688,8 +696,8 @@ TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/chunkTooLarge") {
ssm.byteSample.sample.insert(LiteralStringRef("But"), 10 * sampleUnit);
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 30 * sampleUnit);
std::vector<KeyRef> t =
ssm.getSplitPoints(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 1000 * sampleUnit);
std::vector<KeyRef> t = ssm.getSplitPoints(
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 1000 * sampleUnit, Optional<Key>());
ASSERT(t.size() == 0);

File diff suppressed because it is too large

View File

@ -881,8 +881,14 @@ struct ConsistencyCheckWorkload : TestWorkload {
for (int i = 0; i < commitProxyInfo->size(); i++)
keyServerLocationFutures.push_back(
commitProxyInfo->get(i, &CommitProxyInterface::getKeyServersLocations)
.getReplyUnlessFailedFor(
GetKeyServerLocationsRequest(span.context, begin, end, limitKeyServers, false, Arena()),
.getReplyUnlessFailedFor(GetKeyServerLocationsRequest(span.context,
Optional<TenantNameRef>(),
begin,
end,
limitKeyServers,
false,
latestVersion,
Arena()),
2,
0));

View File

@ -198,6 +198,15 @@ ERROR( client_lib_not_found, 2120, "Client library for the given identifier not
ERROR( client_lib_not_available, 2121, "Client library exists, but is not available for download." )
ERROR( client_lib_invalid_binary, 2122, "Invalid client library binary." )
ERROR( tenant_name_required, 2130, "Tenant name must be specified to access data in the cluster" )
ERROR( tenant_not_found, 2131, "Tenant does not exist" )
ERROR( tenant_already_exists, 2132, "A tenant with the given name already exists" )
ERROR( tenant_not_empty, 2133, "Cannot delete a non-empty tenant" )
ERROR( invalid_tenant_name, 2134, "Tenant name cannot begin with \\xff" )
ERROR( tenant_prefix_allocator_conflict, 2135, "The database already has keys stored at the prefix allocated for the tenant" )
ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster" )
ERROR( unknown_tenant, 2137, "Tenant is not available from this server" )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )
ERROR( api_version_already_set, 2201, "API version may be set only once" )