Optimize GetKeyLocation requests on the proxy so they require only a single map lookup, instead of 3 + (3 * [number of ranges]) lookups

This commit is contained in:
Evan Tschannen 2017-12-15 20:13:44 -08:00
parent f2221cd16e
commit 1dc9eceb6d
9 changed files with 217 additions and 148 deletions
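
The core of the change: previously the proxy answered each key-location request by reading shard boundaries out of txnStateStore (three range reads to find the boundaries, plus a serverList read for every replica of every shard returned), which is where the 3 + (3 * [number of ranges]) figure in the message comes from. After this commit the proxy keeps the shard-to-server mapping in memory as a KeyRangeMap<ServerCacheInfo> (keyInfo), maintained by applyMetadataMutations, so a request is answered with a single rangeContaining lookup. Below is a minimal, self-contained sketch of that single-lookup idea using a plain std::map keyed by each shard's begin key; the types and helper names are illustrative stand-ins, not FoundationDB's actual KeyRangeMap.

#include <cassert>
#include <iterator>
#include <map>
#include <string>
#include <vector>

// A shard's cached location info; simplified stand-in for ServerCacheInfo.
struct ShardInfo {
    std::vector<std::string> servers;
};

// keyInfo is keyed by each shard's begin key; a shard ends where the next one begins.
using KeyInfoMap = std::map<std::string, ShardInfo>;

// The single lookup: the entry whose range contains `key`.
const ShardInfo& rangeContaining(const KeyInfoMap& keyInfo, const std::string& key) {
    auto it = keyInfo.upper_bound(key);  // first shard that begins strictly after key
    assert(it != keyInfo.begin());       // relies on a sentinel shard beginning at ""
    return std::prev(it)->second;
}

int main() {
    KeyInfoMap keyInfo = {{"", {{"ss1", "ss2"}}}, {"m", {{"ss2", "ss3"}}}};
    assert(rangeContaining(keyInfo, "apple").servers[0] == "ss1");  // falls in ["", "m")
    assert(rangeContaining(keyInfo, "zebra").servers[0] == "ss2");  // falls in ["m", ...)
    return 0;
}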

View File

@@ -37,6 +37,12 @@ typedef int64_t Generation;
struct KeyRangeRef;
struct KeyValueRef;
template <class Collection>
void uniquify( Collection& c ) {
std::sort(c.begin(), c.end());
c.resize( std::unique(c.begin(), c.end()) - c.begin() );
}
static std::string describe( const Tag item ) {
return format("%d", item);
}
@@ -95,6 +101,12 @@ std::string printable( const VectorRef<StringRef>& val );
std::string printable( const VectorRef<KeyValueRef>& val );
std::string printable( const KeyValueRef& val );
inline bool equalsKeyAfter( const KeyRef& key, const KeyRef& compareKey ) {
if( key.size()+1 != compareKey.size() || compareKey[compareKey.size()-1] != 0 )
return false;
return compareKey.startsWith( key );
}
struct KeyRangeRef {
const KeyRef begin, end;
KeyRangeRef() {}
@@ -110,6 +122,7 @@ struct KeyRangeRef {
bool contains( const KeyRangeRef& keys ) const { return begin <= keys.begin && keys.end <= end; }
bool intersects( const KeyRangeRef& keys ) const { return begin < keys.end && keys.begin < end; }
bool empty() const { return begin == end; }
bool singleKeyRange() const { return equalsKeyAfter(begin, end); }
Standalone<KeyRangeRef> withPrefix( const StringRef& prefix ) const {
return KeyRangeRef( begin.withPrefix(prefix), end.withPrefix(prefix) );
@@ -222,11 +235,6 @@ inline KeyRef keyAfter( const KeyRef& key, Arena& arena ) {
t[key.size()] = 0;
return KeyRef(t,key.size()+1);
}
inline bool equalsKeyAfter( const KeyRef& key, const KeyRef& compareKey ) {
if( key.size()+1 != compareKey.size() || compareKey[compareKey.size()-1] != 0 )
return false;
return compareKey.startsWith( key );
}
inline KeyRange singleKeyRange( const KeyRef& a ) {
return KeyRangeRef(a, keyAfter(a));
}
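
Two small helpers are hoisted into FDBTypes.h so the proxy code can share them: uniquify(), which sorts a collection and drops duplicates in place (the identical local copy is deleted from the log-system source later in this diff), and equalsKeyAfter(), now joined by KeyRangeRef::singleKeyRange(), which recognizes the range [k, k-with-a-zero-byte-appended) covering exactly one key (used below to detect a clear of a single serverTag entry). A quick stand-alone illustration of both, assuming keys behave like ordinary byte strings:

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Same idea as the uniquify() template added to FDBTypes.h: sort, then drop duplicates.
template <class Collection>
void uniquify(Collection& c) {
    std::sort(c.begin(), c.end());
    c.resize(std::unique(c.begin(), c.end()) - c.begin());
}

// keyAfter(k) is k with a 0x00 byte appended: the smallest key greater than k.
std::string keyAfter(const std::string& key) { return key + '\0'; }

// equalsKeyAfter(k, e) holds exactly when e == keyAfter(k), so [k, e) contains only k.
bool equalsKeyAfter(const std::string& key, const std::string& compareKey) {
    if (key.size() + 1 != compareKey.size() || compareKey.back() != '\0') return false;
    return compareKey.compare(0, key.size(), key) == 0;
}

int main() {
    std::vector<int> tags = {3, 1, 3, 2, 1};
    uniquify(tags);                                  // -> {1, 2, 3}
    assert(tags.size() == 3);
    assert(equalsKeyAfter("abc", keyAfter("abc")));  // the single-key range [k, keyAfter(k))
    assert(!equalsKeyAfter("abc", "abcd"));
    return 0;
}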

View File

@@ -137,26 +137,29 @@ struct GetReadVersionRequest {
};
struct GetKeyServerLocationsReply {
Arena arena;
vector<pair<KeyRangeRef, vector<StorageServerInterface>>> results;
template <class Ar>
void serialize(Ar& ar) {
ar & results;
ar & results & arena;
}
};
struct GetKeyServerLocationsRequest {
Arena arena;
KeyRangeRef range;
KeyRef begin;
Optional<KeyRef> end;
int limit;
bool reverse;
ReplyPromise<GetKeyServerLocationsReply> reply;
GetKeyServerLocationsRequest() : limit(0) {}
GetKeyServerLocationsRequest( KeyRangeRef const& range, int limit, Arena const& arena ) : range( range ), limit( limit ), arena( arena ) {}
GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
GetKeyServerLocationsRequest( KeyRef const& begin, Optional<KeyRef> const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {}
template <class Ar>
void serialize(Ar& ar) {
ar & range & limit & reply & arena;
ar & begin & end & limit & reverse & reply & arena;
}
};
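
GetKeyServerLocationsRequest changes shape: instead of a KeyRangeRef it now carries a begin key, an optional end key, a limit, and a reverse flag, and the reply now serializes its arena so the returned KeyRangeRefs stay valid on the client. From the proxy handler later in this diff, the three cases are: no end present, return just the shard containing begin (or, with reverse set, the shard containing the key before begin); end present and reverse false, up to limit shards scanning forward from begin; end present and reverse true, up to limit shards scanning backward from end. A rough sketch of the three variants using a simplified stand-in type, without arenas, ReplyPromise, or serialization:

#include <optional>
#include <string>

// Simplified stand-in for the new request shape.
struct KeyServerLocationsRequest {
    std::string begin;
    std::optional<std::string> end;  // absent => "just the shard containing begin"
    int limit = 0;
    bool reverse = false;
};

int main() {
    // 1. Point lookup: the shard containing "apple" (what getKeyLocation now sends).
    KeyServerLocationsRequest point{"apple", std::nullopt, 1000, false};

    // 2. Forward range: up to 100 shards overlapping ["a", "m"), scanning from "a".
    KeyServerLocationsRequest forward{"a", std::string("m"), 100, false};

    // 3. Reverse range: up to 100 shards overlapping ["a", "m"), scanning back from "m".
    KeyServerLocationsRequest backward{"a", std::string("m"), 100, true};

    return point.limit + forward.limit + backward.limit > 0 ? 0 : 1;
}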

View File

@@ -1028,53 +1028,40 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
state Key submitKey = isBackward ? key : keyAfter(key);
loop {
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(KeyRangeRef(submitKey,submitKey), 1000, submitKey.arena()), TaskDefaultPromiseEndpoint ) ) ) {
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 1000, isBackward, key.arena()), TaskDefaultPromiseEndpoint ) ) ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
ASSERT( rep.results.size() );
ASSERT( rep.results.size() == 1 );
Reference<LocationInfo> cachedLocation;
KeyRangeRef range;
for (pair<KeyRangeRef, vector<StorageServerInterface>> shard : rep.results) {
auto locationInfo = cx->setCachedLocation(shard.first, shard.second);
if (isBackward ? (shard.first.begin < key && shard.first.end >= key) : shard.first.contains(key)) {
range = shard.first;
cachedLocation = locationInfo;
}
}
ASSERT(isBackward ? (range.begin < key && range.end >= key) : range.contains(key));
return make_pair(range, cachedLocation);
auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo);
}
}
}
}
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
state Arena arena = keys.arena();
state KeyRef newBegin = keyAfter(keys.begin, arena);
state KeyRangeRef requestRange = KeyRangeRef(newBegin, keys.end);
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
loop {
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(requestRange, (reverse?-1:1)*(limit+1), arena), TaskDefaultPromiseEndpoint ) ) ) {
when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskDefaultPromiseEndpoint ) ) ) {
state GetKeyServerLocationsReply rep = _rep;
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
ASSERT( rep.results.size() );
vector< pair<KeyRange,Reference<LocationInfo>> > results;
for (pair<KeyRangeRef, vector<StorageServerInterface>> shard : rep.results) {
results.push_back( make_pair(shard.first & keys, cx->setCachedLocation(shard.first, shard.second)) );
state vector< pair<KeyRange,Reference<LocationInfo>> > results;
state int shard = 0;
for (; shard < rep.results.size(); shard++) {
//FIXME: these shards are being inserted into the map sequentially, it would be much more CPU efficient to save the map pairs and insert them all at once.
results.push_back( make_pair(rep.results[shard].first & keys, cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second)) );
Void _ = wait(yield());
}
return results;
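
On the client, getKeyLocation no longer has to fabricate a degenerate range out of keyAfter(key); it forwards the key and the isBackward flag and trusts the proxy to pick the shard. The difference between the two directions is containment at the boundary: a forward lookup wants the shard with begin <= key < end, a backward one wants begin < key <= end (the removed code checked exactly this on the returned shards; now the proxy's rangeContaining and rangeContainingKeyBefore do it). A small self-contained sketch of the two lookups over a sorted set of shard boundaries, using a plain std::map and hypothetical helper names:

#include <cassert>
#include <iterator>
#include <map>
#include <string>

// Shard boundaries: each entry's key is a shard's begin; the next entry's key is its end.
using Boundaries = std::map<std::string, int /*shard id*/>;

// Forward: the shard with begin <= key (i.e. begin <= key < end).
int shardContaining(const Boundaries& b, const std::string& key) {
    return std::prev(b.upper_bound(key))->second;
}

// Backward: the shard with begin < key (i.e. begin < key <= end), what isBackward asks for.
// Assumes a sentinel shard beginning at "" and key > "".
int shardContainingKeyBefore(const Boundaries& b, const std::string& key) {
    return std::prev(b.lower_bound(key))->second;
}

int main() {
    Boundaries b = {{"", 0}, {"m", 1}, {"t", 2}};   // shards ["","m"), ["m","t"), ["t", ...)
    assert(shardContaining(b, "m") == 1);            // "m" itself lives in the second shard
    assert(shardContainingKeyBefore(b, "m") == 0);   // keys just before "m" live in the first
    return 0;
}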

View File

@@ -82,6 +82,17 @@ struct StorageServerInterface {
}
};
struct StorageInfo : NonCopyable, public ReferenceCounted<StorageInfo> {
Tag tag;
StorageServerInterface interf;
StorageInfo() : tag(invalidTag) {}
};
struct ServerCacheInfo {
std::vector<Tag> tags;
std::vector<Reference<StorageInfo>> info;
};
struct GetValueReply : public LoadBalancedReply {
Optional<Value> value;
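
StorageInfo and ServerCacheInfo are what make the cached map cheap to keep current: each storage server's tag and interface are held once in a reference-counted StorageInfo, the proxy's storageCache maps server UID to it, and every shard's ServerCacheInfo holds references into that cache plus a flattened tag list for the commit path. When a serverTag or serverList key changes, applyMetadataMutations patches the one shared object in place and every range that references it sees the update. A rough sketch with std::shared_ptr standing in for Reference<> and simplified field types:

#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Per-server info, stored once and shared (stand-in for Reference<StorageInfo>).
struct StorageInfo {
    int tag = -1;            // -1 plays the role of invalidTag
    std::string address;     // stand-in for StorageServerInterface
};

// Per-shard info: which servers hold it (stand-in for ServerCacheInfo).
struct ServerCacheInfo {
    std::vector<int> tags;
    std::vector<std::shared_ptr<StorageInfo>> info;
};

int main() {
    std::map<std::string, std::shared_ptr<StorageInfo>> storageCache;  // server id -> info
    storageCache["serverA"] = std::make_shared<StorageInfo>(StorageInfo{1, "10.0.0.1"});

    // Two shards both served by serverA share the same StorageInfo object.
    ServerCacheInfo shard1{{1}, {storageCache["serverA"]}};
    ServerCacheInfo shard2{{1}, {storageCache["serverA"]}};

    // A serverList update rewrites the cached interface once; both shards observe it.
    storageCache["serverA"]->address = "10.0.0.2";
    assert(shard1.info[0]->address == "10.0.0.2" && shard2.info[0]->address == "10.0.0.2");
    return 0;
}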

View File

@@ -43,53 +43,54 @@ struct applyMutationsData {
};
static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem = Reference<ILogSystem>(), Version popVersion = 0,
KeyRangeMap<std::set<Key> >* vecBackupKeys = NULL, KeyRangeMap<std::vector<Tag>>* keyTags = NULL, std::map<Key, applyMutationsData>* uid_applyMutationsData = NULL,
RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>(), Database cx = Database(), NotifiedVersion* commitVersion = NULL, std::map<UID, Tag> *tagCache = NULL, bool initialCommit = false) {
KeyRangeMap<std::set<Key> >* vecBackupKeys = NULL, KeyRangeMap<ServerCacheInfo>* keyInfo = NULL, std::map<Key, applyMutationsData>* uid_applyMutationsData = NULL,
RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>(), Database cx = Database(), NotifiedVersion* commitVersion = NULL, std::map<UID, Reference<StorageInfo>>* storageCache = NULL, bool initialCommit = false ) {
for (auto const& m : mutations) {
//TraceEvent("MetadataMutation", dbgid).detail("M", m.toString());
if (m.param1.size() && m.param1[0] == systemKeys.begin[0] && m.type == MutationRef::SetValue) {
if(m.param1.startsWith(keyServersPrefix)) {
if(keyTags) {
if(keyInfo) {
KeyRef k = m.param1.removePrefix(keyServersPrefix);
if(k != allKeys.end) {
KeyRef end = keyTags->rangeContaining(k).end();
KeyRef end = keyInfo->rangeContaining(k).end();
KeyRangeRef insertRange(k,end);
vector<UID> src, dest;
decodeKeyServersValue(m.param2, src, dest);
std::set<Tag> tags;
if(!tagCache) {
for(auto id : src)
tags.insert(decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get()));
for(auto id : dest)
tags.insert(decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get()));
}
else {
Tag tag;
for(auto id : src) {
auto tagItr = tagCache->find(id);
if(tagItr == tagCache->end()) {
tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
(*tagCache)[id] = tag;
} else {
tag = tagItr->second;
}
tags.insert( tag );
}
for(auto id : dest) {
auto tagItr = tagCache->find(id);
if(tagItr == tagCache->end()) {
tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
(*tagCache)[id] = tag;
} else {
tag = tagItr->second;
}
tags.insert( tag );
ASSERT(storageCache);
Reference<StorageInfo> storageInfo;
ServerCacheInfo info;
for(auto id : src) {
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
(*storageCache)[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
info.info.push_back( storageInfo );
}
keyTags->insert(insertRange,std::vector<Tag>(tags.begin(), tags.end()));
for(auto id : dest) {
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
(*storageCache)[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
}
uniquify(info.tags);
keyInfo->insert(insertRange,info);
}
}
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
@@ -119,8 +120,19 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
if(!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
if(tagCache) {
(*tagCache)[id] = tag;
if(storageCache) {
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
Reference<StorageInfo> storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = tag;
Optional<Key> interfKey = txnStateStore->readValue( serverListKeyFor(id) ).get();
if(interfKey.present()) {
storageInfo->interf = decodeServerListValue( interfKey.get() );
}
(*storageCache)[id] = storageInfo;
} else {
cacheItr->second->tag = tag;
}
}
}
}
@@ -134,7 +146,28 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
else if (m.param1.startsWith(serverListPrefix) || m.param1 == databaseLockedKey || m.param1.startsWith(applyMutationsBeginRange.begin) ||
else if (m.param1.startsWith(serverListPrefix)) {
if(!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
if(storageCache) {
UID id = decodeServerListKey(m.param1);
StorageServerInterface interf = decodeServerListValue(m.param2);
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
Reference<StorageInfo> storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->interf = interf;
Optional<Key> tagKey = txnStateStore->readValue( serverTagKeyFor(id) ).get();
if(tagKey.present()) {
storageInfo->tag = decodeServerTagValue( tagKey.get() );
}
(*storageCache)[id] = storageInfo;
} else {
cacheItr->second->interf = interf;
}
}
}
} else if( m.param1 == databaseLockedKey || m.param1.startsWith(applyMutationsBeginRange.begin) ||
m.param1.startsWith(applyMutationsAddPrefixRange.begin) || m.param1.startsWith(applyMutationsRemovePrefixRange.begin)) {
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
@@ -218,9 +251,9 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
if (keyServersKeys.intersects(range)) {
KeyRangeRef r = range & keyServersKeys;
if(keyTags) {
if(keyInfo) {
KeyRangeRef clearRange(r.begin.removePrefix(keyServersPrefix), r.end.removePrefix(keyServersPrefix));
keyTags->insert(clearRange, clearRange.begin == StringRef() ? vector<Tag>() : keyTags->rangeContainingKeyBefore(clearRange.begin).value());
keyInfo->insert(clearRange, clearRange.begin == StringRef() ? ServerCacheInfo() : keyInfo->rangeContainingKeyBefore(clearRange.begin).value());
}
if(!initialCommit) txnStateStore->clear(r);
@@ -254,7 +287,13 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
}
}
if(!initialCommit) txnStateStore->clear( range & serverTagKeys );
if(!initialCommit) {
KeyRangeRef clearRange = range & serverTagKeys;
txnStateStore->clear(clearRange);
if(storageCache && clearRange.singleKeyRange()) {
storageCache->erase(decodeServerTagKey(clearRange.begin));
}
}
}
if (range.contains(coordinatorsKey)) {
if(!initialCommit) txnStateStore->clear(singleKeyRange(coordinatorsKey));
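
applyMetadataMutations is now responsible for keeping both caches coherent: a set of keyServers/<k> rebuilds the ServerCacheInfo for [k, end of the range currently containing k) and inserts it into keyInfo, sets of serverTag/ and serverList/ keys create or patch the shared StorageInfo entries, and a clear of exactly one serverTag key (detected with the new singleKeyRange() check) evicts that server from storageCache. In the simplified begin-keyed map from the first sketch, the keyServers case amounts to a single insertion, because the containing range's end boundary is already present; this only illustrates the shape of the update, not the real KeyRangeMap::insert:

#include <cassert>
#include <map>
#include <string>
#include <vector>

// Continuing the simplified begin-keyed shard map from the first sketch.
using ShardServers = std::vector<std::string>;
using KeyInfoMap = std::map<std::string, ShardServers>;

// A metadata mutation "keyServers/<k> = <servers>" splits the shard containing k at k:
// [k, end of the containing range) now maps to the new servers. In a map keyed by each
// range's begin, that is a single insert-or-assign; the old end boundary stays put.
void applyKeyServersSet(KeyInfoMap& keyInfo, const std::string& k, ShardServers servers) {
    keyInfo[k] = std::move(servers);
}

int main() {
    KeyInfoMap keyInfo = {{"", {"ss1"}}, {"m", {"ss2"}}};   // ["","m") -> ss1, ["m",...) -> ss2
    applyKeyServersSet(keyInfo, "f", {"ss3"});              // now ["","f"), ["f","m"), ["m",...)
    assert(keyInfo.size() == 3 && keyInfo["f"] == ShardServers{"ss3"});
    return 0;
}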

View File

@@ -181,7 +181,7 @@ struct ProxyCommitData {
uint64_t commitVersionRequestNumber;
uint64_t mostRecentProcessedRequestNumber;
KeyRangeMap<Deque<std::pair<Version,int>>> keyResolvers;
KeyRangeMap<std::vector<Tag>> keyTags;
KeyRangeMap<ServerCacheInfo> keyInfo;
std::map<Key, applyMutationsData> uid_applyMutationsData;
bool firstProxy;
double lastCoalesceTime;
@@ -198,7 +198,7 @@ struct ProxyCommitData {
Database cx;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Tag> tagCache;
std::map<UID, Reference<StorageInfo>> storageCache;
ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit, Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion), master(master),
@@ -427,7 +427,7 @@ ACTOR Future<Void> commitBatch(
for (int resolver = 0; resolver < resolution.size(); resolver++)
committed = committed && resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
if (committed)
applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyTags, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->tagCache );
applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache );
if( resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() && firstStateMutations ) {
ASSERT(committed);
@@ -489,7 +489,7 @@ ACTOR Future<Void> commitBatch(
{
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware)) {
commitCount++;
applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyTags, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->tagCache);
applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache);
}
if(firstStateMutations) {
ASSERT(committed[t] == ConflictBatch::TransactionCommitted);
@@ -536,10 +536,10 @@ ACTOR Future<Void> commitBatch(
// FIXME: Make this process not disgustingly CPU intensive
if (isSingleKeyMutation((MutationRef::Type) m.type)) {
auto& tags = self->keyTags[m.param1];
auto& tags = self->keyInfo[m.param1].tags;
if(self->singleKeyMutationEvent->enabled) {
KeyRangeRef shard = self->keyTags.rangeContaining(m.param1).range();
KeyRangeRef shard = self->keyInfo.rangeContaining(m.param1).range();
self->singleKeyMutationEvent->tag1 = (int64_t)tags[0];
self->singleKeyMutationEvent->tag2 = (int64_t)tags[1];
self->singleKeyMutationEvent->tag3 = (int64_t)tags[2];
@@ -555,21 +555,21 @@ ACTOR Future<Void> commitBatch(
toCommit.addTypedMessage(m);
}
else if (m.type == MutationRef::ClearRange) {
auto ranges = self->keyTags.intersectingRanges(KeyRangeRef(m.param1, m.param2));
auto ranges = self->keyInfo.intersectingRanges(KeyRangeRef(m.param1, m.param2));
auto firstRange = ranges.begin();
++firstRange;
if (firstRange == ranges.end()) {
// Fast path
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value())).detail("Mutation", m.toString()).detail("Version", commitVersion);
for (auto& tag : ranges.begin().value())
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
for (auto& tag : ranges.begin().value().tags)
toCommit.addTag(tag);
}
else {
TEST(true); //A clear range extends past a shard boundary
std::set<Tag> allSources;
for (auto r : ranges)
allSources.insert(r.value().begin(), r.value().end());
allSources.insert(r.value().tags.begin(), r.value().tags.end());
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion);
for (auto& tag : allSources)
@@ -681,7 +681,7 @@ ACTOR Future<Void> commitBatch(
backupMutation.param1 = wr.toStringRef();
ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) ); // We are writing into the configured destination
auto& tags = self->keyTags[backupMutation.param1];
auto& tags = self->keyInfo[backupMutation.param1].tags;
for (auto& tag : tags)
toCommit.addTag(tag);
toCommit.addTypedMessage(backupMutation);
@@ -705,7 +705,7 @@ ACTOR Future<Void> commitBatch(
TEST(true); // Semi-committed pipeline limited by MVCC window
//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
choose{
when(Void _ = wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
when(Void _ = wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
Void _ = wait(yield());
break;
}
@@ -1002,41 +1002,59 @@ ACTOR static Future<Void> readRequestServer(
TraceEvent("ProxyReadyForReads", proxy.id());
loop choose{
when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) {
Standalone<VectorRef<KeyValueRef>> keyServersBegin = commitData->txnStateStore->readRange(KeyRangeRef(allKeys.begin, req.range.begin.withPrefix(keyServersPrefix)), -1).get();
Standalone<VectorRef<KeyValueRef>> keyServersEnd = commitData->txnStateStore->readRange(KeyRangeRef(req.range.end.withPrefix(keyServersPrefix), allKeys.end), 2).get();
Standalone<VectorRef<KeyValueRef>> keyServersShardBoundaries = commitData->txnStateStore->readRange(KeyRangeRef(keyServersBegin[0].key, keyServersEnd[1].key), req.limit).get();
GetKeyServerLocationsReply rep;
rep.results.reserve(keyServersShardBoundaries.size() - 1);
int startOffset = req.limit < 0 ? 1 : 0;
int endOffset = 1 - startOffset;
for (int i = 0; i < keyServersShardBoundaries.size() - 1; i++) {
vector<UID> src, dest;
decodeKeyServersValue(keyServersShardBoundaries[i+startOffset].value, src, dest);
vector<StorageServerInterface> ssis;
ssis.reserve(src.size());
for (auto const& id : src) {
ssis.push_back(decodeServerListValue(commitData->txnStateStore->readValue(serverListKeyFor(id)).get().get()));
loop {
choose{
when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) {
GetKeyServerLocationsReply rep;
if(!req.end.present()) {
auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin) : commitData->keyInfo.rangeContaining(req.begin);
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().info.size());
for(auto& it : r.value().info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
} else if(!req.reverse) {
int count = 0;
for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) {
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().info.size());
for(auto& it : r.value().info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
count++;
}
} else {
int count = 0;
auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get());
while( count < req.limit && req.begin < r.end() ) {
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().info.size());
for(auto& it : r.value().info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
if(r == commitData->keyInfo.ranges().begin()) {
break;
}
count++;
--r;
}
}
rep.results.push_back(std::make_pair(KeyRangeRef(keyServersShardBoundaries[i+startOffset].key.removePrefix(keyServersPrefix), keyServersShardBoundaries[i+endOffset].key.removePrefix(keyServersPrefix)), ssis));
}
req.reply.send(rep);
}
when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) {
if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
GetStorageServerRejoinInfoReply rep;
rep.version = commitData->version;
rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() );
req.reply.send(rep);
} else
req.reply.sendError(worker_removed());
}
when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) {
if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
GetStorageServerRejoinInfoReply rep;
rep.version = commitData->version;
rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() );
req.reply.send(rep);
} else
req.reply.sendError(worker_removed());
}
}
Void _ = wait(yield());
}
}
@@ -1142,37 +1160,46 @@ ACTOR Future<Void> masterProxyServerCore(
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
Standalone<VectorRef<MutationRef>> mutations;
std::vector<std::pair<MapPair<Key,std::vector<Tag>>,int>> keyTagData;
std::vector<std::pair<MapPair<Key,ServerCacheInfo>,int>> keyInfoData;
vector<UID> src, dest;
std::set<Tag> tags;
Tag tag;
Reference<StorageInfo> storageInfo;
ServerCacheInfo info;
for(auto &kv : data) {
if( kv.key.startsWith(keyServersPrefix) ) {
KeyRef k = kv.key.removePrefix(keyServersPrefix);
if(k != allKeys.end) {
decodeKeyServersValue(kv.value, src, dest);
tags.clear();
for(auto id : src) {
auto tagItr = commitData.tagCache.find(id);
if(tagItr == commitData.tagCache.end()) {
tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
commitData.tagCache[id] = tag;
info.tags.clear();
info.info.clear();
for(auto& id : src) {
auto cacheItr = commitData.storageCache.find(id);
if(cacheItr == commitData.storageCache.end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
commitData.storageCache[id] = storageInfo;
} else {
tag = tagItr->second;
storageInfo = cacheItr->second;
}
tags.insert( tag );
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
info.info.push_back( storageInfo );
}
for(auto id : dest) {
auto tagItr = commitData.tagCache.find(id);
if(tagItr == commitData.tagCache.end()) {
tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
commitData.tagCache[id] = tag;
for(auto& id : dest) {
auto cacheItr = commitData.storageCache.find(id);
if(cacheItr == commitData.storageCache.end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
commitData.storageCache[id] = storageInfo;
} else {
tag = tagItr->second;
storageInfo = cacheItr->second;
}
tags.insert( tag );
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
}
keyTagData.push_back( std::make_pair(MapPair<Key,std::vector<Tag>>(k, std::vector<Tag>(tags.begin(), tags.end())), 1) );
uniquify(info.tags);
keyInfoData.push_back( std::make_pair(MapPair<Key,ServerCacheInfo>(k, info), 1) );
}
} else {
mutations.push_back(mutations.arena(), MutationRef(MutationRef::SetValue, kv.key, kv.value));
@@ -1180,11 +1207,11 @@ ACTOR Future<Void> masterProxyServerCore(
}
//insert keyTag data separately from metadata mutations so that we can do one bulk insert which avoids a lot of map lookups.
commitData.keyTags.rawInsert(keyTagData);
commitData.keyInfo.rawInsert(keyInfoData);
Arena arena;
bool confChanges;
applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference<ILogSystem>(), 0, &commitData.vecBackupKeys, &commitData.keyTags, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.tagCache, true);
applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference<ILogSystem>(), 0, &commitData.vecBackupKeys, &commitData.keyInfo, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.storageCache, true);
}
auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get();
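
On recovery the proxy rebuilds keyInfo from the txnStateStore snapshot: it decodes each keyServers/ entry into a ServerCacheInfo (reusing storageCache entries for servers it has already seen), accumulates the results in keyInfoData, and hands the whole batch to keyInfo.rawInsert() so the map is populated in one pass instead of one lookup per key, as the comment above notes. The standard-library analogue is hinted insertion into an ordered map when the input is already sorted, which costs amortized constant time per element; a minimal sketch of that idea, not the actual rawInsert:

#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

int main() {
    // Pairs recovered in key order, e.g. decoded from a keyServers/ snapshot.
    std::vector<std::pair<std::string, int>> keyInfoData = {
        {"", 0}, {"apple", 1}, {"mango", 2}, {"zebra", 3}};

    std::map<std::string, int> keyInfo;
    // Hinted insert: with sorted ascending input, inserting just before end() is
    // amortized O(1) per element, instead of an O(log n) lookup per key.
    for (auto& kv : keyInfoData)
        keyInfo.insert(keyInfo.end(), std::move(kv));

    assert(keyInfo.size() == 4 && keyInfo.at("mango") == 2);
    return 0;
}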

View File

@@ -782,8 +782,8 @@ void seedShardServers(
tr.read_conflict_ranges.push_back_deep( arena, allKeys );
for(int s=0; s<servers.size(); s++) {
tr.set(arena, serverListKeyFor(servers[s].id()), serverListValue(servers[s]));
tr.set(arena, serverTagKeyFor(servers[s].id()), serverTagValue(server_tag[servers[s].id()]));
tr.set(arena, serverListKeyFor(servers[s].id()), serverListValue(servers[s]));
}
tr.set(arena, serverTagMaxKey, serverTagMaxValue(servers.size()-1));

View File

@@ -30,12 +30,6 @@
#include "fdbrpc/ReplicationUtils.h"
#include "RecoveryState.h"
template <class Collection>
void uniquify( Collection& c ) {
std::sort(c.begin(), c.end());
c.resize( std::unique(c.begin(), c.end()) - c.begin() );
}
ACTOR static Future<Void> reportTLogCommitErrors( Future<Void> commitReply, UID debugID ) {
try {
Void _ = wait(commitReply);

View File

@@ -318,7 +318,7 @@ struct ConsistencyCheckWorkload : TestWorkload
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
state KeyRange keyServerRange = keyServersKeys;
for(int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange, 1000, keyServerRange.arena()), 2, 0));
keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange.begin, keyServerRange.end, 1000, false, keyServerRange.arena()), 2, 0));
choose {
when( Void _ = wait(waitForAll(keyServerLocationFutures)) ) {