From 852fc962006449b45a3a63dd9a2b7cd17a6208c0 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Tue, 14 Sep 2021 19:05:52 +0000 Subject: [PATCH] Address simulation test failures caused by: - Assertion failures in MoveKeys.actor.cpp - Wrong results returned by getRange() Changes: DatabaseContext.h, NativeAPI.actor.[h,cpp]: - Introduce a new flag, TransactionInfo::readVersionObtainedFromGrvProxy. - Set this flag to true by default, and clear it when the read version of a transaction is explicitly set (by using setVersion()). - Modify getLatestCommitVersions() to not populate "latestCommitVersions" if this flag is not set. (This will cause the storage server to read at the specified read version.) - Modify getRange() actor to always use the specified version as the read version (except when the specified version is latestVersion). - Modify waitForCommittedVersion(), getRawVersion(), and getConsistentReadVersion() to update the local version vector cache after receiving GetReadVersionReply. IClientApi.h, IConfigTransaction.h, ISingleThreadTransaction.h, MultiVersionTransaction[.actor].[h,cpp], ReadYourWrites.h, ThreadSafeTransaction.[h,cpp], ApiWorkload.h: - Add methods to get the spanID of a transaction and also the version vector cached in a transaction. (Likely to be useful for debugging simulation test failures.) VersionVector.h: - Update "maxVersion" when populating/applying a delta. (Note that empty mutation messages only update VersionVector::maxVersion.) BackupWorker.actor.cpp: - Update the local version vector cache after receiving GetReadVersionReply message. Status.actor.cpp: - Update the local version vector cache and Transaction::info.readVersionObtainedFromGrvProxy after setting the read version. 
--- fdbclient/DatabaseContext.h | 1 + fdbclient/IClientApi.h | 2 + fdbclient/IConfigTransaction.h | 2 + fdbclient/ISingleThreadTransaction.h | 2 + fdbclient/MultiVersionTransaction.actor.cpp | 19 +++++++++ fdbclient/MultiVersionTransaction.h | 4 ++ fdbclient/NativeAPI.actor.cpp | 44 +++++++++++++++++---- fdbclient/NativeAPI.actor.h | 5 ++- fdbclient/ReadYourWrites.h | 3 ++ fdbclient/ThreadSafeTransaction.cpp | 8 ++++ fdbclient/ThreadSafeTransaction.h | 2 + fdbclient/VersionVector.h | 5 +++ fdbserver/BackupWorker.actor.cpp | 1 + fdbserver/Status.actor.cpp | 2 + fdbserver/workloads/ApiWorkload.h | 16 ++++++++ 15 files changed, 107 insertions(+), 9 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 31ccc309e4..a385ea3ea8 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -455,6 +455,7 @@ public: // commit version of that storage server is below the specified "readVersion". void getLatestCommitVersions(const Reference& locationInfo, Version readVersion, + const TransactionInfo& info, VersionVector& latestCommitVersions); // used in template functions to create a transaction diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index 79e45c0842..86b4b0883b 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -79,6 +79,8 @@ public: virtual ThreadFuture commit() = 0; virtual Version getCommittedVersion() = 0; + virtual std::string getVersionVector() = 0; + virtual UID getSpanID() = 0; virtual ThreadFuture getApproximateSize() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) = 0; diff --git a/fdbclient/IConfigTransaction.h b/fdbclient/IConfigTransaction.h index 42d5769c51..1c349a2a3e 100644 --- a/fdbclient/IConfigTransaction.h +++ b/fdbclient/IConfigTransaction.h @@ -44,6 +44,8 @@ public: // Not implemented: void setVersion(Version) override { throw client_invalid_operation(); } + std::string getVersionVector() const override { throw 
client_invalid_operation(); } + UID getSpanID() const override { throw client_invalid_operation(); } Future getKey(KeySelector const& key, Snapshot snapshot = Snapshot::False) override { throw client_invalid_operation(); } diff --git a/fdbclient/ISingleThreadTransaction.h b/fdbclient/ISingleThreadTransaction.h index 9228184593..ce0b8401da 100644 --- a/fdbclient/ISingleThreadTransaction.h +++ b/fdbclient/ISingleThreadTransaction.h @@ -76,6 +76,8 @@ public: virtual void addWriteConflictRange(KeyRangeRef const& keys) = 0; virtual Future commit() = 0; virtual Version getCommittedVersion() const = 0; + virtual std::string getVersionVector() const = 0; + virtual UID getSpanID() const = 0; virtual int64_t getApproximateSize() const = 0; virtual Future> getVersionstamp() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 49f80dd6fb..7c46dfb251 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -668,6 +668,7 @@ void MultiVersionTransaction::setVersion(Version v) { tr.transaction->setVersion(v); } } + ThreadFuture MultiVersionTransaction::getReadVersion() { auto tr = getTransaction(); auto f = tr.transaction ? tr.transaction->getReadVersion() : ThreadFuture(Never()); @@ -825,6 +826,24 @@ Version MultiVersionTransaction::getCommittedVersion() { return invalidVersion; } +std::string MultiVersionTransaction::getVersionVector() { + auto tr = getTransaction(); + if (tr.transaction) { + return tr.transaction->getVersionVector(); + } + + return std::string("transaction not found"); +} + +UID MultiVersionTransaction::getSpanID() { + auto tr = getTransaction(); + if (tr.transaction) { + return tr.transaction->getSpanID(); + } + + return UID(); +} + ThreadFuture MultiVersionTransaction::getApproximateSize() { auto tr = getTransaction(); auto f = tr.transaction ? 
tr.transaction->getApproximateSize() : ThreadFuture(Never()); diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 274df7dd84..f0ab1fc7b6 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -239,6 +239,8 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; + std::string getVersionVector() override { return std::string("DLTransaction::Not implemented"); } + UID getSpanID() override { return UID(); }; ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; @@ -378,6 +380,8 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; + std::string getVersionVector() override; + UID getSpanID() override; ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 20d3a889a4..b702f91d3f 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -199,7 +199,21 @@ void DatabaseContext::addSSIdTagMapping(const UID& uid, const Tag& tag) { void DatabaseContext::getLatestCommitVersions(const Reference& locationInfo, Version readVersion, + const TransactionInfo& info, VersionVector& latestCommitVersions) { + latestCommitVersions.clear(); + + if (!info.readVersionObtainedFromGrvProxy) { + return; + } + + if (readVersion > ssVersionVectorCache.getMaxVersion()) { + TraceEvent("GetLatestCommitVersions") + .detail("ReadVersion", readVersion) + .detail("Version vector", ssVersionVectorCache.toString()); + ASSERT(false); + } + std::map> versionMap; // order the versions to be returned for (int i = 0; i < locationInfo->locations()->size(); i++) { UID uid = locationInfo->locations()->getId(i); @@ -215,7 +229,6 @@ void DatabaseContext::getLatestCommitVersions(const Reference& loc } // insert the 
commit versions in the version vector. - latestCommitVersions.clear(); for (auto& iter : versionMap) { latestCommitVersions.setVersion(iter.second, iter.first); } @@ -1672,6 +1685,8 @@ ACTOR static Future switchConnectionFileImpl(ReferenceminAcceptableReadVersion = std::numeric_limits::max(); self->invalidateCache(allKeys); + self->ssVersionVectorCache.clear(); + auto clearedClientInfo = self->clientInfo->get(); clearedClientInfo.commitProxies.clear(); clearedClientInfo.grvProxies.clear(); @@ -2518,7 +2533,7 @@ ACTOR Future> getValue(Future version, state uint64_t startTime; state double startTimeD; state VersionVector ssLatestCommitVersions; - cx->getLatestCommitVersions(ssi.second, ver, ssLatestCommitVersions); + cx->getLatestCommitVersions(ssi.second, ver, info, ssLatestCommitVersions); try { if (info.debugID.present()) { getValueID = nondeterministicRandom()->randomUniqueID(); @@ -2647,7 +2662,7 @@ ACTOR Future getKey(Database cx, KeySelector k, Future version, Tr wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, Reverse{ k.isBackward() })); state VersionVector ssLatestCommitVersions; - cx->getLatestCommitVersions(ssi.second, version.get(), ssLatestCommitVersions); + cx->getLatestCommitVersions(ssi.second, version.get(), info, ssLatestCommitVersions); try { if (info.debugID.present()) @@ -2725,6 +2740,7 @@ ACTOR Future waitForCommittedVersion(Database cx, Version version, Span cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version); if (v.midShardSize > 0) cx->smoothMidShardSize.setTotal(v.midShardSize); + cx->ssVersionVectorCache.applyDelta(v.ssVersionVectorDelta); if (v.version >= version) return v.version; // SOMEDAY: Do the wait on the server side, possibly use less expensive source of committed version @@ -2750,6 +2766,7 @@ ACTOR Future getRawVersion(Database cx, SpanID spanContext) { GetReadVersionRequest( spanContext, 0, TransactionPriority::IMMEDIATE, cx->ssVersionVectorCache.getMaxVersion()), 
cx->taskID))) { + cx->ssVersionVectorCache.applyDelta(v.ssVersionVectorDelta); return v.version; } } @@ -3038,7 +3055,7 @@ ACTOR Future getExactRange(Database cx, req.begin = firstGreaterOrEqual(range.begin); req.end = firstGreaterOrEqual(range.end); req.spanContext = span.context; - cx->getLatestCommitVersions(locations[shard].second, req.version, req.ssLatestCommitVersions); + cx->getLatestCommitVersions(locations[shard].second, req.version, info, req.ssLatestCommitVersions); // keep shard's arena around in case of async tss comparison req.arena.dependsOn(locations[shard].first.arena()); @@ -3359,7 +3376,7 @@ ACTOR Future getRange(Database cx, req.isFetchKeys = (info.taskID == TaskPriority::FetchKeys); req.version = readVersion; - cx->getLatestCommitVersions(beginServer.second, req.version, req.ssLatestCommitVersions); + cx->getLatestCommitVersions(beginServer.second, req.version, info, req.ssLatestCommitVersions); // In case of async tss comparison, also make req arena depend on begin, end, and/or shard's arena depending // on which is used @@ -3521,7 +3538,9 @@ ACTOR Future getRange(Database cx, return output; } - readVersion = rep.version; // see above comment + if (readVersion == latestVersion) { + readVersion = rep.version; // see above comment + } if (!rep.more) { ASSERT(modifiedSelectors); @@ -3529,7 +3548,7 @@ ACTOR Future getRange(Database cx, if (!rep.data.size()) { RangeResult result = wait(getRangeFallback( - cx, version, originalBegin, originalEnd, originalLimits, reverse, info, tags)); + cx, readVersion, originalBegin, originalEnd, originalLimits, reverse, info, tags)); getRangeFinished(cx, trLogInfo, startTime, @@ -3797,7 +3816,7 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* req.spanContext = spanContext; req.limit = reverse ? 
-CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT; req.limitBytes = std::numeric_limits::max(); - cx->getLatestCommitVersions(locations[shard].second, req.version, req.ssLatestCommitVersions); + cx->getLatestCommitVersions(locations[shard].second, req.version, info, req.ssLatestCommitVersions); // keep shard's arena around in case of async tss comparison req.arena.dependsOn(range.arena()); @@ -4226,13 +4245,19 @@ void Transaction::flushTrLogsIfEnabled() { } } +std::string Transaction::getVersionVector() const { + return cx->ssVersionVectorCache.toString(); +} + void Transaction::setVersion(Version v) { startTime = now(); if (readVersion.isValid()) throw read_version_already_set(); if (v <= 0) throw version_invalid(); + readVersion = v; + info.readVersionObtainedFromGrvProxy = false; } Future> Transaction::get(const Key& key, Snapshot snapshot) { @@ -4799,6 +4824,7 @@ void Transaction::reset() { committing = Future(); info.taskID = cx->taskID; info.debugID = Optional(); + info.readVersionObtainedFromGrvProxy = true; flushTrLogsIfEnabled(); trLogInfo = Reference(createTrLogInfoProbabilistically(cx)); cancelWatches(); @@ -5602,6 +5628,7 @@ ACTOR Future getConsistentReadVersion(SpanID parentSpan, "TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After"); ASSERT(v.version > 0); cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version); + cx->ssVersionVectorCache.applyDelta(v.ssVersionVectorDelta); return v; } } @@ -5821,6 +5848,7 @@ Future Transaction::getReadVersion(uint32_t flags) { auto const req = DatabaseContext::VersionRequest(spanContext, options.tags, info.debugID); batcher.stream.send(req); startTime = now(); + readVersion = extractReadVersion(location, spanContext, info.spanID, diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 96bf3aaed9..a1a16ab3af 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -180,12 +180,13 @@ struct TransactionInfo 
{ // prefix/ : '1' - any keys equal or larger than this key are (probably) conflicting keys // prefix/ : '0' - any keys equal or larger than this key are (definitely) not conflicting keys std::shared_ptr> conflictingKeys; + bool readVersionObtainedFromGrvProxy; // Only available so that Transaction can have a default constructor, for use in state variables TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {} explicit TransactionInfo(TaskPriority taskID, SpanID spanID) - : taskID(taskID), spanID(spanID), useProvisionalProxies(false) {} + : taskID(taskID), spanID(spanID), useProvisionalProxies(false), readVersionObtainedFromGrvProxy(true) {} }; struct TransactionLogInfo : public ReferenceCounted, NonCopyable { @@ -387,6 +388,8 @@ public: void fullReset(); double getBackoff(int errCode); void debugTransaction(UID dID) { info.debugID = dID; } + std::string getVersionVector() const; + UID getSpanID() const { return info.spanID; } Future commitMutations(); void setupWatches(); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 3ac84a7658..c096bf05eb 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -123,6 +123,9 @@ public: [[nodiscard]] Future commit() override; Version getCommittedVersion() const override { return tr.getCommittedVersion(); } + std::string getVersionVector() const override { return tr.getVersionVector(); } + UID getSpanID() const override { return tr.getSpanID(); } + int64_t getApproximateSize() const override { return approximateSize; } [[nodiscard]] Future> getVersionstamp() override; diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index b1192b2ab8..6e98ffbdac 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -353,6 +353,14 @@ Version ThreadSafeTransaction::getCommittedVersion() { return tr->getCommittedVersion(); } +std::string ThreadSafeTransaction::getVersionVector() { + return tr->getVersionVector(); +} 
+ +UID ThreadSafeTransaction::getSpanID() { + return tr->getSpanID(); +} + ThreadFuture ThreadSafeTransaction::getApproximateSize() { ISingleThreadTransaction* tr = this->tr; return onMainThread([tr]() -> Future { return tr->getApproximateSize(); }); diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 75faa67745..ef536463cb 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -127,6 +127,8 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; + std::string getVersionVector() override; + UID getSpanID() override; ThreadFuture getApproximateSize() override; ThreadFuture getProtocolVersion(); diff --git a/fdbclient/VersionVector.h b/fdbclient/VersionVector.h index fb15dc845c..82c88435c7 100644 --- a/fdbclient/VersionVector.h +++ b/fdbclient/VersionVector.h @@ -96,6 +96,8 @@ struct VersionVector { for (auto& [version, tags] : tmpVersionMap) { delta.setVersion(tags, version); } + + delta.maxVersion = maxVersion; } // @note this method, together with method getDelta(), helps minimize @@ -120,7 +122,10 @@ struct VersionVector { for (auto& [version, tags] : tmpVersionMap) { setVersion(tags, version); } + + maxVersion = delta.maxVersion; } + std::string toString() const { std::stringstream vector; vector << "["; diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index dfb5686743..38c192e28e 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -439,6 +439,7 @@ struct BackupData { &GrvProxyInterface::getConsistentReadVersion, request, self->cx->taskID))) { + self->cx->ssVersionVectorCache.applyDelta(reply.ssVersionVectorDelta); return reply.version; } } diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 7bedbfc757..fbc4c3a1b7 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -1264,6 +1264,8 @@ ACTOR static Future doCommitProbe(Future grvProbe, Transaction* 
ASSERT(sourceTr->getReadVersion().isReady()); tr->setVersion(sourceTr->getReadVersion().get()); + tr->getDatabase()->ssVersionVectorCache = sourceTr->getDatabase()->ssVersionVectorCache; + tr->info.readVersionObtainedFromGrvProxy = sourceTr->info.readVersionObtainedFromGrvProxy; state double start = g_network->timer_monotonic(); diff --git a/fdbserver/workloads/ApiWorkload.h b/fdbserver/workloads/ApiWorkload.h index b3992f33f3..93ce827eac 100644 --- a/fdbserver/workloads/ApiWorkload.h +++ b/fdbserver/workloads/ApiWorkload.h @@ -69,6 +69,12 @@ struct TransactionWrapper : public ReferenceCounted { // Gets the committed version of a transaction virtual Version getCommittedVersion() = 0; + // Gets the version vector cached in a transaction + virtual std::string getVersionVector() = 0; + + // Gets the spanID of a transaction + virtual UID getSpanID() = 0; + // Prints debugging messages for a transaction; not implemented for all transaction types virtual void debugTransaction(UID debugId) {} @@ -135,6 +141,12 @@ struct FlowTransactionWrapper : public TransactionWrapper { // Gets the committed version of a transaction Version getCommittedVersion() override { return transaction.getCommittedVersion(); } + // Gets the version vector cached in a transaction + std::string getVersionVector() override { return transaction.getVersionVector(); } + + // Gets the spanID of a transaction + UID getSpanID() override { return transaction.getSpanID(); } + // Prints debugging messages for a transaction void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); } @@ -188,6 +200,10 @@ struct ThreadTransactionWrapper : public TransactionWrapper { // Gets the committed version of a transaction Version getCommittedVersion() override { return transaction->getCommittedVersion(); } + std::string getVersionVector() override { return transaction->getVersionVector(); } + + UID getSpanID() override { return transaction->getSpanID(); } + void addReadConflictRange(KeyRangeRef 
const& keys) override { transaction->addReadConflictRange(keys); } };