Address simulation test failures caused by:

- Assertion failures in MoveKeys.actor.cpp
- Wrong results returned by getRange()

Changes:

DatabaseContext.h, NativeAPI.actor.[h,cpp]:
- Introduce a new flag, TransactionInfo::readVersionObtainedFromGrvProxy.
- Set this flag to true by default, and clear it when the read version of a
transaction is explicitly set (by using setVersion()).
- Modify getLatestCommitVersions() to not populate "latestCommitVersions" if
this flag is not set. (This will cause storage server to read at the specified
read version.)

- Modify getRange() actor to always use the specified version as the read
version (except when the specified version is latestVersion).

- Modify waitForCommittedVersion(), getRawVersion(), and getConsistentReadVersion()
to update local version vector cache after receiving GetReadVersionReply.

IClientApi.h, IConfigTransaction.h, ISingleThreadTransaction.h,
MultiVersionTransaction[.actor].[h,cpp], ThreadSafeTransaction.[h,cpp],
ApiWorkload.h:
- Add methods to get the spanID of a transaction and also the version vector
cached in a transaction. (Likely to be useful for debugging simulation test
failures.)

VersionVector.h:
- Update "maxVersion" when populating/applying a delta. (Note that empty
mutation messages only update VersionVector::maxVersion.)

BackupWorker.actor.cpp:
- Update local version vector cache after receiving GetReadVersionReply message.

Status.actor.cpp:
- Update local version vector cache and
TransactionInfo::info.readVersionObtainedFromGrvProxy after setting the
read version.
This commit is contained in:
Sreenath Bodagala 2021-09-14 19:05:52 +00:00 committed by Dan Lambright
parent 0e9ce5c6fc
commit 852fc96200
15 changed files with 107 additions and 9 deletions

View File

@ -455,6 +455,7 @@ public:
// commit version of that storage server is below the specified "readVersion".
void getLatestCommitVersions(const Reference<LocationInfo>& locationInfo,
Version readVersion,
const TransactionInfo& info,
VersionVector& latestCommitVersions);
// used in template functions to create a transaction

View File

@ -79,6 +79,8 @@ public:
virtual ThreadFuture<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
virtual std::string getVersionVector() = 0;
virtual UID getSpanID() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -44,6 +44,8 @@ public:
// Not implemented:
void setVersion(Version) override { throw client_invalid_operation(); }
std::string getVersionVector() const override { throw client_invalid_operation(); }
UID getSpanID() const override { throw client_invalid_operation(); }
Future<Key> getKey(KeySelector const& key, Snapshot snapshot = Snapshot::False) override {
throw client_invalid_operation();
}

View File

@ -76,6 +76,8 @@ public:
virtual void addWriteConflictRange(KeyRangeRef const& keys) = 0;
virtual Future<Void> commit() = 0;
virtual Version getCommittedVersion() const = 0;
virtual std::string getVersionVector() const = 0;
virtual UID getSpanID() const = 0;
virtual int64_t getApproximateSize() const = 0;
virtual Future<Standalone<StringRef>> getVersionstamp() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

View File

@ -668,6 +668,7 @@ void MultiVersionTransaction::setVersion(Version v) {
tr.transaction->setVersion(v);
}
}
ThreadFuture<Version> MultiVersionTransaction::getReadVersion() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getReadVersion() : ThreadFuture<Version>(Never());
@ -825,6 +826,24 @@ Version MultiVersionTransaction::getCommittedVersion() {
return invalidVersion;
}
std::string MultiVersionTransaction::getVersionVector() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getVersionVector();
}
return std::string("transaction not found");
}
UID MultiVersionTransaction::getSpanID() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getSpanID();
}
return UID();
}
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture<int64_t>(Never());

View File

@ -239,6 +239,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
std::string getVersionVector() override { return std::string("DLTransaction::Not implemented"); }
UID getSpanID() override { return UID(); };
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
@ -378,6 +380,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
std::string getVersionVector() override;
UID getSpanID() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;

View File

@ -199,7 +199,21 @@ void DatabaseContext::addSSIdTagMapping(const UID& uid, const Tag& tag) {
void DatabaseContext::getLatestCommitVersions(const Reference<LocationInfo>& locationInfo,
Version readVersion,
const TransactionInfo& info,
VersionVector& latestCommitVersions) {
latestCommitVersions.clear();
if (!info.readVersionObtainedFromGrvProxy) {
return;
}
if (readVersion > ssVersionVectorCache.getMaxVersion()) {
TraceEvent("GetLatestCommitVersions")
.detail("ReadVersion", readVersion)
.detail("Version vector", ssVersionVectorCache.toString());
ASSERT(false);
}
std::map<Version, std::set<Tag>> versionMap; // order the versions to be returned
for (int i = 0; i < locationInfo->locations()->size(); i++) {
UID uid = locationInfo->locations()->getId(i);
@ -215,7 +229,6 @@ void DatabaseContext::getLatestCommitVersions(const Reference<LocationInfo>& loc
}
// insert the commit versions in the version vector.
latestCommitVersions.clear();
for (auto& iter : versionMap) {
latestCommitVersions.setVersion(iter.second, iter.first);
}
@ -1672,6 +1685,8 @@ ACTOR static Future<Void> switchConnectionFileImpl(Reference<ClusterConnectionFi
self->minAcceptableReadVersion = std::numeric_limits<Version>::max();
self->invalidateCache(allKeys);
self->ssVersionVectorCache.clear();
auto clearedClientInfo = self->clientInfo->get();
clearedClientInfo.commitProxies.clear();
clearedClientInfo.grvProxies.clear();
@ -2518,7 +2533,7 @@ ACTOR Future<Optional<Value>> getValue(Future<Version> version,
state uint64_t startTime;
state double startTimeD;
state VersionVector ssLatestCommitVersions;
cx->getLatestCommitVersions(ssi.second, ver, ssLatestCommitVersions);
cx->getLatestCommitVersions(ssi.second, ver, info, ssLatestCommitVersions);
try {
if (info.debugID.present()) {
getValueID = nondeterministicRandom()->randomUniqueID();
@ -2647,7 +2662,7 @@ ACTOR Future<Key> getKey(Database cx, KeySelector k, Future<Version> version, Tr
wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, Reverse{ k.isBackward() }));
state VersionVector ssLatestCommitVersions;
cx->getLatestCommitVersions(ssi.second, version.get(), ssLatestCommitVersions);
cx->getLatestCommitVersions(ssi.second, version.get(), info, ssLatestCommitVersions);
try {
if (info.debugID.present())
@ -2725,6 +2740,7 @@ ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, Span
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
if (v.midShardSize > 0)
cx->smoothMidShardSize.setTotal(v.midShardSize);
cx->ssVersionVectorCache.applyDelta(v.ssVersionVectorDelta);
if (v.version >= version)
return v.version;
// SOMEDAY: Do the wait on the server side, possibly use less expensive source of committed version
@ -2750,6 +2766,7 @@ ACTOR Future<Version> getRawVersion(Database cx, SpanID spanContext) {
GetReadVersionRequest(
spanContext, 0, TransactionPriority::IMMEDIATE, cx->ssVersionVectorCache.getMaxVersion()),
cx->taskID))) {
cx->ssVersionVectorCache.applyDelta(v.ssVersionVectorDelta);
return v.version;
}
}
@ -3038,7 +3055,7 @@ ACTOR Future<RangeResult> getExactRange(Database cx,
req.begin = firstGreaterOrEqual(range.begin);
req.end = firstGreaterOrEqual(range.end);
req.spanContext = span.context;
cx->getLatestCommitVersions(locations[shard].second, req.version, req.ssLatestCommitVersions);
cx->getLatestCommitVersions(locations[shard].second, req.version, info, req.ssLatestCommitVersions);
// keep shard's arena around in case of async tss comparison
req.arena.dependsOn(locations[shard].first.arena());
@ -3359,7 +3376,7 @@ ACTOR Future<RangeResult> getRange(Database cx,
req.isFetchKeys = (info.taskID == TaskPriority::FetchKeys);
req.version = readVersion;
cx->getLatestCommitVersions(beginServer.second, req.version, req.ssLatestCommitVersions);
cx->getLatestCommitVersions(beginServer.second, req.version, info, req.ssLatestCommitVersions);
// In case of async tss comparison, also make req arena depend on begin, end, and/or shard's arena depending
// on which is used
@ -3521,7 +3538,9 @@ ACTOR Future<RangeResult> getRange(Database cx,
return output;
}
if (readVersion == latestVersion) {
readVersion = rep.version; // see above comment
}
if (!rep.more) {
ASSERT(modifiedSelectors);
@ -3529,7 +3548,7 @@ ACTOR Future<RangeResult> getRange(Database cx,
if (!rep.data.size()) {
RangeResult result = wait(getRangeFallback(
cx, version, originalBegin, originalEnd, originalLimits, reverse, info, tags));
cx, readVersion, originalBegin, originalEnd, originalLimits, reverse, info, tags));
getRangeFinished(cx,
trLogInfo,
startTime,
@ -3797,7 +3816,7 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
req.spanContext = spanContext;
req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.limitBytes = std::numeric_limits<int>::max();
cx->getLatestCommitVersions(locations[shard].second, req.version, req.ssLatestCommitVersions);
cx->getLatestCommitVersions(locations[shard].second, req.version, info, req.ssLatestCommitVersions);
// keep shard's arena around in case of async tss comparison
req.arena.dependsOn(range.arena());
@ -4226,13 +4245,19 @@ void Transaction::flushTrLogsIfEnabled() {
}
}
std::string Transaction::getVersionVector() const {
return cx->ssVersionVectorCache.toString();
}
void Transaction::setVersion(Version v) {
startTime = now();
if (readVersion.isValid())
throw read_version_already_set();
if (v <= 0)
throw version_invalid();
readVersion = v;
info.readVersionObtainedFromGrvProxy = false;
}
Future<Optional<Value>> Transaction::get(const Key& key, Snapshot snapshot) {
@ -4799,6 +4824,7 @@ void Transaction::reset() {
committing = Future<Void>();
info.taskID = cx->taskID;
info.debugID = Optional<UID>();
info.readVersionObtainedFromGrvProxy = true;
flushTrLogsIfEnabled();
trLogInfo = Reference<TransactionLogInfo>(createTrLogInfoProbabilistically(cx));
cancelWatches();
@ -5602,6 +5628,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan,
"TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
ASSERT(v.version > 0);
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
cx->ssVersionVectorCache.applyDelta(v.ssVersionVectorDelta);
return v;
}
}
@ -5821,6 +5848,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
auto const req = DatabaseContext::VersionRequest(spanContext, options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion(location,
spanContext,
info.spanID,

View File

@ -180,12 +180,13 @@ struct TransactionInfo {
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
bool readVersionObtainedFromGrvProxy;
// Only available so that Transaction can have a default constructor, for use in state variables
TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {}
explicit TransactionInfo(TaskPriority taskID, SpanID spanID)
: taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
: taskID(taskID), spanID(spanID), useProvisionalProxies(false), readVersionObtainedFromGrvProxy(true) {}
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
@ -387,6 +388,8 @@ public:
void fullReset();
double getBackoff(int errCode);
void debugTransaction(UID dID) { info.debugID = dID; }
std::string getVersionVector() const;
UID getSpanID() const { return info.spanID; }
Future<Void> commitMutations();
void setupWatches();

View File

@ -123,6 +123,9 @@ public:
[[nodiscard]] Future<Void> commit() override;
Version getCommittedVersion() const override { return tr.getCommittedVersion(); }
std::string getVersionVector() const override { return tr.getVersionVector(); }
UID getSpanID() const override { return tr.getSpanID(); }
int64_t getApproximateSize() const override { return approximateSize; }
[[nodiscard]] Future<Standalone<StringRef>> getVersionstamp() override;

View File

@ -353,6 +353,14 @@ Version ThreadSafeTransaction::getCommittedVersion() {
return tr->getCommittedVersion();
}
std::string ThreadSafeTransaction::getVersionVector() {
return tr->getVersionVector();
}
UID ThreadSafeTransaction::getSpanID() {
return tr->getSpanID();
}
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<int64_t> { return tr->getApproximateSize(); });

View File

@ -127,6 +127,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
std::string getVersionVector() override;
UID getSpanID() override;
ThreadFuture<int64_t> getApproximateSize() override;
ThreadFuture<uint64_t> getProtocolVersion();

View File

@ -96,6 +96,8 @@ struct VersionVector {
for (auto& [version, tags] : tmpVersionMap) {
delta.setVersion(tags, version);
}
delta.maxVersion = maxVersion;
}
// @note this method, together with method getDelta(), helps minimize
@ -120,7 +122,10 @@ struct VersionVector {
for (auto& [version, tags] : tmpVersionMap) {
setVersion(tags, version);
}
maxVersion = delta.maxVersion;
}
std::string toString() const {
std::stringstream vector;
vector << "[";

View File

@ -439,6 +439,7 @@ struct BackupData {
&GrvProxyInterface::getConsistentReadVersion,
request,
self->cx->taskID))) {
self->cx->ssVersionVectorCache.applyDelta(reply.ssVersionVectorDelta);
return reply.version;
}
}

View File

@ -1264,6 +1264,8 @@ ACTOR static Future<double> doCommitProbe(Future<double> grvProbe, Transaction*
ASSERT(sourceTr->getReadVersion().isReady());
tr->setVersion(sourceTr->getReadVersion().get());
tr->getDatabase()->ssVersionVectorCache = sourceTr->getDatabase()->ssVersionVectorCache;
tr->info.readVersionObtainedFromGrvProxy = sourceTr->info.readVersionObtainedFromGrvProxy;
state double start = g_network->timer_monotonic();

View File

@ -69,6 +69,12 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
// Gets the committed version of a transaction
virtual Version getCommittedVersion() = 0;
// Gets the version vector cached in a transaction
virtual std::string getVersionVector() = 0;
// Gets the spanID of a transaction
virtual UID getSpanID() = 0;
// Prints debugging messages for a transaction; not implemented for all transaction types
virtual void debugTransaction(UID debugId) {}
@ -135,6 +141,12 @@ struct FlowTransactionWrapper : public TransactionWrapper {
// Gets the committed version of a transaction
Version getCommittedVersion() override { return transaction.getCommittedVersion(); }
// Gets the version vector cached in a transaction
std::string getVersionVector() override { return transaction.getVersionVector(); }
// Gets the spanID of a transaction
UID getSpanID() override { return transaction.getSpanID(); }
// Prints debugging messages for a transaction
void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); }
@ -188,6 +200,10 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
// Gets the committed version of a transaction
Version getCommittedVersion() override { return transaction->getCommittedVersion(); }
std::string getVersionVector() override { return transaction->getVersionVector(); }
UID getSpanID() override { return transaction->getSpanID(); }
void addReadConflictRange(KeyRangeRef const& keys) override { transaction->addReadConflictRange(keys); }
};