Fix RangeResult.readThrough misuse

Fix `RangeResult.readThrough` misuses:
1. KeyValueStores do not need to set readThrough, as it will not be
   serialized and return. Also setting it to the last key of the result
   is not right, it should at least be the keyAfter of the last key;
2. Fix NativeAPI doesn't set `RangeResult.more` in a few places;
3. Avoid `tryGetRange()` setting `readThrough` when `more` is false,
   which was a workaround for the above item 2;
4. `tryGetRangeFromBlob()` doesn't set `more` but set `readThrough` to
   indicate it is end, which was following the same above workaround I
   think. Fixed that.
5. `getRangeStream()` is going to set `more` to true and then let the
   `readThrough` be it's boundary.

Also added readThrough getter/setter function to validate it's usage.
This commit is contained in:
Jay Zhuang 2023-04-11 11:28:08 -07:00 committed by Jay Zhuang
parent 862c7e2ee8
commit b7da2ed16c
15 changed files with 112 additions and 86 deletions

View File

@ -161,6 +161,22 @@ struct RangeResultRef : VectorRef<KeyValueRef> {
// False implies that no such values remain
Optional<KeyRef> readThrough; // Only present when 'more' is true. When present, this value represent the end (or
// beginning if reverse) of the range
KeyRef getReadThrough(Arena& arena) const {
ASSERT(more);
if (readThrough.present()) {
return readThrough.get();
}
ASSERT(size() > 0);
return keyAfter(back().key, arena);
}
void setReadThrough(KeyRef key) {
ASSERT(more);
ASSERT(!readThrough.present());
readThrough = key;
}
// which was read to produce these results. This is guaranteed to be less than the requested range.
bool readToBegin;
bool readThroughEnd;

View File

@ -103,7 +103,7 @@ ACTOR Future<Void> PipelinedReader::getNext_impl(PipelinedReader* self, Database
return Void();
}
begin = kvs.readThrough.present() ? kvs.readThrough.get() : keyAfter(kvs.back().key);
begin = kvs.getReadThrough(begin.arena());
break;
} catch (Error& e) {

View File

@ -4690,11 +4690,17 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
if (readThrough) {
output.arena().dependsOn(shard.arena());
output.readThrough = reverse ? shard.begin : shard.end;
// As modifiedSelectors is true, more is also true. Then set readThrough to the shard boundary.
ASSERT(modifiedSelectors);
output.more = true;
output.setReadThrough(reverse ? shard.begin : shard.end);
}
getRangeFinished(
trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
if (!output.more) {
ASSERT(!output.readThrough.present());
}
return output;
}
@ -4702,14 +4708,17 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
output.append(output.arena(), rep.data.begin(), rep.data.size());
if (finished) {
output.more = modifiedSelectors || limits.isReached() || rep.more;
if (readThrough) {
output.arena().dependsOn(shard.arena());
output.readThrough = reverse ? shard.begin : shard.end;
output.setReadThrough(reverse ? shard.begin : shard.end);
}
output.more = modifiedSelectors || limits.isReached() || rep.more;
getRangeFinished(
trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
if (!output.more) {
ASSERT(!output.readThrough.present());
}
return output;
}
@ -5163,7 +5172,10 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
output.readThroughEnd = true;
}
output.arena().dependsOn(keys.arena());
output.readThrough = reverse ? keys.begin : keys.end;
// for getRangeStreamFragment, one fragment end doesn't mean it's the end of getRange
// so set 'more' to true
output.more = true;
output.setReadThrough(reverse ? keys.begin : keys.end);
results->send(std::move(output));
results->finish();
if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) {
@ -5177,7 +5189,9 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
++shard;
}
output.arena().dependsOn(range.arena());
output.readThrough = reverse ? range.begin : range.end;
// if it's not the last shard, set more to true and readThrough to the shard boundary
output.more = true;
output.setReadThrough(reverse ? range.begin : range.end);
results->send(std::move(output));
break;
}

View File

