Update comments for cache consistency check, refactor some part of the code

This commit is contained in:
Chaoguang Lin 2021-03-09 12:23:38 -08:00
parent 8b578185f4
commit f99b6b04fe
1 changed files with 253 additions and 235 deletions

View File

@ -19,6 +19,7 @@
*/
#include <math.h>
#include "boost/lexical_cast.hpp"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
@ -345,16 +346,21 @@ struct ConsistencyCheckWorkload : TestWorkload
return Void();
}
// Check the data consistency between storage cache servers and storage servers
// keyLocations: all key/value pairs persisted in the database, reused from previous consistency check on all
// storage servers
ACTOR Future<bool> checkCacheConsistency(Database cx, VectorRef<KeyValueRef> keyLocations,
ConsistencyCheckWorkload* self) {
state Promise<Standalone<VectorRef<KeyValueRef>>> cacheKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> cacheServerKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> serverListKeyPromise;
state Promise<Standalone<VectorRef<KeyValueRef>>> serverTagKeyPromise;
state Standalone<VectorRef<KeyValueRef>> cacheKey;
state Standalone<VectorRef<KeyValueRef>> cacheServer;
state Standalone<VectorRef<KeyValueRef>> serverList;
state Standalone<VectorRef<KeyValueRef>> serverTag;
state Standalone<VectorRef<KeyValueRef>> cacheKey; // "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
state Standalone<VectorRef<KeyValueRef>>
cacheServer; // "\xff/storageCacheServer/[[UID]] := StorageServerInterface"
state Standalone<VectorRef<KeyValueRef>>
serverList; // "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"
state Standalone<VectorRef<KeyValueRef>> serverTag; // "\xff/serverTag/[[serverID]]" = "[[Tag]]"
std::vector<Future<bool>> cacheResultsPromise;
cacheResultsPromise.push_back(self->fetchKeyValuesFromSS(cx, self, storageCacheKeys, cacheKeyPromise, true));
@ -370,11 +376,14 @@ struct ConsistencyCheckWorkload : TestWorkload
serverList = serverListKeyPromise.getFuture().get();
serverTag = serverTagKeyPromise.getFuture().get();
} else {
TraceEvent(SevDebug, "CheckCacheConsistencyFailed")
.detail("CacheKey", boost::lexical_cast<std::string>(cacheResults[0]))
.detail("CacheServerKey", boost::lexical_cast<std::string>(cacheResults[1]))
.detail("ServerListKey", boost::lexical_cast<std::string>(cacheResults[2]))
.detail("ServerTagKey", boost::lexical_cast<std::string>(cacheResults[3]));
return false;
}
state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
state int increment =
(self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1;
state int rateLimitForThisRound =
self->bytesReadInPreviousRound == 0
? self->rateLimitMax
@ -388,27 +397,28 @@ struct ConsistencyCheckWorkload : TestWorkload
state double rateLimiterStartTime = now();
state int bytesReadInRange = 0;
// Get all SS interfaces
// Get all cache storage servers' interfaces
// Note: currently, all storage cache servers cache the same data
// Thus, no need to differentiate them for now
state std::vector<StorageServerInterface> cacheServerInterfaces;
for (const auto& kv : cacheServer) {
// TODO: check uniqueness?
StorageServerInterface cacheServer = decodeServerListValue(kv.value);
TraceEvent(SevDebug, "CheckCacheConsistency")
.detail("CacheServerAddress", cacheServer.address().toString())
.detail("Key", kv.key.toString())
.detail("Existing", std::find(cacheServerInterfaces.begin(), cacheServerInterfaces.end(),
cacheServer) != cacheServerInterfaces.end());
if (std::find(cacheServerInterfaces.begin(), cacheServerInterfaces.end(), cacheServer) ==
cacheServerInterfaces.end())
cacheServerInterfaces.push_back(cacheServer);
// Uniqueness
ASSERT(std::find(cacheServerInterfaces.begin(), cacheServerInterfaces.end(), cacheServer) ==
cacheServerInterfaces.end());
cacheServerInterfaces.push_back(cacheServer);
}
TraceEvent(SevDebug, "CheckCacheConsistencyCacheServers")
.detail("SSInterfaces", describe(cacheServerInterfaces));
// construct cache key location map
.detail("CacheSSInterfaces", describe(cacheServerInterfaces));
// Construct a key range map where the value for each range,
// if the range is cached, then a list of cache server interfaces plus one storage server interfaces(randomly
// pick) if not cached, empty
state KeyRangeMap<std::vector<StorageServerInterface>> cachedKeysLocationMap;
// First, for any range is cached, update the list to have all cache storage interfaces
for (int k = 0; k < cacheKey.size(); k++) {
std::vector<uint16_t> serverIndices;
decodeStorageCacheValue(cacheKey[k].value, serverIndices);
// non-empty means this is the start of a cached range
if (serverIndices.size()) {
KeyRangeRef range(cacheKey[k].key, (k < cacheKey.size() - 1) ? cacheKey[k + 1].key : allKeys.end);
cachedKeysLocationMap.insert(range, cacheServerInterfaces);
@ -417,14 +427,16 @@ struct ConsistencyCheckWorkload : TestWorkload
.detail("Index", k);
}
}
// construct UID2SS amp
// Second, insert corresponding storage servers into the list
// Here we need to construct a UID2SS map
state std::map<UID, StorageServerInterface> UIDtoSSMap;
for (const auto& kv : serverList) {
UID serverId = decodeServerListKey(kv.key);
UIDtoSSMap[serverId] = decodeServerListValue(kv.value);
TraceEvent(SevDebug, "CheckCacheConsistencyStorageServer").detail("UID", serverId);
}
// add storage servers
// Now, for each shard, check if it is cached,
// if cached, add storage servers that persist the data into the list
for (int k = 0; k < keyLocations.size() - 1; k++) {
KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key);
std::vector<UID> sourceStorageServers;
@ -435,35 +447,42 @@ struct ConsistencyCheckWorkload : TestWorkload
std::vector<UID> storageServers = (isRelocating) ? destStorageServers : sourceStorageServers;
std::vector<StorageServerInterface> storageServerInterfaces;
for (const auto& UID : storageServers) {
// TODO: add traces
storageServerInterfaces.push_back(UIDtoSSMap[UID]);
}
std::vector<StorageServerInterface> allSS(cacheServerInterfaces);
allSS.insert(allSS.end(), storageServerInterfaces.begin(), storageServerInterfaces.end());
// The shard may overlap with several cached ranges
// only insert the storage server interface on cached ranges
// Since range.end is next range.begin, and both begins are Key(), so we always hold this condition
// Note: both ends are allKeys.end
auto begin_iter = cachedKeysLocationMap.rangeContaining(range.begin);
if (begin_iter->begin() != range.begin) {
// insert range.begin
cachedKeysLocationMap.insert(KeyRangeRef(range.begin, begin_iter->end()), cacheServerInterfaces);
}
ASSERT(begin_iter->begin() == range.begin);
// Split the range to maintain the condition
auto end_iter = cachedKeysLocationMap.rangeContaining(range.end);
if (end_iter->begin() != range.end) {
cachedKeysLocationMap.insert(KeyRangeRef(end_iter->begin(), range.end), cacheServerInterfaces);
cachedKeysLocationMap.insert(KeyRangeRef(end_iter->begin(), range.end), end_iter->value());
}
// TODO : Add assertions
for (auto iter = cachedKeysLocationMap.rangeContaining(range.begin);
iter != cachedKeysLocationMap.rangeContaining(range.end); ++iter) {
if (iter->value().size()) {
iter->value() = allSS;
// randomly pick one for check since the data are guaranteed to be consistent on any SS
iter->value().push_back(deterministicRandom()->randomChoice(storageServerInterfaces));
}
}
}
// Once having the KeyRangeMap, iterate all cached ranges and verify consistency
state RangeMap<Key, std::vector<StorageServerInterface>, KeyRangeRef>::Ranges iter_ranges =
cachedKeysLocationMap.containedRanges(allKeys);
state RangeMap<Key, std::vector<StorageServerInterface>, KeyRangeRef>::iterator iter = iter_ranges.begin();
state std::vector<StorageServerInterface> iter_ss;
state int shard = 0;
while (shard < self->clientId * (self->shardSampleFactor + 1) && iter != iter_ranges.end()) {
state int effectiveClientCount = (self->distributed) ? self->clientCount : 1;
state int increment = self->distributed ? effectiveClientCount * self->shardSampleFactor : 1;
state int shard = 0; // index used for spliting work on different clients
// move the index to the first responsible cached range
while (shard < self->clientId * self->shardSampleFactor && iter != iter_ranges.end()) {
if (iter->value().empty()) {
++iter;
continue;
@ -474,216 +493,212 @@ struct ConsistencyCheckWorkload : TestWorkload
for (; iter != iter_ranges.end(); ++iter) {
iter_ss = iter->value();
if (iter_ss.empty()) continue;
if (shard % increment) {
if (shard % increment != (self->clientId * self->shardSampleFactor) % increment) {
++shard;
continue;
}
if (!self->firstClient || shard++ % (effectiveClientCount * self->shardSampleFactor) == 0) {
state Key lastSampleKey;
state Key lastStartSampleKey;
state int64_t totalReadAmount = 0;
// TODO: in the future, run a check based on estimated data size like the existing storage servers'
// consistency check on the first client
state Key lastSampleKey;
state Key lastStartSampleKey;
state int64_t totalReadAmount = 0;
state KeySelector begin = firstGreaterOrEqual(iter->begin());
state Transaction onErrorTr(
cx); // This transaction exists only to access onError and its backoff behavior
state KeySelector begin = firstGreaterOrEqual(iter->begin());
state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior
// Read a limited number of entries at a time, repeating until all keys in the shard have been read
loop {
try {
lastSampleKey = lastStartSampleKey;
// Read a limited number of entries at a time, repeating until all keys in the shard have been read
loop {
try {
lastSampleKey = lastStartSampleKey;
// Get the min version of the storage servers
Version version = wait(self->getVersion(cx, self));
// Get the min version of the storage servers
Version version = wait(self->getVersion(cx, self));
state GetKeyValuesRequest req;
req.begin = begin;
req.end = firstGreaterOrEqual(iter->end());
req.limit = 1e4;
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
state GetKeyValuesRequest req;
req.begin = begin;
req.end = firstGreaterOrEqual(iter->end());
req.limit = 1e4;
req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.version = version;
req.tags = TagSet();
// Try getting the entries in the specified range
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
state int j = 0;
for (j = 0; j < iter_ss.size(); j++) {
resetReply(req);
keyValueFutures.push_back(iter_ss[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
// Try getting the entries in the specified range
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
state int j = 0;
for (j = 0; j < iter_ss.size(); j++) {
resetReply(req);
keyValueFutures.push_back(iter_ss[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
wait(waitForAll(keyValueFutures));
TraceEvent(SevDebug, "CheckCacheConsistencyComparison")
.detail("Begin", req.begin.toString())
.detail("End", req.end.toString())
.detail("SSInterfaces", describe(iter_ss));
wait(waitForAll(keyValueFutures));
TraceEvent(SevDebug, "CheckCacheConsistencyComparison")
.detail("Begin", req.begin.toString())
.detail("End", req.end.toString())
.detail("SSInterfaces", describe(iter_ss));
// Read the resulting entries
state int firstValidServer = -1;
totalReadAmount = 0;
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
// if (rangeResult.isError()) {
// throw rangeResult.getError();
// }
// Read the resulting entries
state int firstValidServer = -1;
totalReadAmount = 0;
for (j = 0; j < keyValueFutures.size(); j++) {
ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
// if (rangeResult.isError()) {
// throw rangeResult.getError();
// }
// Compare the results with other storage servers
if (rangeResult.present() && !rangeResult.get().error.present()) {
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
TraceEvent(SevDebug, "CheckCacheConsistencyResult")
.detail("SSInterface", iter_ss[j].uniqueID);
// If we haven't encountered a valid storage server yet, then mark this as the baseline
// to compare against
if (firstValidServer == -1) firstValidServer = j;
// Compare the results with other storage servers
if (rangeResult.present() && !rangeResult.get().error.present()) {
state GetKeyValuesReply current = rangeResult.get();
totalReadAmount += current.data.expectedSize();
TraceEvent(SevDebug, "CheckCacheConsistencyResult")
.detail("SSInterface", iter_ss[j].uniqueID);
// If we haven't encountered a valid storage server yet, then mark this as the baseline
// to compare against
if (firstValidServer == -1) firstValidServer = j;
// Compare this shard against the first
else {
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
// Compare this shard against the first
else {
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
if (current.data != reference.data || current.more != reference.more) {
// Be especially verbose if in simulation
if (g_network->isSimulated()) {
int invalidIndex = -1;
printf("\nSERVER %d (%s); shard = %s - %s:\n", j,
iter_ss[j].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < current.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(current.data[k].key).c_str(),
printable(current.data[k].value).c_str());
if (invalidIndex < 0 &&
(k >= reference.data.size() ||
current.data[k].key != reference.data[k].key ||
current.data[k].value != reference.data[k].value))
invalidIndex = k;
}
printf("\nSERVER %d (%s); shard = %s - %s:\n", firstValidServer,
iter_ss[firstValidServer].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < reference.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(reference.data[k].key).c_str(),
printable(reference.data[k].value).c_str());
if (invalidIndex < 0 &&
(k >= current.data.size() ||
reference.data[k].key != current.data[k].key ||
reference.data[k].value != current.data[k].value))
invalidIndex = k;
}
printf("\nMISMATCH AT %d\n\n", invalidIndex);
if (current.data != reference.data || current.more != reference.more) {
// Be especially verbose if in simulation
if (g_network->isSimulated()) {
int invalidIndex = -1;
printf("\nSERVER %d (%s); shard = %s - %s:\n", j,
iter_ss[j].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < current.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(current.data[k].key).c_str(),
printable(current.data[k].value).c_str());
if (invalidIndex < 0 && (k >= reference.data.size() ||
current.data[k].key != reference.data[k].key ||
current.data[k].value != reference.data[k].value))
invalidIndex = k;
}
// Data for trace event
// The number of keys unique to the current shard
int currentUniques = 0;
// The number of keys unique to the reference shard
int referenceUniques = 0;
// The number of keys in both shards with conflicting values
int valueMismatches = 0;
// The number of keys in both shards with matching values
int matchingKVPairs = 0;
// Last unique key on the current shard
KeyRef currentUniqueKey;
// Last unique key on the reference shard
KeyRef referenceUniqueKey;
// Last value mismatch
KeyRef valueMismatchKey;
printf("\nSERVER %d (%s); shard = %s - %s:\n", firstValidServer,
iter_ss[firstValidServer].address().toString().c_str(),
printable(req.begin.getKey()).c_str(),
printable(req.end.getKey()).c_str());
for (int k = 0; k < reference.data.size(); k++) {
printf("%d. %s => %s\n", k, printable(reference.data[k].key).c_str(),
printable(reference.data[k].value).c_str());
if (invalidIndex < 0 && (k >= current.data.size() ||
reference.data[k].key != current.data[k].key ||
reference.data[k].value != current.data[k].value))
invalidIndex = k;
}
// Loop indeces
int currentI = 0;
int referenceI = 0;
while (currentI < current.data.size() || referenceI < reference.data.size()) {
if (currentI >= current.data.size()) {
referenceUniqueKey = reference.data[referenceI].key;
referenceUniques++;
printf("\nMISMATCH AT %d\n\n", invalidIndex);
}
// Data for trace event
// The number of keys unique to the current shard
int currentUniques = 0;
// The number of keys unique to the reference shard
int referenceUniques = 0;
// The number of keys in both shards with conflicting values
int valueMismatches = 0;
// The number of keys in both shards with matching values
int matchingKVPairs = 0;
// Last unique key on the current shard
KeyRef currentUniqueKey;
// Last unique key on the reference shard
KeyRef referenceUniqueKey;
// Last value mismatch
KeyRef valueMismatchKey;
// Loop indeces
int currentI = 0;
int referenceI = 0;
while (currentI < current.data.size() || referenceI < reference.data.size()) {
if (currentI >= current.data.size()) {
referenceUniqueKey = reference.data[referenceI].key;
referenceUniques++;
referenceI++;
} else if (referenceI >= reference.data.size()) {
currentUniqueKey = current.data[currentI].key;
currentUniques++;
currentI++;
} else {
KeyValueRef currentKV = current.data[currentI];
KeyValueRef referenceKV = reference.data[referenceI];
if (currentKV.key == referenceKV.key) {
if (currentKV.value == referenceKV.value)
matchingKVPairs++;
else {
valueMismatchKey = currentKV.key;
valueMismatches++;
}
currentI++;
referenceI++;
} else if (referenceI >= reference.data.size()) {
currentUniqueKey = current.data[currentI].key;
} else if (currentKV.key < referenceKV.key) {
currentUniqueKey = currentKV.key;
currentUniques++;
currentI++;
} else {
KeyValueRef currentKV = current.data[currentI];
KeyValueRef referenceKV = reference.data[referenceI];
if (currentKV.key == referenceKV.key) {
if (currentKV.value == referenceKV.value)
matchingKVPairs++;
else {
valueMismatchKey = currentKV.key;
valueMismatches++;
}
currentI++;
referenceI++;
} else if (currentKV.key < referenceKV.key) {
currentUniqueKey = currentKV.key;
currentUniques++;
currentI++;
} else {
referenceUniqueKey = referenceKV.key;
referenceUniques++;
referenceI++;
}
referenceUniqueKey = referenceKV.key;
referenceUniques++;
referenceI++;
}
}
TraceEvent("CacheConsistencyCheck_DataInconsistent")
.detail(format("StorageServer%d", j).c_str(), iter_ss[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
iter_ss[firstValidServer].toString())
.detail("ShardBegin", printable(req.begin.getKey()))
.detail("ShardEnd", printable(req.end.getKey()))
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), printable(currentUniqueKey))
.detail(format("Server%dUniques", firstValidServer).c_str(),
referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
printable(referenceUniqueKey))
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", printable(valueMismatchKey))
.detail("MatchingKVPairs", matchingKVPairs);
self->testFailure("Data inconsistent", true);
return false;
}
TraceEvent("CacheConsistencyCheck_DataInconsistent")
.detail(format("StorageServer%d", j).c_str(), iter_ss[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
iter_ss[firstValidServer].toString())
.detail("ShardBegin", printable(req.begin.getKey()))
.detail("ShardEnd", printable(req.end.getKey()))
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), printable(currentUniqueKey))
.detail(format("Server%dUniques", firstValidServer).c_str(), referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
printable(referenceUniqueKey))
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", printable(valueMismatchKey))
.detail("MatchingKVPairs", matchingKVPairs);
self->testFailure("Data inconsistent", true);
return false;
}
}
}
// after requesting each shard, enforce rate limit based on how much data will likely be read
if (rateLimitForThisRound > 0) {
wait(rateLimiter->getAllowance(totalReadAmount));
// Set ratelimit to max allowed if current round has been going on for a while
if (now() - rateLimiterStartTime >
1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
rateLimitForThisRound != self->rateLimitMax) {
rateLimitForThisRound = self->rateLimitMax;
rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
rateLimiterStartTime = now();
TraceEvent(SevInfo, "CacheConsistencyCheck_RateLimitSetMaxForThisRound")
.detail("RateLimit", rateLimitForThisRound);
}
}
bytesReadInRange += totalReadAmount;
// Advance to the next set of entries
if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
ASSERT(result.size() > 0);
begin = firstGreaterThan(result[result.size() - 1].key);
ASSERT(begin.getKey() != allKeys.end);
lastStartSampleKey = lastSampleKey;
TraceEvent(SevDebug, "CacheConsistencyCheckNextBeginKey").detail("Key", begin.toString());
} else
break;
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("CacheConsistencyCheck_RetryDataConsistency").error(err);
}
// after requesting each shard, enforce rate limit based on how much data will likely be read
if (rateLimitForThisRound > 0) {
wait(rateLimiter->getAllowance(totalReadAmount));
// Set ratelimit to max allowed if current round has been going on for a while
if (now() - rateLimiterStartTime >
1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
rateLimitForThisRound != self->rateLimitMax) {
rateLimitForThisRound = self->rateLimitMax;
rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
rateLimiterStartTime = now();
TraceEvent(SevInfo, "CacheConsistencyCheck_RateLimitSetMaxForThisRound")
.detail("RateLimit", rateLimitForThisRound);
}
}
bytesReadInRange += totalReadAmount;
// Advance to the next set of entries
if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
ASSERT(result.size() > 0);
begin = firstGreaterThan(result[result.size() - 1].key);
ASSERT(begin.getKey() != allKeys.end);
lastStartSampleKey = lastSampleKey;
TraceEvent(SevDebug, "CacheConsistencyCheckNextBeginKey").detail("Key", begin.toString());
} else
break;
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("CacheConsistencyCheck_RetryDataConsistency").error(err);
}
}
@ -697,24 +712,30 @@ struct ConsistencyCheckWorkload : TestWorkload
return true;
}
// Directly fetch key/values from storage servers through GetKeyValuesRequest
// In particular, avoid transaction-based read which may read data from storage cache servers
// range: all key/values in the range will be fetched
// removePrefix: if true, remove the prefix of the range, e.g. \xff/storageCacheServer/
ACTOR Future<bool> fetchKeyValuesFromSS(Database cx, ConsistencyCheckWorkload* self, KeyRangeRef range,
Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise,
Promise<Standalone<VectorRef<KeyValueRef>>> resultPromise,
bool removePrefix) {
// get the SS where range is persisted
// get shards paired with corresponding storage servers
state Promise<std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>>> keyServerPromise;
bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, range));
if (!keyServerResult) return false;
state std::vector<std::pair<KeyRange, vector<StorageServerInterface>>> shards =
keyServerPromise.getFuture().get();
state Standalone<VectorRef<KeyValueRef>> keyLocations;
// key/value pairs in the given range
state Standalone<VectorRef<KeyValueRef>> result;
state Key beginKey = allKeys.begin.withPrefix(range.begin);
state Key endKey = allKeys.end.withPrefix(range.begin);
state int i = 0;
state int i; // index
state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior
// If the responses are too big, we may use multiple requests to get the key locations. Each request begins
// where the last left off
for (; i < shards.size(); i++) {
for (i = 0; i < shards.size(); i++) {
while (beginKey < std::min<KeyRef>(shards[i].first.end, endKey)) {
try {
Version version = wait(self->getVersion(cx, self));
@ -727,18 +748,15 @@ struct ConsistencyCheckWorkload : TestWorkload
req.version = version;
req.tags = TagSet();
// Try getting the shard locations from the key servers
// Fetch key/values from storage servers
// Here we read from all storage servers and make sure results are consistent
// Note: this maybe duplicate but to make sure all storage servers available in a quiescent database
state vector<Future<ErrorOr<GetKeyValuesReply>>> keyValueFutures;
for (const auto& kv : shards[i].second) {
resetReply(req);
keyValueFutures.push_back(kv.getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
}
TraceEvent(SevDebug, "CheckCacheConsistency")
.detail("Prefix", range.begin.toString())
.detail("Begin", req.begin.getKey().toString())
.detail("End", req.end.getKey().toString());
wait(waitForAll(keyValueFutures));
int firstValidStorageServer = -1;
@ -750,7 +768,7 @@ struct ConsistencyCheckWorkload : TestWorkload
if (!reply.present() || reply.get().error.present()) {
// If the storage server didn't reply in a quiescent database, then the check fails
if (self->performQuiescentChecks) {
TraceEvent("ConsistencyCheck_KeyServerUnavailable")
TraceEvent("CacheConsistencyCheck_KeyServerUnavailable")
.detail("StorageServer", shards[i].second[j].id().toString().c_str());
self->testFailure("Key server unavailable");
return false;
@ -769,7 +787,7 @@ struct ConsistencyCheckWorkload : TestWorkload
// different, then the check fails
} else if (reply.get().data != keyValueFutures[firstValidStorageServer].get().get().data ||
reply.get().more != keyValueFutures[firstValidStorageServer].get().get().more) {
TraceEvent("ConsistencyCheck_InconsistentKeyServers")
TraceEvent("CacheConsistencyCheck_InconsistentKeyServers")
.detail("StorageServer1", shards[i].second[firstValidStorageServer].id())
.detail("StorageServer2", shards[i].second[j].id());
self->testFailure("Key servers inconsistent", true);
@ -780,13 +798,13 @@ struct ConsistencyCheckWorkload : TestWorkload
auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get();
for (const auto& kv : keyValueResponse.data) {
keyLocations.push_back_deep(
keyLocations.arena(),
result.push_back_deep(
result.arena(),
KeyValueRef(removePrefix ? kv.key.removePrefix(range.begin) : kv.key, kv.value));
}
// Next iteration should pick up where we left off
ASSERT(keyLocations.size() >= 1);
ASSERT(result.size() >= 1);
if (!keyValueResponse.more) {
beginKey = shards[i].first.end;
} else {
@ -795,12 +813,12 @@ struct ConsistencyCheckWorkload : TestWorkload
} catch (Error& e) {
state Error err = e;
wait(onErrorTr.onError(err));
TraceEvent("ConsistencyCheck_RetryGetKeyLocations").error(err);
TraceEvent("CacheConsistencyCheck_RetryGetKeyLocations").error(err);
}
}
}
keyLocationPromise.send(keyLocations);
resultPromise.send(result);
return true;
}
@ -823,9 +841,9 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
// Get a list of storage servers(persisting keys in "kr") from the master and compares them with the TLogs.
// If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to respond.
// Returns false if there is a failure (in this case, keyServersPromise will never be set)
// Get a list of storage servers(persisting keys within range "kr") from the master and compares them with the
// TLogs. If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to
// respond. Returns false if there is a failure (in this case, keyServersPromise will never be set)
ACTOR Future<bool> getKeyServers(
Database cx, ConsistencyCheckWorkload* self,
Promise<std::vector<std::pair<KeyRange, vector<StorageServerInterface>>>> keyServersPromise, KeyRangeRef kr) {