Merge pull request #3095 from atn34/atn34/special-key-versioning

Add api versioning and opt-in cross-module reads to special key space
This commit is contained in:
Meng Xu 2020-05-09 16:21:02 -07:00 committed by GitHub
commit d9c5a0c559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 374 additions and 132 deletions

View File

@ -14,6 +14,11 @@ For more details about API versions, see :ref:`api-versions`.
API version 630
===============
General
-------
* In previous api versions, a get range starting at ``\xff\xff/worker_interfaces`` ignored its arguments and returned keys outside the range requested. In api version 630, you can get similar behavior by reading from ``\xff\xff/worker_interfaces/`` to ``\xff\xff/worker_interfaces0`` and stripping the prefix ``\xff\xff/worker_interfaces/`` from each key in the result.
C bindings
----------

View File

@ -2755,11 +2755,14 @@ ACTOR Future<Void> addInterface( std::map<Key,std::pair<Value,ClientLeaderRegInt
state ClientLeaderRegInterface leaderInterf(workerInterf.address());
choose {
when( Optional<LeaderInfo> rep = wait( brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())) ) ) {
StringRef ip_port = kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key;
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 = StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls")) ? full_ip_port2.removeSuffix(LiteralStringRef(":tls")) : full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
@ -3246,7 +3249,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (tokencmp(tokens[0], "kill")) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
Standalone<RangeResultRef> kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
@ -3443,12 +3450,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
getTransaction(db, tr, options, intrans);
Standalone<RangeResultRef> kvs = wait(makeInterruptable(
tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"),
LiteralStringRef("\xff\xff\xff")),
1)));
Standalone<RangeResultRef> kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
for (const auto& pair : kvs) {
auto ip_port = pair.key.endsWith(LiteralStringRef(":tls")) ? pair.key.removeSuffix(LiteralStringRef(":tls")) : pair.key;
auto ip_port = (pair.key.endsWith(LiteralStringRef(":tls"))
? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
printf("%s\n", printable(ip_port).c_str());
}
continue;
@ -3467,9 +3478,10 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
getTransaction(db, tr, options, intrans);
Standalone<RangeResultRef> kvs = wait(makeInterruptable(
tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"),
LiteralStringRef("\xff\xff\xff")),
1)));
tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
char *duration_end;
int duration = std::strtol((const char*)tokens[3].begin(), &duration_end, 10);
if (!std::isspace(*duration_end)) {
@ -3481,7 +3493,10 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state std::vector<Key> all_profiler_addresses;
state std::vector<Future<ErrorOr<Void>>> all_profiler_responses;
for (const auto& pair : kvs) {
auto ip_port = pair.key.endsWith(LiteralStringRef(":tls")) ? pair.key.removeSuffix(LiteralStringRef(":tls")) : pair.key;
auto ip_port = (pair.key.endsWith(LiteralStringRef(":tls"))
? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
interfaces.emplace(ip_port, BinaryReader::fromStringRef<ClientWorkerInterface>(pair.value, IncludeVersion()));
}
if (tokens.size() == 6 && tokencmp(tokens[5], "all")) {

View File

@ -306,12 +306,11 @@ public:
double detailedHealthMetricsLastUpdated;
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
std::shared_ptr<SpecialKeySpace> specialKeySpace;
std::shared_ptr<ConflictingKeysImpl> cKImpl;
std::shared_ptr<ReadConflictRangeImpl> rCRImpl;
std::shared_ptr<WriteConflictRangeImpl> wCRImpl;
std::vector<std::unique_ptr<SpecialKeyRangeBaseImpl>> specialKeySpaceModules;
std::unique_ptr<SpecialKeySpace> specialKeySpace;
void registerSpecialKeySpaceModule(std::unique_ptr<SpecialKeyRangeBaseImpl> module);
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;
};

View File

@ -493,6 +493,97 @@ ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bo
Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
return getHealthMetricsActor(this, detailed);
}
void DatabaseContext::registerSpecialKeySpaceModule(std::unique_ptr<SpecialKeyRangeBaseImpl> module) {
specialKeySpace->registerKeyRange(module->getKeyRange(), module.get());
specialKeySpaceModules.push_back(std::move(module));
}
ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile);
ACTOR Future<Optional<Value>> getJSON(Database db);
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeBaseImpl {
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Key prefix = Key(getKeyRange().begin);
return map(getWorkerInterfaces(ryw->getDatabase()->getConnectionFile()),
[prefix = prefix, kr = KeyRange(kr)](const Standalone<RangeResultRef>& in) {
Standalone<RangeResultRef> result;
for (const auto& [k_, v] : in) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k)) result.push_back_deep(result.arena(), KeyValueRef(k, v));
}
std::sort(result.begin(), result.end(), KeyValueRef::OrderByKey{});
return result;
});
} else {
return Standalone<RangeResultRef>();
}
}
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
};
struct SingleSpecialKeyImpl : SpecialKeyRangeBaseImpl {
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override {
return map(f(ryw), [k = k](Optional<Value> v) {
Standalone<RangeResultRef> result;
if (v.present()) {
result.push_back_deep(result.arena(), KeyValueRef(k, v.get()));
}
return result;
});
}
SingleSpecialKeyImpl(KeyRef k,
const std::function<Future<Optional<Value>>(Reference<ReadYourWritesTransaction>)>& f)
: SpecialKeyRangeBaseImpl(singleKeyRange(k)), k(k), f(f) {}
private:
Key k;
std::function<Future<Optional<Value>>(Reference<ReadYourWritesTransaction>)> f;
};
struct TransactionModule : SpecialKeyRangeBaseImpl {
Future<Standalone<RangeResultRef>> getRange(Reference<ReadYourWritesTransaction> ryw,
KeyRangeRef kr) const override {
return getRange_(ryw, kr, this);
}
TransactionModule(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {
impls.emplace_back(std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
impls.emplace_back(std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
impls.emplace_back(std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
}
private:
ACTOR static Future<Standalone<RangeResultRef>> getRange_(Reference<ReadYourWritesTransaction> ryw, KeyRangeRef kr,
const TransactionModule* self) {
state std::vector<Future<Standalone<RangeResultRef>>> futures;
futures.reserve(self->impls.size());
for (const auto& impl : self->impls) {
auto implRange = impl->getKeyRange();
auto begin = std::max(kr.begin, implRange.begin);
auto end = std::min(kr.end, implRange.end);
if (begin < end) {
futures.push_back(impl->getRange(ryw, KeyRangeRef(begin, end)));
}
}
self = nullptr;
wait(waitForAll(futures));
Standalone<RangeResultRef> result;
for (const auto& f : futures) {
result.append(result.arena(), f.get().begin(), f.get().size());
result.arena().dependsOn(f.get().arena());
}
return result;
}
std::vector<std::unique_ptr<SpecialKeyRangeBaseImpl>> impls;
};
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
TaskPriority taskID, LocalityData const& clientLocality,
@ -529,10 +620,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal),
specialKeySpace(std::make_shared<SpecialKeySpace>(normalKeys.begin, specialKeys.end)),
cKImpl(std::make_shared<ConflictingKeysImpl>(conflictingKeysRange)),
rCRImpl(std::make_shared<ReadConflictRangeImpl>(readConflictRangeKeysRange)),
wCRImpl(std::make_shared<WriteConflictRangeImpl>(writeConflictRangeKeysRange)) {
specialKeySpace(std::make_unique<SpecialKeySpace>(normalKeys.begin, specialKeys.end)) {
dbId = deterministicRandom()->randomUniqueID();
connected = clientInfo->get().proxies.size() ? Void() : clientInfo->onChange();
@ -551,10 +639,50 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(std::make_unique<TransactionModule>(
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0"))));
registerSpecialKeySpaceModule(std::make_unique<WorkerInterfacesSpecialKeyImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0"))));
registerSpecialKeySpaceModule(std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/status/json"),
[](Reference<ReadYourWritesTransaction> ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
return getJSON(ryw->getDatabase());
} else {
return Optional<Value>();
}
}));
registerSpecialKeySpaceModule(std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](Reference<ReadYourWritesTransaction> ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(ryw->getDatabase()->getConnectionFile()->getFilename());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
registerSpecialKeySpaceModule(std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/connection_string"),
[](Reference<ReadYourWritesTransaction> ryw) -> Future<Optional<Value>> {
try {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = ryw->getDatabase()->getConnectionFile();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}));
}
throttleExpirer = recurring([this](){ expireThrottles(); }, CLIENT_KNOBS->TAG_THROTTLE_EXPIRATION_INTERVAL);
specialKeySpace->registerKeyRange(conflictingKeysRange, cKImpl.get());
specialKeySpace->registerKeyRange(readConflictRangeKeysRange, rCRImpl.get());
specialKeySpace->registerKeyRange(writeConflictRangeKeysRange, wCRImpl.get());
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),

View File

@ -1209,47 +1209,47 @@ ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces (Reference<ClusterC
Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
TEST(true);
if (key == LiteralStringRef("\xff\xff/status/json")){
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getJSON(tr.getDatabase());
if (getDatabase()->apiVersionAtLeast(630)) {
if (specialKeys.contains(key)) {
TEST(true); // Special keys get
return getDatabase()->specialKeySpace->get(Reference<ReadYourWritesTransaction>::addRef(this), key);
}
else {
} else {
if (key == LiteralStringRef("\xff\xff/status/json")) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getJSON(tr.getDatabase());
} else {
return Optional<Value>();
}
}
if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(tr.getDatabase()->getConnectionFile()->getFilename());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}
}
if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(tr.getDatabase()->getConnectionFile()->getFilename());
return output;
}
}
catch (Error &e){
return e;
}
return Optional<Value>();
}
if (key == LiteralStringRef("\xff\xff/connection_string")){
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = tr.getDatabase()->getConnectionFile();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
if (key == LiteralStringRef("\xff\xff/connection_string")) {
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = tr.getDatabase()->getConnectionFile();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
} catch (Error& e) {
return e;
}
return Optional<Value>();
}
catch (Error &e){
return e;
}
return Optional<Value>();
}
// special key space are only allowed to query if both begin and end are in \xff\xff, \xff\xff\xff
if (specialKeys.contains(key))
return getDatabase()->specialKeySpace->get(Reference<ReadYourWritesTransaction>::addRef(this), key);
if(checkUsedDuringCommit()) {
return used_during_commit();
}
@ -1292,20 +1292,22 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
bool snapshot,
bool reverse )
{
if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")){
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getWorkerInterfaces(tr.getDatabase()->getConnectionFile());
if (getDatabase()->apiVersionAtLeast(630)) {
if (specialKeys.contains(begin.getKey()) && end.getKey() <= specialKeys.end) {
TEST(true); // Special key space get range
return getDatabase()->specialKeySpace->getRange(Reference<ReadYourWritesTransaction>::addRef(this), begin,
end, limits, reverse);
}
else {
return Standalone<RangeResultRef>();
} else {
if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")) {
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getWorkerInterfaces(tr.getDatabase()->getConnectionFile());
} else {
return Standalone<RangeResultRef>();
}
}
}
// special key space are only allowed to query if both begin and end are in \xff\xff, \xff\xff\xff
if (specialKeys.contains(begin.getKey()) && end.getKey() <= specialKeys.end)
return getDatabase()->specialKeySpace->getRange(Reference<ReadYourWritesTransaction>::addRef(this), begin, end,
limits, reverse);
if(checkUsedDuringCommit()) {
return used_during_commit();
}
@ -1558,6 +1560,7 @@ void ReadYourWritesTransaction::getWriteConflicts( KeyRangeMap<bool> *result ) {
}
Standalone<RangeResultRef> ReadYourWritesTransaction::getReadConflictRangeIntersecting(KeyRangeRef kr) {
TEST(true); // Special keys read conflict range
ASSERT(readConflictRangeKeysRange.contains(kr));
ASSERT(!tr.options.checkWritesEnabled)
Standalone<RangeResultRef> result;
@ -1598,6 +1601,7 @@ Standalone<RangeResultRef> ReadYourWritesTransaction::getReadConflictRangeInters
}
Standalone<RangeResultRef> ReadYourWritesTransaction::getWriteConflictRangeIntersecting(KeyRangeRef kr) {
TEST(true); // Special keys write conflict range
ASSERT(writeConflictRangeKeysRange.contains(kr));
Standalone<RangeResultRef> result;
@ -1995,6 +1999,9 @@ void ReadYourWritesTransaction::setOptionImpl( FDBTransactionOptions::Option opt
options.disableUsedDuringCommitProtection = true;
break;
case FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED:
validateOptionValue(value, false);
options.specialKeySpaceRelaxed = true;
default:
break;
}

View File

@ -37,6 +37,7 @@ struct ReadYourWritesTransactionOptions {
bool nextWriteDisableConflictRange : 1;
bool debugRetryLogging : 1;
bool disableUsedDuringCommitProtection : 1;
bool specialKeySpaceRelaxed : 1;
double timeoutInSeconds;
int maxRetries;
int snapshotRywEnabled;
@ -144,6 +145,8 @@ public:
// Read from the special key space writeConflictRangeKeysRange
Standalone<RangeResultRef> getWriteConflictRangeIntersecting(KeyRangeRef kr);
bool specialKeySpaceRelaxed() const { return options.specialKeySpaceRelaxed; }
private:
friend class RYWImpl;

View File

@ -82,23 +82,49 @@ ACTOR Future<Void> SpecialKeyRangeBaseImpl::normalizeKeySelectorActor(const Spec
return Void();
}
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationActor(
SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw, KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse) {
void onModuleRead(const Reference<ReadYourWritesTransaction>& ryw, SpecialKeyRangeBaseImpl* module,
Optional<SpecialKeyRangeBaseImpl*>& lastModuleRead) {
if (ryw && !ryw->specialKeySpaceRelaxed() && lastModuleRead.present() && lastModuleRead.get() != module) {
throw special_keys_cross_module_read();
}
lastModuleRead = module;
}
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkModuleFound(SpecialKeySpace* pks,
Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse) {
std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>> result =
wait(SpecialKeySpace::getRangeAggregationActor(pks, ryw, begin, end, limits, reverse));
if (ryw && !ryw->specialKeySpaceRelaxed()) {
auto* module = result.second.orDefault(nullptr);
if (module == nullptr) {
throw special_keys_no_module_found();
}
}
return result.first;
}
ACTOR Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>>>
SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse) {
// This function handles ranges which cover more than one keyrange and aggregates all results
// KeySelector, GetRangeLimits and reverse are all handled here
state Standalone<RangeResultRef> result;
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator iter;
state int actualBeginOffset;
state int actualEndOffset;
state Optional<SpecialKeyRangeBaseImpl*> lastModuleRead;
// make sure offset == 1
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Iterator beginIter =
pks->impls.rangeContaining(begin.getKey());
while ((begin.offset < 1 && beginIter != pks->impls.ranges().begin()) ||
(begin.offset > 1 && beginIter != pks->impls.ranges().end())) {
if (beginIter->value() != nullptr)
onModuleRead(ryw, beginIter->value(), lastModuleRead);
if (beginIter->value() != nullptr) {
wait(beginIter->value()->normalizeKeySelectorActor(beginIter->value(), ryw, &begin));
}
begin.offset < 1 ? --beginIter : ++beginIter;
}
@ -123,7 +149,10 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
pks->impls.rangeContaining(end.getKey());
while ((end.offset < 1 && endIter != pks->impls.ranges().begin()) ||
(end.offset > 1 && endIter != pks->impls.ranges().end())) {
if (endIter->value() != nullptr) wait(endIter->value()->normalizeKeySelectorActor(endIter->value(), ryw, &end));
onModuleRead(ryw, endIter->value(), lastModuleRead);
if (endIter->value() != nullptr) {
wait(endIter->value()->normalizeKeySelectorActor(endIter->value(), ryw, &end));
}
end.offset < 1 ? --endIter : ++endIter;
}
@ -148,12 +177,12 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
// return if range inverted
if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
TEST(true);
return RangeResultRef(false, false);
return std::make_pair(RangeResultRef(false, false), lastModuleRead);
}
// If touches begin or end, return with readToBegin and readThroughEnd flags
if (beginIter == pks->impls.ranges().end() || endIter == pks->impls.ranges().begin()) {
TEST(true);
return result;
return std::make_pair(result, lastModuleRead);
}
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Ranges ranges =
pks->impls.intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
@ -163,6 +192,7 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
if (reverse) {
while (iter != ranges.begin()) {
--iter;
onModuleRead(ryw, iter->value(), lastModuleRead);
if (iter->value() == nullptr) continue;
KeyRangeRef kr = iter->range();
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
@ -179,12 +209,13 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
if (limits.isReached()) {
result.more = true;
result.readToBegin = false;
return result;
return std::make_pair(result, lastModuleRead);
};
}
}
} else {
for (iter = ranges.begin(); iter != ranges.end(); ++iter) {
onModuleRead(ryw, iter->value(), lastModuleRead);
if (iter->value() == nullptr) continue;
KeyRangeRef kr = iter->range();
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
@ -201,12 +232,12 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
if (limits.isReached()) {
result.more = true;
result.readThroughEnd = false;
return result;
return std::make_pair(result, lastModuleRead);
};
}
}
}
return result;
return std::make_pair(result, lastModuleRead);
}
Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourWritesTransaction> ryw,
@ -222,7 +253,7 @@ Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(Reference<ReadYourW
begin.removeOrEqual(begin.arena());
end.removeOrEqual(end.arena());
return getRangeAggregationActor(this, ryw, begin, end, limits, reverse);
return checkModuleFound(this, ryw, begin, end, limits, reverse);
}
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw,