@ -750,11 +750,38 @@ struct GetRangeLimits {
struct RangeResultRef : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 3985192;
bool more; // True if (but not necessarily only if) values remain in the *key* range requested (possibly beyond the
// limits requested) False implies that no such values remain
Optional<KeyRef> readThrough; // Only present when 'more' is true. When present, this value represent the end (or
// beginning if reverse) of the range which was read to produce these results. This is
// guaranteed to be less than the requested range.
bool more; // True if values remain in the *key* range requested (possibly beyond the
// limits requested), but not necessarily only if, for example, in getRangeStream(), 'more' is always set
// to true, as each stream fragment doesn't know if it's the last one. Instead, it uses
// `error_code_end_of_stream` for the end.
// False implies that no such values remain
// Only present when 'more' is true, for example, when the read reaches the shard boundary, 'readThrough' is set to
// the shard boundary and the client's next range read should start with the 'readThrough'.
// But 'more' is true does not necessarily guarantee 'readThrough' is present, for example, when the read reaches
// size limit, 'readThrough' might not be set, the next read should just start from the keyAfter of the current
// query result's last key.
// In both cases, please use the getter function 'getReadThrough()' instead, which represents the end (or beginning
// if reverse) of the range which was read.
Optional<KeyRef> readThrough;
// return the value represent the end (or beginning if reverse) of the range which was read
KeyRef getReadThrough(Arena& arena) const {
ASSERT(more);
if (readThrough.present()) {
return readThrough.get();
}
ASSERT(size() > 0);
// TODO: is this still right if reverse
return keyAfter(back().key, arena);
}
void setReadThrough(KeyRef key) {
ASSERT(more);
ASSERT(!readThrough.present());
readThrough = key;
}
bool readToBegin;
bool readThroughEnd;
@ -875,6 +902,12 @@ struct MappedRangeResultRef : VectorRef<MappedKeyValueRef> {
bool readToBegin;
bool readThroughEnd;
void setReadThrough(KeyRef key) {
ASSERT(more);
ASSERT(!readThrough.present());
readThrough = key;
}
MappedRangeResultRef() : more(false), readToBegin(false), readThroughEnd(false) {}
MappedRangeResultRef(Arena& p, const MappedRangeResultRef& toCopy)
: VectorRef<MappedKeyValueRef>(p, toCopy), more(toCopy.more),

View File

@ -394,6 +394,7 @@ private:
limits.minRows = 0;
state KeySelectorRef begin = firstGreaterOrEqual(range.begin);
state KeySelectorRef end = firstGreaterOrEqual(range.end);
state Arena beginKeyArena;
loop {
state RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True));
for (auto& row : result) {
@ -402,11 +403,7 @@ private:
if (!result.more) {
break;
}
if (result.readThrough.present()) {
begin = firstGreaterOrEqual(result.readThrough.get());
} else {
begin = firstGreaterThan(result.back().key);
}
begin = firstGreaterOrEqual(result.getReadThrough(beginKeyArena));
}
}
@ -491,6 +488,7 @@ public:
// Read all granules
state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
limits.minRows = 0;
state Arena arena;
state KeySelectorRef begin = firstGreaterOrEqual(blobGranuleMappingKeys.begin);
state KeySelectorRef end = firstGreaterOrEqual(blobGranuleMappingKeys.end);
loop {
@ -501,11 +499,7 @@ public:
if (!rows.more) {
break;
}
if (rows.readThrough.present()) {
begin = firstGreaterOrEqual(rows.readThrough.get());
} else {
begin = firstGreaterThan(rows.end()[-1].key);
}
begin = firstGreaterOrEqual(rows.getReadThrough(arena));
}
// check each granule range
@ -761,6 +755,7 @@ private:
limits.minRows = 0;
state KeySelectorRef begin = firstGreaterOrEqual(fileKeyRange.begin);
state KeySelectorRef end = firstGreaterOrEqual(fileKeyRange.end);
state Arena beginKeyArena;
loop {
RangeResult results = wait(tr->getRange(begin, end, limits, Snapshot::True));
for (auto& row : results) {
@ -783,11 +778,7 @@ private:
if (!results.more) {
break;
}
if (results.readThrough.present()) {
begin = firstGreaterOrEqual(results.readThrough.get());
} else {
begin = firstGreaterThan(results.end()[-1].key);
}
begin = firstGreaterOrEqual(results.getReadThrough(beginKeyArena));
}
return files;
}

