- Propagate the latest commit version of a storage server as part of read request.

Make storage server read at the specified version.
This commit is contained in:
Sreenath Bodagala 2021-07-12 19:44:59 +00:00
parent 4676dacaab
commit b6f89df060
8 changed files with 186 additions and 40 deletions

View File

@ -293,9 +293,13 @@ struct GetKeyServerLocationsReply {
// if any storage servers in results have a TSS pair, that mapping is in here
std::vector<std::pair<UID, StorageServerInterface>> resultsTssMapping;
// maps storage server interfaces (captured in "results") to the tags of
// their corresponding storage servers
std::vector<std::pair<UID, Tag>> resultsTagMapping;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, results, resultsTssMapping, arena);
serializer(ar, results, resultsTssMapping, arena, resultsTagMapping);
}
};

View File

@ -32,13 +32,13 @@
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/VersionVector.h"
#include "fdbrpc/QueueModel.h"
#include "fdbrpc/MultiInterface.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/EventTypes.actor.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/Smoother.h"
#include "fdbclient/VersionVector.h"
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
public:
@ -329,6 +329,9 @@ public:
// map from tssid -> metrics for that tss pair
std::unordered_map<UID, Reference<TSSMetrics>> tssMetrics;
// map from ssid -> ss tag
std::unordered_map<UID, Tag> ssidTagMapping;
UID dbId;
bool internal; // Only contexts created through the C client and fdbcli are non-internal
@ -434,13 +437,23 @@ public:
// Cache of the latest commit versions of storage servers.
VersionVector ssVersionVectorCache;
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS.
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS.
void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi);
// Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS.
// Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS.
void removeTssMapping(StorageServerInterface const& ssi);
// Adds or updates the specified (UID, Tag) pair in the tag mapping.
void addSSIdTagMapping(const UID& uid, const Tag& tag);
// Returns the latest commit versions that mutated the specified storage servers
/// @note returns the latest commit version for a storage server only if the latest
// commit version of that storage server is below the specified "readVersion".
void getLatestCommitVersions(const Reference<LocationInfo>& locationInfo,
Version readVersion,
VersionVector& latestCommitVersions);
};
#endif

View File

