Adding write support for special-key-space
This commit is contained in:
parent
59361cedaa
commit
fc8715dacd
|
@ -1044,6 +1044,8 @@ public:
|
|||
ACTOR static Future<Void> commit( ReadYourWritesTransaction *ryw ) {
|
||||
try {
|
||||
ryw->commitStarted = true;
|
||||
// TODO : special-key-space delayed check, I guess we should first set the commit start flag
|
||||
wait(ryw->getDatabase()->specialKeySpace->commit(ryw));
|
||||
|
||||
Future<Void> ready = ryw->reading;
|
||||
wait( ryw->resetPromise.getFuture() || ready );
|
||||
|
@ -1160,7 +1162,8 @@ public:
|
|||
|
||||
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
|
||||
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
|
||||
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()) {
|
||||
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()),
|
||||
specialKeySpaceWriteMap(std::make_pair(false, Optional<Value>()), specialKeys.end) {
|
||||
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
|
||||
std::back_inserter(persistentOptions));
|
||||
applyPersistentOptions();
|
||||
|
@ -2048,6 +2051,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
|
|||
nativeReadRanges = std::move(r.nativeReadRanges);
|
||||
nativeWriteRanges = std::move(r.nativeWriteRanges);
|
||||
versionStampKeys = std::move(r.versionStampKeys);
|
||||
specialKeySpaceWriteMap = std::move(r.specialKeySpaceWriteMap);
|
||||
}
|
||||
|
||||
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT :
|
||||
|
@ -2075,6 +2079,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
|
|||
nativeReadRanges = std::move(r.nativeReadRanges);
|
||||
nativeWriteRanges = std::move(r.nativeWriteRanges);
|
||||
versionStampKeys = std::move(r.versionStampKeys);
|
||||
specialKeySpaceWriteMap = std::move(r.specialKeySpaceWriteMap); // TODO: Check why copy constructor is deleted
|
||||
}
|
||||
|
||||
Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
|
||||
|
@ -2112,6 +2117,7 @@ void ReadYourWritesTransaction::resetRyow() {
|
|||
versionStampKeys = VectorRef<KeyRef>();
|
||||
nativeReadRanges = Standalone<VectorRef<KeyRangeRef>>();
|
||||
nativeWriteRanges = Standalone<VectorRef<KeyRangeRef>>();
|
||||
specialKeySpaceWriteMap.rawErase(specialKeys); // TODO : check rawErase
|
||||
watchMap.clear();
|
||||
reading = AndFuture();
|
||||
approximateSize = 0;
|
||||
|
|
|
@ -149,6 +149,9 @@ public:
|
|||
|
||||
bool specialKeySpaceRelaxed() const { return options.specialKeySpaceRelaxed; }
|
||||
|
||||
KeyRangeMap<std::pair<bool, Optional<Value>>>& getSpecialKeySpaceWriteMap() {return specialKeySpaceWriteMap;}
|
||||
bool readYourWritesDisabled() const { return options.readYourWritesDisabled; }
|
||||
|
||||
private:
|
||||
friend class RYWImpl;
|
||||
|
||||
|
@ -176,6 +179,8 @@ private:
|
|||
|
||||
Reference<TransactionDebugInfo> transactionDebugInfo;
|
||||
|
||||
KeyRangeMap<std::pair<bool, Optional<Value>>> specialKeySpaceWriteMap;
|
||||
|
||||
void resetTimeout();
|
||||
void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key)
|
||||
void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena
|
||||
|
|
|
@ -93,7 +93,7 @@ ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeReadImpl*
|
|||
ks->setKey(KeyRef(ks->arena(), result[ks->offset - 1].key));
|
||||
ks->offset = 1;
|
||||
} else {
|
||||
ks->setKey(KeyRef(ks->arena(), keyAfter(result[result.size() - 1].key)));
|
||||
ks->setKey(KeyRef(ks->arena(), keyAfter(result[result.size() - 1].key))); // TODO : the keyAfter will just return if key == \xff\xff
|
||||
ks->offset -= result.size();
|
||||
}
|
||||
}
|
||||
|
@ -311,6 +311,27 @@ Future<Optional<Value>> SpecialKeySpace::get(ReadYourWritesTransaction* ryw, con
|
|||
return getActor(this, ryw, key);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw) {
|
||||
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys);
|
||||
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Iterator iter = ranges.begin();
|
||||
// TODO : update this set container
|
||||
state std::set<SpecialKeyRangeRWImpl*> writeModulePtrs;
|
||||
while (iter != ranges.end()) {
|
||||
std::pair<bool, Optional<Value>> entry = iter->value();
|
||||
if (entry.first) {
|
||||
writeModulePtrs.insert(sks->getRWImpls().rangeContaining(iter->begin())->value());
|
||||
}
|
||||
}
|
||||
state std::set<SpecialKeyRangeRWImpl*>::const_iterator it;
|
||||
for (it = writeModulePtrs.begin(); it != writeModulePtrs.end(); ++it)
|
||||
wait((*it)->commit(ryw));
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> SpecialKeySpace::commit(ReadYourWritesTransaction* ryw) {
|
||||
return commitActor(this, ryw);
|
||||
}
|
||||
|
||||
ReadConflictRangeImpl::ReadConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
||||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> getReadConflictRangeImpl(ReadYourWritesTransaction* ryw, KeyRange kr) {
|
||||
|
@ -381,17 +402,81 @@ Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTran
|
|||
}
|
||||
|
||||
// Management API - exclude / include
|
||||
ACTOR Future<Standalone<RangeResultRef>> excludeServersGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
|
||||
ACTOR Future<Standalone<RangeResultRef>> excludeServersGetRangeActor(ReadYourWritesTransaction* ryw, KeyRangeRef range, KeyRangeRef kr) {
|
||||
// get all keys under \xff/conf/excluded, \xff/conf/excluded
|
||||
ASSERT(excludedServersKeys.contains(kr));
|
||||
Standalone<RangeResultRef> result = wait(ryw->getRange(kr, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!result.more && result.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
KeyRangeRef krWithoutPrefix = kr.removePrefix(normalKeys.end);
|
||||
ASSERT(excludedServersKeys.contains(krWithoutPrefix));
|
||||
Standalone<RangeResultRef> resultWithoutPrefix = wait(ryw->getRange(krWithoutPrefix, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!resultWithoutPrefix.more && resultWithoutPrefix.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
Standalone<RangeResultRef> result;
|
||||
// TODO : add support for readYourWritesDisabled
|
||||
if (ryw->readYourWritesDisabled()) {
|
||||
for (const KeyValueRef& kv : resultWithoutPrefix) {
|
||||
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
|
||||
ValueRef rv(result.arena(), kv.value);
|
||||
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
||||
}
|
||||
} else {
|
||||
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
|
||||
RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Iterator iter = ranges.begin();
|
||||
int index = 0;
|
||||
while (iter != ranges.end()) {
|
||||
// add all previous entries into result
|
||||
while (index < resultWithoutPrefix.size() && resultWithoutPrefix[index].key.withPrefix(normalKeys.end) < iter->begin()) {
|
||||
const KeyValueRef& kv = resultWithoutPrefix[index];
|
||||
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
|
||||
ValueRef rv(result.arena(), kv.value);
|
||||
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
||||
++index;
|
||||
}
|
||||
std::pair<bool, Optional<Value>> entry = iter->value();
|
||||
if (entry.first) {
|
||||
// add the writen entries if exists
|
||||
if (entry.second.present()) {
|
||||
KeyRef rk(result.arena(), iter->begin());
|
||||
ValueRef rv(result.arena(), entry.second.get());
|
||||
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
||||
}
|
||||
// move index to skip all entries in the iter->range
|
||||
while (index < resultWithoutPrefix.size() && iter->range().contains(resultWithoutPrefix[index].key.withPrefix(normalKeys.end)))
|
||||
++index;
|
||||
}
|
||||
++iter;
|
||||
}
|
||||
// add all remaining entries into result
|
||||
while (index < resultWithoutPrefix.size()) {
|
||||
const KeyValueRef& kv = resultWithoutPrefix[index];
|
||||
KeyRef rk = kv.key.withPrefix(normalKeys.end, result.arena());
|
||||
ValueRef rv(result.arena(), kv.value);
|
||||
result.push_back(result.arena(), KeyValueRef(rk, rv));
|
||||
++index;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
ExcludeServersRangeImpl::ExcludeServersRangeImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
||||
ExcludeServersRangeImpl::ExcludeServersRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
|
||||
|
||||
Future<Standalone<RangeResultRef>> ExcludeServersRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
return excludeServersGetRangeActor(ryw, kr);
|
||||
return excludeServersGetRangeActor(ryw, getKeyRange(), kr);
|
||||
}
|
||||
|
||||
void ExcludeServersRangeImpl::set(ReadYourWritesTransaction *ryw, const KeyRef &key, const ValueRef &value) {
|
||||
// TODO : check value valid
|
||||
Value val(value);
|
||||
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(val)));
|
||||
}
|
||||
|
||||
void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction *ryw, const KeyRef &key) {
|
||||
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
|
||||
}
|
||||
|
||||
void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction *ryw, const KeyRangeRef &range) {
|
||||
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
|
||||
}
|
||||
|
||||
Future<Void> ExcludeServersRangeImpl::commit(ReadYourWritesTransaction *ryw) {
|
||||
// Do all the checks
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -51,15 +51,18 @@ protected:
|
|||
KeyRange range; // underlying key range for this function
|
||||
};
|
||||
|
||||
class SpecialKeyRangeWriteImpl {
|
||||
class SpecialKeyRangeRWImpl : SpecialKeyRangeReadImpl{
|
||||
public:
|
||||
virtual void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value ) = 0;
|
||||
virtual void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range ) = 0;
|
||||
virtual void clear(ReadYourWritesTransaction* ryw, const KeyRef& key ) = 0;
|
||||
virtual Future<Void> commit(ReadYourWritesTransaction* ryw) = 0; // all delayed async operations of writes in special-key-space
|
||||
|
||||
explicit SpecialKeyRangeWriteImpl(KeyRangeRef kr) : range(kr) {}
|
||||
explicit SpecialKeyRangeRWImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
||||
KeyRangeRef getKeyRange() const { return range; }
|
||||
|
||||
virtual ~SpecialKeyRangeRWImpl() {}
|
||||
|
||||
protected:
|
||||
KeyRange range;
|
||||
};
|
||||
|
@ -117,19 +120,22 @@ public:
|
|||
WORKERINTERFACE,
|
||||
};
|
||||
|
||||
Future<Optional<Value>> get(ReadYourWritesTransaction* ryw, const Key& key);
|
||||
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeySelector begin, KeySelector end,
|
||||
GetRangeLimits limits, bool reverse = false);
|
||||
|
||||
SpecialKeySpace(KeyRef spaceStartKey = Key(), KeyRef spaceEndKey = normalKeys.end, bool testOnly = true)
|
||||
: range(KeyRangeRef(spaceStartKey, spaceEndKey)), readImpls(nullptr, spaceEndKey),
|
||||
readModules(testOnly ? SpecialKeySpace::MODULE::TESTONLY : SpecialKeySpace::MODULE::UNKNOWN, spaceEndKey) {
|
||||
// Default begin of KeyRangeMap is Key(), insert the range to update start key if needed
|
||||
readImpls.insert(range, nullptr);
|
||||
if (!testOnly) readModulesBoundaryInit(); // testOnly is used in the correctness workload
|
||||
writeImpls = KeyRangeMap<SpecialKeyRangeWriteImpl*>(nullptr, spaceEndKey);
|
||||
writeImpls = KeyRangeMap<SpecialKeyRangeRWImpl*>(nullptr, spaceEndKey);
|
||||
}
|
||||
|
||||
Future<Optional<Value>> get(ReadYourWritesTransaction* ryw, const Key& key);
|
||||
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeySelector begin, KeySelector end,
|
||||
GetRangeLimits limits, bool reverse = false);
|
||||
|
||||
Future<Void> commit(ReadYourWritesTransaction* ryw);
|
||||
|
||||
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value ) {
|
||||
auto impl = writeImpls[key];
|
||||
if (impl == nullptr)
|
||||
|
@ -173,7 +179,7 @@ public:
|
|||
}
|
||||
|
||||
KeyRangeMap<SpecialKeyRangeReadImpl*>& getReadImpls() { return readImpls; }
|
||||
KeyRangeMap<SpecialKeyRangeWriteImpl*>& getWriteImpls() { return writeImpls; }
|
||||
KeyRangeMap<SpecialKeyRangeRWImpl*>& getRWImpls() { return writeImpls; }
|
||||
KeyRangeMap<SpecialKeySpace::MODULE>& getModules() { return readModules; }
|
||||
KeyRangeRef getKeyRange() const { return range; }
|
||||
|
||||
|
@ -190,7 +196,7 @@ private:
|
|||
|
||||
KeyRangeMap<SpecialKeyRangeReadImpl*> readImpls;
|
||||
KeyRangeMap<SpecialKeySpace::MODULE> readModules;
|
||||
KeyRangeMap<SpecialKeyRangeWriteImpl*> writeImpls;
|
||||
KeyRangeMap<SpecialKeyRangeRWImpl*> writeImpls;
|
||||
KeyRange range; // key space range, (\xff\xff, \xff\xff\xff\xf) in prod and (, \xff) in test
|
||||
|
||||
static std::unordered_map<SpecialKeySpace::MODULE, KeyRange> readModuleToBoundary;
|
||||
|
@ -226,10 +232,14 @@ public:
|
|||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
};
|
||||
|
||||
class ExcludeServersRangeImpl : public SpecialKeyRangeReadImpl {
|
||||
class ExcludeServersRangeImpl : public SpecialKeyRangeRWImpl {
|
||||
public:
|
||||
explicit ExcludeServersRangeImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value ) override;
|
||||
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range ) override;
|
||||
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key ) override;
|
||||
Future<Void> commit(ReadYourWritesTransaction* ryw) override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
Loading…
Reference in New Issue