update trState ReadOption to optional

This commit is contained in:
Fuheng Zhao 2022-08-24 15:04:17 -07:00
parent 4e748d6bed
commit 7c6dbaf3cf
14 changed files with 149 additions and 159 deletions

View File

@ -230,9 +230,9 @@ void DatabaseContext::getLatestCommitVersions(const Reference<LocationInfo>& loc
VersionVector& latestCommitVersions) {
latestCommitVersions.clear();
if (info->readOptions.debugID.present()) {
if (info->readOptions.present() && info->readOptions.get().debugID.present()) {
g_traceBatch.addEvent(
"TransactionDebug", info->readOptions.debugID.get().first(), "NativeAPI.getLatestCommitVersions");
"TransactionDebug", info->readOptions.get().debugID.get().first(), "NativeAPI.getLatestCommitVersions");
}
if (!info->readVersionObtainedFromGrvProxy) {
@ -2988,7 +2988,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Reference<TransactionState> trState,
key,
member,
trState->spanContext,
trState->readOptions.debugID,
trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>(),
trState->useProvisionalProxies,
isBackward,
version);
@ -3129,7 +3129,7 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
reverse,
member,
trState->spanContext,
trState->readOptions.debugID,
trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>(),
trState->useProvisionalProxies,
version);
@ -3151,16 +3151,16 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
state Version version = wait(fVersion);
loop {
std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations_internal(trState->cx,
trState->getTenantInfo(),
keys,
CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT,
Reverse::False,
trState->spanContext,
trState->readOptions.debugID,
trState->useProvisionalProxies,
version));
std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations_internal(
trState->cx,
trState->getTenantInfo(),
keys,
CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT,
Reverse::False,
trState->spanContext,
trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>(),
trState->useProvisionalProxies,
version));
totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
totalRequests++;
if (locations.size() == 0 || totalRanges >= trState->cx->locationCacheSize ||
@ -3318,14 +3318,16 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
state uint64_t startTime;
state double startTimeD;
state VersionVector ssLatestCommitVersions;
state ReadOptions readOptions = trState->readOptions;
state Optional<ReadOptions> readOptions = trState->readOptions;
trState->cx->getLatestCommitVersions(locationInfo.locations, ver, trState, ssLatestCommitVersions);
try {
if (trState->readOptions.debugID.present()) {
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
getValueID = nondeterministicRandom()->randomUniqueID();
readOptions.get().debugID = getValueID;
g_traceBatch.addAttach(
"GetValueAttachID", trState->readOptions.debugID.get().first(), getValueID.get().first());
"GetValueAttachID", trState->readOptions.get().debugID.get().first(), getValueID.get().first());
g_traceBatch.addEvent("GetValueDebug",
getValueID.get().first(),
"NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
@ -3334,7 +3336,7 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
.detail("ReqVersion", ver)
.detail("Servers", describe(ssi.second->get()));*/
}
readOptions.debugID = getValueID;
++trState->cx->getValueSubmitted;
startTime = timer_int();
startTimeD = now();
@ -3434,11 +3436,15 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
wait(success(version));
state Optional<UID> getKeyID;
state Span span("NAPI:getKey"_loc, trState->spanContext);
if (trState->readOptions.debugID.present()) {
getKeyID = nondeterministicRandom()->randomUniqueID();
state Optional<ReadOptions> readOptions = trState->readOptions;
g_traceBatch.addAttach("GetKeyAttachID", trState->readOptions.debugID.get().first(), getKeyID.get().first());
state Span span("NAPI:getKey"_loc, trState->spanContext);
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
getKeyID = nondeterministicRandom()->randomUniqueID();
readOptions.get().debugID = getKeyID;
g_traceBatch.addAttach(
"GetKeyAttachID", trState->readOptions.get().debugID.get().first(), getKeyID.get().first());
g_traceBatch.addEvent(
"GetKeyDebug",
getKeyID.get().first(),
@ -3466,10 +3472,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
state VersionVector ssLatestCommitVersions;
trState->cx->getLatestCommitVersions(locationInfo.locations, version.get(), trState, ssLatestCommitVersions);
state ReadOptions readOptions = trState->readOptions;
readOptions.debugID = getKeyID;
state bool sendReadOption =
readOptions.debugID.present() || readOptions.type != ReadType::NORMAL || readOptions.cacheResult != true;
try {
if (getKeyID.present())
g_traceBatch.addEvent(
@ -3484,7 +3487,7 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
k,
version.get(),
trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>(),
sendReadOption ? readOptions : Optional<ReadOptions>(),
readOptions,
ssLatestCommitVersions);
req.arena.dependsOn(k.arena());
@ -3943,17 +3946,13 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
// FIXME: buggify byte limits on internal functions that use them, instead of globally
req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>();
if (trState->readOptions.type != ReadType::NORMAL || trState->readOptions.cacheResult != true ||
trState->readOptions.debugID.present()) {
RangeReadOptions rangeOptions;
rangeOptions = trState->readOptions;
req.options = Optional<RangeReadOptions>(rangeOptions);
}
req.options = trState->readOptions;
try {
if (trState->readOptions.debugID.present()) {
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.debugID.get().first(),
trState->readOptions.get().debugID.get().first(),
"NativeAPI.getExactRange.Before");
/*TraceEvent("TransactionDebugGetExactRangeInfo", trState->readOptions.debugID.get())
.detail("ReqBeginKey", req.begin.getKey())
@ -3985,9 +3984,9 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
++trState->cx->transactionPhysicalReadsCompleted;
throw;
}
if (trState->readOptions.debugID.present())
if (trState->readOptions.present() && trState->readOptions.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.debugID.get().first(),
trState->readOptions.get().debugID.get().first(),
"NativeAPI.getExactRange.After");
output.arena().dependsOn(rep.arena);
output.append(output.arena(), rep.data.begin(), rep.data.size());
@ -4316,12 +4315,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
req.arena.dependsOn(mapper.arena());
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
req.tenantInfo = useTenant ? trState->getTenantInfo() : TenantInfo();
if (trState->readOptions.type != ReadType::NORMAL || trState->readOptions.cacheResult != true ||
trState->readOptions.debugID.present()) {
RangeReadOptions rangeOptions;
rangeOptions = trState->readOptions;
req.options = Optional<RangeReadOptions>(rangeOptions);
}
req.options = trState->readOptions;
req.version = readVersion;
trState->cx->getLatestCommitVersions(
@ -4361,9 +4355,10 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>();
req.spanContext = span.context;
try {
if (trState->readOptions.debugID.present()) {
g_traceBatch.addEvent(
"TransactionDebug", trState->readOptions.debugID.get().first(), "NativeAPI.getRange.Before");
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.get().debugID.get().first(),
"NativeAPI.getRange.Before");
/*TraceEvent("TransactionDebugGetRangeInfo", trState->readOptions.debugID.get())
.detail("ReqBeginKey", req.begin.getKey())
.detail("ReqEndKey", req.end.getKey())
@ -4403,9 +4398,9 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
throw;
}
if (trState->readOptions.debugID.present()) {
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.debugID.get().first(),
trState->readOptions.get().debugID.get().first(),
"NativeAPI.getRange.After"); //.detail("SizeOf", rep.data.size());
/*TraceEvent("TransactionDebugGetRangeDone", trState->readOptions.debugID.get())
.detail("ReqBeginKey", req.begin.getKey())
@ -4519,10 +4514,11 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
}
} catch (Error& e) {
if (trState->readOptions.debugID.present()) {
g_traceBatch.addEvent(
"TransactionDebug", trState->readOptions.debugID.get().first(), "NativeAPI.getRange.Error");
TraceEvent("TransactionDebugError", trState->readOptions.debugID.get()).error(e);
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.get().debugID.get().first(),
"NativeAPI.getRange.Error");
TraceEvent("TransactionDebugError", trState->readOptions.get().debugID.get()).error(e);
}
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
(e.code() == error_code_transaction_too_old && readVersion == latestVersion)) {
@ -4774,12 +4770,7 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
req.spanContext = spanContext;
req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.limitBytes = std::numeric_limits<int>::max();
if (trState->readOptions.type != ReadType::NORMAL || trState->readOptions.cacheResult != true ||
trState->readOptions.debugID.present()) {
RangeReadOptions rangeOptions;
rangeOptions = trState->readOptions;
req.options = Optional<RangeReadOptions>(rangeOptions);
}
req.options = trState->readOptions;
trState->cx->getLatestCommitVersions(
locations[shard].locations, req.version, trState, req.ssLatestCommitVersions);
@ -4793,9 +4784,10 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional<TagSet>();
try {
if (trState->readOptions.debugID.present()) {
g_traceBatch.addEvent(
"TransactionDebug", trState->readOptions.debugID.get().first(), "NativeAPI.RangeStream.Before");
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.get().debugID.get().first(),
"NativeAPI.RangeStream.Before");
}
++trState->cx->transactionPhysicalReads;
state GetKeyValuesStreamReply rep;
@ -4889,9 +4881,9 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
}
rep = GetKeyValuesStreamReply();
}
if (trState->readOptions.debugID.present())
if (trState->readOptions.present() && trState->readOptions.get().debugID.present())
g_traceBatch.addEvent("TransactionDebug",
trState->readOptions.debugID.get().first(),
trState->readOptions.get().debugID.get().first(),
"NativeAPI.getExactRange.After");
RangeResult output(RangeResultRef(rep.data, rep.more), rep.arena);
@ -5371,7 +5363,7 @@ Future<Void> Transaction::watch(Reference<Watch> watch) {
trState->options.readTags,
trState->spanContext,
trState->taskID,
trState->readOptions.debugID,
trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>(),
trState->useProvisionalProxies);
}
@ -6037,16 +6029,17 @@ void Transaction::setupWatches() {
Future<Version> watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion();
for (int i = 0; i < watches.size(); ++i)
watches[i]->setWatch(watchValueMap(watchVersion,
trState->getTenantInfo(),
watches[i]->key,
watches[i]->value,
trState->cx,
trState->options.readTags,
trState->spanContext,
trState->taskID,
trState->readOptions.debugID,
trState->useProvisionalProxies));
watches[i]->setWatch(
watchValueMap(watchVersion,
trState->getTenantInfo(),
watches[i]->key,
watches[i]->value,
trState->cx,
trState->options.readTags,
trState->spanContext,
trState->taskID,
trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>(),
trState->useProvisionalProxies));
watches.clear();
} catch (Error&) {
@ -6167,7 +6160,7 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
state TraceInterval interval("TransactionCommit");
state double startTime = now();
state Span span("NAPI:tryCommit"_loc, trState->spanContext);
state Optional<UID> debugID = trState->readOptions.debugID;
state Optional<UID> debugID = trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>();
state TenantPrefixPrepended tenantPrefixPrepended = TenantPrefixPrepended::False;
if (debugID.present()) {
TraceEvent(interval.begin()).detail("Parent", debugID.get());
@ -6567,10 +6560,10 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
makeReference<TransactionLogInfo>(value.get().printable(), TransactionLogInfo::DONT_LOG);
trState->trLogInfo->maxFieldLength = trState->options.maxTransactionLoggingFieldLength;
}
if (trState->readOptions.debugID.present()) {
if (trState->readOptions.present() && trState->readOptions.get().debugID.present()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trState->trLogInfo->identifier)
.detail("ServerTraceID", trState->readOptions.debugID.get());
.detail("ServerTraceID", trState->readOptions.get().debugID.get());
}
break;
@ -6602,10 +6595,11 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
case FDBTransactionOptions::SERVER_REQUEST_TRACING:
validateOptionValueNotPresent(value);
debugTransaction(deterministicRandom()->randomUniqueID());
if (trState->trLogInfo && !trState->trLogInfo->identifier.empty()) {
if (trState->trLogInfo && !trState->trLogInfo->identifier.empty() && trState->readOptions.present() &&
trState->readOptions.get().debugID.present()) {
TraceEvent(SevInfo, "TransactionBeingTraced")
.detail("DebugTransactionID", trState->trLogInfo->identifier)
.detail("ServerTraceID", trState->readOptions.debugID.get());
.detail("ServerTraceID", trState->readOptions.get().debugID.get());
}
break;
@ -7073,8 +7067,9 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
Location location = "NAPI:getReadVersion"_loc;
SpanContext spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanContext);
auto const req =
DatabaseContext::VersionRequest(spanContext, trState->options.tags, trState->readOptions.debugID);
Optional<UID> versionDebugID =
trState->readOptions.present() ? trState->readOptions.get().debugID : Optional<UID>();
auto const req = DatabaseContext::VersionRequest(spanContext, trState->options.tags, versionDebugID);
batcher.stream.send(req);
trState->startTime = now();
readVersion = extractReadVersion(trState, location, spanContext, req.reply.getFuture(), metadataVersion);
@ -7641,14 +7636,15 @@ ACTOR Future<TenantMapEntry> blobGranuleGetTenantEntry(Transaction* self, Key ra
Optional<KeyRangeLocationInfo> cachedLocationInfo =
self->trState->cx->getCachedLocation(self->getTenant().get(), rangeStartKey, Reverse::False);
if (!cachedLocationInfo.present()) {
KeyRangeLocationInfo l = wait(getKeyLocation_internal(self->trState->cx,
self->trState->getTenantInfo(AllowInvalidTenantID::True),
rangeStartKey,
self->trState->spanContext,
self->trState->readOptions.debugID,
self->trState->useProvisionalProxies,
Reverse::False,
latestVersion));
KeyRangeLocationInfo l = wait(getKeyLocation_internal(
self->trState->cx,
self->trState->getTenantInfo(AllowInvalidTenantID::True),
rangeStartKey,
self->trState->spanContext,
self->trState->readOptions.present() ? self->trState->readOptions.get().debugID : Optional<UID>(),
self->trState->useProvisionalProxies,
Reverse::False,
latestVersion));
self->trState->trySetTenantId(l.tenantEntry.id);
return l.tenantEntry;
} else {

View File

@ -1522,6 +1522,9 @@ enum class ReadType {
NORMAL,
HIGH,
};
FDB_DECLARE_BOOLEAN_PARAM(CacheResult);
// store options for storage engine read
// ReadType describes the usage and priority of the read
// cacheResult determines whether the storage engine cache for this read
@ -1529,30 +1532,17 @@ enum class ReadType {
// debugID helps to trace the path of the read
struct ReadOptions {
ReadType type;
Optional<UID> debugID;
bool cacheResult;
ReadOptions() : type(ReadType::NORMAL), cacheResult(true){};
ReadOptions(ReadType type, bool cacheResult) : type(type), cacheResult(cacheResult){};
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, type, cacheResult, debugID);
}
};
struct RangeReadOptions : ReadOptions {
Optional<UID> debugID;
Optional<Version> consistencyCheckStartVersion;
RangeReadOptions() : ReadOptions(){};
ReadOptions() : type(ReadType::NORMAL), cacheResult(CacheResult::True){};
RangeReadOptions& operator=(const ReadOptions& option) {
type = option.type;
debugID = option.debugID;
cacheResult = option.cacheResult;
return *this;
}
ReadOptions(Optional<UID> debugID,
ReadType type = ReadType::NORMAL,
CacheResult cache = CacheResult::False,
Optional<Version> version = Optional<Version>())
: type(type), cacheResult(cache), debugID(debugID), consistencyCheckStartVersion(version){};
template <class Ar>
void serialize(Ar& ar) {

View File

@ -242,7 +242,7 @@ struct TransactionState : ReferenceCounted<TransactionState> {
Optional<Standalone<StringRef>> authToken;
Reference<TransactionLogInfo> trLogInfo;
TransactionOptions options;
ReadOptions readOptions;
Optional<ReadOptions> readOptions;
TaskPriority taskID;
SpanContext spanContext;
@ -455,7 +455,13 @@ public:
void fullReset();
double getBackoff(int errCode);
void debugTransaction(UID dID) { trState->readOptions.debugID = dID; }
void debugTransaction(UID dID) {
if (trState->readOptions.present()) {
trState->readOptions.get().debugID = dID;
} else {
trState->readOptions = ReadOptions(dID);
}
}
VersionVector getVersionVector() const;
SpanContext getSpanContext() const { return trState->spanContext; }

View File

@ -394,7 +394,7 @@ struct GetKeyValuesRequest : TimedRequest {
int limit, limitBytes;
Optional<TagSet> tags;
Optional<UID> debugID;
Optional<RangeReadOptions> options;
Optional<ReadOptions> options;
ReplyPromise<GetKeyValuesReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
@ -452,7 +452,7 @@ struct GetMappedKeyValuesRequest : TimedRequest {
int limit, limitBytes;
int matchIndex;
Optional<TagSet> tags;
Optional<RangeReadOptions> options;
Optional<ReadOptions> options;
ReplyPromise<GetMappedKeyValuesReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
@ -518,7 +518,7 @@ struct GetKeyValuesStreamRequest {
Version version; // or latestVersion
int limit, limitBytes;
Optional<TagSet> tags;
Optional<RangeReadOptions> options;
Optional<ReadOptions> options;
ReplyPromiseStream<GetKeyValuesStreamReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that

View File

@ -73,7 +73,7 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) override {
Optional<ReadOptions> options = Optional<ReadOptions>()) override {
return doReadRange(store, keys, rowLimit, byteLimit, options);
}
@ -102,7 +102,7 @@ private:
KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) {
Optional<ReadOptions> options) {
RangeResult _vs = wait(store->readRange(keys, rowLimit, byteLimit, options));
RangeResult vs = _vs; // Get rid of implicit const& from wait statement
Arena& a = vs.arena();

View File

@ -232,7 +232,7 @@ public:
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) override {
Optional<ReadOptions> options) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())
@ -943,7 +943,7 @@ private:
KeyRange keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) {
Optional<ReadOptions> options) {
wait(self->recovering);
return static_cast<IKeyValueStore*>(self)->readRange(keys, rowLimit, byteLimit, options).get();
}

View File

@ -1951,7 +1951,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) override {
Optional<ReadOptions> options) override {
ReadType type = ReadType::NORMAL;
if (options.present()) {

View File

@ -1594,7 +1594,7 @@ public:
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) override;
Optional<ReadOptions> options) override;
KeyValueStoreSQLite(std::string const& filename,
UID logID,
@ -2241,7 +2241,7 @@ Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int max
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) {
Optional<ReadOptions> options) {
++readsRequested;
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto f = p->result.getFuture();

View File

@ -2398,7 +2398,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) override {
Optional<ReadOptions> options = Optional<ReadOptions>()) override {
TraceEvent(SevVerbose, "ShardedRocksReadRangeBegin", this->id).detail("Range", keys);
auto shards = shardManager.getDataShardsByRange(keys);

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbserver/OTELSpanContextMessage.h"
#include "flow/Arena.h"
#include "fdbclient/FDBOptions.g.h"
@ -42,6 +43,8 @@
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
FDB_DEFINE_BOOLEAN_PARAM(CacheResult);
// TODO storageCache server shares quite a bit of storageServer functionality, although simplified
// Need to look into refactoring common code out for better code readability and to avoid duplication
@ -1187,6 +1190,7 @@ ACTOR Future<RangeResult> tryFetchRange(Database cx,
state RangeResult output;
state KeySelectorRef begin = firstGreaterOrEqual(keys.begin);
state KeySelectorRef end = firstGreaterOrEqual(keys.end);
state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::FETCH);
if (*isTooOld)
throw transaction_too_old();
@ -1194,8 +1198,7 @@ ACTOR Future<RangeResult> tryFetchRange(Database cx,
ASSERT(!cx->switchable);
tr.setVersion(version);
tr.trState->taskID = TaskPriority::FetchKeys;
tr.trState->readOptions.type = ReadType::FETCH;
tr.trState->readOptions.cacheResult = false;
tr.trState->readOptions = options;
limits.minRows = 0;
try {

View File

@ -7798,7 +7798,7 @@ public:
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) override {
Optional<ReadOptions> options) override {
debug_printf("READRANGE %s\n", printable(keys).c_str());
return catchError(readRange_impl(this, keys, rowLimit, byteLimit, options));
}
@ -7807,18 +7807,14 @@ public:
KeyRange keys,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options) {
Optional<ReadOptions> options) {
state PagerEventReasons reason = PagerEventReasons::RangeRead;
state VersionedBTree::BTreeCursor cur;
if (options.present()) {
state ReadOptions rOptions = options.get();
if (options.get().type == ReadType::FETCH) {
reason = PagerEventReasons::FetchRange;
}
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), reason, rOptions));
} else {
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), reason));
if (options.present() && options.get().type == ReadType::FETCH) {
reason = PagerEventReasons::FetchRange;
}
wait(self->m_tree->initBTreeCursor(&cur, self->m_tree->getLastCommittedVersion(), reason, options));
state PriorityMultiLock::Lock lock;
state Future<Void> f;
++g_redwoodMetrics.metric.opGetRange;
@ -11007,7 +11003,7 @@ ACTOR Future<Void> randomRangeScans(IKeyValueStore* kvs,
bool singlePrefix,
int rowLimit,
int byteLimit,
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) {
Optional<ReadOptions> options = Optional<ReadOptions>()) {
fmt::print("\nstoreType: {}\n", static_cast<int>(kvs->getType()));
fmt::print("prefixSource: {}\n", source.toString());
fmt::print("suffixSize: {}\n", suffixSize);

View File

@ -82,7 +82,7 @@ public:
virtual Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) = 0;
Optional<ReadOptions> options = Optional<ReadOptions>()) = 0;
// Shard management APIs.
// Adds key range to a physical shard.

View File

@ -244,7 +244,7 @@ struct IKVSReadRangeRequest {
KeyRangeRef keys;
int rowLimit;
int byteLimit;
Optional<RangeReadOptions> options;
Optional<ReadOptions> options;
ReplyPromise<IKVSReadRangeReply> reply;
template <class Ar>
@ -414,7 +414,7 @@ struct RemoteIKeyValueStore : public IKeyValueStore {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) override {
Optional<ReadOptions> options = Optional<ReadOptions>()) override {
IKVSReadRangeRequest req{ keys, rowLimit, byteLimit, options, ReplyPromise<IKVSReadRangeReply>() };
return fmap([](const IKVSReadRangeReply& reply) { return reply.toRangeResult(); },
interf.readRange.getReply(req));

View File

@ -399,7 +399,7 @@ struct StorageServerDisk {
// - "a", if key "a" exist
// - "b", if key "a" doesn't exist, and "b" is the next existing key in total order
// - allKeys.end, if keyrange [a, allKeys.end) is empty
Future<Key> readNextKeyInclusive(KeyRef key, Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) {
Future<Key> readNextKeyInclusive(KeyRef key, Optional<ReadOptions> options = Optional<ReadOptions>()) {
++(*kvScans);
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), options);
}
@ -416,7 +416,7 @@ struct StorageServerDisk {
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
Optional<RangeReadOptions> options = Optional<RangeReadOptions>()) {
Optional<ReadOptions> options = Optional<ReadOptions>()) {
++(*kvScans);
return storage->readRange(keys, rowLimit, byteLimit, options);
}
@ -445,9 +445,7 @@ private:
IKeyValueStore* storage;
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage,
KeyRangeRef range,
Optional<RangeReadOptions> options) {
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, Optional<ReadOptions> options) {
RangeResult r = wait(storage->readRange(range, 1, 1 << 30, options));
if (r.size())
return r[0].key;
@ -2495,7 +2493,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Version startVersion = data->version.get();
// TODO: Change feed reads should probably at least set cacheResult to false, possibly set a different ReadType as
// well, perhaps high priority?
state RangeReadOptions options;
state ReadOptions options;
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedMutationsBegin", data->thisServerID)
@ -3234,7 +3232,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
state GetValueReqAndResultRef getValue;
state double getValueStart = g_network->timer();
getValue.key = key;
state Optional<ReadOptions> options = pOriginalReq->options.castTo<ReadOptions>();
state Optional<ReadOptions> options = pOriginalReq->options;
if (data->shards[key]->isReadable()) {
try {
@ -3291,7 +3289,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
int limit,
int* pLimitBytes,
SpanContext parentSpan,
Optional<RangeReadOptions> options,
Optional<ReadOptions> options,
Optional<Key> tenantPrefix) {
state GetKeyValuesReply result;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
@ -3532,7 +3530,7 @@ ACTOR Future<Key> findKey(StorageServer* data,
KeyRange range,
int* pOffset,
SpanContext parentSpan,
Optional<RangeReadOptions> options)
Optional<ReadOptions> options)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
@ -3670,7 +3668,7 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
{
state Span span("SS:getKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<RangeReadOptions> options = req.options;
state Optional<ReadOptions> options = req.options;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4417,7 +4415,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
{
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<RangeReadOptions> options = req.options;
state Optional<ReadOptions> options = req.options;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4628,7 +4626,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
{
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<RangeReadOptions> options = req.options;
state Optional<ReadOptions> options = req.options;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4843,7 +4841,7 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
state int64_t resultSize = 0;
state RangeReadOptions options;
state ReadOptions options;
if (req.options.present()) {
options = req.options.get();
}
@ -4878,7 +4876,7 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
searchRange,
&offset,
req.spanContext,
req.options.present() ? options : Optional<RangeReadOptions>()));
req.options.present() ? options : Optional<ReadOptions>()));
data->checkChangeCounter(changeCounter,
KeyRangeRef(std::min<KeyRef>(req.sel.getKey(), absoluteKey),
@ -4973,7 +4971,7 @@ void getQueuingMetrics(StorageServer* self, StorageQueuingMetricsRequest const&
ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager) {
eager->finishKeyBegin();
state RangeReadOptions options;
state ReadOptions options;
options.type = ReadType::EAGER;
if (SERVER_KNOBS->ENABLE_CLEAR_RANGE_EAGER_READS) {
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
@ -6347,6 +6345,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state int debug_nextRetryToLog = 1;
state Error lastError;
// TODO: update to FETCH once the priority multi lock is used.
// leaving the readtype off for now to prevent data fetches stall under heavy load
// it is used to inform the storage that the rangeRead is for Fetch
// state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::FETCH);
state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::NORMAL);
// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server
// we must refresh the cache manually.
data->cx->invalidateCache(Key(), keys);
@ -6401,12 +6405,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
shard->updates.pop_front();
tr.setVersion(fetchVersion);
tr.trState->taskID = TaskPriority::FetchKeys;
// TODO: update to FETCH once the priority multi lock is used.
// leaving the readtype off for now to prevent data fetches stall under heavy load
// it is used to inform the storage that the rangeRead is for Fetch
// tr.trState->readOptions.type = ReadType::FETCH;
tr.trState->readOptions.type = ReadType::NORMAL;
tr.trState->readOptions.cacheResult = false;
tr.trState->readOptions = options;
state PromiseStream<RangeResult> results;
state Future<Void> hold = SERVER_KNOBS->FETCH_USING_STREAMING
? tr.getRangeStream(results, keys, GetRangeLimits(), Snapshot::True)