View File

@ -44,6 +44,8 @@ public:
ACTOR Future<Void> normalizeKeySelectorActor(const SpecialKeyRangeBaseImpl* pkrImpl,
Reference<ReadYourWritesTransaction> ryw, KeySelector* ks);
virtual ~SpecialKeyRangeBaseImpl() {}
protected:
KeyRange range; // underlying key range for this function
};
@ -71,12 +73,15 @@ public:
}
private:
ACTOR Future<Optional<Value>> getActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw, KeyRef key);
ACTOR static Future<Optional<Value>> getActor(SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw, KeyRef key);
ACTOR Future<Standalone<RangeResultRef>> getRangeAggregationActor(SpecialKeySpace* pks,
Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse);
ACTOR static Future<Standalone<RangeResultRef>> checkModuleFound(SpecialKeySpace* pks,
Reference<ReadYourWritesTransaction> ryw,
KeySelector begin, KeySelector end, GetRangeLimits limits,
bool reverse);
ACTOR static Future<std::pair<Standalone<RangeResultRef>, Optional<SpecialKeyRangeBaseImpl*>>> getRangeAggregationActor(
SpecialKeySpace* pks, Reference<ReadYourWritesTransaction> ryw, KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse);
KeyRangeMap<SpecialKeyRangeBaseImpl*> impls;
KeyRange range;

