Merge pull request #10002 from sfc-gh-jazhuang/readThrough

Fix RangeResult.readThrough misuse
This commit is contained in:
Jay Zhuang 2023-04-27 09:59:11 -07:00 committed by GitHub
commit 0ab691b707
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 169 additions and 93 deletions

View File

@ -156,11 +156,61 @@ struct KeyValueRef {
typedef Standalone<KeyValueRef> KeyValue;
struct RangeResultRef : VectorRef<KeyValueRef> {
bool more; // True if (but not necessarily only if) values remain in the *key* range requested (possibly beyond the
// limits requested)
// False implies that no such values remain
Optional<KeyRef> readThrough; // Only present when 'more' is true. When present, this value represent the end (or
// beginning if reverse) of the range
// True if the range may have more keys in it (possibly beyond the specified limits).
// 'more' can be true even if there are no keys left in the range, e.g. if a shard boundary is hit, it may or may
// not have more keys left, but 'more' will be set to true in that case.
// Additionally, 'getRangeStream()' always sets 'more' to true and uses the 'end_of_stream' error to indicate that a
// range is exhausted.
// If 'more' is false, the range is guaranteed to have been exhausted.
bool more;
// Only present when 'more' is true, for example, when the read reaches the shard boundary, 'readThrough' is set to
// the shard boundary and the client's next range read should start with the 'readThrough'.
// But 'more' is true does not necessarily guarantee 'readThrough' is present, for example, when the read reaches
// size limit, 'readThrough' might not be set, the next read should just start from the keyAfter of the current
// query result's last key.
// In both cases, please use the getter function 'getReadThrough()' instead, which represents the end (or beginning
// if reverse) of the range which was read.
Optional<KeyRef> readThrough;
// Returns the key marking the end (or, for reverse reads, the beginning) of the range that was read.
// When 'readThrough' was not explicitly set, the boundary is derived from the last key in the result:
// a forward read resumes at keyAfter(last key), while a reverse read returns the last key itself,
// since it becomes the non-inclusive "end" of the next query.
Key getReadThrough(bool reverse = false) const {
	ASSERT(more); // Only meaningful when the range was not exhausted.
	if (!readThrough.present()) {
		// No explicit boundary recorded: derive it from the result itself.
		ASSERT(size() > 0);
		return reverse ? back().key : keyAfter(back().key);
	}
	return readThrough.get();
}
// Builds the begin KeySelector for the next scan of a forward (non-reverse) range read.
// For reverse range reads use nextEndKeySelector() instead.
KeySelectorRef nextBeginKeySelector() const {
	ASSERT(more); // A "next" scan only makes sense when more data remains.
	if (!readThrough.present()) {
		// No explicit boundary: resume just past the last key received.
		ASSERT(size() > 0);
		return firstGreaterThan(back().key);
	}
	return firstGreaterOrEqual(readThrough.get());
}
// Builds the end KeySelector for the next scan of a reverse range read.
// For forward range reads use nextBeginKeySelector() instead.
KeySelectorRef nextEndKeySelector() const {
	ASSERT(more); // A "next" scan only makes sense when more data remains.
	if (!readThrough.present()) {
		// No explicit boundary: the last key received becomes the non-inclusive end.
		ASSERT(size() > 0);
		return firstGreaterOrEqual(back().key);
	}
	return firstGreaterOrEqual(readThrough.get());
}
// Records the boundary key the read stopped at. Legal only while 'more' is set,
// and only once per result.
void setReadThrough(KeyRef key) {
	ASSERT(!readThrough.present()); // May only be set once.
	ASSERT(more);
	readThrough = key;
}
// which was read to produce these results. This is guaranteed to be less than the requested range.
bool readToBegin;
bool readThroughEnd;

View File

@ -103,7 +103,7 @@ ACTOR Future<Void> PipelinedReader::getNext_impl(PipelinedReader* self, Database
return Void();
}
begin = kvs.readThrough.present() ? kvs.readThrough.get() : keyAfter(kvs.back().key);
begin = kvs.getReadThrough();
break;
} catch (Error& e) {

View File

@ -4764,11 +4764,17 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
if (readThrough) {
output.arena().dependsOn(shard.arena());
output.readThrough = reverse ? shard.begin : shard.end;
// As modifiedSelectors is true, more is also true. Then set readThrough to the shard boundary.
ASSERT(modifiedSelectors);
output.more = true;
output.setReadThrough(reverse ? shard.begin : shard.end);
}
getRangeFinished(
trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
if (!output.more) {
ASSERT(!output.readThrough.present());
}
return output;
}
@ -4776,14 +4782,17 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
output.append(output.arena(), rep.data.begin(), rep.data.size());
if (finished) {
output.more = modifiedSelectors || limits.isReached() || rep.more;
if (readThrough) {
output.arena().dependsOn(shard.arena());
output.readThrough = reverse ? shard.begin : shard.end;
output.setReadThrough(reverse ? shard.begin : shard.end);
}
output.more = modifiedSelectors || limits.isReached() || rep.more;
getRangeFinished(
trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
if (!output.more) {
ASSERT(!output.readThrough.present());
}
return output;
}
@ -5237,7 +5246,10 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
output.readThroughEnd = true;
}
output.arena().dependsOn(keys.arena());
output.readThrough = reverse ? keys.begin : keys.end;
// for getRangeStreamFragment, one fragment end doesn't mean it's the end of getRange
// so set 'more' to true
output.more = true;
output.setReadThrough(reverse ? keys.begin : keys.end);
results->send(std::move(output));
results->finish();
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
@ -5251,7 +5263,9 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
++shard;
}
output.arena().dependsOn(range.arena());
output.readThrough = reverse ? range.begin : range.end;
// if it's not the last shard, set more to true and readThrough to the shard boundary
output.more = true;
output.setReadThrough(reverse ? range.begin : range.end);
results->send(std::move(output));
break;
}

