Refactor getKeyServers to be more readable.
Fix possible memory corruption by returning KeyRange instead of KeyRangeRef in getKeyServers. Simplify getMasterProxies on DatabaseContext class.
This commit is contained in:
parent
6218934c7b
commit
e2030db5a8
|
@ -69,7 +69,6 @@ public:
|
|||
|
||||
Reference<ProxyInfo> getMasterProxies();
|
||||
Future<Reference<ProxyInfo>> getMasterProxiesFuture();
|
||||
Future<Reference<ProxyInfo>> getMasterProxiesOrNever();
|
||||
Future<Void> onMasterProxiesChanged();
|
||||
|
||||
// Update the watch counter for the database
|
||||
|
|
|
@ -889,25 +889,14 @@ Reference<ProxyInfo> DatabaseContext::getMasterProxies() {
|
|||
return masterProxies;
|
||||
}
|
||||
|
||||
//Gets the master proxies if available. If the ProxyInfo Reference is NULL, then return Never
|
||||
Future<Reference<ProxyInfo>> DatabaseContext::getMasterProxiesOrNever() {
|
||||
Reference<ProxyInfo> info = getMasterProxies();
|
||||
if (!info)
|
||||
return Never();
|
||||
else
|
||||
return info;
|
||||
}
|
||||
|
||||
//Actor which will wait until the ProxyInfo returned by the DatabaseContext cx is not NULL
|
||||
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
|
||||
state Reference<ProxyInfo> proxyInfo;
|
||||
loop {
|
||||
choose {
|
||||
when(Void _ = wait(cx->onMasterProxiesChanged())) { }
|
||||
when(Reference<ProxyInfo> info = wait(cx->getMasterProxiesOrNever())) {
|
||||
return info;
|
||||
}
|
||||
}
|
||||
state Reference<ProxyInfo> proxies;
|
||||
loop{
|
||||
proxies = cx->getMasterProxies();
|
||||
if (proxies)
|
||||
return proxies;
|
||||
Void _ = wait( cx->onMasterProxiesChanged() );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -252,11 +252,11 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
}
|
||||
|
||||
//Get a list of key servers; verify that the TLogs and master all agree about who the key servers are
|
||||
state Promise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> keyServerPromise;
|
||||
state Promise<vector<pair<KeyRange, vector<StorageServerInterface>>>> keyServerPromise;
|
||||
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise));
|
||||
if(keyServerResult)
|
||||
{
|
||||
state vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
|
||||
state vector<pair<KeyRange, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
|
||||
|
||||
//Get the locations of all the shards in the database
|
||||
state Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise;
|
||||
|
@ -306,76 +306,71 @@ struct ConsistencyCheckWorkload : TestWorkload
|
|||
//Get a list of storage servers from the master and compares them with the TLogs.
|
||||
//If this is a quiescent check, then each master proxy needs to respond, otherwise only one needs to respond.
|
||||
//Returns false if there is a failure (in this case, keyServersPromise will never be set)
|
||||
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<vector<pair<KeyRangeRef, vector<StorageServerInterface>>>> keyServersPromise)
|
||||
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<vector<pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise)
|
||||
{
|
||||
state vector<pair<KeyRangeRef, vector<StorageServerInterface>>> keyServers;
|
||||
state vector<pair<KeyRange, vector<StorageServerInterface>>> keyServers;
|
||||
|
||||
loop
|
||||
{
|
||||
state bool successful = true;
|
||||
state bool masterProxiesChanged = false;
|
||||
state Reference<ProxyInfo> proxyInfo = wait(cx->getMasterProxiesFuture());
|
||||
//Try getting key server locations from the master proxies
|
||||
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
|
||||
state Key begin = keyServersKeys.begin;
|
||||
state Key end = keyServersKeys.end;
|
||||
state int limitKeyServers = BUGGIFY ? 1 : 100;
|
||||
|
||||
//Try getting key server locations from the master proxies
|
||||
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
|
||||
state KeyRange keyServerRange = keyServersKeys;
|
||||
while (begin < end) {
|
||||
state Reference<ProxyInfo> proxyInfo = cx->getMasterProxies();
|
||||
while (!proxyInfo) {
|
||||
Reference<ProxyInfo> _ = wait( cx->getMasterProxiesFuture() );
|
||||
proxyInfo = cx->getMasterProxies();
|
||||
}
|
||||
|
||||
while (keyServerRange.begin.size()) {
|
||||
keyServerLocationFutures.clear();
|
||||
for (int i = 0; i < proxyInfo->size(); i++)
|
||||
keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(keyServerRange.begin, keyServerRange.end, 100, false, keyServerRange.arena()), 2, 0));
|
||||
keyServerLocationFutures.clear();
|
||||
for (int i = 0; i < proxyInfo->size(); i++)
|
||||
keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(begin, end, limitKeyServers, false, Arena()), 2, 0));
|
||||
|
||||
state bool keyServersInsertedForThisIteration = false;
|
||||
choose {
|
||||
when(Void _ = wait(waitForAll(keyServerLocationFutures))) {
|
||||
//Read the key server location results
|
||||
for (int i = 0; i < keyServerLocationFutures.size(); i++)
|
||||
state bool keyServersInsertedForThisIteration = false;
|
||||
choose {
|
||||
when(Void _ = wait(waitForAll(keyServerLocationFutures))) {
|
||||
//Read the key server location results
|
||||
for (int i = 0; i < keyServerLocationFutures.size(); i++)
|
||||
{
|
||||
ErrorOr<GetKeyServerLocationsReply> shards = keyServerLocationFutures[i].get();
|
||||
|
||||
//If performing quiescent check, then all master proxies should be reachable. Otherwise, only one needs to be reachable
|
||||
if (self->performQuiescentChecks && !shards.present())
|
||||
{
|
||||
ErrorOr<GetKeyServerLocationsReply> shards = keyServerLocationFutures[i].get();
|
||||
TraceEvent("ConsistencyCheck_MasterProxyUnavailable").detail("MasterProxyID", proxyInfo->getId(i));
|
||||
self->testFailure("Master proxy unavailable");
|
||||
return false;
|
||||
}
|
||||
|
||||
//If performing quiescent check, then all master proxies should be reachable. Otherwise, only one needs to be reachable
|
||||
if (self->performQuiescentChecks && !shards.present())
|
||||
{
|
||||
TraceEvent("ConsistencyCheck_MasterProxyUnavailable").detail("MasterProxyID", proxyInfo->getId(i));
|
||||
self->testFailure("Master proxy unavailable");
|
||||
return false;
|
||||
}
|
||||
//Get the list of shards if one was returned. If not doing a quiescent check, we can break if it is.
|
||||
//If we are doing a quiescent check, then we only need to do this for the first shard.
|
||||
if (shards.present() && !keyServersInsertedForThisIteration)
|
||||
{
|
||||
keyServers.insert(keyServers.end(), shards.get().results.begin(), shards.get().results.end());
|
||||
keyServersInsertedForThisIteration = true;
|
||||
begin = shards.get().results.back().first.end;
|
||||
|
||||
//Get the list of shards if one was returned. If not doing a quiescent check, we can break if it is.
|
||||
//If we are doing a quiescent check, then we only need to do this for the first shard.
|
||||
if (shards.present() && !keyServersInsertedForThisIteration)
|
||||
{
|
||||
keyServers.insert(keyServers.end(), shards.get().results.begin(), shards.get().results.end());
|
||||
keyServersInsertedForThisIteration = true;
|
||||
keyServerRange = shards.get().results[shards.get().results.size() - 1].first.end < keyServersKeys.end ?
|
||||
KeyRangeRef(shards.get().results[shards.get().results.size() - 1].first.end, keyServersKeys.end)
|
||||
: KeyRangeRef();
|
||||
|
||||
if (!self->performQuiescentChecks)
|
||||
break;
|
||||
}
|
||||
} // End of For
|
||||
}
|
||||
when(Void _ = wait(cx->onMasterProxiesChanged())) { masterProxiesChanged = true; }
|
||||
} // End of choose
|
||||
|
||||
if (masterProxiesChanged || !keyServersInsertedForThisIteration) { // Retry the entire workflow
|
||||
successful = false;
|
||||
break;
|
||||
if (!self->performQuiescentChecks)
|
||||
break;
|
||||
}
|
||||
} // End of For
|
||||
}
|
||||
when(Void _ = wait(cx->onMasterProxiesChanged())) { }
|
||||
} // End of choose
|
||||
|
||||
if (!keyServersInsertedForThisIteration) // Retry the entire workflow
|
||||
Void _ = wait(delay(1.0));
|
||||
|
||||
} // End of while
|
||||
|
||||
} // End of while
|
||||
if (successful)
|
||||
break;
|
||||
Void _ = wait(delay(1.0));
|
||||
}
|
||||
keyServersPromise.send(keyServers);
|
||||
return true;
|
||||
}
|
||||
|
||||
//Retrieves the locations of all shards in the database
|
||||
//Returns false if there is a failure (in this case, keyLocationPromise will never be set)
|
||||
ACTOR Future<bool> getKeyLocations(Database cx, vector<pair<KeyRangeRef, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
|
||||
ACTOR Future<bool> getKeyLocations(Database cx, vector<pair<KeyRange, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
|
||||
{
|
||||
state Standalone<VectorRef<KeyValueRef>> keyLocations;
|
||||
state Key beginKey = allKeys.begin;
|
||||
|
|
Loading…
Reference in New Issue