View File

@ -45,6 +45,7 @@ ACTOR Future<BlobRestoreRangeState> BlobRestoreController::getRangeState(Referen
limits.minRows = 0;
state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreCommandKeys.begin);
state KeySelectorRef end = firstGreaterOrEqual(blobRestoreCommandKeys.end);
state Arena beginKeyArena;
loop {
RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True));
for (auto& r : ranges) {
@ -59,11 +60,7 @@ ACTOR Future<BlobRestoreRangeState> BlobRestoreController::getRangeState(Referen
if (!ranges.more) {
break;
}
if (ranges.readThrough.present()) {
begin = firstGreaterOrEqual(ranges.readThrough.get());
} else {
begin = firstGreaterThan(ranges.end()[-1].key);
}
begin = firstGreaterOrEqual(ranges.getReadThrough(beginKeyArena));
}
return std::make_pair(KeyRangeRef(), BlobRestoreState(BlobRestorePhase::DONE));
} catch (Error& e) {
@ -152,6 +149,7 @@ ACTOR Future<Optional<BlobRestoreArg>> BlobRestoreController::getArgument(Refere
limits.minRows = 0;
state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreArgKeys.begin);
state KeySelectorRef end = firstGreaterOrEqual(blobRestoreArgKeys.end);
state Arena beginKeyArena;
loop {
RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True));
for (auto& r : ranges) {
@ -165,11 +163,7 @@ ACTOR Future<Optional<BlobRestoreArg>> BlobRestoreController::getArgument(Refere
if (!ranges.more) {
break;
}
if (ranges.readThrough.present()) {
begin = firstGreaterOrEqual(ranges.readThrough.get());
} else {
begin = firstGreaterThan(ranges.back().key);
}
begin = firstGreaterOrEqual(ranges.getReadThrough(beginKeyArena));
}
return result;
} catch (Error& e) {

View File

@ -276,10 +276,6 @@ public:
}
result.more = rowLimit == 0 || byteLimit <= 0;
if (result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
return result;
}

View File

