add RangReadOptions which inherit from ReadOptions
This commit is contained in:
parent
bfb56d2532
commit
78f4b4f739
|
@ -1529,16 +1529,34 @@ enum class ReadType {
|
|||
// debugID helps to trace the path of the read
|
||||
struct ReadOptions {
|
||||
ReadType type;
|
||||
bool cacheResult;
|
||||
Optional<Version> consistencyCheckStartVersion;
|
||||
Optional<UID> debugID;
|
||||
bool cacheResult;
|
||||
|
||||
ReadOptions() : type(ReadType::NORMAL), cacheResult(true){};
|
||||
ReadOptions(ReadType type, bool cacheResult) : type(type), cacheResult(cacheResult){};
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, type, cacheResult, consistencyCheckStartVersion, debugID);
|
||||
serializer(ar, type, cacheResult, debugID);
|
||||
}
|
||||
};
|
||||
|
||||
struct RangeReadOptions : ReadOptions {
|
||||
|
||||
Optional<Version> consistencyCheckStartVersion;
|
||||
|
||||
RangeReadOptions() : ReadOptions(){};
|
||||
|
||||
RangeReadOptions& operator=(const ReadOptions& option) {
|
||||
type = option.type;
|
||||
debugID = option.debugID;
|
||||
cacheResult = option.cacheResult;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, type, cacheResult, debugID, consistencyCheckStartVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -398,7 +398,7 @@ struct GetKeyValuesRequest : TimedRequest {
|
|||
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
|
||||
// to this client, of all storage replicas that
|
||||
// serve the given key
|
||||
ReadOptions options;
|
||||
RangeReadOptions options;
|
||||
|
||||
GetKeyValuesRequest() {}
|
||||
|
||||
|
@ -453,7 +453,7 @@ struct GetMappedKeyValuesRequest : TimedRequest {
|
|||
int matchIndex;
|
||||
Optional<TagSet> tags;
|
||||
ReplyPromise<GetMappedKeyValuesReply> reply;
|
||||
ReadOptions options;
|
||||
RangeReadOptions options;
|
||||
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
|
||||
// to this client, of all storage replicas that
|
||||
// serve the given key range
|
||||
|
@ -518,7 +518,7 @@ struct GetKeyValuesStreamRequest {
|
|||
Version version; // or latestVersion
|
||||
int limit, limitBytes;
|
||||
Optional<TagSet> tags;
|
||||
ReadOptions options;
|
||||
RangeReadOptions options;
|
||||
ReplyPromiseStream<GetKeyValuesStreamReply> reply;
|
||||
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
|
||||
// to this client, of all storage replicas that
|
||||
|
|
|
@ -70,7 +70,10 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
|
|||
|
||||
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
|
||||
// The total size of the returned value (less the last entry) will be less than byteLimit
|
||||
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, ReadOptions const& options) override {
|
||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
RangeReadOptions const& options) override {
|
||||
return doReadRange(store, keys, rowLimit, byteLimit, options);
|
||||
}
|
||||
|
||||
|
@ -99,7 +102,7 @@ private:
|
|||
KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
ReadOptions options) {
|
||||
RangeReadOptions options) {
|
||||
RangeResult _vs = wait(store->readRange(keys, rowLimit, byteLimit, options));
|
||||
RangeResult vs = _vs; // Get rid of implicit const& from wait statement
|
||||
Arena& a = vs.arena();
|
||||
|
|
|
@ -229,7 +229,10 @@ public:
|
|||
|
||||
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
|
||||
// The total size of the returned value (less the last entry) will be less than byteLimit
|
||||
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, ReadOptions const& options) override {
|
||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
RangeReadOptions const& options) override {
|
||||
if (recovering.isError())
|
||||
throw recovering.getError();
|
||||
if (!recovering.isReady())
|
||||
|
@ -938,7 +941,7 @@ private:
|
|||
KeyRange keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
ReadOptions options) {
|
||||
RangeReadOptions options) {
|
||||
wait(self->recovering);
|
||||
return static_cast<IKeyValueStore*>(self)->readRange(keys, rowLimit, byteLimit, options).get();
|
||||
}
|
||||
|
|
|
@ -1932,7 +1932,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
return result;
|
||||
}
|
||||
|
||||
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, ReadOptions const& options) override {
|
||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
RangeReadOptions const& options) override {
|
||||
if (!shouldThrottle(options.type, keys.begin)) {
|
||||
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
|
||||
auto res = a->result.getFuture();
|
||||
|
|
|
@ -1591,7 +1591,10 @@ public:
|
|||
|
||||
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override;
|
||||
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override;
|
||||
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, ReadOptions const& options) override;
|
||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
RangeReadOptions const& options) override;
|
||||
|
||||
KeyValueStoreSQLite(std::string const& filename,
|
||||
UID logID,
|
||||
|
@ -2230,7 +2233,7 @@ Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int max
|
|||
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
ReadOptions const& options) {
|
||||
RangeReadOptions const& options) {
|
||||
++readsRequested;
|
||||
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
|
||||
auto f = p->result.getFuture();
|
||||
|
|
|
@ -2382,7 +2382,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
ReadOptions const& options = ReadOptions()) override {
|
||||
RangeReadOptions const& options = ReadOptions()) override {
|
||||
TraceEvent(SevVerbose, "ShardedRocksReadRangeBegin", this->id).detail("Range", keys);
|
||||
auto shards = shardManager.getDataShardsByRange(keys);
|
||||
|
||||
|
|
|
@ -7795,7 +7795,10 @@ public:
|
|||
m_tree->set(keyValue);
|
||||
}
|
||||
|
||||
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, ReadOptions const& options) override {
|
||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
RangeReadOptions const& options) override {
|
||||
debug_printf("READRANGE %s\n", printable(keys).c_str());
|
||||
return catchError(readRange_impl(this, keys, rowLimit, byteLimit, options));
|
||||
}
|
||||
|
@ -7804,7 +7807,7 @@ public:
|
|||
KeyRange keys,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
ReadOptions options) {
|
||||
RangeReadOptions options) {
|
||||
state PagerEventReasons reason =
|
||||
(options.type == ReadType::FETCH) ? PagerEventReasons::FetchRange : PagerEventReasons::RangeRead;
|
||||
state VersionedBTree::BTreeCursor cur;
|
||||
|
@ -10995,7 +10998,7 @@ ACTOR Future<Void> randomRangeScans(IKeyValueStore* kvs,
|
|||
bool singlePrefix,
|
||||
int rowLimit,
|
||||
int byteLimit,
|
||||
ReadOptions options = ReadOptions()) {
|
||||
RangeReadOptions options = RangeReadOptions()) {
|
||||
fmt::print("\nstoreType: {}\n", static_cast<int>(kvs->getType()));
|
||||
fmt::print("prefixSource: {}\n", source.toString());
|
||||
fmt::print("suffixSize: {}\n", suffixSize);
|
||||
|
@ -11052,7 +11055,7 @@ TEST_CASE(":/redwood/performance/randomRangeScans") {
|
|||
state int suffixSize = 12;
|
||||
state int valueSize = 100;
|
||||
state int maxByteLimit = std::numeric_limits<int>::max();
|
||||
state ReadOptions options;
|
||||
state RangeReadOptions options;
|
||||
options.cacheResult = true;
|
||||
options.type = ReadType::NORMAL;
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ public:
|
|||
virtual Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit = 1 << 30,
|
||||
int byteLimit = 1 << 30,
|
||||
ReadOptions const& options = ReadOptions()) = 0;
|
||||
RangeReadOptions const& options = RangeReadOptions()) = 0;
|
||||
|
||||
// Shard management APIs.
|
||||
// Adds key range to a physical shard.
|
||||
|
|
|
@ -244,7 +244,7 @@ struct IKVSReadRangeRequest {
|
|||
KeyRangeRef keys;
|
||||
int rowLimit;
|
||||
int byteLimit;
|
||||
ReadOptions options;
|
||||
RangeReadOptions options;
|
||||
ReplyPromise<IKVSReadRangeReply> reply;
|
||||
|
||||
template <class Ar>
|
||||
|
@ -414,7 +414,7 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
|
|||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit = 1 << 30,
|
||||
int byteLimit = 1 << 30,
|
||||
ReadOptions const& options = ReadOptions()) override {
|
||||
RangeReadOptions const& options = RangeReadOptions()) override {
|
||||
IKVSReadRangeRequest req{ keys, rowLimit, byteLimit, options, ReplyPromise<IKVSReadRangeReply>() };
|
||||
return fmap([](const IKVSReadRangeReply& reply) { return reply.toRangeResult(); },
|
||||
interf.readRange.getReply(req));
|
||||
|
|
|
@ -399,7 +399,7 @@ struct StorageServerDisk {
|
|||
// - "a", if key "a" exist
|
||||
// - "b", if key "a" doesn't exist, and "b" is the next existing key in total order
|
||||
// - allKeys.end, if keyrange [a, allKeys.end) is empty
|
||||
Future<Key> readNextKeyInclusive(KeyRef key, ReadOptions const& options = ReadOptions()) {
|
||||
Future<Key> readNextKeyInclusive(KeyRef key, RangeReadOptions const& options = RangeReadOptions()) {
|
||||
++(*kvScans);
|
||||
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), options);
|
||||
}
|
||||
|
@ -414,7 +414,7 @@ struct StorageServerDisk {
|
|||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit = 1 << 30,
|
||||
int byteLimit = 1 << 30,
|
||||
ReadOptions const& options = ReadOptions()) {
|
||||
RangeReadOptions const& options = RangeReadOptions()) {
|
||||
++(*kvScans);
|
||||
return storage->readRange(keys, rowLimit, byteLimit, options);
|
||||
}
|
||||
|
@ -443,7 +443,7 @@ private:
|
|||
IKeyValueStore* storage;
|
||||
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
|
||||
|
||||
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, ReadOptions options) {
|
||||
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, RangeReadOptions options) {
|
||||
RangeResult r = wait(storage->readRange(range, 1, 1 << 30, options));
|
||||
if (r.size())
|
||||
return r[0].key;
|
||||
|
@ -2491,7 +2491,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
|
|||
state Version startVersion = data->version.get();
|
||||
// TODO: Change feed reads should probably at least set cacheResult to false, possibly set a different ReadType as
|
||||
// well, perhaps high priority?
|
||||
state ReadOptions options;
|
||||
state RangeReadOptions options;
|
||||
|
||||
if (DEBUG_CF_TRACE) {
|
||||
TraceEvent(SevDebug, "TraceChangeFeedMutationsBegin", data->thisServerID)
|
||||
|
@ -3286,7 +3286,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
|||
int limit,
|
||||
int* pLimitBytes,
|
||||
SpanContext parentSpan,
|
||||
ReadOptions options,
|
||||
RangeReadOptions options,
|
||||
Optional<Key> tenantPrefix) {
|
||||
state GetKeyValuesReply result;
|
||||
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
|
||||
|
@ -3527,7 +3527,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
|
|||
KeyRange range,
|
||||
int* pOffset,
|
||||
SpanContext parentSpan,
|
||||
ReadOptions options)
|
||||
RangeReadOptions options)
|
||||
// Attempts to find the key indicated by sel in the data at version, within range.
|
||||
// Precondition: selectorInRange(sel, range)
|
||||
// If it is found, offset is set to 0 and a key is returned which falls inside range.
|
||||
|
@ -3665,7 +3665,7 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
|
|||
{
|
||||
state Span span("SS:getKeyValues"_loc, req.spanContext);
|
||||
state int64_t resultSize = 0;
|
||||
state ReadOptions options = req.options;
|
||||
state RangeReadOptions options = req.options;
|
||||
|
||||
if (req.tenantInfo.name.present()) {
|
||||
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
|
||||
|
@ -3864,7 +3864,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
|
|||
g_traceBatch.addEvent(
|
||||
"TransactionDebug", pOriginalReq->options.debugID.get().first(), "storageserver.quickGetKeyValues.Before");
|
||||
try {
|
||||
// TODO: Use a lower level API may be better? Or tweak priorities?
|
||||
// TODO: Use a lower level API may be better?
|
||||
GetKeyValuesRequest req;
|
||||
req.spanContext = pOriginalReq->spanContext;
|
||||
req.arena = *a;
|
||||
|
@ -3877,6 +3877,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
|
|||
req.limit = SERVER_KNOBS->QUICK_GET_KEY_VALUES_LIMIT;
|
||||
req.limitBytes = SERVER_KNOBS->QUICK_GET_KEY_VALUES_LIMIT_BYTES;
|
||||
req.options = pOriginalReq->options;
|
||||
// TODO: tweak priorities?
|
||||
req.options.type = ReadType::NORMAL;
|
||||
req.tags = pOriginalReq->tags;
|
||||
req.ssLatestCommitVersions = VersionVector();
|
||||
|
@ -4408,7 +4409,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
|
|||
{
|
||||
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
|
||||
state int64_t resultSize = 0;
|
||||
state ReadOptions options = req.options;
|
||||
state RangeReadOptions options = req.options;
|
||||
|
||||
if (req.tenantInfo.name.present()) {
|
||||
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
|
||||
|
@ -4616,7 +4617,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
|
|||
{
|
||||
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
|
||||
state int64_t resultSize = 0;
|
||||
state ReadOptions options = req.options;
|
||||
state RangeReadOptions options = req.options;
|
||||
|
||||
if (req.tenantInfo.name.present()) {
|
||||
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
|
||||
|
@ -4827,7 +4828,8 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
|
|||
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
|
||||
}
|
||||
state int64_t resultSize = 0;
|
||||
state ReadOptions options = req.options;
|
||||
state RangeReadOptions options;
|
||||
options = req.options;
|
||||
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
|
||||
|
||||
++data->counters.getKeyQueries;
|
||||
|
@ -4948,7 +4950,7 @@ void getQueuingMetrics(StorageServer* self, StorageQueuingMetricsRequest const&
|
|||
|
||||
ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager) {
|
||||
eager->finishKeyBegin();
|
||||
state ReadOptions options;
|
||||
state RangeReadOptions options;
|
||||
options.type = ReadType::EAGER;
|
||||
if (SERVER_KNOBS->ENABLE_CLEAR_RANGE_EAGER_READS) {
|
||||
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
|
||||
|
|
Loading…
Reference in New Issue