
1876 lines
67 KiB

/*
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "flow/actorcompiler.h"
#include "ReadYourWrites.h"
#include "Atomic.h"
#include "DatabaseContext.h"
#include "StatusClient.h"
#include "MonitorLeader.h"
class RYWImpl {
// Emits one "RYWDump" TraceEvent per segment visited, starting at the iterator's current position and
// stopping after the segment whose end key is allKeys.end. Each event records the segment bounds and
// whether the segment is unknown, empty, or a single key/value (with its key).
// `it` is taken by value, so the caller's iterator is not moved.
template<class Iter> static void dump( Iter it ) {
	Arena arena; // passed to it.kv() to materialize the key/value being traced
	while( true ) {
		TraceEvent("RYWDump").detail("Begin", printable(it.beginKey().toStandaloneStringRef()))
			.detail("End", printable(it.endKey().toStandaloneStringRef()))
			.detail("Unknown", it.is_unknown_range())
			.detail("Empty", it.is_empty_range())
			.detail("KV", it.is_kv())
			.detail("Key", printable(it.is_kv() ? it.kv(arena).key : StringRef()));
		// The final segment ends at allKeys.end: stop there, otherwise move on to the next segment.
		if( it.endKey() == allKeys.end )
			break;
		++it;
	}
}
// Request descriptor for a point read of `key`. The read()/readThrough() overloads that take this
// request yield `Result` (an optional value: absent when the key has no value).
struct GetValueReq {
	explicit GetValueReq( Key key ) : key(key) {}
	Key key;
	typedef Optional<Value> Result;
};
// Request descriptor for resolving the key selector `key` to a concrete key. The read()/readThrough()
// overloads that take this request yield `Result`, the resolved key.
struct GetKeyReq {
	explicit GetKeyReq( KeySelector key ) : key(key) {}
	KeySelector key;
	typedef Key Result;
};
// Request descriptor for a range read between the selectors `begin` and `end`, bounded by `limits`.
// Reverse == true selects the backward read path (getRangeValueBack); false selects getRangeValue.
template <bool Reverse>
struct GetRangeReq {
	GetRangeReq( KeySelector begin, KeySelector end, GetRangeLimits limits ) : begin(begin), end(end), limits(limits) {}
	KeySelector begin, end;
	GetRangeLimits limits;
	typedef Standalone<RangeResultRef> Result;
};
// read() performs a read (get, getKey, getRange, etc.) in the context of the given transaction. Snapshot and RYW reads are distinguished by the type Iter being SnapshotCache::iterator or RYWIterator.
// Fills in the snapshot cache as a side effect but does not affect conflict ranges.
// Some (indicated) overloads of read are required to update the given *it to point to the key that was read, so that the corresponding overload of addConflictRange() can make use of it.
// Point read of read.key through the cache iterator *it.
// - If the iterator's segment is a key/value, that value is returned.
// - If the segment is a known-empty range, an absent value is returned.
// - Otherwise the key is fetched with ryw->tr.get(read.key, true) (the second argument is the snapshot
//   flag, as in readThrough), and the outcome, present or absent, is inserted into ryw->cache under a
//   copy of the key allocated in ryw->arena.
// `dependent` is sampled from the iterator before the wait. When it is set, the fetched result is not
// returned directly; the value is read back from the iterator after the cache insert instead.
// NOTE(review): in this copy nothing moves *it to read.key before it is inspected, although the
// postcondition below requires it, and the function's closing braces are absent. An iterator skip
// appears to have been lost; confirm against upstream.
ACTOR template<class Iter> static Future< Optional<Value> > read( ReadYourWritesTransaction *ryw, GetValueReq read, Iter* it ) {
// This overload is required to provide postcondition: it->extractWriteMapIterator().segmentContains(read.key)
state bool dependent = it->is_dependent();
if( it->is_kv() ) {
return it->kv(ryw->arena).value;
} else if( it->is_empty_range() ) {
return Optional<Value>();
} else {
Optional<Value> res = wait( ryw->tr.get( read.key, true ) );
// Copy the key into the transaction arena so the cache entry outlives `read`.
KeyRef k( ryw->arena, read.key );
if( res.present() ) {
if( ryw->cache.insert( k, res.get() ) )
// NOTE(review): as written, the `if( !dependent )` below is the body of the cache-insert check, so a
// present value is returned only when insert() returns true. A statement appears to be missing
// between the two `if`s; confirm against upstream.
if( !dependent )
return res;
} else {
ryw->cache.insert( k, Optional<ValueRef>() );
if( !dependent )
return Optional<Value>();
//There was a dependent write at the key, so we need to lookup the iterator again
// NOTE(review): no re-positioning of *it is visible before the lookup below; confirm.
ASSERT( it->is_kv() );
return it->kv(ryw->arena).value;
// Resolves the key selector read.key to a concrete key by issuing a single-row range read through *it.
// - offset > 0: forward read from the selector up to firstGreaterOrEqual(getMaxReadKey()). Returns
//   allKeys.begin if the read ran off the beginning, getMaxReadKey() if it ran through the end or
//   returned nothing, otherwise the first returned key.
// - offset <= 0: reverse read from firstGreaterOrEqual(allKeys.begin) up to the selector, with the
//   symmetric clamping.
// NOTE(review): the closing braces are absent in this copy. The reverse branch also passes read.key
// unchanged as the (exclusive) end selector; confirm upstream does not adjust its offset first.
ACTOR template<class Iter> static Future< Key > read( ReadYourWritesTransaction* ryw, GetKeyReq read, Iter* it ) {
if( read.key.offset > 0 ) {
Standalone<RangeResultRef> result = wait( getRangeValue( ryw, read.key, firstGreaterOrEqual(ryw->getMaxReadKey()), GetRangeLimits(1), it ) );
if( result.readToBegin )
return allKeys.begin;
if( result.readThroughEnd || !result.size() )
return ryw->getMaxReadKey();
return result[0].key;
} else {
Standalone<RangeResultRef> result = wait( getRangeValueBack( ryw, firstGreaterOrEqual(allKeys.begin), read.key, GetRangeLimits(1), it ) );
if( result.readThroughEnd )
return ryw->getMaxReadKey();
if( result.readToBegin || !result.size() )
return allKeys.begin;
return result[0].key;
// Forward range read: delegates to getRangeValue with the request's selectors and limits, using *it.
template <class Iter> static Future< Standalone<RangeResultRef> > read( ReadYourWritesTransaction* ryw, GetRangeReq<false> read, Iter* it ) {
	return getRangeValue( ryw, read.begin, read.end, read.limits, it );
}
// Reverse range read: delegates to getRangeValueBack with the request's selectors and limits, using *it.
template <class Iter> static Future< Standalone<RangeResultRef> > read( ReadYourWritesTransaction* ryw, GetRangeReq<true> read, Iter* it ) {
	return getRangeValueBack( ryw, read.begin, read.end, read.limits, it );
}
// readThrough() performs a read in the RYW disabled case, passing it on relatively directly to the underlying transaction.
// Responsible for clipping results to the non-system keyspace when appropriate, since NativeAPI doesn't do that.
// Point read passed straight to the underlying transaction; `snapshot` is forwarded unchanged.
static Future<Optional<Value>> readThrough( ReadYourWritesTransaction *ryw, GetValueReq read, bool snapshot ) {
	return ryw->tr.get( read.key, snapshot );
}
// Resolves the selector on the underlying transaction. Any key beyond getMaxReadKey() is clamped to
// getMaxReadKey(), so keys this transaction may not read are never reported.
ACTOR static Future<Key> readThrough( ReadYourWritesTransaction *ryw, GetKeyReq read, bool snapshot ) {
	Key key = wait( ryw->tr.getKey( read.key, snapshot ) );
	if (ryw->getMaxReadKey() < key) return ryw->getMaxReadKey(); // Filter out results in the system keys if they are not accessible
	return key;
}
// Range read passed to the underlying transaction, clipped so that no returned key is >= getMaxReadKey().
// - Reverse reads whose end selector has offset > 1 first resolve that selector, clamp it to
//   getMaxReadKey(), and use firstGreaterOrEqual of the result as the end.
// - Forward results are trimmed after the fact; a trimmed result has `more` cleared.
ACTOR template <bool Reverse> static Future<Standalone<RangeResultRef>> readThrough( ReadYourWritesTransaction *ryw, GetRangeReq<Reverse> read, bool snapshot ) {
	if(Reverse && read.end.offset > 1) {
		// FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result actually does.
		Key key = wait( ryw->tr.getKey(read.end, snapshot) );
		if(key > ryw->getMaxReadKey())
			read.end = firstGreaterOrEqual(ryw->getMaxReadKey());
		else
			read.end = firstGreaterOrEqual(key);
	}

	Standalone<RangeResultRef> v = wait( ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse) );
	KeyRef maxKey = ryw->getMaxReadKey();
	if(v.size() > 0) {
		if(!Reverse && v[v.size()-1].key >= maxKey) {
			// Forward results are ascending: walk back past every trailing row at or beyond maxKey.
			state Standalone<RangeResultRef> _v = v;
			int i = _v.size() - 2;
			for(; i >= 0 && _v[i].key >= maxKey; --i) { }
			return Standalone<RangeResultRef>(RangeResultRef( VectorRef<KeyValueRef>(&_v[0], i+1), false ), _v.arena());
		}
	}

	return v;
}
// addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant conflict range
// Records the read conflict for a point read of read.key. The returned value does not affect the range.
static void addConflictRange( ReadYourWritesTransaction* ryw, GetValueReq read, WriteMap::iterator& it, Optional<Value> result ) {
	// it will already point to the right segment (see the calling code in read()), so we don't need to skip
	// read.key will be copied into ryw->arena inside of updateConflictMap if it is being added
	ryw->updateConflictMap(read.key, it);
}
// Records the read conflict range for resolving the selector read.key to `result`.
// Let base = keyAfter(selector key) when orEqual is set, else the selector key.
// - offset <= 0: the range is [result, base).
// - offset > 0: the range is [base, keyAfter(result)).
// All bounds are allocated in ryw->arena; `it` is positioned at the range start before the update.
static void addConflictRange( ReadYourWritesTransaction* ryw, GetKeyReq read, WriteMap::iterator& it, Key result ) {
	KeyRangeRef readRange;
	if( read.key.offset <= 0 )
		readRange = KeyRangeRef( KeyRef( ryw->arena, result ), read.key.orEqual ? keyAfter( read.key.getKey(), ryw->arena ) : KeyRef( ryw->arena, read.key.getKey() ) );
	else
		readRange = KeyRangeRef( read.key.orEqual ? keyAfter( read.key.getKey(), ryw->arena ) : KeyRef( ryw->arena, read.key.getKey() ), keyAfter( result, ryw->arena ) );

	it.skip( readRange.begin );
	ryw->updateConflictMap(readRange, it);
}
// Records the read conflict range for a forward range read that returned `result`.
// 1. Start from the selectors' base keys in ascending order. When end.offset > 0 and result.more is set,
//    the end starts at read.begin's key instead.
// 2. Widen to allKeys.begin / getMaxReadKey() when the read ran off the beginning / through the end.
// 3. Extend to cover every returned key: the first key, only when begin.offset <= 0, and keyAfter(last key).
// Both bounds are copied into ryw->arena.
static void addConflictRange( ReadYourWritesTransaction* ryw, GetRangeReq<false> read, WriteMap::iterator &it, Standalone<RangeResultRef> const& result ) {
	KeyRef rangeBegin, rangeEnd;
	bool endInArena = false; // true once rangeEnd already lives in ryw->arena

	if( read.begin.getKey() < read.end.getKey() ) {
		rangeBegin = read.begin.getKey();
		rangeEnd = read.end.offset > 0 && result.more ? read.begin.getKey() : read.end.getKey();
	}
	else {
		rangeBegin = read.end.getKey();
		rangeEnd = read.begin.getKey();
	}

	if( result.readToBegin && read.begin.offset <= 0 ) rangeBegin = allKeys.begin;
	if( result.readThroughEnd && read.end.offset > 0 ) rangeEnd = ryw->getMaxReadKey();

	if ( result.size() ) {
		if( read.begin.offset <= 0 ) rangeBegin = std::min( rangeBegin, result[0].key );
		if( rangeEnd <= result.end()[-1].key ) {
			rangeEnd = keyAfter( result.end()[-1].key, ryw->arena );
			endInArena = true;
		}
	}

	KeyRangeRef readRange = KeyRangeRef( KeyRef( ryw->arena, rangeBegin ), endInArena ? rangeEnd : KeyRef( ryw->arena, rangeEnd ) );
	it.skip( readRange.begin );
	ryw->updateConflictMap(readRange, it);
}
// Records the read conflict range for a reverse range read that returned `result` (rows in descending order).
// 1. Start from the selectors' base keys in ascending order. When begin.offset <= 0 and result.more is set,
//    the begin starts at read.end's key instead.
// 2. Widen to allKeys.begin / getMaxReadKey() when the read ran off the beginning / through the end.
// 3. Extend down to the last (smallest) returned key. When end.offset > 0, extend up to keyAfter(first key).
// Both bounds are copied into ryw->arena.
static void addConflictRange( ReadYourWritesTransaction* ryw, GetRangeReq<true> read, WriteMap::iterator& it, Standalone<RangeResultRef> const& result ) {
	KeyRef rangeBegin, rangeEnd;
	bool endInArena = false; // true once rangeEnd already lives in ryw->arena

	if( read.begin.getKey() < read.end.getKey() ) {
		rangeBegin = read.begin.offset <= 0 && result.more ? read.end.getKey() : read.begin.getKey();
		rangeEnd = read.end.getKey();
	}
	else {
		rangeBegin = read.end.getKey();
		rangeEnd = read.begin.getKey();
	}

	if( result.readToBegin && read.begin.offset <= 0 ) rangeBegin = allKeys.begin;
	if( result.readThroughEnd && read.end.offset > 0 ) rangeEnd = ryw->getMaxReadKey();

	if ( result.size() ) {
		rangeBegin = std::min( rangeBegin, result.end()[-1].key );
		if( read.end.offset > 0 && rangeEnd <= result[0].key ) {
			rangeEnd = keyAfter( result[0].key, ryw->arena );
			endInArena = true;
		}
	}

	KeyRangeRef readRange = KeyRangeRef( KeyRef( ryw->arena, rangeBegin ), endInArena ? rangeEnd : KeyRef( ryw->arena, rangeEnd ) );
	it.skip( readRange.begin );
	ryw->updateConflictMap(readRange, it);
}
// Read path used when read-your-writes is disabled: forwards the request to readThrough() on the
// underlying transaction. If ryw->resetPromise fires first, the read is abandoned with internal_error().
ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeThrough( ReadYourWritesTransaction* ryw, Req req, bool snapshot ) {
	choose {
		when (typename Req::Result result = wait( readThrough( ryw, req, snapshot ) )) {
			return result;
		}
		when (Void _ = wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
	}
}
// Snapshot read path: serves the request through a SnapshotCache::iterator over ryw->cache / ryw->writes
// and records no conflict range. If ryw->resetPromise fires first, the read is abandoned with internal_error().
ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeSnapshot( ReadYourWritesTransaction* ryw, Req req ) {
	state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
	choose {
		when (typename Req::Result result = wait( read( ryw, req, &it ) )) {
			return result;
		}
		when (Void _ = wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
	}
}
// Read-your-writes path: serves the request through a RYWIterator over ryw->cache and ryw->writes.
// For serializable (non-snapshot) reads, the read's conflict range is then recorded via addConflictRange().
// Snapshot reads also reach this path when snapshot RYW is enabled, and must not add conflicts.
// If ryw->resetPromise fires first, the read is abandoned with internal_error().
ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeRYW( ReadYourWritesTransaction* ryw, Req req, bool snapshot ) {
	state RYWIterator it( &ryw->cache, &ryw->writes );
	choose {
		when (typename Req::Result result = wait( read( ryw, req, &it ) )) {
			// Some overloads of addConflictRange() require it to point to the "right" key and others don't. The corresponding overloads of read() have to provide that guarantee!
			if( !snapshot )
				addConflictRange( ryw, req, it.extractWriteMapIterator(), result );
			return result;
		}
		when (Void _ = wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
	}
}
// Routes a read request according to the transaction options:
//  - read-your-writes disabled                 -> readWithConflictRangeThrough (underlying transaction);
//  - snapshot read with snapshotRywEnabled <= 0 -> readWithConflictRangeSnapshot (no conflict range);
//  - otherwise                                 -> readWithConflictRangeRYW.
template <class Req> static inline Future<typename Req::Result> readWithConflictRange( ReadYourWritesTransaction* ryw, Req const& req, bool snapshot ) {
	if (ryw->options.readYourWritesDisabled) {
		return readWithConflictRangeThrough(ryw, req, snapshot);
	} else if (snapshot && ryw->options.snapshotRywEnabled <= 0) {
		return readWithConflictRangeSnapshot(ryw, req);
	}
	return readWithConflictRangeRYW(ryw, req, snapshot);
}
// Resolves the selector `key` as far as the data reachable from `it` allows, without reading from the
// database. `key` is rewritten in place into an equivalent selector, and `it` is left positioned as described
// in the contract comment below.
// maxKey is the upper bound for forward resolution; callers in this file pass ryw->getMaxReadKey().
// *readToBegin and *readThroughEnd are only ever set to true here, never cleared.
// *actualOffset receives key.offset after the walks, unless key.offset was exactly 1 on entry.
// NOTE(review): this copy of the function is missing statements and closing braces (see the notes
// below). Compare against upstream before relying on the literal control flow.
template<class Iter> static void resolveKeySelectorFromCache( KeySelector& key, Iter& it, KeyRef const& maxKey, bool* readToBegin, bool* readThroughEnd, int* actualOffset ) {
// If the key indicated by `key` can be determined without reading unknown data from the snapshot, then it.kv().key is the resolved key.
// If the indicated key is determined to be "off the beginning or end" of the database, it points to the first or last segment in the DB,
// and key is an equivalent key selector relative to the beginning or end of the database.
// Otherwise it points to an unknown segment, and key is an equivalent key selector whose base key is in or adjoining the segment.
bool alreadyExhausted = key.offset == 1;
it.skip( key.getKey() ); // TODO: or precondition?
if ( key.offset <= 0 && it.beginKey() == key.getKey() && key.getKey() != allKeys.begin )
// NOTE(review): this `if` has no statement of its own here, so it currently guards the declaration of
// keykey below. Presumably a step back of `it` (to satisfy the invariant for backward selectors) was
// lost; confirm.
ExtStringRef keykey = key.getKey();
bool keyNeedsCopy = false;
// Invariant: it.beginKey() <= keykey && keykey <= it.endKey() && (key.isBackward() ? it.beginKey() != keykey : it.endKey() != keykey)
// Maintaining this invariant, we transform the key selector toward firstGreaterOrEqual form until we reach an unknown range or the result
while (key.offset > 1 && !it.is_unreadable() && !it.is_unknown_range() && it.endKey() < maxKey ) {
if (it.is_kv())
keykey = it.beginKey();
keyNeedsCopy = true;
// NOTE(review): as written, the forward loop above changes neither key.offset nor `it`, so it cannot make
// progress. An offset decrement, iterator advance and closing brace appear to be missing; confirm.
while (key.offset < 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() != allKeys.begin) {
if (it.is_kv()) {
if (key.offset == 1) {
keykey = it.beginKey();
keyNeedsCopy = true;
keykey = it.endKey();
keyNeedsCopy = true;
// NOTE(review): the backward loop likewise shows no offset increment or iterator step; confirm.
if(!alreadyExhausted) {
*actualOffset = key.offset;
// Selector resolved off the beginning of the database: report it and normalize the offset.
if (!it.is_unreadable() && !it.is_unknown_range() && key.offset < 1) {
*readToBegin = true;
// NOTE(review): unlike the readThroughEnd case below, no base-key reset or early return is visible here.
key.offset = 1;
// Selector resolved past maxKey: report it and rebase the selector onto maxKey.
if (!it.is_unreadable() && !it.is_unknown_range() && key.offset > 1) {
*readThroughEnd = true;
key.setKey(maxKey); // maxKey is a KeyRef, but points to a LiteralStringRef. TODO: how can we ASSERT this?
key.offset = 1;
// Skip over known-empty segments toward maxKey.
while (!it.is_unreadable() && it.is_empty_range() && it.endKey() < maxKey) {
keykey = it.beginKey();
keyNeedsCopy = true;
// NOTE(review): both branches of the copy-back below are truncated in this copy.
if(keyNeedsCopy) {
else {
// Returns the key range whose contents are fully determined by the forward snapshot read `data` issued
// for the selectors (begin, end), with both bounds allocated in `arena`. Returns an empty KeyRangeRef
// when the computed begin is not below the computed end.
// - Begin bound: begin's key when begin.offset <= 1 (else allKeys.end); allKeys.begin on readToBegin;
//   then lowered to the first returned key if that is smaller.
// - End bound: end's key when !data.more and end.offset >= 1 (else allKeys.begin); allKeys.end on
//   readThroughEnd; then adjusted by data.readThrough and the last returned key.
// NOTE(review): closing braces are missing in this copy. As written, the `endKey = !data.more && ...`
// assignment runs even when data.readThrough is present, and replaces the readThrough-based bound
// whenever data.more is set. An `else` may have been lost; confirm against upstream.
static KeyRangeRef getKnownKeyRange( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
StringRef beginKey = begin.offset<=1 ? begin.getKey() : allKeys.end;
ExtStringRef endKey = !data.more && end.offset>=1 ? end.getKey() : allKeys.begin;
if (data.readToBegin) beginKey = allKeys.begin;
if (data.readThroughEnd) endKey = allKeys.end;
if( data.size() ) {
beginKey = std::min( beginKey, data[0].key );
if( data.readThrough.present() )
endKey = std::max<ExtStringRef>( endKey, data.readThrough.get() );
endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 );
if (beginKey >= endKey) return KeyRangeRef();
return KeyRangeRef( StringRef(arena, beginKey), endKey.toArena(arena));
// Pre: it points to an unknown range
// Increments it to point to the unknown range just before the next nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
// `end` is an iterator the walk must not pass. Returns singleEmpty, presumably the number of single-key
// empty ranges skipped (as documented for skipUncachedBack).
// NOTE(review): in this copy singleEmpty is never incremented and nothing advances `it`. Both inner
// `while` loops have the `return` as their body, so statements and braces appear to be missing; confirm.
template<class Iter> static int skipUncached( Iter& it, Iter const& end, int iterationLimit ) {
ExtStringRef b = it.beginKey();
ExtStringRef e = it.endKey();
int singleEmpty = 0;
ASSERT( !it.is_unreadable() && it.is_unknown_range() );
// b is the beginning of the most recent contiguous *empty* range
// e is it.endKey()
while( it != end && --iterationLimit>=0 ) {
if (it.is_unreadable() || it.is_empty_range()) {
if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
while (it.is_unreadable() || !it.is_unknown_range())
return singleEmpty;
} else
b = e;
e = it.endKey();
while (it.is_unreadable() || !it.is_unknown_range())
return singleEmpty;
// Pre: it points to an unknown range
// Returns the number of following empty single-key known ranges between it and the next nontrivial known range, but no more than maxClears
// Leaves `it` in an indeterminate state
// The scan stops once e reaches maxKey; returns 0 immediately when maxClears <= 0.
// NOTE(review): in this copy the loop never advances `it` and never increments singleEmpty, so
// statements and closing braces appear to be missing; confirm against upstream.
template<class Iter> static int countUncached( Iter&& it, KeyRef maxKey, int maxClears ) {
if (maxClears<=0) return 0;
ExtStringRef b = it.beginKey();
ExtStringRef e = it.endKey();
int singleEmpty = 0;
while( e < maxKey ) {
if (it.is_unreadable() || it.is_empty_range()) {
if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
return singleEmpty;
if( singleEmpty >= maxClears )
return maxClears;
} else
b = e;
e = it.endKey();
return singleEmpty;
// Adjusts `requestLimit` (a copy of the caller's limits) for one snapshot range request.
// additionalRows: rows needed beyond the caller's limit (selector offsets, rows past the end, skipped
//   single-key clears). offset: a selector-derived row minimum (getRangeValue passes 2-read_begin.offset).
// requestCount: used as the shift for growing the byte budget (capped at 20).
// minRows and rows are clamped to std::numeric_limits<int>::max(); the byte limit is capped at
// CLIENT_KNOBS->REPLY_BYTE_LIMIT.
// NOTE(review): closing braces are missing in this copy, so it is unclear which `if` the trailing
// `else if(requestLimit.hasByteLimit())` pairs with. The `bytes==0` branch also never assigns
// requestLimit.bytes here. Confirm against upstream.
static void setRequestLimits(GetRangeLimits &requestLimit, int64_t additionalRows, int offset, int requestCount) {
requestLimit.minRows = (int)std::min(std::max(1 + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
if(requestLimit.hasRowLimit()) {
requestLimit.rows = (int)std::min(std::max(std::max(1,requestLimit.rows) + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
// Calculating request byte limit
if(requestLimit.bytes==0) {
if(!requestLimit.hasRowLimit()) {
requestLimit.rows = (int)std::min(std::max(std::max(1,requestLimit.rows) + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
else if(requestLimit.hasByteLimit()) {
// Grow the byte budget geometrically with the number of requests already issued.
requestLimit.bytes = std::min(int64_t(requestLimit.bytes)<<std::min(requestCount, 20), (int64_t)CLIENT_KNOBS->REPLY_BYTE_LIMIT);
//TODO: read to begin, read through end flags for result
// Forward range read between the selectors `begin` and `end`, bounded by `limits`.
// - Known parts are served from the cache iterator.
// - Unknown parts are fetched with snapshot reads on ryw->tr (tr.getRange(..., true, false)). Each
//   fetched range (computed by getKnownKeyRange) and its rows are inserted into ryw->cache.
// `it` aliases *pit and tracks `begin`; `itEnd` is a copy of *pit that tracks `end`. Nested GetKeyReq
// reads also go through pit.
// The result is trimmed to end's key when end is a firstGreaterOrEqual selector, and its arena depends
// on ryw->arena. readThroughEnd is reported only when result.more is false.
// NOTE(review): this copy is missing many statements and closing braces. For example, requestCount is
// never incremented, `limits` is never decremented, and `it` is not advanced after a segment is handled.
// The literal control flow below cannot be trusted; compare against upstream.
ACTOR template<class Iter> static Future< Standalone<RangeResultRef> > getRangeValue( ReadYourWritesTransaction *ryw, KeySelector begin, KeySelector end, GetRangeLimits limits, Iter* pit ) {
state Iter& it(*pit);
state Iter itEnd(*pit);
state Standalone<RangeResultRef> result;
state int64_t additionalRows = 0;
state int itemsPastEnd = 0;
state int requestCount = 0;
state bool readToBegin = false;
state bool readThroughEnd = false;
state int actualBeginOffset = begin.offset;
state int actualEndOffset = end.offset;
//state UID randomID = g_nondeterministic_random->randomUniqueID();
resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
// Early exits: empty range, or a range that starts at the max readable key / ends at allKeys.begin.
if( begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) {
return RangeResultRef(false, false);
else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
|| ( end.isFirstGreaterOrEqual() && end.getKey() == allKeys.begin ) )
return RangeResultRef(readToBegin, readThroughEnd);
// A non-firstGreaterOrEqual end selector whose base key is below begin's is resolved to a concrete key first.
if( !end.isFirstGreaterOrEqual() && begin.getKey() > end.getKey() ) {
Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) );
if( resolvedEnd == allKeys.begin )
readToBegin = true;
if( resolvedEnd == ryw->getMaxReadKey() )
readThroughEnd = true;
if( begin.getKey() >= resolvedEnd && !begin.isBackward() ) {
return RangeResultRef(false, false);
else if( resolvedEnd == allKeys.begin ) {
return RangeResultRef(readToBegin, readThroughEnd);
// Re-resolve after the wait: the cache may have changed underneath the iterators.
resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
//TraceEvent("RYWSelectorsStartForward", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);
loop {
/*TraceEvent("RYWSelectors", randomID).detail("begin", begin.toString())
.detail("end", end.toString())
.detail("reached", limits.isReached())
.detail("itemsPastEnd", itemsPastEnd)
.detail("endOffset", -end.offset)
.detail("itBegin", printable(it.beginKey().toStandaloneStringRef()))
.detail("itEnd", printable(itEnd.beginKey().toStandaloneStringRef()))
.detail("unknown", it.is_unknown_range())
.detail("requests", requestCount);*/
if( !result.size() && begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) {
return RangeResultRef(false, false);
if( end.getKey() == allKeys.begin && end.offset <= 1 ) {
return RangeResultRef(readToBegin, readThroughEnd);
// begin has caught up with end (or with the max readable key): finish, resolving end if needed.
if( ( begin.getKey() >= end.getKey() && begin.offset >= end.offset ) ||
( begin.getKey() >= ryw->getMaxReadKey() && begin.offset >= 1) ) {
if( end.isFirstGreaterOrEqual() ) break;
if( !result.size() ) break;
Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
if( resolvedEnd == allKeys.begin )
readToBegin = true;
if( resolvedEnd == ryw->getMaxReadKey() )
readThroughEnd = true;
end = firstGreaterOrEqual( resolvedEnd );
if( it.beginKey() > itEnd.beginKey() && !it.is_unreadable() && !it.is_unknown_range() ) {
if( end.isFirstGreaterOrEqual() ) break;
return RangeResultRef(readToBegin, readThroughEnd);
if( limits.isReached() && itemsPastEnd >= 1-end.offset ) break;
if (it == itEnd && ((!it.is_unreadable() && !it.is_unknown_range()) || (begin.offset > 0 && end.isFirstGreaterOrEqual() && end.getKey() == it.beginKey()))) break;
// Unknown segment: build a snapshot request covering it (and nearby single-key clears), then cache the result.
if (it.is_unknown_range()) {
if( limits.hasByteLimit() && result.size() && itemsPastEnd >= 1-end.offset ) {
result.more = true;
Iter ucEnd(it);
int singleClears = 0;
int clearLimit = requestCount ? 1 << std::min(requestCount, 20) : 0;
if( it.beginKey() < itEnd.beginKey() )
singleClears = std::min(skipUncached(ucEnd, itEnd, BUGGIFY ? 0 : clearLimit + 100), clearLimit);
state KeySelector read_end;
if ( ucEnd!=itEnd ) {
read_end = firstGreaterOrEqual(ucEnd.endKey().toStandaloneStringRef());
if( end.offset < 1 ) additionalRows += 1 - end.offset; // extra for items past end
} else if( end.offset < 1 ) {
read_end = firstGreaterOrEqual( end.getKey() );
additionalRows += 1 - end.offset;
} else {
read_end = end;
if( end.offset > 1 ) {
singleClears += countUncached( std::move(ucEnd), ryw->getMaxReadKey(), clearLimit-singleClears);
read_end.offset += singleClears;
additionalRows += singleClears;
state KeySelector read_begin;
if (begin.isFirstGreaterOrEqual()) {
begin = firstGreaterOrEqual( it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : begin.getKey() );
read_begin = begin;
} else if( begin.offset > 1 ) {
read_begin = firstGreaterOrEqual(begin.getKey());
additionalRows += begin.offset - 1;
} else {
read_begin = begin;
ucEnd = it;
singleClears = countUncachedBack(std::move(ucEnd), clearLimit);
read_begin.offset -= singleClears;
additionalRows += singleClears;
if(read_end.getKey() < read_begin.getKey()) {
// NOTE(review): the body of this inverted-bounds check is not present in this copy; confirm.
state GetRangeLimits requestLimit = limits;
setRequestLimits(requestLimit, additionalRows, 2-read_begin.offset, requestCount);
ASSERT( !requestLimit.hasRowLimit() || requestLimit.rows > 0 );
ASSERT( requestLimit.hasRowLimit() || requestLimit.hasByteLimit() );
//TraceEvent("RYWIssuing", randomID).detail("begin", read_begin.toString()).detail("end", read_end.toString()).detail("bytes", requestLimit.bytes).detail("rows", requestLimit.rows).detail("limits", limits.bytes).detail("reached", limits.isReached()).detail("requestCount", requestCount).detail("singleClears", singleClears).detail("ucEnd", printable(ucEnd.beginKey().toStandaloneStringRef())).detail("minRows", requestLimit.minRows);
additionalRows = 0;
Standalone<RangeResultRef> snapshot_read = wait( ryw->tr.getRange( read_begin, read_end, requestLimit, true, false ) );
KeyRangeRef range = getKnownKeyRange( snapshot_read, read_begin, read_end, ryw->arena );
//TraceEvent("RYWCacheInsert", randomID).detail("Range", printable(range)).detail("expectedSize", snapshot_read.expectedSize()).detail("rows", snapshot_read.size()).detail("results", printable(snapshot_read)).detail("more", snapshot_read.more).detail("readToBegin", snapshot_read.readToBegin).detail("readThroughEnd", snapshot_read.readThroughEnd).detail("readThrough", printable(snapshot_read.readThrough));
// NOTE(review): as written, only the first resolveKeySelectorFromCache call below is governed by this `if`.
if( ryw->cache.insert( range, snapshot_read ) )
// TODO: Is there a more efficient way to deal with invalidation?
resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
// Known key/value segment(s): append the contiguous run, up to the caller's limits.
} else if (it.is_kv()) {
KeyValueRef const* start = &it.kv(ryw->arena);
it.skipContiguous( end.isFirstGreaterOrEqual() ? end.getKey() : ryw->getMaxReadKey() ); //not technically correct since this would add end.getKey(), but that is protected above
int maxCount = &it.kv(ryw->arena) - start + 1;
int count = 0;
// NOTE(review): as written, the `itemsPastEnd` update below is this loop's body; the per-row limit
// accounting appears to be missing; confirm.
for(; count < maxCount && !limits.isReached(); count++ ) {
itemsPastEnd += maxCount - count;
//TraceEvent("RYWaddKV", randomID).detail("key", printable(it.beginKey().toStandaloneStringRef())).detail("count", count).detail("maxCount", maxCount).detail("itemsPastEnd", itemsPastEnd);
if( count ) result.append( result.arena(), start, count );
// NOTE(review): as written, the `result.more` assignment below is the body of this `else`; confirm.
} else
result.more = result.more || limits.isReached();
// Trim rows at or beyond end's key when end is a firstGreaterOrEqual selector.
if( end.isFirstGreaterOrEqual() ) {
int keepItems = std::lower_bound( result.begin(), result.end(), end.getKey(), KeyValueRef::OrderByKey() ) - result.begin();
if( keepItems < result.size() )
result.more = false;
result.resize( result.arena(), keepItems );
result.readToBegin = readToBegin;
result.readThroughEnd = !result.more && readThroughEnd;
result.arena().dependsOn( ryw->arena );
return result;
// Returns the key range whose contents are fully determined by the reverse snapshot read `data`
// (rows in descending order) issued for the selectors (begin, end), with both bounds allocated in `arena`.
// Returns an empty KeyRangeRef when the computed begin is not below the computed end.
// - Begin bound: begin's key when !data.more and begin.offset <= 1 (else allKeys.end); allKeys.begin on
//   readToBegin; then the last (smallest) returned key unless the read finished above begin; then
//   lowered to data.readThrough when present.
// - End bound: end's key when end.offset >= 1 (else allKeys.begin); allKeys.end on readThroughEnd;
//   then raised to keyAfter(first returned key) if needed.
static KeyRangeRef getKnownKeyRangeBack( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
	StringRef beginKey = !data.more && begin.offset<=1 ? begin.getKey() : allKeys.end;
	ExtStringRef endKey = end.offset>=1 ? end.getKey() : allKeys.begin;

	if (data.readToBegin) beginKey = allKeys.begin;
	if (data.readThroughEnd) endKey = allKeys.end;

	if( data.size() ) {
		beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key;
		if( data.readThrough.present() ) {
			beginKey = std::min( data.readThrough.get(), beginKey );
		}
		endKey = data[0].key < endKey ? endKey : ExtStringRef( data[0].key, 1 );
	}
	// Applies whether or not rows were returned: never produce an inverted range.
	if (beginKey >= endKey) return KeyRangeRef();

	return KeyRangeRef( StringRef(arena, beginKey), endKey.toArena(arena));
}
// Pre: it points to an unknown range
// Decrements it to point to the unknown range just before the last nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
// Returns the number of single-key empty ranges skipped
// `end` is an iterator the walk must not pass.
// NOTE(review): in this copy singleEmpty is never incremented and nothing steps `it` backward. Both inner
// `while` loops have the `return` as their body, so statements and braces appear to be missing; confirm.
template<class Iter> static int skipUncachedBack( Iter& it, Iter const& end, int iterationLimit ) {
ExtStringRef b = it.beginKey();
ExtStringRef e = it.endKey();
int singleEmpty = 0;
ASSERT(!it.is_unreadable() && it.is_unknown_range());
// b == it.beginKey()
// e is the end of the contiguous empty range containing it
while( it != end && --iterationLimit>=0) {
if (it.is_unreadable() || it.is_empty_range()) {
if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
while (it.is_unreadable() || !it.is_unknown_range())
return singleEmpty;
} else
e = b;
b = it.beginKey();
while (it.is_unreadable() || !it.is_unknown_range())
return singleEmpty;
// Pre: it points to an unknown range
// Returns the number of preceding empty single-key known ranges between it and the previous nontrivial known range, but no more than maxClears
// Leaves it in an indeterminate state
// The scan stops once b reaches allKeys.begin; returns 0 immediately when maxClears <= 0.
// NOTE(review): in this copy the loop never steps `it` backward and never increments singleEmpty, so
// statements and closing braces appear to be missing; confirm against upstream.
template<class Iter> static int countUncachedBack( Iter&& it, int maxClears ) {
if (maxClears <= 0) return 0;
ExtStringRef b = it.beginKey();
ExtStringRef e = it.endKey();
int singleEmpty = 0;
while( b > allKeys.begin ) {
if (it.is_unreadable() || it.is_empty_range()) {
if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
return singleEmpty;
if( singleEmpty >= maxClears )
return maxClears;
} else
e = b;
b = it.beginKey();
return singleEmpty;
ACTOR template<class Iter> static Future< Standalone<RangeResultRef> > getRangeValueBack( ReadYourWritesTransaction *ryw, KeySelector begin, KeySelector end, GetRangeLimits limits, Iter* pit ) {
state Iter& it(*pit);
state Iter itEnd(*pit);
state Standalone<RangeResultRef> result;
state int64_t additionalRows = 0;
state int itemsPastBegin = 0;
state int requestCount = 0;
state bool readToBegin = false;
state bool readThroughEnd = false;
state int actualBeginOffset = begin.offset;
state int actualEndOffset = end.offset;
//state UID randomID = g_nondeterministic_random->randomUniqueID();
resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
if( ( begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) ) {
return RangeResultRef(false, false);
else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
|| ( end.isFirstGreaterOrEqual() && end.getKey() == allKeys.begin ) )
return RangeResultRef(readToBegin, readThroughEnd);
if( !begin.isFirstGreaterOrEqual() && begin.getKey() > end.getKey() ) {
Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) );
if( resolvedBegin == allKeys.begin )
readToBegin = true;
if( resolvedBegin == ryw->getMaxReadKey() )
readThroughEnd = true;
if( resolvedBegin >= end.getKey() && end.offset <= 1 ) {
return RangeResultRef(false, false);
else if( resolvedBegin == ryw->getMaxReadKey() ) {
return RangeResultRef(readToBegin, readThroughEnd);
resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
//TraceEvent("RYWSelectorsStartReverse", randomID).detail("byteLimit", limits.bytes).detail("rowLimit", limits.rows);
loop {
/*TraceEvent("RYWSelectors", randomID).detail("begin", begin.toString())
.detail("end", end.toString())
.detail("reached", limits.isReached())
.detail("itemsPastBegin", itemsPastBegin)
.detail("endOffset", end.offset)
.detail("itBegin", printable(it.beginKey().toStandaloneStringRef()))
.detail("itEnd", printable(itEnd.beginKey().toStandaloneStringRef()))
.detail("unknown", it.is_unknown_range())
.detail("kv", it.is_kv())
.detail("requests", requestCount);*/
if(!result.size() && begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset) {
return RangeResultRef(false, false);
if( begin.getKey() >= ryw->getMaxReadKey() && !begin.isBackward() ) {
return RangeResultRef(readToBegin, readThroughEnd);
if( ( begin.getKey() >= end.getKey() && begin.offset >= end.offset ) ||
( end.getKey() == allKeys.begin && end.offset <= 1 ) ) {
if( begin.isFirstGreaterOrEqual() ) break;
if( !result.size() ) break;
Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
if( resolvedBegin == allKeys.begin )
readToBegin = true;
if( resolvedBegin == ryw->getMaxReadKey() )
readThroughEnd = true;
begin = firstGreaterOrEqual( resolvedBegin );
if (it.beginKey() < itEnd.beginKey() && !it.is_unreadable() && !it.is_unknown_range() && itemsPastBegin >= begin.offset - 1) {
if( begin.isFirstGreaterOrEqual() ) break;
return RangeResultRef(readToBegin, readThroughEnd);
if( limits.isReached() && itemsPastBegin >= begin.offset-1 ) break;
if( end.isFirstGreaterOrEqual() && end.getKey() == it.beginKey() ) {
if( itemsPastBegin >= begin.offset-1 && it == itEnd) break;
if (it.is_unknown_range()) {
if( limits.hasByteLimit() && result.size() && itemsPastBegin >= begin.offset-1 ) {
result.more = true;
Iter ucEnd(it);
int singleClears = 0;
int clearLimit = requestCount ? 1 << std::min(requestCount, 20) : 0;
if( it.beginKey() > itEnd.beginKey() )
singleClears = std::min(skipUncachedBack(ucEnd, itEnd, BUGGIFY ? 0 : clearLimit+100), clearLimit);
state KeySelector read_begin;
if ( ucEnd!=itEnd ) {
read_begin = firstGreaterOrEqual(ucEnd.beginKey().toStandaloneStringRef());
if( begin.offset > 1 ) additionalRows += begin.offset - 1; // extra for items past end
} else if( begin.offset > 1 ) {
read_begin = firstGreaterOrEqual( begin.getKey() );
additionalRows += begin.offset - 1;
} else {
read_begin = begin;
if( begin.offset < 1 ) {
singleClears += countUncachedBack(std::move(ucEnd), clearLimit-singleClears);
read_begin.offset -= singleClears;
additionalRows += singleClears;
state KeySelector read_end;
if (end.isFirstGreaterOrEqual()) {
end = firstGreaterOrEqual( it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey() );
read_end = end;
} else if (end.offset < 1) {
read_end = firstGreaterOrEqual(end.getKey());
additionalRows += 1 - end.offset;
} else {
read_end = end;
ucEnd = it;
singleClears = countUncached(std::move(ucEnd), ryw->getMaxReadKey(), clearLimit);
read_end.offset += singleClears;
additionalRows += singleClears;
if(read_begin.getKey() > read_end.getKey()) {
state GetRangeLimits requestLimit = limits;
setRequestLimits(requestLimit, additionalRows, read_end.offset, requestCount);
ASSERT( !requestLimit.hasRowLimit() || requestLimit.rows > 0 );
ASSERT( requestLimit.hasRowLimit() || requestLimit.hasByteLimit() );
//TraceEvent("RYWIssuing", randomID).detail("begin", read_begin.toString()).detail("end", read_end.toString()).detail("bytes", requestLimit.bytes).detail("rows", requestLimit.rows).detail("limits", limits.bytes).detail("reached", limits.isReached()).detail("requestCount", requestCount).detail("singleClears", singleClears).detail("ucEnd", printable(ucEnd.beginKey().toStandaloneStringRef())).detail("minRows", requestLimit.minRows);
additionalRows = 0;
Standalone<RangeResultRef> snapshot_read = wait( ryw->tr.getRange( read_begin, read_end, requestLimit, true, true ) );
KeyRangeRef range = getKnownKeyRangeBack( snapshot_read, read_begin, read_end, ryw->arena );
//TraceEvent("RYWCacheInsert", randomID).detail("Range", printable(range)).detail("expectedSize", snapshot_read.expectedSize()).detail("rows", snapshot_read.size()).detail("results", printable(snapshot_read)).detail("more", snapshot_read.more).detail("readToBegin", snapshot_read.readToBegin).detail("readThroughEnd", snapshot_read.readThroughEnd).detail("readThrough", printable(snapshot_read.readThrough));
RangeResultRef reversed;
reversed.resize(ryw->arena, snapshot_read.size());
for( int i = 0; i < snapshot_read.size(); i++ ) {
reversed[snapshot_read.size()-i-1] = snapshot_read[i];
if( ryw->cache.insert( range, reversed ) )
// TODO: Is there a more efficient way to deal with invalidation?
resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
} else {
if (it.is_kv()) {
KeyValueRef const* end = &it.kv(ryw->arena);
it.skipContiguousBack( begin.isFirstGreaterOrEqual() ? begin.getKey() : allKeys.begin );
KeyValueRef const* start = &it.kv(ryw->arena);
int maxCount = end - start + 1;
int count = 0;
for(; count < maxCount && !limits.isReached(); count++ ) {
itemsPastBegin += maxCount - count;
//TraceEvent("RYWaddKV", randomID).detail("key", printable(it.beginKey().toStandaloneStringRef())).detail("count", count).detail("maxCount", maxCount).detail("itemsPastBegin", itemsPastBegin);
if( count ) {
int size = result.size();
for( int i = 0; i < count; i++ ) {
result[size + i] = start[maxCount-i-1];
if (it == itEnd) break;
result.more = result.more || limits.isReached();
if( begin.isFirstGreaterOrEqual() ) {
int keepItems = result.rend() - std::lower_bound( result.rbegin(), result.rend(), begin.getKey(), KeyValueRef::OrderByKey());
if( keepItems < result.size() )
result.more = false;
result.resize( result.arena(), keepItems );
result.readToBegin = !result.more && readToBegin;
result.readThroughEnd = readThroughEnd;
result.arena().dependsOn( ryw->arena );
return result;
// Fires or updates every RYW watch registered on a key inside `range` after a local write of `val`.
//
// For each watch on an affected key:
//  - a watch whose trigger is already set is dropped from the key's list;
//  - a watch is fired and dropped when the new value is unknown (`valueKnown == false`), or when it
//    differs from the value previously set in this transaction (`setValue`) or from the value read
//    from the database (`value`);
//  - otherwise the watch records `val` as the latest locally-set value.
// Keys whose watch list becomes empty are erased from `ryw->watchMap`.
static void triggerWatches(ReadYourWritesTransaction *ryw, KeyRangeRef range, Optional<ValueRef> val, bool valueKnown = true) {
	for(auto it = ryw->watchMap.lower_bound(range.begin); it != ryw->watchMap.end() && it->key < range.end; ) {
		// Advance before itCopy may be erased below, so `it` stays valid (and the loop terminates).
		auto itCopy = it;
		++it;

		ASSERT( itCopy->value.size() );
		TEST( itCopy->value.size() > 1 ); //Multiple watches on the same key triggered by RYOW

		for( int i = 0; i < itCopy->value.size(); i++ ) {
			if(itCopy->value[i]->onChangeTrigger.isSet()) {
				// Swap-remove; i-- makes the loop revisit the element moved into slot i.
				if( i < itCopy->value.size() - 1 )
					std::swap(itCopy->value[i--], itCopy->value.back());
				itCopy->value.pop_back();
			} else if( !valueKnown ||
					   (itCopy->value[i]->setPresent && (itCopy->value[i]->setValue.present() != val.present() || (val.present() && itCopy->value[i]->setValue.get() != val.get()))) ||
					   (itCopy->value[i]->valuePresent && (itCopy->value[i]->value.present() != val.present() || (val.present() && itCopy->value[i]->value.get() != val.get()))) ) {
				itCopy->value[i]->onChangeTrigger.send(Void());
				if( i < itCopy->value.size() - 1 )
					std::swap(itCopy->value[i--], itCopy->value.back());
				itCopy->value.pop_back();
			} else {
				itCopy->value[i]->setPresent = true;
				itCopy->value[i]->setValue = val.cast_to<Value>();
			}
		}

		if( itCopy->value.size() == 0 )
			ryw->watchMap.erase(itCopy);
	}
}
// Single-key convenience overload: runs the range-based triggerWatches over exactly `key`.
static void triggerWatches(ReadYourWritesTransaction *ryw, KeyRef key, Optional<ValueRef> val, bool valueKnown = true) {
	auto const keyOnly = singleKeyRange(key);
	triggerWatches(ryw, keyOnly, val, valueKnown);
}
// Implementation of ReadYourWritesTransaction::watch().
//
// Creates a Watch on `key`, reads the key's current value, and then hands the watch to the native
// transaction. The read goes through the RYW layer, or straight to `ryw->tr` when read-your-writes
// is disabled. The returned future completes when the watch fires.
// The watch is also registered in ryw->watchMap, so a local write of a different value can fire it
// early (see triggerWatches).
//
// `done` is added to ryw->reading, which commit() waits on. It is therefore sent on every exit path.
ACTOR static Future<Void> watch( ReadYourWritesTransaction *ryw, Key key ) {
	state Future<Optional<Value>> val;
	state Future<Void> watchFuture;
	state Reference<Watch> watch(new Watch(key));
	state Promise<Void> done;

	ryw->reading.add( done.getFuture() );

	if(!ryw->options.readYourWritesDisabled) {
		// Register so that writes made by this transaction can trigger the watch.
		ryw->watchMap[key].push_back(watch);
		val = readWithConflictRange( ryw, GetValueReq(key), false );
	} else {
		val = ryw->tr.get(key);
	}

	try {
		Void _ = wait(ryw->resetPromise.getFuture() || success(val) || watch->onChangeTrigger.getFuture());
	} catch( Error &e ) {
		done.send(Void());
		throw;
	}

	// A local write already triggered (or failed) the watch before the read finished.
	if( watch->onChangeTrigger.getFuture().isReady() ) {
		done.send(Void());
		if( watch->onChangeTrigger.getFuture().isError() )
			throw watch->onChangeTrigger.getFuture().getError();
		return Void();
	}

	watch->valuePresent = true;
	watch->value = val.get();

	// This transaction already set a value different from the one read: fire immediately.
	if( watch->setPresent && ( watch->setValue.present() != watch->value.present() || (watch->value.present() && watch->setValue.get() != watch->value.get()) ) ) {
		watch->onChangeTrigger.send(Void());
		done.send(Void());
		return Void();
	}

	try {
		watchFuture = ryw->tr.watch(watch); // throws if there are too many outstanding watches
	} catch( Error &e ) {
		done.send(Void());
		throw;
	}
	done.send(Void());

	Void _ = wait(watchFuture);

	return Void();
}
// Implementation of ReadYourWritesTransaction::commit().
//
// Marks the commit as started and waits for all outstanding reads (ryw->reading). It then commits
// the native transaction, racing the commit against the reset promise:
//  - with read-your-writes disabled, `ryw->tr` is committed directly;
//  - otherwise the buffered writes and the recorded read-conflict ranges are first copied into it.
// For API versions below 410:
//  - after a successful commit the transaction is reset;
//  - after a failure the commitStarted flag is cleared and, unless already reset, the state is
//    reset before the error is rethrown.
// NOTE(review): statements in this actor were reconstructed from a damaged copy (missing rethrow and
// the API<410 branch bodies); confirm against upstream.
ACTOR static Future<Void> commit( ReadYourWritesTransaction *ryw ) {
	try {
		ryw->commitStarted = true;

		Future<Void> ready = ryw->reading;
		Void _ = wait( ryw->resetPromise.getFuture() || ready );

		if( ryw->options.readYourWritesDisabled ) {
			if (ryw->resetPromise.isSet())
				throw ryw->resetPromise.getFuture().getError();
			Void _ = wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );

			ryw->debugLogRetries(Optional<Error>());

			if(!ryw->tr.apiVersionAtLeast(410)) {
				ryw->reset();
			}

			return Void();
		}

		ryw->writeRangeToNativeTransaction(KeyRangeRef(StringRef(), ryw->getMaxWriteKey()));

		auto conflictRanges = ryw->readConflicts.ranges();
		for( auto iter = conflictRanges.begin(); iter != conflictRanges.end(); ++iter ) {
			if( iter->value() ) {
				ryw->tr.addReadConflictRange( iter->range() );
			}
		}

		Void _ = wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );

		ryw->debugLogRetries(Optional<Error>());

		if(!ryw->tr.apiVersionAtLeast(410)) {
			ryw->reset();
		}

		return Void();
	} catch( Error &e ) {
		if(!ryw->tr.apiVersionAtLeast(410)) {
			ryw->commitStarted = false;
			if( !ryw->resetPromise.isSet() ) {
				ryw->tr.reset();
				ryw->resetRyow();
			}
		}

		// Always propagate the commit failure to the caller.
		throw;
	}
}
// Implementation of ReadYourWritesTransaction::onError().
//
// Counts the retry, saturating at INT_MAX. It rethrows `e` once the retry limit is hit; a
// maxRetries of -1 means unlimited. Otherwise it delegates to the native transaction's onError,
// which is raced against the reset promise, and then clears the RYW state for the next attempt.
// On failure the RYW state is reset (unless already reset), and broken_promise is reported as
// transaction_cancelled.
ACTOR static Future<Void> onError( ReadYourWritesTransaction *ryw, Error e ) {
	try {
		if ( ryw->resetPromise.isSet() ) {
			throw ryw->resetPromise.getFuture().getError();
		}

		bool retry_limit_hit = ryw->options.maxRetries != -1 && ryw->retries >= ryw->options.maxRetries;
		if (ryw->retries < std::numeric_limits<int>::max())
			ryw->retries++;
		if(retry_limit_hit) {
			throw e;
		}

		Void _ = wait( ryw->resetPromise.getFuture() || ryw->tr.onError(e) );

		ryw->debugLogRetries(e);

		ryw->resetRyow();
		return Void();
	} catch( Error &e ) {
		if ( !ryw->resetPromise.isSet() ) {
			ryw->resetRyow();
		}
		if( e.code() == error_code_broken_promise )
			throw transaction_cancelled();
		throw;
	}
}
// Waits for the native transaction's read version, racing it against the reset promise so that a
// reset or cancel of this transaction interrupts the wait with the reset error.
ACTOR static Future<Version> getReadVersion(ReadYourWritesTransaction* ryw) {
	choose {
		when(Version v = wait(ryw->tr.getReadVersion())) {
			return v;
		}
		when(Void _ = wait(ryw->resetPromise.getFuture())) {
			// The reset promise is expected to complete only with an error.
			throw internal_error();
		}
	}
}
// Creates a read-your-writes transaction on database `cx`.
// The snapshot cache and the write map both allocate from this object's `arena`. The retry count
// starts at zero. creationTime is used by the timeout (TIMEOUT option) and by retry logging.
ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx )
	: cache(&arena),
	  writes(&arena),
	  tr(cx),
	  retries(0),
	  creationTime(now()),
	  commitStarted(false),
	  options(tr)
{
}
// Fails the transaction when its deadline passes.
// `totalSeconds` is an absolute deadline in now() time; 0.0 means "no timeout" (wait forever).
// At the deadline, `resetPromise` is failed with transaction_timed_out, unless it is already set,
// so that pending operations observe the timeout. The actor itself then also throws.
ACTOR Future<Void> timebomb(double totalSeconds, Promise<Void> resetPromise) {
	if(totalSeconds == 0.0) {
		Void _ = wait ( Never() );
	}
	else if (now() < totalSeconds) {
		Void _ = wait ( delayUntil( totalSeconds ) );
	}
	if( !resetPromise.isSet() )
		resetPromise.sendError(transaction_timed_out());
	throw transaction_timed_out();
}
// Returns the read version of the underlying transaction.
// Before API version 101 this is a plain pass-through. From 101 on, a reset/cancelled transaction
// returns its reset error immediately; otherwise the request goes through RYWImpl, which also
// watches the reset promise.
Future<Version> ReadYourWritesTransaction::getReadVersion() {
	if (!tr.apiVersionAtLeast(101)) {
		return tr.getReadVersion();
	}
	if (resetPromise.isSet()) {
		return resetPromise.getFuture().getError();
	}
	return RYWImpl::getReadVersion(this);
}
// Serializes a status JSON object (raw UTF-8 output) into a Value.
// Any std::exception raised while serializing is traced as UnableToUnparseStatusJSON and rethrown
// as internal_error.
Optional<Value> getValueFromJSON(StatusObject statusObj) {
	try {
		std::string const serialized = json_spirit::write_string(json_spirit::mValue(statusObj), json_spirit::Output_options::raw_utf8);
		Value output = StringRef(serialized.c_str());
		return output;
	}
	catch (std::exception& e){
		TraceEvent(SevError, "UnableToUnparseStatusJSON").detail("What", e.what());
		throw internal_error();
	}
}
// Fetches the cluster status through StatusClient and returns it serialized as JSON.
ACTOR Future<Optional<Value>> getJSON(Reference<ClusterConnectionFile> clusterFile) {
	StatusObject status = wait(StatusClient::statusFetcher(clusterFile));
	return getValueFromJSON(status);
}
// Returns the cluster's client worker interfaces as key/value pairs: worker address -> serialized
// ClientWorkerInterface.
// monitorLeader keeps `clusterInterface` current. While no interface is known, or when it changes
// before a reply arrives, the request is retried.
ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces (Reference<ClusterConnectionFile> clusterFile){
	state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
	state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);

	// The loop is required: the onChange branch must retry rather than fall off the end of the actor.
	loop {
		choose {
			when( vector<ClientWorkerInterface> workers = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().getClientWorkers.getReply( GetClientWorkersRequest() ) ) : Never() ) ) {
				Standalone<RangeResultRef> result;
				for(auto& it : workers) {
					result.push_back_deep(result.arena(), KeyValueRef(it.address().toString(), BinaryWriter::toValue(it, IncludeVersion())));
				}
				return result;
			}
			when( Void _ = wait(clusterInterface->onChange()) ) {}
		}
	}
}
// Reads the value of `key`, consulting this transaction's own writes.
// Three special keys are answered from client-side state instead of the database:
// "\xff\xff/status/json", "\xff\xff/cluster_file_path" and "\xff\xff/connection_string". Each yields
// an empty value when no connection file is available.
// For ordinary keys, errors are returned as failed futures. A key longer than the key size limit
// cannot exist, so it reads as absent.
Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
	// Evaluated lazily, and only for the special keys.
	auto connectionFileAvailable = [this]() -> bool {
		return tr.getDatabase().getPtr() && tr.getDatabase()->cluster && tr.getDatabase()->cluster->getConnectionFile();
	};

	if (key == LiteralStringRef("\xff\xff/status/json")) {
		if (!connectionFileAvailable())
			return Optional<Value>();
		return getJSON(tr.getDatabase()->cluster->getConnectionFile());
	}

	if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
		try {
			if (connectionFileAvailable()) {
				Optional<Value> output = StringRef(tr.getDatabase()->cluster->getConnectionFile()->getFilename());
				return output;
			}
		}
		catch (Error &e) {
			return e;
		}
		return Optional<Value>();
	}

	if (key == LiteralStringRef("\xff\xff/connection_string")) {
		try {
			if (connectionFileAvailable()) {
				Reference<ClusterConnectionFile> connFile = tr.getDatabase()->cluster->getConnectionFile();
				Optional<Value> output = StringRef(connFile->getConnectionString().toString());
				return output;
			}
		}
		catch (Error &e) {
			return e;
		}
		return Optional<Value>();
	}

	if (checkUsedDuringCommit())
		return used_during_commit();
	if (resetPromise.isSet())
		return resetPromise.getFuture().getError();
	if (key >= getMaxReadKey())
		return key_outside_legal_range();

	auto const keySizeLimit = key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT;
	if (key.size() > keySizeLimit)
		return Optional<Value>();

	Future< Optional<Value> > pending = RYWImpl::readWithConflictRange( this, RYWImpl::GetValueReq(key), snapshot );
	reading.add( success( pending ) );
	return pending;
}
// Resolves the key selector `key` against the database merged with this transaction's writes.
// Errors (used during commit, reset, selector key beyond the readable range) come back as failed futures.
Future< Key > ReadYourWritesTransaction::getKey( const KeySelector& key, bool snapshot ) {
	if (checkUsedDuringCommit())
		return used_during_commit();
	if (resetPromise.isSet())
		return resetPromise.getFuture().getError();
	if (key.getKey() > getMaxReadKey())
		return key_outside_legal_range();

	Future< Key > resolved = RYWImpl::readWithConflictRange(this, RYWImpl::GetKeyReq(key), snapshot);
	reading.add( success( resolved ) );
	return resolved;
}
// Reads the key/value pairs between the `begin` and `end` selectors, honouring `limits`; results are
// returned in reverse order when `reverse` is set.
// If `begin` names the special key "\xff\xff/worker_interfaces", the cluster's worker interfaces are
// returned instead.
// Errors (used during commit, reset, keys beyond the readable range, invalid limits) are returned as
// failed futures. A zero limit or an inverted range yields an empty result.
Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
	KeySelector begin,
	KeySelector end,
	GetRangeLimits limits,
	bool snapshot,
	bool reverse )
{
	if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")){
		if (tr.getDatabase().getPtr() && tr.getDatabase()->cluster && tr.getDatabase()->cluster->getConnectionFile()) {
			return getWorkerInterfaces(tr.getDatabase()->cluster->getConnectionFile());
		}
		else {
			return Standalone<RangeResultRef>();
		}
	}

	if(checkUsedDuringCommit()) {
		return used_during_commit();
	}

	if( resetPromise.isSet() )
		return resetPromise.getFuture().getError();

	KeyRef maxKey = getMaxReadKey();
	if(begin.getKey() > maxKey || end.getKey() > maxKey)
		return key_outside_legal_range();

	//This optimization prevents NULL operations from being added to the conflict range
	if( limits.isReached() ) {
		TEST(true); // RYW range read limit 0
		return Standalone<RangeResultRef>();
	}

	if( !limits.isValid() )
		return range_limits_invalid();

	// Normalize both selectors to the non-orEqual form so that the inversion test below compares
	// like with like (previously the test only ran when both selectors were orEqual).
	if( begin.orEqual )
		begin.removeOrEqual(begin.arena());

	if( end.orEqual )
		end.removeOrEqual(end.arena());

	if( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) {
		TEST(true); // RYW range inverted
		return Standalone<RangeResultRef>();
	}

	Future< Standalone<RangeResultRef> > result = reverse
		? RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<true>(begin, end, limits), snapshot )
		: RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<false>(begin, end, limits), snapshot );

	reading.add( success( result ) );
	return result;
}
// Row-limit convenience overload: wraps `limit` in GetRangeLimits and forwards to the general getRange.
Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
	const KeySelector& begin,
	const KeySelector& end,
	int limit,
	bool snapshot,
	bool reverse )
{
	GetRangeLimits const rowLimits( limit );
	return getRange( begin, end, rowLimits, snapshot, reverse );
}
// Returns the addresses of the storage servers holding `key`, delegating to the native transaction.
// The request is tracked in `reading`, so commit() waits for it.
Future< Standalone<VectorRef<const char*> >> ReadYourWritesTransaction::getAddressesForKey( const Key& key ) {
	if (checkUsedDuringCommit())
		return used_during_commit();
	if (resetPromise.isSet())
		return resetPromise.getFuture().getError();

	// If key >= allKeys.end, then our resulting address vector will be empty.
	Future< Standalone<VectorRef<const char*> >> addresses = tr.getAddressesForKey(key);
	reading.add( success( addresses ) );
	return addresses;
}
// Adds `keys` to this transaction's read-conflict set.
// Keys longer than the key size limit are truncated to limit+1 bytes, which is an equivalent bound
// because no stored key can be that long. An empty range is ignored.
// With read-your-writes disabled the range goes straight to the native transaction. Otherwise only
// the parts not fully determined by this transaction's own writes are recorded (see
// updateConflictMap).
// Throws used_during_commit, or key_outside_legal_range (API >= 300).
void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) {
	if(checkUsedDuringCommit()) {
		throw used_during_commit();
	}

	if (tr.apiVersionAtLeast(300)) {
		if (keys.begin > getMaxReadKey() || keys.end > getMaxReadKey()) {
			throw key_outside_legal_range();
		}
	}

	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
	//we can translate it to an equivalent one with smaller keys
	KeyRef begin = keys.begin;
	KeyRef end = keys.end;

	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);

	KeyRangeRef r = KeyRangeRef(begin, end);

	if(r.empty()) {
		return;
	}

	if(options.readYourWritesDisabled) {
		tr.addReadConflictRange(r);
		return;
	}

	WriteMap::iterator it( &writes );
	KeyRangeRef readRange( arena, r );
	it.skip( readRange.begin );
	updateConflictMap(readRange, it);
}
// Records `key` in readConflicts when the write-map entry at `it` is either an unmodified range or
// an operation that is not independent of the existing value.
// The caller must already have positioned `it` on the entry containing `key`.
void ReadYourWritesTransaction::updateConflictMap( KeyRef const& key, WriteMap::iterator& it ) {
	bool const readsDatabase = it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() );
	if( !readsDatabase )
		return;
	readConflicts.insert( singleKeyRange( key, arena ), true );
}
// Range version of updateConflictMap. It walks the write map from `it` up to `keys.end`.
// Each entry that is unmodified, or is a non-independent operation, is clipped to `keys`; the
// non-empty remainder is inserted into readConflicts.
// The caller must have positioned `it` at `keys.begin`; `it` is left past the last entry visited.
void ReadYourWritesTransaction::updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ) {
	while( it.beginKey() < keys.end ) {
		if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
			KeyRef const lo = std::max( keys.begin, it.beginKey().toArenaOrRef( arena ) );
			KeyRef const hi = std::min( keys.end, it.endKey().toArenaOrRef( arena ) );
			KeyRangeRef insert_range = KeyRangeRef( lo, hi );
			if( !insert_range.empty() )
				readConflicts.insert( insert_range, true );
		}
		++it;
	}
}
// Replays the buffered writes inside `keys` onto the native transaction `tr`.
// Pass 1 issues the cleared ranges. Pass 2 issues the write-conflict ranges plus every buffered
// set/atomic operation; these must come after the clears because a key may be both cleared and
// set again.
// All native calls pass addConflictRange=false; conflicts are added explicitly in pass 2.
void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const& keys ) {
	WriteMap::iterator it( &writes );
	it.skip(keys.begin);

	bool inClearRange = false;
	ExtStringRef clearBegin;

	//Clear ranges must be done first because of keys that are both cleared and set to a new value
	for(; it.beginKey() < keys.end; ++it) {
		if( it.is_cleared_range() && !inClearRange ) {
			clearBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
			inClearRange = true;
		} else if( !it.is_cleared_range() && inClearRange ) {
			tr.clear( KeyRangeRef( clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), false );
			inClearRange = false;
		}
	}

	if( inClearRange ) {
		tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), keys.end), false);
	}

	// Rewind: the first pass left `it` at or past keys.end, so without this the second pass would never run.
	it.skip(keys.begin);

	bool inConflictRange = false;
	ExtStringRef conflictBegin;

	for(; it.beginKey() < keys.end; ++it) {
		if( it.is_conflict_range() && !inConflictRange ) {
			conflictBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
			inConflictRange = true;
		} else if( !it.is_conflict_range() && inConflictRange ) {
			tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ) );
			inConflictRange = false;
		}

		//SOMEDAY: make atomicOp take set to avoid switch
		if( it.is_operation() ) {
			auto op = it.op();
			for( int i = 0; i < op.size(); ++i) {
				switch(op[i].type) {
					case MutationRef::SetValue:
						tr.set( it.beginKey().assertRef(), op[i].value, false );
						break;
					case MutationRef::AddValue:
					case MutationRef::AppendIfFits:
					case MutationRef::And:
					case MutationRef::Or:
					case MutationRef::Xor:
					case MutationRef::Max:
					case MutationRef::Min:
					case MutationRef::SetVersionstampedKey:
					case MutationRef::SetVersionstampedValue:
						tr.atomicOp( it.beginKey().assertRef(), op[i].value, op[i].type, false );
						break;
					default:
						break;
				}
			}
		}
	}

	if( inConflictRange ) {
		tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), keys.end ) );
	}
}
// Returns whether nextWriteDisableConflictRange was set, and clears it, so the flag applies to a
// single subsequent write (set/clear/atomicOp each call this once).
bool ReadYourWritesTransactionOptions::getAndResetWriteConflictDisabled() {
	if( !nextWriteDisableConflictRange )
		return false;
	nextWriteDisableConflictRange = false;
	return true;
}
// Marks in `result` (value true) every contiguous range, below getMaxWriteKey(), that the write map
// flags as a write-conflict range.
void ReadYourWritesTransaction::getWriteConflicts( KeyRangeMap<bool> *result ) {
	WriteMap::iterator it( &writes );
	// Position the iterator before use, as the other WriteMap walkers in this file do.
	it.skip(allKeys.begin);

	bool inConflictRange = false;
	ExtStringRef conflictBegin;

	for(; it.beginKey() < getMaxWriteKey(); ++it) {
		if( it.is_conflict_range() && !inConflictRange ) {
			conflictBegin = it.beginKey();
			inConflictRange = true;
		} else if( !it.is_conflict_range() && inConflictRange ) {
			result->insert( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), true );
			inConflictRange = false;
		}
	}

	if( inConflictRange ) {
		result->insert( KeyRangeRef( conflictBegin.toArenaOrRef(arena), getMaxWriteKey() ), true );
	}
}
// Applies the atomic mutation `operationType`, with argument `operand`, to `key`.
// Checks, in order:
//  - not used during commit;
//  - key below getMaxWriteKey();
//  - a valid atomic mutation type (AppendIfFits is rejected here);
//  - key and operand size limits.
// With read-your-writes disabled the mutation is forwarded to the native transaction. Otherwise it
// is copied into this transaction's arena, buffered in the write map, and watches on `key` are
// triggered with an unknown value (valueKnown = false).
void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& operand, uint32_t operationType ) {
// Read (and reset) before any validation, so the one-shot flag is consumed even if a check below throws.
bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
if(checkUsedDuringCommit()) {
throw used_during_commit();
if(key >= getMaxWriteKey())
throw key_outside_legal_range();
if(!isValidMutationType(operationType) || !isAtomicOp((MutationRef::Type) operationType) || operationType == MutationRef::AppendIfFits)
throw invalid_mutation_type();
if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
throw key_too_large();
if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
throw value_too_large();
// NOTE(review): this versionstamped-key branch looks incomplete. `range` is computed but never used,
// and the nested readYourWritesDisabled check has no visible body or closing braces. Presumably the
// versionstamp key range is meant to be handled here; confirm against upstream before relying on it.
if(operationType == MutationRef::SetVersionstampedKey) {
KeyRangeRef range = getVersionstampKeyRange(arena, key, getMaxReadKey()); // this does validation of the key and needs to be performed before the readYourWritesDisabled path
if(!options.readYourWritesDisabled) {
// A SetVersionstampedValue operand shorter than 10 bytes is rejected.
if (operationType == MutationRef::SetVersionstampedValue && operand.size() < 10)
throw client_invalid_operation();
if(options.readYourWritesDisabled) {
return tr.atomicOp(key, operand, (MutationRef::Type) operationType, addWriteConflict);
// Copy key and operand into the transaction arena so the buffered mutation outlives the caller's buffers.
KeyRef k = KeyRef( arena, key );
ValueRef v = ValueRef( arena, operand );
writes.mutate(k, (MutationRef::Type) operationType, v, addWriteConflict);
// The resulting value is not computed here, so watches are fired as "value unknown".
RYWImpl::triggerWatches(this, key, Optional<ValueRef>(), false);
void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) {
if (key == LiteralStringRef("\xff\xff/reboot_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest() );
if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, true) );
bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
if(checkUsedDuringCommit()) {
throw used_during_commit();
if(key >= getMaxWriteKey())
throw key_outside_legal_range();
if(options.readYourWritesDisabled ) {
return tr.set(key, value, addWriteConflict);
//TODO: check transaction size here
if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
throw key_too_large();
throw value_too_large();
KeyRef k = KeyRef( arena, key );
ValueRef v = ValueRef( arena, value );
writes.mutate(k, MutationRef::SetValue, v, addWriteConflict);
RYWImpl::triggerWatches(this, key, value);
// Clears every key in `range`.
// Keys longer than the key size limit are truncated to limit+1 bytes (equivalent, since no stored
// key is that long). An empty range is a no-op.
// Watches on cleared keys are triggered with an absent value.
// Throws used_during_commit or key_outside_legal_range.
void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
	bool addWriteConflict = !options.getAndResetWriteConflictDisabled();

	if(checkUsedDuringCommit()) {
		throw used_during_commit();
	}

	KeyRef maxKey = getMaxWriteKey();
	if(range.begin > maxKey || range.end > maxKey)
		throw key_outside_legal_range();

	if( options.readYourWritesDisabled ) {
		return tr.clear(range, addWriteConflict);
	}

	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
	//we can translate it to an equivalent one with smaller keys
	KeyRef begin = range.begin;
	KeyRef end = range.end;

	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);

	KeyRangeRef r = KeyRangeRef(begin, end);

	if(r.empty()) {
		return;
	}

	// Copy into the transaction arena; the caller's buffers may not outlive the buffered clear.
	r = KeyRangeRef( arena, r );

	writes.clear(r, addWriteConflict);
	RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
}
// Clears the single key `key`.
// A key longer than the key size limit cannot exist in the database, so clearing it is a no-op
// (consistent with get() reading such keys as absent).
// Watches on `key` are triggered with an absent value.
// Throws used_during_commit or key_outside_legal_range.
void ReadYourWritesTransaction::clear( const KeyRef& key ) {
	bool addWriteConflict = !options.getAndResetWriteConflictDisabled();

	if(checkUsedDuringCommit()) {
		throw used_during_commit();
	}

	if(key >= getMaxWriteKey())
		throw key_outside_legal_range();

	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		return;

	if( options.readYourWritesDisabled ) {
		return tr.clear(key, addWriteConflict);
	}

	KeyRangeRef r = singleKeyRange( key, arena );

	//SOMEDAY: add an optimized single key clear to write map
	writes.clear(r, addWriteConflict);

	RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
}
// Returns a future that completes when the value of `key` changes (see RYWImpl::watch).
// Returns a failed future immediately in any of these cases:
//  - the transaction is in use by commit, or has been reset;
//  - read-your-writes is disabled;
//  - the key is outside the readable range;
//  - the key exceeds the key size limit.
Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
	if (checkUsedDuringCommit())
		return used_during_commit();
	if (resetPromise.isSet())
		return resetPromise.getFuture().getError();
	if (options.readYourWritesDisabled)
		return watches_disabled();

	bool const outsideReadableRange = key >= allKeys.end || (key >= getMaxReadKey() && tr.apiVersionAtLeast(300));
	if (outsideReadableRange)
		return key_outside_legal_range();

	auto const sizeLimit = key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT;
	if (key.size() > sizeLimit)
		return key_too_large();

	return RYWImpl::watch(this, key);
}
// Adds `keys` to this transaction's write-conflict set.
// Over-long keys are truncated to limit+1 bytes (equivalent, since no stored key is that long), and
// an empty range is ignored.
// With read-your-writes disabled the range goes straight to the native transaction. Otherwise it
// is copied into the arena and recorded in the write map, and later replayed by
// writeRangeToNativeTransaction.
// Throws used_during_commit, or key_outside_legal_range (API >= 300).
void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) {
	if(checkUsedDuringCommit()) {
		throw used_during_commit();
	}

	if (tr.apiVersionAtLeast(300)) {
		if (keys.begin > getMaxWriteKey() || keys.end > getMaxWriteKey()) {
			throw key_outside_legal_range();
		}
	}

	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
	//we can translate it to an equivalent one with smaller keys
	KeyRef begin = keys.begin;
	KeyRef end = keys.end;

	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);

	KeyRangeRef r = KeyRangeRef(begin, end);

	if(r.empty()) {
		return;
	}

	if(options.readYourWritesDisabled) {
		tr.addWriteConflictRange(r);
		return;
	}

	r = KeyRangeRef( arena, r );
	// NOTE(review): assumes WriteMap::addConflictRange is the write-map API for recording write conflicts; confirm.
	writes.addConflictRange(r);
}
// Commits the transaction (see RYWImpl::commit).
// Fails immediately if a commit is already in progress or the transaction was reset/cancelled.
Future<Void> ReadYourWritesTransaction::commit() {
	if (checkUsedDuringCommit())
		return used_during_commit();
	if (!resetPromise.isSet())
		return RYWImpl::commit( this );
	return resetPromise.getFuture().getError();
}
// Applies a transaction option to the RYW layer, then always forwards it to the native transaction.
// Each case now ends in `break`; previously every option fell through into the following cases.
void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
	switch(option) {
		case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
			validateOptionValue(value, false);

			// Only allowed before anything has been read or written through the RYW layer.
			if (!reading.isReady() || !cache.empty() || !writes.empty())
				throw client_invalid_operation();

			options.readYourWritesDisabled = true;
			break;

		case FDBTransactionOptions::READ_AHEAD_DISABLE:
			validateOptionValue(value, false);

			options.readAheadDisabled = true;
			break;

		case FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE:
			validateOptionValue(value, false);

			options.nextWriteDisableConflictRange = true;
			break;

		case FDBTransactionOptions::ACCESS_SYSTEM_KEYS:
			validateOptionValue(value, false);

			options.readSystemKeys = true;
			options.writeSystemKeys = true;
			break;

		case FDBTransactionOptions::READ_SYSTEM_KEYS:
			validateOptionValue(value, false);

			options.readSystemKeys = true;
			break;

		case FDBTransactionOptions::TIMEOUT:
			// The option value is in milliseconds; 0 means no timeout (timebomb then waits forever).
			options.timeoutInSeconds = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
			timeoutActor = timebomb(options.timeoutInSeconds == 0.0 ? options.timeoutInSeconds : options.timeoutInSeconds + creationTime, resetPromise);
			break;

		case FDBTransactionOptions::RETRY_LIMIT:
			options.maxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
			break;

		case FDBTransactionOptions::DEBUG_RETRY_LOGGING:
			options.debugRetryLogging = true;
			if(!transactionDebugInfo) {
				transactionDebugInfo = Reference<TransactionDebugInfo>::addRef(new TransactionDebugInfo());
				transactionDebugInfo->lastRetryLogTime = creationTime;
			}

			transactionDebugInfo->transactionName = value.present() ? value.get().toString() : "";
			break;

		// NOTE(review): assumes the options struct tracks snapshot RYW as an int counter (snapshotRywEnabled); confirm.
		case FDBTransactionOptions::SNAPSHOT_RYW_ENABLE:
			validateOptionValue(value, false);

			options.snapshotRywEnabled++;
			break;

		case FDBTransactionOptions::SNAPSHOT_RYW_DISABLE:
			validateOptionValue(value, false);

			options.snapshotRywEnabled--;
			break;

		case FDBTransactionOptions::USED_DURING_COMMIT_PROTECTION_DISABLE:
			validateOptionValue(value, false);

			options.disableUsedDuringCommitProtection = true;
			break;

		default:
			break;
	}

	tr.setOption( option, value );
}
// Move-assigns from `r`.
// `r` is left with a fresh (unset) reset promise. The cache and write map are re-pointed at this
// object's arena, because they were built against `r.arena`, which has just been moved here.
void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) noexcept(true) {
	cache = std::move( r.cache );
	writes = std::move( r.writes );
	arena = std::move( r.arena );
	tr = std::move( r.tr );
	readConflicts = std::move( r.readConflicts );
	watchMap = std::move( r.watchMap );
	reading = std::move( r.reading );
	resetPromise = std::move( r.resetPromise );
	r.resetPromise = Promise<Void>();
	deferred_error = std::move( r.deferred_error );
	retries = r.retries;
	timeoutActor = r.timeoutActor;
	creationTime = r.creationTime;
	commitStarted = r.commitStarted;
	options = r.options;
	transactionDebugInfo = r.transactionDebugInfo;
	cache.arena = &arena;
	writes.arena = &arena;
}
// Move-constructs from `r`.
// The cache and write map are re-pointed at this object's arena (moved from `r.arena`), and `r` is
// left with a fresh (unset) reset promise.
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) noexcept(true) :
	cache( std::move(r.cache) ),
	writes( std::move(r.writes) ),
	arena( std::move(r.arena) ),
	reading( std::move(r.reading) ),
	retries( r.retries ),
	creationTime( r.creationTime ),
	deferred_error( std::move(r.deferred_error) ),
	timeoutActor( std::move(r.timeoutActor) ),
	resetPromise( std::move(r.resetPromise) ),
	commitStarted( r.commitStarted ),
	options( r.options ),
	transactionDebugInfo( r.transactionDebugInfo )
{
	cache.arena = &arena;
	writes.arena = &arena;
	tr = std::move( r.tr );
	readConflicts = std::move(r.readConflicts);
	watchMap = std::move( r.watchMap );
	r.resetPromise = Promise<Void>();
}
// Handles error `e` from an operation on this transaction; the retry/backoff/reset policy lives in RYWImpl::onError.
Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
	Future<Void> handled = RYWImpl::onError( this, e );
	return handled;
}
// Discards the read-your-writes state so the transaction can be retried.
// It installs a new reset promise and allocates a fresh arena, snapshot cache, write map and
// read-conflict map. It also clears outstanding reads, the commitStarted flag and any deferred error.
// NOTE(review): watchMap is not visibly cleared here.
void ReadYourWritesTransaction::resetRyow() {
// Keep the old promise so it can still be resolved after the new one is installed.
Promise<Void> oldReset = resetPromise;
resetPromise = Promise<Void>();
arena = Arena();
cache = SnapshotCache(&arena);
writes = WriteMap(&arena);
readConflicts = CoalescedKeyRefRangeMap<bool>();
reading = AndFuture();
commitStarted = false;
deferred_error = Error();
// NOTE(review): the body of this API >= 16 branch and of the `!oldReset.isSet()` check below appear
// to be missing — no statement follows them. Presumably the old promise should be failed, so that
// operations pending on the previous attempt are cancelled; confirm against upstream.
if(tr.apiVersionAtLeast(16)) {
if ( !oldReset.isSet() )
void ReadYourWritesTransaction::cancel() {
if(!resetPromise.isSet() )
// Resets the transaction for reuse. The visible code restarts the retry counter and the creation
// time, which serves as the base for the timeout and for retry logging.
// NOTE(review): no reset of the native transaction `tr`, of the options, of the timeout actor, or of
// the RYW state (resetRyow) is visible here. Confirm whether those calls were lost.
void ReadYourWritesTransaction::reset() {
retries = 0;
creationTime = now();
// Exclusive upper bound for reads: systemKeys.end when system-key reads are enabled
// (READ_SYSTEM_KEYS / ACCESS_SYSTEM_KEYS), otherwise normalKeys.end.
// Previously it returned systemKeys.end unconditionally (the second return was unreachable).
KeyRef ReadYourWritesTransaction::getMaxReadKey() {
	if(options.readSystemKeys)
		return systemKeys.end;
	else
		return normalKeys.end;
}
// Exclusive upper bound for writes: systemKeys.end when system-key writes are enabled
// (ACCESS_SYSTEM_KEYS), otherwise normalKeys.end.
// Previously it returned systemKeys.end unconditionally (the second return was unreachable).
KeyRef ReadYourWritesTransaction::getMaxWriteKey() {
	if(options.writeSystemKeys)
		return systemKeys.end;
	else
		return normalKeys.end;
}
// On destruction, fails the reset promise with transaction_cancelled (unless already set), so
// outstanding operations on this transaction are cancelled rather than left hanging.
ReadYourWritesTransaction::~ReadYourWritesTransaction() {
	if( !resetPromise.isSet() )
		resetPromise.sendError(transaction_cancelled());
}
bool ReadYourWritesTransaction::checkUsedDuringCommit() {
if(commitStarted && !resetPromise.isSet() && !options.disableUsedDuringCommitProtection) {
return commitStarted;
// Emits a "long transaction" warning (stderr outside simulation, plus a LongTransaction trace event)
// when DEBUG_RETRY_LOGGING is on. `error` is the retried error, or absent when called after a
// successful commit.
// Logging happens when at least 1s has passed since the last log, or when a commit completes more
// than 1s after creation.
void ReadYourWritesTransaction::debugLogRetries(Optional<Error> error) {
	bool committed = !error.present();
	if(options.debugRetryLogging) {
		double timeSinceLastLog = now() - transactionDebugInfo->lastRetryLogTime;
		double elapsed = now() - creationTime;
		if(timeSinceLastLog >= 1 || (committed && elapsed > 1)) {
			std::string transactionNameStr = "";
			// Only mention the name when one was supplied with DEBUG_RETRY_LOGGING.
			if(!transactionDebugInfo->transactionName.empty())
				transactionNameStr = format(" in transaction '%s'", printable(StringRef(transactionDebugInfo->transactionName)).c_str());
			if(!g_network->isSimulated()) //Fuzz workload turns this on, but we do not want stderr output in simulation
				fprintf(stderr, "fdb WARNING: long transaction (%.2fs elapsed%s, %d retries, %s)\n", elapsed, transactionNameStr.c_str(), retries, committed ? "committed" : error.get().what());
			TraceEvent trace = TraceEvent("LongTransaction");
			trace.detail("TransactionName", printable(StringRef(transactionDebugInfo->transactionName)));
			trace.detail("Elapsed", elapsed).detail("Retries", retries).detail("Committed", committed);
			// On the commit path there is no error; calling error.get() would be invalid.
			if(error.present())
				trace.error(error.get(), true);
			transactionDebugInfo->lastRetryLogTime = now();
		}
	}
}