Merge branch 'master' into java-add-missing-dispose

# Conflicts:
#	bindings/java/src-completable/main/com/apple/foundationdb/Cluster.java
#	bindings/java/src-completable/main/com/apple/foundationdb/async/AsyncUtil.java
This commit is contained in:
A.J. Beamon 2017-12-12 15:04:14 -08:00
commit c3364a2225
9 changed files with 96 additions and 298 deletions

View File

@ -35,13 +35,6 @@ ifeq ($(PLATFORM),Linux)
CXX ?= g++
CXXFLAGS += -std=c++0x
HARDENING_CFLAGS := -fstack-protector-all -Wstack-protector --param ssp-buffer-size=4 -fPIC
CFLAGS += ${HARDENING_CFLAGS}
# TODO(alexmiller): boost 1.52.0 prevents us from using most of these with -Werror.
# Reassess after boost has been upgraded to >1.52.0.
# CFLAGS += -Wall -Wextra -Wformat-security -Wconversion -Wsign-conversion -Werror
HARDENING_LDFLAGS := -Wl,-z,noexecstack -Wl,-z,relro -Wl,-z,now
LDFLAGS := ${HARDENING_CFLAGS} ${HARDENING_LDFLAGS}
BOOSTDIR ?= /opt/boost_1_52_0
DLEXT := so
@ -91,6 +84,7 @@ CFLAGS += -g
# valgrind-compatibile builds are enabled by uncommenting lines in valgind.mk
CXXFLAGS += -Wno-deprecated
LDFLAGS :=
LIBS :=
STATIC_LIBS :=

View File

@ -15,9 +15,7 @@ case $1 in
OPTIONS="$OPTIONS -Wl,-dylib_install_name -Wl,$( basename $3 )"
fi
else
if [ "$PLATFORM" = "linux" ]; then
OPTIONS="$OPTIONS -pie -fPIE"
fi
OPTIONS=
fi
OPTIONS=$( eval echo "$OPTIONS $LDFLAGS \$$2_LDFLAGS \$$2_OBJECTS \$$2_LIBS \$$2_STATIC_LIBS_REAL -o $3" )

View File

@ -66,8 +66,6 @@ public:
Reference<LocationInfo> setCachedLocation( const KeyRangeRef&, const vector<struct StorageServerInterface>& );
void invalidateCache( const KeyRef&, bool isBackward = false );
void invalidateCache( const KeyRangeRef& );
void invalidateCache( Reference<LocationInfo> const& );
void invalidateCache( std::vector<UID> const& );
Reference<ProxyInfo> getMasterProxies();
Future<Reference<ProxyInfo>> getMasterProxiesFuture();
@ -127,12 +125,6 @@ public:
// Cache of location information
int locationCacheSize;
CoalescedKeyRangeMap< Reference<LocationInfo> > locationCache;
mutable SpinLock locationCacheLock;
// Maps the non-instance-specific SSI.id() (as stored in keyServers) to a specific instance of
// the interface (as stored in serverList)
std::map< UID, Future< Optional<StorageServerInterface> > > SSInterfaceCache;
mutable SpinLock SSInterfaceCacheLock;
std::map< std::vector<UID>, LocationInfo* > ssid_locationInfo;

View File

