fix: clear the client's storage server interface cache on endpoint failure for all request types
This commit is contained in:
parent
fb86d674f8
commit
98e07fd92e
|
@ -19,6 +19,11 @@ Performance
|
|||
|
||||
* Improved backup task prioritization. `(PR #71) <https://github.com/apple/foundationdb/pull/71>`_
|
||||
|
||||
Fixes
|
||||
-----
|
||||
|
||||
* The client did not clear the storage server interface cache on endpoint failure for all request types. This caused up to one second of additional latency on the first get range request to a rebooted storage server. `(Issue #351) <https://github.com/apple/foundationdb/issues/351>`_
|
||||
|
||||
Status
|
||||
------
|
||||
|
||||
|
|
|
@ -1010,6 +1010,55 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
|
|||
return serverInterfaces;
|
||||
}
|
||||
|
||||
template <class F>
|
||||
pair<KeyRange, Reference<LocationInfo>> getCachedKeyLocation( Database cx, Key key, F StorageServerInterface::*member, bool isBackward = false ) {
|
||||
auto ssi = cx->getCachedLocation( key, isBackward );
|
||||
if (!ssi.second) {
|
||||
return ssi;
|
||||
}
|
||||
|
||||
for(int i = 0; i < ssi.second->size(); i++) {
|
||||
if( IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, member).getEndpoint()) ) {
|
||||
cx->invalidateCache( key );
|
||||
ssi.second.clear();
|
||||
return ssi;
|
||||
}
|
||||
}
|
||||
|
||||
return ssi;
|
||||
}
|
||||
|
||||
template <class F>
|
||||
vector< pair<KeyRange,Reference<LocationInfo>> > getCachedKeyRangeLocations( Database cx, KeyRange keys, int limit, bool reverse, F StorageServerInterface::*member ) {
|
||||
ASSERT (!keys.empty());
|
||||
|
||||
vector< pair<KeyRange,Reference<LocationInfo>> > locations;
|
||||
if (!cx->getCachedLocations(keys, locations, limit, reverse))
|
||||
return vector< pair<KeyRange,Reference<LocationInfo>> >();
|
||||
|
||||
bool foundFailed = false;
|
||||
for(auto& it : locations) {
|
||||
bool onlyEndpointFailed = false;
|
||||
for(int i = 0; i < it.second->size(); i++) {
|
||||
if( IFailureMonitor::failureMonitor().onlyEndpointFailed(it.second->get(i, member).getEndpoint()) ) {
|
||||
onlyEndpointFailed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if( onlyEndpointFailed ) {
|
||||
cx->invalidateCache( it.first.begin );
|
||||
foundFailed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(foundFailed) {
|
||||
return vector< pair<KeyRange,Reference<LocationInfo>> >();
|
||||
}
|
||||
|
||||
return locations;
|
||||
}
|
||||
|
||||
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
|
||||
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
|
||||
if (isBackward) {
|
||||
|
@ -1018,10 +1067,6 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database
|
|||
ASSERT( key < allKeys.end );
|
||||
}
|
||||
|
||||
auto loc = cx->getCachedLocation( key, isBackward );
|
||||
if (loc.second)
|
||||
return loc;
|
||||
|
||||
if( info.debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
|
||||
|
||||
|
@ -1040,7 +1085,7 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||
if( info.debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
|
||||
|
||||
|
@ -1067,21 +1112,11 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
|
|||
}
|
||||
}
|
||||
|
||||
Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||
ASSERT (!keys.empty());
|
||||
|
||||
vector< pair<KeyRange,Reference<LocationInfo>> > result;
|
||||
if (cx->getCachedLocations(keys, result, limit, reverse))
|
||||
return result;
|
||||
|
||||
return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
|
||||
}
|
||||
|
||||
ACTOR Future<Void> warmRange_impl( Transaction *self, Database cx, KeyRange keys ) {
|
||||
state int totalRanges = 0;
|
||||
state int totalRequests = 0;
|
||||
loop {
|
||||
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations_internal(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, false, self->info));
|
||||
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, false, self->info));
|
||||
totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
|
||||
totalRequests++;
|
||||
if(locations.size() == 0 || totalRanges >= cx->locationCacheSize || locations[locations.size()-1].first.end >= keys.end)
|
||||
|
@ -1117,28 +1152,13 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
state Version ver = wait( version );
|
||||
validateVersion(ver);
|
||||
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi;
|
||||
loop {
|
||||
ssi = cx->getCachedLocation( key );
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = getCachedKeyLocation(cx, key, &StorageServerInterface::getValue);
|
||||
if (!ssi.second) {
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
|
||||
ssi = std::move(ssi2);
|
||||
} else {
|
||||
bool onlyEndpointFailed = false;
|
||||
for(int i = 0; i < ssi.second->size(); i++) {
|
||||
if( IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, &StorageServerInterface::getValue).getEndpoint()) ) {
|
||||
onlyEndpointFailed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if( onlyEndpointFailed ) {
|
||||
cx->invalidateCache( key );
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
|
||||
ssi = std::move(ssi2);
|
||||
}
|
||||
}
|
||||
//Reference<LocationInfo> ssi = wait( getKeyLocation( cx, key ) );
|
||||
|
||||
state Optional<UID> getValueID = Optional<UID>();
|
||||
state uint64_t startTime;
|
||||
state double startTimeD;
|
||||
|
@ -1216,7 +1236,13 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
|||
return Key();
|
||||
}
|
||||
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, Key(k.getKey(), k.arena()), info, k.isBackward()) );
|
||||
Key locationKey(k.getKey(), k.arena());
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = getCachedKeyLocation(cx, locationKey, &StorageServerInterface::getKey, k.isBackward());
|
||||
if(!ssi.second) {
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation(cx, locationKey, info, k.isBackward()) );
|
||||
ssi = std::move(ssi2);
|
||||
}
|
||||
|
||||
try {
|
||||
if( info.debugID.present() )
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", printable(k.getKey())).detail("offset",k.offset).detail("orEqual",k.orEqual);
|
||||
|
@ -1274,7 +1300,7 @@ ACTOR Future< Void > watchValue( Future<Version> version, Key key, Optional<Valu
|
|||
ASSERT(ver != latestVersion);
|
||||
|
||||
loop {
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = cx->getCachedLocation( key );
|
||||
state pair<KeyRange, Reference<LocationInfo>> ssi = getCachedKeyLocation(cx, key, &StorageServerInterface::watchValue );
|
||||
if (!ssi.second) {
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
|
||||
ssi = std::move(ssi2);
|
||||
|
@ -1349,7 +1375,11 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
|||
|
||||
//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
|
||||
loop {
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, info ) );
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = getCachedKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValues);
|
||||
if (!locations.size()) {
|
||||
vector< pair<KeyRange, Reference<LocationInfo>> > _locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, info ) );
|
||||
locations = std::move(_locations);
|
||||
}
|
||||
|
||||
ASSERT( locations.size() );
|
||||
state int shard = 0;
|
||||
|
@ -1605,7 +1635,14 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
|||
return output;
|
||||
}
|
||||
|
||||
state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()), info, reverse ? (end-1).isBackward() : begin.isBackward() ) );
|
||||
Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena());
|
||||
bool locationBackward = reverse ? (end-1).isBackward() : begin.isBackward();
|
||||
state pair<KeyRange, Reference<LocationInfo>> beginServer = getCachedKeyLocation( cx, locationKey, &StorageServerInterface::getKeyValues, locationBackward );
|
||||
if (!beginServer.second) {
|
||||
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, locationKey, info, locationBackward ) );
|
||||
beginServer = std::move(ssi2);
|
||||
}
|
||||
|
||||
state KeyRange shard = beginServer.first;
|
||||
state bool modifiedSelectors = false;
|
||||
state GetKeyValuesRequest req;
|
||||
|
@ -2858,8 +2895,12 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
|
|||
{
|
||||
state int tooManyShardsCount = 0;
|
||||
loop {
|
||||
state vector< pair<KeyRange,Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, TransactionInfo(TaskDataDistribution) ) );
|
||||
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = getCachedKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics);
|
||||
if (!locations.size()) {
|
||||
vector< pair<KeyRange, Reference<LocationInfo>> > _locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, TransactionInfo(TaskDataDistribution) ) );
|
||||
locations = std::move(_locations);
|
||||
}
|
||||
|
||||
if( locations.size() == shardLimit ) {
|
||||
TraceEvent(!g_network->isSimulated() && ++tooManyShardsCount >= 15 ? SevWarnAlways : SevWarn, "WaitStorageMetricsPenalty")
|
||||
.detail("Keys", printable(keys))
|
||||
|
@ -2912,7 +2953,11 @@ Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, i
|
|||
ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )
|
||||
{
|
||||
loop {
|
||||
state vector< pair<KeyRange,Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, TransactionInfo(TaskDataDistribution) ) );
|
||||
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = getCachedKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics);
|
||||
if (!locations.size()) {
|
||||
vector< pair<KeyRange, Reference<LocationInfo>> > _locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, TransactionInfo(TaskDataDistribution) ) );
|
||||
locations = std::move(_locations);
|
||||
}
|
||||
state StorageMetrics used;
|
||||
state Standalone<VectorRef<KeyRef>> results;
|
||||
|
||||
|
|
Loading…
Reference in New Issue