View File

@ -262,6 +262,8 @@ description is not currently required but encouraged.
description="This option should only be used by tools which change the database configuration." />
<Option name="report_conflicting_keys" code="712"
description="The transaction can retrieve keys that are conflicting with other transactions." />
<Option name="special_key_space_relaxed" code="713"
description="By default, the special key space will only allow users to read from exactly one module (a subspace in the special key space). Use this option to allow reading from zero or more modules. Users who set this option should be prepared for new modules, which may have different behaviors than the modules they're currently reading. For example, a new module might block or return an error." />
<Option name="tag" code="800" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="auto_throttle_tag" code="801" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."

View File

@ -112,6 +112,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
KeyRange conflictRange;
unsigned int operationId;
int64_t maximumTotalData;
bool specialKeysRelaxed;
bool success;
@ -127,6 +128,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
adjacentKeys = deterministicRandom()->coinflip();
useSystemKeys = deterministicRandom()->coinflip();
initialKeyDensity = deterministicRandom()->random01(); // This fraction of keys are present before the first transaction (and after an unknown result)
specialKeysRelaxed = deterministicRandom()->coinflip();
// See https://github.com/apple/foundationdb/issues/2424
if (BUGGIFY) {
@ -151,13 +153,14 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
maxClearSize = 1<<deterministicRandom()->randomInt(0, 20);
conflictRange = KeyRangeRef( LiteralStringRef("\xfe"), LiteralStringRef("\xfe\x00") );
TraceEvent("FuzzApiCorrectnessConfiguration")
.detail("Nodes", nodes)
.detail("InitialKeyDensity", initialKeyDensity)
.detail("AdjacentKeys", adjacentKeys)
.detail("ValueSizeMin", valueSizeRange.first)
.detail("ValueSizeRange", valueSizeRange.second)
.detail("MaxClearSize", maxClearSize)
.detail("UseSystemKeys", useSystemKeys);
.detail("Nodes", nodes)
.detail("InitialKeyDensity", initialKeyDensity)
.detail("AdjacentKeys", adjacentKeys)
.detail("ValueSizeMin", valueSizeRange.first)
.detail("ValueSizeRange", valueSizeRange.second)
.detail("MaxClearSize", maxClearSize)
.detail("UseSystemKeys", useSystemKeys)
.detail("SpecialKeysRelaxed", specialKeysRelaxed);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
@ -219,6 +222,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
}
if( self->useSystemKeys )
tr->setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
if (self->specialKeysRelaxed) tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED);
int end = std::min(self->nodes, i+keysPerBatch );
tr->clear( KeyRangeRef( self->getKeyForIndex(i), self->getKeyForIndex(end) ) );
@ -276,6 +280,9 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
if( self->useSystemKeys ) {
tr->setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
}
if (self->specialKeysRelaxed) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED);
}
tr->addWriteConflictRange( self->conflictRange );
try {
@ -655,13 +662,18 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair( error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0) ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::Possible ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
((keysel1.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(keysel2.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange) ),
std::make_pair( error_code_accessed_unreadable, ExceptionContract::Possible )
std::make_pair(error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0)),
std::make_pair(error_code_client_invalid_operation, ExceptionContract::Possible),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
((keysel1.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(keysel2.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange)),
std::make_pair(error_code_special_keys_cross_module_read,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_special_keys_no_module_found,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_accessed_unreadable, ExceptionContract::Possible)
};
}
@ -688,13 +700,19 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair( error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf( !limits.isReached() && !limits.isValid()) ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::Possible ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
((keysel1.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(keysel2.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange) ),
std::make_pair( error_code_accessed_unreadable, ExceptionContract::Possible )
std::make_pair(error_code_range_limits_invalid,
ExceptionContract::possibleButRequiredIf(!limits.isReached() && !limits.isValid())),
std::make_pair(error_code_client_invalid_operation, ExceptionContract::Possible),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
((keysel1.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(keysel2.getKey() > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange)),
std::make_pair(error_code_special_keys_cross_module_read,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_special_keys_no_module_found,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_accessed_unreadable, ExceptionContract::Possible)
};
}
@ -732,14 +750,19 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
contract = {
std::make_pair( error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2) ),
std::make_pair( error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0) ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::Possible ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
((key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)))
&& !isSpecialKeyRange) ),
std::make_pair( error_code_accessed_unreadable, ExceptionContract::Possible )
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
std::make_pair(error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0)),
std::make_pair(error_code_client_invalid_operation, ExceptionContract::Possible),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
((key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange)),
std::make_pair(error_code_special_keys_cross_module_read,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_special_keys_no_module_found,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_accessed_unreadable, ExceptionContract::Possible)
};
}
@ -767,14 +790,20 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
contract = {
std::make_pair( error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2) ),
std::make_pair( error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf( !limits.isReached() && !limits.isValid()) ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::Possible ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
((key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange) ),
std::make_pair( error_code_accessed_unreadable, ExceptionContract::Possible )
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
std::make_pair(error_code_range_limits_invalid,
ExceptionContract::possibleButRequiredIf(!limits.isReached() && !limits.isValid())),
std::make_pair(error_code_client_invalid_operation, ExceptionContract::Possible),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
((key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange)),
std::make_pair(error_code_special_keys_cross_module_read,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_special_keys_no_module_found,
ExceptionContract::possibleIf(isSpecialKeyRange && !workload->specialKeysRelaxed)),
std::make_pair(error_code_accessed_unreadable, ExceptionContract::Possible)
};
}

