From b7da2ed16cffb29c3ca61748e3fb32ce68146273 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Tue, 11 Apr 2023 11:28:08 -0700 Subject: [PATCH] Fix RangeResult.readThrough misuse Fix `RangeResult.readThrough` misuses: 1. KeyValueStores do not need to set readThrough, as it is not serialized and returned. Also, setting it to the last key of the result is not right; it should at least be the keyAfter of the last key; 2. Fix NativeAPI not setting `RangeResult.more` in a few places; 3. Avoid `tryGetRange()` setting `readThrough` when `more` is false, which was a workaround for item 2 above; 4. `tryGetRangeFromBlob()` didn't set `more` but set `readThrough` to indicate the end, which I think followed the same workaround. Fixed that. 5. `getRangeStream()` now sets `more` to true and lets `readThrough` be its boundary. Also added readThrough getter/setter functions to validate its usage. --- bindings/flow/FDBLoanerTypes.h | 16 +++++++ fdbclient/MutationLogReader.actor.cpp | 2 +- fdbclient/NativeAPI.actor.cpp | 24 ++++++++--- fdbclient/include/fdbclient/FDBTypes.h | 43 ++++++++++++++++--- fdbserver/BlobManifest.actor.cpp | 21 +++------ fdbserver/BlobRestoreController.actor.cpp | 14 ++---- fdbserver/KeyValueStoreMemory.actor.cpp | 4 -- fdbserver/KeyValueStoreRocksDB.actor.cpp | 3 -- fdbserver/KeyValueStoreSQLite.actor.cpp | 4 -- .../KeyValueStoreShardedRocksDB.actor.cpp | 3 -- fdbserver/StorageCache.actor.cpp | 5 ++- fdbserver/VersionedBTree.actor.cpp | 4 -- fdbserver/storageserver.actor.cpp | 38 ++++++++-------- .../workloads/BlobRestoreWorkload.actor.cpp | 6 +-- .../workloads/StreamingRangeRead.actor.cpp | 11 ++--- 15 files changed, 112 insertions(+), 86 deletions(-) diff --git a/bindings/flow/FDBLoanerTypes.h b/bindings/flow/FDBLoanerTypes.h index ddd9a577b5..9302a7d7ce 100644 --- a/bindings/flow/FDBLoanerTypes.h +++ b/bindings/flow/FDBLoanerTypes.h @@ -161,6 +161,22 @@ struct RangeResultRef : VectorRef { // False implies that no such 
values remain Optional readThrough; // Only present when 'more' is true. When present, this value represent the end (or // beginning if reverse) of the range + + KeyRef getReadThrough(Arena& arena) const { + ASSERT(more); + if (readThrough.present()) { + return readThrough.get(); + } + ASSERT(size() > 0); + return keyAfter(back().key, arena); + } + + void setReadThrough(KeyRef key) { + ASSERT(more); + ASSERT(!readThrough.present()); + readThrough = key; + } + // which was read to produce these results. This is guaranteed to be less than the requested range. bool readToBegin; bool readThroughEnd; diff --git a/fdbclient/MutationLogReader.actor.cpp b/fdbclient/MutationLogReader.actor.cpp index d6e3adc8dd..4f9d61eb52 100644 --- a/fdbclient/MutationLogReader.actor.cpp +++ b/fdbclient/MutationLogReader.actor.cpp @@ -103,7 +103,7 @@ ACTOR Future PipelinedReader::getNext_impl(PipelinedReader* self, Database return Void(); } - begin = kvs.readThrough.present() ? kvs.readThrough.get() : keyAfter(kvs.back().key); + begin = kvs.getReadThrough(begin.arena()); break; } catch (Error& e) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 7f56361d0d..c62a5faf23 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -4690,11 +4690,17 @@ Future getRange(Reference trState, if (readThrough) { output.arena().dependsOn(shard.arena()); - output.readThrough = reverse ? shard.begin : shard.end; + // As modifiedSelectors is true, more is also true. Then set readThrough to the shard boundary. + ASSERT(modifiedSelectors); + output.more = true; + output.setReadThrough(reverse ? 
shard.begin : shard.end); } getRangeFinished( trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + if (!output.more) { + ASSERT(!output.readThrough.present()); + } return output; } @@ -4702,14 +4708,17 @@ Future getRange(Reference trState, output.append(output.arena(), rep.data.begin(), rep.data.size()); if (finished) { + output.more = modifiedSelectors || limits.isReached() || rep.more; if (readThrough) { output.arena().dependsOn(shard.arena()); - output.readThrough = reverse ? shard.begin : shard.end; + output.setReadThrough(reverse ? shard.begin : shard.end); } - output.more = modifiedSelectors || limits.isReached() || rep.more; getRangeFinished( trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + if (!output.more) { + ASSERT(!output.readThrough.present()); + } return output; } @@ -5163,7 +5172,10 @@ ACTOR Future getRangeStreamFragment(Reference trState, output.readThroughEnd = true; } output.arena().dependsOn(keys.arena()); - output.readThrough = reverse ? keys.begin : keys.end; + // for getRangeStreamFragment, one fragment end doesn't mean it's the end of getRange + // so set 'more' to true + output.more = true; + output.setReadThrough(reverse ? keys.begin : keys.end); results->send(std::move(output)); results->finish(); if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) { @@ -5177,7 +5189,9 @@ ACTOR Future getRangeStreamFragment(Reference trState, ++shard; } output.arena().dependsOn(range.arena()); - output.readThrough = reverse ? range.begin : range.end; + // if it's not the last shard, set more to true and readThrough to the shard boundary + output.more = true; + output.setReadThrough(reverse ? 
range.begin : range.end); results->send(std::move(output)); break; } diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index 05fb07c99e..d62ca0669b 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -750,11 +750,38 @@ struct GetRangeLimits { struct RangeResultRef : VectorRef { constexpr static FileIdentifier file_identifier = 3985192; - bool more; // True if (but not necessarily only if) values remain in the *key* range requested (possibly beyond the - // limits requested) False implies that no such values remain - Optional readThrough; // Only present when 'more' is true. When present, this value represent the end (or - // beginning if reverse) of the range which was read to produce these results. This is - // guaranteed to be less than the requested range. + bool more; // True if (but not necessarily only if) values remain in the *key* range requested (possibly beyond the + // limits requested). For example, getRangeStream() always sets 'more' to true, because a stream + // fragment doesn't know whether it is the last one; the end of the stream is signalled with + // `error_code_end_of_stream` instead. + // False implies that no such values remain. + + // Only present when 'more' is true. For example, when the read reaches the shard boundary, 'readThrough' is set to + // the shard boundary and the client's next range read should start at 'readThrough'. + // However, 'more' being true does not guarantee that 'readThrough' is present: for example, when the read reaches + // the size limit, 'readThrough' might not be set, and the next read should start from the keyAfter of the current + // query result's last key. + // In both cases, please use the getter function 'getReadThrough()' instead, which returns the end (or beginning + // if reverse) of the range which was read. 
+ Optional readThrough; + + // Returns the key that represents the end (or beginning if reverse) of the range which was read. + KeyRef getReadThrough(Arena& arena) const { + ASSERT(more); + if (readThrough.present()) { + return readThrough.get(); + } + ASSERT(size() > 0); + // TODO: confirm this is still correct for reverse reads + return keyAfter(back().key, arena); + } + + void setReadThrough(KeyRef key) { + ASSERT(more); + ASSERT(!readThrough.present()); + readThrough = key; + } + bool readToBegin; bool readThroughEnd; @@ -875,6 +902,12 @@ struct MappedRangeResultRef : VectorRef { bool readToBegin; bool readThroughEnd; + void setReadThrough(KeyRef key) { + ASSERT(more); + ASSERT(!readThrough.present()); + readThrough = key; + } + MappedRangeResultRef() : more(false), readToBegin(false), readThroughEnd(false) {} MappedRangeResultRef(Arena& p, const MappedRangeResultRef& toCopy) : VectorRef(p, toCopy), more(toCopy.more), diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp index 854fa3a861..02638020af 100644 --- a/fdbserver/BlobManifest.actor.cpp +++ b/fdbserver/BlobManifest.actor.cpp @@ -394,6 +394,7 @@ private: limits.minRows = 0; state KeySelectorRef begin = firstGreaterOrEqual(range.begin); state KeySelectorRef end = firstGreaterOrEqual(range.end); + state Arena beginKeyArena; loop { state RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True)); for (auto& row : result) { @@ -402,11 +403,7 @@ private: if (!result.more) { break; } - if (result.readThrough.present()) { - begin = firstGreaterOrEqual(result.readThrough.get()); - } else { - begin = firstGreaterThan(result.back().key); - } + begin = firstGreaterOrEqual(result.getReadThrough(beginKeyArena)); } } @@ -491,6 +488,7 @@ public: // Read all granules state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS); limits.minRows = 0; + state Arena arena; state KeySelectorRef begin = firstGreaterOrEqual(blobGranuleMappingKeys.begin); state KeySelectorRef end = 
firstGreaterOrEqual(blobGranuleMappingKeys.end); loop { @@ -501,11 +499,7 @@ public: if (!rows.more) { break; } - if (rows.readThrough.present()) { - begin = firstGreaterOrEqual(rows.readThrough.get()); - } else { - begin = firstGreaterThan(rows.end()[-1].key); - } + begin = firstGreaterOrEqual(rows.getReadThrough(arena)); } // check each granule range @@ -761,6 +755,7 @@ private: limits.minRows = 0; state KeySelectorRef begin = firstGreaterOrEqual(fileKeyRange.begin); state KeySelectorRef end = firstGreaterOrEqual(fileKeyRange.end); + state Arena beginKeyArena; loop { RangeResult results = wait(tr->getRange(begin, end, limits, Snapshot::True)); for (auto& row : results) { @@ -783,11 +778,7 @@ private: if (!results.more) { break; } - if (results.readThrough.present()) { - begin = firstGreaterOrEqual(results.readThrough.get()); - } else { - begin = firstGreaterThan(results.end()[-1].key); - } + begin = firstGreaterOrEqual(results.getReadThrough(beginKeyArena)); } return files; } diff --git a/fdbserver/BlobRestoreController.actor.cpp b/fdbserver/BlobRestoreController.actor.cpp index 2ab1acfc34..9d129be1b4 100644 --- a/fdbserver/BlobRestoreController.actor.cpp +++ b/fdbserver/BlobRestoreController.actor.cpp @@ -45,6 +45,7 @@ ACTOR Future BlobRestoreController::getRangeState(Referen limits.minRows = 0; state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreCommandKeys.begin); state KeySelectorRef end = firstGreaterOrEqual(blobRestoreCommandKeys.end); + state Arena beginKeyArena; loop { RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True)); for (auto& r : ranges) { @@ -59,11 +60,7 @@ ACTOR Future BlobRestoreController::getRangeState(Referen if (!ranges.more) { break; } - if (ranges.readThrough.present()) { - begin = firstGreaterOrEqual(ranges.readThrough.get()); - } else { - begin = firstGreaterThan(ranges.end()[-1].key); - } + begin = firstGreaterOrEqual(ranges.getReadThrough(beginKeyArena)); } return std::make_pair(KeyRangeRef(), 
BlobRestoreState(BlobRestorePhase::DONE)); } catch (Error& e) { @@ -152,6 +149,7 @@ ACTOR Future> BlobRestoreController::getArgument(Refere limits.minRows = 0; state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreArgKeys.begin); state KeySelectorRef end = firstGreaterOrEqual(blobRestoreArgKeys.end); + state Arena beginKeyArena; loop { RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True)); for (auto& r : ranges) { @@ -165,11 +163,7 @@ ACTOR Future> BlobRestoreController::getArgument(Refere if (!ranges.more) { break; } - if (ranges.readThrough.present()) { - begin = firstGreaterOrEqual(ranges.readThrough.get()); - } else { - begin = firstGreaterThan(ranges.back().key); - } + begin = firstGreaterOrEqual(ranges.getReadThrough(beginKeyArena)); } return result; } catch (Error& e) { diff --git a/fdbserver/KeyValueStoreMemory.actor.cpp b/fdbserver/KeyValueStoreMemory.actor.cpp index 55b9a8f37a..bbcfaa190d 100644 --- a/fdbserver/KeyValueStoreMemory.actor.cpp +++ b/fdbserver/KeyValueStoreMemory.actor.cpp @@ -276,10 +276,6 @@ public: } result.more = rowLimit == 0 || byteLimit <= 0; - if (result.more) { - ASSERT(result.size() > 0); - result.readThrough = result[result.size() - 1].key; - } return result; } diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index b15e24bc79..dc37fb3b76 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -1779,9 +1779,6 @@ struct RocksDBKeyValueStore : IKeyValueStore { } result.more = (result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit); - if (result.more) { - result.readThrough = result[result.size() - 1].key; - } a.result.send(result); // Temporarily not sampling to understand the pattern of readRange results. 
if (metricPromiseStream) { diff --git a/fdbserver/KeyValueStoreSQLite.actor.cpp b/fdbserver/KeyValueStoreSQLite.actor.cpp index 5117a5b2cc..20dbc1b5df 100644 --- a/fdbserver/KeyValueStoreSQLite.actor.cpp +++ b/fdbserver/KeyValueStoreSQLite.actor.cpp @@ -1261,10 +1261,6 @@ struct RawCursor { } } result.more = rowLimit == 0 || accumulatedBytes >= byteLimit; - if (result.more) { - ASSERT(result.size() > 0); - result.readThrough = result[result.size() - 1].key; - } // AccumulatedBytes includes KeyValueRef overhead so subtract it kvBytesRead += (accumulatedBytes - result.size() * sizeof(KeyValueRef)); return result; diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp index 5ad33aedad..92194451f8 100644 --- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp @@ -2788,9 +2788,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore { result.more = (result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit); - if (result.more) { - result.readThrough = result[result.size() - 1].key; - } a.result.send(result); if (a.getHistograms) { double currTime = timer_monotonic(); diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index ede3de1ca6..09394c2e39 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -1241,8 +1241,9 @@ ACTOR Future tryFetchRange(Database cx, if (e.code() == error_code_transaction_too_old) *isTooOld = true; output.more = true; - if (begin.isFirstGreaterOrEqual()) - output.readThrough = begin.getKey(); + if (begin.isFirstGreaterOrEqual()) { + output.setReadThrough(begin.getKey()); + } return output; } throw; diff --git a/fdbserver/VersionedBTree.actor.cpp b/fdbserver/VersionedBTree.actor.cpp index 035d44c7fc..46ab0e4f87 100644 --- a/fdbserver/VersionedBTree.actor.cpp +++ b/fdbserver/VersionedBTree.actor.cpp @@ -8257,10 +8257,6 @@ public: } 
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit; - if (result.more) { - ASSERT(result.size() > 0); - result.readThrough = result[result.size() - 1].key; - } g_redwoodMetrics.kvSizeReadByGetRange->sample(accumulatedBytes); return result; } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 10dc6d48e2..7d07597b50 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -6547,15 +6547,13 @@ ACTOR Future tryGetRange(PromiseStream results, Transaction* state KeySelectorRef begin = firstGreaterOrEqual(keys.begin); state KeySelectorRef end = firstGreaterOrEqual(keys.end); + state Arena beginKeyArena; try { loop { GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, SERVER_KNOBS->FETCH_BLOCK_BYTES); limits.minRows = 0; state RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True)); - if (!rep.more) { - rep.readThrough = keys.end; - } results.send(rep); if (!rep.more) { @@ -6563,11 +6561,7 @@ ACTOR Future tryGetRange(PromiseStream results, Transaction* return Void(); } - if (rep.readThrough.present()) { - begin = firstGreaterOrEqual(rep.readThrough.get()); - } else { - begin = firstGreaterThan(rep.end()[-1].key); - } + begin = firstGreaterOrEqual(rep.getReadThrough(beginKeyArena)); } } catch (Error& e) { if (e.code() == error_code_actor_cancelled) { @@ -6628,12 +6622,16 @@ ACTOR Future tryGetRangeFromBlob(PromiseStream results, .detail("Rows", rows.size()) .detail("ChunkRange", chunkRange) .detail("FetchVersion", fetchVersion); - if (rows.size() == 0) { - rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end)); - } + // It should read all the data from that chunk + ASSERT(!rows.more); if (i == chunks.size() - 1) { - rows.readThrough = KeyRef(rows.arena(), keys.end); + // set more to false when it's the last chunk + rows.more = false; + } else { + rows.more = true; + // no need to set readThrough, as the next read key range has to be the next 
chunkRange } + ASSERT(!rows.readThrough.present()); results.send(rows); } results.sendError(end_of_stream()); // end of range read @@ -7555,6 +7553,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { state PromiseStream results; state Future hold; + state KeyRef rangeEnd; if (isFullRestore) { state BlobRestoreRangeState rangeStatus = wait(BlobRestoreController::getRangeState(restoreController)); // Read from blob only when it's copying data for full restore. Otherwise it may cause data corruptions @@ -7563,11 +7562,14 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { rangeStatus.second.phase == BlobRestorePhase::ERROR) { Version version = wait(BlobRestoreController::getTargetVersion(restoreController, fetchVersion)); hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, version, data->blobConn); + rangeEnd = rangeStatus.first.end; } else { hold = tryGetRange(results, &tr, keys); + rangeEnd = keys.end; } } else { hold = tryGetRange(results, &tr, keys); + rangeEnd = keys.end; } state Key blockBegin = keys.begin; @@ -7615,9 +7617,12 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { for (; kvItr != this_block.end(); ++kvItr) { data->byteSampleApplySet(*kvItr, invalidVersion); } - ASSERT(this_block.readThrough.present() || this_block.size()); - blockBegin = this_block.readThrough.present() ? 
this_block.readThrough.get() - : keyAfter(this_block.end()[-1].key); + if (this_block.more) { + blockBegin = this_block.getReadThrough(blockBegin.arena()); + } else { + ASSERT(!this_block.readThrough.present()); + blockBegin = rangeEnd; + } this_block = RangeResult(); data->fetchKeysBytesBudget -= expectedBlockSize; @@ -9713,8 +9718,7 @@ ACTOR Future createSstFileForCheckpointShardBytesSample(StorageServer* dat numSampledKeys++; } if (readResult.more) { - ASSERT(readResult.readThrough.present()); - readBegin = keyAfter(readResult.readThrough.get()); + readBegin = readResult.getReadThrough(readBegin.arena()); ASSERT(readBegin <= readEnd); } else { break; // finish for current metaDataRangesIter diff --git a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp index bd8abb6cc8..f84ed12b75 100644 --- a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp +++ b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp @@ -174,11 +174,7 @@ struct BlobRestoreWorkload : TestWorkload { if (data.size() == self->readBatchSize_) { break; } - if (result.readThrough.present()) { - begin = firstGreaterOrEqual(KeyRef(arena, result.readThrough.get())); - } else { - begin = firstGreaterThan(KeyRef(arena, result.back().key)); - } + begin = firstGreaterOrEqual(result.getReadThrough(arena)); } catch (Error& e) { wait(tr.onError(e)); } diff --git a/fdbserver/workloads/StreamingRangeRead.actor.cpp b/fdbserver/workloads/StreamingRangeRead.actor.cpp index 290b1d4597..293dd92fe7 100644 --- a/fdbserver/workloads/StreamingRangeRead.actor.cpp +++ b/fdbserver/workloads/StreamingRangeRead.actor.cpp @@ -35,15 +35,14 @@ ACTOR Future streamUsingGetRange(PromiseStream results, Transaction* tr, KeyRange keys) { state KeySelectorRef begin = firstGreaterOrEqual(keys.begin); state KeySelectorRef end = firstGreaterOrEqual(keys.end); + state Arena beginKeyArena; try { loop { GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, 1e6); limits.minRows = 0; state 
RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True)); - if (!rep.more) { - rep.readThrough = keys.end; - } + results.send(rep); if (!rep.more) { @@ -51,11 +50,7 @@ ACTOR Future streamUsingGetRange(PromiseStream results, Trans return Void(); } - if (rep.readThrough.present()) { - begin = firstGreaterOrEqual(rep.readThrough.get()); - } else { - begin = firstGreaterThan(rep.end()[-1].key); - } + begin = firstGreaterOrEqual(rep.getReadThrough(beginKeyArena)); } } catch (Error& e) { if (e.code() == error_code_actor_cancelled) {