Merge pull request #6165 from sfc-gh-ajbeamon/native-api-refactor
Refactor Native API Transactions
This commit is contained in:
commit
07e5319477
|
@ -786,7 +786,7 @@ struct CopyLogRangeTaskFunc : TaskFuncBase {
|
|||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
|
||||
tr.trState->options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
|
||||
wait(checkDatabaseLock(&tr,
|
||||
BinaryReader::fromStringRef<UID>(
|
||||
task->params[BackupAgentBase::keyConfigLogUid], Unversioned())));
|
||||
|
@ -1531,7 +1531,7 @@ struct OldCopyLogRangeTaskFunc : TaskFuncBase {
|
|||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
|
||||
tr.trState->options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
|
||||
wait(checkDatabaseLock(&tr,
|
||||
BinaryReader::fromStringRef<UID>(
|
||||
task->params[BackupAgentBase::keyConfigLogUid], Unversioned())));
|
||||
|
|
|
@ -132,19 +132,39 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
struct WatchParameters : public ReferenceCounted<WatchParameters> {
|
||||
const Key key;
|
||||
const Optional<Value> value;
|
||||
|
||||
const Version version;
|
||||
const TagSet tags;
|
||||
const SpanID spanID;
|
||||
const TaskPriority taskID;
|
||||
const Optional<UID> debugID;
|
||||
const UseProvisionalProxies useProvisionalProxies;
|
||||
|
||||
WatchParameters(Key key,
|
||||
Optional<Value> value,
|
||||
Version version,
|
||||
TagSet tags,
|
||||
SpanID spanID,
|
||||
TaskPriority taskID,
|
||||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies)
|
||||
: key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID), debugID(debugID),
|
||||
useProvisionalProxies(useProvisionalProxies) {}
|
||||
};
|
||||
|
||||
class WatchMetadata : public ReferenceCounted<WatchMetadata> {
|
||||
public:
|
||||
Key key;
|
||||
Optional<Value> value;
|
||||
Version version;
|
||||
Promise<Version> watchPromise;
|
||||
Future<Version> watchFuture;
|
||||
Future<Void> watchFutureSS;
|
||||
|
||||
TransactionInfo info;
|
||||
TagSet tags;
|
||||
Reference<const WatchParameters> parameters;
|
||||
|
||||
WatchMetadata(Key key, Optional<Value> value, Version version, TransactionInfo info, TagSet tags);
|
||||
WatchMetadata(Reference<const WatchParameters> parameters)
|
||||
: watchFuture(watchPromise.getFuture()), parameters(parameters) {}
|
||||
};
|
||||
|
||||
struct MutationAndVersionStream {
|
||||
|
@ -225,12 +245,25 @@ public:
|
|||
bool sampleOnCost(uint64_t cost) const;
|
||||
|
||||
void updateProxies();
|
||||
Reference<CommitProxyInfo> getCommitProxies(bool useProvisionalProxies);
|
||||
Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(bool useProvisionalProxies);
|
||||
Reference<GrvProxyInfo> getGrvProxies(bool useProvisionalProxies);
|
||||
Reference<CommitProxyInfo> getCommitProxies(UseProvisionalProxies useProvisionalProxies);
|
||||
Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(UseProvisionalProxies useProvisionalProxies);
|
||||
Reference<GrvProxyInfo> getGrvProxies(UseProvisionalProxies useProvisionalProxies);
|
||||
Future<Void> onProxiesChanged() const;
|
||||
Future<Void> onClientLibStatusChanged() const;
|
||||
Future<HealthMetrics> getHealthMetrics(bool detailed);
|
||||
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
|
||||
Future<StorageMetrics> getStorageMetrics(KeyRange const& keys, int shardLimit);
|
||||
Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(KeyRange const& keys,
|
||||
StorageMetrics const& min,
|
||||
StorageMetrics const& max,
|
||||
StorageMetrics const& permittedError,
|
||||
int shardLimit,
|
||||
int expectedShardCount);
|
||||
Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(KeyRange const& keys,
|
||||
StorageMetrics const& limit,
|
||||
StorageMetrics const& estimated);
|
||||
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(KeyRange const& keys);
|
||||
|
||||
// Returns the protocol version reported by the coordinator this client is connected to
|
||||
// If an expected version is given, the future won't return until the protocol version is different than expected
|
||||
|
@ -498,7 +531,7 @@ public:
|
|||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
private:
|
||||
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
|
||||
std::unordered_map<Key, Reference<WatchMetadata>> watchMap;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -46,6 +46,8 @@
|
|||
(getSBVar(__FILE__, __LINE__, BuggifyType::Client) && deterministicRandom()->random01() < (x))
|
||||
#define CLIENT_BUGGIFY CLIENT_BUGGIFY_WITH_PROB(P_BUGGIFIED_SECTION_FIRES[int(BuggifyType::Client)])
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(UseProvisionalProxies);
|
||||
|
||||
// Incomplete types that are reference counted
|
||||
class DatabaseContext;
|
||||
template <>
|
||||
|
@ -173,22 +175,6 @@ private:
|
|||
};
|
||||
|
||||
class ReadYourWritesTransaction; // workaround cyclic dependency
|
||||
struct TransactionInfo {
|
||||
Optional<UID> debugID;
|
||||
TaskPriority taskID;
|
||||
SpanID spanID;
|
||||
bool useProvisionalProxies;
|
||||
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
|
||||
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
|
||||
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
|
||||
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
|
||||
|
||||
// Only available so that Transaction can have a default constructor, for use in state variables
|
||||
TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {}
|
||||
|
||||
explicit TransactionInfo(TaskPriority taskID, SpanID spanID)
|
||||
: taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
|
||||
};
|
||||
|
||||
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
|
||||
enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };
|
||||
|
@ -246,6 +232,34 @@ struct Watch : public ReferenceCounted<Watch>, NonCopyable {
|
|||
void setWatch(Future<Void> watchFuture);
|
||||
};
|
||||
|
||||
struct TransactionState : ReferenceCounted<TransactionState> {
|
||||
Database cx;
|
||||
Reference<TransactionLogInfo> trLogInfo;
|
||||
TransactionOptions options;
|
||||
|
||||
Optional<UID> debugID;
|
||||
TaskPriority taskID;
|
||||
SpanID spanID;
|
||||
UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False;
|
||||
|
||||
int numErrors = 0;
|
||||
double startTime = 0;
|
||||
Promise<Standalone<StringRef>> versionstampPromise;
|
||||
|
||||
Version committedVersion{ invalidVersion };
|
||||
|
||||
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
|
||||
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
|
||||
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
|
||||
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
|
||||
|
||||
// Only available so that Transaction can have a default constructor, for use in state variables
|
||||
TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID) {}
|
||||
|
||||
TransactionState(Database cx, TaskPriority taskID, SpanID spanID, Reference<TransactionLogInfo> trLogInfo)
|
||||
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID) {}
|
||||
};
|
||||
|
||||
class Transaction : NonCopyable {
|
||||
public:
|
||||
explicit Transaction(Database const& cx);
|
||||
|
@ -298,16 +312,6 @@ public:
|
|||
Snapshot = Snapshot::False,
|
||||
Reverse = Reverse::False);
|
||||
|
||||
private:
|
||||
template <class GetKeyValuesFamilyRequest, class GetKeyValuesFamilyReply>
|
||||
Future<RangeResult> getRangeInternal(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse);
|
||||
|
||||
public:
|
||||
// A method for streaming data from the storage server that is more efficient than getRange when reading large
|
||||
// amounts of data
|
||||
[[nodiscard]] Future<Void> getRangeStream(const PromiseStream<Standalone<RangeResultRef>>& results,
|
||||
|
@ -354,20 +358,7 @@ public:
|
|||
void addWriteConflictRange(KeyRangeRef const& keys);
|
||||
void makeSelfConflicting();
|
||||
|
||||
Future<Void> warmRange(Database cx, KeyRange keys);
|
||||
|
||||
Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(KeyRange const& keys,
|
||||
StorageMetrics const& min,
|
||||
StorageMetrics const& max,
|
||||
StorageMetrics const& permittedError,
|
||||
int shardLimit,
|
||||
int expectedShardCount);
|
||||
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
|
||||
Future<StorageMetrics> getStorageMetrics(KeyRange const& keys, int shardLimit);
|
||||
Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(KeyRange const& keys,
|
||||
StorageMetrics const& limit,
|
||||
StorageMetrics const& estimated);
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(KeyRange const& keys);
|
||||
Future<Void> warmRange(KeyRange keys);
|
||||
|
||||
// Try to split the given range into equally sized chunks based on estimated size.
|
||||
// The returned list would still be in form of [keys.begin, splitPoint1, splitPoint2, ... , keys.end]
|
||||
|
@ -387,20 +378,20 @@ public:
|
|||
AddConflictRange = AddConflictRange::True);
|
||||
void clear(const KeyRangeRef& range, AddConflictRange = AddConflictRange::True);
|
||||
void clear(const KeyRef& key, AddConflictRange = AddConflictRange::True);
|
||||
[[nodiscard]] Future<Void> commit(); // Throws not_committed or commit_unknown_result errors in normal operation
|
||||
|
||||
// Throws not_committed or commit_unknown_result errors in normal operation
|
||||
[[nodiscard]] Future<Void> commit();
|
||||
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
|
||||
|
||||
Version getCommittedVersion() const {
|
||||
return committedVersion;
|
||||
} // May be called only after commit() returns success
|
||||
[[nodiscard]] Future<Standalone<StringRef>>
|
||||
getVersionstamp(); // Will be fulfilled only after commit() returns success
|
||||
// May be called only after commit() returns success
|
||||
Version getCommittedVersion() const { return trState->committedVersion; }
|
||||
|
||||
// Will be fulfilled only after commit() returns success
|
||||
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp();
|
||||
|
||||
Future<uint64_t> getProtocolVersion();
|
||||
|
||||
Promise<Standalone<StringRef>> versionstampPromise;
|
||||
|
||||
uint32_t getSize();
|
||||
[[nodiscard]] Future<Void> onError(Error const& e);
|
||||
void flushTrLogsIfEnabled();
|
||||
|
@ -412,27 +403,17 @@ public:
|
|||
void reset();
|
||||
void fullReset();
|
||||
double getBackoff(int errCode);
|
||||
void debugTransaction(UID dID) { info.debugID = dID; }
|
||||
void debugTransaction(UID dID) { trState->debugID = dID; }
|
||||
|
||||
Future<Void> commitMutations();
|
||||
void setupWatches();
|
||||
void cancelWatches(Error const& e = transaction_cancelled());
|
||||
|
||||
TransactionInfo info;
|
||||
int numErrors;
|
||||
|
||||
std::vector<Reference<Watch>> watches;
|
||||
|
||||
int apiVersionAtLeast(int minVersion) const;
|
||||
|
||||
void checkDeferredError() const;
|
||||
|
||||
Database getDatabase() const { return cx; }
|
||||
Database getDatabase() const { return trState->cx; }
|
||||
static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
|
||||
TransactionOptions options;
|
||||
Span span;
|
||||
double startTime;
|
||||
Reference<TransactionLogInfo> trLogInfo;
|
||||
|
||||
void setTransactionID(uint64_t id);
|
||||
void setToken(uint64_t token);
|
||||
|
@ -445,12 +426,22 @@ public:
|
|||
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
|
||||
}
|
||||
|
||||
Reference<TransactionState> trState;
|
||||
std::vector<Reference<Watch>> watches;
|
||||
Span span;
|
||||
|
||||
private:
|
||||
Future<Version> getReadVersion(uint32_t flags);
|
||||
Database cx;
|
||||
|
||||
template <class GetKeyValuesFamilyRequest, class GetKeyValuesFamilyReply>
|
||||
Future<RangeResult> getRangeInternal(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
const Key& mapper,
|
||||
GetRangeLimits limits,
|
||||
Snapshot snapshot,
|
||||
Reverse reverse);
|
||||
|
||||
double backoff;
|
||||
Version committedVersion{ invalidVersion };
|
||||
CommitTransactionRequest tr;
|
||||
Future<Version> readVersion;
|
||||
Promise<Optional<Value>> metadataVersion;
|
||||
|
|
|
@ -1653,7 +1653,7 @@ Future<int64_t> ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyR
|
|||
if (resetPromise.isSet())
|
||||
return resetPromise.getFuture().getError();
|
||||
|
||||
return map(waitOrError(tr.getStorageMetrics(keys, -1), resetPromise.getFuture()),
|
||||
return map(waitOrError(tr.getDatabase()->getStorageMetrics(keys, -1), resetPromise.getFuture()),
|
||||
[](const StorageMetrics& m) { return m.bytes; });
|
||||
}
|
||||
|
||||
|
@ -1911,7 +1911,7 @@ void ReadYourWritesTransaction::setToken(uint64_t token) {
|
|||
RangeResult ReadYourWritesTransaction::getReadConflictRangeIntersecting(KeyRangeRef kr) {
|
||||
TEST(true); // Special keys read conflict range
|
||||
ASSERT(readConflictRangeKeysRange.contains(kr));
|
||||
ASSERT(!tr.options.checkWritesEnabled);
|
||||
ASSERT(!tr.trState->options.checkWritesEnabled);
|
||||
RangeResult result;
|
||||
if (!options.readYourWritesDisabled) {
|
||||
kr = kr.removePrefix(readConflictRangeKeysRange.begin);
|
||||
|
|
|
@ -168,7 +168,7 @@ public:
|
|||
|
||||
Database getDatabase() const { return tr.getDatabase(); }
|
||||
|
||||
const TransactionInfo& getTransactionInfo() const { return tr.info; }
|
||||
Reference<const TransactionState> getTransactionState() const { return tr.trState; }
|
||||
|
||||
void setTransactionID(uint64_t id);
|
||||
void setToken(uint64_t token);
|
||||
|
|
|
@ -653,8 +653,8 @@ ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeReadIm
|
|||
|
||||
Future<RangeResult> ConflictingKeysImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
|
||||
RangeResult result;
|
||||
if (ryw->getTransactionInfo().conflictingKeys) {
|
||||
auto krMapPtr = ryw->getTransactionInfo().conflictingKeys.get();
|
||||
if (ryw->getTransactionState()->conflictingKeys) {
|
||||
auto krMapPtr = ryw->getTransactionState()->conflictingKeys.get();
|
||||
auto beginIter = krMapPtr->rangeContaining(kr.begin);
|
||||
if (beginIter->begin() != kr.begin)
|
||||
++beginIter;
|
||||
|
@ -1539,10 +1539,10 @@ Future<RangeResult> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
|
|||
|
||||
if (key.endsWith(kTracingTransactionIdKey)) {
|
||||
result.push_back_deep(result.arena(),
|
||||
KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.first())));
|
||||
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.first())));
|
||||
} else if (key.endsWith(kTracingTokenKey)) {
|
||||
result.push_back_deep(result.arena(),
|
||||
KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.second())));
|
||||
KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.second())));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -434,10 +434,11 @@ struct BackupData {
|
|||
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
|
||||
choose {
|
||||
when(wait(self->cx->onProxiesChanged())) {}
|
||||
when(GetReadVersionReply reply = wait(basicLoadBalance(self->cx->getGrvProxies(false),
|
||||
&GrvProxyInterface::getConsistentReadVersion,
|
||||
request,
|
||||
self->cx->taskID))) {
|
||||
when(GetReadVersionReply reply =
|
||||
wait(basicLoadBalance(self->cx->getGrvProxies(UseProvisionalProxies::False),
|
||||
&GrvProxyInterface::getConsistentReadVersion,
|
||||
request,
|
||||
self->cx->taskID))) {
|
||||
return reply.version;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "fdbclient/BlobWorkerInterface.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/BlobManagerInterface.h"
|
||||
|
@ -233,7 +234,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<ReadYourWritesT
|
|||
printf(
|
||||
"Splitting new range [%s - %s)\n", range.begin.printable().c_str(), range.end.printable().c_str());
|
||||
}
|
||||
StorageMetrics estimated = wait(tr->getTransaction().getStorageMetrics(range, CLIENT_KNOBS->TOO_MANY));
|
||||
StorageMetrics estimated =
|
||||
wait(tr->getTransaction().getDatabase()->getStorageMetrics(range, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
if (BM_DEBUG) {
|
||||
fmt::print("Estimated bytes for [{0} - {1}): {2}\n",
|
||||
|
@ -252,7 +254,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<ReadYourWritesT
|
|||
splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidth
|
||||
|
||||
Standalone<VectorRef<KeyRef>> keys =
|
||||
wait(tr->getTransaction().splitStorageMetrics(range, splitMetrics, estimated));
|
||||
wait(tr->getTransaction().getDatabase()->splitStorageMetrics(range, splitMetrics, estimated));
|
||||
return keys;
|
||||
} else {
|
||||
// printf(" Not splitting range\n");
|
||||
|
|
|
@ -274,12 +274,12 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self,
|
|||
Transaction tr(self()->cx);
|
||||
// metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range
|
||||
std::pair<Optional<StorageMetrics>, int> metrics =
|
||||
wait(tr.waitStorageMetrics(keys,
|
||||
bounds.min,
|
||||
bounds.max,
|
||||
bounds.permittedError,
|
||||
CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT,
|
||||
shardCount));
|
||||
wait(self()->cx->waitStorageMetrics(keys,
|
||||
bounds.min,
|
||||
bounds.max,
|
||||
bounds.permittedError,
|
||||
CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT,
|
||||
shardCount));
|
||||
if (metrics.first.present()) {
|
||||
BandwidthStatus newBandwidthStatus = getBandwidthStatus(metrics.first.get());
|
||||
if (newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) {
|
||||
|
@ -336,7 +336,8 @@ ACTOR Future<Void> readHotDetector(DataDistributionTracker* self) {
|
|||
state Transaction tr(self->cx);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges = wait(tr.getReadHotRanges(keys));
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges =
|
||||
wait(self->cx->getReadHotRanges(keys));
|
||||
for (const auto& keyRange : readHotRanges) {
|
||||
TraceEvent("ReadHotRangeLog")
|
||||
.detail("ReadDensity", keyRange.density)
|
||||
|
@ -378,7 +379,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getSplitKeys(DataDistributionTracker
|
|||
loop {
|
||||
state Transaction tr(self->cx);
|
||||
try {
|
||||
Standalone<VectorRef<KeyRef>> keys = wait(tr.splitStorageMetrics(splitRange, splitMetrics, estimated));
|
||||
Standalone<VectorRef<KeyRef>> keys =
|
||||
wait(self->cx->splitStorageMetrics(splitRange, splitMetrics, estimated));
|
||||
return keys;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
|
|
@ -400,7 +400,7 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
|
|||
// Keep track of shards for all src servers so that we can preserve their values in serverKeys
|
||||
state Map<UID, VectorRef<KeyRangeRef>> shardMap;
|
||||
|
||||
tr->getTransaction().info.taskID = TaskPriority::MoveKeys;
|
||||
tr->getTransaction().trState->taskID = TaskPriority::MoveKeys;
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
|
@ -603,7 +603,7 @@ ACTOR Future<Void> checkFetchingState(Database cx,
|
|||
if (BUGGIFY)
|
||||
wait(delay(5));
|
||||
|
||||
tr.info.taskID = TaskPriority::MoveKeys;
|
||||
tr.trState->taskID = TaskPriority::MoveKeys;
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
std::vector<Future<Optional<Value>>> serverListEntries;
|
||||
|
@ -696,7 +696,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
|
|||
loop {
|
||||
try {
|
||||
|
||||
tr.info.taskID = TaskPriority::MoveKeys;
|
||||
tr.trState->taskID = TaskPriority::MoveKeys;
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
releaser.release();
|
||||
|
@ -1317,7 +1317,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.info.taskID = TaskPriority::MoveKeys;
|
||||
tr.trState->taskID = TaskPriority::MoveKeys;
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
wait(checkMoveKeysLock(&tr, lock, ddEnabledState));
|
||||
TraceEvent("RemoveKeysFromFailedServerLocked")
|
||||
|
|
|
@ -1189,7 +1189,7 @@ ACTOR Future<RangeResult> tryFetchRange(Database cx,
|
|||
|
||||
ASSERT(!cx->switchable);
|
||||
tr.setVersion(version);
|
||||
tr.info.taskID = TaskPriority::FetchKeys;
|
||||
tr.trState->taskID = TaskPriority::FetchKeys;
|
||||
limits.minRows = 0;
|
||||
|
||||
try {
|
||||
|
|
|
@ -2104,7 +2104,7 @@ ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data,
|
|||
state Transaction tr(data->cx);
|
||||
tr.setVersion(version);
|
||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||
tr.info.taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
Future<Optional<Value>> valueFuture = tr.get(key, Snapshot::True);
|
||||
// TODO: async in case it needs to read from other servers.
|
||||
state Optional<Value> valueOption = wait(valueFuture);
|
||||
|
@ -2635,7 +2635,7 @@ ACTOR Future<RangeResult> quickGetKeyValues(StorageServer* data,
|
|||
state Transaction tr(data->cx);
|
||||
tr.setVersion(version);
|
||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||
tr.info.taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
Future<RangeResult> rangeResultFuture = tr.getRange(prefixRange(prefix), Snapshot::True);
|
||||
// TODO: async in case it needs to read from other servers.
|
||||
RangeResult rangeResult = wait(rangeResultFuture);
|
||||
|
@ -4143,7 +4143,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||
while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion)
|
||||
shard->updates.pop_front();
|
||||
tr.setVersion(fetchVersion);
|
||||
tr.info.taskID = TaskPriority::FetchKeys;
|
||||
tr.trState->taskID = TaskPriority::FetchKeys;
|
||||
state PromiseStream<RangeResult> results;
|
||||
state Future<Void> hold = SERVER_KNOBS->FETCH_USING_STREAMING
|
||||
? tr.getRangeStream(results, keys, GetRangeLimits(), Snapshot::True)
|
||||
|
|
|
@ -877,7 +877,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
|||
state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc);
|
||||
|
||||
while (begin < end) {
|
||||
state Reference<CommitProxyInfo> commitProxyInfo = wait(cx->getCommitProxiesFuture(false));
|
||||
state Reference<CommitProxyInfo> commitProxyInfo =
|
||||
wait(cx->getCommitProxiesFuture(UseProvisionalProxies::False));
|
||||
keyServerLocationFutures.clear();
|
||||
for (int i = 0; i < commitProxyInfo->size(); i++)
|
||||
keyServerLocationFutures.push_back(
|
||||
|
@ -1124,7 +1125,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
|||
loop {
|
||||
try {
|
||||
StorageMetrics metrics =
|
||||
wait(tr.getStorageMetrics(KeyRangeRef(allKeys.begin, keyServersPrefix), 100000));
|
||||
wait(tr.getDatabase()->getStorageMetrics(KeyRangeRef(allKeys.begin, keyServersPrefix), 100000));
|
||||
return metrics.bytes;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
|
|
@ -76,7 +76,7 @@ struct IndexScanWorkload : KVWorkload {
|
|||
loop {
|
||||
state Transaction tr(cx);
|
||||
try {
|
||||
wait(tr.warmRange(cx, allKeys));
|
||||
wait(tr.warmRange(allKeys));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
|
|
@ -98,11 +98,11 @@ struct ReadHotDetectionWorkload : TestWorkload {
|
|||
loop {
|
||||
state Transaction tr(cx);
|
||||
try {
|
||||
StorageMetrics sm = wait(tr.getStorageMetrics(self->wholeRange, 100));
|
||||
StorageMetrics sm = wait(cx->getStorageMetrics(self->wholeRange, 100));
|
||||
// TraceEvent("RHDCheckPhaseLog")
|
||||
// .detail("KeyRangeSize", sm.bytes)
|
||||
// .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond);
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> keyRanges = wait(tr.getReadHotRanges(self->wholeRange));
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> keyRanges = wait(cx->getReadHotRanges(self->wholeRange));
|
||||
// TraceEvent("RHDCheckPhaseLog")
|
||||
// .detail("KeyRangesSize", keyRanges.size())
|
||||
// .detail("ReadKey", self->readKey.printable().c_str())
|
||||
|
|
|
@ -592,7 +592,7 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
try {
|
||||
self->setupTransaction(&tr);
|
||||
wait(self->readOp(&tr, keys, self, false));
|
||||
wait(tr.warmRange(cx, allKeys));
|
||||
wait(tr.warmRange(allKeys));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
|
|
|
@ -105,7 +105,7 @@ struct TargetedKillWorkload : TestWorkload {
|
|||
if (self->machineToKill == "master") {
|
||||
machine = self->dbInfo->get().master.address();
|
||||
} else if (self->machineToKill == "commitproxy") {
|
||||
auto commitProxies = cx->getCommitProxies(false);
|
||||
auto commitProxies = cx->getCommitProxies(UseProvisionalProxies::False);
|
||||
int o = deterministicRandom()->randomInt(0, commitProxies->size());
|
||||
for (int i = 0; i < commitProxies->size(); i++) {
|
||||
CommitProxyInterface mpi = commitProxies->getInterface(o);
|
||||
|
@ -115,7 +115,7 @@ struct TargetedKillWorkload : TestWorkload {
|
|||
o = ++o % commitProxies->size();
|
||||
}
|
||||
} else if (self->machineToKill == "grvproxy") {
|
||||
auto grvProxies = cx->getGrvProxies(false);
|
||||
auto grvProxies = cx->getGrvProxies(UseProvisionalProxies::False);
|
||||
int o = deterministicRandom()->randomInt(0, grvProxies->size());
|
||||
for (int i = 0; i < grvProxies->size(); i++) {
|
||||
GrvProxyInterface gpi = grvProxies->getInterface(o);
|
||||
|
|
|
@ -236,8 +236,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
|
|||
// give tag to client
|
||||
if (self->writeThrottle) {
|
||||
ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION >= MIN_TAGS_PER_TRANSACTION);
|
||||
tr.options.tags.clear();
|
||||
tr.options.readTags.clear();
|
||||
tr.trState->options.tags.clear();
|
||||
tr.trState->options.readTags.clear();
|
||||
if (isBadActor) {
|
||||
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, self->badTag);
|
||||
} else if (deterministicRandom()->coinflip()) {
|
||||
|
|
Loading…
Reference in New Issue