Merge pull request #4434 from sfc-gh-clin/cache-correctness

Add consistency check for cache in consistency check workload
Authored by sfc-gh-ngoyal on 2021-03-11 12:00:18 -08:00; committed by GitHub
commit 800c37bbb3
3 changed files with 511 additions and 12 deletions


@@ -762,7 +762,7 @@ ACTOR Future<Void> changeConfiguration(Database cx, std::vector< TesterInterface
}
//Runs the consistency check workload, which verifies that the database is in a consistent state
ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface > testers, bool doQuiescentCheck,
ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface > testers, bool doQuiescentCheck, bool doCacheCheck,
double quiescentWaitTimeout, double softTimeLimit, double databasePingDelay, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state TestSpec spec;
@@ -775,14 +775,19 @@ ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface >
Standalone<VectorRef<KeyValueRef>> options;
StringRef performQuiescent = LiteralStringRef("false");
StringRef performCacheCheck = LiteralStringRef("false");
if (doQuiescentCheck) {
performQuiescent = LiteralStringRef("true");
}
if (doCacheCheck) {
performCacheCheck = LiteralStringRef("true");
}
spec.title = LiteralStringRef("ConsistencyCheck");
spec.databasePingDelay = databasePingDelay;
spec.timeout = 32000;
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("testName"), LiteralStringRef("ConsistencyCheck")));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("performQuiescentChecks"), performQuiescent));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("performCacheCheck"), performCacheCheck));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("quiescentWaitTimeout"), ValueRef(options.arena(), format("%f", quiescentWaitTimeout))));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("distributed"), LiteralStringRef("false")));
spec.options.push_back_deep(spec.options.arena(), options);
@@ -846,7 +851,7 @@ ACTOR Future<bool> runTest( Database cx, std::vector< TesterInterface > testers,
if(spec.runConsistencyCheck) {
try {
bool quiescent = g_network->isSimulated() ? !BUGGIFY : spec.waitForQuiescenceEnd;
wait(timeoutError(checkConsistency(cx, testers, quiescent, 10000.0, 18000, spec.databasePingDelay, dbInfo), 20000.0));
wait(timeoutError(checkConsistency(cx, testers, quiescent, spec.runConsistencyCheckOnCache, 10000.0, 18000, spec.databasePingDelay, dbInfo), 20000.0));
}
catch(Error& e) {
TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to perform consistency check");
@@ -968,6 +973,10 @@ std::map<std::string, std::function<void(const std::string& value, TestSpec* spe
spec->runConsistencyCheck = ( value == "true" );
TraceEvent("TestParserTest").detail("ParsedRunConsistencyCheck", spec->runConsistencyCheck);
}},
{ "runConsistencyCheckOnCache", [](const std::string& value, TestSpec* spec) {
spec->runConsistencyCheckOnCache = ( value == "true" );
TraceEvent("TestParserTest").detail("ParsedRunConsistencyCheckOnCache", spec->runConsistencyCheckOnCache);
}},
{ "waitForQuiescence", [](const std::string& value, TestSpec* spec) {
bool toWait = value == "true";
spec->waitForQuiescenceBegin = toWait;


@@ -19,6 +19,7 @@
*/
#include <math.h>
#include "boost/lexical_cast.hpp"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
@@ -33,6 +34,7 @@
#include "fdbserver/QuietDatabase.h"
#include "flow/DeterministicRandom.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/network.h"
@@ -44,6 +46,9 @@ struct ConsistencyCheckWorkload : TestWorkload
//Whether or not we should perform checks that will only pass if the database is in a quiescent state
bool performQuiescentChecks;
// Whether or not to perform consistency checks between storage cache servers and storage servers
bool performCacheCheck;
//How long to wait for the database to go quiet before failing (if doing quiescent checks)
double quiescentWaitTimeout;
@@ -89,6 +94,7 @@ struct ConsistencyCheckWorkload : TestWorkload
: TestWorkload(wcx)
{
performQuiescentChecks = getOption(options, LiteralStringRef("performQuiescentChecks"), false);
performCacheCheck = getOption(options, LiteralStringRef("performCacheCheck"), false);
quiescentWaitTimeout = getOption(options, LiteralStringRef("quiescentWaitTimeout"), 600.0);
distributed = getOption(options, LiteralStringRef("distributed"), true);
shardSampleFactor = std::max(getOption(options, LiteralStringRef("shardSampleFactor"), 1), 1);
@@ -302,7 +308,7 @@ struct ConsistencyCheckWorkload : TestWorkload
//Get a list of key servers; verify that the TLogs and master all agree about who the key servers are
state Promise<std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise));
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, keyServersKeys));
if(keyServerResult)
{
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
@@ -316,6 +322,10 @@ struct ConsistencyCheckWorkload : TestWorkload
//Check that each shard has the same data on all storage servers that it resides on
wait(::success(self->checkDataConsistency(cx, keyLocations, configuration, self)));
// Cache consistency check
if (self->performCacheCheck)
wait(::success(self->checkCacheConsistency(cx, keyLocations, self)));
}
}
}
@@ -334,6 +344,482 @@ struct ConsistencyCheckWorkload : TestWorkload
return Void();
}
// Check the data consistency between storage cache servers and storage servers
// keyLocations: all key/value pairs persisted in the database, reused from the previous consistency check on
// all storage servers
ACTOR Future<bool> checkCacheConsistency(Database cx, VectorRef<KeyValueRef> keyLocations,
ConsistencyCheckWorkload* self) {
state Promise<Standalone<VectorRef<KeyValueRef>>> cacheKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> cacheServerKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> serverListKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> serverTagKeyPromise;
state Standalone<VectorRef<KeyValueRef>> cacheKey; // "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
state Standalone<VectorRef<KeyValueRef>>
cacheServer; // "\xff/storageCacheServer/[[UID]] := StorageServerInterface"
state Standalone<VectorRef<KeyValueRef>>
serverList; // "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"
state Standalone<VectorRef<KeyValueRef>> serverTag; // "\xff/serverTag/[[serverID]]" = "[[Tag]]"
std::vector<Future<bool>> cacheResultsPromise;
cacheResultsPromise.push_back(self->fetchKeyValuesFromSS(cx, self, storageCacheKeys, cacheKeyPromise, true));
cacheResultsPromise.push_back(
self->fetchKeyValuesFromSS(cx, self, storageCacheServerKeys, cacheServerKeyPromise, false));
cacheResultsPromise.push_back(
self->fetchKeyValuesFromSS(cx, self, serverListKeys, serverListKeyPromise, false));
cacheResultsPromise.push_back(self->fetchKeyValuesFromSS(cx, self, serverTagKeys, serverTagKeyPromise, false));
std::vector<bool> cacheResults = wait(getAll(cacheResultsPromise));
if (std::all_of(cacheResults.begin(), cacheResults.end(), [](bool success) { return success; })) {
cacheKey = cacheKeyPromise.getFuture().get();
cacheServer = cacheServerKeyPromise.getFuture().get();
serverList = serverListKeyPromise.getFuture().get();
serverTag = serverTagKeyPromise.getFuture().get();
} else {
TraceEvent(SevDebug, "CheckCacheConsistencyFailed")
.detail("CacheKey", boost::lexical_cast<std::string>(cacheResults[0]))
.detail("CacheServerKey", boost::lexical_cast<std::string>(cacheResults[1]))
.detail("ServerListKey", boost::lexical_cast<std::string>(cacheResults[2]))
.detail("ServerTagKey", boost::lexical_cast<std::string>(cacheResults[3]));
return false;
}
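The block above launches all four system-keyspace reads concurrently and proceeds only if every one of them succeeded. A standalone sketch of that gather-then-check pattern, using std::async in place of the workload's flow futures (the range names below are just labels, not the real fetch calls):

#include <algorithm>
#include <cstdio>
#include <future>
#include <vector>

int main() {
    // Launch the four reads concurrently; a real fetch would return whether the read succeeded.
    std::vector<std::future<bool>> fetches;
    for (const char* range : { "storageCacheKeys", "storageCacheServerKeys", "serverListKeys", "serverTagKeys" }) {
        fetches.push_back(std::async(std::launch::async, [range] {
            std::printf("fetched %s\n", range);
            return true;
        }));
    }
    std::vector<bool> results;
    for (auto& f : fetches) results.push_back(f.get());
    // Proceed with the comparison only when every fetch succeeded.
    bool allOk = std::all_of(results.begin(), results.end(), [](bool ok) { return ok; });
    std::printf("%s\n", allOk ? "all fetches succeeded" : "at least one fetch failed");
}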
state int rateLimitForThisRound =
self->bytesReadInPreviousRound == 0
? self->rateLimitMax
: std::min(
self->rateLimitMax,
static_cast<int>(ceil(self->bytesReadInPreviousRound /
(float)CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME)));
ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax);
TraceEvent("CacheConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound);
state Reference<IRateControl> rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
state double rateLimiterStartTime = now();
state int bytesReadInRange = 0;
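For a concrete feel of the rate-limit formula above, here is a standalone sketch with made-up numbers (the real code takes the cap from rateLimitMax and the target time from CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME; here they are plain parameters):

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>

// Derive this round's bytes/sec limit from how much was read last round and how long a round should take.
static int rateLimitForRound(int64_t bytesReadInPreviousRound, int rateLimitMax, double targetCompletionTimeSec) {
    if (bytesReadInPreviousRound == 0) return rateLimitMax; // no history yet: go as fast as allowed
    int derived = static_cast<int>(std::ceil(bytesReadInPreviousRound / targetCompletionTimeSec));
    return std::min(rateLimitMax, derived);
}

int main() {
    // Hypothetical numbers: 6 GB read last round, one-hour target, 50 MB/s cap.
    std::printf("%d bytes/sec\n", rateLimitForRound(6'000'000'000LL, 50'000'000, 3600.0)); // prints 1666667
}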
// Get all cache storage servers' interfaces
// Note: currently, all storage cache servers cache the same data
// Thus, no need to differentiate them for now
state std::vector<StorageServerInterface> cacheServerInterfaces;
for (const auto& kv : cacheServer) {
StorageServerInterface cacheServer = decodeServerListValue(kv.value);
// Uniqueness
ASSERT(std::find(cacheServerInterfaces.begin(), cacheServerInterfaces.end(), cacheServer) ==
cacheServerInterfaces.end());
cacheServerInterfaces.push_back(cacheServer);
}
TraceEvent(SevDebug, "CheckCacheConsistencyCacheServers")
.detail("CacheSSInterfaces", describe(cacheServerInterfaces));
// Construct a key range map where the value for each range is:
// if the range is cached, a list of all cache server interfaces plus one randomly picked storage server
// interface; if not cached, an empty list
state KeyRangeMap<std::vector<StorageServerInterface>> cachedKeysLocationMap;
// First, for any range that is cached, fill the list with all cache server interfaces
for (int k = 0; k < cacheKey.size(); k++) {
std::vector<uint16_t> serverIndices;
decodeStorageCacheValue(cacheKey[k].value, serverIndices);
// non-empty means this is the start of a cached range
if (serverIndices.size()) {
KeyRangeRef range(cacheKey[k].key, (k < cacheKey.size() - 1) ? cacheKey[k + 1].key : allKeys.end);
cachedKeysLocationMap.insert(range, cacheServerInterfaces);
TraceEvent(SevDebug, "CheckCacheConsistency")
.detail("CachedRange", range.toString())
.detail("Index", k);
}
}
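The loop above is how the sorted \xff/storageCache/ boundary keys become cached ranges: a boundary whose decoded server-index list is non-empty opens a cached range that runs to the next boundary (or to the end of the keyspace). A minimal standalone sketch of the same construction, with plain strings instead of FDB keys:

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct CachedRange {
    std::string begin, end;
    std::vector<uint16_t> cacheServerIndices;
};

std::vector<CachedRange> cachedRanges(const std::vector<std::pair<std::string, std::vector<uint16_t>>>& boundaries,
                                      const std::string& endOfKeyspace) {
    std::vector<CachedRange> ranges;
    for (size_t i = 0; i < boundaries.size(); i++) {
        if (!boundaries[i].second.empty()) { // non-empty index list: this boundary starts a cached range
            std::string end = (i + 1 < boundaries.size()) ? boundaries[i + 1].first : endOfKeyspace;
            ranges.push_back({ boundaries[i].first, end, boundaries[i].second });
        }
    }
    return ranges;
}
// e.g. boundaries ("a", {0,1}), ("c", {}), ("f", {0,1}) with endOfKeyspace "\xff"
// yield the cached ranges ["a","c") and ["f","\xff").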
// Second, insert corresponding storage servers into the list
// Here we need to construct a UID2SS map
state std::map<UID, StorageServerInterface> UIDtoSSMap;
for (const auto& kv : serverList) {
UID serverId = decodeServerListKey(kv.key);
UIDtoSSMap[serverId] = decodeServerListValue(kv.value);
TraceEvent(SevDebug, "CheckCacheConsistencyStorageServer").detail("UID", serverId);
}
// Now, for each shard, check whether it is cached;
// if so, add the storage servers that persist its data to the list
for (int k = 0; k < keyLocations.size() - 1; k++) {
KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key);
std::vector<UID> sourceStorageServers;
std::vector<UID> destStorageServers;
decodeKeyServersValue(RangeResultRef(serverTag, false), keyLocations[k].value, sourceStorageServers,
destStorageServers, false);
bool isRelocating = destStorageServers.size() > 0;
std::vector<UID> storageServers = (isRelocating) ? destStorageServers : sourceStorageServers;
std::vector<StorageServerInterface> storageServerInterfaces;
for (const auto& UID : storageServers) {
storageServerInterfaces.push_back(UIDtoSSMap[UID]);
}
std::vector<StorageServerInterface> allSS(cacheServerInterfaces);
allSS.insert(allSS.end(), storageServerInterfaces.begin(), storageServerInterfaces.end());
// The shard may overlap with several cached ranges; only insert the storage server interface into cached ranges.
// Since each range's end is the next range's begin, and both maps start at Key(), the begin boundaries always
// line up. Note: both maps end at allKeys.end
auto begin_iter = cachedKeysLocationMap.rangeContaining(range.begin);
ASSERT(begin_iter->begin() == range.begin);
// Split the range to maintain the condition
auto end_iter = cachedKeysLocationMap.rangeContaining(range.end);
if (end_iter->begin() != range.end) {
cachedKeysLocationMap.insert(KeyRangeRef(end_iter->begin(), range.end), end_iter->value());
}
for (auto iter = cachedKeysLocationMap.rangeContaining(range.begin);
iter != cachedKeysLocationMap.rangeContaining(range.end); ++iter) {
if (iter->value().size()) {
// randomly pick one storage server to check against, since the data is guaranteed to be consistent on any SS
iter->value().push_back(deterministicRandom()->randomChoice(storageServerInterfaces));
}
}
}
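The rangeContaining/split steps above keep the cached-range map's boundaries aligned with each shard, so the iteration from range.begin to range.end lands exactly on map boundaries. A minimal sketch of those two operations over a std::map keyed by range begin (not the actual KeyRangeMap API; assumes the map was seeded with an entry at the empty key covering the whole keyspace):

#include <iterator>
#include <map>
#include <string>
#include <vector>

using RangeMap = std::map<std::string, std::vector<int>>; // begin key -> value that applies until the next key

// rangeContaining(k): the entry whose begin key is the greatest key <= k.
RangeMap::iterator rangeContaining(RangeMap& m, const std::string& k) {
    return std::prev(m.upper_bound(k));
}

// Make k a boundary if it is not one already; the new piece inherits the containing range's value,
// which is the net effect of inserting KeyRangeRef(end_iter->begin(), range.end) with end_iter->value() above.
void splitAt(RangeMap& m, const std::string& k) {
    auto it = rangeContaining(m, k);
    if (it->first != k) m.emplace(k, it->second);
}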
// With the KeyRangeMap built, iterate over all cached ranges and verify consistency
state RangeMap<Key, std::vector<StorageServerInterface>, KeyRangeRef>::Ranges iter_ranges =
cachedKeysLocationMap.containedRanges(allKeys);
state RangeMap<Key, std::vector<StorageServerInterface>, KeyRangeRef>::iterator iter = iter_ranges.begin();
state std::vector<StorageServerInterface> iter_ss;
state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
state int increment = self->distributed ? effectiveClientCount * self->shardSampleFactor : 1;
state int shard = 0; // index used for splitting work across different clients
// move the index to the first responsible cached range
while (shard < self->clientId * self->shardSampleFactor && iter != iter_ranges.end()) {
if (iter->value().empty()) {
++iter;
continue;
}
++iter;
++shard;
}
for (; iter != iter_ranges.end(); ++iter) {
iter_ss = iter->value();
if (iter_ss.empty()) continue;
if (shard % increment != (self->clientId * self->shardSampleFactor) % increment) {
++shard;
continue;
}
// TODO: in the future, run a check based on estimated data size like the existing storage servers'
// consistency check on the first client
state Key lastSampleKey;
state Key lastStartSampleKey;
state int64_t totalReadAmount = 0;
state KeySelector begin = firstGreaterOrEqual(iter->begin());
state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior
// Read a limited number of entries at a time, repeating until all keys in the shard have been read
loop {
try {
lastSampleKey = lastStartSampleKey;
// Get the min version of the storage servers
Version version = wait(self->getVersion(cx, self));
state GetKeyValuesRequest req;
req.begin = begin;
req.end = firstGreaterOrEqual(iter->end());
req.limit = 1e4;
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
// Try getting the entries in the specified range
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
state int j = 0;
for (j = 0; j < iter_ss.size(); j++) {
resetReply(req);
keyValueFutures.push_back(iter_ss[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
wait(waitForAll(keyValueFutures));
TraceEvent(SevDebug, "CheckCacheConsistencyComparison")
.detail("Begin", req.begin.toString())
.detail("End", req.end.toString())
.detail("SSInterfaces", describe(iter_ss));
// Read the resulting entries
state int firstValidServer = -1;
totalReadAmount = 0;
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
// if (rangeResult.isError()) {
// throw rangeResult.getError();
// }
// Compare the results with other storage servers
if (rangeResult.present() && !rangeResult.get().error.present()) {
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
TraceEvent(SevDebug, "CheckCacheConsistencyResult")
.detail("SSInterface", iter_ss[j].uniqueID);
// If we haven't encountered a valid storage server yet, then mark this as the baseline
// to compare against
if (firstValidServer == -1) firstValidServer = j;
// Compare this shard against the first
else {
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
if (current.data != reference.data || current.more != reference.more) {
// Be especially verbose if in simulation
if (g_network->isSimulated()) {
int invalidIndex = -1;
printf("\nSERVER %d (%s); shard = %s - %s:\n", j,
iter_ss[j].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < current.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(current.data[k].key).c_str(),
printable(current.data[k].value).c_str());
if (invalidIndex < 0 && (k >= reference.data.size() ||
current.data[k].key != reference.data[k].key ||
current.data[k].value != reference.data[k].value))
invalidIndex = k;
}
printf("\nSERVER %d (%s); shard = %s - %s:\n", firstValidServer,
iter_ss[firstValidServer].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < reference.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(reference.data[k].key).c_str(),
printable(reference.data[k].value).c_str());
if (invalidIndex < 0 && (k >= current.data.size() ||
reference.data[k].key != current.data[k].key ||
reference.data[k].value != current.data[k].value))
invalidIndex = k;
}
printf("\nMISMATCH AT %d\n\n", invalidIndex);
}
// Data for trace event
// The number of keys unique to the current shard
int currentUniques = 0;
// The number of keys unique to the reference shard
int referenceUniques = 0;
// The number of keys in both shards with conflicting values
int valueMismatches = 0;
// The number of keys in both shards with matching values
int matchingKVPairs = 0;
// Last unique key on the current shard
KeyRef currentUniqueKey;
// Last unique key on the reference shard
KeyRef referenceUniqueKey;
// Last value mismatch
KeyRef valueMismatchKey;
// Loop indices
int currentI = 0;
int referenceI = 0;
while (currentI < current.data.size() || referenceI < reference.data.size()) {
if (currentI >= current.data.size()) {
referenceUniqueKey = reference.data[referenceI].key;
referenceUniques++;
referenceI++;
} else if (referenceI >= reference.data.size()) {
currentUniqueKey = current.data[currentI].key;
currentUniques++;
currentI++;
} else {
KeyValueRef currentKV = current.data[currentI];
KeyValueRef referenceKV = reference.data[referenceI];
if (currentKV.key == referenceKV.key) {
if (currentKV.value == referenceKV.value)
matchingKVPairs++;
else {
valueMismatchKey = currentKV.key;
valueMismatches++;
}
currentI++;
referenceI++;
} else if (currentKV.key < referenceKV.key) {
currentUniqueKey = currentKV.key;
currentUniques++;
currentI++;
} else {
referenceUniqueKey = referenceKV.key;
referenceUniques++;
referenceI++;
}
}
}
TraceEvent("CacheConsistencyCheck_DataInconsistent")
.detail(format("StorageServer%d", j).c_str(), iter_ss[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
iter_ss[firstValidServer].toString())
.detail("ShardBegin", printable(req.begin.getKey()))
.detail("ShardEnd", printable(req.end.getKey()))
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), printable(currentUniqueKey))
.detail(format("Server%dUniques", firstValidServer).c_str(), referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
printable(referenceUniqueKey))
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", printable(valueMismatchKey))
.detail("MatchingKVPairs", matchingKVPairs);
self->testFailure("Data inconsistent", true);
return false;
}
}
}
}
// after requesting each shard, enforce rate limit based on how much data will likely be read
if (rateLimitForThisRound > 0) {
wait(rateLimiter->getAllowance(totalReadAmount));
// Set the rate limit to the max allowed if the current round has been going on for a while
if (now() - rateLimiterStartTime >
1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
rateLimitForThisRound != self->rateLimitMax) {
rateLimitForThisRound = self->rateLimitMax;
rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
rateLimiterStartTime = now();
TraceEvent(SevInfo, "CacheConsistencyCheck_RateLimitSetMaxForThisRound")
.detail("RateLimit", rateLimitForThisRound);
}
}
bytesReadInRange += totalReadAmount;
// Advance to the next set of entries
if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
ASSERT(result.size() > 0);
begin = firstGreaterThan(result[result.size() - 1].key);
ASSERT(begin.getKey() != allKeys.end);
lastStartSampleKey = lastSampleKey;
TraceEvent(SevDebug, "CacheConsistencyCheckNextBeginKey").detail("Key", begin.toString());
} else
break;
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("CacheConsistencyCheck_RetryDataConsistency").error(err);
}
}
if (bytesReadInRange > 0) {
TraceEvent("CacheConsistencyCheck_ReadRange")
.suppressFor(1.0)
.detail("Range", printable(iter->range()))
.detail("BytesRead", bytesReadInRange);
}
}
return true;
}
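The mismatch accounting inside the comparison loop (currentUniques, referenceUniques, valueMismatches, matchingKVPairs) is a merge walk over two sorted result sets. A self-contained sketch of the same walk, with plain strings in place of KeyValueRef:

#include <cstdio>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

// Walk two sorted key/value lists in lockstep and classify every key as unique to one side,
// a value mismatch, or a matching pair.
static void compareSortedKVs(const std::vector<KV>& current, const std::vector<KV>& reference) {
    size_t ci = 0, ri = 0;
    int currentUniques = 0, referenceUniques = 0, valueMismatches = 0, matchingKVPairs = 0;
    while (ci < current.size() || ri < reference.size()) {
        if (ci >= current.size()) { referenceUniques++; ri++; }
        else if (ri >= reference.size()) { currentUniques++; ci++; }
        else if (current[ci].first == reference[ri].first) {
            current[ci].second == reference[ri].second ? matchingKVPairs++ : valueMismatches++;
            ci++; ri++;
        } else if (current[ci].first < reference[ri].first) { currentUniques++; ci++; }
        else { referenceUniques++; ri++; }
    }
    std::printf("uniques: %d/%d, value mismatches: %d, matching pairs: %d\n",
                currentUniques, referenceUniques, valueMismatches, matchingKVPairs);
}

int main() {
    compareSortedKVs({ { "a", "1" }, { "b", "2" }, { "d", "4" } },
                     { { "a", "1" }, { "b", "9" }, { "c", "3" } });
    // prints: uniques: 1/1, value mismatches: 1, matching pairs: 1
}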
// Directly fetch key/values from storage servers through GetKeyValuesRequest
// In particular, avoid transaction-based reads, which may read data from storage cache servers
// range: all key/values in the range will be fetched
// removePrefix: if true, strip the range's prefix (e.g. \xff/storageCacheServer/) from the returned keys
ACTOR Future<bool> fetchKeyValuesFromSS(Database cx, ConsistencyCheckWorkload* self, KeyRangeRef range,
Promise<Standalone<VectorRef<KeyValueRef>>> resultPromise,
bool removePrefix) {
// get shards paired with corresponding storage servers
state Promise<std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, range));
if (!keyServerResult) return false;
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> shards =
keyServerPromise.getFuture().get();
// key/value pairs in the given range
state Standalone<VectorRef<KeyValueRef>> result;
state Key beginKey = allKeys.begin.withPrefix(range.begin);
state Key endKey = allKeys.end.withPrefix(range.begin);
state int i; // index
state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior
// If the responses are too big, we may use multiple requests to fetch the key/values. Each request begins
// where the last one left off
for (i = 0; i < shards.size(); i++) {
while (beginKey < std::min<KeyRef>(shards[i].first.end, endKey)) {
try {
Version version = wait(self->getVersion(cx, self));
GetKeyValuesRequest req;
req.begin = firstGreaterOrEqual(beginKey);
req.end = firstGreaterOrEqual(std::min<KeyRef>(shards[i].first.end, endKey));
req.limit = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT;
req.limitBytes = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES;
req.version = version;
req.tags = TagSet();
// Fetch key/values from storage servers
// Here we read from all storage servers and make sure results are consistent
// Note: this may be redundant, but it ensures all storage servers are available in a quiescent database
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
for (const auto& kv : shards[i].second) {
resetReply(req);
keyValueFutures.push_back(kv.getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
wait(waitForAll(keyValueFutures));
int firstValidStorageServer = -1;
// Read the key/value results
for (int j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> reply = keyValueFutures[j].get();
if (!reply.present() || reply.get().error.present()) {
// If the storage server didn't reply in a quiescent database, then the check fails
if (self->performQuiescentChecks) {
TraceEvent("CacheConsistencyCheck_KeyServerUnavailable")
.detail("StorageServer", shards[i].second[j].id().toString().c_str());
self->testFailure("Key server unavailable");
return false;
}
// If no storage servers replied, then throw all_alternatives_failed to force a retry
else if (firstValidStorageServer < 0 && j == keyValueFutures.size() - 1)
throw all_alternatives_failed();
}
// If this is the first storage server to reply, use it as the reference whose data is sent back to the caller
else if (firstValidStorageServer < 0) {
firstValidStorageServer = j;
// Otherwise, compare the data to the results from the first storage server. If they are
// different, then the check fails
} else if (reply.get().data != keyValueFutures[firstValidStorageServer].get().get().data ||
reply.get().more != keyValueFutures[firstValidStorageServer].get().get().more) {
TraceEvent("CacheConsistencyCheck_InconsistentKeyServers")
.detail("StorageServer1", shards[i].second[firstValidStorageServer].id())
.detail("StorageServer2", shards[i].second[j].id());
self->testFailure("Key servers inconsistent", true);
return false;
}
}
auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get();
for (const auto& kv : keyValueResponse.data) {
result.push_back_deep(
result.arena(),
KeyValueRef(removePrefix ? kv.key.removePrefix(range.begin) : kv.key, kv.value));
}
// Next iteration should pick up where we left off
ASSERT(result.size() >= 1);
if (!keyValueResponse.more) {
beginKey = shards[i].first.end;
} else {
beginKey = keyAfter(keyValueResponse.data.end()[-1].key);
}
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("CacheConsistencyCheck_RetryGetKeyLocations").error(err);
}
}
}
resultPromise.send(result);
return true;
}
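fetchKeyValuesFromSS pages through each shard with repeated GetKeyValuesRequests, restarting just past the last key returned whenever a reply reports more data. A standalone sketch of that paging loop over an in-memory sorted source (fetchBatch is a stand-in for one request/reply round trip, not the real API):

#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

struct Batch {
    std::vector<KV> data;
    bool more = false; // true when the batch hit its limit and keys remain in the range
};

// Stand-in for one GetKeyValuesRequest: return up to `limit` pairs in [begin, end) from a sorted source.
static Batch fetchBatch(const std::vector<KV>& source, const std::string& begin, const std::string& end, size_t limit) {
    Batch b;
    for (const KV& kv : source) {
        if (kv.first < begin || kv.first >= end) continue;
        if (b.data.size() == limit) { b.more = true; break; }
        b.data.push_back(kv);
    }
    return b;
}

static std::vector<KV> fetchAll(const std::vector<KV>& source, std::string begin, const std::string& end) {
    std::vector<KV> result;
    while (true) {
        Batch b = fetchBatch(source, begin, end, 2 /* small limit to force paging */);
        result.insert(result.end(), b.data.begin(), b.data.end());
        if (!b.more) break;
        begin = b.data.back().first + '\x00'; // keyAfter(k): the first key strictly after k
    }
    return result;
}

int main() {
    std::vector<KV> source = { { "a", "1" }, { "b", "2" }, { "c", "3" }, { "d", "4" }, { "e", "5" } };
    std::vector<KV> all = fetchAll(source, "a", "e"); // pages through a..d in batches of two
    return all.size() == 4 ? 0 : 1;
}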
//Gets a version at which to read from the storage servers
ACTOR Future<Version> getVersion(Database cx, ConsistencyCheckWorkload *self)
{
@@ -353,17 +839,18 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
// Get a list of storage servers from the master and compares them with the TLogs.
// If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to respond.
// Returns false if there is a failure (in this case, keyServersPromise will never be set)
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<std::vector<std::pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise)
{
// Get a list of storage servers (persisting keys within range "kr") from the master and compare them with the
// TLogs. If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to
// respond. Returns false if there is a failure (in this case, keyServersPromise will never be set)
ACTOR Future<bool> getKeyServers(
Database cx, ConsistencyCheckWorkload* self,
Promise<std::vector<std::pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise, KeyRangeRef kr) {
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> keyServers;
//Try getting key server locations from the master proxies
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
state Key begin = keyServersKeys.begin;
state Key end = keyServersKeys.end;
state Key begin = kr.begin;
state Key end = kr.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);


@@ -158,6 +158,7 @@ public:
timeout = g_network->isSimulated() ? 15000 : 1500;
databasePingDelay = g_network->isSimulated() ? 0.0 : 15.0;
runConsistencyCheck = g_network->isSimulated();
runConsistencyCheckOnCache = false;
waitForQuiescenceBegin = true;
waitForQuiescenceEnd = true;
simCheckRelocationDuration = false;
@@ -169,8 +170,9 @@ public:
double databasePingDelay = -1.0)
: title(title), dumpAfterTest(dump), clearAfterTest(clear), startDelay(startDelay), useDB(useDB), timeout(600),
databasePingDelay(databasePingDelay), runConsistencyCheck(g_network->isSimulated()),
waitForQuiescenceBegin(true), waitForQuiescenceEnd(true), simCheckRelocationDuration(false),
simConnectionFailuresDisableDuration(0), simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
runConsistencyCheckOnCache(false), waitForQuiescenceBegin(true), waitForQuiescenceEnd(true),
simCheckRelocationDuration(false), simConnectionFailuresDisableDuration(0),
simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
simDrAgents(ISimulator::BackupAgentType::NoBackupAgents) {
phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS;
if( databasePingDelay < 0 )
@@ -187,6 +189,7 @@ public:
int timeout;
double databasePingDelay;
bool runConsistencyCheck;
bool runConsistencyCheckOnCache;
bool waitForQuiescenceBegin;
bool waitForQuiescenceEnd;