View File

@ -75,9 +75,10 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
Future<Void> _setup(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
cx->specialKeySpace = std::make_shared<SpecialKeySpace>();
cx->specialKeySpace = std::make_unique<SpecialKeySpace>();
if (self->clientId == 0) {
self->ryw = Reference(new ReadYourWritesTransaction(cx));
self->ryw->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED);
self->ryw->setVersion(100);
self->ryw->clear(normalKeys);
// generate key ranges
@ -269,6 +270,13 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
}
TEST(true); // Read write conflict range of committed transaction
}
try {
wait(success(tx->get(LiteralStringRef("\xff\xff/1314109/i_hope_this_isn't_registered"))));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_special_keys_no_module_found);
}
for (int i = 0; i < self->conflictRangeSizeFactor; ++i) {
GetRangeLimits limit;
KeySelector begin;
@ -276,7 +284,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
loop {
begin = firstGreaterOrEqual(deterministicRandom()->randomChoice(keys));
end = firstGreaterOrEqual(deterministicRandom()->randomChoice(keys));
if (begin.getKey() <= end.getKey()) break;
if (begin.getKey() < end.getKey()) break;
}
bool reverse = deterministicRandom()->coinflip();

View File

@ -30,13 +30,17 @@ struct SuspendProcessesWorkload : TestWorkload {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> kvs = wait(tr.getRange(
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1));
Standalone<RangeResultRef> kvs =
wait(tr.getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY));
ASSERT(!kvs.more);
std::vector<Standalone<StringRef>> suspendProcessInterfaces;
for (auto it : kvs) {
auto ip_port = it.key.endsWith(LiteralStringRef(":tls"))
? it.key.removeSuffix(LiteralStringRef(":tls"))
: it.key;
auto ip_port =
(it.key.endsWith(LiteralStringRef(":tls")) ? it.key.removeSuffix(LiteralStringRef(":tls"))
: it.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
for (auto& killProcess : self->prefixSuspendProcesses) {
if (boost::starts_with(ip_port.toString().c_str(), killProcess.c_str())) {
suspendProcessInterfaces.push_back(it.value);

View File

@ -89,13 +89,17 @@ struct TriggerRecoveryLoopWorkload : TestWorkload {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> kvs = wait(tr.getRange(
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1));
Standalone<RangeResultRef> kvs =
wait(tr.getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY));
ASSERT(!kvs.more);
std::map<Key, Value> address_interface;
for (auto it : kvs) {
auto ip_port = it.key.endsWith(LiteralStringRef(":tls"))
? it.key.removeSuffix(LiteralStringRef(":tls"))
: it.key;
auto ip_port =
(it.key.endsWith(LiteralStringRef(":tls")) ? it.key.removeSuffix(LiteralStringRef(":tls"))
: it.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
address_interface[ip_port] = it.value;
}
for (auto it : address_interface) {

View File

@ -152,6 +152,8 @@ ERROR( unsupported_operation, 2108, "Operation is not supported" )
ERROR( too_many_tags, 2109, "Too many tags set on transaction" )
ERROR( tag_too_long, 2110, "Tag set on transaction is too long" )
ERROR( too_many_tag_throttles, 2111, "Too many tag throttles have been created" )
ERROR( special_keys_cross_module_read, 2112, "Special key space range read crosses modules. Refer to the `special_key_space_relaxed' transaction option for more details." )
ERROR( special_keys_no_module_found, 2113, "Special key space range read does not intersect a module. Refer to the `special_key_space_relaxed' transaction option for more details." )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )