Merge pull request #199 from cie/getrange-perf-improvements

Getrange perf improvements
This commit is contained in:
Evan Tschannen 2017-11-04 14:25:55 -07:00 committed by GitHub Enterprise
commit 150aca4734
3 changed files with 288 additions and 257 deletions

View File

@ -1127,7 +1127,7 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database
loop {
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServersShards = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>>(), info.taskID ) ) ) {
when ( vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServersShards = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>>(), TaskDefaultPromiseEndpoint ) ) ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
ASSERT( keyServersShards.size() ); // There should always be storage servers, except on version 0 which should not get to this function
@ -1413,11 +1413,11 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
return Key();
}
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, k.getKey(), info, k.isBackward()) );
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, Key(k.getKey(), k.arena()), info, k.isBackward()) );
try {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual);
GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), info.taskID, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",printable(reply.sel.key)).detail("offset", reply.sel.offset).detail("orEqual", k.orEqual);
k = reply.sel;
@ -1485,7 +1485,7 @@ ACTOR Future< Void > watchValue( Future<Version> version, Key key, Optional<Valu
g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first());
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
state Version resp = wait( loadBalance( ssi.second, &StorageServerInterface::watchValue, WatchValueRequest(key, value, ver, watchValueID), info.taskID ) );
state Version resp = wait( loadBalance( ssi.second, &StorageServerInterface::watchValue, WatchValueRequest(key, value, ver, watchValueID), TaskDefaultPromiseEndpoint ) );
if( info.debugID.present() ) {
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask());
}
@ -1577,7 +1577,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
.detail("Reverse", reverse)
.detail("Servers", locations[shard].second->description());*/
}
GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, info.taskID, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
output.arena().dependsOn( rep.arena );
@ -1726,195 +1726,255 @@ ACTOR Future<Standalone<RangeResultRef>> getRangeFallback( Database cx, Version
return r;
}
ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Future<Version> fVersion,
KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse, TransactionInfo info )
void getRangeFinished(Reference<TransactionLogInfo> trLogInfo, double startTime, KeySelector begin, KeySelector end, bool snapshot,
Promise<std::pair<Key, Key>> conflictRange, bool reverse, Standalone<RangeResultRef> result)
{
if( trLogInfo ) {
int rangeSize = 0;
for (const KeyValueRef &kv : result.contents())
rangeSize += kv.key.size() + kv.value.size();
trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, now()-startTime, rangeSize, begin.getKey(), end.getKey()));
}
if( !snapshot ) {
Key rangeBegin;
Key rangeEnd;
if(result.readToBegin) {
rangeBegin = allKeys.begin;
}
else if(((!reverse || !result.more || begin.offset > 1) && begin.offset > 0) || result.size() == 0) {
rangeBegin = Key(begin.getKey(), begin.arena());
}
else {
rangeBegin = reverse ? result.end()[-1].key : result[0].key;
}
if(end.offset > begin.offset && end.getKey() < rangeBegin) {
rangeBegin = Key(end.getKey(), end.arena());
}
if(result.readThroughEnd) {
rangeEnd = allKeys.end;
}
else if(((reverse || !result.more || end.offset <= 0) && end.offset <= 1) || result.size() == 0) {
rangeEnd = Key(end.getKey(), end.arena());
}
else {
rangeEnd = keyAfter(reverse ? result[0].key : result.end()[-1].key);
}
if(begin.offset < end.offset && begin.getKey() > rangeEnd) {
rangeEnd = Key(begin.getKey(), begin.arena());
}
conflictRange.send(std::make_pair(rangeBegin, rangeEnd));
}
}
ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<TransactionLogInfo> trLogInfo, Future<Version> fVersion,
KeySelector begin, KeySelector end, GetRangeLimits limits, Promise<std::pair<Key, Key>> conflictRange, bool snapshot, bool reverse,
TransactionInfo info )
{
state GetRangeLimits originalLimits( limits );
state KeySelector originalBegin = begin;
state KeySelector originalEnd = end;
state Standalone<RangeResultRef> output;
state Version version = wait( fVersion );
state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the version that the first one completed
// FIXME: Is this really right? Weaken this and see if there is a problem; if so maybe there is a much subtler problem even with this.
try {
state Version version = wait( fVersion );
validateVersion(version);
if( begin.getKey() == allKeys.begin && begin.offset < 1 ) {
output.readToBegin = true;
begin = firstGreaterOrEqual( begin.getKey() );
}
state double startTime = now();
state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the version that the first one completed
// FIXME: Is this really right? Weaken this and see if there is a problem; if so maybe there is a much subtler problem even with this.
validateVersion(version);
ASSERT( !limits.isReached() );
ASSERT( (!limits.hasRowLimit() || limits.rows >= limits.minRows) && limits.minRows >= 0 );
loop {
if( end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual()) ) {
return output;
if( begin.getKey() == allKeys.begin && begin.offset < 1 ) {
output.readToBegin = true;
begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena());
}
state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, reverse ? end.getKey() : begin.getKey(), info, reverse ? (end-1).isBackward() : begin.isBackward() ) );
state KeyRange shard = beginServer.first;
state bool modifiedSelectors = false;
state GetKeyValuesRequest req;
ASSERT( !limits.isReached() );
ASSERT( (!limits.hasRowLimit() || limits.rows >= limits.minRows) && limits.minRows >= 0 );
req.version = readVersion;
if( reverse && (begin-1).isDefinitelyLess(shard.begin) &&
( !begin.isFirstGreaterOrEqual() || begin.getKey() != shard.begin ) ) { //In this case we would be setting modifiedSelectors to true, but not modifying anything
req.begin = firstGreaterOrEqual( shard.begin );
modifiedSelectors = true;
}
else req.begin = begin;
if( !reverse && end.isDefinitelyGreater(shard.end) ) {
req.end = firstGreaterOrEqual( shard.end );
modifiedSelectors = true;
}
else req.end = end;
transformRangeLimits(limits, reverse, req);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
req.debugID = info.debugID;
try {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
.detail("ReqBeginKey", printable(req.begin.getKey()))
.detail("ReqEndKey", printable(req.end.getKey()))
.detail("originalBegin", originalBegin.toString())
.detail("originalEnd", originalEnd.toString())
.detail("Begin", begin.toString())
.detail("End", end.toString())
.detail("shard", printable(shard))
.detail("ReqLimit", req.limit)
.detail("ReqLimitBytes", req.limitBytes)
.detail("ReqVersion", req.version)
.detail("Reverse", reverse)
.detail("ModifiedSelectors", modifiedSelectors)
.detail("Servers", beginServer.second->description());*/
}
GetKeyValuesReply _rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, info.taskID, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
GetKeyValuesReply rep = _rep;
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
.detail("ReqBeginKey", printable(req.begin.getKey()))
.detail("ReqEndKey", printable(req.end.getKey()))
.detail("RepIsMore", rep.more)
.detail("VersionReturned", rep.version)
.detail("RowsReturned", rep.data.size());*/
loop {
if( end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual()) ) {
getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
return output;
}
ASSERT( !rep.more || rep.data.size() );
ASSERT( !limits.hasRowLimit() || rep.data.size() <= limits.rows );
state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()), info, reverse ? (end-1).isBackward() : begin.isBackward() ) );
state KeyRange shard = beginServer.first;
state bool modifiedSelectors = false;
state GetKeyValuesRequest req;
limits.decrement( rep.data );
req.version = readVersion;
if(reverse && begin.isLastLessOrEqual() && rep.data.size() && rep.data.end()[-1].key == begin.getKey()) {
modifiedSelectors = false;
if( reverse && (begin-1).isDefinitelyLess(shard.begin) &&
( !begin.isFirstGreaterOrEqual() || begin.getKey() != shard.begin ) ) { //In this case we would be setting modifiedSelectors to true, but not modifying anything
req.begin = firstGreaterOrEqual( shard.begin );
modifiedSelectors = true;
}
else req.begin = begin;
bool finished = limits.isReached() || ( !modifiedSelectors && !rep.more ) || limits.hasSatisfiedMinRows();
bool readThrough = modifiedSelectors && !rep.more;
if( !reverse && end.isDefinitelyGreater(shard.end) ) {
req.end = firstGreaterOrEqual( shard.end );
modifiedSelectors = true;
}
else req.end = end;
// optimization: first request got all data--just return it
if( finished && !output.size() ) {
output = Standalone<RangeResultRef>( RangeResultRef( rep.data, modifiedSelectors || limits.isReached() || rep.more ), rep.arena );
transformRangeLimits(limits, reverse, req);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
if( BUGGIFY && limits.hasByteLimit() && output.size() > std::max(1, originalLimits.minRows) ) {
output.more = true;
output.resize(output.arena(), g_random->randomInt(std::max(1,originalLimits.minRows),output.size()));
req.debugID = info.debugID;
try {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
.detail("ReqBeginKey", printable(req.begin.getKey()))
.detail("ReqEndKey", printable(req.end.getKey()))
.detail("originalBegin", originalBegin.toString())
.detail("originalEnd", originalEnd.toString())
.detail("Begin", begin.toString())
.detail("End", end.toString())
.detail("shard", printable(shard))
.detail("ReqLimit", req.limit)
.detail("ReqLimitBytes", req.limitBytes)
.detail("ReqVersion", req.version)
.detail("Reverse", reverse)
.detail("ModifiedSelectors", modifiedSelectors)
.detail("Servers", beginServer.second->description());*/
}
GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
.detail("ReqBeginKey", printable(req.begin.getKey()))
.detail("ReqEndKey", printable(req.end.getKey()))
.detail("RepIsMore", rep.more)
.detail("VersionReturned", rep.version)
.detail("RowsReturned", rep.data.size());*/
}
ASSERT( !rep.more || rep.data.size() );
ASSERT( !limits.hasRowLimit() || rep.data.size() <= limits.rows );
limits.decrement( rep.data );
if(reverse && begin.isLastLessOrEqual() && rep.data.size() && rep.data.end()[-1].key == begin.getKey()) {
modifiedSelectors = false;
}
bool finished = limits.isReached() || ( !modifiedSelectors && !rep.more ) || limits.hasSatisfiedMinRows();
bool readThrough = modifiedSelectors && !rep.more;
// optimization: first request got all data--just return it
if( finished && !output.size() ) {
bool readToBegin = output.readToBegin;
bool readThroughEnd = output.readThroughEnd;
output = Standalone<RangeResultRef>( RangeResultRef( rep.data, modifiedSelectors || limits.isReached() || rep.more ), rep.arena );
output.readToBegin = readToBegin;
output.readThroughEnd = readThroughEnd;
if( BUGGIFY && limits.hasByteLimit() && output.size() > std::max(1, originalLimits.minRows) ) {
output.more = true;
output.resize(output.arena(), g_random->randomInt(std::max(1,originalLimits.minRows),output.size()));
getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
return output;
}
if( readThrough ) {
output.arena().dependsOn( shard.arena() );
output.readThrough = reverse ? shard.begin : shard.end;
}
getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
return output;
}
if( readThrough ) {
output.arena().dependsOn( shard.arena() );
output.readThrough = reverse ? shard.begin : shard.end;
}
return output;
}
output.arena().dependsOn( rep.arena );
output.append(output.arena(), rep.data.begin(), rep.data.size());
output.arena().dependsOn( rep.arena );
output.append(output.arena(), rep.data.begin(), rep.data.size());
if( finished ) {
if( readThrough ) {
output.arena().dependsOn( shard.arena() );
output.readThrough = reverse ? shard.begin : shard.end;
}
output.more = modifiedSelectors || limits.isReached() || rep.more;
if( finished ) {
if( readThrough ) {
output.arena().dependsOn( shard.arena() );
output.readThrough = reverse ? shard.begin : shard.end;
}
output.more = modifiedSelectors || limits.isReached() || rep.more;
return output;
}
readVersion = rep.version; // see above comment
if( !rep.more ) {
ASSERT( modifiedSelectors );
TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange
if( !rep.data.size() ) {
Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
return result;
getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
return output;
}
if( reverse )
end = firstGreaterOrEqual( shard.begin );
else
begin = firstGreaterOrEqual( shard.end );
} else {
TEST(true); // GetKeyValuesReply.more in getRange
if( reverse )
end = firstGreaterOrEqual( output[output.size()-1].key );
else
begin = firstGreaterThan( output[output.size()-1].key );
}
readVersion = rep.version; // see above comment
if( !rep.more ) {
ASSERT( modifiedSelectors );
TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange
} catch ( Error& e ) {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
(e.code() == error_code_transaction_too_old && readVersion == latestVersion))
{
cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() );
cx->invalidateCache( beginServer.second );
if (e.code() == error_code_wrong_shard_server) {
if( !rep.data.size() ) {
Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
return result;
}
if( reverse )
end = firstGreaterOrEqual( shard.begin );
else
begin = firstGreaterOrEqual( shard.end );
} else {
TEST(true); // GetKeyValuesReply.more in getRange
if( reverse )
end = firstGreaterOrEqual( output[output.size()-1].key );
else
begin = firstGreaterThan( output[output.size()-1].key );
}
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else
throw e;
} catch ( Error& e ) {
if( info.debugID.present() ) {
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
(e.code() == error_code_transaction_too_old && readVersion == latestVersion))
{
cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() );
cx->invalidateCache( beginServer.second );
if (e.code() == error_code_wrong_shard_server) {
Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
return result;
}
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else {
if (trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, static_cast<int>(e.code()), begin.getKey(), end.getKey()));
throw e;
}
}
}
}
catch(Error &e) {
if(conflictRange.canBeSet()) {
conflictRange.send(std::make_pair(Key(), Key()));
}
throw;
}
}
ACTOR Future<Standalone<RangeResultRef>> getRangeWrapper(Database cx, Reference<TransactionLogInfo> trLogInfo, Future<Version> fVersion, KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse, TransactionInfo info) {
state double startTime = now();
try {
Standalone<RangeResultRef> ret = wait(getRange(cx, fVersion, begin, end, limits, reverse, info));
double latency = now() - startTime;
if (trLogInfo) {
int rangeSize = 0;
for (const KeyValueRef &res : ret.contents())
rangeSize += res.key.size() + res.value.size();
trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, latency, rangeSize, begin.getKey(), end.getKey()));
}
return ret;
}
catch (Error &e) {
if (trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, static_cast<int>(e.code()), begin.getKey(), end.getKey()));
throw;
}
Future<Standalone<RangeResultRef>> getRange( Database const& cx, Future<Version> const& fVersion, KeySelector const& begin, KeySelector const& end,
GetRangeLimits const& limits, bool const& reverse, TransactionInfo const& info )
{
return getRange(cx, Reference<TransactionLogInfo>(), fVersion, begin, end, limits, Promise<std::pair<Key, Key>>(), true, reverse, info);
}
Transaction::Transaction( Database const& cx )
@ -2058,17 +2118,17 @@ Future< Standalone< VectorRef< const char*>>> Transaction::getAddressesForKey( c
}
ACTOR Future< Key > getKeyAndConflictRange(
Database cx, KeySelector k, Future<Version> version, Promise<KeyRange> conflictRange, TransactionInfo info)
Database cx, KeySelector k, Future<Version> version, Promise<std::pair<Key, Key>> conflictRange, TransactionInfo info)
{
try {
Key rep = wait( getKey(cx, k, version, info) );
if( k.offset <= 0 )
conflictRange.send( KeyRangeRef( rep, k.orEqual ? keyAfter( k.getKey() ) : k.getKey() ) );
conflictRange.send( std::make_pair( rep, k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()) ) );
else
conflictRange.send( KeyRangeRef( k.orEqual ? keyAfter( k.getKey() ) : k.getKey(), keyAfter( rep ) ) );
conflictRange.send( std::make_pair( k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()), keyAfter( rep ) ) );
return std::move(rep);
} catch( Error&e ) {
conflictRange.send(KeyRangeRef());
conflictRange.send(std::make_pair(Key(), Key()));
throw;
}
}
@ -2077,65 +2137,11 @@ Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) {
if( snapshot )
return ::getKey(cx, key, getReadVersion(), info);
Promise<KeyRange> conflictRange;
Promise<std::pair<Key, Key>> conflictRange;
extraConflictRanges.push_back( conflictRange.getFuture() );
return getKeyAndConflictRange( cx, key, getReadVersion(), conflictRange, info );
}
ACTOR Future< Standalone<RangeResultRef> > getRangeAndConflictRange(
Database cx, Reference<TransactionLogInfo> trLogInfo, Future<Version> version,
KeySelector begin, KeySelector end, GetRangeLimits limits,
Promise<KeyRange> conflictRange, bool reverse, TransactionInfo info)
{
state Key beginKey = ( begin.orEqual ? keyAfter(begin.getKey()) : begin.getKey() );
state Key endKey = ( end.orEqual ? keyAfter(end.getKey()) : end.getKey() );
//This optimization prevents NULL operations from being added to the conflict range
if( begin.offset >= end.offset && beginKey >= endKey ) {
TEST(true); //get range no result possible
conflictRange.send(KeyRangeRef());
return Standalone<RangeResultRef>();
}
try {
Standalone<RangeResultRef> rep = wait(getRangeWrapper(cx, trLogInfo, version, begin, end, limits, reverse, info) );
ASSERT( limits.rows != 0 );
Key rangeBegin;
Key rangeEnd;
if( beginKey < endKey ) {
rangeBegin = reverse && begin.offset <= 0 && rep.more ? endKey : beginKey;
rangeEnd = !reverse && end.offset > 0 && rep.more ? beginKey : endKey;
}
else {
rangeBegin = endKey;
rangeEnd = beginKey;
}
if( rep.readToBegin ) rangeBegin = allKeys.begin;
if( rep.readThroughEnd ) rangeEnd = allKeys.end;
if ( rep.size() ) {
if( reverse ) {
rangeBegin = min( rangeBegin, Key(rep.end()[-1].key) );
if( end.offset > 0 ) rangeEnd = max( rangeEnd, keyAfter( rep[0].key ) );
} else {
if( begin.offset <= 0 ) rangeBegin = min( rangeBegin, Key(rep[0].key) );
rangeEnd = max( rangeEnd, keyAfter( rep.end()[-1].key ) );
}
}
conflictRange.send( KeyRangeRef( rangeBegin, rangeEnd ) );
return std::move(rep);
} catch( Error &e ) {
conflictRange.send(KeyRangeRef());
throw;
}
}
Future< Standalone<RangeResultRef> > Transaction::getRange(
const KeySelector& begin,
const KeySelector& end,
@ -2149,12 +2155,31 @@ Future< Standalone<RangeResultRef> > Transaction::getRange(
if( !limits.isValid() )
return range_limits_invalid();
if( snapshot )
return getRangeWrapper(cx, trLogInfo, getReadVersion(), begin, end, limits, reverse, info );
ASSERT(limits.rows != 0);
Promise<KeyRange> conflictRange;
extraConflictRanges.push_back( conflictRange.getFuture() );
return getRangeAndConflictRange( cx, trLogInfo, getReadVersion(), begin, end, limits, conflictRange, reverse, info );
KeySelector b = begin;
if( b.orEqual ) {
TEST(true); // Native begin orEqual==true
b.removeOrEqual(b.arena());
}
KeySelector e = end;
if( e.orEqual ) {
TEST(true); // Native end orEqual==true
e.removeOrEqual(e.arena());
}
if( b.offset >= e.offset && b.getKey() >= e.getKey() ) {
TEST(true); // Native range inverted
return Standalone<RangeResultRef>();
}
Promise<std::pair<Key, Key>> conflictRange;
if(!snapshot) {
extraConflictRanges.push_back( conflictRange.getFuture() );
}
return ::getRange(cx, trLogInfo, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse, info);
}
Future< Standalone<RangeResultRef> > Transaction::getRange(
@ -2514,7 +2539,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
}
req.debugID = commitID;
state Future<CommitID> reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, info.taskID, true );
state Future<CommitID> reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {
@ -2627,8 +2652,8 @@ Future<Void> Transaction::commitMutations() {
bool isCheckingWrites = options.checkWritesEnabled && g_random->random01() < 0.01;
for(int i=0; i<extraConflictRanges.size(); i++)
if (extraConflictRanges[i].isReady() && !extraConflictRanges[i].get().empty() )
tr.transaction.read_conflict_ranges.push_back( tr.arena, extraConflictRanges[i].get() );
if (extraConflictRanges[i].isReady() && extraConflictRanges[i].get().first < extraConflictRanges[i].get().second )
tr.transaction.read_conflict_ranges.push_back( tr.arena, KeyRangeRef(extraConflictRanges[i].get().first, extraConflictRanges[i].get().second) );
if( !options.causalWriteRisky && !intersects( tr.transaction.write_conflict_ranges, tr.transaction.read_conflict_ranges ).present() )
makeSelfConflicting();
@ -3138,4 +3163,4 @@ void enableClientInfoLogging() {
ASSERT(networkOptions.logClientInfo.present() == false);
networkOptions.logClientInfo = true;
TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
}
}

View File

@ -308,7 +308,7 @@ private:
Version committedVersion;
CommitTransactionRequest tr;
Future<Version> readVersion;
vector<Future<KeyRange>> extraConflictRanges;
vector<Future<std::pair<Key, Key>>> extraConflictRanges;
Promise<Void> commitResult;
Future<Void> committing;
};

View File

@ -146,7 +146,7 @@ public:
if(key > ryw->getMaxReadKey())
read.end = firstGreaterOrEqual(ryw->getMaxReadKey());
else
read.end = firstGreaterOrEqual(key);
read.end = KeySelector(firstGreaterOrEqual(key), key.arena());
}
Standalone<RangeResultRef> v = wait( ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse) );
@ -346,9 +346,6 @@ public:
if(keyNeedsCopy) {
key.setKey(keykey.toArena(key.arena()));
}
else {
key.setKey(keykey.toArenaOrRef(key.arena()));
}
}
static KeyRangeRef getKnownKeyRange( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
@ -360,10 +357,12 @@ public:
if( data.size() ) {
beginKey = std::min( beginKey, data[0].key );
if( data.readThrough.present() )
if( data.readThrough.present() ) {
endKey = std::max<ExtStringRef>( endKey, data.readThrough.get() );
endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 );
}
else {
endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 );
}
}
if (beginKey >= endKey) return KeyRangeRef();
@ -461,7 +460,7 @@ public:
resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
if( begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) {
if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
return RangeResultRef(false, false);
}
else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
@ -501,16 +500,16 @@ public:
.detail("unknown", it.is_unknown_range())
.detail("requests", requestCount);*/
if( !result.size() && begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) {
if( !result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
return RangeResultRef(false, false);
}
if( end.getKey() == allKeys.begin && end.offset <= 1 ) {
if( end.offset <= 1 && end.getKey() == allKeys.begin ) {
return RangeResultRef(readToBegin, readThroughEnd);
}
if( ( begin.getKey() >= end.getKey() && begin.offset >= end.offset ) ||
( begin.getKey() >= ryw->getMaxReadKey() && begin.offset >= 1) ) {
if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) ||
( begin.offset >= 1 && begin.getKey() >= ryw->getMaxReadKey() ) ) {
if( end.isFirstGreaterOrEqual() ) break;
if( !result.size() ) break;
Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
@ -522,7 +521,7 @@ public:
break;
}
if( it.beginKey() > itEnd.beginKey() && !it.is_unreadable() && !it.is_unknown_range() ) {
if( !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() > itEnd.beginKey() ) {
if( end.isFirstGreaterOrEqual() ) break;
return RangeResultRef(readToBegin, readThroughEnd);
}
@ -545,10 +544,11 @@ public:
state KeySelector read_end;
if ( ucEnd!=itEnd ) {
read_end = firstGreaterOrEqual(ucEnd.endKey().toStandaloneStringRef());
Key k = ucEnd.endKey().toStandaloneStringRef();
read_end = KeySelector(firstGreaterOrEqual(k), k.arena());
if( end.offset < 1 ) additionalRows += 1 - end.offset; // extra for items past end
} else if( end.offset < 1 ) {
read_end = firstGreaterOrEqual( end.getKey() );
read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena());
additionalRows += 1 - end.offset;
} else {
read_end = end;
@ -562,10 +562,11 @@ public:
state KeySelector read_begin;
if (begin.isFirstGreaterOrEqual()) {
begin = firstGreaterOrEqual( it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : begin.getKey() );
Key k = it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : Key(begin.getKey(), begin.arena());
begin = KeySelector(firstGreaterOrEqual(k), k.arena());
read_begin = begin;
} else if( begin.offset > 1 ) {
read_begin = firstGreaterOrEqual(begin.getKey());
read_begin = KeySelector(firstGreaterOrEqual(begin.getKey()), begin.arena());
additionalRows += begin.offset - 1;
} else {
read_begin = begin;
@ -645,10 +646,13 @@ public:
if (data.readThroughEnd) endKey = allKeys.end;
if( data.size() ) {
beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key;
if( data.readThrough.present() )
if( data.readThrough.present() ) {
beginKey = std::min( data.readThrough.get(), beginKey );
}
else {
beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key;
}
endKey = data[0].key < endKey ? endKey : ExtStringRef( data[0].key, 1 );
}
if (beginKey >= endKey) return KeyRangeRef();
@ -725,7 +729,7 @@ public:
resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
if( ( begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset ) ) {
if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
return RangeResultRef(false, false);
}
else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
@ -766,16 +770,16 @@ public:
.detail("kv", it.is_kv())
.detail("requests", requestCount);*/
if(!result.size() && begin.getKey() >= end.getKey() && actualBeginOffset >= actualEndOffset) {
if(!result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
return RangeResultRef(false, false);
}
if( begin.getKey() >= ryw->getMaxReadKey() && !begin.isBackward() ) {
if( !begin.isBackward() && begin.getKey() >= ryw->getMaxReadKey() ) {
return RangeResultRef(readToBegin, readThroughEnd);
}
if( ( begin.getKey() >= end.getKey() && begin.offset >= end.offset ) ||
( end.getKey() == allKeys.begin && end.offset <= 1 ) ) {
if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) ||
( end.offset <= 1 && end.getKey() == allKeys.begin ) ) {
if( begin.isFirstGreaterOrEqual() ) break;
if( !result.size() ) break;
Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
@ -787,7 +791,7 @@ public:
break;
}
if (it.beginKey() < itEnd.beginKey() && !it.is_unreadable() && !it.is_unknown_range() && itemsPastBegin >= begin.offset - 1) {
if (itemsPastBegin >= begin.offset - 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() < itEnd.beginKey()) {
if( begin.isFirstGreaterOrEqual() ) break;
return RangeResultRef(readToBegin, readThroughEnd);
}
@ -813,10 +817,11 @@ public:
state KeySelector read_begin;
if ( ucEnd!=itEnd ) {
read_begin = firstGreaterOrEqual(ucEnd.beginKey().toStandaloneStringRef());
Key k = ucEnd.beginKey().toStandaloneStringRef();
read_begin = KeySelector(firstGreaterOrEqual(k), k.arena());
if( begin.offset > 1 ) additionalRows += begin.offset - 1; // extra for items past end
} else if( begin.offset > 1 ) {
read_begin = firstGreaterOrEqual( begin.getKey() );
read_begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena());
additionalRows += begin.offset - 1;
} else {
read_begin = begin;
@ -830,10 +835,11 @@ public:
state KeySelector read_end;
if (end.isFirstGreaterOrEqual()) {
end = firstGreaterOrEqual( it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey() );
Key k = it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey();
end = KeySelector(firstGreaterOrEqual(k), k.arena());
read_end = end;
} else if (end.offset < 1) {
read_end = firstGreaterOrEqual(end.getKey());
read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena());
additionalRows += 1 - end.offset;
} else {
read_end = end;