Merge pull request #352 from etschannen/release-5.2
Properly handle endpoint failures on the client for all request types
This commit is contained in:
commit
93497d7d42
|
@ -19,6 +19,11 @@ Performance
|
|||
|
||||
* Improved backup task prioritization. `(PR #71) <https://github.com/apple/foundationdb/pull/71>`_
|
||||
|
||||
Fixes
|
||||
-----
|
||||
|
||||
* The client did not clear the storage server interface cache on endpoint failure for all request types. This causes up to one second of additional latency on the first get range request to a rebooted storage server. `(Issue #351) <https://github.com/apple/foundationdb/issues/351>`_
|
||||
|
||||
Status
|
||||
------
|
||||
|
||||
|
|
|
@ -1023,17 +1023,13 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
|
|||
}
|
||||
|
||||
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
|
||||
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
|
||||
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
|
||||
if (isBackward) {
|
||||
ASSERT( key != allKeys.begin && key <= allKeys.end );
|
||||
} else {
|
||||
ASSERT( key < allKeys.end );
|
||||
}
|
||||
|
||||
auto loc = cx->getCachedLocation( key, isBackward );
|
||||
if (loc.second)
|
||||
return loc;
|
||||
|
||||
if( info.debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
|
||||
|
||||
|
@ -1052,6 +1048,24 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database
|
|||
}
|
||||
}
|
||||
|
||||
template <class F>
|
||||
Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation( Database const& cx, Key const& key, F StorageServerInterface::*member, TransactionInfo const& info, bool isBackward = false ) {
|
||||
auto ssi = cx->getCachedLocation( key, isBackward );
|
||||
if (!ssi.second) {
|
||||
return getKeyLocation_internal( cx, key, info, isBackward );
|
||||
}
|
||||
|
||||
for(int i = 0; i < ssi.second->size(); i++) {
|
||||
if( IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, member).getEndpoint()) ) {
|
||||
cx->invalidateCache( key );
|
||||
ssi.second.clear();
|
||||
return getKeyLocation_internal( cx, key, info, isBackward );
|
||||
}
|
||||
}
|
||||
|
||||
return ssi;
|
||||
}
|
||||
|
||||
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||
if( info.debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
|
||||
|
@ -1079,14 +1093,36 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
|
|||
}
|
||||
}
|
||||
|
||||
Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||
template <class F>
|
||||
Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database const& cx, KeyRange const& keys, int limit, bool reverse, F StorageServerInterface::*member, TransactionInfo const& info ) {
|
||||
ASSERT (!keys.empty());
|
||||
|
||||
vector< pair<KeyRange,Reference<LocationInfo>> > result;
|
||||
if (cx->getCachedLocations(keys, result, limit, reverse))
|
||||
return result;
|
||||
vector< pair<KeyRange,Reference<LocationInfo>> > locations;
|
||||
if (!cx->getCachedLocations(keys, locations, limit, reverse)) {
|
||||
return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
|
||||
}
|
||||
|
||||
return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
|
||||
bool foundFailed = false;
|
||||
for(auto& it : locations) {
|
||||
bool onlyEndpointFailed = false;
|
||||
for(int i = 0; i < it.second->size(); i++) {
|
||||
if( IFailureMonitor::failureMonitor().onlyEndpointFailed(it.second->get(i, member).getEndpoint()) ) {
|
||||
onlyEndpointFailed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if( onlyEndpointFailed ) {
|
||||
cx->invalidateCache( it.first.begin );
|
||||
foundFailed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(foundFailed) {
|
||||
return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
|
||||
}
|
||||
|
||||
return locations;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> warmRange_impl( Transaction *self, Database cx, KeyRange keys ) {
|
||||
|
@ -1129,28 +1165,8 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
state Version ver = wait( version );
|
||||
validateVersion(ver);
|
||||
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi;
|
||||
loop {
|
||||
ssi = cx->getCachedLocation( key );
|
||||
if (!ssi.second) {
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
|
||||
ssi = std::move(ssi2);
|
||||
} else {
|
||||
bool onlyEndpointFailed = false;
|
||||
for(int i = 0; i < ssi.second->size(); i++) {
|
||||
if( IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, &StorageServerInterface::getValue).getEndpoint()) ) {
|
||||
onlyEndpointFailed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if( onlyEndpointFailed ) {
|
||||
cx->invalidateCache( key );
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
|
||||
ssi = std::move(ssi2);
|
||||
}
|
||||
}
|
||||
//Reference<LocationInfo> ssi = wait( getKeyLocation( cx, key ) );
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, key, &StorageServerInterface::getValue, info) );
|
||||
state Optional<UID> getValueID = Optional<UID>();
|
||||
state uint64_t startTime;
|
||||
state double startTimeD;
|
||||
|
@ -1228,7 +1244,9 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
|||
return Key();
|
||||
}
|
||||
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, Key(k.getKey(), k.arena()), info, k.isBackward()) );
|
||||
Key locationKey(k.getKey(), k.arena());
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) );
|
||||
|
||||
try {
|
||||
if( info.debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual);
|
||||
|
@ -1286,11 +1304,7 @@ ACTOR Future< Void > watchValue( Future<Version> version, Key key, Optional<Valu
|
|||
ASSERT(ver != latestVersion);
|
||||
|
||||
loop {
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = cx->getCachedLocation( key );
|
||||
if (!ssi.second) {
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
|
||||
ssi = std::move(ssi2);
|
||||
}
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, key, &StorageServerInterface::watchValue, info ) );
|
||||
|
||||
try {
|
||||
state Optional<UID> watchValueID = Optional<UID>();
|
||||
|
@ -1361,8 +1375,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
|||
|
||||
//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
|
||||
loop {
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, info ) );
|
||||
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValues, info ) );
|
||||
ASSERT( locations.size() );
|
||||
state int shard = 0;
|
||||
loop {
|
||||
|
@ -1617,7 +1630,9 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
|||
return output;
|
||||
}
|
||||
|
||||
state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()), info, reverse ? (end-1).isBackward() : begin.isBackward() ) );
|
||||
Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena());
|
||||
bool locationBackward = reverse ? (end-1).isBackward() : begin.isBackward();
|
||||
state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, locationKey, &StorageServerInterface::getKeyValues, info, locationBackward ) );
|
||||
state KeyRange shard = beginServer.first;
|
||||
state bool modifiedSelectors = false;
|
||||
state GetKeyValuesRequest req;
|
||||
|
@ -2872,8 +2887,8 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
|
|||
{
|
||||
state int tooManyShardsCount = 0;
|
||||
loop {
|
||||
state vector< pair<KeyRange,Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, TransactionInfo(TaskDataDistribution) ) );
|
||||
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
|
||||
|
||||
if( locations.size() == shardLimit ) {
|
||||
TraceEvent(!g_network->isSimulated() && ++tooManyShardsCount >= 15 ? SevWarnAlways : SevWarn, "WaitStorageMetricsPenalty")
|
||||
.detail("Keys", printable(keys))
|
||||
|
@ -2926,7 +2941,7 @@ Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, i
|
|||
ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )
|
||||
{
|
||||
loop {
|
||||
state vector< pair<KeyRange,Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, TransactionInfo(TaskDataDistribution) ) );
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics, TransactionInfo(TaskDataDistribution) ) );
|
||||
state StorageMetrics used;
|
||||
state Standalone<VectorRef<KeyRef>> results;
|
||||
|
||||
|
|
Loading…
Reference in New Issue