Merge pull request #580 from alexmiller-apple/consistencycheck-5.2
Backport consistencycheck fixes from release-6.0
This commit is contained in:
commit ad08fbad5b
@@ -368,29 +368,22 @@ struct ConsistencyCheckWorkload : TestWorkload
     ACTOR Future<bool> getKeyLocations(Database cx, vector<pair<KeyRange, vector<StorageServerInterface>>> shards, ConsistencyCheckWorkload *self, Promise<Standalone<VectorRef<KeyValueRef>>> keyLocationPromise)
     {
         state Standalone<VectorRef<KeyValueRef>> keyLocations;
-        state Key beginKey = allKeys.begin;
+        state Key beginKey = allKeys.begin.withPrefix(keyServersPrefix);
+        state Key endKey = allKeys.end.withPrefix(keyServersPrefix);
         state int i = 0;

         //If the responses are too big, we may use multiple requests to get the key locations. Each request begins where the last left off
         for ( ; i < shards.size(); i++)
         {
-            // skip serverList shards
-            if (!shards[i].first.begin.startsWith(keyServersPrefix)) {
-                break;
-            }
-
-            state Key endKey = shards[i].first.end.startsWith(keyServersPrefix) ? shards[i].first.end.removePrefix(keyServersPrefix) : allKeys.end;
-
-            while(beginKey < endKey)
+            while(beginKey < std::min<KeyRef>(shards[i].first.end, endKey))
             {
                 try
                 {
                     Version version = wait(self->getVersion(cx, self));

                     GetKeyValuesRequest req;
-                    Key prefixBegin = beginKey.withPrefix(keyServersPrefix);
-                    req.begin = firstGreaterOrEqual(prefixBegin);
-                    req.end = firstGreaterOrEqual(keyServersEnd);
+                    req.begin = firstGreaterOrEqual(beginKey);
+                    req.end = firstGreaterOrEqual(std::min<KeyRef>(shards[i].first.end, endKey));
                     req.limit = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT;
                     req.limitBytes = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES;
                     req.version = version;
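For readers unfamiliar with the key-space handling, the sketch below illustrates the pattern this hunk switches to: beginKey and endKey carry the keyServersPrefix throughout, and each request is clamped to the smaller of the current shard's end and the global end of the prefixed range. It is a standalone toy, not fdbserver code: std::string stands in for Key/KeyRef, and the prefix, end sentinel, and shard boundaries are made-up placeholders.

// Standalone sketch of the clamped, prefixed scan (toy types and values, not fdbserver code).
#include <algorithm>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main() {
    // Placeholder prefix and end key; the real values come from FDB's system keyspace.
    const std::string keyServersPrefix = "ks/";
    std::string beginKey = keyServersPrefix;              // ~ allKeys.begin.withPrefix(keyServersPrefix)
    const std::string endKey = keyServersPrefix + "~";    // placeholder for allKeys.end.withPrefix(keyServersPrefix)

    // Hypothetical shard boundaries, already expressed in the prefixed keyspace.
    std::vector<std::pair<std::string, std::string>> shards = {
        {keyServersPrefix, keyServersPrefix + "m"},
        {keyServersPrefix + "m", endKey},
    };

    for (size_t i = 0; i < shards.size(); i++) {
        // Each request is bounded by the smaller of the shard end and the global
        // end of the prefixed range, mirroring std::min<KeyRef>(shards[i].first.end, endKey).
        while (beginKey < std::min(shards[i].second, endKey)) {
            const std::string reqBegin = beginKey;
            const std::string reqEnd = std::min(shards[i].second, endKey);
            std::cout << "request [" << reqBegin.substr(keyServersPrefix.size())
                      << ", " << reqEnd.substr(keyServersPrefix.size()) << ")\n";
            // A real scan resumes from the last key returned; here each request
            // is assumed to cover its whole clamped range.
            beginKey = reqEnd;
        }
    }
    return 0;
}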
@@ -442,17 +435,26 @@ struct ConsistencyCheckWorkload : TestWorkload
                     }

                     auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get();
-                    Standalone<RangeResultRef> currentLocations = krmDecodeRanges( keyServersPrefix, KeyRangeRef(beginKey, endKey), RangeResultRef( keyValueResponse.data, keyValueResponse.more) );
+                    Standalone<RangeResultRef> currentLocations = krmDecodeRanges( keyServersPrefix, KeyRangeRef(beginKey.removePrefix(keyServersPrefix), std::min<KeyRef>(shards[i].first.end, endKey).removePrefix(keyServersPrefix)), RangeResultRef( keyValueResponse.data, keyValueResponse.more) );

-                    //Push all but the last item, which will be pushed as the first item next iteration
-                    keyLocations.append_deep(keyLocations.arena(), currentLocations.begin(), currentLocations.size() - 1);
+                    if(keyValueResponse.data.size() && beginKey == keyValueResponse.data[0].key) {
+                        keyLocations.push_back_deep(keyLocations.arena(), currentLocations[0]);
+                    }

+                    if(currentLocations.size() > 2) {
+                        keyLocations.append_deep(keyLocations.arena(), &currentLocations[1], currentLocations.size() - 2);
+                    }
+
                     //Next iteration should pick up where we left off
                     ASSERT(currentLocations.size() > 1);
-                    beginKey = currentLocations.end()[-1].key;
+                    if(!keyValueResponse.more) {
+                        beginKey = shards[i].first.end;
+                    } else {
+                        beginKey = keyValueResponse.data.end()[-1].key;
+                    }

                     //If this is the last iteration, then push the allKeys.end KV pair
-                    if(beginKey == allKeys.end)
+                    if(beginKey >= endKey)
                         keyLocations.push_back_deep(keyLocations.arena(), currentLocations.end()[-1]);
                 }
                 catch(Error &e)
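The page-stitching rules in this hunk are easy to lose in the diff noise, so here is a minimal sketch of the same decisions in isolation. It deliberately collapses the raw GetKeyValues response and the krmDecodeRanges output into a single vector, and the Page and appendPage names are invented for illustration; only the branch structure mirrors the code above.

// Toy sketch of the page-stitching logic (invented types, not fdbserver code).
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

struct Page {
    std::vector<KV> boundaries; // stands in for the decoded currentLocations
    bool more = false;          // the server truncated the response
};

// Appends one page of decoded shard boundaries to keyLocations and returns the
// key the next request should start from.
std::string appendPage(std::vector<KV>& keyLocations, const Page& page,
                       const std::string& beginKey, const std::string& shardEnd,
                       const std::string& endKey) {
    // Keep the first boundary only when this page really starts at beginKey;
    // otherwise it was already pushed by the previous iteration.
    if (!page.boundaries.empty() && page.boundaries.front().first == beginKey)
        keyLocations.push_back(page.boundaries.front());

    // Interior boundaries (everything but the first and last) are always new.
    for (size_t j = 1; j + 1 < page.boundaries.size(); j++)
        keyLocations.push_back(page.boundaries[j]);

    // Resume from the last key returned if the response was truncated,
    // otherwise the whole shard has been covered.
    std::string next = page.more ? page.boundaries.back().first : shardEnd;

    // Once the scan has reached the global end, also push the terminal boundary.
    if (next >= endKey && !page.boundaries.empty())
        keyLocations.push_back(page.boundaries.back());
    return next;
}

int main() {
    std::vector<KV> keyLocations;
    // One truncated page starting exactly at beginKey "a", inside shard ["a", "m").
    Page page{{{"a", "srv1"}, {"c", "srv2"}, {"f", "srv3"}}, /*more=*/true};
    std::string next = appendPage(keyLocations, page, "a", "m", "z");
    // keyLocations now holds the boundaries at "a" and "c"; the next request starts at "f".
    return next == "f" ? 0 : 1;
}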
@@ -970,7 +972,7 @@ struct ConsistencyCheckWorkload : TestWorkload
                 //Min and max shard sizes have a 3 * shardBounds.permittedError.bytes cushion for error since shard sizes are not precise
                 //Shard splits ignore the first key in a shard, so its size shouldn't be considered when checking the upper bound
                 //0xff shards are not checked
-                if( canSplit && self->performQuiescentChecks && !range.begin.startsWith(keyServersPrefix) &&
+                if( canSplit && sampledKeys > 5 && self->performQuiescentChecks && !range.begin.startsWith(keyServersPrefix) &&
                     (sampledBytes < shardBounds.min.bytes - 3 * shardBounds.permittedError.bytes || sampledBytes - firstKeySampledBytes > shardBounds.max.bytes + 3 * shardBounds.permittedError.bytes))
                 {
                     TraceEvent("ConsistencyCheck_InvalidShardSize").detail("Min", shardBounds.min.bytes).detail("Max", shardBounds.max.bytes).detail("Size", shardBytes)
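The only functional change in this hunk is the new sampledKeys > 5 guard, which keeps the quiescent check from flagging shards whose size estimate rests on just a handful of samples. The sketch below isolates that guard; the ShardBounds struct, byte values, and function name are placeholders, and the unrelated canSplit, performQuiescentChecks, and keyServersPrefix conditions are omitted.

// Toy sketch of the sample-count guard on the shard-size check (placeholder values).
#include <cstdint>
#include <iostream>

struct ShardBounds {
    int64_t minBytes;
    int64_t maxBytes;
    int64_t permittedErrorBytes;
};

// Returns true when the sampled size estimate is based on enough keys to be
// trusted (more than 5) and still falls outside the permitted band.
bool shardSizeLooksInvalid(int64_t sampledBytes, int64_t firstKeySampledBytes,
                           int64_t sampledKeys, const ShardBounds& bounds) {
    if (sampledKeys <= 5)
        return false; // too few samples to trust the estimate
    const int64_t cushion = 3 * bounds.permittedErrorBytes;
    return sampledBytes < bounds.minBytes - cushion ||
           sampledBytes - firstKeySampledBytes > bounds.maxBytes + cushion;
}

int main() {
    // Hypothetical bounds; the real values come from the data distribution config.
    ShardBounds bounds{125'000'000, 500'000'000, 10'000'000};
    // With only 3 sampled keys the check is skipped even for a tiny estimate.
    std::cout << shardSizeLooksInvalid(1'000'000, 0, 3, bounds) << "\n";  // prints 0
    // With enough samples the same estimate is flagged as out of bounds.
    std::cout << shardSizeLooksInvalid(1'000'000, 0, 50, bounds) << "\n"; // prints 1
    return 0;
}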