Fix checkall when shard is large
the begin key has to be updated for checkall command when a shard is large, this PR makes this change.
This commit is contained in:
parent
e458651a04
commit
e8cdfc5a0c
|
@ -158,11 +158,12 @@ ACTOR Future<bool> getallCommandActor(Database cx, std::vector<StringRef> tokens
|
|||
CommandFactory getallCommandFactory("getall");
|
||||
|
||||
// check that all replies are the same. Update begin to the next key to check
|
||||
bool checkResults(Version version,
|
||||
const std::vector<StorageServerInterface>& servers,
|
||||
const std::vector<Future<ErrorOr<GetKeyValuesReply>>>& replies,
|
||||
KeySelectorRef& begin,
|
||||
const KeySelectorRef& end) {
|
||||
std::pair<bool, bool> checkResults(Version version,
|
||||
const std::vector<StorageServerInterface>& servers,
|
||||
const std::vector<Future<ErrorOr<GetKeyValuesReply>>>& replies,
|
||||
KeySelectorRef& begin,
|
||||
const KeySelectorRef& end,
|
||||
Key& lastEnd) {
|
||||
bool allSame = true;
|
||||
int firstValidServer = -1;
|
||||
for (int j = 0; j < replies.size(); j++) {
|
||||
|
@ -246,18 +247,20 @@ bool checkResults(Version version,
|
|||
}
|
||||
}
|
||||
|
||||
if (firstValidServer >= 0 && replies[firstValidServer].get().get().more) {
|
||||
bool hasMore = firstValidServer >= 0 && replies[firstValidServer].get().get().more;
|
||||
if (hasMore) {
|
||||
const VectorRef<KeyValueRef>& result = replies[firstValidServer].get().get().data;
|
||||
printf("Warning: Consistency check was incomplete, last key of server %d that was checked: %s\n",
|
||||
firstValidServer,
|
||||
printable(result[result.size() - 1].key).c_str());
|
||||
begin = firstGreaterThan(result[result.size() - 1].key);
|
||||
lastEnd = result[result.size() - 1].key; // store to a standalone, otherwise memory will be invalid
|
||||
begin = firstGreaterThan(lastEnd);
|
||||
} else {
|
||||
printf("Same at version %ld\n", version);
|
||||
printf("Consistency check finishes for version %ld\n", version);
|
||||
begin = end; // signal that we're done
|
||||
}
|
||||
|
||||
return allSame;
|
||||
return std::make_pair(allSame, hasMore);
|
||||
}
|
||||
|
||||
// The command is used to check the inconsistency in a keyspace, default is \xff\x02/blog/ keyspace.
|
||||
|
@ -301,6 +304,7 @@ ACTOR Future<bool> checkallCommandActor(Database cx, std::vector<StringRef> toke
|
|||
|
||||
state std::vector<std::pair<KeyRange, std::vector<StorageServerInterface>>> keyServers =
|
||||
keyServerPromise.getFuture().get();
|
||||
state Key lastEnd; // this is to hold the Key having the value of next begin
|
||||
for (i = 0; i < keyServers.size(); i++) { // for each key range
|
||||
state KeyRange range = keyServers[i].first;
|
||||
range = range & toCheck;
|
||||
|
@ -314,28 +318,39 @@ ACTOR Future<bool> checkallCommandActor(Database cx, std::vector<StringRef> toke
|
|||
for (const auto& server : servers) {
|
||||
printf(" %s\n", server.address().toString().c_str());
|
||||
}
|
||||
wait(store(version, getVersion(cx)));
|
||||
state std::vector<Future<ErrorOr<GetKeyValuesReply>>> replies;
|
||||
for (const auto& s : keyServers[i].second) { // for each storage server
|
||||
GetKeyValuesRequest req;
|
||||
req.begin = begin;
|
||||
req.end = end;
|
||||
req.limit = CLIENT_KNOBS->KRM_GET_RANGE_LIMIT;
|
||||
req.limitBytes = CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES;
|
||||
req.version = version;
|
||||
req.tags = TagSet();
|
||||
|
||||
replies.push_back(s.getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
|
||||
}
|
||||
// printf("waiting for %lu replies at version: %ld\n", keyServers[i].second.size(), version);
|
||||
wait(waitForAll(replies));
|
||||
if (!checkResults(version, keyServers[i].second, replies, begin, end) && !checkAll) {
|
||||
return false;
|
||||
state bool hasMore = true;
|
||||
state int round = 0;
|
||||
while (hasMore) {
|
||||
wait(store(version, getVersion(cx)));
|
||||
replies.clear();
|
||||
printf("round %d, begin Key: %s\n", round, printable(begin.toString()).c_str());
|
||||
for (const auto& s : keyServers[i].second) { // for each storage server
|
||||
GetKeyValuesRequest req;
|
||||
req.begin = begin;
|
||||
req.end = end;
|
||||
req.limit = CLIENT_KNOBS->KRM_GET_RANGE_LIMIT;
|
||||
req.limitBytes = CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES;
|
||||
req.version = version;
|
||||
req.tags = TagSet();
|
||||
replies.push_back(s.getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
|
||||
}
|
||||
// printf("waiting for %lu replies at version: %ld\n", keyServers[i].second.size(), version);
|
||||
wait(waitForAll(replies));
|
||||
// if there are more results, continue checking in the same shard
|
||||
auto p = checkResults(version, keyServers[i].second, replies, begin, end, lastEnd);
|
||||
bool allSame = p.first;
|
||||
hasMore = p.second;
|
||||
printf("AllSame %d, hasMore %d, checkAll %d\n", allSame, hasMore, checkAll);
|
||||
if (!allSame && !checkAll) {
|
||||
return false;
|
||||
}
|
||||
round++;
|
||||
}
|
||||
if (begin == end) {
|
||||
// this shard is done, signaled by begin == end
|
||||
toCheck = KeyRangeRef(range.end, toCheck.end);
|
||||
}
|
||||
// TODO: if there are more results, continue checking in the same shard
|
||||
}
|
||||
} catch (Error& e) {
|
||||
printf("Retrying for error: %s\n", e.what());
|
||||
|
|
Loading…
Reference in New Issue