Add consistency check for cache in the ConsistencyCheck workload

Chaoguang Lin 2021-03-04 14:30:35 -08:00
parent eb205ef851
commit 425531060e
4 changed files with 494 additions and 10 deletions
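
The cache check is off by default; a test opts in through its spec file. A minimal sketch of a spec that enables it, mirroring the TOML test updated below (the 'Cycle' workload is simply the example used there):

[[test]]
testTitle = 'Cycle'
# Opt this test into the storage cache consistency check (defaults to false)
runConsistencyCheckOnCache = true

    [[test.workload]]
    testName = 'Cycle'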


@@ -758,7 +758,7 @@ ACTOR Future<Void> changeConfiguration(Database cx, std::vector< TesterInterface
}
//Runs the consistency check workload, which verifies that the database is in a consistent state
ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface > testers, bool doQuiescentCheck,
ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface > testers, bool doQuiescentCheck, bool doCacheCheck,
double quiescentWaitTimeout, double softTimeLimit, double databasePingDelay, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state TestSpec spec;
@@ -771,14 +771,19 @@ ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface >
Standalone<VectorRef<KeyValueRef>> options;
StringRef performQuiescent = LiteralStringRef("false");
StringRef performCacheCheck = LiteralStringRef("false");
if (doQuiescentCheck) {
performQuiescent = LiteralStringRef("true");
}
if (doCacheCheck) {
performCacheCheck = LiteralStringRef("true");
}
spec.title = LiteralStringRef("ConsistencyCheck");
spec.databasePingDelay = databasePingDelay;
spec.timeout = 32000;
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("testName"), LiteralStringRef("ConsistencyCheck")));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("performQuiescentChecks"), performQuiescent));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("performCacheCheck"), performCacheCheck));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("quiescentWaitTimeout"), ValueRef(options.arena(), format("%f", quiescentWaitTimeout))));
options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("distributed"), LiteralStringRef("false")));
spec.options.push_back_deep(spec.options.arena(), options);
@@ -842,7 +847,7 @@ ACTOR Future<bool> runTest( Database cx, std::vector< TesterInterface > testers,
if(spec.runConsistencyCheck) {
try {
bool quiescent = g_network->isSimulated() ? !BUGGIFY : spec.waitForQuiescenceEnd;
wait(timeoutError(checkConsistency(cx, testers, quiescent, 10000.0, 18000, spec.databasePingDelay, dbInfo), 20000.0));
wait(timeoutError(checkConsistency(cx, testers, quiescent, spec.runConsistencyCheckOnCache, 10000.0, 18000, spec.databasePingDelay, dbInfo), 20000.0));
}
catch(Error& e) {
TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to perform consistency check");
@@ -964,6 +969,10 @@ std::map<std::string, std::function<void(const std::string& value, TestSpec* spe
spec->runConsistencyCheck = ( value == "true" );
TraceEvent("TestParserTest").detail("ParsedRunConsistencyCheck", spec->runConsistencyCheck);
}},
{ "runConsistencyCheckOnCache", [](const std::string& value, TestSpec* spec) {
spec->runConsistencyCheckOnCache = ( value == "true" );
TraceEvent("TestParserTest").detail("ParsedRunConsistencyCheckOnCache", spec->runConsistencyCheckOnCache);
}},
{ "waitForQuiescence", [](const std::string& value, TestSpec* spec) {
bool toWait = value == "true";
spec->waitForQuiescenceBegin = toWait;


@@ -20,6 +20,9 @@
#include <math.h>
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "fdbclient/NativeAPI.actor.h"
@@ -44,6 +47,9 @@ struct ConsistencyCheckWorkload : TestWorkload
//Whether or not we should perform checks that will only pass if the database is in a quiescent state
bool performQuiescentChecks;
// Whether or not to perform a consistency check between storage cache servers and storage servers
bool performCacheCheck;
//How long to wait for the database to go quiet before failing (if doing quiescent checks)
double quiescentWaitTimeout;
@@ -89,6 +95,7 @@ struct ConsistencyCheckWorkload : TestWorkload
: TestWorkload(wcx)
{
performQuiescentChecks = getOption(options, LiteralStringRef("performQuiescentChecks"), false);
performCacheCheck = getOption(options, LiteralStringRef("performCacheCheck"), false);
quiescentWaitTimeout = getOption(options, LiteralStringRef("quiescentWaitTimeout"), 600.0);
distributed = getOption(options, LiteralStringRef("distributed"), true);
shardSampleFactor = std::max(getOption(options, LiteralStringRef("shardSampleFactor"), 1), 1);
@@ -302,7 +309,7 @@ struct ConsistencyCheckWorkload : TestWorkload
//Get a list of key servers; verify that the TLogs and master all agree about who the key servers are
state Promise<std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise));
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, keyServersKeys));
if(keyServerResult)
{
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> keyServers = keyServerPromise.getFuture().get();
@@ -316,6 +323,10 @@ struct ConsistencyCheckWorkload : TestWorkload
//Check that each shard has the same data on all storage servers that it resides on
wait(::success(self->checkDataConsistency(cx, keyLocations, configuration, self)));
// Cache consistency check
if (self->performCacheCheck)
wait(::success(self->checkCacheConsistency(cx, keyLocations, self)));
}
}
}
@@ -334,6 +345,465 @@ struct ConsistencyCheckWorkload : TestWorkload
return Void();
}
ACTOR Future<bool> checkCacheConsistency(Database cx, VectorRef<KeyValueRef> keyLocations,
ConsistencyCheckWorkload* self) {
state Promise<Standalone<VectorRef<KeyValueRef>>> cacheKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> cacheServerKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> serverListKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> serverTagKeyPromise;
state Standalone<VectorRef<KeyValueRef>> cacheKey;
state Standalone<VectorRef<KeyValueRef>> cacheServer;
state Standalone<VectorRef<KeyValueRef>> serverList;
state Standalone<VectorRef<KeyValueRef>> serverTag;
std::vector<Future<bool>> cacheResultsPromise;
cacheResultsPromise.push_back(self->fetchKeyValuesFromSS(cx, self, storageCacheKeys, cacheKeyPromise, true));
cacheResultsPromise.push_back(
self->fetchKeyValuesFromSS(cx, self, storageCacheServerKeys, cacheServerKeyPromise, false));
cacheResultsPromise.push_back(
self->fetchKeyValuesFromSS(cx, self, serverListKeys, serverListKeyPromise, false));
cacheResultsPromise.push_back(self->fetchKeyValuesFromSS(cx, self, serverTagKeys, serverTagKeyPromise, false));
std::vector<bool> cacheResults = wait(getAll(cacheResultsPromise));
if (std::all_of(cacheResults.begin(), cacheResults.end(), [](bool success) { return success; })) {
cacheKey = cacheKeyPromise.getFuture().get();
cacheServer = cacheServerKeyPromise.getFuture().get();
serverList = serverListKeyPromise.getFuture().get();
serverTag = serverTagKeyPromise.getFuture().get();
} else {
return false;
}
state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
state int increment =
(self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1;
state int rateLimitForThisRound =
self->bytesReadInPreviousRound == 0
? self->rateLimitMax
: std::min(
self->rateLimitMax,
static_cast<int>(ceil(self->bytesReadInPreviousRound /
(float)CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME)));
ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax);
TraceEvent("CacheConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound);
state Reference<IRateControl> rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
state double rateLimiterStartTime = now();
state int bytesReadInRange = 0;
// Get all storage cache server interfaces
state std::vector<StorageServerInterface> cacheServerInterfaces;
for (const auto& kv : cacheServer) {
// TODO: check uniqueness?
StorageServerInterface cacheServer = decodeServerListValue(kv.value);
TraceEvent(SevDebug, "CheckCacheConsistency")
.detail("CacheServerAddress", cacheServer.address().toString())
.detail("Key", kv.key.toString())
.detail("Existing", std::find(cacheServerInterfaces.begin(), cacheServerInterfaces.end(),
cacheServer) != cacheServerInterfaces.end());
if (std::find(cacheServerInterfaces.begin(), cacheServerInterfaces.end(), cacheServer) ==
cacheServerInterfaces.end())
cacheServerInterfaces.push_back(cacheServer);
}
TraceEvent(SevDebug, "CheckCacheConsistencyCacheServers")
.detail("SSInterfaces", describe(cacheServerInterfaces));
// construct cache key location map
state KeyRangeMap<std::vector<StorageServerInterface>> cachedKeysLocationMap;
for (int k = 0; k < cacheKey.size(); k++) {
std::vector<uint16_t> serverIndices;
decodeStorageCacheValue(cacheKey[k].value, serverIndices);
if (serverIndices.size()) {
KeyRangeRef range(cacheKey[k].key, (k < cacheKey.size() - 1) ? cacheKey[k + 1].key : allKeys.end);
cachedKeysLocationMap.insert(range, cacheServerInterfaces);
TraceEvent(SevDebug, "CheckCacheConsistency")
.detail("CachedRange", range.toString())
.detail("Index", k);
}
}
// construct UID-to-SS map
state std::map<UID, StorageServerInterface> UIDtoSSMap;
for (const auto& kv : serverList) {
UID serverId = decodeServerListKey(kv.key);
UIDtoSSMap[serverId] = decodeServerListValue(kv.value);
TraceEvent(SevDebug, "CheckCacheConsistencyStorageServer").detail("UID", serverId);
}
// add storage servers
for (int k = 0; k < keyLocations.size() - 1; k++) {
KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key);
std::vector<UID> sourceStorageServers;
std::vector<UID> destStorageServers;
decodeKeyServersValue(RangeResultRef(serverTag, false), keyLocations[k].value, sourceStorageServers,
destStorageServers, false);
bool isRelocating = destStorageServers.size() > 0;
std::vector<UID> storageServers = (isRelocating) ? destStorageServers : sourceStorageServers;
std::vector<StorageServerInterface> storageServerInterfaces;
for (const auto& UID : storageServers) {
// TODO: add traces
storageServerInterfaces.push_back(UIDtoSSMap[UID]);
}
std::vector<StorageServerInterface> allSS(cacheServerInterfaces);
allSS.insert(allSS.end(), storageServerInterfaces.begin(), storageServerInterfaces.end());
auto begin_iter = cachedKeysLocationMap.rangeContaining(range.begin);
if (begin_iter->begin() != range.begin) {
// insert range.begin
cachedKeysLocationMap.insert(KeyRangeRef(range.begin, begin_iter->end()), cacheServerInterfaces);
}
auto end_iter = cachedKeysLocationMap.rangeContaining(range.end);
if (end_iter->begin() != range.end) {
cachedKeysLocationMap.insert(KeyRangeRef(end_iter->begin(), range.end), cacheServerInterfaces);
}
// TODO : Add assertions
for (auto iter = cachedKeysLocationMap.rangeContaining(range.begin);
iter != cachedKeysLocationMap.rangeContaining(range.end); ++iter) {
if (iter->value().size()) {
iter->value() = allSS;
}
}
}
state RangeMap<Key, std::vector<StorageServerInterface>, KeyRangeRef>::Ranges iter_ranges =
cachedKeysLocationMap.containedRanges(allKeys);
state RangeMap<Key, std::vector<StorageServerInterface>, KeyRangeRef>::iterator iter = iter_ranges.begin();
state std::vector<StorageServerInterface> iter_ss;
state int shard = 0;
while (shard < self->clientId * (self->shardSampleFactor + 1) && iter != iter_ranges.end()) {
if (iter->value().empty()) {
++iter;
continue;
}
++iter;
++shard;
}
for (; iter != iter_ranges.end(); ++iter) {
iter_ss = iter->value();
if (iter_ss.empty()) continue;
if (shard % increment) {
++shard;
continue;
}
if (!self->firstClient || shard++ % (effectiveClientCount * self->shardSampleFactor) == 0) {
state Key lastSampleKey;
state Key lastStartSampleKey;
state int64_t totalReadAmount = 0;
state KeySelector begin = firstGreaterOrEqual(iter->begin());
state Transaction onErrorTr(
cx); // This transaction exists only to access onError and its backoff behavior
// Read a limited number of entries at a time, repeating until all keys in the shard have been read
loop {
try {
lastSampleKey = lastStartSampleKey;
// Get the min version of the storage servers
Version version = wait(self->getVersion(cx, self));
state GetKeyValuesRequest req;
req.begin = begin;
req.end = firstGreaterOrEqual(iter->end());
req.limit = 1e4;
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
// Try getting the entries in the specified range
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
state int j = 0;
for (j = 0; j < iter_ss.size(); j++) {
resetReply(req);
keyValueFutures.push_back(iter_ss[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
wait(waitForAll(keyValueFutures));
TraceEvent(SevDebug, "CheckCacheConsistencyComparison")
.detail("Begin", req.begin.toString())
.detail("End", req.end.toString())
.detail("SSInterfaces", describe(iter_ss));
// Read the resulting entries
state int firstValidServer = -1;
totalReadAmount = 0;
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
// if (rangeResult.isError()) {
// throw rangeResult.getError();
// }
// Compare the results with other storage servers
if (rangeResult.present() && !rangeResult.get().error.present()) {
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
TraceEvent(SevDebug, "CheckCacheConsistencyResult")
.detail("SSInterface", iter_ss[j].uniqueID);
// If we haven't encountered a valid storage server yet, then mark this as the baseline
// to compare against
if (firstValidServer == -1) firstValidServer = j;
// Compare this shard against the first
else {
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
if (current.data != reference.data || current.more != reference.more) {
// Be especially verbose if in simulation
if (g_network->isSimulated()) {
int invalidIndex = -1;
printf("\nSERVER %d (%s); shard = %s - %s:\n", j,
iter_ss[j].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < current.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(current.data[k].key).c_str(),
printable(current.data[k].value).c_str());
if (invalidIndex < 0 &&
(k >= reference.data.size() ||
current.data[k].key != reference.data[k].key ||
current.data[k].value != reference.data[k].value))
invalidIndex = k;
}
printf("\nSERVER %d (%s); shard = %s - %s:\n", firstValidServer,
iter_ss[firstValidServer].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < reference.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(reference.data[k].key).c_str(),
printable(reference.data[k].value).c_str());
if (invalidIndex < 0 &&
(k >= current.data.size() ||
reference.data[k].key != current.data[k].key ||
reference.data[k].value != current.data[k].value))
invalidIndex = k;
}
printf("\nMISMATCH AT %d\n\n", invalidIndex);
}
// Data for trace event
// The number of keys unique to the current shard
int currentUniques = 0;
// The number of keys unique to the reference shard
int referenceUniques = 0;
// The number of keys in both shards with conflicting values
int valueMismatches = 0;
// The number of keys in both shards with matching values
int matchingKVPairs = 0;
// Last unique key on the current shard
KeyRef currentUniqueKey;
// Last unique key on the reference shard
KeyRef referenceUniqueKey;
// Last value mismatch
KeyRef valueMismatchKey;
// Loop indices
int currentI = 0;
int referenceI = 0;
while (currentI < current.data.size() || referenceI < reference.data.size()) {
if (currentI >= current.data.size()) {
referenceUniqueKey = reference.data[referenceI].key;
referenceUniques++;
referenceI++;
} else if (referenceI >= reference.data.size()) {
currentUniqueKey = current.data[currentI].key;
currentUniques++;
currentI++;
} else {
KeyValueRef currentKV = current.data[currentI];
KeyValueRef referenceKV = reference.data[referenceI];
if (currentKV.key == referenceKV.key) {
if (currentKV.value == referenceKV.value)
matchingKVPairs++;
else {
valueMismatchKey = currentKV.key;
valueMismatches++;
}
currentI++;
referenceI++;
} else if (currentKV.key < referenceKV.key) {
currentUniqueKey = currentKV.key;
currentUniques++;
currentI++;
} else {
referenceUniqueKey = referenceKV.key;
referenceUniques++;
referenceI++;
}
}
}
TraceEvent("CacheConsistencyCheck_DataInconsistent")
.detail(format("StorageServer%d", j).c_str(), iter_ss[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
iter_ss[firstValidServer].toString())
.detail("ShardBegin", printable(req.begin.getKey()))
.detail("ShardEnd", printable(req.end.getKey()))
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), printable(currentUniqueKey))
.detail(format("Server%dUniques", firstValidServer).c_str(),
referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
printable(referenceUniqueKey))
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", printable(valueMismatchKey))
.detail("MatchingKVPairs", matchingKVPairs);
self->testFailure("Data inconsistent", true);
return false;
}
}
}
}
// after requesting each shard, enforce rate limit based on how much data will likely be read
if (rateLimitForThisRound > 0) {
wait(rateLimiter->getAllowance(totalReadAmount));
// Set ratelimit to max allowed if current round has been going on for a while
if (now() - rateLimiterStartTime >
1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
rateLimitForThisRound != self->rateLimitMax) {
rateLimitForThisRound = self->rateLimitMax;
rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
rateLimiterStartTime = now();
TraceEvent(SevInfo, "CacheConsistencyCheck_RateLimitSetMaxForThisRound")
.detail("RateLimit", rateLimitForThisRound);
}
}
bytesReadInRange += totalReadAmount;
// Advance to the next set of entries
if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
ASSERT(result.size() > 0);
begin = firstGreaterThan(result[result.size() - 1].key);
ASSERT(begin.getKey() != allKeys.end);
lastStartSampleKey = lastSampleKey;
TraceEvent(SevDebug, "CacheConsistencyCheckNextBeginKey").detail("Key", begin.toString());
} else
break;
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("CacheConsistencyCheck_RetryDataConsistency").error(err);
}
}
}
if (bytesReadInRange > 0) {
TraceEvent("CacheConsistencyCheck_ReadRange")
.suppressFor(1.0)
.detail("Range", printable(iter->range()))
.detail("BytesRead", bytesReadInRange);
}
}
return true;
}
ACTOR Future<bool> fetchKeyValuesFromSS(Database cx, ConsistencyCheckWorkload* self, KeyRangeRef range,
Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise,
bool removePrefix) {
// Get the storage servers where the range is persisted
state Promise<std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, range));
if (!keyServerResult) return false;
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> shards =
keyServerPromise.getFuture().get();
state Standalone<VectorRef<KeyValueRef>> keyLocations;
state Key beginKey = allKeys.begin.withPrefix(range.begin);
state Key endKey = allKeys.end.withPrefix(range.begin);
state int i = 0;
state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior
// If the responses are too big, we may use multiple requests to get the key locations. Each request begins
// where the last left off
for (; i < shards.size(); i++) {
while (beginKey < std::min<KeyRef>(shards[i].first.end, endKey)) {
try {
Version version = wait(self->getVersion(cx, self));
GetKeyValuesRequest req;
req.begin = firstGreaterOrEqual(beginKey);
req.end = firstGreaterOrEqual(std::min<KeyRef>(shards[i].first.end, endKey));
req.limit = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT;
req.limitBytes = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES;
req.version = version;
req.tags = TagSet();
// Try getting the shard locations from the key servers
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
for (const auto& kv : shards[i].second) {
resetReply(req);
keyValueFutures.push_back(kv.getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
TraceEvent(SevDebug, "CheckCacheConsistency")
.detail("Prefix", range.begin.toString())
.detail("Begin", req.begin.getKey().toString())
.detail("End", req.end.getKey().toString());
wait(waitForAll(keyValueFutures));
int firstValidStorageServer = -1;
// Read the shard location results
for (int j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> reply = keyValueFutures[j].get();
if (!reply.present() || reply.get().error.present()) {
// If the storage server didn't reply in a quiescent database, then the check fails
if (self->performQuiescentChecks) {
TraceEvent("ConsistencyCheck_KeyServerUnavailable")
.detail("StorageServer", shards[i].second[j].id().toString().c_str());
self->testFailure("Key server unavailable");
return false;
}
// If no storage servers replied, then throw all_alternatives_failed to force a retry
else if (firstValidStorageServer < 0 && j == keyValueFutures.size() - 1)
throw all_alternatives_failed();
}
// If this is the first storage server, store the locations to send back to the caller
else if (firstValidStorageServer < 0) {
firstValidStorageServer = j;
// Otherwise, compare the data to the results from the first storage server. If they are
// different, then the check fails
} else if (reply.get().data != keyValueFutures[firstValidStorageServer].get().get().data ||
reply.get().more != keyValueFutures[firstValidStorageServer].get().get().more) {
TraceEvent("ConsistencyCheck_InconsistentKeyServers")
.detail("StorageServer1", shards[i].second[firstValidStorageServer].id())
.detail("StorageServer2", shards[i].second[j].id());
self->testFailure("Key servers inconsistent", true);
return false;
}
}
auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get();
for (const auto& kv : keyValueResponse.data) {
keyLocations.push_back_deep(
keyLocations.arena(),
KeyValueRef(removePrefix ? kv.key.removePrefix(range.begin) : kv.key, kv.value));
}
// Next iteration should pick up where we left off
ASSERT(keyLocations.size() >= 1);
if (!keyValueResponse.more) {
beginKey = shards[i].first.end;
} else {
beginKey = keyAfter(keyValueResponse.data.end()[-1].key);
}
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("ConsistencyCheck_RetryGetKeyLocations").error(err);
}
}
}
keyLocationPromise.send(keyLocations);
return true;
}
//Gets a version at which to read from the storage servers
ACTOR Future<Version> getVersion(Database cx, ConsistencyCheckWorkload *self)
{
@@ -353,17 +823,18 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
// Get a list of storage servers from the master and compares them with the TLogs.
// Get a list of storage servers (persisting keys in "kr") from the master and compare them with the TLogs.
// If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to respond.
// Returns false if there is a failure (in this case, keyServersPromise will never be set)
ACTOR Future<bool> getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise<std::vector<std::pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise)
{
ACTOR Future<bool> getKeyServers(
Database cx, ConsistencyCheckWorkload* self,
Promise<std::vector<std::pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise, KeyRangeRef kr) {
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> keyServers;
//Try getting key server locations from the master proxies
state vector<Future<ErrorOr<GetKeyServerLocationsReply>>> keyServerLocationFutures;
state Key begin = keyServersKeys.begin;
state Key end = keyServersKeys.end;
state Key begin = kr.begin;
state Key end = kr.end;
state int limitKeyServers = BUGGIFY ? 1 : 100;
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);


@@ -158,6 +158,7 @@ public:
timeout = g_network->isSimulated() ? 15000 : 1500;
databasePingDelay = g_network->isSimulated() ? 0.0 : 15.0;
runConsistencyCheck = g_network->isSimulated();
runConsistencyCheckOnCache = false;
waitForQuiescenceBegin = true;
waitForQuiescenceEnd = true;
simCheckRelocationDuration = false;
@@ -169,8 +170,9 @@ public:
double databasePingDelay = -1.0)
: title(title), dumpAfterTest(dump), clearAfterTest(clear), startDelay(startDelay), useDB(useDB), timeout(600),
databasePingDelay(databasePingDelay), runConsistencyCheck(g_network->isSimulated()),
waitForQuiescenceBegin(true), waitForQuiescenceEnd(true), simCheckRelocationDuration(false),
simConnectionFailuresDisableDuration(0), simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
runConsistencyCheckOnCache(false), waitForQuiescenceBegin(true), waitForQuiescenceEnd(true),
simCheckRelocationDuration(false), simConnectionFailuresDisableDuration(0),
simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
simDrAgents(ISimulator::BackupAgentType::NoBackupAgents) {
phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS;
if( databasePingDelay < 0 )
@@ -187,6 +189,7 @@ public:
int timeout;
double databasePingDelay;
bool runConsistencyCheck;
bool runConsistencyCheckOnCache;
bool waitForQuiescenceBegin;
bool waitForQuiescenceEnd;


@@ -7,6 +7,7 @@ testTitle = 'Cached'
[[test]]
testTitle = 'Cycle'
runConsistencyCheckOnCache = true
[[test.workload]]
testName = 'Cycle'