Merge pull request from bnamasivayam/getKeyServers-refactor

Having 1000 as the limit for Limit for GetKeyServerLocationsRequest s…
This commit is contained in:
Evan Tschannen 2018-02-22 12:39:48 -08:00 committed by GitHub
commit 719bb5bd0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 58 deletions

View File

@ -69,7 +69,6 @@ public:
Reference<ProxyInfo> getMasterProxies();
Future<Reference<ProxyInfo>> getMasterProxiesFuture();
Future<Reference<ProxyInfo>> getMasterProxiesOrNever();
Future<Void> onMasterProxiesChanged();
// Update the watch counter for the database

View File

@ -889,25 +889,13 @@ Reference<ProxyInfo> DatabaseContext::getMasterProxies() {
return masterProxies;
}
//Gets the master proxies if available. If the ProxyInfo Reference is NULL, then return Never
Future<Reference<ProxyInfo>> DatabaseContext::getMasterProxiesOrNever() {
Reference<ProxyInfo> info = getMasterProxies();
if (!info)
return Never();
else
return info;
}
//Actor which will wait until the ProxyInfo returned by the DatabaseContext cx is not NULL
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
state Reference<ProxyInfo> proxyInfo;
loop {
choose {
when(Void _ = wait(cx->onMasterProxiesChanged())) { }
when(Reference<ProxyInfo> info = wait(cx->getMasterProxiesOrNever())) {
return info;
}
}
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
loop{
Reference<ProxyInfo> proxies = cx->getMasterProxies();
if (proxies)
return proxies;
Void _ = wait( cx->onMasterProxiesChanged() );
}
}

View File

@ -252,11 +252,11 @@ struct ConsistencyCheckWorkload : TestWorkload
}
//Get a list of key servers; verify that the TLogs and master all agree about who the key servers are
state Promise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> keyServerPromise;
state Promise<vector<pair<KeyRange, vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise));
if(keyServerResult)
{
state vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
state vector<pair<KeyRange, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
//Get the locations of all the shards in the database
state Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise;
@ -306,31 +306,32 @@ struct ConsistencyCheckWorkload : TestWorkload
//Get a list of storage servers from the master and compares them with the TLogs.
//If this is a quiescent check, then each master proxy needs to respond, otherwise only one needs to respond.
//Returns false if there is a failure (in this case, keyServersPromise will never be set)
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> keyServersPromise)
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<vector<pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise)
{
state vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServers;
state vector<pair<KeyRange, vector<StorageServerInterface>>> keyServers;
loop
{
//Try getting key server locations from the master proxies
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
state Key begin = keyServersKeys.begin;
state Key end = keyServersKeys.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
while (begin < end) {
state Reference<ProxyInfo> proxyInfo = wait(cx->getMasterProxiesFuture());
keyServerLocationFutures.clear();
for (int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(begin, end, limitKeyServers, false, Arena()), 2, 0));
//Try getting key server locations from the master proxies
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
state KeyRange keyServerRange = keyServersKeys;
for(int i = 0; i < proxyInfo->size(); i++)
keyServerLocationFutures.push_back(proxyInfo->get(i,&MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange.begin, keyServerRange.end, 1000, false, keyServerRange.arena()), 2, 0));
state bool keyServersInsertedForThisIteration = false;
choose {
when( Void _ = wait(waitForAll(keyServerLocationFutures)) ) {
when(Void _ = wait(waitForAll(keyServerLocationFutures))) {
//Read the key server location results
state bool successful = true;
for(int i = 0; i < keyServerLocationFutures.size(); i++)
for (int i = 0; i < keyServerLocationFutures.size(); i++)
{
ErrorOr<GetKeyServerLocationsReply> shards = keyServerLocationFutures[i].get();
//If performing quiescent check, then all master proxies should be reachable. Otherwise, only one needs to be reachable
if(self->performQuiescentChecks && !shards.present())
if (self->performQuiescentChecks && !shards.present())
{
TraceEvent("ConsistencyCheck_MasterProxyUnavailable").detail("MasterProxyID", proxyInfo->getId(i));
self->testFailure("Master proxy unavailable");
@ -339,31 +340,24 @@ struct ConsistencyCheckWorkload : TestWorkload
//Get the list of shards if one was returned. If not doing a quiescent check, we can break if it is.
//If we are doing a quiescent check, then we only need to do this for the first shard.
if(shards.present() && (i == 0 || !self->performQuiescentChecks))
if (shards.present() && !keyServersInsertedForThisIteration)
{
keyServers = shards.get().results;
if(!self->performQuiescentChecks)
keyServers.insert(keyServers.end(), shards.get().results.begin(), shards.get().results.end());
keyServersInsertedForThisIteration = true;
begin = shards.get().results.back().first.end;
if (!self->performQuiescentChecks)
break;
}
} // End of For
}
when(Void _ = wait(cx->onMasterProxiesChanged())) { }
} // End of choose
if (!keyServersInsertedForThisIteration) // Retry the entire workflow
Void _ = wait(delay(1.0));
//If none of the master proxies responded, then we will have to try again
else if(i == keyServerLocationFutures.size() - 1 && !self->performQuiescentChecks)
{
TraceEvent("ConsistencyCheck_NoMasterProxiesAvailable");
//Retry (continues outer loop)
successful = false;
}
}
//If master proxy check and tlog check were successful
if(successful)
break;
Void _ = wait(delay(1.0));
} when( Void _ = wait(cx->onMasterProxiesChanged()) ) {}
}
}
} // End of while
keyServersPromise.send(keyServers);
return true;
@ -371,7 +365,7 @@ struct ConsistencyCheckWorkload : TestWorkload
//Retrieves the locations of all shards in the database
//Returns false if there is a failure (in this case, keyLocationPromise will never be set)
ACTOR Future<bool> getKeyLocations(Database cx, vector<pair<KeyRangeRef, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
ACTOR Future<bool> getKeyLocations(Database cx, vector<pair<KeyRange, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
{
state Standalone<VectorRef<KeyValueRef>> keyLocations;
state Key beginKey = allKeys.begin;