View File

@ -755,11 +755,62 @@ struct GetRangeLimits {
struct RangeResultRef : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 3985192;
bool more; // True if (but not necessarily only if) values remain in the *key* range requested (possibly beyond the
// limits requested) False implies that no such values remain
Optional<KeyRef> readThrough; // Only present when 'more' is true. When present, this value represent the end (or
// beginning if reverse) of the range which was read to produce these results. This is
// guaranteed to be less than the requested range.
// True if the range may have more keys in it (possibly beyond the specified limits).
// 'more' can be true even if there are no keys left in the range, e.g. if a shard boundary is hit, it may or may
// not have more keys left, but 'more' will be set to true in that case.
// Additionally, 'getRangeStream()' always sets 'more' to true and uses the 'end_of_stream' error to indicate that a
// range is exhausted.
// If 'more' is false, the range is guaranteed to have been exhausted.
bool more;
// Only present when 'more' is true, for example, when the read reaches the shard boundary, 'readThrough' is set to
// the shard boundary and the client's next range read should start with the 'readThrough'.
// But 'more' is true does not necessarily guarantee 'readThrough' is present, for example, when the read reaches
// size limit, 'readThrough' might not be set, the next read should just start from the keyAfter of the current
// query result's last key.
// In both cases, please use the getter function 'getReadThrough()' instead, which represents the end (or beginning
// if reverse) of the range which was read.
Optional<KeyRef> readThrough;
// Returns the key marking the end (or, for reverse reads, the beginning) of the range that was read.
// When 'readThrough' was not explicitly set, the boundary is derived from the last key in the result:
// a forward read resumes at keyAfter(last key), while a reverse read returns the last key itself,
// since it becomes the non-inclusive "end" of the next query.
Key getReadThrough(bool reverse = false) const {
	ASSERT(more); // Only meaningful when the range was not exhausted.
	if (!readThrough.present()) {
		// No explicit boundary recorded: derive it from the result itself.
		ASSERT(size() > 0);
		return reverse ? back().key : keyAfter(back().key);
	}
	return readThrough.get();
}
// Builds the begin KeySelector for the next scan of a forward (non-reverse) range read.
// For reverse range reads use nextEndKeySelector() instead.
KeySelectorRef nextBeginKeySelector() const {
	ASSERT(more); // A "next" scan only makes sense when more data remains.
	if (!readThrough.present()) {
		// No explicit boundary: resume just past the last key received.
		ASSERT(size() > 0);
		return firstGreaterThan(back().key);
	}
	return firstGreaterOrEqual(readThrough.get());
}
// Builds the end KeySelector for the next scan of a reverse range read.
// For forward range reads use nextBeginKeySelector() instead.
KeySelectorRef nextEndKeySelector() const {
	ASSERT(more); // A "next" scan only makes sense when more data remains.
	if (!readThrough.present()) {
		// No explicit boundary: the last key received becomes the non-inclusive end.
		ASSERT(size() > 0);
		return firstGreaterOrEqual(back().key);
	}
	return firstGreaterOrEqual(readThrough.get());
}
// Records the boundary key the read stopped at. Legal only while 'more' is set,
// and only once per result.
void setReadThrough(KeyRef key) {
	ASSERT(!readThrough.present()); // May only be set once.
	ASSERT(more);
	readThrough = key;
}
bool readToBegin;
bool readThroughEnd;
@ -880,6 +931,12 @@ struct MappedRangeResultRef : VectorRef<MappedKeyValueRef> {
bool readToBegin;
bool readThroughEnd;
// Records the boundary key the read stopped at. Legal only while 'more' is set,
// and only once per result.
void setReadThrough(KeyRef key) {
	ASSERT(!readThrough.present()); // May only be set once.
	ASSERT(more);
	readThrough = key;
}
MappedRangeResultRef() : more(false), readToBegin(false), readThroughEnd(false) {}
MappedRangeResultRef(Arena& p, const MappedRangeResultRef& toCopy)
: VectorRef<MappedKeyValueRef>(p, toCopy), more(toCopy.more),

View File

@ -402,11 +402,7 @@ private:
if (!result.more) {
break;
}
if (result.readThrough.present()) {
begin = firstGreaterOrEqual(result.readThrough.get());
} else {
begin = firstGreaterThan(result.back().key);
}
begin = result.nextBeginKeySelector();
}
}
@ -501,11 +497,7 @@ public:
if (!rows.more) {
break;
}
if (rows.readThrough.present()) {
begin = firstGreaterOrEqual(rows.readThrough.get());
} else {
begin = firstGreaterThan(rows.end()[-1].key);
}
begin = rows.nextBeginKeySelector();
}
// check each granule range
@ -783,11 +775,7 @@ private:
if (!results.more) {
break;
}
if (results.readThrough.present()) {
begin = firstGreaterOrEqual(results.readThrough.get());
} else {
begin = firstGreaterThan(results.end()[-1].key);
}
begin = results.nextBeginKeySelector();
}
return files;
}