@ -1779,9 +1779,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
if (result.more) {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
// Temporarily not sampling to understand the pattern of readRange results.
if (metricPromiseStream) {

View File

@ -1261,10 +1261,6 @@ struct RawCursor {
}
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if (result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
// AccumulatedBytes includes KeyValueRef overhead so subtract it
kvBytesRead += (accumulatedBytes - result.size() * sizeof(KeyValueRef));
return result;

View File

@ -2788,9 +2788,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
if (result.more) {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
if (a.getHistograms) {
double currTime = timer_monotonic();

View File

@ -1241,8 +1241,9 @@ ACTOR Future<RangeResult> tryFetchRange(Database cx,
if (e.code() == error_code_transaction_too_old)
*isTooOld = true;
output.more = true;
if (begin.isFirstGreaterOrEqual())
output.readThrough = begin.getKey();
if (begin.isFirstGreaterOrEqual()) {
output.setReadThrough(begin.getKey());
}
return output;
}
throw;

View File

@ -8257,10 +8257,6 @@ public:
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if (result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
g_redwoodMetrics.kvSizeReadByGetRange->sample(accumulatedBytes);
return result;
}

View File

@ -6547,15 +6547,13 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
state KeySelectorRef begin = firstGreaterOrEqual(keys.begin);
state KeySelectorRef end = firstGreaterOrEqual(keys.end);
state Arena beginKeyArena;
try {
loop {
GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, SERVER_KNOBS->FETCH_BLOCK_BYTES);
limits.minRows = 0;
state RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True));
if (!rep.more) {
rep.readThrough = keys.end;
}
results.send(rep);
if (!rep.more) {
@ -6563,11 +6561,7 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
return Void();
}
if (rep.readThrough.present()) {
begin = firstGreaterOrEqual(rep.readThrough.get());
} else {
begin = firstGreaterThan(rep.end()[-1].key);
}
begin = firstGreaterOrEqual(rep.getReadThrough(beginKeyArena));
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
@ -6628,12 +6622,16 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
.detail("Rows", rows.size())
.detail("ChunkRange", chunkRange)
.detail("FetchVersion", fetchVersion);
if (rows.size() == 0) {
rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
}
// It should read all the data from that chunk
ASSERT(!rows.more);
if (i == chunks.size() - 1) {
rows.readThrough = KeyRef(rows.arena(), keys.end);
// set more to false when it's the last chunk
rows.more = false;
} else {
rows.more = true;
// no need to set readThrough, as the next read key range has to be the next chunkRange
}
ASSERT(!rows.readThrough.present());
results.send(rows);
}
results.sendError(end_of_stream()); // end of range read
@ -7555,6 +7553,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state PromiseStream<RangeResult> results;
state Future<Void> hold;
state KeyRef rangeEnd;
if (isFullRestore) {
state BlobRestoreRangeState rangeStatus = wait(BlobRestoreController::getRangeState(restoreController));
// Read from blob only when it's copying data for full restore. Otherwise it may cause data corruptions
@ -7563,11 +7562,14 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
rangeStatus.second.phase == BlobRestorePhase::ERROR) {
Version version = wait(BlobRestoreController::getTargetVersion(restoreController, fetchVersion));
hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, version, data->blobConn);
rangeEnd = rangeStatus.first.end;
} else {
hold = tryGetRange(results, &tr, keys);
rangeEnd = keys.end;
}
} else {
hold = tryGetRange(results, &tr, keys);
rangeEnd = keys.end;
}
state Key blockBegin = keys.begin;
@ -7615,9 +7617,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
for (; kvItr != this_block.end(); ++kvItr) {
data->byteSampleApplySet(*kvItr, invalidVersion);
}
ASSERT(this_block.readThrough.present() || this_block.size());
blockBegin = this_block.readThrough.present() ? this_block.readThrough.get()
: keyAfter(this_block.end()[-1].key);
if (this_block.more) {
blockBegin = this_block.getReadThrough(blockBegin.arena());
} else {
ASSERT(!this_block.readThrough.present());
blockBegin = rangeEnd;
}
this_block = RangeResult();
data->fetchKeysBytesBudget -= expectedBlockSize;
@ -9713,8 +9718,7 @@ ACTOR Future<bool> createSstFileForCheckpointShardBytesSample(StorageServer* dat
numSampledKeys++;
}
if (readResult.more) {
ASSERT(readResult.readThrough.present());
readBegin = keyAfter(readResult.readThrough.get());
readBegin = readResult.getReadThrough(readBegin.arena());
ASSERT(readBegin <= readEnd);
} else {
break; // finish for current metaDataRangesIter

View File

@ -174,11 +174,7 @@ struct BlobRestoreWorkload : TestWorkload {
if (data.size() == self->readBatchSize_) {
break;
}
if (result.readThrough.present()) {
begin = firstGreaterOrEqual(KeyRef(arena, result.readThrough.get()));
} else {
begin = firstGreaterThan(KeyRef(arena, result.back().key));
}
begin = firstGreaterOrEqual(result.getReadThrough(arena));
} catch (Error& e) {
wait(tr.onError(e));
}

View File

@ -35,15 +35,14 @@
ACTOR Future<Void> streamUsingGetRange(PromiseStream<RangeResult> results, Transaction* tr, KeyRange keys) {
state KeySelectorRef begin = firstGreaterOrEqual(keys.begin);
state KeySelectorRef end = firstGreaterOrEqual(keys.end);
state Arena beginKeyArena;
try {
loop {
GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, 1e6);
limits.minRows = 0;
state RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True));
if (!rep.more) {
rep.readThrough = keys.end;
}
results.send(rep);
if (!rep.more) {
@ -51,11 +50,7 @@ ACTOR Future<Void> streamUsingGetRange(PromiseStream<RangeResult> results, Trans
return Void();
}
if (rep.readThrough.present()) {
begin = firstGreaterOrEqual(rep.readThrough.get());
} else {
begin = firstGreaterThan(rep.end()[-1].key);
}
begin = firstGreaterOrEqual(rep.getReadThrough(beginKeyArena));
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {