use optional ReadOptions and RangeReadOptions in requests

This commit is contained in:
Fuheng Zhao 2022-08-23 17:16:47 -07:00
parent 78f4b4f739
commit 4e748d6bed
12 changed files with 298 additions and 205 deletions

View File

@ -3433,7 +3433,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
UseTenant useTenant = UseTenant::True) {
wait(success(version));
state Optional<UID> getKeyID = Optional<UID>();
state Optional<UID> getKeyID;
state Span span("NAPI:getKey"_loc, trState->spanContext);
if (trState->readOptions.debugID.present()) {
getKeyID = nondeterministicRandom()->randomUniqueID();
@ -3466,7 +3466,10 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
state VersionVector ssLatestCommitVersions;
trState->cx->getLatestCommitVersions(locationInfo.locations, version.get(), trState, ssLatestCommitVersions);
state ReadOptions readOptions;
state ReadOptions readOptions = trState->readOptions;
readOptions.debugID = getKeyID;
state bool sendReadOption =
readOptions.debugID.present() || readOptions.type != ReadType::NORMAL || readOptions.cacheResult != true;
try {
if (getKeyID.present())
g_traceBatch.addEvent(
@ -3475,15 +3478,13 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
"NativeAPI.getKey.Before"); //.detail("StartKey",
// k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
++trState->cx->transactionPhysicalReads;
readOptions = trState->readOptions;
readOptions.debugID = getKeyID;
GetKeyRequest req(span.context,
useTenant ? trState->getTenantInfo() : TenantInfo(),
k,
version.get(),
trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>(),
readOptions,
sendReadOption ? readOptions : Optional<ReadOptions>(),
ssLatestCommitVersions);
req.arena.dependsOn(k.arena());
@ -3942,7 +3943,12 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
// FIXME: buggify byte limits on internal functions that use them, instead of globally
req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>();
req.options = trState->readOptions;
if (trState->readOptions.type != ReadType::NORMAL || trState->readOptions.cacheResult != true ||
trState->readOptions.debugID.present()) {
RangeReadOptions rangeOptions;
rangeOptions = trState->readOptions;
req.options = Optional<RangeReadOptions>(rangeOptions);
}
try {
if (trState->readOptions.debugID.present()) {
@ -4310,7 +4316,12 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
req.arena.dependsOn(mapper.arena());
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
req.tenantInfo = useTenant ? trState->getTenantInfo() : TenantInfo();
req.options = trState->readOptions;
if (trState->readOptions.type != ReadType::NORMAL || trState->readOptions.cacheResult != true ||
trState->readOptions.debugID.present()) {
RangeReadOptions rangeOptions;
rangeOptions = trState->readOptions;
req.options = Optional<RangeReadOptions>(rangeOptions);
}
req.version = readVersion;
trState->cx->getLatestCommitVersions(
@ -4763,8 +4774,12 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
req.spanContext = spanContext;
req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.limitBytes = std::numeric_limits<int>::max();
req.options = trState->readOptions;
if (trState->readOptions.type != ReadType::NORMAL || trState->readOptions.cacheResult != true ||
trState->readOptions.debugID.present()) {
RangeReadOptions rangeOptions;
rangeOptions = trState->readOptions;
req.options = Optional<RangeReadOptions>(rangeOptions);
}
trState->cx->getLatestCommitVersions(
locations[shard].locations, req.version, trState, req.ssLatestCommitVersions);

View File

@ -296,7 +296,7 @@ struct GetValueRequest : TimedRequest {
Version version;
Optional<TagSet> tags;
ReplyPromise<GetValueReply> reply;
ReadOptions options;
Optional<ReadOptions> options;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key
@ -309,7 +309,7 @@ struct GetValueRequest : TimedRequest {
const Key& key,
Version ver,
Optional<TagSet> tags,
ReadOptions options,
Optional<ReadOptions> options,
VersionVector latestCommitVersions)
: spanContext(spanContext), tenantInfo(tenantInfo), key(key), version(ver), tags(tags), options(options),
ssLatestCommitVersions(latestCommitVersions) {}
@ -394,11 +394,11 @@ struct GetKeyValuesRequest : TimedRequest {
int limit, limitBytes;
Optional<TagSet> tags;
Optional<UID> debugID;
Optional<RangeReadOptions> options;
ReplyPromise<GetKeyValuesReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key
RangeReadOptions options;
GetKeyValuesRequest() {}
@ -412,12 +412,12 @@ struct GetKeyValuesRequest : TimedRequest {
version,
limit,
limitBytes,
options,
tags,
debugID,
reply,
spanContext,
tenantInfo,
options,
arena,
ssLatestCommitVersions);
}
@ -452,8 +452,8 @@ struct GetMappedKeyValuesRequest : TimedRequest {
int limit, limitBytes;
int matchIndex;
Optional<TagSet> tags;
Optional<RangeReadOptions> options;
ReplyPromise<GetMappedKeyValuesReply> reply;
RangeReadOptions options;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key range
@ -518,7 +518,7 @@ struct GetKeyValuesStreamRequest {
Version version; // or latestVersion
int limit, limitBytes;
Optional<TagSet> tags;
RangeReadOptions options;
Optional<RangeReadOptions> options;
ReplyPromiseStream<GetKeyValuesStreamReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
@ -569,7 +569,7 @@ struct GetKeyRequest : TimedRequest {
Version version; // or latestVersion
Optional<TagSet> tags;
ReplyPromise<GetKeyReply> reply;
ReadOptions options;
Optional<ReadOptions> options;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key
@ -583,7 +583,7 @@ struct GetKeyRequest : TimedRequest {
KeySelectorRef const& sel,
Version version,
Optional<TagSet> tags,
ReadOptions options,
Optional<ReadOptions> options,
VersionVector latestCommitVersions)
: spanContext(spanContext), tenantInfo(tenantInfo), sel(sel), version(version), tags(tags), options(options),
ssLatestCommitVersions(latestCommitVersions) {}

View File

@ -56,7 +56,7 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
void clear(KeyRangeRef range, const Arena* arena = nullptr) override { store->clear(range, arena); }
Future<Void> commit(bool sequential = false) override { return store->commit(sequential); }
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {
return doReadValue(store, key, options);
}
@ -64,7 +64,7 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
// problem is still present if you are using this storage interface, but this storage interface is not used by
// customers ever. However, if you want to try to test malicious atomic op workloads with compressed values for some
// reason, you will need to fix this.
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override {
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) override {
return doReadValuePrefix(store, key, maxLength, options);
}
@ -73,12 +73,12 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options) override {
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) override {
return doReadRange(store, keys, rowLimit, byteLimit, options);
}
private:
ACTOR static Future<Optional<Value>> doReadValue(IKeyValueStore* store, Key key, ReadOptions options) {
ACTOR static Future<Optional<Value>> doReadValue(IKeyValueStore* store, Key key, Optional<ReadOptions> options) {
Optional<Value> v = wait(store->readValue(key, options));
if (!v.present())
return v;
@ -88,7 +88,7 @@ private:
ACTOR static Future<Optional<Value>> doReadValuePrefix(IKeyValueStore* store,
Key key,
int maxLength,
ReadOptions options) {
Optional<ReadOptions> options) {
Optional<Value> v = wait(doReadValue(store, key, options));
if (!v.present())
return v;
@ -102,7 +102,7 @@ private:
KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions options) {
Optional<RangeReadOptions> options) {
RangeResult _vs = wait(store->readRange(keys, rowLimit, byteLimit, options));
RangeResult vs = _vs; // Get rid of implicit const& from wait statement
Arena& a = vs.arena();

View File

@ -198,7 +198,7 @@ public:
return c;
}
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -210,7 +210,7 @@ public:
return Optional<Value>(it.getValue());
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override {
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -232,7 +232,7 @@ public:
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options) override {
Optional<RangeReadOptions> options) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -926,14 +926,16 @@ private:
}
}
ACTOR static Future<Optional<Value>> waitAndReadValue(KeyValueStoreMemory* self, Key key, ReadOptions options) {
ACTOR static Future<Optional<Value>> waitAndReadValue(KeyValueStoreMemory* self,
Key key,
Optional<ReadOptions> options) {
wait(self->recovering);
return static_cast<IKeyValueStore*>(self)->readValue(key, options).get();
}
ACTOR static Future<Optional<Value>> waitAndReadValuePrefix(KeyValueStoreMemory* self,
Key key,
int maxLength,
ReadOptions options) {
Optional<ReadOptions> options) {
wait(self->recovering);
return static_cast<IKeyValueStore*>(self)->readValuePrefix(key, maxLength, options).get();
}
@ -941,7 +943,7 @@ private:
KeyRange keys,
int rowLimit,
int byteLimit,
RangeReadOptions options) {
Optional<RangeReadOptions> options) {
wait(self->recovering);
return static_cast<IKeyValueStore*>(self)->readRange(keys, rowLimit, byteLimit, options).get();
}

View File

@ -1880,35 +1880,51 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return result;
}
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override {
if (!shouldThrottle(options.type, key)) {
auto a = new Reader::ReadValueAction(key, options.debugID);
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {
ReadType type = ReadType::NORMAL;
Optional<UID> debugID;
if (options.present()) {
type = options.get().type;
debugID = options.get().debugID;
}
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (options.type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (options.type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
auto& semaphore = (type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValueAction>(key, options.debugID);
auto a = std::make_unique<Reader::ReadValueAction>(key, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override {
if (!shouldThrottle(options.type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, options.debugID);
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) override {
ReadType type = ReadType::NORMAL;
Optional<UID> debugID;
if (options.present()) {
type = options.get().type;
debugID = options.get().debugID;
}
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (options.type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (options.type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
auto& semaphore = (type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, options.debugID);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
@ -1935,16 +1951,22 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options) override {
if (!shouldThrottle(options.type, keys.begin)) {
Optional<RangeReadOptions> options) override {
ReadType type = ReadType::NORMAL;
if (options.present()) {
type = options.get().type;
}
if (!shouldThrottle(type, keys.begin)) {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (options.type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (options.type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
auto& semaphore = (type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadRangeAction>(keys, rowLimit, byteLimit);

View File

@ -1589,12 +1589,12 @@ public:
void clear(KeyRangeRef range, const Arena* arena = nullptr) override;
Future<Void> commit(bool sequential = false) override;
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override;
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override;
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> optionss) override;
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) override;
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options) override;
Optional<RangeReadOptions> options) override;
KeyValueStoreSQLite(std::string const& filename,
UID logID,
@ -2216,16 +2216,24 @@ Future<Void> KeyValueStoreSQLite::commit(bool sequential) {
writeThread->post(p);
return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValue(KeyRef key, ReadOptions const& options) {
Future<Optional<Value>> KeyValueStoreSQLite::readValue(KeyRef key, Optional<ReadOptions> options) {
++readsRequested;
auto p = new Reader::ReadValueAction(key, options.debugID);
Optional<UID> debugID;
if (options.present()) {
debugID = options.get().debugID;
}
auto p = new Reader::ReadValueAction(key, debugID);
auto f = p->result.getFuture();
readThreads->post(p);
return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) {
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) {
++readsRequested;
auto p = new Reader::ReadValuePrefixAction(key, maxLength, options.debugID);
Optional<UID> debugID;
if (options.present()) {
debugID = options.get().debugID;
}
auto p = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto f = p->result.getFuture();
readThreads->post(p);
return f;
@ -2233,7 +2241,7 @@ Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int max
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options) {
Optional<RangeReadOptions> options) {
++readsRequested;
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto f = p->result.getFuture();

View File

@ -2309,7 +2309,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return result;
}
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {
auto* shard = shardManager.getDataShard(key);
if (shard == nullptr || !shard->physicalShard->initialized()) {
// TODO: read non-exist system key range should not cause an error.
@ -2319,22 +2319,30 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return Optional<Value>();
}
if (!shouldThrottle(options.type, key)) {
auto a = new Reader::ReadValueAction(key, shard->physicalShard, options.debugID);
ReadType type = ReadType::NORMAL;
Optional<UID> debugID;
if (options.present()) {
type = options.get().type;
debugID = options.get().debugID;
}
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValueAction(key, shard->physicalShard, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (options.type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (options.type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
auto& semaphore = (type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValueAction>(key, shard->physicalShard, options.debugID);
auto a = std::make_unique<Reader::ReadValueAction>(key, shard->physicalShard, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override {
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) override {
auto* shard = shardManager.getDataShard(key);
if (shard == nullptr || !shard->physicalShard->initialized()) {
// TODO: read non-exist system key range should not cause an error.
@ -2344,18 +2352,26 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return Optional<Value>();
}
if (!shouldThrottle(options.type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, shard->physicalShard, options.debugID);
ReadType type = ReadType::NORMAL;
Optional<UID> debugID;
if (options.present()) {
type = options.get().type;
debugID = options.get().debugID;
}
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, shard->physicalShard, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (options.type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (options.type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
auto& semaphore = (type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, shard->physicalShard, options.debugID);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, shard->physicalShard, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
@ -2382,19 +2398,24 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options = ReadOptions()) override {
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) override {
TraceEvent(SevVerbose, "ShardedRocksReadRangeBegin", this->id).detail("Range", keys);
auto shards = shardManager.getDataShardsByRange(keys);
if (!shouldThrottle(options.type, keys.begin)) {
ReadType type = ReadType::NORMAL;
if (options.present()) {
type = options.get().type;
}
if (!shouldThrottle(type, keys.begin)) {
auto a = new Reader::ReadRangeAction(keys, shards, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (options.type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (options.type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
auto& semaphore = (type == ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadRangeAction>(keys, shards, rowLimit, byteLimit);

View File

@ -481,18 +481,18 @@ ACTOR Future<Void> getValueQ(StorageCacheData* data, GetValueRequest req) {
// TODO what's this?
wait(delay(0, TaskPriority::DefaultEndpoint));
if (req.options.debugID.present()) {
if (req.options.present() && req.options.get().debugID.present()) {
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
// FIXME
}
state Optional<Value> v;
state Version version = wait(waitForVersion(data, req.version));
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
state uint64_t changeCounter = data->cacheRangeChangeCounter;
@ -527,9 +527,9 @@ ACTOR Future<Void> getValueQ(StorageCacheData* data, GetValueRequest req) {
//TraceEvent(SevDebug, "SCGetValueQPresent", data->thisServerID).detail("ResultSize",resultSize).detail("Version", version).detail("ReqKey",req.key).detail("Value",v);
}
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
GetValueReply reply(v, true);
@ -732,7 +732,7 @@ ACTOR Future<Void> getKeyValues(StorageCacheData* data, GetKeyValuesRequest req)
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
TaskPriority taskType = TaskPriority::DefaultEndpoint;
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.type == ReadType::FETCH) {
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
taskType = TaskPriority::FetchKeys;
// } else if (false) {
// // Placeholder for up-prioritizing fetches for important requests
@ -741,18 +741,18 @@ ACTOR Future<Void> getKeyValues(StorageCacheData* data, GetKeyValuesRequest req)
wait(delay(0, taskType));
try {
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storagecache.getKeyValues.Before");
"TransactionDebug", req.options.get().debugID.get().first(), "storagecache.getKeyValues.Before");
state Version version = wait(waitForVersion(data, req.version));
state uint64_t changeCounter = data->cacheRangeChangeCounter;
state KeyRange cachedKeyRange = getCachedKeyRange(data, req.begin);
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storagecache.getKeyValues.AfterVersion");
"TransactionDebug", req.options.get().debugID.get().first(), "storagecache.getKeyValues.AfterVersion");
//.detail("CacheRangeBegin", cachedKeyRange.begin).detail("CacheRangeEnd", cachedKeyRange.end);
if (!selectorInRange(req.end, cachedKeyRange) &&
@ -770,9 +770,9 @@ ACTOR Future<Void> getKeyValues(StorageCacheData* data, GetKeyValuesRequest req)
: findKey(data, req.begin, version, cachedKeyRange, &offset1);
state Key end = req.end.isFirstGreaterOrEqual() ? req.end.getKey()
: findKey(data, req.end, version, cachedKeyRange, &offset2);
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storagecache.getKeyValues.AfterKeys");
"TransactionDebug", req.options.get().debugID.get().first(), "storagecache.getKeyValues.AfterKeys");
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
// Offsets of zero indicate begin/end keys in this cachedKeyRange, which obviously means we can answer the query
@ -797,9 +797,9 @@ ACTOR Future<Void> getKeyValues(StorageCacheData* data, GetKeyValuesRequest req)
// offset1).detail("EndOffset", offset2);
if (begin >= end) {
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storagecache.getKeyValues.Send");
"TransactionDebug", req.options.get().debugID.get().first(), "storagecache.getKeyValues.Send");
//.detail("Begin",begin).detail("End",end);
GetKeyValuesReply none;
@ -815,9 +815,10 @@ ACTOR Future<Void> getKeyValues(StorageCacheData* data, GetKeyValuesRequest req)
GetKeyValuesReply _r = readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes);
GetKeyValuesReply r = _r;
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storagecache.getKeyValues.AfterReadRange");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storagecache.getKeyValues.AfterReadRange");
data->checkChangeCounter(
changeCounter,
KeyRangeRef(std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())),

View File

@ -7346,7 +7346,7 @@ public:
private:
PagerEventReasons reason;
ReadOptions options;
Optional<ReadOptions> options;
VersionedBTree* btree;
Reference<IPagerSnapshot> pager;
bool valid;
@ -7412,7 +7412,7 @@ public:
link.get().getChildPage(),
ioMaxPriority,
false,
options.cacheResult || path.back().btPage()->height != 2),
!options.present() || options.get().cacheResult || path.back().btPage()->height != 2),
[=](Reference<const ArenaPage> p) {
#if REDWOOD_DEBUG
path.push_back({ p, btree->getCursor(p.getPtr(), link), link.get().getChildPage() });
@ -7446,7 +7446,7 @@ public:
// Initialize or reinitialize cursor
Future<Void> init(VersionedBTree* btree_in,
PagerEventReasons reason_in,
ReadOptions options_in,
Optional<ReadOptions> options_in,
Reference<IPagerSnapshot> pager_in,
BTreeNodeLink root) {
btree = btree_in;
@ -7649,7 +7649,7 @@ public:
Future<Void> initBTreeCursor(BTreeCursor* cursor,
Version snapshotVersion,
PagerEventReasons reason,
ReadOptions options = ReadOptions()) {
Optional<ReadOptions> options = Optional<ReadOptions>()) {
Reference<IPagerSnapshot> snapshot = m_pager->getReadSnapshot(snapshotVersion);
BTreeNodeLinkRef root;
@ -7798,7 +7798,7 @@ public:
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
RangeReadOptions const& options) override {
Optional<RangeReadOptions> options) override {
debug_printf("READRANGE %s\n", printable(keys).c_str());
return catchError(readRange_impl(this, keys, rowLimit, byteLimit, options));
}
@ -7807,11 +7807,18 @@ public:
KeyRange keys,
int rowLimit,
int byteLimit,
RangeReadOptions options) {
state PagerEventReasons reason =
(options.type == ReadType::FETCH) ? PagerEventReasons::FetchRange : PagerEventReasons::RangeRead;
Optional<RangeReadOptions> options) {
state PagerEventReasons reason = PagerEventReasons::RangeRead;
state VersionedBTree::BTreeCursor cur;
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), reason, options));
if (options.present()) {
state ReadOptions rOptions = options.get();
if (options.get().type == ReadType::FETCH) {
reason = PagerEventReasons::FetchRange;
}
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), reason, rOptions));
} else {
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), reason));
}
state PriorityMultiLock::Lock lock;
state Future<Void> f;
++g_redwoodMetrics.metric.opGetRange;
@ -7947,7 +7954,9 @@ public:
return result;
}
ACTOR static Future<Optional<Value>> readValue_impl(KeyValueStoreRedwood* self, Key key, ReadOptions options) {
ACTOR static Future<Optional<Value>> readValue_impl(KeyValueStoreRedwood* self,
Key key,
Optional<ReadOptions> options) {
state VersionedBTree::BTreeCursor cur;
wait(self->m_tree->initBTreeCursor(
&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead, options));
@ -7969,11 +7978,11 @@ public:
return Optional<Value>();
}
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options) override {
return catchError(readValue_impl(this, key, options));
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options) override {
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<ReadOptions> options) override {
return catchError(map(readValue_impl(this, key, options), [maxLength](Optional<Value> v) {
if (v.present() && v.get().size() > maxLength) {
v.get().contents() = v.get().substr(0, maxLength);
@ -10998,7 +11007,7 @@ ACTOR Future<Void> randomRangeScans(IKeyValueStore* kvs,
bool singlePrefix,
int rowLimit,
int byteLimit,
RangeReadOptions options = RangeReadOptions()) {
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) {
fmt::print("\nstoreType: {}\n", static_cast<int>(kvs->getType()));
fmt::print("prefixSource: {}\n", source.toString());
fmt::print("suffixSize: {}\n", suffixSize);
@ -11055,9 +11064,6 @@ TEST_CASE(":/redwood/performance/randomRangeScans") {
state int suffixSize = 12;
state int valueSize = 100;
state int maxByteLimit = std::numeric_limits<int>::max();
state RangeReadOptions options;
options.cacheResult = true;
options.type = ReadType::NORMAL;
// TODO change to 100e8 after figuring out no-disk redwood mode
state int writeRecordCountTarget = 1e6;
@ -11073,16 +11079,11 @@ TEST_CASE(":/redwood/performance/randomRangeScans") {
redwood, suffixSize, valueSize, source, writeRecordCountTarget, writePrefixesInOrder, false));
// divide targets for tiny queries by 10 because they are much slower
wait(randomRangeScans(
redwood, suffixSize, source, valueSize, queryRecordTarget / 10, true, 10, maxByteLimit, options));
wait(
randomRangeScans(redwood, suffixSize, source, valueSize, queryRecordTarget, true, 1000, maxByteLimit, options));
wait(randomRangeScans(
redwood, suffixSize, source, valueSize, queryRecordTarget / 10, false, 100, maxByteLimit, options));
wait(randomRangeScans(
redwood, suffixSize, source, valueSize, queryRecordTarget, false, 10000, maxByteLimit, options));
wait(randomRangeScans(
redwood, suffixSize, source, valueSize, queryRecordTarget, false, 1000000, maxByteLimit, options));
wait(randomRangeScans(redwood, suffixSize, source, valueSize, queryRecordTarget / 10, true, 10, maxByteLimit));
wait(randomRangeScans(redwood, suffixSize, source, valueSize, queryRecordTarget, true, 1000, maxByteLimit));
wait(randomRangeScans(redwood, suffixSize, source, valueSize, queryRecordTarget / 10, false, 100, maxByteLimit));
wait(randomRangeScans(redwood, suffixSize, source, valueSize, queryRecordTarget, false, 10000, maxByteLimit));
wait(randomRangeScans(redwood, suffixSize, source, valueSize, queryRecordTarget, false, 1000000, maxByteLimit));
wait(closeKVS(redwood));
printf("\n");
return Void();

View File

@ -70,19 +70,19 @@ public:
virtual Future<Void> commit(
bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable
virtual Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options = ReadOptions()) = 0;
virtual Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options = Optional<ReadOptions>()) = 0;
// Like readValue(), but returns only the first maxLength bytes of the value if it is longer
virtual Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
ReadOptions const& options = ReadOptions()) = 0;
Optional<ReadOptions> options = Optional<ReadOptions>()) = 0;
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
RangeReadOptions const& options = RangeReadOptions()) = 0;
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) = 0;
// Shard management APIs.
// Adds key range to a physical shard.

View File

@ -155,7 +155,7 @@ struct OpenKVStoreRequest {
struct IKVSGetValueRequest {
constexpr static FileIdentifier file_identifier = 1029439;
KeyRef key;
ReadOptions options;
Optional<ReadOptions> options;
ReplyPromise<Optional<Value>> reply;
template <class Ar>
@ -201,7 +201,7 @@ struct IKVSReadValuePrefixRequest {
constexpr static FileIdentifier file_identifier = 1928374;
KeyRef key;
int maxLength;
ReadOptions options;
Optional<ReadOptions> options;
ReplyPromise<Optional<Value>> reply;
template <class Ar>
@ -244,7 +244,7 @@ struct IKVSReadRangeRequest {
KeyRangeRef keys;
int rowLimit;
int byteLimit;
RangeReadOptions options;
Optional<RangeReadOptions> options;
ReplyPromise<IKVSReadRangeReply> reply;
template <class Ar>
@ -400,13 +400,13 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
return commitAndGetStorageBytes(this, commitReply);
}
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options = ReadOptions()) override {
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options = Optional<ReadOptions>()) override {
return readValueImpl(this, IKVSGetValueRequest{ key, options, ReplyPromise<Optional<Value>>() });
}
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
ReadOptions const& options = ReadOptions()) override {
Optional<ReadOptions> options = Optional<ReadOptions>()) override {
return interf.readValuePrefix.getReply(
IKVSReadValuePrefixRequest{ key, maxLength, options, ReplyPromise<Optional<Value>>() });
}
@ -414,7 +414,7 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
RangeReadOptions const& options = RangeReadOptions()) override {
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) override {
IKVSReadRangeRequest req{ keys, rowLimit, byteLimit, options, ReplyPromise<IKVSReadRangeReply>() };
return fmap([](const IKVSReadRangeReply& reply) { return reply.toRangeResult(); },
interf.readRange.getReply(req));

View File

@ -399,22 +399,24 @@ struct StorageServerDisk {
// - "a", if key "a" exist
// - "b", if key "a" doesn't exist, and "b" is the next existing key in total order
// - allKeys.end, if keyrange [a, allKeys.end) is empty
Future<Key> readNextKeyInclusive(KeyRef key, RangeReadOptions const& options = RangeReadOptions()) {
Future<Key> readNextKeyInclusive(KeyRef key, Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) {
++(*kvScans);
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), options);
}
Future<Optional<Value>> readValue(KeyRef key, ReadOptions const& options = ReadOptions()) {
Future<Optional<Value>> readValue(KeyRef key, Optional<ReadOptions> options = Optional<ReadOptions>()) {
++(*kvGets);
return storage->readValue(key, options);
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, ReadOptions const& options = ReadOptions()) {
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<ReadOptions> options = Optional<ReadOptions>()) {
++(*kvGets);
return storage->readValuePrefix(key, maxLength, options);
}
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
RangeReadOptions const& options = RangeReadOptions()) {
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) {
++(*kvScans);
return storage->readRange(keys, rowLimit, byteLimit, options);
}
@ -443,7 +445,9 @@ private:
IKeyValueStore* storage;
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, RangeReadOptions options) {
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage,
KeyRangeRef range,
Optional<RangeReadOptions> options) {
RangeResult r = wait(storage->readRange(range, 1, 1 << 30, options));
if (r.size())
return r[0].key;
@ -1806,17 +1810,17 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// so we need to downgrade here
wait(data->getQueryDelay());
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, req.spanContext));
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
Optional<TenantMapEntry> entry = data->getTenantEntry(version, req.tenantInfo);
@ -1884,9 +1888,9 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
data->metrics.notifyBytesReadPerKSecond(req.key, bytesReadPerKSecond);
}
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
// Check if the desired key might be cached
@ -3230,6 +3234,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
state GetValueReqAndResultRef getValue;
state double getValueStart = g_network->timer();
getValue.key = key;
state Optional<ReadOptions> options = pOriginalReq->options.castTo<ReadOptions>();
if (data->shards[key]->isReadable()) {
try {
@ -3239,7 +3244,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
key,
version,
pOriginalReq->tags,
pOriginalReq->options,
options,
VersionVector());
// Note that it does not use readGuard to avoid server being overloaded here. Throttling is enforced at the
// original request level, rather than individual underlying lookups. The reason is that throttle any
@ -3286,7 +3291,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
int limit,
int* pLimitBytes,
SpanContext parentSpan,
RangeReadOptions options,
Optional<RangeReadOptions> options,
Optional<Key> tenantPrefix) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
@ -3527,7 +3532,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
KeyRange range,
int* pOffset,
SpanContext parentSpan,
RangeReadOptions options)
Optional<RangeReadOptions> options)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
@ -3665,7 +3670,7 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
{
state Span span("SS:getKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state RangeReadOptions options = req.options;
state Optional<RangeReadOptions> options = req.options;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -3680,16 +3685,16 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.type == ReadType::FETCH) {
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(data->getQueryDelay());
}
try {
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValues.Before");
"TransactionDebug", req.options.get().debugID.get().first(), "storageserver.getKeyValues.Before");
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context));
@ -3705,9 +3710,9 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
// try {
state KeyRange shard = getShardKeyRange(data, req.begin);
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
"TransactionDebug", req.options.get().debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
@ -3734,9 +3739,9 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
"TransactionDebug", req.options.get().debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
@ -3754,9 +3759,9 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
}
if (begin >= end) {
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValues.Send");
"TransactionDebug", req.options.get().debugID.get().first(), "storageserver.getKeyValues.Send");
//.detail("Begin",begin).detail("End",end);
GetKeyValuesReply none;
@ -3781,9 +3786,10 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
tenantPrefix));
GetKeyValuesReply r = _r;
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getKeyValues.AfterReadRange");
//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
data->checkChangeCounter(
changeCounter,
@ -3860,9 +3866,10 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
state double getValuesStart = g_network->timer();
getRange.begin = firstGreaterOrEqual(KeyRef(*a, prefix));
getRange.end = firstGreaterOrEqual(strinc(prefix, *a));
if (pOriginalReq->options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", pOriginalReq->options.debugID.get().first(), "storageserver.quickGetKeyValues.Before");
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
pOriginalReq->options.get().debugID.get().first(),
"storageserver.quickGetKeyValues.Before");
try {
// TODO: Use a lower level API may be better?
GetKeyValuesRequest req;
@ -3877,8 +3884,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
req.limit = SERVER_KNOBS->QUICK_GET_KEY_VALUES_LIMIT;
req.limitBytes = SERVER_KNOBS->QUICK_GET_KEY_VALUES_LIMIT_BYTES;
req.options = pOriginalReq->options;
// TODO: tweak priorities?
req.options.type = ReadType::NORMAL;
// TODO: tweak priorities in req.options.get().type?
req.tags = pOriginalReq->tags;
req.ssLatestCommitVersions = VersionVector();
@ -3894,9 +3900,9 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
getRange.result = RangeResultRef(reply.data, reply.more);
const double duration = g_network->timer() - getValuesStart;
data->counters.mappedRangeLocalSample.addMeasurement(duration);
if (pOriginalReq->options.debugID.present())
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
pOriginalReq->options.debugID.get().first(),
pOriginalReq->options.get().debugID.get().first(),
"storageserver.quickGetKeyValues.AfterLocalFetch");
return getRange;
}
@ -3909,8 +3915,8 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
if (SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK) {
state Transaction tr(data->cx, pOriginalReq->tenantInfo.name.castTo<TenantName>());
tr.setVersion(version);
if (pOriginalReq->options.debugID.present()) {
tr.debugTransaction(pOriginalReq->options.debugID.get());
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present()) {
tr.debugTransaction(pOriginalReq->options.get().debugID.get());
}
// TODO: is DefaultPromiseEndpoint the best priority for this?
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
@ -3922,9 +3928,9 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
getRange.result = rangeResult;
const double duration = g_network->timer() - getValuesStart;
data->counters.mappedRangeRemoteSample.addMeasurement(duration);
if (pOriginalReq->options.debugID.present())
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
pOriginalReq->options.debugID.get().first(),
pOriginalReq->options.get().debugID.get().first(),
"storageserver.quickGetKeyValues.AfterRemoteFetch");
return getRange;
} else {
@ -4213,9 +4219,9 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
result.arena.dependsOn(input.arena);
result.data.reserve(result.arena, input.data.size());
if (pOriginalReq->options.debugID.present())
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", pOriginalReq->options.debugID.get().first(), "storageserver.mapKeyValues.Start");
"TransactionDebug", pOriginalReq->options.get().debugID.get().first(), "storageserver.mapKeyValues.Start");
state Tuple mappedKeyFormatTuple;
state Tuple mappedKeyTuple;
@ -4234,9 +4240,10 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
state std::vector<MappedKeyValueRef> kvms(k);
state std::vector<Future<Void>> subqueries;
state int offset = 0;
if (pOriginalReq->options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", pOriginalReq->options.debugID.get().first(), "storageserver.mapKeyValues.BeforeLoop");
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
pOriginalReq->options.get().debugID.get().first(),
"storageserver.mapKeyValues.BeforeLoop");
for (; offset < sz; offset += SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE) {
// Divide into batches of MAX_PARALLEL_QUICK_GET_VALUE subqueries
for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
@ -4272,18 +4279,19 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
mappedKey));
}
wait(waitForAll(subqueries));
if (pOriginalReq->options.debugID.present())
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
pOriginalReq->options.debugID.get().first(),
pOriginalReq->options.get().debugID.get().first(),
"storageserver.mapKeyValues.AfterBatch");
subqueries.clear();
for (int i = 0; i + offset < sz && i < SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE; i++) {
result.data.push_back(result.arena, kvms[i]);
}
}
if (pOriginalReq->options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", pOriginalReq->options.debugID.get().first(), "storageserver.mapKeyValues.AfterAll");
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
pOriginalReq->options.get().debugID.get().first(),
"storageserver.mapKeyValues.AfterAll");
return result;
}
@ -4409,7 +4417,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
{
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state RangeReadOptions options = req.options;
state Optional<RangeReadOptions> options = req.options;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4424,16 +4432,16 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.type == ReadType::FETCH) {
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(data->getQueryDelay());
}
try {
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getMappedKeyValues.Before");
"TransactionDebug", req.options.get().debugID.get().first(), "storageserver.getMappedKeyValues.Before");
// VERSION_VECTOR change
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context));
@ -4449,9 +4457,10 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// try {
state KeyRange shard = getShardKeyRange(data, req.begin);
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getMappedKeyValues.AfterVersion");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getMappedKeyValues.AfterVersion");
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
@ -4491,9 +4500,10 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
}
}
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getMappedKeyValues.AfterKeys");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getMappedKeyValues.AfterKeys");
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
@ -4511,9 +4521,10 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
}
if (begin >= end) {
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getMappedKeyValues.Send");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getMappedKeyValues.Send");
//.detail("Begin",begin).detail("End",end);
GetMappedKeyValuesReply none;
@ -4548,9 +4559,9 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
throw;
}
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"storageserver.getMappedKeyValues.AfterReadRange");
//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
data->checkChangeCounter(
@ -4617,7 +4628,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
{
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
state int64_t resultSize = 0;
state RangeReadOptions options = req.options;
state Optional<RangeReadOptions> options = req.options;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4631,16 +4642,16 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.type == ReadType::FETCH) {
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(delay(0, TaskPriority::DefaultEndpoint));
}
try {
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValuesStream.Before");
"TransactionDebug", req.options.get().debugID.get().first(), "storageserver.getKeyValuesStream.Before");
Version commitVersion = getLatestCommitVersion(req.ssLatestCommitVersions, data->tag);
state Version version = wait(waitForVersion(data, commitVersion, req.version, span.context));
@ -4656,9 +4667,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// try {
state KeyRange shard = getShardKeyRange(data, req.begin);
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getKeyValuesStream.AfterVersion");
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
@ -4684,9 +4696,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
: findKey(data, req.end, version, searchRange, &offset2, span.context, options);
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValuesStream.AfterKeys");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getKeyValuesStream.AfterKeys");
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
@ -4704,9 +4717,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
}
if (begin >= end) {
if (req.options.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.options.debugID.get().first(), "storageserver.getKeyValuesStream.Send");
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.get().debugID.get().first(),
"storageserver.getKeyValuesStream.Send");
//.detail("Begin",begin).detail("End",end);
GetKeyValuesStreamReply none;
@ -4742,9 +4756,9 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
tenantPrefix));
GetKeyValuesStreamReply r(_r);
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"storageserver.getKeyValuesStream.AfterReadRange");
//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
data->checkChangeCounter(
@ -4799,7 +4813,8 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
end = lastKey;
}
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.type == ReadType::FETCH) {
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() &&
req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(delay(0, TaskPriority::DefaultEndpoint));
@ -4829,7 +4844,9 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
}
state int64_t resultSize = 0;
state RangeReadOptions options;
options = req.options;
if (req.options.present()) {
options = req.options.get();
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
++data->counters.getKeyQueries;
@ -4855,7 +4872,13 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
KeyRangeRef searchRange = data->clampRangeToTenant(shard, tenantEntry, req.arena);
state int offset;
Key absoluteKey = wait(findKey(data, req.sel, version, searchRange, &offset, req.spanContext, options));
Key absoluteKey = wait(findKey(data,
req.sel,
version,
searchRange,
&offset,
req.spanContext,
req.options.present() ? options : Optional<RangeReadOptions>()));
data->checkChangeCounter(changeCounter,
KeyRangeRef(std::min<KeyRef>(req.sel.getKey(), absoluteKey),
@ -9761,9 +9784,9 @@ ACTOR Future<Void> serveGetValueRequests(StorageServer* self, FutureStream<GetVa
GetValueRequest req = waitNext(getValue);
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
// downgrade before doing real work
if (req.options.debugID.present())
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.options.debugID.get().first(),
req.options.get().debugID.get().first(),
"storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))