@ -169,6 +169,34 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
}
}
void DatabaseContext::addSSIdTagMapping(const UID& uid, const Tag& tag) {
ssidTagMapping[uid] = tag;
}
void DatabaseContext::getLatestCommitVersions(const Reference<LocationInfo>& locationInfo,
Version readVersion,
VersionVector& latestCommitVersions) {
std::map<Version, std::set<Tag>> versionMap; // order the versions to be returned
for (int i = 0; i < locationInfo->locations()->size(); i++) {
UID uid = locationInfo->locations()->getId(i);
if (ssidTagMapping.find(uid) != ssidTagMapping.end()) {
Tag tag = ssidTagMapping[uid];
if (ssVersionVectorCache.hasVersion(tag)) {
Version commitVersion = ssVersionVectorCache.getVersion(tag); // latest commit version
if (commitVersion < readVersion) {
versionMap[commitVersion].insert(tag);
}
}
}
}
// insert the commit versions in the version vector.
latestCommitVersions.clear();
for (auto& iter : versionMap) {
latestCommitVersions.setVersion(iter.second, iter.first);
}
}
Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx,
StorageServerInterface const& ssi,
LocalityData const& locality) {
@ -2224,6 +2252,12 @@ void updateTssMappings(Database cx, const GetKeyServerLocationsReply& reply) {
}
}
void updateTagMappings(Database cx, const GetKeyServerLocationsReply& reply) {
for (const auto& mapping : reply.resultsTagMapping) {
cx->addSSIdTagMapping(mapping.first, mapping.second);
}
}
// If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key).
// Otherwise returns the shard containing key
ACTOR Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation_internal(Database cx,
@ -2257,6 +2291,7 @@ ACTOR Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation_internal(Da
auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
updateTssMappings(cx, rep);
updateTagMappings(cx, rep);
return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo);
}
}
@ -2321,6 +2356,7 @@ ACTOR Future<vector<pair<KeyRange, Reference<LocationInfo>>>> getKeyRangeLocatio
wait(yield());
}
updateTssMappings(cx, rep);
updateTagMappings(cx, rep);
return results;
}
@ -2425,6 +2461,8 @@ ACTOR Future<Optional<Value>> getValue(Future<Version> version,
state Optional<UID> getValueID = Optional<UID>();
state uint64_t startTime;
state double startTimeD;
state VersionVector ssLatestCommitVersions;
cx->getLatestCommitVersions(ssi.second, ver, ssLatestCommitVersions);
try {
if (info.debugID.present()) {
getValueID = nondeterministicRandom()->randomUniqueID();
@ -2452,15 +2490,19 @@ ACTOR Future<Optional<Value>> getValue(Future<Version> version,
}
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply = wait(loadBalance(
cx.getPtr(),
ssi.second,
&StorageServerInterface::getValue,
GetValueRequest(
span.context, key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID),
TaskPriority::DefaultPromiseEndpoint,
false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
when(GetValueReply _reply =
wait(loadBalance(cx.getPtr(),
ssi.second,
&StorageServerInterface::getValue,
GetValueRequest(span.context,
key,
ver,
cx->sampleReadTags() ? tags : Optional<TagSet>(),
getValueID,
ssLatestCommitVersions),
TaskPriority::DefaultPromiseEndpoint,
false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -2548,6 +2590,9 @@ ACTOR Future<Key> getKey(Database cx, KeySelector k, Future<Version> version, Tr
state pair<KeyRange, Reference<LocationInfo>> ssi =
wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()));
state VersionVector ssLatestCommitVersions;
cx->getLatestCommitVersions(ssi.second, version.get(), ssLatestCommitVersions);
try {
if (info.debugID.present())
g_traceBatch.addEvent(
@ -2557,8 +2602,12 @@ ACTOR Future<Key> getKey(Database cx, KeySelector k, Future<Version> version, Tr
// k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
++cx->transactionPhysicalReads;
GetKeyRequest req(
span.context, k, version.get(), cx->sampleReadTags() ? tags : Optional<TagSet>(), getKeyID);
GetKeyRequest req(span.context,
k,
version.get(),
cx->sampleReadTags() ? tags : Optional<TagSet>(),
getKeyID,
ssLatestCommitVersions);
req.arena.dependsOn(k.arena());
state GetKeyReply reply;
@ -2929,6 +2978,7 @@ ACTOR Future<RangeResult> getExactRange(Database cx,
req.begin = firstGreaterOrEqual(range.begin);
req.end = firstGreaterOrEqual(range.end);
req.spanContext = span.context;
cx->getLatestCommitVersions(locations[shard].second, version, req.ssLatestCommitVersions);
// keep shard's arena around in case of async tss comparison
req.arena.dependsOn(locations[shard].first.arena());
@ -3515,6 +3565,7 @@ ACTOR Future<Void> getRangeStreamFragment(ParallelStream<RangeResult>::Fragment*
req.spanContext = spanContext;
req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.limitBytes = std::numeric_limits<int>::max();
cx->getLatestCommitVersions(locations[shard].second, version, req.ssLatestCommitVersions);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);

View File

@ -32,6 +32,7 @@
#include "fdbrpc/TSSComparison.h"
#include "fdbclient/TagThrottle.h"
#include "flow/UnitTest.h"
#include "fdbclient/VersionVector.h"
// Dead code, removed in the next protocol version
struct VersionReply {
@ -212,14 +213,23 @@ struct GetValueRequest : TimedRequest {
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetValueReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key
GetValueRequest() {}
GetValueRequest(SpanID spanContext, const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID)
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID) {}
GetValueRequest(SpanID spanContext,
const Key& key,
Version ver,
Optional<TagSet> tags,
Optional<UID> debugID,
VersionVector latestCommitVersions)
: spanContext(spanContext), key(key), version(ver), tags(tags), debugID(debugID),
ssLatestCommitVersions(latestCommitVersions) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, version, tags, debugID, reply, spanContext);
serializer(ar, key, version, tags, debugID, reply, spanContext, ssLatestCommitVersions);
}
};
@ -289,11 +299,26 @@ struct GetKeyValuesRequest : TimedRequest {
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetKeyValuesReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key
GetKeyValuesRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
serializer(ar,
begin,
end,
version,
limit,
limitBytes,
isFetchKeys,
tags,
debugID,
reply,
spanContext,
arena,
ssLatestCommitVersions);
}
};
@ -328,11 +353,26 @@ struct GetKeyValuesStreamRequest {
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromiseStream<GetKeyValuesStreamReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key range
GetKeyValuesStreamRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
serializer(ar,
begin,
end,
version,
limit,
limitBytes,
isFetchKeys,
tags,
debugID,
reply,
spanContext,
arena,
ssLatestCommitVersions);
}
};
@ -359,18 +399,23 @@ struct GetKeyRequest : TimedRequest {
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetKeyReply> reply;
VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
// to this client, of all storage replicas that
// serve the given key
GetKeyRequest() {}
GetKeyRequest(SpanID spanContext,
KeySelectorRef const& sel,
Version version,
Optional<TagSet> tags,
Optional<UID> debugID)
: spanContext(spanContext), sel(sel), version(version), debugID(debugID) {}
Optional<UID> debugID,
VersionVector latestCommitVersions)
: spanContext(spanContext), sel(sel), version(version), debugID(debugID),
ssLatestCommitVersions(latestCommitVersions) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena);
serializer(ar, sel, version, tags, debugID, reply, spanContext, arena, ssLatestCommitVersions);
}
};

View File

@ -43,7 +43,7 @@ struct VersionVector {
maxVersion = version;
}
void setVersions(const std::set<Tag>& tags, Version version) {
void setVersion(const std::set<Tag>& tags, Version version) {
ASSERT(version > maxVersion);
for (auto& tag : tags) {
ASSERT(tag != invalidTag);
@ -64,6 +64,11 @@ struct VersionVector {
return iter->second;
}
void clear() {
versions.clear();
maxVersion = invalidVersion;
}
bool operator==(const VersionVector& vv) const { return maxVersion == vv.maxVersion; }
bool operator!=(const VersionVector& vv) const { return maxVersion != vv.maxVersion; }
bool operator<(const VersionVector& vv) const { return maxVersion < vv.maxVersion; }

View File

@ -1470,6 +1470,17 @@ void maybeAddTssMapping(GetKeyServerLocationsReply& reply,
}
}
void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitData) {
for (const auto& [_, shard] : reply.results) {
for (auto& ssi : shard) {
auto iter = commitData->storageCache.find(ssi.id());
if (iter != commitData->storageCache.end()) {
reply.resultsTagMapping.emplace_back(ssi.id(), iter->second->tag);
}
}
}
}
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
wait(commitData->validState.getFuture());
@ -1519,6 +1530,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
--r;
}
}
addTagMapping(rep, commitData);
req.reply.send(rep);
++commitData->stats.keyServerLocationOut;
return Void();

View File

@ -23,10 +23,10 @@
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/GrvProxyInterface.h"
#include "fdbclient/VersionVector.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/flow.h"
#include "fdbclient/VersionVector.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct GrvProxyStats {

View File

@ -1235,7 +1235,12 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
"getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
state Optional<Value> v;
state Version version = wait(waitForVersion(data, req.version, req.spanContext));
// If the client specified the latest commit version (that mutated the shard(s) being served
// by this storage server) then return the value that corresponds to that version.
Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag)
? req.ssLatestCommitVersions.getVersion(data->tag)
: req.version;
state Version version = wait(waitForVersion(data, readVersion, req.spanContext));
if (req.debugID.present())
g_traceBatch.addEvent("GetValueDebug",
req.debugID.get().first(),
@ -1363,7 +1368,8 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanID parent
state Version latest = data->version.get();
TEST(latest >= minVersion &&
latest < data->data().latestVersion); // Starting watch loop with latestVersion > data->version
GetValueRequest getReq(span.context, metadata->key, latest, metadata->tags, metadata->debugID);
GetValueRequest getReq(
span.context, metadata->key, latest, metadata->tags, metadata->debugID, VersionVector());
state Future<Void> getValue = getValueQ(
data, getReq); // we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait(getReq.reply.getFuture());
@ -1933,7 +1939,10 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
try {
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
state Version version = wait(waitForVersion(data, req.version, span.context));
Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag)
? req.ssLatestCommitVersions.getVersion(data->tag)
: req.version;
state Version version = wait(waitForVersion(data, readVersion, span.context));
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -2099,7 +2108,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
if (req.debugID.present())
g_traceBatch.addEvent(
"TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.Before");
state Version version = wait(waitForVersion(data, req.version, span.context));
Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag)
? req.ssLatestCommitVersions.getVersion(data->tag)
: req.version;
state Version version = wait(waitForVersion(data, readVersion, span.context));
state uint64_t changeCounter = data->shardChangeCounter;
// try {
@ -2110,13 +2122,13 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
"TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion");
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
//req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
//"None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) {
// TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin",
//req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin",
//shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin",
// shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
throw wrong_shard_server();
}
@ -2193,10 +2205,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
}
/*for( int i = 0; i < r.data.size(); i++ ) {
StorageMetrics m;
m.bytesPerKSecond = r.data[i].expectedSize();
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
data->metrics.notify(r.data[i].key, m);
StorageMetrics m;
m.bytesPerKSecond = r.data[i].expectedSize();
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an
int data->metrics.notify(r.data[i].key, m);
}*/
// For performance concerns, the cost of a range read is billed to the start key and end key of the
@ -2268,7 +2280,10 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
wait(data->getQueryDelay());
try {
state Version version = wait(waitForVersion(data, req.version, req.spanContext));
Version readVersion = req.ssLatestCommitVersions.hasVersion(data->tag)
? req.ssLatestCommitVersions.getVersion(data->tag)
: req.version;
state Version version = wait(waitForVersion(data, readVersion, req.spanContext));
state uint64_t changeCounter = data->shardChangeCounter;
state KeyRange shard = getShardKeyRange(data, req.sel);
@ -4870,7 +4885,8 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
loop {
try {
state Version latest = self->version.get();
GetValueRequest getReq(span.context, metadata->key, latest, metadata->tags, metadata->debugID);
GetValueRequest getReq(
span.context, metadata->key, latest, metadata->tags, metadata->debugID, VersionVector());
state Future<Void> getValue = getValueQ(self, getReq);
GetValueReply reply = wait(getReq.reply.getFuture());
metadata = self->getWatchMetadata(req.key.contents());