2017-05-26 04:48:44 +08:00
/*
* ReadYourWrites . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2018-10-20 01:30:13 +08:00
# include "fdbclient/ReadYourWrites.h"
# include "fdbclient/Atomic.h"
# include "fdbclient/DatabaseContext.h"
# include "fdbclient/StatusClient.h"
# include "fdbclient/MonitorLeader.h"
2018-08-02 09:09:54 +08:00
# include "flow/Util.h"
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
class RYWImpl {
public :
template < class Iter > static void dump ( Iter it ) {
it . skip ( allKeys . begin ) ;
Arena arena ;
while ( true ) {
2019-01-31 17:23:32 +08:00
Optional < StringRef > key = StringRef ( ) ;
if ( it . is_kv ( ) ) {
auto kv = it . kv ( arena ) ;
if ( kv ) key = kv - > key ;
}
TraceEvent ( " RYWDump " )
2019-03-19 06:03:43 +08:00
. detail ( " Begin " , it . beginKey ( ) . toStandaloneStringRef ( ) )
. detail ( " End " , it . endKey ( ) . toStandaloneStringRef ( ) )
2019-01-31 17:23:32 +08:00
. detail ( " Unknown " , it . is_unknown_range ( ) )
. detail ( " Empty " , it . is_empty_range ( ) )
. detail ( " KV " , it . is_kv ( ) )
2019-03-19 06:03:43 +08:00
. detail ( " Key " , key . get ( ) ) ;
2017-05-26 04:48:44 +08:00
if ( it . endKey ( ) = = allKeys . end )
break ;
+ + it ;
}
}
// Request descriptor for a single-key read. `Result` names the type such a read resolves to and is
// used by the generic readWithConflictRange*() helpers via `typename Req::Result`.
struct GetValueReq {
    using Result = Optional<Value>;

    Key key; // the key to look up

    explicit GetValueReq(Key key) : key(key) {}
};
// Request descriptor for resolving a key selector to a concrete key. `Result` names the type such a
// read resolves to and is used by the generic readWithConflictRange*() helpers.
struct GetKeyReq {
    using Result = Key;

    KeySelector key; // the selector to resolve

    explicit GetKeyReq(KeySelector key) : key(key) {}
};
// Request descriptor for a range read between two key selectors.
// The `Reverse` template flag picks the overloads used for the request: GetRangeReq<false> is served by
// getRangeValue(), GetRangeReq<true> by getRangeValueBack().
template <bool Reverse>
struct GetRangeReq {
    using Result = Standalone<RangeResultRef>;

    KeySelector begin, end; // selectors delimiting the range
    GetRangeLimits limits;  // row/byte limits for the read

    GetRangeReq(KeySelector begin, KeySelector end, GetRangeLimits limits)
      : begin(begin), end(end), limits(limits) {}
};
// read() Performs a read (get, getKey, getRange, etc), in the context of the given transaction. Snapshot or RYW reads are distinguished by the type Iter being SnapshotCache::iterator or RYWIterator.
// Fills in the snapshot cache as a side effect but does not affect conflict ranges.
// Some (indicated) overloads of read are required to update the given *it to point to the key that was read, so that the corresponding overload of addConflictRange() can make use of it.
// Point read of read.key through the iterator `it`.
// Postcondition this overload must provide: it->extractWriteMapIterator().segmentContains(read.key)
// Returns the value seen at read.key, or an empty Optional when there is none.
ACTOR template <class Iter> static Future<Optional<Value>> read(ReadYourWritesTransaction* ryw, GetValueReq read, Iter* it) {
    it->skip(read.key);
    state bool dependent = it->is_dependent();

    // The iterator already has a key/value segment here: answer without waiting on the database.
    if (it->is_kv()) {
        const KeyValueRef* entry = it->kv(ryw->arena);
        if (entry == nullptr) {
            return Optional<Value>();
        }
        return entry->value;
    }

    // The iterator reports the key's segment as empty.
    if (it->is_empty_range()) {
        return Optional<Value>();
    }

    // Otherwise fetch the key from the underlying transaction as a snapshot read and record the
    // outcome (present or absent) in the cache.
    Optional<Value> fetched = wait(ryw->tr.get(read.key, true));
    KeyRef cachedKey(ryw->arena, read.key);

    if (fetched.present()) {
        if (ryw->cache.insert(cachedKey, fetched.get()))
            ryw->arena.dependsOn(fetched.get().arena());
    } else {
        ryw->cache.insert(cachedKey, Optional<ValueRef>());
    }
    // `fetched` is exactly the answer (possibly empty) when no write depends on the stored value.
    if (!dependent)
        return fetched;

    // There was a dependent write at the key, so the iterator has to be looked up again.
    it->skip(cachedKey);
    ASSERT(it->is_kv());
    const KeyValueRef* merged = it->kv(ryw->arena);
    if (merged == nullptr) {
        return Optional<Value>();
    }
    return merged->value;
}
// Resolves the key selector in `read` to a concrete key by issuing a one-row range read through `it`.
// Results are clamped to allKeys.begin at the low end and ryw->getMaxReadKey() at the high end.
ACTOR template <class Iter> static Future<Key> read(ReadYourWritesTransaction* ryw, GetKeyReq read, Iter* it) {
    if (read.key.offset <= 0) {
        // Non-positive offset: bump the offset by one and take the single row of a backward range
        // read that ends at the selector.
        read.key.offset++;
        Standalone<RangeResultRef> rows = wait(getRangeValueBack(ryw, firstGreaterOrEqual(allKeys.begin), read.key, GetRangeLimits(1), it));
        if (rows.readThroughEnd)
            return ryw->getMaxReadKey();
        if (rows.readToBegin || !rows.size())
            return allKeys.begin;
        return rows[0].key;
    } else {
        // Positive offset: take the single row of a forward range read that starts at the selector.
        Standalone<RangeResultRef> rows = wait(getRangeValue(ryw, read.key, firstGreaterOrEqual(ryw->getMaxReadKey()), GetRangeLimits(1), it));
        if (rows.readToBegin)
            return allKeys.begin;
        if (rows.readThroughEnd || !rows.size())
            return ryw->getMaxReadKey();
        return rows[0].key;
    }
}
// Forward range read: unpacks the request and delegates to getRangeValue().
template <class Iter>
static Future<Standalone<RangeResultRef>> read(ReadYourWritesTransaction* ryw, GetRangeReq<false> read, Iter* it) {
    return getRangeValue(ryw, read.begin, read.end, read.limits, it);
}
// Reverse range read: unpacks the request and delegates to getRangeValueBack().
template <class Iter>
static Future<Standalone<RangeResultRef>> read(ReadYourWritesTransaction* ryw, GetRangeReq<true> read, Iter* it) {
    return getRangeValueBack(ryw, read.begin, read.end, read.limits, it);
}
// readThrough() performs a read in the RYW disabled case, passing it on relatively directly to the underlying transaction.
// Responsible for clipping results to the non-system keyspace when appropriate, since NativeAPI doesn't do that.
// RYW-disabled point read: hands the key and the snapshot flag straight to the underlying transaction.
static Future<Optional<Value>> readThrough(ReadYourWritesTransaction* ryw, GetValueReq read, bool snapshot) {
    return ryw->tr.get(read.key, snapshot);
}
// RYW-disabled selector resolution: asks the underlying transaction for the key, then caps the answer
// at ryw->getMaxReadKey().
ACTOR static Future<Key> readThrough(ReadYourWritesTransaction* ryw, GetKeyReq read, bool snapshot) {
    Key resolved = wait(ryw->tr.getKey(read.key, snapshot));
    // Filter out results in the system keys if they are not accessible
    if (ryw->getMaxReadKey() < resolved) {
        return ryw->getMaxReadKey();
    }
    return resolved;
}
// RYW-disabled range read: forwards to the underlying transaction and keeps the result within
// [.., ryw->getMaxReadKey()).
//  - Reverse reads whose end selector has offset > 1 first resolve that selector with getKey() and
//    cap it at getMaxReadKey().
//  - Forward reads drop trailing rows whose key is >= getMaxReadKey(); a trimmed result is returned
//    with its `more` flag constructed as false.
ACTOR template <bool Reverse> static Future<Standalone<RangeResultRef>> readThrough(ReadYourWritesTransaction* ryw, GetRangeReq<Reverse> read, bool snapshot) {
    if (Reverse && read.end.offset > 1) {
        // FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result actually does.
        Key resolvedEnd = wait(ryw->tr.getKey(read.end, snapshot));
        if (resolvedEnd > ryw->getMaxReadKey())
            read.end = firstGreaterOrEqual(ryw->getMaxReadKey());
        else
            read.end = KeySelector(firstGreaterOrEqual(resolvedEnd), resolvedEnd.arena());
    }

    Standalone<RangeResultRef> rows = wait(ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse));
    KeyRef maxKey = ryw->getMaxReadKey();

    if (!Reverse && rows.size() > 0 && rows[rows.size() - 1].key >= maxKey) {
        state Standalone<RangeResultRef> clipped = rows;
        // The last row is known to be out of bounds; walk left while the preceding row is too.
        int keep = clipped.size() - 1;
        while (keep > 0 && clipped[keep - 1].key >= maxKey) {
            --keep;
        }
        return Standalone<RangeResultRef>(RangeResultRef(VectorRef<KeyValueRef>(&clipped[0], keep), false), clipped.arena());
    }

    return rows;
}
// addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant conflict range
static void addConflictRange ( ReadYourWritesTransaction * ryw , GetValueReq read , WriteMap : : iterator & it , Optional < Value > result ) {
// it will already point to the right segment (see the calling code in read()), so we don't need to skip
// read.key will be copied into ryw->arena inside of updateConflictMap if it is being added
ryw - > updateConflictMap ( read . key , it ) ;
}
static void addConflictRange ( ReadYourWritesTransaction * ryw , GetKeyReq read , WriteMap : : iterator & it , Key result ) {
KeyRangeRef readRange ;
if ( read . key . offset < = 0 )
readRange = KeyRangeRef ( KeyRef ( ryw - > arena , result ) , read . key . orEqual ? keyAfter ( read . key . getKey ( ) , ryw - > arena ) : KeyRef ( ryw - > arena , read . key . getKey ( ) ) ) ;
else
readRange = KeyRangeRef ( read . key . orEqual ? keyAfter ( read . key . getKey ( ) , ryw - > arena ) : KeyRef ( ryw - > arena , read . key . getKey ( ) ) , keyAfter ( result , ryw - > arena ) ) ;
it . skip ( readRange . begin ) ;
ryw - > updateConflictMap ( readRange , it ) ;
}
static void addConflictRange ( ReadYourWritesTransaction * ryw , GetRangeReq < false > read , WriteMap : : iterator & it , Standalone < RangeResultRef > const & result ) {
KeyRef rangeBegin , rangeEnd ;
bool endInArena = false ;
if ( read . begin . getKey ( ) < read . end . getKey ( ) ) {
rangeBegin = read . begin . getKey ( ) ;
rangeEnd = read . end . offset > 0 & & result . more ? read . begin . getKey ( ) : read . end . getKey ( ) ;
}
else {
rangeBegin = read . end . getKey ( ) ;
rangeEnd = read . begin . getKey ( ) ;
}
if ( result . readToBegin & & read . begin . offset < = 0 ) rangeBegin = allKeys . begin ;
if ( result . readThroughEnd & & read . end . offset > 0 ) rangeEnd = ryw - > getMaxReadKey ( ) ;
if ( result . size ( ) ) {
if ( read . begin . offset < = 0 ) rangeBegin = std : : min ( rangeBegin , result [ 0 ] . key ) ;
if ( rangeEnd < = result . end ( ) [ - 1 ] . key ) {
rangeEnd = keyAfter ( result . end ( ) [ - 1 ] . key , ryw - > arena ) ;
endInArena = true ;
}
}
KeyRangeRef readRange = KeyRangeRef ( KeyRef ( ryw - > arena , rangeBegin ) , endInArena ? rangeEnd : KeyRef ( ryw - > arena , rangeEnd ) ) ;
it . skip ( readRange . begin ) ;
ryw - > updateConflictMap ( readRange , it ) ;
}
static void addConflictRange ( ReadYourWritesTransaction * ryw , GetRangeReq < true > read , WriteMap : : iterator & it , Standalone < RangeResultRef > const & result ) {
KeyRef rangeBegin , rangeEnd ;
bool endInArena = false ;
if ( read . begin . getKey ( ) < read . end . getKey ( ) ) {
rangeBegin = read . begin . offset < = 0 & & result . more ? read . end . getKey ( ) : read . begin . getKey ( ) ;
rangeEnd = read . end . getKey ( ) ;
}
else {
rangeBegin = read . end . getKey ( ) ;
rangeEnd = read . begin . getKey ( ) ;
}
if ( result . readToBegin & & read . begin . offset < = 0 ) rangeBegin = allKeys . begin ;
if ( result . readThroughEnd & & read . end . offset > 0 ) rangeEnd = ryw - > getMaxReadKey ( ) ;
if ( result . size ( ) ) {
rangeBegin = std : : min ( rangeBegin , result . end ( ) [ - 1 ] . key ) ;
if ( read . end . offset > 0 & & rangeEnd < = result [ 0 ] . key ) {
rangeEnd = keyAfter ( result [ 0 ] . key , ryw - > arena ) ;
endInArena = true ;
}
}
KeyRangeRef readRange = KeyRangeRef ( KeyRef ( ryw - > arena , rangeBegin ) , endInArena ? rangeEnd : KeyRef ( ryw - > arena , rangeEnd ) ) ;
it . skip ( readRange . begin ) ;
ryw - > updateConflictMap ( readRange , it ) ;
}
// RYW-disabled path: runs readThrough() for the request, racing it against the transaction's
// resetPromise. If resetPromise fires first, internal_error() is thrown.
ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeThrough(ReadYourWritesTransaction* ryw, Req req, bool snapshot) {
    choose {
        when(typename Req::Result answer = wait(readThrough(ryw, req, snapshot))) {
            return answer;
        }
        when(wait(ryw->resetPromise.getFuture())) {
            throw internal_error();
        }
    }
}
// Snapshot path: runs read() through a SnapshotCache::iterator and adds no conflict range.
// The read is raced against the transaction's resetPromise; if that fires first, internal_error()
// is thrown.
ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeSnapshot(ReadYourWritesTransaction* ryw, Req req) {
    state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
    choose {
        when(typename Req::Result answer = wait(read(ryw, req, &it))) {
            return answer;
        }
        when(wait(ryw->resetPromise.getFuture())) {
            throw internal_error();
        }
    }
}
// Read-your-writes path: runs read() through an RYWIterator and, unless `snapshot` is set, records
// the matching read conflict range before returning.
// The read is raced against the transaction's resetPromise; if that fires first, internal_error()
// is thrown.
ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeRYW(ReadYourWritesTransaction* ryw, Req req, bool snapshot) {
    state RYWIterator it(&ryw->cache, &ryw->writes);
    choose {
        when(typename Req::Result answer = wait(read(ryw, req, &it))) {
            // Some overloads of addConflictRange() need `it` to point at the "right" key and others
            // don't; the corresponding overloads of read() are responsible for that guarantee.
            if (!snapshot) {
                addConflictRange(ryw, req, it.extractWriteMapIterator(), answer);
            }
            return answer;
        }
        when(wait(ryw->resetPromise.getFuture())) {
            throw internal_error();
        }
    }
}
// Entry point that picks the read strategy from the transaction options:
//   1. options.readYourWritesDisabled             -> readWithConflictRangeThrough
//   2. snapshot && options.snapshotRywEnabled <= 0 -> readWithConflictRangeSnapshot
//   3. everything else                            -> readWithConflictRangeRYW
template <class Req>
static inline Future<typename Req::Result> readWithConflictRange(ReadYourWritesTransaction* ryw, Req const& req, bool snapshot) {
    if (ryw->options.readYourWritesDisabled)
        return readWithConflictRangeThrough(ryw, req, snapshot);

    const bool plainSnapshot = snapshot && ryw->options.snapshotRywEnabled <= 0;
    if (plainSnapshot)
        return readWithConflictRangeSnapshot(ryw, req);

    return readWithConflictRangeRYW(ryw, req, snapshot);
}
// Rewrites `key` and repositions `it` using only what the iterator reports; this function itself issues no reads.
// Parameters:
//   key            - in/out selector; normalized with removeOrEqual() and then walked toward offset 1.
//   it             - in/out iterator; left on the segment described in the contract comment below.
//   maxKey         - upper bound for the forward walk; also the key installed when the walk runs off the end.
//   readToBegin    - set to true (never cleared here) when the backward walk stops in a known segment with offset still < 1.
//   readThroughEnd - set to true (never cleared here) when the forward walk stops in a known segment with offset still > 1.
//   actualOffset   - receives key.offset after the two walking loops, unless the offset was already 1 on entry.
template < class Iter > static void resolveKeySelectorFromCache ( KeySelector & key , Iter & it , KeyRef const & maxKey , bool * readToBegin , bool * readThroughEnd , int * actualOffset ) {
// If the key indicated by `key` can be determined without reading unknown data from the snapshot, then it.kv().key is the resolved key.
// If the indicated key is determined to be "off the beginning or end" of the database, it points to the first or last segment in the DB,
// and key is an equivalent key selector relative to the beginning or end of the database.
// Otherwise it points to an unknown segment, and key is an equivalent key selector whose base key is in or adjoining the segment.
key . removeOrEqual ( key . arena ( ) ) ;
// Remember whether the selector was already in offset-1 form; if so, *actualOffset is left untouched below.
bool alreadyExhausted = key . offset = = 1 ;
it . skip ( key . getKey ( ) ) ; // TODO: or precondition?
// A selector with non-positive offset whose base key is exactly a segment's begin key is moved to the preceding segment
// (this establishes the it.beginKey() != keykey part of the invariant for backward selectors).
if ( key . offset < = 0 & & it . beginKey ( ) = = key . getKey ( ) & & key . getKey ( ) ! = allKeys . begin )
- - it ;
2017-09-26 06:52:45 +08:00
2017-05-26 04:48:44 +08:00
ExtStringRef keykey = key . getKey ( ) ;
2017-09-26 06:52:45 +08:00
// Set whenever keykey is re-pointed at an iterator boundary; triggers the copy into key.arena() at the end.
bool keyNeedsCopy = false ;
2017-05-26 04:48:44 +08:00
// Invariant: it.beginKey() <= keykey && keykey <= it.endKey() && (key.isBackward() ? it.beginKey() != keykey : it.endKey() != keykey)
// Maintaining this invariant, we transform the key selector toward firstGreaterOrEqual form until we reach an unknown range or the result
// Forward walk: each key/value segment passed consumes one unit of offset; stops at unreadable/unknown segments or at maxKey.
while ( key . offset > 1 & & ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) & & it . endKey ( ) < maxKey ) {
if ( it . is_kv ( ) )
- - key . offset ;
+ + it ;
keykey = it . beginKey ( ) ;
2017-09-26 06:52:45 +08:00
keyNeedsCopy = true ;
2017-05-26 04:48:44 +08:00
}
// Backward walk: each key/value segment passed adds one unit of offset; stops at unreadable/unknown segments or at allKeys.begin.
while ( key . offset < 1 & & ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) & & it . beginKey ( ) ! = allKeys . begin ) {
if ( it . is_kv ( ) ) {
+ + key . offset ;
// Reaching offset 1 on a key/value segment means that segment's begin key is the selector's base key.
if ( key . offset = = 1 ) {
keykey = it . beginKey ( ) ;
2017-09-26 06:52:45 +08:00
keyNeedsCopy = true ;
2017-05-26 04:48:44 +08:00
break ;
}
}
- - it ;
keykey = it . endKey ( ) ;
2017-09-26 06:52:45 +08:00
keyNeedsCopy = true ;
2017-05-26 04:48:44 +08:00
}
if ( ! alreadyExhausted ) {
* actualOffset = key . offset ;
}
// Still short of offset 1 in a known segment: the selector is pinned to firstGreaterOrEqual(allKeys.begin).
if ( ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) & & key . offset < 1 ) {
* readToBegin = true ;
key . setKey ( allKeys . begin ) ;
key . offset = 1 ;
return ;
}
// Still above offset 1 in a known segment: the selector is pinned to firstGreaterOrEqual(maxKey).
if ( ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) & & key . offset > 1 ) {
* readThroughEnd = true ;
2017-09-26 06:52:45 +08:00
key . setKey ( maxKey ) ; // maxKey is a KeyRef, but points to a LiteralStringRef. TODO: how can we ASSERT this?
2017-05-26 04:48:44 +08:00
key . offset = 1 ;
return ;
}
// Step over known-empty segments (below maxKey) so that keykey becomes the begin key of the first segment that is not empty.
while ( ! it . is_unreadable ( ) & & it . is_empty_range ( ) & & it . endKey ( ) < maxKey ) {
+ + it ;
keykey = it . beginKey ( ) ;
2017-09-26 06:52:45 +08:00
keyNeedsCopy = true ;
2017-05-26 04:48:44 +08:00
}
2017-09-26 06:52:45 +08:00
// Only now is the (possibly moved) base key copied into the selector's own arena.
if ( keyNeedsCopy ) {
key . setKey ( keykey . toArena ( key . arena ( ) ) ) ;
}
2017-05-26 04:48:44 +08:00
}
// Given the rows (`data`) returned by a forward range read issued for selectors `begin`..`end`,
// computes the key range whose contents that read established. Returns an empty KeyRangeRef() when
// the computed begin is not strictly below the computed end. Both endpoints of a non-empty result
// are copied into `arena`.
static KeyRangeRef getKnownKeyRange(RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena) {
    // Seed each bound from the selector's base key when its offset permits; otherwise start from the
    // opposite extreme so that the min/max adjustments below can still move it.
    StringRef knownBegin = allKeys.end;
    if (begin.offset <= 1)
        knownBegin = begin.getKey();

    ExtStringRef knownEnd = allKeys.begin;
    if (!data.more && end.offset >= 1)
        knownEnd = end.getKey();

    if (data.readToBegin)
        knownBegin = allKeys.begin;
    if (data.readThroughEnd)
        knownEnd = allKeys.end;

    if (data.size()) {
        knownBegin = std::min(knownBegin, data[0].key);
        if (data.readThrough.present()) {
            knownEnd = std::max<ExtStringRef>(knownEnd, data.readThrough.get());
        } else if (data.more || !(data.end()[-1].key < knownEnd)) {
            // The end bound gives no coverage beyond the last row: end just past that row instead.
            // NOTE(review): ExtStringRef(key, 1) presumably denotes the key immediately after `key` - confirm.
            knownEnd = ExtStringRef(data.end()[-1].key, 1);
        }
    }

    if (knownBegin >= knownEnd)
        return KeyRangeRef();
    return KeyRangeRef(StringRef(arena, knownBegin), knownEnd.toArena(arena));
}
// Pre: it points to an unknown range
// Increments it to point to the unknown range just before the next nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
template < class Iter > static int skipUncached ( Iter & it , Iter const & end , int iterationLimit ) {
ExtStringRef b = it . beginKey ( ) ;
ExtStringRef e = it . endKey ( ) ;
int singleEmpty = 0 ;
ASSERT ( ! it . is_unreadable ( ) & & it . is_unknown_range ( ) ) ;
// b is the beginning of the most recent contiguous *empty* range
// e is it.endKey()
while ( it ! = end & & - - iterationLimit > = 0 ) {
if ( it . is_unreadable ( ) | | it . is_empty_range ( ) ) {
if ( it . is_unreadable ( ) | | ! e . isKeyAfter ( b ) ) { //Assumes no degenerate ranges
while ( it . is_unreadable ( ) | | ! it . is_unknown_range ( ) )
- - it ;
return singleEmpty ;
}
singleEmpty + + ;
} else
b = e ;
+ + it ;
e = it . endKey ( ) ;
}
while ( it . is_unreadable ( ) | | ! it . is_unknown_range ( ) )
- - it ;
return singleEmpty ;
}
// Pre: it points to an unknown range
// Returns the number of following empty single-key known ranges between it and the next nontrivial known range, but no more than maxClears
// Leaves `it` in an indeterminate state
template < class Iter > static int countUncached ( Iter & & it , KeyRef maxKey , int maxClears ) {
if ( maxClears < = 0 ) return 0 ;
ExtStringRef b = it . beginKey ( ) ;
ExtStringRef e = it . endKey ( ) ;
int singleEmpty = 0 ;
while ( e < maxKey ) {
if ( it . is_unreadable ( ) | | it . is_empty_range ( ) ) {
if ( it . is_unreadable ( ) | | ! e . isKeyAfter ( b ) ) { //Assumes no degenerate ranges
return singleEmpty ;
}
singleEmpty + + ;
if ( singleEmpty > = maxClears )
return maxClears ;
} else
b = e ;
+ + it ;
e = it . endKey ( ) ;
}
return singleEmpty ;
}
// Adjusts `requestLimit` in place for the next range request.
//   additionalRows - extra rows the request needs on top of the caller's limit
//   offset         - lower bound applied to the computed row counts
//   requestCount   - number of requests issued so far; a finite byte limit is scaled by 2^min(requestCount, 20)
static void setRequestLimits(GetRangeLimits& requestLimit, int64_t additionalRows, int offset, int requestCount) {
    const int64_t intMax = (int64_t)std::numeric_limits<int>::max();
    const int64_t rowFloor = (int64_t)offset;

    // max(base + additionalRows, offset), saturated to what fits in an int.
    auto clampRows = [&](int64_t base) {
        return (int)std::min(std::max(base + additionalRows, rowFloor), intMax);
    };

    requestLimit.minRows = clampRows(1);
    if (requestLimit.hasRowLimit()) {
        requestLimit.rows = clampRows(std::max(1, requestLimit.rows));
    }

    // Calculating request byte limit
    if (requestLimit.bytes == 0) {
        requestLimit.bytes = CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED;
        if (!requestLimit.hasRowLimit()) {
            requestLimit.rows = clampRows(std::max(1, requestLimit.rows));
        }
    } else if (requestLimit.hasByteLimit()) {
        const int64_t scaledBytes = int64_t(requestLimit.bytes) << std::min(requestCount, 20);
        requestLimit.bytes = std::min(scaledBytes, (int64_t)CLIENT_KNOBS->REPLY_BYTE_LIMIT);
    }
}
//TODO: read to begin, read through end flags for result
// Forward range read between the selectors `begin` and `end`, served through the iterator *pit.
// Known key/value segments are appended to the result directly; whenever the iterator reaches an unknown range,
// a snapshot getRange is issued on ryw->tr, its rows are inserted into ryw->cache, and both selectors are re-resolved.
// Parameters:
//   ryw    - transaction providing the cache, the arena, the underlying transaction (tr) and getMaxReadKey().
//   begin  - selector for the start of the range (rewritten in place as it is resolved).
//   end    - selector for the end of the range (rewritten in place as it is resolved).
//   limits - row/byte limits; decremented as rows are appended.
//   pit    - iterator used for the walk; it is advanced by this function.
// Returns the rows read, with more / readToBegin / readThroughEnd filled in.
ACTOR template < class Iter > static Future < Standalone < RangeResultRef > > getRangeValue ( ReadYourWritesTransaction * ryw , KeySelector begin , KeySelector end , GetRangeLimits limits , Iter * pit ) {
// `it` aliases the caller's iterator and tracks `begin`; `itEnd` is a private copy that tracks `end`.
state Iter & it ( * pit ) ;
state Iter itEnd ( * pit ) ;
state Standalone < RangeResultRef > result ;
// Extra rows the next database request must fetch beyond the caller's limit; reset to 0 before each request.
state int64_t additionalRows = 0 ;
// Rows seen in key/value segments after `limits` had already been reached.
state int itemsPastEnd = 0 ;
// Number of database requests issued so far.
state int requestCount = 0 ;
state bool readToBegin = false ;
state bool readThroughEnd = false ;
state int actualBeginOffset = begin . offset ;
state int actualEndOffset = end . offset ;
//state UID randomID = g_nondeterministic_random->randomUniqueID();
// Resolve both selectors as far as the iterators allow without reading.
resolveKeySelectorFromCache ( begin , it , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualBeginOffset ) ;
resolveKeySelectorFromCache ( end , itEnd , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualEndOffset ) ;
2017-11-03 01:51:30 +08:00
// Begin is at or past end in both offset and key: the range is empty.
if ( actualBeginOffset > = actualEndOffset & & begin . getKey ( ) > = end . getKey ( ) ) {
2017-05-26 04:48:44 +08:00
return RangeResultRef ( false , false ) ;
}
else if ( ( begin . isFirstGreaterOrEqual ( ) & & begin . getKey ( ) = = ryw - > getMaxReadKey ( ) )
| | ( end . isFirstGreaterOrEqual ( ) & & end . getKey ( ) = = allKeys . begin ) )
{
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
// The end selector is unresolved and its base key is below begin's: resolve it with a getKey read first.
if ( ! end . isFirstGreaterOrEqual ( ) & & begin . getKey ( ) > end . getKey ( ) ) {
Key resolvedEnd = wait ( read ( ryw , GetKeyReq ( end ) , pit ) ) ;
if ( resolvedEnd = = allKeys . begin )
readToBegin = true ;
if ( resolvedEnd = = ryw - > getMaxReadKey ( ) )
readThroughEnd = true ;
if ( begin . getKey ( ) > = resolvedEnd & & ! begin . isBackward ( ) ) {
return RangeResultRef ( false , false ) ;
}
else if ( resolvedEnd = = allKeys . begin ) {
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
// The read above moved *pit (and may have filled the cache), so both selectors are resolved again.
resolveKeySelectorFromCache ( begin , it , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualBeginOffset ) ;
resolveKeySelectorFromCache ( end , itEnd , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualEndOffset ) ;
}
//TraceEvent("RYWSelectorsStartForward", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);
// Main loop: each pass either returns/breaks, issues one database read for an unknown range, or consumes one iterator segment.
loop {
2018-06-09 02:11:08 +08:00
/*TraceEvent("RYWSelectors", randomID).detail("Begin", begin.toString())
. detail ( " End " , end . toString ( ) )
. detail ( " Reached " , limits . isReached ( ) )
. detail ( " ItemsPastEnd " , itemsPastEnd )
. detail ( " EndOffset " , - end . offset )
2019-03-19 06:03:43 +08:00
. detail ( " ItBegin " , it . beginKey ( ) . toStandaloneStringRef ( ) )
. detail ( " ItEnd " , itEnd . beginKey ( ) . toStandaloneStringRef ( ) )
2018-06-09 02:11:08 +08:00
. detail ( " Unknown " , it . is_unknown_range ( ) )
. detail ( " Requests " , requestCount ) ; */
2017-05-26 04:48:44 +08:00
2017-11-03 01:51:30 +08:00
if ( ! result . size ( ) & & actualBeginOffset > = actualEndOffset & & begin . getKey ( ) > = end . getKey ( ) ) {
2017-05-26 04:48:44 +08:00
return RangeResultRef ( false , false ) ;
}
2017-11-03 01:51:30 +08:00
if ( end . offset < = 1 & & end . getKey ( ) = = allKeys . begin ) {
2017-05-26 04:48:44 +08:00
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
2017-11-03 01:51:30 +08:00
// Begin has caught up with end, or has reached the maximum readable key.
if ( ( begin . offset > = end . offset & & begin . getKey ( ) > = end . getKey ( ) ) | |
( begin . offset > = 1 & & begin . getKey ( ) > = ryw - > getMaxReadKey ( ) ) ) {
2017-05-26 04:48:44 +08:00
if ( end . isFirstGreaterOrEqual ( ) ) break ;
if ( ! result . size ( ) ) break ;
Key resolvedEnd = wait ( read ( ryw , GetKeyReq ( end ) , pit ) ) ; //do not worry about iterator invalidation, because we are breaking for the loop
if ( resolvedEnd = = allKeys . begin )
readToBegin = true ;
if ( resolvedEnd = = ryw - > getMaxReadKey ( ) )
readThroughEnd = true ;
end = firstGreaterOrEqual ( resolvedEnd ) ;
break ;
}
2017-11-03 01:51:30 +08:00
if ( ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) & & it . beginKey ( ) > itEnd . beginKey ( ) ) {
2017-05-26 04:48:44 +08:00
if ( end . isFirstGreaterOrEqual ( ) ) break ;
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
// Limits are exhausted and enough rows past the limit have been seen to account for a non-positive end offset.
if ( limits . isReached ( ) & & itemsPastEnd > = 1 - end . offset ) break ;
if ( it = = itEnd & & ( ( ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) ) | | ( begin . offset > 0 & & end . isFirstGreaterOrEqual ( ) & & end . getKey ( ) = = it . beginKey ( ) ) ) ) break ;
if ( it . is_unknown_range ( ) ) {
// Unknown range: build read_begin/read_end, fetch from the database, and insert the rows into the cache.
if ( limits . hasByteLimit ( ) & & result . size ( ) & & itemsPastEnd > = 1 - end . offset ) {
result . more = true ;
break ;
}
Iter ucEnd ( it ) ;
int singleClears = 0 ;
// 0 on the first request, then 2^min(requestCount, 20) on later ones.
int clearLimit = requestCount ? 1 < < std : : min ( requestCount , 20 ) : 0 ;
if ( it . beginKey ( ) < itEnd . beginKey ( ) )
singleClears = std : : min ( skipUncached ( ucEnd , itEnd , BUGGIFY ? 0 : clearLimit + 100 ) , clearLimit ) ;
state KeySelector read_end ;
if ( ucEnd ! = itEnd ) {
2017-11-03 01:51:30 +08:00
// The skip stopped before itEnd: read only up to the end of the range where it stopped.
Key k = ucEnd . endKey ( ) . toStandaloneStringRef ( ) ;
read_end = KeySelector ( firstGreaterOrEqual ( k ) , k . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
if ( end . offset < 1 ) additionalRows + = 1 - end . offset ; // extra for items past end
} else if ( end . offset < 1 ) {
2017-11-03 01:51:30 +08:00
read_end = KeySelector ( firstGreaterOrEqual ( end . getKey ( ) ) , end . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
additionalRows + = 1 - end . offset ;
} else {
read_end = end ;
if ( end . offset > 1 ) {
singleClears + = countUncached ( std : : move ( ucEnd ) , ryw - > getMaxReadKey ( ) , clearLimit - singleClears ) ;
read_end . offset + = singleClears ;
}
}
additionalRows + = singleClears ;
state KeySelector read_begin ;
if ( begin . isFirstGreaterOrEqual ( ) ) {
2017-11-03 01:51:30 +08:00
// Start the read at whichever is later: the unknown range's begin key or the selector's base key.
Key k = it . beginKey ( ) > begin . getKey ( ) ? it . beginKey ( ) . toStandaloneStringRef ( ) : Key ( begin . getKey ( ) , begin . arena ( ) ) ;
begin = KeySelector ( firstGreaterOrEqual ( k ) , k . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
read_begin = begin ;
} else if ( begin . offset > 1 ) {
2017-11-03 01:51:30 +08:00
read_begin = KeySelector ( firstGreaterOrEqual ( begin . getKey ( ) ) , begin . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
additionalRows + = begin . offset - 1 ;
} else {
read_begin = begin ;
ucEnd = it ;
singleClears = countUncachedBack ( std : : move ( ucEnd ) , clearLimit ) ;
read_begin . offset - = singleClears ;
additionalRows + = singleClears ;
}
2017-09-26 06:52:45 +08:00
// Never issue a request whose end base key is below its begin base key.
if ( read_end . getKey ( ) < read_begin . getKey ( ) ) {
read_end . setKey ( read_begin . getKey ( ) ) ;
read_end . arena ( ) . dependsOn ( read_begin . arena ( ) ) ;
}
2017-05-26 04:48:44 +08:00
state GetRangeLimits requestLimit = limits ;
setRequestLimits ( requestLimit , additionalRows , 2 - read_begin . offset , requestCount ) ;
requestCount + + ;
ASSERT ( ! requestLimit . hasRowLimit ( ) | | requestLimit . rows > 0 ) ;
ASSERT ( requestLimit . hasRowLimit ( ) | | requestLimit . hasByteLimit ( ) ) ;
2019-03-19 06:03:43 +08:00
//TraceEvent("RYWIssuing", randomID).detail("Begin", read_begin.toString()).detail("End", read_end.toString()).detail("Bytes", requestLimit.bytes).detail("Rows", requestLimit.rows).detail("Limits", limits.bytes).detail("Reached", limits.isReached()).detail("RequestCount", requestCount).detail("SingleClears", singleClears).detail("UcEnd", ucEnd.beginKey().toStandaloneStringRef()).detail("MinRows", requestLimit.minRows);
2017-05-26 04:48:44 +08:00
additionalRows = 0 ;
// Snapshot, forward read on the underlying transaction (the last two arguments are snapshot=true, reverse=false).
Standalone < RangeResultRef > snapshot_read = wait ( ryw - > tr . getRange ( read_begin , read_end , requestLimit , true , false ) ) ;
KeyRangeRef range = getKnownKeyRange ( snapshot_read , read_begin , read_end , ryw - > arena ) ;
2019-03-19 06:03:43 +08:00
//TraceEvent("RYWCacheInsert", randomID).detail("Range", range).detail("ExpectedSize", snapshot_read.expectedSize()).detail("Rows", snapshot_read.size()).detail("Results", snapshot_read).detail("More", snapshot_read.more).detail("ReadToBegin", snapshot_read.readToBegin).detail("ReadThroughEnd", snapshot_read.readThroughEnd).detail("ReadThrough", snapshot_read.readThrough);
2017-05-26 04:48:44 +08:00
// When insert() returns true, ryw->arena is made to depend on the rows' arena.
if ( ryw - > cache . insert ( range , snapshot_read ) )
ryw - > arena . dependsOn ( snapshot_read . arena ( ) ) ;
// TODO: Is there a more efficient way to deal with invalidation?
resolveKeySelectorFromCache ( begin , it , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualBeginOffset ) ;
resolveKeySelectorFromCache ( end , itEnd , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualEndOffset ) ;
} else if ( it . is_kv ( ) ) {
2019-01-31 17:23:32 +08:00
// Key/value segment: append a contiguous run of rows to the result, honoring `limits`.
KeyValueRef const * start = it . kv ( ryw - > arena ) ;
// kv() produced no row for this segment: move on to the next one.
if ( start = = nullptr ) {
+ + it ;
continue ;
}
2017-05-26 04:48:44 +08:00
it . skipContiguous ( end . isFirstGreaterOrEqual ( ) ? end . getKey ( ) : ryw - > getMaxReadKey ( ) ) ; //not technically correct since this would add end.getKey(), but that is protected above
2019-01-31 17:23:32 +08:00
// Number of rows from `start` through the row the iterator now points at, inclusive.
int maxCount = it . kv ( ryw - > arena ) - start + 1 ;
2017-05-26 04:48:44 +08:00
int count = 0 ;
for ( ; count < maxCount & & ! limits . isReached ( ) ; count + + ) {
limits . decrement ( start [ count ] ) ;
}
// Rows in the run that did not fit within the limits.
itemsPastEnd + = maxCount - count ;
2019-03-19 06:03:43 +08:00
//TraceEvent("RYWaddKV", randomID).detail("Key", it.beginKey().toStandaloneStringRef()).detail("Count", count).detail("MaxCount", maxCount).detail("ItemsPastEnd", itemsPastEnd);
2017-05-26 04:48:44 +08:00
if ( count ) result . append ( result . arena ( ) , start , count ) ;
+ + it ;
} else
+ + it ;
}
result . more = result . more | | limits . isReached ( ) ;
// With a resolved end selector, drop rows at or after the end key; if any were dropped, the result is not marked `more`.
if ( end . isFirstGreaterOrEqual ( ) ) {
int keepItems = std : : lower_bound ( result . begin ( ) , result . end ( ) , end . getKey ( ) , KeyValueRef : : OrderByKey ( ) ) - result . begin ( ) ;
if ( keepItems < result . size ( ) )
result . more = false ;
result . resize ( result . arena ( ) , keepItems ) ;
}
result . readToBegin = readToBegin ;
result . readThroughEnd = ! result . more & & readThroughEnd ;
// The appended rows were obtained via it.kv(ryw->arena), so the result's arena is made to depend on ryw->arena.
result . arena ( ) . dependsOn ( ryw - > arena ) ;
return result ;
}
// Counterpart of getKnownKeyRange() for rows in descending key order: here data[0] is treated as the
// highest row and data.end()[-1] as the lowest. Computes the key range whose contents the read
// established. Returns an empty KeyRangeRef() when the computed begin is not strictly below the
// computed end. Both endpoints of a non-empty result are copied into `arena`.
static KeyRangeRef getKnownKeyRangeBack(RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena) {
    // Seed each bound from the selector's base key when permitted; otherwise start from the opposite
    // extreme so that the adjustments below can still move it.
    StringRef knownBegin = allKeys.end;
    if (!data.more && begin.offset <= 1)
        knownBegin = begin.getKey();

    ExtStringRef knownEnd = allKeys.begin;
    if (end.offset >= 1)
        knownEnd = end.getKey();

    if (data.readToBegin)
        knownBegin = allKeys.begin;
    if (data.readThroughEnd)
        knownEnd = allKeys.end;

    if (data.size()) {
        if (data.readThrough.present()) {
            knownBegin = std::min(data.readThrough.get(), knownBegin);
        } else if (data.more || !(data.end()[-1].key > knownBegin)) {
            // The begin bound gives no coverage below the lowest row: start at that row instead.
            knownBegin = data.end()[-1].key;
        }
        // Make sure the highest row itself is covered by the end bound.
        // NOTE(review): ExtStringRef(key, 1) presumably denotes the key immediately after `key` - confirm.
        if (!(data[0].key < knownEnd))
            knownEnd = ExtStringRef(data[0].key, 1);
    }

    if (knownBegin >= knownEnd)
        return KeyRangeRef();
    return KeyRangeRef(StringRef(arena, knownBegin), knownEnd.toArena(arena));
}
// Pre: it points to an unknown range
// Decrements it to point to the unknown range just before the last nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
// Returns the number of single-key empty ranges skipped
template < class Iter > static int skipUncachedBack ( Iter & it , Iter const & end , int iterationLimit ) {
ExtStringRef b = it . beginKey ( ) ;
ExtStringRef e = it . endKey ( ) ;
int singleEmpty = 0 ;
ASSERT ( ! it . is_unreadable ( ) & & it . is_unknown_range ( ) ) ;
// b == it.beginKey()
// e is the end of the contiguous empty range containing it
while ( it ! = end & & - - iterationLimit > = 0 ) {
if ( it . is_unreadable ( ) | | it . is_empty_range ( ) ) {
if ( it . is_unreadable ( ) | | ! e . isKeyAfter ( b ) ) { //Assumes no degenerate ranges
while ( it . is_unreadable ( ) | | ! it . is_unknown_range ( ) )
+ + it ;
return singleEmpty ;
}
singleEmpty + + ;
} else
e = b ;
- - it ;
b = it . beginKey ( ) ;
}
while ( it . is_unreadable ( ) | | ! it . is_unknown_range ( ) )
+ + it ;
return singleEmpty ;
}
// Pre: it points to an unknown range
// Returns the number of preceding empty single-key known ranges between it and the previous nontrivial known range, but no more than maxClears
// Leaves it in an indeterminate state
template < class Iter > static int countUncachedBack ( Iter & & it , int maxClears ) {
if ( maxClears < = 0 ) return 0 ;
ExtStringRef b = it . beginKey ( ) ;
ExtStringRef e = it . endKey ( ) ;
int singleEmpty = 0 ;
while ( b > allKeys . begin ) {
if ( it . is_unreadable ( ) | | it . is_empty_range ( ) ) {
if ( it . is_unreadable ( ) | | ! e . isKeyAfter ( b ) ) { //Assumes no degenerate ranges
return singleEmpty ;
}
singleEmpty + + ;
if ( singleEmpty > = maxClears )
return maxClears ;
} else
e = b ;
- - it ;
b = it . beginKey ( ) ;
}
return singleEmpty ;
}
// Reverse-order range read that combines the iterator over locally known data with snapshot reads
// issued against ryw->tr.
// The walk moves `it` backward from the position resolved for `end` toward `itEnd`, the position
// resolved for `begin`. Known key-value ranges contribute rows to `result` highest key first; unknown
// ranges are fetched from the underlying transaction, inserted into ryw->cache, and revisited.
//   ryw        - transaction supplying the cache, the arena and the underlying transaction (tr)
//   begin, end - key selectors bounding the read; both may be rewritten during the walk
//   limits     - row/byte limits, decremented as rows are added to the result
//   pit        - iterator used for the walk (aliased as `it`); a copy of it serves as `itEnd`
// Returns the rows read, with more/readToBegin/readThroughEnd filled in from what the walk established.
ACTOR template < class Iter > static Future < Standalone < RangeResultRef > > getRangeValueBack ( ReadYourWritesTransaction * ryw , KeySelector begin , KeySelector end , GetRangeLimits limits , Iter * pit ) {
state Iter & it ( * pit ) ;
state Iter itEnd ( * pit ) ;
state Standalone < RangeResultRef > result ;
// Extra rows to request from the next snapshot read on top of the caller's limits.
state int64_t additionalRows = 0 ;
// Rows that were seen in known ranges but not added to the result because the limits were already reached.
state int itemsPastBegin = 0 ;
state int requestCount = 0 ;
state bool readToBegin = false ;
state bool readThroughEnd = false ;
state int actualBeginOffset = begin . offset ;
state int actualEndOffset = end . offset ;
//state UID randomID = g_nondeterministic_random->randomUniqueID();
// NOTE(review): resolveKeySelectorFromCache is defined outside this section; presumably it advances the
// selector and the iterator as far as locally known data allows - confirm against its definition.
resolveKeySelectorFromCache ( end , it , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualEndOffset ) ;
resolveKeySelectorFromCache ( begin , itEnd , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualBeginOffset ) ;
2017-11-03 01:51:30 +08:00
// The selectors already describe an empty range: nothing to read.
if ( actualBeginOffset > = actualEndOffset & & begin . getKey ( ) > = end . getKey ( ) ) {
2017-05-26 04:48:44 +08:00
return RangeResultRef ( false , false ) ;
}
else if ( ( begin . isFirstGreaterOrEqual ( ) & & begin . getKey ( ) = = ryw - > getMaxReadKey ( ) )
| | ( end . isFirstGreaterOrEqual ( ) & & end . getKey ( ) = = allKeys . begin ) )
{
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
// begin is still an unresolved selector whose key lies beyond end's key: resolve it with a key read
// first, then re-resolve both selectors because the read may have changed what is known.
if ( ! begin . isFirstGreaterOrEqual ( ) & & begin . getKey ( ) > end . getKey ( ) ) {
Key resolvedBegin = wait ( read ( ryw , GetKeyReq ( begin ) , pit ) ) ;
if ( resolvedBegin = = allKeys . begin )
readToBegin = true ;
if ( resolvedBegin = = ryw - > getMaxReadKey ( ) )
readThroughEnd = true ;
if ( resolvedBegin > = end . getKey ( ) & & end . offset < = 1 ) {
return RangeResultRef ( false , false ) ;
}
else if ( resolvedBegin = = ryw - > getMaxReadKey ( ) ) {
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
resolveKeySelectorFromCache ( end , it , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualEndOffset ) ;
resolveKeySelectorFromCache ( begin , itEnd , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualBeginOffset ) ;
}
2018-06-09 02:11:08 +08:00
//TraceEvent("RYWSelectorsStartReverse", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);
2017-05-26 04:48:44 +08:00
// Main walk: each iteration either consumes the known range under `it` or fills the unknown range under it.
loop {
2018-06-09 02:11:08 +08:00
/*TraceEvent("RYWSelectors", randomID).detail("Begin", begin.toString())
. detail ( " End " , end . toString ( ) )
. detail ( " Reached " , limits . isReached ( ) )
. detail ( " ItemsPastBegin " , itemsPastBegin )
. detail ( " EndOffset " , end . offset )
2019-03-19 06:03:43 +08:00
. detail ( " ItBegin " , it . beginKey ( ) . toStandaloneStringRef ( ) )
. detail ( " ItEnd " , itEnd . beginKey ( ) . toStandaloneStringRef ( ) )
2018-06-09 02:11:08 +08:00
. detail ( " Unknown " , it . is_unknown_range ( ) )
. detail ( " Kv " , it . is_kv ( ) )
. detail ( " Requests " , requestCount ) ; */
2017-05-26 04:48:44 +08:00
2017-11-03 01:51:30 +08:00
// Nothing collected and the selectors have collapsed onto an empty range.
if ( ! result . size ( ) & & actualBeginOffset > = actualEndOffset & & begin . getKey ( ) > = end . getKey ( ) ) {
2017-05-26 04:48:44 +08:00
return RangeResultRef ( false , false ) ;
}
2017-11-03 01:51:30 +08:00
if ( ! begin . isBackward ( ) & & begin . getKey ( ) > = ryw - > getMaxReadKey ( ) ) {
2017-05-26 04:48:44 +08:00
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
2017-11-03 01:51:30 +08:00
// The walk has met begin, or end has reached the start of the key space. If begin is still an
// unresolved selector and rows were collected, resolve it so the result can be trimmed after the loop.
if ( ( begin . offset > = end . offset & & begin . getKey ( ) > = end . getKey ( ) ) | |
( end . offset < = 1 & & end . getKey ( ) = = allKeys . begin ) ) {
2017-05-26 04:48:44 +08:00
if ( begin . isFirstGreaterOrEqual ( ) ) break ;
if ( ! result . size ( ) ) break ;
Key resolvedBegin = wait ( read ( ryw , GetKeyReq ( begin ) , pit ) ) ; //do not worry about iterator invalidation, because we are breaking for the loop
if ( resolvedBegin = = allKeys . begin )
readToBegin = true ;
if ( resolvedBegin = = ryw - > getMaxReadKey ( ) )
readThroughEnd = true ;
begin = firstGreaterOrEqual ( resolvedBegin ) ;
break ;
}
2017-11-03 01:51:30 +08:00
// `it` is a readable, known range that starts before the begin marker.
if ( itemsPastBegin > = begin . offset - 1 & & ! it . is_unreadable ( ) & & ! it . is_unknown_range ( ) & & it . beginKey ( ) < itEnd . beginKey ( ) ) {
2017-05-26 04:48:44 +08:00
if ( begin . isFirstGreaterOrEqual ( ) ) break ;
return RangeResultRef ( readToBegin , readThroughEnd ) ;
}
if ( limits . isReached ( ) & & itemsPastBegin > = begin . offset - 1 ) break ;
// end is exclusive: when it sits exactly on the start of `it`, the range to consume is the previous one.
if ( end . isFirstGreaterOrEqual ( ) & & end . getKey ( ) = = it . beginKey ( ) ) {
if ( itemsPastBegin > = begin . offset - 1 & & it = = itEnd ) break ;
- - it ;
}
if ( it . is_unknown_range ( ) ) {
// Unknown range: fetch it from the underlying transaction and add it to the cache.
if ( limits . hasByteLimit ( ) & & result . size ( ) & & itemsPastBegin > = begin . offset - 1 ) {
result . more = true ;
break ;
}
Iter ucEnd ( it ) ;
int singleClears = 0 ;
// The number of single-key clears tolerated per request doubles with every request issued (capped at 2^20).
int clearLimit = requestCount ? 1 < < std : : min ( requestCount , 20 ) : 0 ;
if ( it . beginKey ( ) > itEnd . beginKey ( ) )
singleClears = std : : min ( skipUncachedBack ( ucEnd , itEnd , BUGGIFY ? 0 : clearLimit + 100 ) , clearLimit ) ;
// Choose where the snapshot read starts.
state KeySelector read_begin ;
if ( ucEnd ! = itEnd ) {
2017-11-03 01:51:30 +08:00
Key k = ucEnd . beginKey ( ) . toStandaloneStringRef ( ) ;
read_begin = KeySelector ( firstGreaterOrEqual ( k ) , k . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
if ( begin . offset > 1 ) additionalRows + = begin . offset - 1 ; // extra for items past end
} else if ( begin . offset > 1 ) {
2017-11-03 01:51:30 +08:00
read_begin = KeySelector ( firstGreaterOrEqual ( begin . getKey ( ) ) , begin . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
additionalRows + = begin . offset - 1 ;
} else {
read_begin = begin ;
if ( begin . offset < 1 ) {
singleClears + = countUncachedBack ( std : : move ( ucEnd ) , clearLimit - singleClears ) ;
read_begin . offset - = singleClears ;
}
}
additionalRows + = singleClears ;
// Choose where the snapshot read ends.
state KeySelector read_end ;
if ( end . isFirstGreaterOrEqual ( ) ) {
2017-11-03 01:51:30 +08:00
Key k = it . endKey ( ) < end . getKey ( ) ? it . endKey ( ) . toStandaloneStringRef ( ) : end . getKey ( ) ;
end = KeySelector ( firstGreaterOrEqual ( k ) , k . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
read_end = end ;
} else if ( end . offset < 1 ) {
2017-11-03 01:51:30 +08:00
read_end = KeySelector ( firstGreaterOrEqual ( end . getKey ( ) ) , end . arena ( ) ) ;
2017-05-26 04:48:44 +08:00
additionalRows + = 1 - end . offset ;
} else {
read_end = end ;
ucEnd = it ;
singleClears = countUncached ( std : : move ( ucEnd ) , ryw - > getMaxReadKey ( ) , clearLimit ) ;
read_end . offset + = singleClears ;
additionalRows + = singleClears ;
}
2017-09-26 06:52:45 +08:00
// Never issue a read whose begin key lies past its end key.
if ( read_begin . getKey ( ) > read_end . getKey ( ) ) {
read_begin . setKey ( read_end . getKey ( ) ) ;
read_begin . arena ( ) . dependsOn ( read_end . arena ( ) ) ;
}
2017-05-26 04:48:44 +08:00
state GetRangeLimits requestLimit = limits ;
setRequestLimits ( requestLimit , additionalRows , read_end . offset , requestCount ) ;
requestCount + + ;
ASSERT ( ! requestLimit . hasRowLimit ( ) | | requestLimit . rows > 0 ) ;
ASSERT ( requestLimit . hasRowLimit ( ) | | requestLimit . hasByteLimit ( ) ) ;
2019-03-19 06:03:43 +08:00
//TraceEvent("RYWIssuing", randomID).detail("Begin", read_begin.toString()).detail("End", read_end.toString()).detail("Bytes", requestLimit.bytes).detail("Rows", requestLimit.rows).detail("Limits", limits.bytes).detail("Reached", limits.isReached()).detail("RequestCount", requestCount).detail("SingleClears", singleClears).detail("UcEnd", ucEnd.beginKey().toStandaloneStringRef()).detail("MinRows", requestLimit.minRows);
2017-05-26 04:48:44 +08:00
additionalRows = 0 ;
// NOTE(review): the two trailing `true` arguments are presumably snapshot and reverse - confirm against Transaction::getRange.
Standalone < RangeResultRef > snapshot_read = wait ( ryw - > tr . getRange ( read_begin , read_end , requestLimit , true , true ) ) ;
KeyRangeRef range = getKnownKeyRangeBack ( snapshot_read , read_begin , read_end , ryw - > arena ) ;
2019-03-19 06:03:43 +08:00
//TraceEvent("RYWCacheInsert", randomID).detail("Range", range).detail("ExpectedSize", snapshot_read.expectedSize()).detail("Rows", snapshot_read.size()).detail("Results", snapshot_read).detail("More", snapshot_read.more).detail("ReadToBegin", snapshot_read.readToBegin).detail("ReadThroughEnd", snapshot_read.readThroughEnd).detail("ReadThrough", snapshot_read.readThrough);
2017-05-26 04:48:44 +08:00
// The rows are copied in the opposite order before being handed to the cache.
RangeResultRef reversed ;
reversed . resize ( ryw - > arena , snapshot_read . size ( ) ) ;
for ( int i = 0 ; i < snapshot_read . size ( ) ; i + + ) {
reversed [ snapshot_read . size ( ) - i - 1 ] = snapshot_read [ i ] ;
}
// Keep the snapshot's memory alive only if the cache actually kept the rows.
if ( ryw - > cache . insert ( range , reversed ) )
ryw - > arena . dependsOn ( snapshot_read . arena ( ) ) ;
// TODO: Is there a more efficient way to deal with invalidation?
resolveKeySelectorFromCache ( end , it , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualEndOffset ) ;
resolveKeySelectorFromCache ( begin , itEnd , ryw - > getMaxReadKey ( ) , & readToBegin , & readThroughEnd , & actualBeginOffset ) ;
} else {
2019-01-31 17:23:32 +08:00
// Known range: if it holds a key-value pair, consume the contiguous run of pairs ending here.
// Note that this local `end` shadows the `end` key selector parameter inside this branch.
KeyValueRef const * end = it . is_kv ( ) ? it . kv ( ryw - > arena ) : nullptr ;
if ( end ! = nullptr ) {
2017-05-26 04:48:44 +08:00
it . skipContiguousBack ( begin . isFirstGreaterOrEqual ( ) ? begin . getKey ( ) : allKeys . begin ) ;
2019-01-31 17:23:32 +08:00
KeyValueRef const * start = it . kv ( ryw - > arena ) ;
ASSERT ( start ! = nullptr ) ;
2017-05-26 04:48:44 +08:00
int maxCount = end - start + 1 ;
int count = 0 ;
// Charge rows against the limits starting from the last pair of the run.
for ( ; count < maxCount & & ! limits . isReached ( ) ; count + + ) {
limits . decrement ( start [ maxCount - count - 1 ] ) ;
}
itemsPastBegin + = maxCount - count ;
2019-03-19 06:03:43 +08:00
//TraceEvent("RYWaddKV", randomID).detail("Key", it.beginKey().toStandaloneStringRef()).detail("Count", count).detail("MaxCount", maxCount).detail("ItemsPastBegin", itemsPastBegin);
2017-05-26 04:48:44 +08:00
if ( count ) {
// Append the accepted rows, last pair of the run first.
int size = result . size ( ) ;
result . resize ( result . arena ( ) , size + count ) ;
for ( int i = 0 ; i < count ; i + + ) {
result [ size + i ] = start [ maxCount - i - 1 ] ;
}
}
}
if ( it = = itEnd ) break ;
- - it ;
}
}
result . more = result . more | | limits . isReached ( ) ;
// With begin resolved to a concrete key, drop any rows collected below it; dropping rows means the
// requested range was fully covered, so `more` is cleared.
if ( begin . isFirstGreaterOrEqual ( ) ) {
int keepItems = result . rend ( ) - std : : lower_bound ( result . rbegin ( ) , result . rend ( ) , begin . getKey ( ) , KeyValueRef : : OrderByKey ( ) ) ;
if ( keepItems < result . size ( ) )
result . more = false ;
result . resize ( result . arena ( ) , keepItems ) ;
}
result . readToBegin = ! result . more & & readToBegin ;
result . readThroughEnd = readThroughEnd ;
// The returned rows may point into memory owned by the transaction's arena.
result . arena ( ) . dependsOn ( ryw - > arena ) ;
return result ;
}
// Notifies the watches registered on keys inside `range` that the value is now `val`.
// A watch fires when the value is not known (valueKnown == false), or when `val` differs from the value
// the watch last recorded as set or as read. Watches that fire, or that had already fired, are dropped
// from the watch map; the remaining ones record `val` as their latest set value.
static void triggerWatches( ReadYourWritesTransaction* ryw, KeyRangeRef range, Optional<ValueRef> val, bool valueKnown = true ) {
	// True when a value remembered by a watch is not the same as the value being written.
	auto differsFromWrite = [&val]( const Optional<Value>& remembered ) -> bool {
		return remembered.present() != val.present() || ( val.present() && remembered.get() != val.get() );
	};

	auto it = ryw->watchMap.lower_bound( range.begin );
	while( it != ryw->watchMap.end() && it->key < range.end ) {
		// Advance first: the current entry may be erased below.
		auto itCopy = it;
		++it;

		auto& watches = itCopy->value;
		ASSERT( watches.size() );
		TEST( watches.size() > 1 ); //Multiple watches on the same key triggered by RYOW

		int i = 0;
		while( i < watches.size() ) {
			if( watches[i]->onChangeTrigger.isSet() ) {
				// Already fired: remove it and examine whatever was swapped into slot i.
				swapAndPop( &watches, i );
				continue;
			}

			const bool fire = !valueKnown ||
				( watches[i]->setPresent && differsFromWrite( watches[i]->setValue ) ) ||
				( watches[i]->valuePresent && differsFromWrite( watches[i]->value ) );

			if( fire ) {
				watches[i]->onChangeTrigger.send( Void() );
				swapAndPop( &watches, i );
				continue;
			}

			watches[i]->setPresent = true;
			watches[i]->setValue = val.castTo<Value>();
			++i;
		}

		if( watches.size() == 0 )
			ryw->watchMap.erase( itCopy );
	}
}
// Single-key convenience overload: notifies the watches registered on exactly `key`.
static void triggerWatches( ReadYourWritesTransaction* ryw, KeyRef key, Optional<ValueRef> val, bool valueKnown = true ) {
	KeyRangeRef onlyThisKey = singleKeyRange( key );
	triggerWatches( ryw, onlyThisKey, val, valueKnown );
}
// Registers a watch on `key` and completes when the key's value changes.
// The actor first reads the current value of the key (through the RYW layer unless read-your-writes is
// disabled). If a write inside this transaction has already triggered the watch, or has set a value that
// differs from the one read, the watch completes immediately. Otherwise the watch is handed to the
// underlying transaction and the actor waits on it.
// `done` is added to ryw->reading and is fulfilled on EVERY exit path once the setup phase is over.
// If it were destroyed unfulfilled, the future held in ryw->reading would fail with broken_promise and
// a later commit waiting on ryw->reading would fail with it.
// Throws whatever the read, the reset promise or tr.watch() throws.
ACTOR static Future<Void> watch( ReadYourWritesTransaction* ryw, Key key ) {
	state Future<Optional<Value>> val;
	state Future<Void> watchFuture;
	state Reference<Watch> watch( new Watch( key ) );
	state Promise<Void> done;

	ryw->reading.add( done.getFuture() );

	if( !ryw->options.readYourWritesDisabled ) {
		// Make the watch visible to triggerWatches() so that local writes can fire it.
		ryw->watchMap[key].push_back( watch );
		val = readWithConflictRange( ryw, GetValueReq( key ), false );
	}
	else
		val = ryw->tr.get( key );

	try {
		wait( ryw->resetPromise.getFuture() || success( val ) || watch->onChangeTrigger.getFuture() );
	} catch( Error& e ) {
		done.send( Void() );
		throw;
	}

	// A local write fired (or failed) the watch while the value was being read.
	if( watch->onChangeTrigger.getFuture().isReady() ) {
		done.send( Void() );
		if( watch->onChangeTrigger.getFuture().isError() )
			throw watch->onChangeTrigger.getFuture().getError();
		return Void();
	}

	watch->valuePresent = true;
	watch->value = val.get();

	// A value set in this transaction already differs from the value just read: the watch fires now.
	if( watch->setPresent && ( watch->setValue.present() != watch->value.present() || ( watch->value.present() && watch->setValue.get() != watch->value.get() ) ) ) {
		watch->onChangeTrigger.send( Void() );
		done.send( Void() );
		return Void();
	}

	try {
		watchFuture = ryw->tr.watch( watch ); // throws if there are too many outstanding watches
	} catch( Error& e ) {
		// Fulfil `done` before propagating, so that ryw->reading is not left holding a broken promise.
		done.send( Void() );
		throw;
	}
	done.send( Void() );

	wait( watchFuture );

	return Void();
}
// Commits the transaction once all outstanding reads have finished.
// With read-your-writes enabled, the buffered writes and the accumulated read conflict ranges are first
// transferred to the underlying transaction. Before API version 410 the transaction is reset after a
// successful commit, and partially reset after a failed one.
ACTOR static Future<Void> commit( ReadYourWritesTransaction* ryw ) {
	try {
		ryw->commitStarted = true;

		// Wait for every pending read, unless the transaction is reset in the meantime.
		Future<Void> pendingReads = ryw->reading;
		wait( ryw->resetPromise.getFuture() || pendingReads );

		if( !ryw->options.readYourWritesDisabled ) {
			// Hand the buffered mutations and read conflict ranges to the underlying transaction.
			ryw->writeRangeToNativeTransaction( KeyRangeRef( StringRef(), allKeys.end ) );

			auto conflictRanges = ryw->readConflicts.ranges();
			for( auto r = conflictRanges.begin(); r != conflictRanges.end(); ++r ) {
				if( r->value() ) {
					ryw->tr.addReadConflictRange( r->range() );
				}
			}
		}
		else if( ryw->resetPromise.isSet() ) {
			throw ryw->resetPromise.getFuture().getError();
		}

		wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );

		ryw->debugLogRetries();
		if( !ryw->tr.apiVersionAtLeast( 410 ) ) {
			ryw->reset();
		}

		return Void();
	} catch( Error& e ) {
		if( !ryw->tr.apiVersionAtLeast( 410 ) ) {
			ryw->commitStarted = false;
			if( !ryw->resetPromise.isSet() ) {
				ryw->tr.reset();
				ryw->resetRyow();
			}
		}
		throw;
	}
}
ACTOR static Future < Void > onError ( ReadYourWritesTransaction * ryw , Error e ) {
try {
if ( ryw - > resetPromise . isSet ( ) ) {
throw ryw - > resetPromise . getFuture ( ) . getError ( ) ;
}
bool retry_limit_hit = ryw - > options . maxRetries ! = - 1 & & ryw - > retries > = ryw - > options . maxRetries ;
if ( ryw - > retries < std : : numeric_limits < int > : : max ( ) )
ryw - > retries + + ;
if ( retry_limit_hit ) {
throw e ;
}
2018-08-11 04:57:10 +08:00
wait ( ryw - > resetPromise . getFuture ( ) | | ryw - > tr . onError ( e ) ) ;
2017-05-26 04:48:44 +08:00
ryw - > debugLogRetries ( e ) ;
ryw - > resetRyow ( ) ;
return Void ( ) ;
} catch ( Error & e ) {
if ( ! ryw - > resetPromise . isSet ( ) ) {
2019-03-15 02:40:51 +08:00
if ( ryw - > tr . apiVersionAtLeast ( 610 ) ) {
ryw - > resetPromise . sendError ( transaction_cancelled ( ) ) ;
}
else {
ryw - > resetRyow ( ) ;
}
2017-05-26 04:48:44 +08:00
}
if ( e . code ( ) = = error_code_broken_promise )
throw transaction_cancelled ( ) ;
throw ;
}
}
// Returns the read version of the underlying transaction, racing it against the transaction's reset promise.
ACTOR static Future < Version > getReadVersion ( ReadYourWritesTransaction * ryw ) {
choose {
when ( Version v = wait ( ryw - > tr . getReadVersion ( ) ) ) {
return v ;
}
2018-08-11 04:57:10 +08:00
// In the code visible in this section resetPromise is only ever completed with sendError, so this
// wait is expected to throw that error; completing it with a value is treated as an internal error.
when ( wait ( ryw - > resetPromise . getFuture ( ) ) ) {
2017-05-26 04:48:44 +08:00
throw internal_error ( ) ;
}
}
}
} ;
2019-03-22 02:45:43 +08:00
// Creates a read-your-writes transaction on database `cx`.
// The cache and the write map allocate from this transaction's arena, and the options are initialised
// from the underlying transaction. The timeout actor is armed last, once the options are in place.
ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx )
  : cache( &arena ),
    writes( &arena ),
    tr( cx ),
    retries( 0 ),
    creationTime( now() ),
    commitStarted( false ),
    options( tr ),
    deferredError( cx->deferredError )
{
	resetTimeout();
}
2017-05-26 04:48:44 +08:00
2019-03-20 07:59:14 +08:00
// Fails `resetPromise` with transaction_timed_out once `endTime` has been reached.
// The actor itself never completes successfully: it always ends by throwing transaction_timed_out.
ACTOR Future<Void> timebomb( double endTime, Promise<Void> resetPromise ) {
	// Sleep only if the deadline still lies in the future.
	if( endTime > now() ) {
		wait( delayUntil( endTime ) );
	}

	// Leave a promise that has already been completed untouched.
	const bool alreadyCompleted = resetPromise.isSet();
	if( !alreadyCompleted ) {
		resetPromise.sendError( transaction_timed_out() );
	}

	throw transaction_timed_out();
}
2019-03-22 02:45:43 +08:00
void ReadYourWritesTransaction : : resetTimeout ( ) {
2019-03-20 07:59:14 +08:00
timeoutActor = options . timeoutInSeconds = = 0.0 ? Void ( ) : timebomb ( options . timeoutInSeconds + creationTime , resetPromise ) ;
}
2017-05-26 04:48:44 +08:00
// Returns the transaction's read version.
// Before API version 101 the request goes straight to the underlying transaction. From 101 onward it
// fails with the reset error if the transaction has been reset, and otherwise races against a reset.
Future<Version> ReadYourWritesTransaction::getReadVersion() {
	if( !tr.apiVersionAtLeast( 101 ) )
		return tr.getReadVersion();

	if( resetPromise.isSet() )
		return resetPromise.getFuture().getError();

	return RYWImpl::getReadVersion( this );
}
// Serialises a status object to JSON text and returns it as a value.
// Throws internal_error (after logging a SevError trace event) if serialisation fails.
Optional<Value> getValueFromJSON( StatusObject statusObj ) {
	try {
		const std::string serialized = json_spirit::write_string( json_spirit::mValue( statusObj ), json_spirit::Output_options::raw_utf8 );
		// Copy the text into a Value before the local string goes away.
		Value json = StringRef( serialized.c_str() );
		return json;
	}
	catch( std::exception& e ) {
		TraceEvent( SevError, " UnableToUnparseStatusJSON " ).detail( " What ", e.what() );
		throw internal_error();
	}
}
// Fetches the cluster status through the given cluster file and returns it as JSON text.
ACTOR Future<Optional<Value>> getJSON( Reference<ClusterConnectionFile> clusterFile ) {
	StatusObject fetchedStatus = wait( StatusClient::statusFetcher( clusterFile ) );
	return getValueFromJSON( fetchedStatus );
}
// Returns one key-value pair per client worker known to the cluster: the key is the worker's address
// as text, the value is the serialised worker interface.
// The request is sent to the cluster interface published by the leader monitor and is re-issued each
// time that interface changes.
ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces( Reference<ClusterConnectionFile> clusterFile ) {
	state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface( new AsyncVar<Optional<ClusterInterface>> );
	state Future<Void> leaderMon = monitorLeader<ClusterInterface>( clusterFile, clusterInterface );

	loop {
		// With no cluster interface known yet, only a change of the interface can wake us up.
		Future<vector<ClientWorkerInterface>> workersReply = clusterInterface->get().present()
			? brokenPromiseToNever( clusterInterface->get().get().getClientWorkers.getReply( GetClientWorkersRequest() ) )
			: Never();

		choose {
			when( vector<ClientWorkerInterface> workers = wait( workersReply ) ) {
				Standalone<RangeResultRef> interfaces;
				for( auto& worker : workers ) {
					interfaces.push_back_deep( interfaces.arena(), KeyValueRef( worker.address().toString(), BinaryWriter::toValue( worker, IncludeVersion() ) ) );
				}
				return interfaces;
			}
			when( wait( clusterInterface->onChange() ) ) {}
		}
	}
}
// Reads the value of `key`.
// Three special keys are answered from the client's connection information and bypass every other
// check: the status JSON, the cluster file path and the connection string. Each yields an empty result
// when no database or connection file is available. Any other key is rejected if the transaction is
// being committed or has been reset, or if the key is outside the readable range (the metadata version
// key is exempt). Keys longer than the applicable size limit cannot exist, so they read as not present.
//   key      - key to read
//   snapshot - passed through to readWithConflictRange
Future<Optional<Value>> ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
	TEST(true);

	if( key == LiteralStringRef(" \xff \xff /status/json ") ) {
		if( !( tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile() ) )
			return Optional<Value>();
		return getJSON( tr.getDatabase()->getConnectionFile() );
	}

	if( key == LiteralStringRef(" \xff \xff /cluster_file_path ") ) {
		try {
			if( tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile() ) {
				Optional<Value> filePath = StringRef( tr.getDatabase()->getConnectionFile()->getFilename() );
				return filePath;
			}
		}
		catch( Error& e ) {
			return e;
		}
		return Optional<Value>();
	}

	if( key == LiteralStringRef(" \xff \xff /connection_string ") ) {
		try {
			if( tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile() ) {
				Reference<ClusterConnectionFile> connFile = tr.getDatabase()->getConnectionFile();
				Optional<Value> connString = StringRef( connFile->getConnectionString().toString() );
				return connString;
			}
		}
		catch( Error& e ) {
			return e;
		}
		return Optional<Value>();
	}

	if( checkUsedDuringCommit() )
		return used_during_commit();

	if( resetPromise.isSet() )
		return resetPromise.getFuture().getError();

	if( key >= getMaxReadKey() && key != metadataVersionKey )
		return key_outside_legal_range();

	//There are no keys in the database with size greater than KEY_SIZE_LIMIT
	const auto sizeLimit = key.startsWith( systemKeys.begin ) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT;
	if( key.size() > sizeLimit )
		return Optional<Value>();

	Future<Optional<Value>> pendingValue = RYWImpl::readWithConflictRange( this, RYWImpl::GetValueReq( key ), snapshot );
	// Track the read so that commit can wait for it.
	reading.add( success( pendingValue ) );
	return pendingValue;
}
// Resolves a key selector to a key.
// Fails if the transaction is being committed, has been reset, or if the selector's key is beyond the
// readable range.
Future<Key> ReadYourWritesTransaction::getKey( const KeySelector& key, bool snapshot ) {
	if( checkUsedDuringCommit() )
		return used_during_commit();

	if( resetPromise.isSet() )
		return resetPromise.getFuture().getError();

	if( key.getKey() > getMaxReadKey() )
		return key_outside_legal_range();

	Future<Key> resolvedKey = RYWImpl::readWithConflictRange( this, RYWImpl::GetKeyReq( key ), snapshot );
	// Track the read so that commit can wait for it.
	reading.add( success( resolvedKey ) );
	return resolvedKey;
}
// Reads the key-value pairs between two key selectors.
// A read whose begin key is the worker-interfaces special key is answered from the cluster and bypasses
// all other checks. Otherwise the read is rejected if the transaction is being committed or has been
// reset, if either selector key is beyond the readable range, or if the limits are invalid. Reads with
// an exhausted limit or an inverted range return an empty result without touching the transaction.
//   begin, end - selectors bounding the read (any orEqual flag is normalised away)
//   limits     - row/byte limits
//   snapshot   - passed through to readWithConflictRange
//   reverse    - selects the reverse (GetRangeReq<true>) instead of the forward (GetRangeReq<false>) request
Future<Standalone<RangeResultRef>> ReadYourWritesTransaction::getRange(
	KeySelector begin,
	KeySelector end,
	GetRangeLimits limits,
	bool snapshot,
	bool reverse )
{
	if( begin.getKey() == LiteralStringRef(" \xff \xff /worker_interfaces ") ) {
		if( !( tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile() ) )
			return Standalone<RangeResultRef>();
		return getWorkerInterfaces( tr.getDatabase()->getConnectionFile() );
	}

	if( checkUsedDuringCommit() )
		return used_during_commit();

	if( resetPromise.isSet() )
		return resetPromise.getFuture().getError();

	KeyRef readLimitKey = getMaxReadKey();
	if( begin.getKey() > readLimitKey || end.getKey() > readLimitKey )
		return key_outside_legal_range();

	//This optimization prevents NULL operations from being added to the conflict range
	if( limits.isReached() ) {
		TEST(true); // RYW range read limit 0
		return Standalone<RangeResultRef>();
	}

	if( !limits.isValid() )
		return range_limits_invalid();

	if( begin.orEqual )
		begin.removeOrEqual( begin.arena() );

	if( end.orEqual )
		end.removeOrEqual( end.arena() );

	if( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) {
		TEST(true); // RYW range inverted
		return Standalone<RangeResultRef>();
	}

	Future<Standalone<RangeResultRef>> pendingRows;
	if( reverse )
		pendingRows = RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<true>( begin, end, limits ), snapshot );
	else
		pendingRows = RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<false>( begin, end, limits ), snapshot );

	// Track the read so that commit can wait for it.
	reading.add( success( pendingRows ) );
	return pendingRows;
}
// Convenience overload: same as the GetRangeLimits overload, with the limits built from the integer `limit`.
Future<Standalone<RangeResultRef>> ReadYourWritesTransaction::getRange(
	const KeySelector& begin,
	const KeySelector& end,
	int limit,
	bool snapshot,
	bool reverse )
{
	GetRangeLimits rowLimits( limit );
	return getRange( begin, end, rowLimits, snapshot, reverse );
}
// Asks the underlying transaction for the addresses associated with `key`.
// Fails if the transaction is being committed or has been reset; a reset that happens while the request
// is outstanding fails the request as well.
Future<Standalone<VectorRef<const char*>>> ReadYourWritesTransaction::getAddressesForKey( const Key& key ) {
	if( checkUsedDuringCommit() )
		return used_during_commit();

	if( resetPromise.isSet() )
		return resetPromise.getFuture().getError();

	// If key >= allKeys.end, then our resulting address vector will be empty.

	Future<Standalone<VectorRef<const char*>>> addresses = waitOrError( tr.getAddressesForKey( key ), resetPromise.getFuture() );
	// Track the request so that commit can wait for it.
	reading.add( success( addresses ) );
	return addresses;
}
void ReadYourWritesTransaction : : addReadConflictRange ( KeyRangeRef const & keys ) {
if ( checkUsedDuringCommit ( ) ) {
throw used_during_commit ( ) ;
}
if ( tr . apiVersionAtLeast ( 300 ) ) {
if ( keys . begin > getMaxReadKey ( ) | | keys . end > getMaxReadKey ( ) ) {
throw key_outside_legal_range ( ) ;
}
}
//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
//we can translate it to an equivalent one with smaller keys
KeyRef begin = keys . begin ;
KeyRef end = keys . end ;
if ( begin . size ( ) > ( begin . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
begin = begin . substr ( 0 , ( begin . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) + 1 ) ;
if ( end . size ( ) > ( end . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
end = end . substr ( 0 , ( end . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) + 1 ) ;
KeyRangeRef r = KeyRangeRef ( begin , end ) ;
if ( r . empty ( ) ) {
return ;
}
if ( options . readYourWritesDisabled ) {
tr . addReadConflictRange ( r ) ;
return ;
}
WriteMap : : iterator it ( & writes ) ;
KeyRangeRef readRange ( arena , r ) ;
it . skip ( readRange . begin ) ;
updateConflictMap ( readRange , it ) ;
}
void ReadYourWritesTransaction : : updateConflictMap ( KeyRef const & key , WriteMap : : iterator & it ) {
//it.skip( key );
//ASSERT( it.beginKey() <= key && key < it.endKey() );
if ( it . is_unmodified_range ( ) | | ( it . is_operation ( ) & & ! it . is_independent ( ) ) ) {
readConflicts . insert ( singleKeyRange ( key , arena ) , true ) ;
}
}
void ReadYourWritesTransaction : : updateConflictMap ( KeyRangeRef const & keys , WriteMap : : iterator & it ) {
//it.skip( keys.begin );
//ASSERT( it.beginKey() <= keys.begin && keys.begin < it.endKey() );
for ( ; it . beginKey ( ) < keys . end ; + + it ) {
if ( it . is_unmodified_range ( ) | | ( it . is_operation ( ) & & ! it . is_independent ( ) ) ) {
KeyRangeRef insert_range = KeyRangeRef ( std : : max ( keys . begin , it . beginKey ( ) . toArenaOrRef ( arena ) ) , std : : min ( keys . end , it . endKey ( ) . toArenaOrRef ( arena ) ) ) ;
if ( ! insert_range . empty ( ) )
readConflicts . insert ( insert_range , true ) ;
}
}
}
void ReadYourWritesTransaction : : writeRangeToNativeTransaction ( KeyRangeRef const & keys ) {
WriteMap : : iterator it ( & writes ) ;
it . skip ( keys . begin ) ;
bool inClearRange = false ;
ExtStringRef clearBegin ;
//Clear ranges must be done first because of keys that are both cleared and set to a new value
for ( ; it . beginKey ( ) < keys . end ; + + it ) {
if ( it . is_cleared_range ( ) & & ! inClearRange ) {
clearBegin = std : : max ( ExtStringRef ( keys . begin ) , it . beginKey ( ) ) ;
inClearRange = true ;
} else if ( ! it . is_cleared_range ( ) & & inClearRange ) {
tr . clear ( KeyRangeRef ( clearBegin . toArenaOrRef ( arena ) , it . beginKey ( ) . toArenaOrRef ( arena ) ) , false ) ;
inClearRange = false ;
}
}
if ( inClearRange ) {
tr . clear ( KeyRangeRef ( clearBegin . toArenaOrRef ( arena ) , keys . end ) , false ) ;
}
it . skip ( keys . begin ) ;
bool inConflictRange = false ;
ExtStringRef conflictBegin ;
for ( ; it . beginKey ( ) < keys . end ; + + it ) {
if ( it . is_conflict_range ( ) & & ! inConflictRange ) {
conflictBegin = std : : max ( ExtStringRef ( keys . begin ) , it . beginKey ( ) ) ;
inConflictRange = true ;
} else if ( ! it . is_conflict_range ( ) & & inConflictRange ) {
tr . addWriteConflictRange ( KeyRangeRef ( conflictBegin . toArenaOrRef ( arena ) , it . beginKey ( ) . toArenaOrRef ( arena ) ) ) ;
inConflictRange = false ;
}
//SOMEDAY: make atomicOp take set to avoid switch
if ( it . is_operation ( ) ) {
auto op = it . op ( ) ;
for ( int i = 0 ; i < op . size ( ) ; + + i ) {
switch ( op [ i ] . type ) {
case MutationRef : : SetValue :
2019-01-31 17:23:32 +08:00
if ( op [ i ] . value . present ( ) ) {
tr . set ( it . beginKey ( ) . assertRef ( ) , op [ i ] . value . get ( ) , false ) ;
} else {
tr . clear ( it . beginKey ( ) . assertRef ( ) , false ) ;
}
2017-05-26 04:48:44 +08:00
break ;
case MutationRef : : AddValue :
case MutationRef : : AppendIfFits :
case MutationRef : : And :
case MutationRef : : Or :
case MutationRef : : Xor :
case MutationRef : : Max :
case MutationRef : : Min :
case MutationRef : : SetVersionstampedKey :
case MutationRef : : SetVersionstampedValue :
2017-10-11 04:02:22 +08:00
case MutationRef : : ByteMin :
case MutationRef : : ByteMax :
2017-10-26 05:48:05 +08:00
case MutationRef : : MinV2 :
case MutationRef : : AndV2 :
2019-01-31 17:23:32 +08:00
case MutationRef : : CompareAndClear :
tr . atomicOp ( it . beginKey ( ) . assertRef ( ) , op [ i ] . value . get ( ) , op [ i ] . type , false ) ;
2017-05-26 04:48:44 +08:00
break ;
default :
break ;
}
}
}
}
if ( inConflictRange ) {
tr . addWriteConflictRange ( KeyRangeRef ( conflictBegin . toArenaOrRef ( arena ) , keys . end ) ) ;
}
}
2019-03-22 04:47:12 +08:00
// Initialises the options from the defaults of the database that `tr` belongs to.
// The timeout and retry limit are seeded before reset() runs, because reset() saves and, from API
// version 610 on, restores whatever values they hold at that point.
ReadYourWritesTransactionOptions::ReadYourWritesTransactionOptions( Transaction const& tr ) {
	Database db = tr.getDatabase();
	maxRetries = db->transactionMaxRetries;
	timeoutInSeconds = db->transactionTimeout;
	reset( tr );
}
2019-03-20 00:15:41 +08:00
void ReadYourWritesTransactionOptions : : reset ( Transaction const & tr ) {
2019-03-22 04:47:12 +08:00
double oldTimeout = timeoutInSeconds ;
2019-03-22 06:48:40 +08:00
int oldMaxRetries = maxRetries ;
2019-03-22 04:47:12 +08:00
memset ( this , 0 , sizeof ( * this ) ) ;
if ( tr . apiVersionAtLeast ( 610 ) ) {
2019-03-22 06:48:40 +08:00
// Starting in API version 610, these options are not cleared after reset.
2019-03-22 04:47:12 +08:00
timeoutInSeconds = oldTimeout ;
2019-03-22 06:48:40 +08:00
maxRetries = oldMaxRetries ;
}
else {
Database cx = tr . getDatabase ( ) ;
maxRetries = cx - > transactionMaxRetries ;
timeoutInSeconds = cx - > transactionTimeout ;
2019-03-22 04:47:12 +08:00
}
2019-03-22 06:48:40 +08:00
snapshotRywEnabled = tr . getDatabase ( ) - > snapshotRywEnabled ;
}
void ReadYourWritesTransactionOptions : : fullReset ( Transaction const & tr ) {
reset ( tr ) ;
Database cx = tr . getDatabase ( ) ;
maxRetries = cx - > transactionMaxRetries ;
timeoutInSeconds = cx - > transactionTimeout ;
2019-03-20 00:15:41 +08:00
}
2017-05-26 04:48:44 +08:00
bool ReadYourWritesTransactionOptions : : getAndResetWriteConflictDisabled ( ) {
bool disabled = nextWriteDisableConflictRange ;
nextWriteDisableConflictRange = false ;
return disabled ;
}
void ReadYourWritesTransaction : : getWriteConflicts ( KeyRangeMap < bool > * result ) {
WriteMap : : iterator it ( & writes ) ;
it . skip ( allKeys . begin ) ;
bool inConflictRange = false ;
ExtStringRef conflictBegin ;
for ( ; it . beginKey ( ) < getMaxWriteKey ( ) ; + + it ) {
if ( it . is_conflict_range ( ) & & ! inConflictRange ) {
conflictBegin = it . beginKey ( ) ;
inConflictRange = true ;
} else if ( ! it . is_conflict_range ( ) & & inConflictRange ) {
result - > insert ( KeyRangeRef ( conflictBegin . toArenaOrRef ( arena ) , it . beginKey ( ) . toArenaOrRef ( arena ) ) , true ) ;
inConflictRange = false ;
}
}
if ( inConflictRange ) {
result - > insert ( KeyRangeRef ( conflictBegin . toArenaOrRef ( arena ) , getMaxWriteKey ( ) ) , true ) ;
}
}
void ReadYourWritesTransaction : : atomicOp ( const KeyRef & key , const ValueRef & operand , uint32_t operationType ) {
bool addWriteConflict = ! options . getAndResetWriteConflictDisabled ( ) ;
if ( checkUsedDuringCommit ( ) ) {
throw used_during_commit ( ) ;
}
2019-03-01 09:45:00 +08:00
if ( key = = metadataVersionKey ) {
if ( operationType ! = MutationRef : : SetVersionstampedValue | | operand ! = metadataVersionRequiredValue ) {
throw client_invalid_operation ( ) ;
}
}
else if ( key > = getMaxWriteKey ( ) ) {
2017-05-26 04:48:44 +08:00
throw key_outside_legal_range ( ) ;
2019-03-01 09:45:00 +08:00
}
2017-05-26 04:48:44 +08:00
2018-02-28 06:02:24 +08:00
if ( ! isValidMutationType ( operationType ) | | ! isAtomicOp ( ( MutationRef : : Type ) operationType ) )
2017-05-26 04:48:44 +08:00
throw invalid_mutation_type ( ) ;
if ( key . size ( ) > ( key . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
throw key_too_large ( ) ;
if ( operand . size ( ) > CLIENT_KNOBS - > VALUE_SIZE_LIMIT )
throw value_too_large ( ) ;
2018-03-22 09:58:19 +08:00
if ( tr . apiVersionAtLeast ( 510 ) ) {
if ( operationType = = MutationRef : : Min )
operationType = MutationRef : : MinV2 ;
else if ( operationType = = MutationRef : : And )
operationType = MutationRef : : AndV2 ;
}
KeyRef k ;
if ( ! tr . apiVersionAtLeast ( 520 ) & & operationType = = MutationRef : : SetVersionstampedKey ) {
k = key . withSuffix ( LiteralStringRef ( " \x00 \x00 " ) , arena ) ;
} else {
k = KeyRef ( arena , key ) ;
}
ValueRef v ;
if ( ! tr . apiVersionAtLeast ( 520 ) & & operationType = = MutationRef : : SetVersionstampedValue ) {
v = operand . withSuffix ( LiteralStringRef ( " \x00 \x00 \x00 \x00 " ) , arena ) ;
} else {
v = ValueRef ( arena , operand ) ;
}
2017-05-26 04:48:44 +08:00
if ( operationType = = MutationRef : : SetVersionstampedKey ) {
2018-05-24 06:32:56 +08:00
KeyRangeRef range = getVersionstampKeyRange ( arena , k , getMaxReadKey ( ) ) ; // this does validation of the key and needs to be performed before the readYourWritesDisabled path
2017-05-26 04:48:44 +08:00
if ( ! options . readYourWritesDisabled ) {
writeRangeToNativeTransaction ( range ) ;
writes . addUnmodifiedAndUnreadableRange ( range ) ;
}
}
2018-03-22 09:58:19 +08:00
if ( operationType = = MutationRef : : SetVersionstampedValue ) {
2018-05-24 06:32:56 +08:00
if ( v . size ( ) < 4 )
2018-02-16 13:35:00 +08:00
throw client_invalid_operation ( ) ;
int32_t pos ;
2018-05-24 06:32:56 +08:00
memcpy ( & pos , v . end ( ) - sizeof ( int32_t ) , sizeof ( int32_t ) ) ;
2018-02-16 13:35:00 +08:00
pos = littleEndian32 ( pos ) ;
2018-05-24 06:32:56 +08:00
if ( pos < 0 | | pos + 10 > v . size ( ) - 4 )
2018-02-16 13:35:00 +08:00
throw client_invalid_operation ( ) ;
}
2017-10-11 04:02:22 +08:00
2017-05-26 04:48:44 +08:00
if ( options . readYourWritesDisabled ) {
2018-03-22 09:58:19 +08:00
return tr . atomicOp ( k , v , ( MutationRef : : Type ) operationType , addWriteConflict ) ;
2017-05-26 04:48:44 +08:00
}
writes . mutate ( k , ( MutationRef : : Type ) operationType , v , addWriteConflict ) ;
2018-05-24 06:32:56 +08:00
RYWImpl : : triggerWatches ( this , k , Optional < ValueRef > ( ) , false ) ;
2017-05-26 04:48:44 +08:00
}
void ReadYourWritesTransaction : : set ( const KeyRef & key , const ValueRef & value ) {
if ( key = = LiteralStringRef ( " \xff \xff /reboot_worker " ) ) {
BinaryReader : : fromStringRef < ClientWorkerInterface > ( value , IncludeVersion ( ) ) . reboot . send ( RebootRequest ( ) ) ;
return ;
}
if ( key = = LiteralStringRef ( " \xff \xff /reboot_and_check_worker " ) ) {
BinaryReader : : fromStringRef < ClientWorkerInterface > ( value , IncludeVersion ( ) ) . reboot . send ( RebootRequest ( false , true ) ) ;
return ;
}
2019-03-01 09:45:00 +08:00
if ( key = = metadataVersionKey ) {
throw client_invalid_operation ( ) ;
}
2017-05-26 04:48:44 +08:00
bool addWriteConflict = ! options . getAndResetWriteConflictDisabled ( ) ;
if ( checkUsedDuringCommit ( ) ) {
throw used_during_commit ( ) ;
}
if ( key > = getMaxWriteKey ( ) )
throw key_outside_legal_range ( ) ;
if ( options . readYourWritesDisabled ) {
return tr . set ( key , value , addWriteConflict ) ;
}
//TODO: check transaction size here
if ( key . size ( ) > ( key . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
throw key_too_large ( ) ;
if ( value . size ( ) > CLIENT_KNOBS - > VALUE_SIZE_LIMIT )
throw value_too_large ( ) ;
KeyRef k = KeyRef ( arena , key ) ;
ValueRef v = ValueRef ( arena , value ) ;
writes . mutate ( k , MutationRef : : SetValue , v , addWriteConflict ) ;
RYWImpl : : triggerWatches ( this , key , value ) ;
}
void ReadYourWritesTransaction : : clear ( const KeyRangeRef & range ) {
bool addWriteConflict = ! options . getAndResetWriteConflictDisabled ( ) ;
if ( checkUsedDuringCommit ( ) ) {
throw used_during_commit ( ) ;
}
KeyRef maxKey = getMaxWriteKey ( ) ;
if ( range . begin > maxKey | | range . end > maxKey )
throw key_outside_legal_range ( ) ;
if ( options . readYourWritesDisabled ) {
return tr . clear ( range , addWriteConflict ) ;
}
//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
//we can translate it to an equivalent one with smaller keys
KeyRef begin = range . begin ;
KeyRef end = range . end ;
if ( begin . size ( ) > ( begin . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
begin = begin . substr ( 0 , ( begin . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) + 1 ) ;
if ( end . size ( ) > ( end . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
end = end . substr ( 0 , ( end . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) + 1 ) ;
KeyRangeRef r = KeyRangeRef ( begin , end ) ;
if ( r . empty ( ) ) {
return ;
}
r = KeyRangeRef ( arena , r ) ;
writes . clear ( r , addWriteConflict ) ;
RYWImpl : : triggerWatches ( this , r , Optional < ValueRef > ( ) ) ;
}
void ReadYourWritesTransaction : : clear ( const KeyRef & key ) {
bool addWriteConflict = ! options . getAndResetWriteConflictDisabled ( ) ;
if ( checkUsedDuringCommit ( ) ) {
throw used_during_commit ( ) ;
}
if ( key > = getMaxWriteKey ( ) )
throw key_outside_legal_range ( ) ;
if ( key . size ( ) > ( key . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
return ;
if ( options . readYourWritesDisabled ) {
return tr . clear ( key , addWriteConflict ) ;
}
KeyRangeRef r = singleKeyRange ( key , arena ) ;
//SOMEDAY: add an optimized single key clear to write map
writes . clear ( r , addWriteConflict ) ;
RYWImpl : : triggerWatches ( this , r , Optional < ValueRef > ( ) ) ;
}
// Registers a watch on `key`.
// Instead of throwing, failures are returned as an error future:
// used_during_commit, the pending reset error, watches_disabled (when
// read-your-writes is disabled), key_outside_legal_range or key_too_large.
Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
	if (checkUsedDuringCommit()) {
		return used_during_commit();
	}

	if (resetPromise.isSet()) {
		return resetPromise.getFuture().getError();
	}

	if (options.readYourWritesDisabled) {
		return watches_disabled();
	}

	// Keys past allKeys are never watchable. From API version 300 on, keys past
	// the readable range are rejected too, except metadataVersionKey.
	if (key >= allKeys.end ||
	    (key >= getMaxReadKey() && key != metadataVersionKey && tr.apiVersionAtLeast(300))) {
		return key_outside_legal_range();
	}

	const auto keySizeLimit =
	    key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT;
	if (key.size() > keySizeLimit) {
		return key_too_large();
	}

	return RYWImpl::watch(this, key);
}
void ReadYourWritesTransaction : : addWriteConflictRange ( KeyRangeRef const & keys ) {
if ( checkUsedDuringCommit ( ) ) {
throw used_during_commit ( ) ;
}
if ( tr . apiVersionAtLeast ( 300 ) ) {
if ( keys . begin > getMaxWriteKey ( ) | | keys . end > getMaxWriteKey ( ) ) {
throw key_outside_legal_range ( ) ;
}
}
//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
//we can translate it to an equivalent one with smaller keys
KeyRef begin = keys . begin ;
KeyRef end = keys . end ;
if ( begin . size ( ) > ( begin . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
begin = begin . substr ( 0 , ( begin . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) + 1 ) ;
if ( end . size ( ) > ( end . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) )
end = end . substr ( 0 , ( end . startsWith ( systemKeys . begin ) ? CLIENT_KNOBS - > SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS - > KEY_SIZE_LIMIT ) + 1 ) ;
KeyRangeRef r = KeyRangeRef ( begin , end ) ;
if ( r . empty ( ) ) {
return ;
}
if ( options . readYourWritesDisabled ) {
tr . addWriteConflictRange ( r ) ;
return ;
}
r = KeyRangeRef ( arena , r ) ;
writes . addConflictRange ( r ) ;
}
// Starts committing the transaction.
// Returns an error future with used_during_commit if a commit is already in
// progress, or with the stored reset error if the reset promise is already set.
Future<Void> ReadYourWritesTransaction::commit() {
	if (checkUsedDuringCommit()) {
		return used_during_commit();
	}

	if (resetPromise.isSet()) {
		return resetPromise.getFuture().getError();
	}

	return RYWImpl::commit(this);
}
2018-02-01 05:47:36 +08:00
// Returns the underlying transaction's versionstamp future, combined with the
// reset promise via waitOrError.
// Returns used_during_commit as an error future if a commit has been started.
Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
	if (checkUsedDuringCommit()) {
		return used_during_commit();
	}

	return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
}
2017-05-26 04:48:44 +08:00
void ReadYourWritesTransaction : : setOption ( FDBTransactionOptions : : Option option , Optional < StringRef > value ) {
switch ( option ) {
case FDBTransactionOptions : : READ_YOUR_WRITES_DISABLE :
validateOptionValue ( value , false ) ;
if ( ! reading . isReady ( ) | | ! cache . empty ( ) | | ! writes . empty ( ) )
throw client_invalid_operation ( ) ;
options . readYourWritesDisabled = true ;
break ;
case FDBTransactionOptions : : READ_AHEAD_DISABLE :
validateOptionValue ( value , false ) ;
options . readAheadDisabled = true ;
break ;
case FDBTransactionOptions : : NEXT_WRITE_NO_WRITE_CONFLICT_RANGE :
validateOptionValue ( value , false ) ;
options . nextWriteDisableConflictRange = true ;
break ;
case FDBTransactionOptions : : ACCESS_SYSTEM_KEYS :
validateOptionValue ( value , false ) ;
options . readSystemKeys = true ;
options . writeSystemKeys = true ;
break ;
case FDBTransactionOptions : : READ_SYSTEM_KEYS :
validateOptionValue ( value , false ) ;
options . readSystemKeys = true ;
break ;
case FDBTransactionOptions : : TIMEOUT :
options . timeoutInSeconds = extractIntOption ( value , 0 , std : : numeric_limits < int > : : max ( ) ) / 1000.0 ;
2019-03-22 02:45:43 +08:00
resetTimeout ( ) ;
2017-05-26 04:48:44 +08:00
break ;
case FDBTransactionOptions : : RETRY_LIMIT :
options . maxRetries = ( int ) extractIntOption ( value , - 1 , std : : numeric_limits < int > : : max ( ) ) ;
break ;
case FDBTransactionOptions : : DEBUG_RETRY_LOGGING :
options . debugRetryLogging = true ;
if ( ! transactionDebugInfo ) {
transactionDebugInfo = Reference < TransactionDebugInfo > : : addRef ( new TransactionDebugInfo ( ) ) ;
transactionDebugInfo - > lastRetryLogTime = creationTime ;
}
transactionDebugInfo - > transactionName = value . present ( ) ? value . get ( ) . toString ( ) : " " ;
break ;
case FDBTransactionOptions : : SNAPSHOT_RYW_ENABLE :
validateOptionValue ( value , false ) ;
options . snapshotRywEnabled + + ;
break ;
case FDBTransactionOptions : : SNAPSHOT_RYW_DISABLE :
validateOptionValue ( value , false ) ;
options . snapshotRywEnabled - - ;
break ;
case FDBTransactionOptions : : USED_DURING_COMMIT_PROTECTION_DISABLE :
validateOptionValue ( value , false ) ;
options . disableUsedDuringCommitProtection = true ;
break ;
default :
break ;
}
tr . setOption ( option , value ) ;
}
2019-01-26 08:49:59 +08:00
void ReadYourWritesTransaction : : operator = ( ReadYourWritesTransaction & & r ) BOOST_NOEXCEPT {
2017-05-26 04:48:44 +08:00
cache = std : : move ( r . cache ) ;
writes = std : : move ( r . writes ) ;
arena = std : : move ( r . arena ) ;
tr = std : : move ( r . tr ) ;
readConflicts = std : : move ( r . readConflicts ) ;
watchMap = std : : move ( r . watchMap ) ;
reading = std : : move ( r . reading ) ;
resetPromise = std : : move ( r . resetPromise ) ;
r . resetPromise = Promise < Void > ( ) ;
2018-09-22 06:58:14 +08:00
deferredError = std : : move ( r . deferredError ) ;
2017-05-26 04:48:44 +08:00
retries = r . retries ;
timeoutActor = r . timeoutActor ;
creationTime = r . creationTime ;
commitStarted = r . commitStarted ;
options = r . options ;
transactionDebugInfo = r . transactionDebugInfo ;
cache . arena = & arena ;
writes . arena = & arena ;
}
2019-01-26 08:49:59 +08:00
// Move constructor: takes over the state of `r`.
// The cache and write map are re-pointed at this object's arena, and `r` is
// left holding a fresh, unset reset promise.
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT
  : cache(std::move(r.cache)),
    writes(std::move(r.writes)),
    arena(std::move(r.arena)),
    reading(std::move(r.reading)),
    retries(r.retries),
    creationTime(r.creationTime),
    deferredError(std::move(r.deferredError)),
    timeoutActor(std::move(r.timeoutActor)),
    resetPromise(std::move(r.resetPromise)),
    commitStarted(r.commitStarted),
    options(r.options),
    transactionDebugInfo(r.transactionDebugInfo) {
	// The moved containers still reference the source's arena; fix them up.
	cache.arena = &arena;
	writes.arena = &arena;

	tr = std::move(r.tr);
	readConflicts = std::move(r.readConflicts);
	watchMap = std::move(r.watchMap);

	r.resetPromise = Promise<Void>();
}
Future < Void > ReadYourWritesTransaction : : onError ( Error const & e ) {
return RYWImpl : : onError ( this , e ) ;
}
void ReadYourWritesTransaction : : resetRyow ( ) {
Promise < Void > oldReset = resetPromise ;
resetPromise = Promise < Void > ( ) ;
timeoutActor . cancel ( ) ;
arena = Arena ( ) ;
cache = SnapshotCache ( & arena ) ;
writes = WriteMap ( & arena ) ;
readConflicts = CoalescedKeyRefRangeMap < bool > ( ) ;
watchMap . clear ( ) ;
reading = AndFuture ( ) ;
commitStarted = false ;
2018-09-22 06:58:14 +08:00
deferredError = Error ( ) ;
2017-05-26 04:48:44 +08:00
if ( tr . apiVersionAtLeast ( 16 ) ) {
options . reset ( tr ) ;
2019-03-22 02:45:43 +08:00
resetTimeout ( ) ;
2017-05-26 04:48:44 +08:00
}
if ( ! oldReset . isSet ( ) )
oldReset . sendError ( transaction_cancelled ( ) ) ;
}
void ReadYourWritesTransaction : : cancel ( ) {
if ( ! resetPromise . isSet ( ) )
resetPromise . sendError ( transaction_cancelled ( ) ) ;
}
void ReadYourWritesTransaction : : reset ( ) {
retries = 0 ;
creationTime = now ( ) ;
2019-03-22 04:47:12 +08:00
timeoutActor . cancel ( ) ;
2019-03-22 06:48:40 +08:00
options . fullReset ( tr ) ;
2017-05-26 04:48:44 +08:00
transactionDebugInfo . clear ( ) ;
tr . fullReset ( ) ;
resetRyow ( ) ;
}
// Upper bound (exclusive) of the keys this transaction may read: the end of the
// system key space when system-key reads are enabled, otherwise the end of the
// normal key space.
KeyRef ReadYourWritesTransaction::getMaxReadKey() {
	return options.readSystemKeys ? systemKeys.end : normalKeys.end;
}
// Upper bound (exclusive) of the keys this transaction may write: the end of
// the system key space when system-key writes are enabled, otherwise the end of
// the normal key space.
KeyRef ReadYourWritesTransaction::getMaxWriteKey() {
	return options.writeSystemKeys ? systemKeys.end : normalKeys.end;
}
// On destruction, fails the reset promise with transaction_cancelled unless it
// has already been set.
ReadYourWritesTransaction::~ReadYourWritesTransaction() {
	const bool stillPending = !resetPromise.isSet();
	if (stillPending) {
		resetPromise.sendError(transaction_cancelled());
	}
}
bool ReadYourWritesTransaction : : checkUsedDuringCommit ( ) {
if ( commitStarted & & ! resetPromise . isSet ( ) & & ! options . disableUsedDuringCommitProtection ) {
resetPromise . sendError ( used_during_commit ( ) ) ;
}
return commitStarted ;
}
void ReadYourWritesTransaction : : debugLogRetries ( Optional < Error > error ) {
bool committed = ! error . present ( ) ;
if ( options . debugRetryLogging ) {
double timeSinceLastLog = now ( ) - transactionDebugInfo - > lastRetryLogTime ;
double elapsed = now ( ) - creationTime ;
if ( timeSinceLastLog > = 1 | | ( committed & & elapsed > 1 ) ) {
std : : string transactionNameStr = " " ;
if ( ! transactionDebugInfo - > transactionName . empty ( ) )
transactionNameStr = format ( " in transaction '%s' " , printable ( StringRef ( transactionDebugInfo - > transactionName ) ) . c_str ( ) ) ;
if ( ! g_network - > isSimulated ( ) ) //Fuzz workload turns this on, but we do not want stderr output in simulation
fprintf ( stderr , " fdb WARNING: long transaction (%.2fs elapsed%s, %d retries, %s) \n " , elapsed , transactionNameStr . c_str ( ) , retries , committed ? " committed " : error . get ( ) . what ( ) ) ;
{
TraceEvent trace = TraceEvent ( " LongTransaction " ) ;
2018-08-02 05:30:57 +08:00
if ( error . present ( ) )
trace . error ( error . get ( ) , true ) ;
2017-05-26 04:48:44 +08:00
if ( ! transactionDebugInfo - > transactionName . empty ( ) )
2019-03-19 06:03:43 +08:00
trace . detail ( " TransactionName " , StringRef ( transactionDebugInfo - > transactionName ) ) ;
2017-05-26 04:48:44 +08:00
trace . detail ( " Elapsed " , elapsed ) . detail ( " Retries " , retries ) . detail ( " Committed " , committed ) ;
}
transactionDebugInfo - > lastRetryLogTime = now ( ) ;
}
}
2018-02-28 06:02:24 +08:00
}