View File

@ -59,11 +59,7 @@ ACTOR Future<BlobRestoreRangeState> BlobRestoreController::getRangeState(Referen
if (!ranges.more) {
break;
}
if (ranges.readThrough.present()) {
begin = firstGreaterOrEqual(ranges.readThrough.get());
} else {
begin = firstGreaterThan(ranges.end()[-1].key);
}
begin = ranges.nextBeginKeySelector();
}
return std::make_pair(KeyRangeRef(), BlobRestoreState(BlobRestorePhase::DONE));
} catch (Error& e) {
@ -165,11 +161,7 @@ ACTOR Future<Optional<BlobRestoreArg>> BlobRestoreController::getArgument(Refere
if (!ranges.more) {
break;
}
if (ranges.readThrough.present()) {
begin = firstGreaterOrEqual(ranges.readThrough.get());
} else {
begin = firstGreaterThan(ranges.back().key);
}
begin = ranges.nextBeginKeySelector();
}
return result;
} catch (Error& e) {

View File

@ -276,10 +276,6 @@ public:
}
result.more = rowLimit == 0 || byteLimit <= 0;
if (result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
return result;
}

View File

@ -1779,9 +1779,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
if (result.more) {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
// Temporarily not sampling to understand the pattern of readRange results.
if (metricPromiseStream) {

View File

@ -1261,10 +1261,6 @@ struct RawCursor {
}
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if (result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
// AccumulatedBytes includes KeyValueRef overhead so subtract it
kvBytesRead += (accumulatedBytes - result.size() * sizeof(KeyValueRef));
return result;

View File

@ -2942,9 +2942,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
if (result.more) {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
if (a.getHistograms) {
double currTime = timer_monotonic();

View File

@ -1241,8 +1241,9 @@ ACTOR Future<RangeResult> tryFetchRange(Database cx,
if (e.code() == error_code_transaction_too_old)
*isTooOld = true;
output.more = true;
if (begin.isFirstGreaterOrEqual())
output.readThrough = begin.getKey();
if (begin.isFirstGreaterOrEqual()) {
output.setReadThrough(begin.getKey());
}
return output;
}
throw;

View File

@ -8257,10 +8257,6 @@ public:
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if (result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
g_redwoodMetrics.kvSizeReadByGetRange->sample(accumulatedBytes);
return result;
}

View File

@ -6551,9 +6551,6 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, SERVER_KNOBS->FETCH_BLOCK_BYTES);
limits.minRows = 0;
state RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True));
if (!rep.more) {
rep.readThrough = keys.end;
}
results.send(rep);
if (!rep.more) {
@ -6561,11 +6558,7 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
return Void();
}
if (rep.readThrough.present()) {
begin = firstGreaterOrEqual(rep.readThrough.get());
} else {
begin = firstGreaterThan(rep.end()[-1].key);
}
begin = rep.nextBeginKeySelector();
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
@ -6626,12 +6619,16 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
.detail("Rows", rows.size())
.detail("ChunkRange", chunkRange)
.detail("FetchVersion", fetchVersion);
if (rows.size() == 0) {
rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
}
// It should read all the data from that chunk
ASSERT(!rows.more);
if (i == chunks.size() - 1) {
rows.readThrough = KeyRef(rows.arena(), keys.end);
// set more to false when it's the last chunk
rows.more = false;
} else {
rows.more = true;
// no need to set readThrough, as the next read key range has to be the next chunkRange
}
ASSERT(!rows.readThrough.present());
results.send(rows);
}
results.sendError(end_of_stream()); // end of range read
@ -7571,6 +7568,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state PromiseStream<RangeResult> results;
state Future<Void> hold;
state KeyRef rangeEnd;
if (isFullRestore) {
state BlobRestoreRangeState rangeStatus = wait(BlobRestoreController::getRangeState(restoreController));
// Read from blob only when it's copying data for full restore. Otherwise it may cause data corruptions
@ -7579,11 +7577,14 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
rangeStatus.second.phase == BlobRestorePhase::ERROR) {
Version version = wait(BlobRestoreController::getTargetVersion(restoreController, fetchVersion));
hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, version, data->blobConn);
rangeEnd = rangeStatus.first.end;
} else {
hold = tryGetRange(results, &tr, keys);
rangeEnd = keys.end;
}
} else {
hold = tryGetRange(results, &tr, keys);
rangeEnd = keys.end;
}
state Key blockBegin = keys.begin;
@ -7631,9 +7632,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
for (; kvItr != this_block.end(); ++kvItr) {
data->byteSampleApplySet(*kvItr, invalidVersion);
}
ASSERT(this_block.readThrough.present() || this_block.size());
blockBegin = this_block.readThrough.present() ? this_block.readThrough.get()
: keyAfter(this_block.end()[-1].key);
if (this_block.more) {
blockBegin = this_block.getReadThrough();
} else {
ASSERT(!this_block.readThrough.present());
blockBegin = rangeEnd;
}
this_block = RangeResult();
data->fetchKeysBytesBudget -= expectedBlockSize;
@ -9725,8 +9729,7 @@ ACTOR Future<bool> createSstFileForCheckpointShardBytesSample(StorageServer* dat
numSampledKeys++;
}
if (readResult.more) {
ASSERT(readResult.readThrough.present());
readBegin = keyAfter(readResult.readThrough.get());
readBegin = readResult.getReadThrough();
ASSERT(readBegin <= readEnd);
} else {
break; // finish for current metaDataRangesIter

View File

@ -158,13 +158,12 @@ struct BlobRestoreWorkload : TestWorkload {
state Standalone<VectorRef<KeyValueRef>> data;
state Transaction tr(cx);
state KeySelectorRef end = firstGreaterOrEqual(normalKeys.end);
state Arena arena;
loop {
try {
GetRangeLimits limits(self->readBatchSize_ - data.size());
limits.minRows = 0;
RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True));
state RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True));
for (auto& row : result) {
data.push_back_deep(data.arena(), KeyValueRef(row.key, row.value));
}
@ -174,11 +173,7 @@ struct BlobRestoreWorkload : TestWorkload {
if (data.size() == self->readBatchSize_) {
break;
}
if (result.readThrough.present()) {
begin = firstGreaterOrEqual(KeyRef(arena, result.readThrough.get()));
} else {
begin = firstGreaterThan(KeyRef(arena, result.back().key));
}
begin = result.nextBeginKeySelector();
} catch (Error& e) {
wait(tr.onError(e));
}

View File

@ -41,9 +41,7 @@ ACTOR Future<Void> streamUsingGetRange(PromiseStream<RangeResult> results, Trans
GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, 1e6);
limits.minRows = 0;
state RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True));
if (!rep.more) {
rep.readThrough = keys.end;
}
results.send(rep);
if (!rep.more) {
@ -51,11 +49,7 @@ ACTOR Future<Void> streamUsingGetRange(PromiseStream<RangeResult> results, Trans
return Void();
}
if (rep.readThrough.present()) {
begin = firstGreaterOrEqual(rep.readThrough.get());
} else {
begin = firstGreaterThan(rep.end()[-1].key);
}
begin = rep.nextBeginKeySelector();
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {