2020-04-09 05:50:55 +08:00
|
|
|
/*
|
|
|
|
* SpecialKeySpace.actor.cpp
|
|
|
|
*
|
|
|
|
* This source file is part of the FoundationDB open source project
|
|
|
|
*
|
|
|
|
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
|
2020-03-04 10:35:24 +08:00
|
|
|
#include "fdbclient/SpecialKeySpace.actor.h"
|
|
|
|
#include "flow/UnitTest.h"
|
|
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
|
|
|
2020-06-16 03:47:27 +08:00
|
|
|
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::readModuleToBoundary = {
|
2020-05-13 17:28:04 +08:00
|
|
|
{ SpecialKeySpace::MODULE::TRANSACTION,
|
|
|
|
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")) },
|
|
|
|
{ SpecialKeySpace::MODULE::WORKERINTERFACE,
|
|
|
|
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0")) },
|
|
|
|
{ SpecialKeySpace::MODULE::STATUSJSON, singleKeyRange(LiteralStringRef("\xff\xff/status/json")) },
|
|
|
|
{ SpecialKeySpace::MODULE::CONNECTIONSTRING, singleKeyRange(LiteralStringRef("\xff\xff/connection_string")) },
|
2020-05-19 01:38:23 +08:00
|
|
|
{ SpecialKeySpace::MODULE::CLUSTERFILEPATH, singleKeyRange(LiteralStringRef("\xff\xff/cluster_file_path")) },
|
2020-05-19 05:23:17 +08:00
|
|
|
{ SpecialKeySpace::MODULE::METRICS,
|
2020-06-15 13:29:44 +08:00
|
|
|
KeyRangeRef(LiteralStringRef("\xff\xff/metrics/"), LiteralStringRef("\xff\xff/metrics0")) },
|
|
|
|
{ SpecialKeySpace::MODULE::MANAGEMENT,
|
|
|
|
KeyRangeRef(LiteralStringRef("\xff\xff/conf/"), LiteralStringRef("\xff\xff/conf0")) }
|
2020-05-13 17:28:04 +08:00
|
|
|
};
|
|
|
|
|
2020-05-12 15:42:43 +08:00
|
|
|
// This function will move the given KeySelector as far as possible to the standard form:
|
2020-03-04 10:35:24 +08:00
|
|
|
// orEqual == false && offset == 1 (Standard form)
|
2020-05-12 15:42:43 +08:00
|
|
|
// If the corresponding key is not in the underlying key range, it will move over the range
|
2020-06-16 06:31:55 +08:00
|
|
|
// The cache object is used to cache the first read result from the rpc call during the key resolution,
|
|
|
|
// then when we need to do key resolution or result filtering,
|
|
|
|
// we, instead of rpc call, read from this cache object have consistent results
|
2020-06-16 03:47:27 +08:00
|
|
|
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeReadImpl* skrImpl, ReadYourWritesTransaction* ryw,
|
2020-06-12 03:22:19 +08:00
|
|
|
KeySelector* ks, Optional<Standalone<RangeResultRef>>* cache) {
|
2020-03-04 10:35:24 +08:00
|
|
|
ASSERT(!ks->orEqual); // should be removed before calling
|
|
|
|
ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized
|
|
|
|
|
2020-05-12 15:42:43 +08:00
|
|
|
state Key startKey(skrImpl->getKeyRange().begin);
|
|
|
|
state Key endKey(skrImpl->getKeyRange().end);
|
2020-06-12 03:22:19 +08:00
|
|
|
state Standalone<RangeResultRef> result;
|
2020-03-04 10:35:24 +08:00
|
|
|
|
|
|
|
if (ks->offset < 1) {
|
|
|
|
// less than the given key
|
2020-05-15 17:04:15 +08:00
|
|
|
if (skrImpl->getKeyRange().contains(ks->getKey())) endKey = ks->getKey();
|
2020-03-04 10:35:24 +08:00
|
|
|
} else {
|
|
|
|
// greater than the given key
|
2020-05-12 15:42:43 +08:00
|
|
|
if (skrImpl->getKeyRange().contains(ks->getKey())) startKey = ks->getKey();
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
2020-05-15 17:04:15 +08:00
|
|
|
ASSERT(startKey < endKey); // Note : startKey never equals endKey here
|
2020-03-04 10:35:24 +08:00
|
|
|
|
2020-04-08 03:49:46 +08:00
|
|
|
TraceEvent(SevDebug, "NormalizeKeySelector")
|
2020-03-04 10:35:24 +08:00
|
|
|
.detail("OriginalKey", ks->getKey())
|
|
|
|
.detail("OriginalOffset", ks->offset)
|
2020-05-12 15:42:43 +08:00
|
|
|
.detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin)
|
|
|
|
.detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end);
|
2020-03-04 10:35:24 +08:00
|
|
|
|
2020-06-12 03:22:19 +08:00
|
|
|
if (skrImpl->isAsync()) {
|
|
|
|
const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(skrImpl);
|
|
|
|
Standalone<RangeResultRef> result_ = wait(ptr->getRange(ryw, KeyRangeRef(startKey, endKey), cache));
|
|
|
|
result = result_;
|
|
|
|
} else {
|
|
|
|
Standalone<RangeResultRef> result_ = wait(skrImpl->getRange(ryw, KeyRangeRef(startKey, endKey)));
|
|
|
|
result = result_;
|
|
|
|
}
|
|
|
|
|
2020-03-24 14:55:56 +08:00
|
|
|
if (result.size() == 0) {
|
2020-05-15 08:30:48 +08:00
|
|
|
TraceEvent(SevDebug, "ZeroElementsIntheRange").detail("Start", startKey).detail("End", endKey);
|
2020-03-24 14:55:56 +08:00
|
|
|
return Void();
|
|
|
|
}
|
2020-04-01 00:33:25 +08:00
|
|
|
// Note : KeySelector::setKey has byte limit according to the knobs, customize it if needed
|
2020-03-04 10:35:24 +08:00
|
|
|
if (ks->offset < 1) {
|
|
|
|
if (result.size() >= 1 - ks->offset) {
|
|
|
|
ks->setKey(KeyRef(ks->arena(), result[result.size() - (1 - ks->offset)].key));
|
|
|
|
ks->offset = 1;
|
|
|
|
} else {
|
|
|
|
ks->setKey(KeyRef(ks->arena(), result[0].key));
|
|
|
|
ks->offset += result.size();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (result.size() >= ks->offset) {
|
|
|
|
ks->setKey(KeyRef(ks->arena(), result[ks->offset - 1].key));
|
|
|
|
ks->offset = 1;
|
|
|
|
} else {
|
2020-06-24 02:21:03 +08:00
|
|
|
ks->setKey(KeyRef(ks->arena(), keyAfter(result[result.size() - 1].key))); // TODO : the keyAfter will just return if key == \xff\xff
|
2020-03-04 10:35:24 +08:00
|
|
|
ks->offset -= result.size();
|
|
|
|
}
|
|
|
|
}
|
2020-04-08 03:49:46 +08:00
|
|
|
TraceEvent(SevDebug, "NormalizeKeySelector")
|
2020-03-04 10:35:24 +08:00
|
|
|
.detail("NormalizedKey", ks->getKey())
|
|
|
|
.detail("NormalizedOffset", ks->offset)
|
2020-05-12 15:42:43 +08:00
|
|
|
.detail("SpecialKeyRangeStart", skrImpl->getKeyRange().begin)
|
|
|
|
.detail("SpecialKeyRangeEnd", skrImpl->getKeyRange().end);
|
2020-03-04 10:35:24 +08:00
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-05-12 15:42:43 +08:00
|
|
|
// This function will normalize the given KeySelector to a standard KeySelector:
|
|
|
|
// orEqual == false && offset == 1 (Standard form)
|
|
|
|
// If the corresponding key is outside the whole space, it will move to the begin or the end
|
|
|
|
// It does have overhead here since we query all keys twice in the worst case.
|
|
|
|
// However, moving the KeySelector while handling other parameters like limits makes the code much more complex and hard
|
|
|
|
// to maintain; Thus, separate each part to make the code easy to understand and more compact
|
2020-06-18 02:28:52 +08:00
|
|
|
// Boundary is the range of the legal key space, which, by default is the range of the module
|
2020-06-18 03:47:54 +08:00
|
|
|
// And (\xff\xff, \xff\xff\xff) if SPECIAL_KEY_SPACE_RELAXED is turned on
|
2020-05-21 04:55:07 +08:00
|
|
|
ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeySelector* ks,
|
2020-06-18 01:33:52 +08:00
|
|
|
KeyRangeRef boundary, int* actualOffset,
|
2020-06-12 03:22:19 +08:00
|
|
|
Standalone<RangeResultRef>* result,
|
|
|
|
Optional<Standalone<RangeResultRef>>* cache) {
|
2020-06-15 14:11:57 +08:00
|
|
|
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::Iterator iter =
|
2020-06-16 03:47:27 +08:00
|
|
|
ks->offset < 1 ? sks->getReadImpls().rangeContainingKeyBefore(ks->getKey())
|
|
|
|
: sks->getReadImpls().rangeContaining(ks->getKey());
|
2020-06-18 03:47:54 +08:00
|
|
|
while ((ks->offset < 1 && iter->begin() > boundary.begin) || (ks->offset > 1 && iter->begin() < boundary.end)) {
|
2020-05-12 15:42:43 +08:00
|
|
|
if (iter->value() != nullptr) {
|
2020-06-12 03:22:19 +08:00
|
|
|
wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks, cache));
|
2020-05-12 15:42:43 +08:00
|
|
|
}
|
|
|
|
ks->offset < 1 ? --iter : ++iter;
|
|
|
|
}
|
|
|
|
*actualOffset = ks->offset;
|
2020-06-18 03:47:54 +08:00
|
|
|
if (iter->begin() == boundary.begin || iter->begin() == boundary.end) ks->setKey(iter->begin());
|
2020-05-12 15:42:43 +08:00
|
|
|
|
|
|
|
if (!ks->isFirstGreaterOrEqual()) {
|
2020-06-18 01:33:52 +08:00
|
|
|
// The Key Selector clamps up to the legal key space
|
|
|
|
TraceEvent(SevInfo, "ReadToBoundary")
|
2020-05-12 15:42:43 +08:00
|
|
|
.detail("TerminateKey", ks->getKey())
|
|
|
|
.detail("TerminateOffset", ks->offset);
|
2020-06-18 01:33:52 +08:00
|
|
|
if (ks->offset < 1)
|
2020-05-12 15:42:43 +08:00
|
|
|
result->readToBegin = true;
|
|
|
|
else
|
|
|
|
result->readThroughEnd = true;
|
|
|
|
ks->offset = 1;
|
|
|
|
}
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
2020-06-18 02:13:55 +08:00
|
|
|
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkRYWValid(SpecialKeySpace* sks,
|
|
|
|
ReadYourWritesTransaction* ryw,
|
|
|
|
KeySelector begin, KeySelector end,
|
|
|
|
GetRangeLimits limits, bool reverse) {
|
|
|
|
ASSERT(ryw);
|
2020-05-21 04:55:07 +08:00
|
|
|
choose {
|
2020-06-18 01:33:52 +08:00
|
|
|
when(Standalone<RangeResultRef> result =
|
2020-05-21 04:55:07 +08:00
|
|
|
wait(SpecialKeySpace::getRangeAggregationActor(sks, ryw, begin, end, limits, reverse))) {
|
2020-06-18 01:33:52 +08:00
|
|
|
return result;
|
2020-05-15 14:49:57 +08:00
|
|
|
}
|
2020-06-18 02:13:55 +08:00
|
|
|
when(wait(ryw->resetFuture())) { throw internal_error(); }
|
2020-05-06 04:07:09 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-18 01:33:52 +08:00
|
|
|
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationActor(SpecialKeySpace* sks,
|
|
|
|
ReadYourWritesTransaction* ryw,
|
|
|
|
KeySelector begin, KeySelector end,
|
|
|
|
GetRangeLimits limits,
|
|
|
|
bool reverse) {
|
2020-03-04 10:35:24 +08:00
|
|
|
// This function handles ranges which cover more than one keyrange and aggregates all results
|
|
|
|
// KeySelector, GetRangeLimits and reverse are all handled here
|
2020-03-31 14:27:09 +08:00
|
|
|
state Standalone<RangeResultRef> result;
|
2020-06-12 03:22:19 +08:00
|
|
|
state Standalone<RangeResultRef> pairs;
|
2020-06-16 03:47:27 +08:00
|
|
|
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::Iterator iter;
|
2020-03-31 14:27:09 +08:00
|
|
|
state int actualBeginOffset;
|
|
|
|
state int actualEndOffset;
|
2020-06-18 01:33:52 +08:00
|
|
|
state KeyRangeRef moduleBoundary;
|
2020-06-12 03:22:19 +08:00
|
|
|
// used to cache result from potential first read
|
|
|
|
state Optional<Standalone<RangeResultRef>> cache;
|
2020-04-03 15:26:11 +08:00
|
|
|
|
2020-06-18 03:47:54 +08:00
|
|
|
if (ryw->specialKeySpaceRelaxed()) {
|
|
|
|
moduleBoundary = sks->range;
|
|
|
|
} else {
|
2020-06-18 01:33:52 +08:00
|
|
|
auto beginIter = sks->getModules().rangeContaining(begin.getKey());
|
2020-06-18 03:47:54 +08:00
|
|
|
if (beginIter->begin() <= end.getKey() && end.getKey() <= beginIter->end()) {
|
2020-06-18 01:33:52 +08:00
|
|
|
if (beginIter->value() == SpecialKeySpace::MODULE::UNKNOWN)
|
|
|
|
throw special_keys_no_module_found();
|
|
|
|
else
|
|
|
|
moduleBoundary = beginIter->range();
|
|
|
|
} else {
|
|
|
|
TraceEvent(SevInfo, "SpecialKeyCrossModuleRead")
|
|
|
|
.detail("Begin", begin.toString())
|
|
|
|
.detail("End", end.toString())
|
|
|
|
.detail("BoundaryBegin", beginIter->begin())
|
|
|
|
.detail("BoundaryEnd", beginIter->end());
|
|
|
|
throw special_keys_cross_module_read();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
wait(normalizeKeySelectorActor(sks, ryw, &begin, moduleBoundary, &actualBeginOffset, &result, &cache));
|
|
|
|
wait(normalizeKeySelectorActor(sks, ryw, &end, moduleBoundary, &actualEndOffset, &result, &cache));
|
2020-03-31 14:27:09 +08:00
|
|
|
// Handle all corner cases like what RYW does
|
2020-03-04 10:35:24 +08:00
|
|
|
// return if range inverted
|
2020-03-31 14:27:09 +08:00
|
|
|
if (actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
|
2020-03-04 10:35:24 +08:00
|
|
|
TEST(true);
|
2020-06-18 01:33:52 +08:00
|
|
|
return RangeResultRef(false, false);
|
2020-03-31 14:27:09 +08:00
|
|
|
}
|
2020-03-31 16:36:07 +08:00
|
|
|
// If touches begin or end, return with readToBegin and readThroughEnd flags
|
2020-06-18 01:33:52 +08:00
|
|
|
if (begin.getKey() == moduleBoundary.end || end.getKey() == moduleBoundary.begin) {
|
2020-03-31 16:36:07 +08:00
|
|
|
TEST(true);
|
2020-06-18 01:33:52 +08:00
|
|
|
return result;
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
2020-06-16 03:47:27 +08:00
|
|
|
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::Ranges ranges =
|
2020-06-15 14:11:57 +08:00
|
|
|
sks->getReadImpls().intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
|
2020-03-04 10:35:24 +08:00
|
|
|
// TODO : workaround to write this two together to make the code compact
|
|
|
|
// The issue here is boost::iterator_range<> doest not provide rbegin(), rend()
|
|
|
|
iter = reverse ? ranges.end() : ranges.begin();
|
|
|
|
if (reverse) {
|
|
|
|
while (iter != ranges.begin()) {
|
|
|
|
--iter;
|
|
|
|
if (iter->value() == nullptr) continue;
|
|
|
|
KeyRangeRef kr = iter->range();
|
|
|
|
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
|
|
|
|
KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
|
2020-06-12 03:22:19 +08:00
|
|
|
if (iter->value()->isAsync() && cache.present()) {
|
|
|
|
const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(iter->value());
|
|
|
|
Standalone<RangeResultRef> pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache));
|
|
|
|
pairs = pairs_;
|
|
|
|
} else {
|
|
|
|
Standalone<RangeResultRef> pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
|
|
|
|
pairs = pairs_;
|
|
|
|
}
|
2020-04-07 15:08:47 +08:00
|
|
|
result.arena().dependsOn(pairs.arena());
|
2020-03-04 10:35:24 +08:00
|
|
|
// limits handler
|
|
|
|
for (int i = pairs.size() - 1; i >= 0; --i) {
|
2020-04-09 03:43:25 +08:00
|
|
|
result.push_back(result.arena(), pairs[i]);
|
2020-04-09 04:38:12 +08:00
|
|
|
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it's still
|
|
|
|
// returned. In other words, the total size of the returned value (less the last entry) will be less
|
|
|
|
// than byteLimit
|
2020-04-09 03:43:25 +08:00
|
|
|
limits.decrement(pairs[i]);
|
2020-03-31 14:27:09 +08:00
|
|
|
if (limits.isReached()) {
|
|
|
|
result.more = true;
|
|
|
|
result.readToBegin = false;
|
2020-06-18 01:33:52 +08:00
|
|
|
return result;
|
2020-03-31 14:27:09 +08:00
|
|
|
};
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
for (iter = ranges.begin(); iter != ranges.end(); ++iter) {
|
|
|
|
if (iter->value() == nullptr) continue;
|
|
|
|
KeyRangeRef kr = iter->range();
|
|
|
|
KeyRef keyStart = kr.contains(begin.getKey()) ? begin.getKey() : kr.begin;
|
|
|
|
KeyRef keyEnd = kr.contains(end.getKey()) ? end.getKey() : kr.end;
|
2020-06-12 03:22:19 +08:00
|
|
|
if (iter->value()->isAsync() && cache.present()) {
|
|
|
|
const SpecialKeyRangeAsyncImpl* ptr = dynamic_cast<const SpecialKeyRangeAsyncImpl*>(iter->value());
|
|
|
|
Standalone<RangeResultRef> pairs_ = wait(ptr->getRange(ryw, KeyRangeRef(keyStart, keyEnd), &cache));
|
|
|
|
pairs = pairs_;
|
|
|
|
} else {
|
|
|
|
Standalone<RangeResultRef> pairs_ = wait(iter->value()->getRange(ryw, KeyRangeRef(keyStart, keyEnd)));
|
|
|
|
pairs = pairs_;
|
|
|
|
}
|
2020-04-07 15:08:47 +08:00
|
|
|
result.arena().dependsOn(pairs.arena());
|
2020-03-04 10:35:24 +08:00
|
|
|
// limits handler
|
2020-03-31 14:27:09 +08:00
|
|
|
for (int i = 0; i < pairs.size(); ++i) {
|
2020-04-09 03:43:25 +08:00
|
|
|
result.push_back(result.arena(), pairs[i]);
|
2020-04-09 04:38:12 +08:00
|
|
|
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it's still
|
|
|
|
// returned. In other words, the total size of the returned value (less the last entry) will be less
|
|
|
|
// than byteLimit
|
2020-04-09 03:43:25 +08:00
|
|
|
limits.decrement(pairs[i]);
|
2020-03-31 14:27:09 +08:00
|
|
|
if (limits.isReached()) {
|
|
|
|
result.more = true;
|
|
|
|
result.readThroughEnd = false;
|
2020-06-18 01:33:52 +08:00
|
|
|
return result;
|
2020-03-31 14:27:09 +08:00
|
|
|
};
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-06-18 01:33:52 +08:00
|
|
|
return result;
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
Future<Standalone<RangeResultRef>> SpecialKeySpace::getRange(ReadYourWritesTransaction* ryw, KeySelector begin,
|
|
|
|
KeySelector end, GetRangeLimits limits, bool reverse) {
|
2020-03-04 10:35:24 +08:00
|
|
|
// validate limits here
|
|
|
|
if (!limits.isValid()) return range_limits_invalid();
|
|
|
|
if (limits.isReached()) {
|
|
|
|
TEST(true); // read limit 0
|
|
|
|
return Standalone<RangeResultRef>();
|
|
|
|
}
|
2020-03-31 16:44:02 +08:00
|
|
|
// make sure orEqual == false
|
|
|
|
begin.removeOrEqual(begin.arena());
|
|
|
|
end.removeOrEqual(end.arena());
|
2020-04-07 13:09:17 +08:00
|
|
|
|
2020-05-15 17:04:15 +08:00
|
|
|
if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) {
|
2020-05-15 14:49:57 +08:00
|
|
|
TEST(true); // range inverted
|
|
|
|
return Standalone<RangeResultRef>();
|
|
|
|
}
|
2020-05-15 08:30:48 +08:00
|
|
|
|
2020-06-18 02:13:55 +08:00
|
|
|
return checkRYWValid(this, ryw, begin, end, limits, reverse);
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
ACTOR Future<Optional<Value>> SpecialKeySpace::getActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw,
|
2020-04-09 04:38:12 +08:00
|
|
|
KeyRef key) {
|
2020-03-04 10:35:24 +08:00
|
|
|
// use getRange to workaround this
|
2020-04-01 00:33:25 +08:00
|
|
|
Standalone<RangeResultRef> result =
|
2020-05-12 15:42:43 +08:00
|
|
|
wait(sks->getRange(ryw, KeySelector(firstGreaterOrEqual(key)), KeySelector(firstGreaterOrEqual(keyAfter(key))),
|
2020-04-09 04:38:12 +08:00
|
|
|
GetRangeLimits(CLIENT_KNOBS->TOO_MANY), false));
|
2020-03-04 10:35:24 +08:00
|
|
|
ASSERT(result.size() <= 1);
|
|
|
|
if (result.size()) {
|
|
|
|
return Optional<Value>(result[0].value);
|
|
|
|
} else {
|
|
|
|
return Optional<Value>();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
Future<Optional<Value>> SpecialKeySpace::get(ReadYourWritesTransaction* ryw, const Key& key) {
|
2020-04-09 04:38:12 +08:00
|
|
|
return getActor(this, ryw, key);
|
2020-03-04 10:35:24 +08:00
|
|
|
}
|
|
|
|
|
2020-06-24 02:21:03 +08:00
|
|
|
ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw) {
|
|
|
|
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys);
|
|
|
|
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Iterator iter = ranges.begin();
|
|
|
|
// TODO : update this set container
|
|
|
|
state std::set<SpecialKeyRangeRWImpl*> writeModulePtrs;
|
|
|
|
while (iter != ranges.end()) {
|
|
|
|
std::pair<bool, Optional<Value>> entry = iter->value();
|
|
|
|
if (entry.first) {
|
|
|
|
writeModulePtrs.insert(sks->getRWImpls().rangeContaining(iter->begin())->value());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
state std::set<SpecialKeyRangeRWImpl*>::const_iterator it;
|
|
|
|
for (it = writeModulePtrs.begin(); it != writeModulePtrs.end(); ++it)
|
|
|
|
wait((*it)->commit(ryw));
|
|
|
|
return Void();
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> SpecialKeySpace::commit(ReadYourWritesTransaction* ryw) {
|
|
|
|
return commitActor(this, ryw);
|
|
|
|
}
|
|
|
|
|
2020-06-16 03:47:27 +08:00
|
|
|
ReadConflictRangeImpl::ReadConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
2020-04-29 00:00:06 +08:00
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
ACTOR static Future<Standalone<RangeResultRef>> getReadConflictRangeImpl(ReadYourWritesTransaction* ryw, KeyRange kr) {
|
2020-04-29 03:44:34 +08:00
|
|
|
wait(ryw->pendingReads());
|
2020-04-30 05:43:37 +08:00
|
|
|
return ryw->getReadConflictRangeIntersecting(kr);
|
2020-04-29 00:00:06 +08:00
|
|
|
}
|
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
Future<Standalone<RangeResultRef>> ReadConflictRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
2020-04-29 00:00:06 +08:00
|
|
|
KeyRangeRef kr) const {
|
|
|
|
return getReadConflictRangeImpl(ryw, kr);
|
|
|
|
}
|
|
|
|
|
2020-06-16 03:47:27 +08:00
|
|
|
WriteConflictRangeImpl::WriteConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
2020-04-29 01:34:10 +08:00
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
Future<Standalone<RangeResultRef>> WriteConflictRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
2020-04-29 01:34:10 +08:00
|
|
|
KeyRangeRef kr) const {
|
2020-04-30 05:43:37 +08:00
|
|
|
return ryw->getWriteConflictRangeIntersecting(kr);
|
2020-04-29 01:34:10 +08:00
|
|
|
}
|
|
|
|
|
2020-06-16 03:47:27 +08:00
|
|
|
ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
2020-04-07 04:23:41 +08:00
|
|
|
|
2020-05-21 04:55:07 +08:00
|
|
|
Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
2020-04-07 04:23:41 +08:00
|
|
|
Standalone<RangeResultRef> result;
|
2020-04-07 04:38:18 +08:00
|
|
|
if (ryw->getTransactionInfo().conflictingKeys) {
|
|
|
|
auto krMapPtr = ryw->getTransactionInfo().conflictingKeys.get();
|
|
|
|
auto beginIter = krMapPtr->rangeContaining(kr.begin);
|
|
|
|
if (beginIter->begin() != kr.begin) ++beginIter;
|
|
|
|
auto endIter = krMapPtr->rangeContaining(kr.end);
|
2020-04-07 04:23:41 +08:00
|
|
|
for (auto it = beginIter; it != endIter; ++it) {
|
2020-04-07 15:24:01 +08:00
|
|
|
// it->begin() is stored in the CoalescedKeyRangeMap in TransactionInfo
|
|
|
|
// it->value() is always constants in SystemData.cpp
|
|
|
|
// Thus, push_back() can be used
|
|
|
|
result.push_back(result.arena(), KeyValueRef(it->begin(), it->value()));
|
2020-04-07 04:23:41 +08:00
|
|
|
}
|
|
|
|
if (endIter->begin() != kr.end)
|
2020-04-07 15:24:01 +08:00
|
|
|
result.push_back(result.arena(), KeyValueRef(endIter->begin(), endIter->value()));
|
2020-04-07 04:23:41 +08:00
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2020-06-12 03:22:19 +08:00
|
|
|
ACTOR Future<Standalone<RangeResultRef>> ddMetricsGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
2020-05-19 01:38:23 +08:00
|
|
|
try {
|
|
|
|
auto keys = kr.removePrefix(ddStatsRange.begin);
|
|
|
|
Standalone<VectorRef<DDMetricsRef>> resultWithoutPrefix =
|
|
|
|
wait(waitDataDistributionMetricsList(ryw->getDatabase(), keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT));
|
|
|
|
Standalone<RangeResultRef> result;
|
|
|
|
for (const auto& ddMetricsRef : resultWithoutPrefix) {
|
2020-05-20 11:48:21 +08:00
|
|
|
// each begin key is the previous end key, thus we only encode the begin key in the result
|
2020-05-19 01:38:23 +08:00
|
|
|
KeyRef beginKey = ddMetricsRef.beginKey.withPrefix(ddStatsRange.begin, result.arena());
|
2020-05-21 06:59:01 +08:00
|
|
|
// Use json string encoded in utf-8 to encode the values, easy for adding more fields in the future
|
|
|
|
json_spirit::mObject statsObj;
|
|
|
|
statsObj["ShardBytes"] = ddMetricsRef.shardBytes;
|
|
|
|
std::string statsString =
|
|
|
|
json_spirit::write_string(json_spirit::mValue(statsObj), json_spirit::Output_options::raw_utf8);
|
|
|
|
ValueRef bytes(result.arena(), statsString);
|
2020-05-19 01:38:23 +08:00
|
|
|
result.push_back(result.arena(), KeyValueRef(beginKey, bytes));
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
} catch (Error& e) {
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-12 03:22:19 +08:00
|
|
|
DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
|
2020-05-19 01:38:23 +08:00
|
|
|
|
2020-05-22 02:26:34 +08:00
|
|
|
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
2020-06-12 03:22:19 +08:00
|
|
|
return ddMetricsGetRangeActor(ryw, kr);
|
2020-05-19 01:38:23 +08:00
|
|
|
}
|
2020-06-15 13:29:44 +08:00
|
|
|
|
|
|
|
// Management API - exclude / include
|
2020-06-24 02:21:03 +08:00
|
|
|
ACTOR Future<Standalone<RangeResultRef>> excludeServersGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef range, KeyRangeRef kr) {
|
2020-06-15 13:29:44 +08:00
|
|
|
// get all keys under \xff/conf/excluded, \xff/conf/excluded
|
2020-06-24 02:21:03 +08:00
|
|
|
KeyRangeRef krWithoutPrefix = kr.removePrefix(normalKeys.end);
|
|
|
|
ASSERT(excludedServersKeys.contains(krWithoutPrefix));
|
|
|
|
Standalone<RangeResultRef> resultWithoutPrefix = wait(ryw->getRange(krWithoutPrefix, CLIENT_KNOBS->TOO_MANY));
|
|
|
|
ASSERT(!resultWithoutPrefix.more && resultWithoutPrefix.size() < CLIENT_KNOBS->TOO_MANY);
|
|
|
|
Standalone<RangeResultRef> result;
|
|
|
|
// TODO : add support for readYourWritesDisabled
|
|
|
|
if (ryw->readYourWritesDisabled()) {
|
|
|
|
for (const KeyValueRef& kv : resultWithoutPrefix) {
|
|
|
|
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
|
|
|
|
ValueRef rv(result.arena(), kv.value);
|
|
|
|
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
|
|
|
|
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Iterator iter = ranges.begin();
|
|
|
|
int index = 0;
|
|
|
|
while (iter != ranges.end()) {
|
|
|
|
// add all previous entries into result
|
|
|
|
while (index < resultWithoutPrefix.size() && resultWithoutPrefix[index].key.withPrefix(normalKeys.end) < iter->begin()) {
|
|
|
|
const KeyValueRef& kv = resultWithoutPrefix[index];
|
|
|
|
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
|
|
|
|
ValueRef rv(result.arena(), kv.value);
|
|
|
|
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
|
|
|
++index;
|
|
|
|
}
|
|
|
|
std::pair<bool, Optional<Value>> entry = iter->value();
|
|
|
|
if (entry.first) {
|
|
|
|
// add the writen entries if exists
|
|
|
|
if (entry.second.present()) {
|
|
|
|
KeyRef rk(result.arena(), iter->begin());
|
|
|
|
ValueRef rv(result.arena(), entry.second.get());
|
|
|
|
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
|
|
|
}
|
|
|
|
// move index to skip all entries in the iter->range
|
|
|
|
while (index < resultWithoutPrefix.size() && iter->range().contains(resultWithoutPrefix[index].key.withPrefix(normalKeys.end)))
|
|
|
|
++index;
|
|
|
|
}
|
|
|
|
++iter;
|
|
|
|
}
|
|
|
|
// add all remaining entries into result
|
|
|
|
while (index < resultWithoutPrefix.size()) {
|
|
|
|
const KeyValueRef& kv = resultWithoutPrefix[index];
|
|
|
|
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
|
|
|
|
ValueRef rv(result.arena(), kv.value);
|
|
|
|
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
|
|
|
++index;
|
|
|
|
}
|
|
|
|
}
|
2020-06-15 13:29:44 +08:00
|
|
|
return result;
|
|
|
|
}
|
2020-06-15 13:39:20 +08:00
|
|
|
|
2020-06-24 02:21:03 +08:00
|
|
|
ExcludeServersRangeImpl::ExcludeServersRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
2020-06-15 13:39:20 +08:00
|
|
|
|
|
|
|
Future<Standalone<RangeResultRef>> ExcludeServersRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
|
|
|
KeyRangeRef kr) const {
|
2020-06-24 02:21:03 +08:00
|
|
|
return excludeServersGetRangeActor(ryw, getKeyRange(), kr);
|
|
|
|
}
|
|
|
|
|
|
|
|
void ExcludeServersRangeImpl::set(ReadYourWritesTransaction *ryw, const KeyRef &key, const ValueRef &value) {
|
|
|
|
// TODO : check value valid
|
|
|
|
Value val(value);
|
|
|
|
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(val)));
|
|
|
|
}
|
|
|
|
|
|
|
|
void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction *ryw, const KeyRef &key) {
|
|
|
|
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
|
|
|
|
}
|
|
|
|
|
|
|
|
void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction *ryw, const KeyRangeRef &range) {
|
|
|
|
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
|
|
|
|
}
|
|
|
|
|
|
|
|
Future<Void> ExcludeServersRangeImpl::commit(ReadYourWritesTransaction *ryw) {
|
|
|
|
// Do all the checks
|
|
|
|
return Void();
|
2020-06-15 13:39:20 +08:00
|
|
|
}
|