From 1dc9eceb6d3bf63c03cc08d832b025a4bd759f5b Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Fri, 15 Dec 2017 20:13:44 -0800 Subject: [PATCH] optimize GetKeyLocationRequests on the proxy so they only require a single map lookup, instead of doing 3 + (3* [number of ranges]) lookups --- fdbclient/FDBTypes.h | 18 +- fdbclient/MasterProxyInterface.h | 13 +- fdbclient/NativeAPI.actor.cpp | 37 ++--- fdbclient/StorageServerInterface.h | 11 ++ fdbserver/ApplyMetadataMutation.h | 119 ++++++++----- fdbserver/MasterProxyServer.actor.cpp | 157 ++++++++++-------- fdbserver/MoveKeys.actor.cpp | 2 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 6 - .../workloads/ConsistencyCheck.actor.cpp | 2 +- 9 files changed, 217 insertions(+), 148 deletions(-) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index e8edad2382..7828f3a17e 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -37,6 +37,12 @@ typedef int64_t Generation; struct KeyRangeRef; struct KeyValueRef; +template +void uniquify( Collection& c ) { + std::sort(c.begin(), c.end()); + c.resize( std::unique(c.begin(), c.end()) - c.begin() ); +} + static std::string describe( const Tag item ) { return format("%d", item); } @@ -95,6 +101,12 @@ std::string printable( const VectorRef& val ); std::string printable( const VectorRef& val ); std::string printable( const KeyValueRef& val ); +inline bool equalsKeyAfter( const KeyRef& key, const KeyRef& compareKey ) { + if( key.size()+1 != compareKey.size() || compareKey[compareKey.size()-1] != 0 ) + return false; + return compareKey.startsWith( key ); +} + struct KeyRangeRef { const KeyRef begin, end; KeyRangeRef() {} @@ -110,6 +122,7 @@ struct KeyRangeRef { bool contains( const KeyRangeRef& keys ) const { return begin <= keys.begin && keys.end <= end; } bool intersects( const KeyRangeRef& keys ) const { return begin < keys.end && keys.begin < end; } bool empty() const { return begin == end; } + bool singleKeyRange() const { return equalsKeyAfter(begin, end); } Standalone withPrefix( const StringRef& prefix ) const { return KeyRangeRef( begin.withPrefix(prefix), end.withPrefix(prefix) ); @@ -222,11 +235,6 @@ inline KeyRef keyAfter( const KeyRef& key, Arena& arena ) { t[key.size()] = 0; return KeyRef(t,key.size()+1); } -inline bool equalsKeyAfter( const KeyRef& key, const KeyRef& compareKey ) { - if( key.size()+1 != compareKey.size() || compareKey[compareKey.size()-1] != 0 ) - return false; - return compareKey.startsWith( key ); -} inline KeyRange singleKeyRange( const KeyRef& a ) { return KeyRangeRef(a, keyAfter(a)); } diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 09b4f42978..5984c5922d 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -137,26 +137,29 @@ struct GetReadVersionRequest { }; struct GetKeyServerLocationsReply { + Arena arena; vector>> results; template void serialize(Ar& ar) { - ar & results; + ar & results & arena; } }; struct GetKeyServerLocationsRequest { Arena arena; - KeyRangeRef range; + KeyRef begin; + Optional end; int limit; + bool reverse; ReplyPromise reply; - GetKeyServerLocationsRequest() : limit(0) {} - GetKeyServerLocationsRequest( KeyRangeRef const& range, int limit, Arena const& arena ) : range( range ), limit( limit ), arena( arena ) {} + GetKeyServerLocationsRequest() : limit(0), reverse(false) {} + GetKeyServerLocationsRequest( KeyRef const& begin, Optional const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {} template void serialize(Ar& ar) { - ar & range & limit & reply & arena; + ar & begin & end & limit & reverse & reply & arena; } }; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 508f6730b6..189a7a7e0d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1028,53 +1028,40 @@ ACTOR Future< pair> > getKeyLocation( Database if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before"); - state Key submitKey = isBackward ? key : keyAfter(key); loop { choose { when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(KeyRangeRef(submitKey,submitKey), 1000, submitKey.arena()), TaskDefaultPromiseEndpoint ) ) ) { + when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional(), 1000, isBackward, key.arena()), TaskDefaultPromiseEndpoint ) ) ) { if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After"); - ASSERT( rep.results.size() ); + ASSERT( rep.results.size() == 1 ); - Reference cachedLocation; - KeyRangeRef range; - for (pair> shard : rep.results) { - auto locationInfo = cx->setCachedLocation(shard.first, shard.second); - - if (isBackward ? (shard.first.begin < key && shard.first.end >= key) : shard.first.contains(key)) { - range = shard.first; - cachedLocation = locationInfo; - } - } - - ASSERT(isBackward ? (range.begin < key && range.end >= key) : range.contains(key)); - - return make_pair(range, cachedLocation); + auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second); + return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo); } } } } ACTOR Future< vector< pair> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) { - state Arena arena = keys.arena(); - state KeyRef newBegin = keyAfter(keys.begin, arena); - state KeyRangeRef requestRange = KeyRangeRef(newBegin, keys.end); - if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before"); loop { choose { when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {} - when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(requestRange, (reverse?-1:1)*(limit+1), arena), TaskDefaultPromiseEndpoint ) ) ) { + when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskDefaultPromiseEndpoint ) ) ) { + state GetKeyServerLocationsReply rep = _rep; if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After"); ASSERT( rep.results.size() ); - vector< pair> > results; - for (pair> shard : rep.results) { - results.push_back( make_pair(shard.first & keys, cx->setCachedLocation(shard.first, shard.second)) ); + state vector< pair> > results; + state int shard = 0; + for (; shard < rep.results.size(); shard++) { + //FIXME: these shards are being inserted into the map sequentially, it would be much more CPU efficient to save the map pairs and insert them all at once. + results.push_back( make_pair(rep.results[shard].first & keys, cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second)) ); + Void _ = wait(yield()); } return results; diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 73612ff7af..c3c6e10f58 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -82,6 +82,17 @@ struct StorageServerInterface { } }; +struct StorageInfo : NonCopyable, public ReferenceCounted { + Tag tag; + StorageServerInterface interf; + StorageInfo() : tag(invalidTag) {} +}; + +struct ServerCacheInfo { + std::vector tags; + std::vector> info; +}; + struct GetValueReply : public LoadBalancedReply { Optional value; diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index baa55f6f2c..7e755d4c5e 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -43,53 +43,54 @@ struct applyMutationsData { }; static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference logSystem = Reference(), Version popVersion = 0, - KeyRangeMap >* vecBackupKeys = NULL, KeyRangeMap>* keyTags = NULL, std::map* uid_applyMutationsData = NULL, - RequestStream commit = RequestStream(), Database cx = Database(), NotifiedVersion* commitVersion = NULL, std::map *tagCache = NULL, bool initialCommit = false) { + KeyRangeMap >* vecBackupKeys = NULL, KeyRangeMap* keyInfo = NULL, std::map* uid_applyMutationsData = NULL, + RequestStream commit = RequestStream(), Database cx = Database(), NotifiedVersion* commitVersion = NULL, std::map>* storageCache = NULL, bool initialCommit = false ) { for (auto const& m : mutations) { //TraceEvent("MetadataMutation", dbgid).detail("M", m.toString()); if (m.param1.size() && m.param1[0] == systemKeys.begin[0] && m.type == MutationRef::SetValue) { if(m.param1.startsWith(keyServersPrefix)) { - if(keyTags) { + if(keyInfo) { KeyRef k = m.param1.removePrefix(keyServersPrefix); if(k != allKeys.end) { - KeyRef end = keyTags->rangeContaining(k).end(); + KeyRef end = keyInfo->rangeContaining(k).end(); KeyRangeRef insertRange(k,end); vector src, dest; decodeKeyServersValue(m.param2, src, dest); - std::set tags; - if(!tagCache) { - for(auto id : src) - tags.insert(decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get())); - for(auto id : dest) - tags.insert(decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get())); - } - else { - Tag tag; - for(auto id : src) { - auto tagItr = tagCache->find(id); - if(tagItr == tagCache->end()) { - tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - (*tagCache)[id] = tag; - } else { - tag = tagItr->second; - } - tags.insert( tag ); - } - for(auto id : dest) { - auto tagItr = tagCache->find(id); - if(tagItr == tagCache->end()) { - tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - (*tagCache)[id] = tag; - } else { - tag = tagItr->second; - } - tags.insert( tag ); + ASSERT(storageCache); + Reference storageInfo; + ServerCacheInfo info; + + for(auto id : src) { + auto cacheItr = storageCache->find(id); + if(cacheItr == storageCache->end()) { + storageInfo = Reference( new StorageInfo() ); + storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); + storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); + (*storageCache)[id] = storageInfo; + } else { + storageInfo = cacheItr->second; } + ASSERT(storageInfo->tag != invalidTag); + info.tags.push_back( storageInfo->tag ); + info.info.push_back( storageInfo ); } - - keyTags->insert(insertRange,std::vector(tags.begin(), tags.end())); + for(auto id : dest) { + auto cacheItr = storageCache->find(id); + if(cacheItr == storageCache->end()) { + storageInfo = Reference( new StorageInfo() ); + storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); + storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); + (*storageCache)[id] = storageInfo; + } else { + storageInfo = cacheItr->second; + } + ASSERT(storageInfo->tag != invalidTag); + info.tags.push_back( storageInfo->tag ); + } + uniquify(info.tags); + keyInfo->insert(insertRange,info); } } if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2)); @@ -119,8 +120,19 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRefset(KeyValueRef(m.param1, m.param2)); - if(tagCache) { - (*tagCache)[id] = tag; + if(storageCache) { + auto cacheItr = storageCache->find(id); + if(cacheItr == storageCache->end()) { + Reference storageInfo = Reference( new StorageInfo() ); + storageInfo->tag = tag; + Optional interfKey = txnStateStore->readValue( serverListKeyFor(id) ).get(); + if(interfKey.present()) { + storageInfo->interf = decodeServerListValue( interfKey.get() ); + } + (*storageCache)[id] = storageInfo; + } else { + cacheItr->second->tag = tag; + } } } } @@ -134,7 +146,28 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRefset(KeyValueRef(m.param1, m.param2)); } - else if (m.param1.startsWith(serverListPrefix) || m.param1 == databaseLockedKey || m.param1.startsWith(applyMutationsBeginRange.begin) || + else if (m.param1.startsWith(serverListPrefix)) { + if(!initialCommit) { + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + if(storageCache) { + UID id = decodeServerListKey(m.param1); + StorageServerInterface interf = decodeServerListValue(m.param2); + + auto cacheItr = storageCache->find(id); + if(cacheItr == storageCache->end()) { + Reference storageInfo = Reference( new StorageInfo() ); + storageInfo->interf = interf; + Optional tagKey = txnStateStore->readValue( serverTagKeyFor(id) ).get(); + if(tagKey.present()) { + storageInfo->tag = decodeServerTagValue( tagKey.get() ); + } + (*storageCache)[id] = storageInfo; + } else { + cacheItr->second->interf = interf; + } + } + } + } else if( m.param1 == databaseLockedKey || m.param1.startsWith(applyMutationsBeginRange.begin) || m.param1.startsWith(applyMutationsAddPrefixRange.begin) || m.param1.startsWith(applyMutationsRemovePrefixRange.begin)) { if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2)); } @@ -218,9 +251,9 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRefinsert(clearRange, clearRange.begin == StringRef() ? vector() : keyTags->rangeContainingKeyBefore(clearRange.begin).value()); + keyInfo->insert(clearRange, clearRange.begin == StringRef() ? ServerCacheInfo() : keyInfo->rangeContainingKeyBefore(clearRange.begin).value()); } if(!initialCommit) txnStateStore->clear(r); @@ -254,7 +287,13 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRefclear( range & serverTagKeys ); + if(!initialCommit) { + KeyRangeRef clearRange = range & serverTagKeys; + txnStateStore->clear(clearRange); + if(storageCache && clearRange.singleKeyRange()) { + storageCache->erase(decodeServerTagKey(clearRange.begin)); + } + } } if (range.contains(coordinatorsKey)) { if(!initialCommit) txnStateStore->clear(singleKeyRange(coordinatorsKey)); diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 4576a3c79c..0e0bf8632b 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -181,7 +181,7 @@ struct ProxyCommitData { uint64_t commitVersionRequestNumber; uint64_t mostRecentProcessedRequestNumber; KeyRangeMap>> keyResolvers; - KeyRangeMap> keyTags; + KeyRangeMap keyInfo; std::map uid_applyMutationsData; bool firstProxy; double lastCoalesceTime; @@ -198,7 +198,7 @@ struct ProxyCommitData { Database cx; EventMetricHandle singleKeyMutationEvent; - std::map tagCache; + std::map> storageCache; ProxyCommitData(UID dbgid, MasterInterface master, RequestStream getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream commit, Reference> db, bool firstProxy) : dbgid(dbgid), stats(dbgid, &version, &committedVersion), master(master), @@ -427,7 +427,7 @@ ACTOR Future commitBatch( for (int resolver = 0; resolver < resolution.size(); resolver++) committed = committed && resolution[resolver].stateMutations[versionIndex][transactionIndex].committed; if (committed) - applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyTags, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->tagCache ); + applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache ); if( resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() && firstStateMutations ) { ASSERT(committed); @@ -489,7 +489,7 @@ ACTOR Future commitBatch( { if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware)) { commitCount++; - applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyTags, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->tagCache); + applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache); } if(firstStateMutations) { ASSERT(committed[t] == ConflictBatch::TransactionCommitted); @@ -536,10 +536,10 @@ ACTOR Future commitBatch( // FIXME: Make this process not disgustingly CPU intensive if (isSingleKeyMutation((MutationRef::Type) m.type)) { - auto& tags = self->keyTags[m.param1]; + auto& tags = self->keyInfo[m.param1].tags; if(self->singleKeyMutationEvent->enabled) { - KeyRangeRef shard = self->keyTags.rangeContaining(m.param1).range(); + KeyRangeRef shard = self->keyInfo.rangeContaining(m.param1).range(); self->singleKeyMutationEvent->tag1 = (int64_t)tags[0]; self->singleKeyMutationEvent->tag2 = (int64_t)tags[1]; self->singleKeyMutationEvent->tag3 = (int64_t)tags[2]; @@ -555,21 +555,21 @@ ACTOR Future commitBatch( toCommit.addTypedMessage(m); } else if (m.type == MutationRef::ClearRange) { - auto ranges = self->keyTags.intersectingRanges(KeyRangeRef(m.param1, m.param2)); + auto ranges = self->keyInfo.intersectingRanges(KeyRangeRef(m.param1, m.param2)); auto firstRange = ranges.begin(); ++firstRange; if (firstRange == ranges.end()) { // Fast path if (debugMutation("ProxyCommit", commitVersion, m)) - TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value())).detail("Mutation", m.toString()).detail("Version", commitVersion); - for (auto& tag : ranges.begin().value()) + TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion); + for (auto& tag : ranges.begin().value().tags) toCommit.addTag(tag); } else { TEST(true); //A clear range extends past a shard boundary std::set allSources; for (auto r : ranges) - allSources.insert(r.value().begin(), r.value().end()); + allSources.insert(r.value().tags.begin(), r.value().tags.end()); if (debugMutation("ProxyCommit", commitVersion, m)) TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion); for (auto& tag : allSources) @@ -681,7 +681,7 @@ ACTOR Future commitBatch( backupMutation.param1 = wr.toStringRef(); ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) ); // We are writing into the configured destination - auto& tags = self->keyTags[backupMutation.param1]; + auto& tags = self->keyInfo[backupMutation.param1].tags; for (auto& tag : tags) toCommit.addTag(tag); toCommit.addTypedMessage(backupMutation); @@ -705,7 +705,7 @@ ACTOR Future commitBatch( TEST(true); // Semi-committed pipeline limited by MVCC window //TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion); choose{ - when(Void _ = wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) { + when(Void _ = wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) { Void _ = wait(yield()); break; } @@ -1002,41 +1002,59 @@ ACTOR static Future readRequestServer( TraceEvent("ProxyReadyForReads", proxy.id()); - loop choose{ - when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) { - Standalone> keyServersBegin = commitData->txnStateStore->readRange(KeyRangeRef(allKeys.begin, req.range.begin.withPrefix(keyServersPrefix)), -1).get(); - Standalone> keyServersEnd = commitData->txnStateStore->readRange(KeyRangeRef(req.range.end.withPrefix(keyServersPrefix), allKeys.end), 2).get(); - Standalone> keyServersShardBoundaries = commitData->txnStateStore->readRange(KeyRangeRef(keyServersBegin[0].key, keyServersEnd[1].key), req.limit).get(); - - GetKeyServerLocationsReply rep; - rep.results.reserve(keyServersShardBoundaries.size() - 1); - - int startOffset = req.limit < 0 ? 1 : 0; - int endOffset = 1 - startOffset; - - for (int i = 0; i < keyServersShardBoundaries.size() - 1; i++) { - vector src, dest; - decodeKeyServersValue(keyServersShardBoundaries[i+startOffset].value, src, dest); - vector ssis; - ssis.reserve(src.size()); - for (auto const& id : src) { - ssis.push_back(decodeServerListValue(commitData->txnStateStore->readValue(serverListKeyFor(id)).get().get())); + loop { + choose{ + when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) { + GetKeyServerLocationsReply rep; + if(!req.end.present()) { + auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin) : commitData->keyInfo.rangeContaining(req.begin); + vector ssis; + ssis.reserve(r.value().info.size()); + for(auto& it : r.value().info) { + ssis.push_back(it->interf); + } + rep.results.push_back(std::make_pair(r.range(), ssis)); + } else if(!req.reverse) { + int count = 0; + for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) { + vector ssis; + ssis.reserve(r.value().info.size()); + for(auto& it : r.value().info) { + ssis.push_back(it->interf); + } + rep.results.push_back(std::make_pair(r.range(), ssis)); + count++; + } + } else { + int count = 0; + auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get()); + while( count < req.limit && req.begin < r.end() ) { + vector ssis; + ssis.reserve(r.value().info.size()); + for(auto& it : r.value().info) { + ssis.push_back(it->interf); + } + rep.results.push_back(std::make_pair(r.range(), ssis)); + if(r == commitData->keyInfo.ranges().begin()) { + break; + } + count++; + --r; + } } - - rep.results.push_back(std::make_pair(KeyRangeRef(keyServersShardBoundaries[i+startOffset].key.removePrefix(keyServersPrefix), keyServersShardBoundaries[i+endOffset].key.removePrefix(keyServersPrefix)), ssis)); - } - - req.reply.send(rep); - } - when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) { - if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) { - GetStorageServerRejoinInfoReply rep; - rep.version = commitData->version; - rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() ); req.reply.send(rep); - } else - req.reply.sendError(worker_removed()); + } + when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) { + if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) { + GetStorageServerRejoinInfoReply rep; + rep.version = commitData->version; + rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() ); + req.reply.send(rep); + } else + req.reply.sendError(worker_removed()); + } } + Void _ = wait(yield()); } } @@ -1142,37 +1160,46 @@ ACTOR Future masterProxyServerCore( ((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end ); Standalone> mutations; - std::vector>,int>> keyTagData; + std::vector,int>> keyInfoData; vector src, dest; - std::set tags; - Tag tag; + Reference storageInfo; + ServerCacheInfo info; for(auto &kv : data) { if( kv.key.startsWith(keyServersPrefix) ) { KeyRef k = kv.key.removePrefix(keyServersPrefix); if(k != allKeys.end) { decodeKeyServersValue(kv.value, src, dest); - tags.clear(); - for(auto id : src) { - auto tagItr = commitData.tagCache.find(id); - if(tagItr == commitData.tagCache.end()) { - tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - commitData.tagCache[id] = tag; + info.tags.clear(); + info.info.clear(); + for(auto& id : src) { + auto cacheItr = commitData.storageCache.find(id); + if(cacheItr == commitData.storageCache.end()) { + storageInfo = Reference( new StorageInfo() ); + storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); + storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); + commitData.storageCache[id] = storageInfo; } else { - tag = tagItr->second; + storageInfo = cacheItr->second; } - tags.insert( tag ); + ASSERT(storageInfo->tag != invalidTag); + info.tags.push_back( storageInfo->tag ); + info.info.push_back( storageInfo ); } - for(auto id : dest) { - auto tagItr = commitData.tagCache.find(id); - if(tagItr == commitData.tagCache.end()) { - tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); - commitData.tagCache[id] = tag; + for(auto& id : dest) { + auto cacheItr = commitData.storageCache.find(id); + if(cacheItr == commitData.storageCache.end()) { + storageInfo = Reference( new StorageInfo() ); + storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() ); + storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() ); + commitData.storageCache[id] = storageInfo; } else { - tag = tagItr->second; + storageInfo = cacheItr->second; } - tags.insert( tag ); + ASSERT(storageInfo->tag != invalidTag); + info.tags.push_back( storageInfo->tag ); } - keyTagData.push_back( std::make_pair(MapPair>(k, std::vector(tags.begin(), tags.end())), 1) ); + uniquify(info.tags); + keyInfoData.push_back( std::make_pair(MapPair(k, info), 1) ); } } else { mutations.push_back(mutations.arena(), MutationRef(MutationRef::SetValue, kv.key, kv.value)); @@ -1180,11 +1207,11 @@ ACTOR Future masterProxyServerCore( } //insert keyTag data separately from metadata mutations so that we can do one bulk insert which avoids a lot of map lookups. - commitData.keyTags.rawInsert(keyTagData); + commitData.keyInfo.rawInsert(keyInfoData); Arena arena; bool confChanges; - applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference(), 0, &commitData.vecBackupKeys, &commitData.keyTags, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.tagCache, true); + applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference(), 0, &commitData.vecBackupKeys, &commitData.keyInfo, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.storageCache, true); } auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get(); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index b3b93aa8d9..deb71cda1c 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -782,8 +782,8 @@ void seedShardServers( tr.read_conflict_ranges.push_back_deep( arena, allKeys ); for(int s=0; s -void uniquify( Collection& c ) { - std::sort(c.begin(), c.end()); - c.resize( std::unique(c.begin(), c.end()) - c.begin() ); -} - ACTOR static Future reportTLogCommitErrors( Future commitReply, UID debugID ) { try { Void _ = wait(commitReply); diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 3938213ffa..9586d592d9 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -318,7 +318,7 @@ struct ConsistencyCheckWorkload : TestWorkload state vector>> keyServerLocationFutures; state KeyRange keyServerRange = keyServersKeys; for(int i = 0; i < proxyInfo->size(); i++) - keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange, 1000, keyServerRange.arena()), 2, 0)); + keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange.begin, keyServerRange.end, 1000, false, keyServerRange.arena()), 2, 0)); choose { when( Void _ = wait(waitForAll(keyServerLocationFutures)) ) {