From 986a99a39aa3533139fe972a05c7668895c161b8 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:03:48 -0700 Subject: [PATCH 01/10] Change the reply priority for NativeAPI requests to the cluster to TaskDefaultPromiseEndpoint. --- fdbclient/NativeAPI.actor.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 08e394c50e..f160ab9813 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1111,7 +1111,7 @@ ACTOR Future< pair> > getKeyLocation( Database loop { choose { when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {} - when ( vector>> keyServersShards = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, ReplyPromise>>>(), info.taskID ) ) ) { + when ( vector>> keyServersShards = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, ReplyPromise>>>(), TaskDefaultPromiseEndpoint ) ) ) { if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After"); ASSERT( keyServersShards.size() ); // There should always be storage servers, except on version 0 which should not get to this function @@ -1401,7 +1401,7 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T try { if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual); - GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), info.taskID, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",printable(reply.sel.key)).detail("offset", reply.sel.offset).detail("orEqual", k.orEqual); k = reply.sel; @@ -1469,7 +1469,7 @@ ACTOR Future< Void > watchValue( Future version, Key key, OptionalgetCurrentTask()); } - state Version resp = wait( loadBalance( ssi.second, &StorageServerInterface::watchValue, WatchValueRequest(key, value, ver, watchValueID), info.taskID ) ); + state Version resp = wait( loadBalance( ssi.second, &StorageServerInterface::watchValue, WatchValueRequest(key, value, ver, watchValueID), TaskDefaultPromiseEndpoint ) ); if( info.debugID.present() ) { g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask()); } @@ -1561,7 +1561,7 @@ ACTOR Future> getExactRange( Database cx, Version ver .detail("Reverse", reverse) .detail("Servers", locations[shard].second->description());*/ } - GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, info.taskID, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After"); output.arena().dependsOn( rep.arena ); @@ -1779,8 +1779,8 @@ ACTOR Future> getRange( Database cx, Future .detail("ModifiedSelectors", modifiedSelectors) .detail("Servers", beginServer.second->description());*/ } - GetKeyValuesReply _rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, info.taskID, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); - GetKeyValuesReply rep = _rep; + + GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); if( info.debugID.present() ) { g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size()); @@ -2491,7 +2491,7 @@ ACTOR static Future tryCommit( Database cx, Reference } req->debugID = commitID; - state Future reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, *req, info.taskID, true ); + state Future reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, *req, TaskDefaultPromiseEndpoint, true ); choose { when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) { From 7bab9a0276fa88f89bc0d2b7865403916765ee22 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:11:26 -0700 Subject: [PATCH 02/10] Eliminate some copies --- fdbclient/NativeAPI.actor.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index f160ab9813..067d1f8fd8 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1724,7 +1724,7 @@ ACTOR Future> getRange( Database cx, Future if( begin.getKey() == allKeys.begin && begin.offset < 1 ) { output.readToBegin = true; - begin = firstGreaterOrEqual( begin.getKey() ); + begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena()); } validateVersion(version); @@ -2071,8 +2071,8 @@ ACTOR Future< Standalone > getRangeAndConflictRange( KeySelector begin, KeySelector end, GetRangeLimits limits, Promise conflictRange, bool reverse, TransactionInfo info) { - state Key beginKey = ( begin.orEqual ? keyAfter(begin.getKey()) : begin.getKey() ); - state Key endKey = ( end.orEqual ? keyAfter(end.getKey()) : end.getKey() ); + state Key beginKey = ( begin.orEqual ? keyAfter(begin.getKey()) : Key(begin.getKey(), begin.arena()) ); + state Key endKey = ( end.orEqual ? keyAfter(end.getKey()) : Key(end.getKey(), end.arena()) ); //This optimization prevents NULL operations from being added to the conflict range if( begin.offset >= end.offset && beginKey >= endKey ) { @@ -2103,11 +2103,13 @@ ACTOR Future< Standalone > getRangeAndConflictRange( if ( rep.size() ) { if( reverse ) { - rangeBegin = min( rangeBegin, Key(rep.end()[-1].key) ); - if( end.offset > 0 ) rangeEnd = max( rangeEnd, keyAfter( rep[0].key ) ); + rangeBegin = min( rangeBegin, Key(rep.end()[-1].key, rep.arena()) ); + if( end.offset > 0 && rep[0].key >= rangeEnd ) rangeEnd = keyAfter( rep[0].key ); } else { - if( begin.offset <= 0 ) rangeBegin = min( rangeBegin, Key(rep[0].key) ); - rangeEnd = max( rangeEnd, keyAfter( rep.end()[-1].key ) ); + if( begin.offset <= 0 ) rangeBegin = min( rangeBegin, Key(rep[0].key, rep.arena()) ); + if(rep.end()[-1].key >= rangeEnd) { + rangeEnd = keyAfter( rep.end()[-1].key ); + } } } From 55bea14b9eafd6f9c578cd312bf680ab3ab99104 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:17:47 -0700 Subject: [PATCH 03/10] Convert extraConflictRanges from KeyRange to std::pair to avoid copies. --- fdbclient/NativeAPI.actor.cpp | 24 ++++++++++++------------ fdbclient/NativeAPI.h | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 067d1f8fd8..01e4b15d3a 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2042,17 +2042,17 @@ Future< Standalone< VectorRef< const char*>>> Transaction::getAddressesForKey( c } ACTOR Future< Key > getKeyAndConflictRange( - Database cx, KeySelector k, Future version, Promise conflictRange, TransactionInfo info) + Database cx, KeySelector k, Future version, Promise> conflictRange, TransactionInfo info) { try { Key rep = wait( getKey(cx, k, version, info) ); if( k.offset <= 0 ) - conflictRange.send( KeyRangeRef( rep, k.orEqual ? keyAfter( k.getKey() ) : k.getKey() ) ); + conflictRange.send( std::make_pair( rep, k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()) ) ); else - conflictRange.send( KeyRangeRef( k.orEqual ? keyAfter( k.getKey() ) : k.getKey(), keyAfter( rep ) ) ); + conflictRange.send( std::make_pair( k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()), keyAfter( rep ) ) ); return std::move(rep); } catch( Error&e ) { - conflictRange.send(KeyRangeRef()); + conflictRange.send(std::make_pair(Key(), Key())); throw; } } @@ -2061,7 +2061,7 @@ Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) { if( snapshot ) return ::getKey(cx, key, getReadVersion(), info); - Promise conflictRange; + Promise> conflictRange; extraConflictRanges.push_back( conflictRange.getFuture() ); return getKeyAndConflictRange( cx, key, getReadVersion(), conflictRange, info ); } @@ -2069,7 +2069,7 @@ Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) { ACTOR Future< Standalone > getRangeAndConflictRange( Database cx, Reference trLogInfo, Future version, KeySelector begin, KeySelector end, GetRangeLimits limits, - Promise conflictRange, bool reverse, TransactionInfo info) + Promise> conflictRange, bool reverse, TransactionInfo info) { state Key beginKey = ( begin.orEqual ? keyAfter(begin.getKey()) : Key(begin.getKey(), begin.arena()) ); state Key endKey = ( end.orEqual ? keyAfter(end.getKey()) : Key(end.getKey(), end.arena()) ); @@ -2077,7 +2077,7 @@ ACTOR Future< Standalone > getRangeAndConflictRange( //This optimization prevents NULL operations from being added to the conflict range if( begin.offset >= end.offset && beginKey >= endKey ) { TEST(true); //get range no result possible - conflictRange.send(KeyRangeRef()); + conflictRange.send(std::make_pair(Key(), Key())); return Standalone(); } @@ -2113,11 +2113,11 @@ ACTOR Future< Standalone > getRangeAndConflictRange( } } - conflictRange.send( KeyRangeRef( rangeBegin, rangeEnd ) ); + conflictRange.send( std::make_pair( rangeBegin, rangeEnd ) ); return std::move(rep); } catch( Error &e ) { - conflictRange.send(KeyRangeRef()); + conflictRange.send(std::make_pair(Key(), Key())); throw; } } @@ -2138,7 +2138,7 @@ Future< Standalone > Transaction::getRange( if( snapshot ) return getRangeWrapper(cx, trLogInfo, getReadVersion(), begin, end, limits, reverse, info ); - Promise conflictRange; + Promise> conflictRange; extraConflictRanges.push_back( conflictRange.getFuture() ); return getRangeAndConflictRange( cx, trLogInfo, getReadVersion(), begin, end, limits, conflictRange, reverse, info ); } @@ -2606,8 +2606,8 @@ Future Transaction::commitMutations() { bool isCheckingWrites = options.checkWritesEnabled && g_random->random01() < 0.01; for(int i=0; i readVersion; - vector> extraConflictRanges; + vector>> extraConflictRanges; Promise commitResult; Future committing; }; From 5fcd58b6374fd27f4c7728159c39719c2839e095 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:24:49 -0700 Subject: [PATCH 04/10] Whitespace fix --- fdbclient/NativeAPI.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 01e4b15d3a..786544af82 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1869,8 +1869,8 @@ ACTOR Future> getRange( Database cx, Future cx->invalidateCache( beginServer.second ); if (e.code() == error_code_wrong_shard_server) { - Standalone result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) ); - return result; + Standalone result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) ); + return result; } Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); From a2e059be11755817a4eea342e3c25ca89259e3b7 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:27:17 -0700 Subject: [PATCH 05/10] Eliminate some more copies --- fdbclient/NativeAPI.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 786544af82..f8734c648b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1397,7 +1397,7 @@ ACTOR Future getKey( Database cx, KeySelector k, Future version, T return Key(); } - state pair> ssi = wait( getKeyLocation(cx, k.getKey(), info, k.isBackward()) ); + state pair> ssi = wait( getKeyLocation(cx, Key(k.getKey(), k.arena()), info, k.isBackward()) ); try { if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual); @@ -1736,7 +1736,7 @@ ACTOR Future> getRange( Database cx, Future return output; } - state pair> beginServer = wait( getKeyLocation( cx, reverse ? end.getKey() : begin.getKey(), info, reverse ? (end-1).isBackward() : begin.isBackward() ) ); + state pair> beginServer = wait( getKeyLocation( cx, reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()), info, reverse ? (end-1).isBackward() : begin.isBackward() ) ); state KeyRange shard = beginServer.first; state bool modifiedSelectors = false; state GetKeyValuesRequest req; From 39a43aeb9561ca22a431c3bc19a735bc722dcccc Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:46:35 -0700 Subject: [PATCH 06/10] Eliminate another copy --- fdbclient/ReadYourWrites.actor.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 9e55c79ce3..3f5d086397 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -146,7 +146,7 @@ public: if(key > ryw->getMaxReadKey()) read.end = firstGreaterOrEqual(ryw->getMaxReadKey()); else - read.end = firstGreaterOrEqual(key); + read.end = KeySelector(firstGreaterOrEqual(key), key.arena()); } Standalone v = wait( ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse) ); @@ -161,6 +161,7 @@ public: } return v; + //return ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse); } // addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant conflict range From 0167c1e7e8ebc63b6cd9d522dc35e047e81ce95a Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 20 Oct 2017 09:47:10 -0700 Subject: [PATCH 07/10] Remove unintentionally committed line --- fdbclient/ReadYourWrites.actor.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 3f5d086397..8942ae3e62 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -161,7 +161,6 @@ public: } return v; - //return ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse); } // addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant conflict range From e52f46064afe5f6c79a3b86568f12bdbf6c56f4f Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 26 Oct 2017 09:35:34 -0700 Subject: [PATCH 08/10] Don't use the results arena for conflict ranges. Rearrange the code to avoid the copy unless necessary. --- fdbclient/NativeAPI.actor.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index b0905b72f2..073ad587e9 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2103,11 +2103,17 @@ ACTOR Future< Standalone > getRangeAndConflictRange( if ( rep.size() ) { if( reverse ) { - rangeBegin = min( rangeBegin, Key(rep.end()[-1].key, rep.arena()) ); - if( end.offset > 0 && rep[0].key >= rangeEnd ) rangeEnd = keyAfter( rep[0].key ); + if( rep.end()[-1].key < rangeBegin ) { + rangeBegin = rep.end()[-1].key; + } + if( end.offset > 0 && rep[0].key >= rangeEnd ) { + rangeEnd = keyAfter( rep[0].key ); + } } else { - if( begin.offset <= 0 ) rangeBegin = min( rangeBegin, Key(rep[0].key, rep.arena()) ); - if(rep.end()[-1].key >= rangeEnd) { + if( begin.offset <= 0 && rep[0].key < rangeBegin ) { + rangeBegin = rep[0].key; + } + if( rep.end()[-1].key >= rangeEnd ) { rangeEnd = keyAfter( rep.end()[-1].key ); } } From 2d5a3a07e4e7749a4457d648c21994375c1f93e6 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 2 Nov 2017 10:51:30 -0700 Subject: [PATCH 09/10] Avoid copies and comparisons in RYW get range --- fdbclient/ReadYourWrites.actor.cpp | 64 ++++++++++++++++-------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 1d325ae4fb..290c8f6615 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -346,9 +346,6 @@ public: if(keyNeedsCopy) { key.setKey(keykey.toArena(key.arena())); } - else { - key.setKey(keykey.toArenaOrRef(key.arena())); - } } static KeyRangeRef getKnownKeyRange( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) { @@ -360,10 +357,12 @@ public: if( data.size() ) { beginKey = std::min( beginKey, data[0].key ); - - if( data.readThrough.present() ) + if( data.readThrough.present() ) { endKey = std::max( endKey, data.readThrough.get() ); - endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 ); + } + else { + endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 ); + } } if (beginKey >= endKey) return KeyRangeRef(); @@ -461,7 +460,7 @@ public: resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset ); resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset ); - if( begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) { + if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) { return RangeResultRef(false, false); } else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() ) @@ -501,16 +500,16 @@ public: .detail("unknown", it.is_unknown_range()) .detail("requests", requestCount);*/ - if( !result.size() && begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) { + if( !result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) { return RangeResultRef(false, false); } - if( end.getKey() == allKeys.begin && end.offset <= 1 ) { + if( end.offset <= 1 && end.getKey() == allKeys.begin ) { return RangeResultRef(readToBegin, readThroughEnd); } - if( ( begin.getKey() >= end.getKey() && begin.offset >= end.offset ) || - ( begin.getKey() >= ryw->getMaxReadKey() && begin.offset >= 1) ) { + if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) || + ( begin.offset >= 1 && begin.getKey() >= ryw->getMaxReadKey() ) ) { if( end.isFirstGreaterOrEqual() ) break; if( !result.size() ) break; Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop @@ -522,7 +521,7 @@ public: break; } - if( it.beginKey() > itEnd.beginKey() && !it.is_unreadable() && !it.is_unknown_range() ) { + if( !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() > itEnd.beginKey() ) { if( end.isFirstGreaterOrEqual() ) break; return RangeResultRef(readToBegin, readThroughEnd); } @@ -545,10 +544,11 @@ public: state KeySelector read_end; if ( ucEnd!=itEnd ) { - read_end = firstGreaterOrEqual(ucEnd.endKey().toStandaloneStringRef()); + Key k = ucEnd.endKey().toStandaloneStringRef(); + read_end = KeySelector(firstGreaterOrEqual(k), k.arena()); if( end.offset < 1 ) additionalRows += 1 - end.offset; // extra for items past end } else if( end.offset < 1 ) { - read_end = firstGreaterOrEqual( end.getKey() ); + read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena()); additionalRows += 1 - end.offset; } else { read_end = end; @@ -562,10 +562,11 @@ public: state KeySelector read_begin; if (begin.isFirstGreaterOrEqual()) { - begin = firstGreaterOrEqual( it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : begin.getKey() ); + Key k = it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : Key(begin.getKey(), begin.arena()); + begin = KeySelector(firstGreaterOrEqual(k), k.arena()); read_begin = begin; } else if( begin.offset > 1 ) { - read_begin = firstGreaterOrEqual(begin.getKey()); + read_begin = KeySelector(firstGreaterOrEqual(begin.getKey()), begin.arena()); additionalRows += begin.offset - 1; } else { read_begin = begin; @@ -645,10 +646,13 @@ public: if (data.readThroughEnd) endKey = allKeys.end; if( data.size() ) { - beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key; - - if( data.readThrough.present() ) + if( data.readThrough.present() ) { beginKey = std::min( data.readThrough.get(), beginKey ); + } + else { + beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key; + } + endKey = data[0].key < endKey ? endKey : ExtStringRef( data[0].key, 1 ); } if (beginKey >= endKey) return KeyRangeRef(); @@ -725,7 +729,7 @@ public: resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset ); resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset ); - if( ( begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) ) { + if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) { return RangeResultRef(false, false); } else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() ) @@ -766,16 +770,16 @@ public: .detail("kv", it.is_kv()) .detail("requests", requestCount);*/ - if(!result.size() && begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset) { + if(!result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) { return RangeResultRef(false, false); } - if( begin.getKey() >= ryw->getMaxReadKey() && !begin.isBackward() ) { + if( !begin.isBackward() && begin.getKey() >= ryw->getMaxReadKey() ) { return RangeResultRef(readToBegin, readThroughEnd); } - if( ( begin.getKey() >= end.getKey() && begin.offset >= end.offset ) || - ( end.getKey() == allKeys.begin && end.offset <= 1 ) ) { + if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) || + ( end.offset <= 1 && end.getKey() == allKeys.begin ) ) { if( begin.isFirstGreaterOrEqual() ) break; if( !result.size() ) break; Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop @@ -787,7 +791,7 @@ public: break; } - if (it.beginKey() < itEnd.beginKey() && !it.is_unreadable() && !it.is_unknown_range() && itemsPastBegin >= begin.offset - 1) { + if (itemsPastBegin >= begin.offset - 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() < itEnd.beginKey()) { if( begin.isFirstGreaterOrEqual() ) break; return RangeResultRef(readToBegin, readThroughEnd); } @@ -813,10 +817,11 @@ public: state KeySelector read_begin; if ( ucEnd!=itEnd ) { - read_begin = firstGreaterOrEqual(ucEnd.beginKey().toStandaloneStringRef()); + Key k = ucEnd.beginKey().toStandaloneStringRef(); + read_begin = KeySelector(firstGreaterOrEqual(k), k.arena()); if( begin.offset > 1 ) additionalRows += begin.offset - 1; // extra for items past end } else if( begin.offset > 1 ) { - read_begin = firstGreaterOrEqual( begin.getKey() ); + read_begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena()); additionalRows += begin.offset - 1; } else { read_begin = begin; @@ -830,10 +835,11 @@ public: state KeySelector read_end; if (end.isFirstGreaterOrEqual()) { - end = firstGreaterOrEqual( it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey() ); + Key k = it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey(); + end = KeySelector(firstGreaterOrEqual(k), k.arena()); read_end = end; } else if (end.offset < 1) { - read_end = firstGreaterOrEqual(end.getKey()); + read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena()); additionalRows += 1 - end.offset; } else { read_end = end; From 3cad2676cc07eec5c8f3aa8e64d206c58ebd3dd1 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 2 Nov 2017 13:39:06 -0700 Subject: [PATCH 10/10] Collapse some of the getRange actors into a single actor. Avoid unnecessary comparisons. --- fdbclient/NativeAPI.actor.cpp | 459 ++++++++++++++++++---------------- 1 file changed, 238 insertions(+), 221 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 453d26262a..1758751fe4 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1726,195 +1726,255 @@ ACTOR Future> getRangeFallback( Database cx, Version return r; } -ACTOR Future> getRange( Database cx, Future fVersion, - KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse, TransactionInfo info ) +void getRangeFinished(Reference trLogInfo, double startTime, KeySelector begin, KeySelector end, bool snapshot, + Promise> conflictRange, bool reverse, Standalone result) +{ + if( trLogInfo ) { + int rangeSize = 0; + for (const KeyValueRef &kv : result.contents()) + rangeSize += kv.key.size() + kv.value.size(); + trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, now()-startTime, rangeSize, begin.getKey(), end.getKey())); + } + + if( !snapshot ) { + Key rangeBegin; + Key rangeEnd; + + if(result.readToBegin) { + rangeBegin = allKeys.begin; + } + else if(((!reverse || !result.more || begin.offset > 1) && begin.offset > 0) || result.size() == 0) { + rangeBegin = Key(begin.getKey(), begin.arena()); + } + else { + rangeBegin = reverse ? result.end()[-1].key : result[0].key; + } + + if(end.offset > begin.offset && end.getKey() < rangeBegin) { + rangeBegin = Key(end.getKey(), end.arena()); + } + + if(result.readThroughEnd) { + rangeEnd = allKeys.end; + } + else if(((reverse || !result.more || end.offset <= 0) && end.offset <= 1) || result.size() == 0) { + rangeEnd = Key(end.getKey(), end.arena()); + } + else { + rangeEnd = keyAfter(reverse ? result[0].key : result.end()[-1].key); + } + + if(begin.offset < end.offset && begin.getKey() > rangeEnd) { + rangeEnd = Key(begin.getKey(), begin.arena()); + } + + conflictRange.send(std::make_pair(rangeBegin, rangeEnd)); + } +} + +ACTOR Future> getRange( Database cx, Reference trLogInfo, Future fVersion, + KeySelector begin, KeySelector end, GetRangeLimits limits, Promise> conflictRange, bool snapshot, bool reverse, + TransactionInfo info ) { state GetRangeLimits originalLimits( limits ); state KeySelector originalBegin = begin; state KeySelector originalEnd = end; state Standalone output; - state Version version = wait( fVersion ); - state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the version that the first one completed - // FIXME: Is this really right? Weaken this and see if there is a problem; if so maybe there is a much subtler problem even with this. + try { + state Version version = wait( fVersion ); + validateVersion(version); - if( begin.getKey() == allKeys.begin && begin.offset < 1 ) { - output.readToBegin = true; - begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena()); - } + state double startTime = now(); + state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the version that the first one completed + // FIXME: Is this really right? Weaken this and see if there is a problem; if so maybe there is a much subtler problem even with this. - validateVersion(version); - ASSERT( !limits.isReached() ); - ASSERT( (!limits.hasRowLimit() || limits.rows >= limits.minRows) && limits.minRows >= 0 ); - - loop { - if( end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual()) ) { - return output; + if( begin.getKey() == allKeys.begin && begin.offset < 1 ) { + output.readToBegin = true; + begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena()); } - state pair> beginServer = wait( getKeyLocation( cx, reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()), info, reverse ? (end-1).isBackward() : begin.isBackward() ) ); - state KeyRange shard = beginServer.first; - state bool modifiedSelectors = false; - state GetKeyValuesRequest req; + ASSERT( !limits.isReached() ); + ASSERT( (!limits.hasRowLimit() || limits.rows >= limits.minRows) && limits.minRows >= 0 ); - req.version = readVersion; - - if( reverse && (begin-1).isDefinitelyLess(shard.begin) && - ( !begin.isFirstGreaterOrEqual() || begin.getKey() != shard.begin ) ) { //In this case we would be setting modifiedSelectors to true, but not modifying anything - - req.begin = firstGreaterOrEqual( shard.begin ); - modifiedSelectors = true; - } - else req.begin = begin; - - if( !reverse && end.isDefinitelyGreater(shard.end) ) { - req.end = firstGreaterOrEqual( shard.end ); - modifiedSelectors = true; - } - else req.end = end; - - transformRangeLimits(limits, reverse, req); - ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); - - req.debugID = info.debugID; - try { - if( info.debugID.present() ) { - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before"); - /*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get()) - .detail("ReqBeginKey", printable(req.begin.getKey())) - .detail("ReqEndKey", printable(req.end.getKey())) - .detail("originalBegin", originalBegin.toString()) - .detail("originalEnd", originalEnd.toString()) - .detail("Begin", begin.toString()) - .detail("End", end.toString()) - .detail("shard", printable(shard)) - .detail("ReqLimit", req.limit) - .detail("ReqLimitBytes", req.limitBytes) - .detail("ReqVersion", req.version) - .detail("Reverse", reverse) - .detail("ModifiedSelectors", modifiedSelectors) - .detail("Servers", beginServer.second->description());*/ + loop { + if( end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual()) ) { + getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + return output; } - GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + state pair> beginServer = wait( getKeyLocation( cx, reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()), info, reverse ? (end-1).isBackward() : begin.isBackward() ) ); + state KeyRange shard = beginServer.first; + state bool modifiedSelectors = false; + state GetKeyValuesRequest req; - if( info.debugID.present() ) { - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size()); - /*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get()) - .detail("ReqBeginKey", printable(req.begin.getKey())) - .detail("ReqEndKey", printable(req.end.getKey())) - .detail("RepIsMore", rep.more) - .detail("VersionReturned", rep.version) - .detail("RowsReturned", rep.data.size());*/ + req.version = readVersion; + + if( reverse && (begin-1).isDefinitelyLess(shard.begin) && + ( !begin.isFirstGreaterOrEqual() || begin.getKey() != shard.begin ) ) { //In this case we would be setting modifiedSelectors to true, but not modifying anything + + req.begin = firstGreaterOrEqual( shard.begin ); + modifiedSelectors = true; } + else req.begin = begin; - ASSERT( !rep.more || rep.data.size() ); - ASSERT( !limits.hasRowLimit() || rep.data.size() <= limits.rows ); - - limits.decrement( rep.data ); - - if(reverse && begin.isLastLessOrEqual() && rep.data.size() && rep.data.end()[-1].key == begin.getKey()) { - modifiedSelectors = false; + if( !reverse && end.isDefinitelyGreater(shard.end) ) { + req.end = firstGreaterOrEqual( shard.end ); + modifiedSelectors = true; } + else req.end = end; - bool finished = limits.isReached() || ( !modifiedSelectors && !rep.more ) || limits.hasSatisfiedMinRows(); - bool readThrough = modifiedSelectors && !rep.more; + transformRangeLimits(limits, reverse, req); + ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); - // optimization: first request got all data--just return it - if( finished && !output.size() ) { - output = Standalone( RangeResultRef( rep.data, modifiedSelectors || limits.isReached() || rep.more ), rep.arena ); + req.debugID = info.debugID; + try { + if( info.debugID.present() ) { + g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before"); + /*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get()) + .detail("ReqBeginKey", printable(req.begin.getKey())) + .detail("ReqEndKey", printable(req.end.getKey())) + .detail("originalBegin", originalBegin.toString()) + .detail("originalEnd", originalEnd.toString()) + .detail("Begin", begin.toString()) + .detail("End", end.toString()) + .detail("shard", printable(shard)) + .detail("ReqLimit", req.limit) + .detail("ReqLimitBytes", req.limitBytes) + .detail("ReqVersion", req.version) + .detail("Reverse", reverse) + .detail("ModifiedSelectors", modifiedSelectors) + .detail("Servers", beginServer.second->description());*/ + } - if( BUGGIFY && limits.hasByteLimit() && output.size() > std::max(1, originalLimits.minRows) ) { - output.more = true; - output.resize(output.arena(), g_random->randomInt(std::max(1,originalLimits.minRows),output.size())); + GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) ); + + if( info.debugID.present() ) { + g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size()); + /*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get()) + .detail("ReqBeginKey", printable(req.begin.getKey())) + .detail("ReqEndKey", printable(req.end.getKey())) + .detail("RepIsMore", rep.more) + .detail("VersionReturned", rep.version) + .detail("RowsReturned", rep.data.size());*/ + } + + ASSERT( !rep.more || rep.data.size() ); + ASSERT( !limits.hasRowLimit() || rep.data.size() <= limits.rows ); + + limits.decrement( rep.data ); + + if(reverse && begin.isLastLessOrEqual() && rep.data.size() && rep.data.end()[-1].key == begin.getKey()) { + modifiedSelectors = false; + } + + bool finished = limits.isReached() || ( !modifiedSelectors && !rep.more ) || limits.hasSatisfiedMinRows(); + bool readThrough = modifiedSelectors && !rep.more; + + // optimization: first request got all data--just return it + if( finished && !output.size() ) { + bool readToBegin = output.readToBegin; + bool readThroughEnd = output.readThroughEnd; + + output = Standalone( RangeResultRef( rep.data, modifiedSelectors || limits.isReached() || rep.more ), rep.arena ); + output.readToBegin = readToBegin; + output.readThroughEnd = readThroughEnd; + + if( BUGGIFY && limits.hasByteLimit() && output.size() > std::max(1, originalLimits.minRows) ) { + output.more = true; + output.resize(output.arena(), g_random->randomInt(std::max(1,originalLimits.minRows),output.size())); + getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + return output; + } + + if( readThrough ) { + output.arena().dependsOn( shard.arena() ); + output.readThrough = reverse ? shard.begin : shard.end; + } + + getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); return output; } - if( readThrough ) { - output.arena().dependsOn( shard.arena() ); - output.readThrough = reverse ? shard.begin : shard.end; - } - return output; - } + output.arena().dependsOn( rep.arena ); + output.append(output.arena(), rep.data.begin(), rep.data.size()); - output.arena().dependsOn( rep.arena ); - output.append(output.arena(), rep.data.begin(), rep.data.size()); + if( finished ) { + if( readThrough ) { + output.arena().dependsOn( shard.arena() ); + output.readThrough = reverse ? shard.begin : shard.end; + } + output.more = modifiedSelectors || limits.isReached() || rep.more; - if( finished ) { - if( readThrough ) { - output.arena().dependsOn( shard.arena() ); - output.readThrough = reverse ? shard.begin : shard.end; - } - output.more = modifiedSelectors || limits.isReached() || rep.more; - return output; - } - - readVersion = rep.version; // see above comment - - if( !rep.more ) { - ASSERT( modifiedSelectors ); - TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange - - if( !rep.data.size() ) { - Standalone result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) ); - return result; + getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + return output; } - if( reverse ) - end = firstGreaterOrEqual( shard.begin ); - else - begin = firstGreaterOrEqual( shard.end ); - } else { - TEST(true); // GetKeyValuesReply.more in getRange - if( reverse ) - end = firstGreaterOrEqual( output[output.size()-1].key ); - else - begin = firstGreaterThan( output[output.size()-1].key ); - } + readVersion = rep.version; // see above comment + if( !rep.more ) { + ASSERT( modifiedSelectors ); + TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange - } catch ( Error& e ) { - if( info.debugID.present() ) { - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error"); - TraceEvent("TransactionDebugError", info.debugID.get()).error(e); - } - if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || - (e.code() == error_code_transaction_too_old && readVersion == latestVersion)) - { - cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() ); - cx->invalidateCache( beginServer.second ); + if( !rep.data.size() ) { + Standalone result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) ); + getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result); + return result; + } - if (e.code() == error_code_wrong_shard_server) { - Standalone result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) ); - return result; + if( reverse ) + end = firstGreaterOrEqual( shard.begin ); + else + begin = firstGreaterOrEqual( shard.end ); + } else { + TEST(true); // GetKeyValuesReply.more in getRange + if( reverse ) + end = firstGreaterOrEqual( output[output.size()-1].key ); + else + begin = firstGreaterThan( output[output.size()-1].key ); } - Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); - } else - throw e; + + } catch ( Error& e ) { + if( info.debugID.present() ) { + g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error"); + TraceEvent("TransactionDebugError", info.debugID.get()).error(e); + } + if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || + (e.code() == error_code_transaction_too_old && readVersion == latestVersion)) + { + cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() ); + cx->invalidateCache( beginServer.second ); + + if (e.code() == error_code_wrong_shard_server) { + Standalone result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) ); + getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result); + return result; + } + + Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + } else { + if (trLogInfo) + trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, static_cast(e.code()), begin.getKey(), end.getKey())); + + throw e; + } + } } } + catch(Error &e) { + if(conflictRange.canBeSet()) { + conflictRange.send(std::make_pair(Key(), Key())); + } + + throw; + } } -ACTOR Future> getRangeWrapper(Database cx, Reference trLogInfo, Future fVersion, KeySelector begin, KeySelector end, - GetRangeLimits limits, bool reverse, TransactionInfo info) { - state double startTime = now(); - try { - Standalone ret = wait(getRange(cx, fVersion, begin, end, limits, reverse, info)); - double latency = now() - startTime; - if (trLogInfo) { - int rangeSize = 0; - for (const KeyValueRef &res : ret.contents()) - rangeSize += res.key.size() + res.value.size(); - trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, latency, rangeSize, begin.getKey(), end.getKey())); - } - return ret; - } - catch (Error &e) { - if (trLogInfo) - trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, static_cast(e.code()), begin.getKey(), end.getKey())); - throw; - } +Future> getRange( Database const& cx, Future const& fVersion, KeySelector const& begin, KeySelector const& end, + GetRangeLimits const& limits, bool const& reverse, TransactionInfo const& info ) +{ + return getRange(cx, Reference(), fVersion, begin, end, limits, Promise>(), true, reverse, info); } Transaction::Transaction( Database const& cx ) @@ -2082,68 +2142,6 @@ Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) { return getKeyAndConflictRange( cx, key, getReadVersion(), conflictRange, info ); } -ACTOR Future< Standalone > getRangeAndConflictRange( - Database cx, Reference trLogInfo, Future version, - KeySelector begin, KeySelector end, GetRangeLimits limits, - Promise> conflictRange, bool reverse, TransactionInfo info) -{ - state Key beginKey = ( begin.orEqual ? keyAfter(begin.getKey()) : Key(begin.getKey(), begin.arena()) ); - state Key endKey = ( end.orEqual ? keyAfter(end.getKey()) : Key(end.getKey(), end.arena()) ); - - //This optimization prevents NULL operations from being added to the conflict range - if( begin.offset >= end.offset && beginKey >= endKey ) { - TEST(true); //get range no result possible - conflictRange.send(std::make_pair(Key(), Key())); - return Standalone(); - } - - try { - Standalone rep = wait(getRangeWrapper(cx, trLogInfo, version, begin, end, limits, reverse, info) ); - - ASSERT( limits.rows != 0 ); - - Key rangeBegin; - Key rangeEnd; - - if( beginKey < endKey ) { - rangeBegin = reverse && begin.offset <= 0 && rep.more ? endKey : beginKey; - rangeEnd = !reverse && end.offset > 0 && rep.more ? beginKey : endKey; - } - else { - rangeBegin = endKey; - rangeEnd = beginKey; - } - - if( rep.readToBegin ) rangeBegin = allKeys.begin; - if( rep.readThroughEnd ) rangeEnd = allKeys.end; - - if ( rep.size() ) { - if( reverse ) { - if( rep.end()[-1].key < rangeBegin ) { - rangeBegin = rep.end()[-1].key; - } - if( end.offset > 0 && rep[0].key >= rangeEnd ) { - rangeEnd = keyAfter( rep[0].key ); - } - } else { - if( begin.offset <= 0 && rep[0].key < rangeBegin ) { - rangeBegin = rep[0].key; - } - if( rep.end()[-1].key >= rangeEnd ) { - rangeEnd = keyAfter( rep.end()[-1].key ); - } - } - } - - conflictRange.send( std::make_pair( rangeBegin, rangeEnd ) ); - - return std::move(rep); - } catch( Error &e ) { - conflictRange.send(std::make_pair(Key(), Key())); - throw; - } -} - Future< Standalone > Transaction::getRange( const KeySelector& begin, const KeySelector& end, @@ -2157,12 +2155,31 @@ Future< Standalone > Transaction::getRange( if( !limits.isValid() ) return range_limits_invalid(); - if( snapshot ) - return getRangeWrapper(cx, trLogInfo, getReadVersion(), begin, end, limits, reverse, info ); + ASSERT(limits.rows != 0); + + KeySelector b = begin; + if( b.orEqual ) { + TEST(true); // Native begin orEqual==true + b.removeOrEqual(b.arena()); + } + + KeySelector e = end; + if( e.orEqual ) { + TEST(true); // Native end orEqual==true + e.removeOrEqual(e.arena()); + } + + if( b.offset >= e.offset && b.getKey() >= e.getKey() ) { + TEST(true); // Native range inverted + return Standalone(); + } Promise> conflictRange; - extraConflictRanges.push_back( conflictRange.getFuture() ); - return getRangeAndConflictRange( cx, trLogInfo, getReadVersion(), begin, end, limits, conflictRange, reverse, info ); + if(!snapshot) { + extraConflictRanges.push_back( conflictRange.getFuture() ); + } + + return ::getRange(cx, trLogInfo, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse, info); } Future< Standalone > Transaction::getRange( @@ -2637,7 +2654,7 @@ Future Transaction::commitMutations() { bool isCheckingWrites = options.checkWritesEnabled && g_random->random01() < 0.01; for(int i=0; i