From b6f89df0601640e6d36805d235825976396426ce Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Mon, 12 Jul 2021 19:44:59 +0000 Subject: [PATCH] - Propagate the latest commit version of a storage server as part of read request. Make storage server read at the specified version. --- fdbclient/CommitProxyInterface.h | 6 ++- fdbclient/DatabaseContext.h | 23 +++++++-- fdbclient/NativeAPI.actor.cpp | 73 +++++++++++++++++++++++---- fdbclient/StorageServerInterface.h | 61 +++++++++++++++++++--- fdbclient/VersionVector.h | 7 ++- fdbserver/CommitProxyServer.actor.cpp | 12 +++++ fdbserver/GrvProxyServer.actor.cpp | 2 +- fdbserver/storageserver.actor.cpp | 42 ++++++++++----- 8 files changed, 186 insertions(+), 40 deletions(-) diff --git a/fdbclient/CommitProxyInterface.h b/fdbclient/CommitProxyInterface.h index 7af77e6a44..b67d1d4f5a 100644 --- a/fdbclient/CommitProxyInterface.h +++ b/fdbclient/CommitProxyInterface.h @@ -293,9 +293,13 @@ struct GetKeyServerLocationsReply { // if any storage servers in results have a TSS pair, that mapping is in here std::vector> resultsTssMapping; + // maps storage server interfaces (captured in "results") to the tags of + // their corresponding storage servers + std::vector> resultsTagMapping; + template void serialize(Ar& ar) { - serializer(ar, results, resultsTssMapping, arena); + serializer(ar, results, resultsTssMapping, arena, resultsTagMapping); } }; diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 5ea4208d36..268463e4f2 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -32,13 +32,13 @@ #include "fdbclient/KeyRangeMap.h" #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/SpecialKeySpace.actor.h" +#include "fdbclient/VersionVector.h" #include "fdbrpc/QueueModel.h" #include "fdbrpc/MultiInterface.h" #include "flow/TDMetric.actor.h" #include "fdbclient/EventTypes.actor.h" #include "fdbrpc/ContinuousSample.h" #include "fdbrpc/Smoother.h" -#include "fdbclient/VersionVector.h" class StorageServerInfo : public ReferencedInterface { public: @@ -329,6 +329,9 @@ public: // map from tssid -> metrics for that tss pair std::unordered_map> tssMetrics; + // map from ssid -> ss tag + std::unordered_map ssidTagMapping; + UID dbId; bool internal; // Only contexts created through the C client and fdbcli are non-internal @@ -434,13 +437,23 @@ public: // Cache of the latest commit versions of storage servers. VersionVector ssVersionVectorCache; - // Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present). - // Requests to the storage server will be duplicated to the TSS. + // Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present). + // Requests to the storage server will be duplicated to the TSS. void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi); - // Removes the storage server and its TSS pair from the TSS mapping (if present). - // Requests to the storage server will no longer be duplicated to its pair TSS. + // Removes the storage server and its TSS pair from the TSS mapping (if present). + // Requests to the storage server will no longer be duplicated to its pair TSS. void removeTssMapping(StorageServerInterface const& ssi); + + // Adds or updates the specified (UID, Tag) pair in the tag mapping. + void addSSIdTagMapping(const UID& uid, const Tag& tag); + + // Returns the latest commit versions that mutated the specified storage servers + /// @note returns the latest commit version for a storage server only if the latest + // commit version of that storage server is below the specified "readVersion". + void getLatestCommitVersions(const Reference& locationInfo, + Version readVersion, + VersionVector& latestCommitVersions); }; #endif diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index f12a44785a..7e2a025e2b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -169,6 +169,34 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) { } } +void DatabaseContext::addSSIdTagMapping(const UID& uid, const Tag& tag) { + ssidTagMapping[uid] = tag; +} + +void DatabaseContext::getLatestCommitVersions(const Reference& locationInfo, + Version readVersion, + VersionVector& latestCommitVersions) { + std::map> versionMap; // order the versions to be returned + for (int i = 0; i < locationInfo->locations()->size(); i++) { + UID uid = locationInfo->locations()->getId(i); + if (ssidTagMapping.find(uid) != ssidTagMapping.end()) { + Tag tag = ssidTagMapping[uid]; + if (ssVersionVectorCache.hasVersion(tag)) { + Version commitVersion = ssVersionVectorCache.getVersion(tag); // latest commit version + if (commitVersion < readVersion) { + versionMap[commitVersion].insert(tag); + } + } + } + } + + // insert the commit versions in the version vector. + latestCommitVersions.clear(); + for (auto& iter : versionMap) { + latestCommitVersions.setVersion(iter.second, iter.first); + } +} + Reference StorageServerInfo::getInterface(DatabaseContext* cx, StorageServerInterface const& ssi, LocalityData const& locality) { @@ -2224,6 +2252,12 @@ void updateTssMappings(Database cx, const GetKeyServerLocationsReply& reply) { } } +void updateTagMappings(Database cx, const GetKeyServerLocationsReply& reply) { + for (const auto& mapping : reply.resultsTagMapping) { + cx->addSSIdTagMapping(mapping.first, mapping.second); + } +} + // If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). // Otherwise returns the shard containing key ACTOR Future>> getKeyLocation_internal(Database cx, @@ -2257,6 +2291,7 @@ ACTOR Future>> getKeyLocation_internal(Da auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second); updateTssMappings(cx, rep); + updateTagMappings(cx, rep); return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo); } } @@ -2321,6 +2356,7 @@ ACTOR Future>>> getKeyRangeLocatio wait(yield()); } updateTssMappings(cx, rep); + updateTagMappings(cx, rep); return results; } @@ -2425,6 +2461,8 @@ ACTOR Future> getValue(Future version, state Optional getValueID = Optional(); state uint64_t startTime; state double startTimeD; + state VersionVector ssLatestCommitVersions; + cx->getLatestCommitVersions(ssi.second, ver, ssLatestCommitVersions); try { if (info.debugID.present()) { getValueID = nondeterministicRandom()->randomUniqueID(); @@ -2452,15 +2490,19 @@ ACTOR Future> getValue(Future version, } choose { when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } - when(GetValueReply _reply = wait(loadBalance( - cx.getPtr(), - ssi.second, - &StorageServerInterface::getValue, - GetValueRequest( - span.context, key, ver, cx->sampleReadTags() ? tags : Optional(), getValueID), - TaskPriority::DefaultPromiseEndpoint, - false, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + when(GetValueReply _reply = + wait(loadBalance(cx.getPtr(), + ssi.second, + &StorageServerInterface::getValue, + GetValueRequest(span.context, + key, + ver, + cx->sampleReadTags() ? tags : Optional(), + getValueID, + ssLatestCommitVersions), + TaskPriority::DefaultPromiseEndpoint, + false, + cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { reply = _reply; } } @@ -2548,6 +2590,9 @@ ACTOR Future getKey(Database cx, KeySelector k, Future version, Tr state pair> ssi = wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward())); + state VersionVector ssLatestCommitVersions; + cx->getLatestCommitVersions(ssi.second, version.get(), ssLatestCommitVersions); + try { if (info.debugID.present()) g_traceBatch.addEvent( @@ -2557,8 +2602,12 @@ ACTOR Future getKey(Database cx, KeySelector k, Future version, Tr // k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual); ++cx->transactionPhysicalReads; - GetKeyRequest req( - span.context, k, version.get(), cx->sampleReadTags() ? tags : Optional(), getKeyID); + GetKeyRequest req(span.context, + k, + version.get(), + cx->sampleReadTags() ? tags : Optional(), + getKeyID, + ssLatestCommitVersions); req.arena.dependsOn(k.arena()); state GetKeyReply reply; @@ -2929,6 +2978,7 @@ ACTOR Future getExactRange(Database cx, req.begin = firstGreaterOrEqual(range.begin); req.end = firstGreaterOrEqual(range.end); req.spanContext = span.context; + cx->getLatestCommitVersions(locations[shard].second, version, req.ssLatestCommitVersions); // keep shard's arena around in case of async tss comparison req.arena.dependsOn(locations[shard].first.arena()); @@ -3515,6 +3565,7 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* req.spanContext = spanContext; req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT; req.limitBytes = std::numeric_limits::max(); + cx->getLatestCommitVersions(locations[shard].second, version, req.ssLatestCommitVersions); ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 0873b9b6b9..bddca6fcb3 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -32,6 +32,7 @@ #include "fdbrpc/TSSComparison.h" #include "fdbclient/TagThrottle.h" #include "flow/UnitTest.h" +#include "fdbclient/VersionVector.h" // Dead code, removed in the next protocol version struct VersionReply { @@ -212,14 +213,23 @@ struct GetValueRequest : TimedRequest { Optional tags; Optional debugID; ReplyPromise reply; + VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known + // to this client, of all storage replicas that + // serve the given key GetValueRequest() {} - GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional tags, Optional debugID) - : spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {} + GetValueRequest(SpanID spanContext, + const Key& key, + Version ver, + Optional tags, + Optional debugID, + VersionVector latestCommitVersions) + : spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID), + ssLatestCommitVersions(latestCommitVersions) {} template void serialize(Ar& ar) { - serializer(ar, key, version, tags, debugID, reply, spanContext); + serializer(ar, key, version, tags, debugID, reply, spanContext, ssLatestCommitVersions); } }; @@ -289,11 +299,26 @@ struct GetKeyValuesRequest : TimedRequest { Optional tags; Optional debugID; ReplyPromise reply; + VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known + // to this client, of all storage replicas that + // serve the given key GetKeyValuesRequest() : isFetchKeys(false) {} template void serialize(Ar& ar) { - serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena); + serializer(ar, + begin, + end, + version, + limit, + limitBytes, + isFetchKeys, + tags, + debugID, + reply, + spanContext, + arena, + ssLatestCommitVersions); } }; @@ -328,11 +353,26 @@ struct GetKeyValuesStreamRequest { Optional tags; Optional debugID; ReplyPromiseStream reply; + VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known + // to this client, of all storage replicas that + // serve the given key range GetKeyValuesStreamRequest() : isFetchKeys(false) {} template void serialize(Ar& ar) { - serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena); + serializer(ar, + begin, + end, + version, + limit, + limitBytes, + isFetchKeys, + tags, + debugID, + reply, + spanContext, + arena, + ssLatestCommitVersions); } }; @@ -359,18 +399,23 @@ struct GetKeyRequest : TimedRequest { Optional tags; Optional debugID; ReplyPromise reply; + VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known + // to this client, of all storage replicas that + // serve the given key GetKeyRequest() {} GetKeyRequest(SpanID spanContext, KeySelectorRef const& sel, Version version, Optional tags, - Optional debugID) - : spanContext(spanContext), sel(sel), version(version), debugID(debugID) {} + Optional debugID, + VersionVector latestCommitVersions) + : spanContext(spanContext), sel(sel), version(version), debugID(debugID), + ssLatestCommitVersions(latestCommitVersions) {} template void serialize(Ar& ar) { - serializer(ar, sel, version, tags, debugID, reply, spanContext, arena); + serializer(ar, sel, version, tags, debugID, reply, spanContext, arena, ssLatestCommitVersions); } }; diff --git a/fdbclient/VersionVector.h b/fdbclient/VersionVector.h index 4293ec2e53..9646ab519c 100644 --- a/fdbclient/VersionVector.h +++ b/fdbclient/VersionVector.h @@ -43,7 +43,7 @@ struct VersionVector { maxVersion = version; } - void setVersions(const std::set& tags, Version version) { + void setVersion(const std::set& tags, Version version) { ASSERT(version > maxVersion); for (auto& tag : tags) { ASSERT(tag != invalidTag); @@ -64,6 +64,11 @@ struct VersionVector { return iter->second; } + void clear() { + versions.clear(); + maxVersion = invalidVersion; + } + bool operator==(const VersionVector& vv) const { return maxVersion == vv.maxVersion; } bool operator!=(const VersionVector& vv) const { return maxVersion != vv.maxVersion; } bool operator<(const VersionVector& vv) const { return maxVersion < vv.maxVersion; } diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 7077ee7c53..2b892505fc 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -1470,6 +1470,17 @@ void maybeAddTssMapping(GetKeyServerLocationsReply& reply, } } +void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitData) { + for (const auto& [_, shard] : reply.results) { + for (auto& ssi : shard) { + auto iter = commitData->storageCache.find(ssi.id()); + if (iter != commitData->storageCache.end()) { + reply.resultsTagMapping.emplace_back(ssi.id(), iter->second->tag); + } + } + } +} + ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) { // We can't respond to these requests until we have valid txnStateStore wait(commitData->validState.getFuture()); @@ -1519,6 +1530,7 @@ ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsReques --r; } } + addTagMapping(rep, commitData); req.reply.send(rep); ++commitData->stats.keyServerLocationOut; return Void(); diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 56935103a9..0a3724c3bb 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -23,10 +23,10 @@ #include "fdbserver/LogSystemDiskQueueAdapter.h" #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/GrvProxyInterface.h" +#include "fdbclient/VersionVector.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "flow/flow.h" -#include "fdbclient/VersionVector.h" #include "flow/actorcompiler.h" // This must be the last #include. struct GrvProxyStats { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 85bf05e7b3..6614355178 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1235,7 +1235,12 @@ ACTOR Future getValueQ(StorageServer* data, GetValueRequest req) { "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask()); state Optional v; - state Version version = wait(waitForVersion(data, req.version, req.spanContext)); + // If the client specified the latest commit version (that mutated the shard(s) being served + // by this storage server) then return the value that corresponds to that version. + Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag) + ? req.ssLatestCommitVersions.getVersion(data->tag) + : req.version; + state Version version = wait(waitForVersion(data, readVersion, req.spanContext)); if (req.debugID.present()) g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), @@ -1363,7 +1368,8 @@ ACTOR Future watchWaitForValueChange(StorageServer* data, SpanID parent state Version latest = data->version.get(); TEST(latest >= minVersion && latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version - GetValueRequest getReq(span.context, metadata->key, latest, metadata->tags, metadata->debugID); + GetValueRequest getReq( + span.context, metadata->key, latest, metadata->tags, metadata->debugID, VersionVector()); state Future getValue = getValueQ( data, getReq); // we are relying on the delay zero at the top of getValueQ, if removed we need one here GetValueReply reply = wait(getReq.reply.getFuture()); @@ -1933,7 +1939,10 @@ ACTOR Future getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req) try { if (req.debugID.present()) g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before"); - state Version version = wait(waitForVersion(data, req.version, span.context)); + Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag) + ? req.ssLatestCommitVersions.getVersion(data->tag) + : req.version; + state Version version = wait(waitForVersion(data, readVersion, span.context)); state uint64_t changeCounter = data->shardChangeCounter; // try { @@ -2099,7 +2108,10 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe if (req.debugID.present()) g_traceBatch.addEvent( "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.Before"); - state Version version = wait(waitForVersion(data, req.version, span.context)); + Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag) + ? req.ssLatestCommitVersions.getVersion(data->tag) + : req.version; + state Version version = wait(waitForVersion(data, readVersion, span.context)); state uint64_t changeCounter = data->shardChangeCounter; // try { @@ -2110,13 +2122,13 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion"); //.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end); //} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", - //req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", //"None").detail("In", "getKeyValues>getShardKeyRange"); throw e; } if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) { // TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", - //req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", - //shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", + // shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); throw wrong_shard_server(); } @@ -2193,10 +2205,10 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe } /*for( int i = 0; i < r.data.size(); i++ ) { - StorageMetrics m; - m.bytesPerKSecond = r.data[i].expectedSize(); - m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int - data->metrics.notify(r.data[i].key, m); + StorageMetrics m; + m.bytesPerKSecond = r.data[i].expectedSize(); + m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an + int data->metrics.notify(r.data[i].key, m); }*/ // For performance concerns, the cost of a range read is billed to the start key and end key of the @@ -2268,7 +2280,10 @@ ACTOR Future getKeyQ(StorageServer* data, GetKeyRequest req) { wait(data->getQueryDelay()); try { - state Version version = wait(waitForVersion(data, req.version, req.spanContext)); + Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag) + ? req.ssLatestCommitVersions.getVersion(data->tag) + : req.version; + state Version version = wait(waitForVersion(data, readVersion, req.spanContext)); state uint64_t changeCounter = data->shardChangeCounter; state KeyRange shard = getShardKeyRange(data, req.sel); @@ -4870,7 +4885,8 @@ ACTOR Future serveWatchValueRequestsImpl(StorageServer* self, FutureStream loop { try { state Version latest = self->version.get(); - GetValueRequest getReq(span.context, metadata->key, latest, metadata->tags, metadata->debugID); + GetValueRequest getReq( + span.context, metadata->key, latest, metadata->tags, metadata->debugID, VersionVector()); state Future getValue = getValueQ(self, getReq); GetValueReply reply = wait(getReq.reply.getFuture()); metadata = self->getWatchMetadata(req.key.contents());