@ -33,7 +33,7 @@ struct MasterProxyInterface {
RequestStream< struct CommitTransactionRequest > commit;
RequestStream< struct GetReadVersionRequest > getConsistentReadVersion; // Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (at some point between when this request is sent and when its response is received, the latest version reported committed)
RequestStream< ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> > getKeyServersLocations;
RequestStream< struct GetKeyServerLocationsRequest > getKeyServersLocations;
RequestStream< struct GetStorageServerRejoinInfoRequest > getStorageServerRejoinInfo;
RequestStream<ReplyPromise<Void>> waitFailure;
@ -136,6 +136,30 @@ struct GetReadVersionRequest {
}
};
struct GetKeyServerLocationsReply {
vector<pair<KeyRangeRef, vector<StorageServerInterface>>> results;
template <class Ar>
void serialize(Ar& ar) {
ar & results;
}
};
struct GetKeyServerLocationsRequest {
Arena arena;
KeyRangeRef range;
int limit;
ReplyPromise<GetKeyServerLocationsReply> reply;
GetKeyServerLocationsRequest() : limit(0) {}
GetKeyServerLocationsRequest( KeyRangeRef const& range, int limit, Arena const& arena ) : range( range ), limit( limit ), arena( arena ) {}
template <class Ar>
void serialize(Ar& ar) {
ar & range & limit & reply & arena;
}
};
struct GetRawCommittedVersionRequest {
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;

View File

@ -538,9 +538,6 @@ Database DatabaseContext::create( Reference<AsyncVar<ClientDBInfo>> info, Future
DatabaseContext::~DatabaseContext() {
monitorMasterProxiesInfoChange.cancel();
locationCacheLock.assertNotEntered();
SSInterfaceCacheLock.assertNotEntered();
SSInterfaceCache.clear();
for(auto it = ssid_locationInfo.begin(); it != ssid_locationInfo.end(); it = ssid_locationInfo.erase(it))
it->second->notifyContextDestroyed();
ASSERT( ssid_locationInfo.empty() );
@ -548,7 +545,6 @@ DatabaseContext::~DatabaseContext() {
}
pair<KeyRange,Reference<LocationInfo>> DatabaseContext::getCachedLocation( const KeyRef& key, bool isBackward ) {
SpinLockHolder hold( locationCacheLock );
if( isBackward ) {
auto range = locationCache.rangeContainingKeyBefore(key);
return std::make_pair(range->range(), range->value());
@ -561,7 +557,6 @@ pair<KeyRange,Reference<LocationInfo>> DatabaseContext::getCachedLocation( const
bool DatabaseContext::getCachedLocations( const KeyRangeRef& range, vector<std::pair<KeyRange,Reference<LocationInfo>>>& result, int limit, bool reverse ) {
result.clear();
SpinLockHolder hold( locationCacheLock );
auto locRanges = locationCache.intersectingRanges(range);
auto begin = locationCache.rangeContaining(range.begin);
@ -592,15 +587,12 @@ bool DatabaseContext::getCachedLocations( const KeyRangeRef& range, vector<std::
Reference<LocationInfo> DatabaseContext::setCachedLocation( const KeyRangeRef& keys, const vector<StorageServerInterface>& servers ) {
int maxEvictionAttempts = 100, attempts = 0;
SpinLockHolder hold( locationCacheLock );
Reference<LocationInfo> loc = LocationInfo::getInterface( this, servers, clientLocality);
while( locationCache.size() > locationCacheSize && attempts < maxEvictionAttempts) {
TEST( true ); // NativeAPI storage server locationCache entry evicted
attempts++;
auto r = locationCache.randomRange();
Key begin = r.begin(), end = r.end(); // insert invalidates r, so can't be passed a mere reference into it
if( begin >= keyServersPrefix && attempts > maxEvictionAttempts / 2)
continue;
locationCache.insert( KeyRangeRef(begin, end), Reference<LocationInfo>() );
}
locationCache.insert( keys, loc );
@ -608,7 +600,6 @@ Reference<LocationInfo> DatabaseContext::setCachedLocation( const KeyRangeRef& k
}
void DatabaseContext::invalidateCache( const KeyRef& key, bool isBackward ) {
SpinLockHolder hold( locationCacheLock );
if( isBackward )
locationCache.rangeContainingKeyBefore(key)->value() = Reference<LocationInfo>();
else
@ -616,28 +607,11 @@ void DatabaseContext::invalidateCache( const KeyRef& key, bool isBackward ) {
}
void DatabaseContext::invalidateCache( const KeyRangeRef& keys ) {
SpinLockHolder hold( locationCacheLock );
auto rs = locationCache.intersectingRanges(keys);
Key begin = rs.begin().begin(), end = rs.end().begin(); // insert invalidates rs, so can't be passed a mere reference into it
locationCache.insert( KeyRangeRef(begin, end), Reference<LocationInfo>() );
}
void DatabaseContext::invalidateCache( Reference<LocationInfo> const& ref ) {
if( !ref )
return;
SpinLockHolder hold( SSInterfaceCacheLock );
for(int i=0; i<ref->size(); i++)
SSInterfaceCache.erase( ref->getId(i) );
}
void DatabaseContext::invalidateCache( std::vector<UID> const& ids ) {
SpinLockHolder hold( SSInterfaceCacheLock );
for( auto id : ids ) {
SSInterfaceCache.erase( id );
}
}
Future<Void> DatabaseContext::onMasterProxiesChanged() {
return this->masterProxiesChangeTrigger.onTrigger();
}
@ -665,7 +639,6 @@ uint64_t extractHexOption( StringRef value ) {
}
void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
SpinLockHolder hold( locationCacheLock );
switch(option) {
case FDBDatabaseOptions::LOCATION_CACHE_SIZE:
locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits<int>::max());
@ -1022,52 +995,6 @@ ACTOR Future<Optional<StorageServerInterface>> fetchServerInterface( Database cx
return decodeServerListValue(val.get());
}
Future<Optional<StorageServerInterface>> _getServerInterfaceImpl( Database cx, TransactionInfo info, UID id ) {
// check the cache
SpinLockHolder hold( cx->SSInterfaceCacheLock );
auto it = cx->SSInterfaceCache.find( id );
if( it != cx->SSInterfaceCache.end() ) {
return it->second;
}
auto fetcher = fetchServerInterface(cx, info, id);
if( BUGGIFY) {
fetcher = Optional<StorageServerInterface>();
}
cx->SSInterfaceCache[ id ] = fetcher;
return fetcher;
}
ACTOR Future<Optional<StorageServerInterface>> getServerInterface( Database cx, TransactionInfo info, UID id ) {
try {
Optional<StorageServerInterface> result = wait( _getServerInterfaceImpl(cx, info, id) );
return result;
} catch( Error& ) {
SpinLockHolder hold( cx->SSInterfaceCacheLock );
cx->SSInterfaceCache.erase( id );
throw;
}
}
ACTOR Future<Optional<vector<StorageServerInterface>>> getServerInterfaces(
Database cx, TransactionInfo info, vector<UID> ids ) {
state vector< Future< Optional<StorageServerInterface> > > serverListEntries;
for( int s = 0; s < ids.size(); s++ ) {
serverListEntries.push_back( getServerInterface( cx, info, ids[s] ) );
}
vector<Optional<StorageServerInterface>> serverListValues = wait( getAll(serverListEntries) );
vector<StorageServerInterface> serverInterfaces;
for( int s = 0; s < serverListValues.size(); s++ ) {
if( !serverListValues[s].present() ) {
// A storage server has been removed from ServerList since we read keyServers
return Optional<vector<StorageServerInterface>>();
}
serverInterfaces.push_back( serverListValues[s].get() );
}
return serverInterfaces;
}
ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInterfaces( Future<Version> ver, Database cx, TransactionInfo info, vector<UID> ids ) {
state vector< Future< Optional<StorageServerInterface> > > serverListEntries;
for( int s = 0; s < ids.size(); s++ ) {
@ -1088,176 +1015,71 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
if (isBackward)
if (isBackward) {
ASSERT( key != allKeys.begin && key <= allKeys.end );
else
} else {
ASSERT( key < allKeys.end );
}
auto loc = cx->getCachedLocation( key, isBackward );
if (loc.second)
return loc;
state vector<StorageServerInterface> serverInterfaces;
state KeyRangeRef range;
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
// We assume that not only /FF/keyServers but /FF/serverList is present on the keyServersLocations since we now need both of them to terminate our search. Currently this is guaranteed because nothing after /FF/keyServers is split.
if ( ( key.startsWith( serverListPrefix) && (!isBackward || key.size() > serverListPrefix.size()) ) ||
( key.startsWith( keyServersPrefix ) && (!isBackward || key.size() > keyServersPrefix.size()) )) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
loop {
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServersShards = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>>(), TaskDefaultPromiseEndpoint ) ) ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
ASSERT( keyServersShards.size() ); // There should always be storage servers, except on version 0 which should not get to this function
state Key submitKey = isBackward ? key : keyAfter(key);
loop {
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(KeyRangeRef(submitKey,submitKey), 1000, submitKey.arena()), TaskDefaultPromiseEndpoint ) ) ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
ASSERT( rep.results.size() );
Reference<LocationInfo> cachedLocation;
for (pair<KeyRangeRef, vector<StorageServerInterface>> keyServersShard : keyServersShards) {
auto locationInfo = cx->setCachedLocation(keyServersShard.first, keyServersShard.second);
Reference<LocationInfo> cachedLocation;
KeyRangeRef range;
for (pair<KeyRangeRef, vector<StorageServerInterface>> shard : rep.results) {
auto locationInfo = cx->setCachedLocation(shard.first, shard.second);
if (isBackward ? (keyServersShard.first.begin < key && keyServersShard.first.end >= key) : keyServersShard.first.contains(key)) {
range = keyServersShard.first;
cachedLocation = locationInfo;
}
if (isBackward ? (shard.first.begin < key && shard.first.end >= key) : shard.first.contains(key)) {
range = shard.first;
cachedLocation = locationInfo;
}
ASSERT(isBackward ? (range.begin < key && range.end >= key) : range.contains(key));
return make_pair(range, cachedLocation);
}
}
}
} else {
loop {
auto keyServersK = keyServersKey(key);
state Standalone<RangeResultRef> results;
if( isBackward ) {
Standalone<RangeResultRef> _results = wait(
getRange( cx, latestVersion,
lastLessThan( keyServersK ),
firstGreaterOrEqual( keyServersK ) + 1,
GetRangeLimits( 2 ), false, info ) );
results = _results;
} else {
Standalone<RangeResultRef> _results = wait(
getRange( cx, latestVersion,
lastLessOrEqual( keyServersK ),
firstGreaterThan( keyServersK ) + 1,
GetRangeLimits( 2 ), false, info ) );
results = _results;
}
ASSERT( results.size() == 2 );
ASSERT( results[0].key.startsWith( keyServersPrefix ) );
ASSERT( results[1].key.startsWith( keyServersPrefix ) );
ASSERT(isBackward ? (range.begin < key && range.end >= key) : range.contains(key));
range = KeyRangeRef(
results[0].key.removePrefix( keyServersPrefix ),
results[1].key.removePrefix( keyServersPrefix ));
state vector<UID> src;
vector<UID> dest;
decodeKeyServersValue( results[0].value, src, dest );
if (!src.size()) {
vector<UID> src1;
if (results[1].value.size())
decodeKeyServersValue( results[1].value, src1, dest );
TraceEvent(SevError, "getKeyLocation_ZeroShardServers")
.detail("Key", printable(key))
.detail("r0Key", printable(results[0].key))
.detail("r0SrcCount", (int)src.size())
.detail("r1Key", printable(results[1].key))
.detail("r1SrcCount", (int)src1.size());
ASSERT(false);
}
Optional<vector<StorageServerInterface>> interfs = wait( getServerInterfaces(cx, info, src) );
if( interfs.present() ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Interfs.present");
serverInterfaces = interfs.get();
break;
} else {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Interfs.notpresent");
TEST( true ); //GetKeyLocation did not find server in serverlist
cx->invalidateCache( src );
Void _ = wait( delay( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
return make_pair(range, cachedLocation);
}
}
}
ASSERT(isBackward ? (range.begin < key && range.end >= key) : range.contains(key));
return make_pair(range, cx->setCachedLocation( range, serverInterfaces ));
}
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
//printf("getKeyRangeLocations: getting '%s'-'%s'\n", keyServersKey(keys.begin).toString().c_str(), keyServersKey(keys.end).toString().c_str());
state Arena arena = keys.arena();
state KeyRef newBegin = keyAfter(keys.begin, arena);
state KeyRangeRef requestRange = KeyRangeRef(newBegin, keys.end);
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
loop {
state vector< pair<KeyRange,Reference<LocationInfo>> > result;
state vector< pair< KeyRangeRef, Future< Optional< vector<StorageServerInterface> > > > > serverListReads;
state vector< vector< UID > > serverListServers;
state bool ok = true;
state Standalone<RangeResultRef> keyServersEntries =
wait(
getRange( cx, latestVersion,
lastLessOrEqual( keyServersKey(keys.begin) ),
firstGreaterOrEqual( keyServersKey(keys.end) ) + 1,
GetRangeLimits( limit + 1 ), reverse, info ) );
//printf("getKeyRangeLocations: got %d\n", keyServersEntries.size());
int startOffset = reverse ? 1 : 0;
int endOffset = 1 - startOffset;
state int shard;
KeyRangeRef range;
for(shard=0; shard < keyServersEntries.size() - 1; shard++) {
range = KeyRangeRef(
keyServersEntries[shard + startOffset].key.substr(keyServersPrefix.size()),
keyServersEntries[shard + endOffset].key.substr(keyServersPrefix.size()) );
vector<UID> servers;
vector<UID> destServers;
decodeKeyServersValue( keyServersEntries[shard + startOffset].value, servers, destServers );
ASSERT( servers.size() );
serverListReads.push_back( make_pair( range, getServerInterfaces(cx, info, servers) ) );
serverListServers.push_back( servers );
}
for(shard=0; shard < keyServersEntries.size() - 1; shard++) {
state Optional< vector<StorageServerInterface> > serverListValues = wait( serverListReads[shard].second );
Void _ = wait(yield(info.taskID));
if( !serverListValues.present() ) {
TEST( true ); //GetKeyLocations did not find server in serverlist
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(requestRange, (reverse?-1:1)*(limit+1), arena), TaskDefaultPromiseEndpoint ) ) ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Interfs.notpresent");
cx->invalidateCache( serverListServers[shard] );
ok = false;
break;
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
ASSERT( rep.results.size() );
vector< pair<KeyRange,Reference<LocationInfo>> > results;
for (pair<KeyRangeRef, vector<StorageServerInterface>> shard : rep.results) {
results.push_back( make_pair(shard.first & keys, cx->setCachedLocation(shard.first, shard.second)) );
}
return results;
}
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Interfs.present");
result.push_back(
make_pair( keys & serverListReads[shard].first,
cx->setCachedLocation( serverListReads[shard].first, serverListValues.get() ) ) );
}
if (ok) {
ASSERT(result.size());
ASSERT(reverse || (result[0].first.begin == keys.begin && result[result.size() - 1].first.end <= keys.end));
ASSERT(!reverse || (result[0].first.end == keys.end && result[result.size() - 1].first.begin >= keys.begin));
return result;
}
Void _ = wait( delay( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
}
}
@ -1311,7 +1133,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
if( onlyEndpointFailed ) {
cx->invalidateCache( key );
cx->invalidateCache( ssi.second );
pair<KeyRange, Reference<LocationInfo>> ssi2 = wait( getKeyLocation( cx, key, info ) );
ssi = std::move(ssi2);
}
@ -1368,7 +1189,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
(e.code() == error_code_transaction_too_old && ver == latestVersion) ) {
cx->invalidateCache( key );
cx->invalidateCache( ssi.second );
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else {
if (trLogInfo)
@ -1408,7 +1228,6 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
cx->invalidateCache(k.getKey(), k.isBackward());
cx->invalidateCache( ssi.second );
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else {
@ -1483,7 +1302,6 @@ ACTOR Future< Void > watchValue( Future<Version> version, Key key, Optional<Valu
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
cx->invalidateCache( key );
cx->invalidateCache( ssi.second );
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else if( e.code() == error_code_watch_cancelled ) {
TEST( true ); // Too many watches on the storage server, poll for changes instead
@ -1634,7 +1452,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
keys = KeyRangeRef( range.begin, keys.end );
cx->invalidateCache( keys );
cx->invalidateCache( locations[shard].second );
Void _ = wait( delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID ));
break;
} else {
@ -1925,7 +1742,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
(e.code() == error_code_transaction_too_old && readVersion == latestVersion))
{
cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() );
cx->invalidateCache( beginServer.second );
if (e.code() == error_code_wrong_shard_server) {
Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
@ -3047,9 +2863,6 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
throw;
}
cx->invalidateCache(keys);
for( auto const& location : locations ) {
cx->invalidateCache( location.second );
}
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
}
}
@ -3118,9 +2931,6 @@ ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx,
throw;
}
cx->invalidateCache( keys );
for( auto const& location : locations ) {
cx->invalidateCache( location.second );
}
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
}
}

View File

@ -1003,49 +1003,30 @@ ACTOR static Future<Void> readRequestServer(
TraceEvent("ProxyReadyForReads", proxy.id());
loop choose{
when(ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> req = waitNext(proxy.getKeyServersLocations.getFuture())) {
Standalone<VectorRef<KeyValueRef>> keyServersBegin = commitData->txnStateStore->readRange(KeyRangeRef(allKeys.begin, keyServersKeyServersKeys.begin), -1).get();
Standalone<VectorRef<KeyValueRef>> keyServersEnd = commitData->txnStateStore->readRange(KeyRangeRef(keyServersKeyServersKeys.end, allKeys.end), 2).get();
Standalone<VectorRef<KeyValueRef>> keyServersShardBoundaries = commitData->txnStateStore->readRange(KeyRangeRef(keyServersBegin[0].key, keyServersEnd[1].key)).get();
when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) {
Standalone<VectorRef<KeyValueRef>> keyServersBegin = commitData->txnStateStore->readRange(KeyRangeRef(allKeys.begin, req.range.begin.withPrefix(keyServersPrefix)), -1).get();
Standalone<VectorRef<KeyValueRef>> keyServersEnd = commitData->txnStateStore->readRange(KeyRangeRef(req.range.end.withPrefix(keyServersPrefix), allKeys.end), 2).get();
Standalone<VectorRef<KeyValueRef>> keyServersShardBoundaries = commitData->txnStateStore->readRange(KeyRangeRef(keyServersBegin[0].key, keyServersEnd[1].key), req.limit).get();
Standalone<VectorRef<KeyValueRef>> serverListBegin = commitData->txnStateStore->readRange(KeyRangeRef(allKeys.begin, keyServersKey(serverListKeys.begin)), -1).get();
Standalone<VectorRef<KeyValueRef>> serverListEnd = commitData->txnStateStore->readRange(KeyRangeRef(keyServersKey(serverListKeys.end), allKeys.end), 2).get();
Standalone<VectorRef<KeyValueRef>> serverListShardBoundaries = commitData->txnStateStore->readRange(KeyRangeRef(serverListBegin[0].key, serverListEnd[1].key)).get();
GetKeyServerLocationsReply rep;
rep.results.reserve(keyServersShardBoundaries.size() - 1);
bool ignoreFirstServerListShard = false;
if (keyServersShardBoundaries.back().key > serverListShardBoundaries.front().key)
ignoreFirstServerListShard = true;
// shards include all keyServers and serverLists information
vector<pair<KeyRangeRef, vector<StorageServerInterface>>> shards;
int reserveSize = keyServersShardBoundaries.size() + serverListShardBoundaries.size() - 2 - (ignoreFirstServerListShard ? 1 : 0);
shards.reserve(reserveSize);
int startOffset = req.limit < 0 ? 1 : 0;
int endOffset = 1 - startOffset;
for (int i = 0; i < keyServersShardBoundaries.size() - 1; i++) {
vector<UID> src, dest;
decodeKeyServersValue(keyServersShardBoundaries[i].value, src, dest);
decodeKeyServersValue(keyServersShardBoundaries[i+startOffset].value, src, dest);
vector<StorageServerInterface> ssis;
ssis.reserve(src.size());
for (auto const& id : src) {
ssis.push_back(decodeServerListValue(commitData->txnStateStore->readValue(serverListKeyFor(id)).get().get()));
}
shards.push_back(std::make_pair(KeyRangeRef(keyServersShardBoundaries[i].key.removePrefix(keyServersPrefix), keyServersShardBoundaries[i + 1].key.removePrefix(keyServersPrefix)), ssis));
rep.results.push_back(std::make_pair(KeyRangeRef(keyServersShardBoundaries[i+startOffset].key.removePrefix(keyServersPrefix), keyServersShardBoundaries[i+endOffset].key.removePrefix(keyServersPrefix)), ssis));
}
for (int i = ignoreFirstServerListShard ? 1 : 0 ; i < serverListShardBoundaries.size() - 1; i++) {
vector<UID> src, dest;
decodeKeyServersValue(serverListShardBoundaries[i].value, src, dest);
vector<StorageServerInterface> ssis;
ssis.reserve(src.size());
for (auto const& id : src) {
ssis.push_back(decodeServerListValue(commitData->txnStateStore->readValue(serverListKeyFor(id)).get().get()));
}
shards.push_back(std::make_pair(KeyRangeRef(serverListShardBoundaries[i].key.removePrefix(keyServersPrefix), serverListShardBoundaries[i + 1].key.removePrefix(keyServersPrefix)), ssis));
}
req.send(shards);
req.reply.send(rep);
}
when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) {
if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {

View File

@ -433,8 +433,8 @@ ACTOR Future<Standalone<CommitTransactionRef>> provisionalMaster( Reference<Mast
}
}
}
when ( ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> req = waitNext( parent->provisionalProxies[0].getKeyServersLocations.getFuture() ) ) {
req.send(Never());
when ( GetKeyServerLocationsRequest req = waitNext( parent->provisionalProxies[0].getKeyServersLocations.getFuture() ) ) {
req.reply.send(Never());
}
when ( Void _ = wait( waitFailure ) ) { throw worker_removed(); }
}

View File

@ -315,9 +315,10 @@ struct ConsistencyCheckWorkload : TestWorkload
state Reference<ProxyInfo> proxyInfo = wait(cx->getMasterProxiesFuture());
//Try getting key server locations from the master proxies
state vector<Future<ErrorOr<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>>>> keyServerLocationFutures;
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
state KeyRange keyServerRange = keyServersKeys;
for(int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(ReplyPromise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>>(), 2, 0));
keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange, 1000, keyServerRange.arena()), 2, 0));
choose {
when( Void _ = wait(waitForAll(keyServerLocationFutures)) ) {
@ -326,7 +327,7 @@ struct ConsistencyCheckWorkload : TestWorkload
state bool successful = true;
for(int i = 0; i < keyServerLocationFutures.size(); i++)
{
ErrorOr<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> shards = keyServerLocationFutures[i].get();
ErrorOr<GetKeyServerLocationsReply> shards = keyServerLocationFutures[i].get();
//If performing quiescent check, then all master proxies should be reachable. Otherwise, only one needs to be reachable
if(self->performQuiescentChecks && !shards.present())
@ -340,7 +341,7 @@ struct ConsistencyCheckWorkload : TestWorkload
//If we are doing a quiescent check, then we only need to do this for the first shard.
if(shards.present() && (i == 0 || !self->performQuiescentChecks))
{
keyServers = shards.get();
keyServers = shards.get().results;
if(!self->performQuiescentChecks)
break;
}

View File

@ -35,6 +35,10 @@ struct CpuProfilerWorkload : TestWorkload
//How long the profiler should be run; if <= 0 then it will run until the workload's check function is called
double duration;
//What process classes should be profiled as part of this run?
//See Locality.h for the list of valid strings to provide.
vector<std::string> roles;
//A list of worker interfaces which have had profiling turned on
std::vector<WorkerInterface> profilingWorkers;
@ -43,6 +47,7 @@ struct CpuProfilerWorkload : TestWorkload
{
initialDelay = getOption(options, LiteralStringRef("initialDelay"), 0.0);
duration = getOption(options, LiteralStringRef("duration"), -1.0);
roles = getOption(options, LiteralStringRef("roles"), vector<std::string>());
success = true;
}
@ -66,8 +71,11 @@ struct CpuProfilerWorkload : TestWorkload
{
vector<std::pair<WorkerInterface, ProcessClass>> _workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerInterface> workers;
for(int i = 0; i < _workers.size(); i++)
workers.push_back(_workers[i].first);
for(int i = 0; i < _workers.size(); i++) {
if (self->roles.empty() || std::find(self->roles.cbegin(), self->roles.cend(), _workers[i].second.toString()) != self->roles.cend()) {
workers.push_back(_workers[i].first);
}
}
self->profilingWorkers = workers;
}
@ -98,16 +106,6 @@ struct CpuProfilerWorkload : TestWorkload
TraceEvent("DoneSignalingProfiler");
}
// Profiling the testers is already covered above, as the workers listed include all testers.
// TODO(alexmiller): Create role-restricted profiling, and consider restoring the below.
// Enable (or disable) the profiler on the current tester
// ProfilerRequest req;
// req.type = ProfilerRequest::Type::FLOW;
// req.action = enabled ? ProfilerRequest::Action::ENABLE : ProfilerRequest::Action::DISABLE;
// req.duration = 0; //unused
// req.outputFile = StringRef(SERVER_KNOBS->LOG_DIRECTORY + "/" + toIPString(g_network->getLocalAddress().ip) + "." + format("%d", g_network->getLocalAddress().port) + ".profile.local.bin");
// updateCpuProfiler(req);
return Void();
}