- Maintain a cache of the version vector on both the Grv proxy and
the client. Update these caches on the (transaction) read path.
This commit is contained in:
parent
75b2c4a96d
commit
baceacacf4
|
@ -30,6 +30,7 @@
|
|||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/TagThrottle.h"
|
||||
#include "fdbclient/GlobalConfig.h"
|
||||
#include "fdbclient/VersionVector.h"
|
||||
|
||||
#include "fdbrpc/Stats.h"
|
||||
#include "fdbrpc/TimedRequest.h"
|
||||
|
@ -197,6 +198,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
|
|||
|
||||
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
|
||||
|
||||
VersionVector ssVersionVector;
|
||||
|
||||
GetReadVersionReply() : version(invalidVersion), locked(false) {}
|
||||
|
||||
template <class Ar>
|
||||
|
@ -207,7 +210,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
|
|||
locked,
|
||||
metadataVersion,
|
||||
tagThrottleInfo,
|
||||
midShardSize);
|
||||
midShardSize,
|
||||
ssVersionVector);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -327,6 +331,7 @@ struct GetRawCommittedVersionReply {
|
|||
bool locked;
|
||||
Optional<Value> metadataVersion;
|
||||
Version minKnownCommittedVersion;
|
||||
VersionVector ssVersionVector;
|
||||
|
||||
GetRawCommittedVersionReply()
|
||||
: debugID(Optional<UID>()), version(invalidVersion), locked(false), metadataVersion(Optional<Value>()),
|
||||
|
@ -334,7 +339,7 @@ struct GetRawCommittedVersionReply {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, debugID, version, locked, metadataVersion, minKnownCommittedVersion);
|
||||
serializer(ar, debugID, version, locked, metadataVersion, minKnownCommittedVersion, ssVersionVector);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@
|
|||
#include "fdbclient/EventTypes.actor.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/Smoother.h"
|
||||
#include "fdbclient/VersionVector.h"
|
||||
|
||||
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
|
||||
public:
|
||||
|
@ -430,6 +431,9 @@ public:
|
|||
static const std::vector<std::string> debugTransactionTagChoices;
|
||||
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
|
||||
|
||||
// Cache of the latest commit versions of storage servers.
|
||||
VersionVector ssVersionVectorCache;
|
||||
|
||||
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
|
||||
// Requests to the storage server will be duplicated to the TSS.
|
||||
void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
#include <unordered_set>
|
||||
#include <boost/functional/hash.hpp>
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/flow.h"
|
||||
|
|
|
@ -5415,6 +5415,7 @@ ACTOR Future<Version> extractReadVersion(Location location,
|
|||
}
|
||||
|
||||
metadataVersion.send(rep.metadataVersion);
|
||||
cx->ssVersionVectorCache = rep.ssVersionVector;
|
||||
return rep.version;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "fdbserver/WaitFailure.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/VersionVector.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct GrvProxyStats {
|
||||
|
@ -232,6 +233,9 @@ struct GrvProxyData {
|
|||
|
||||
Version minKnownCommittedVersion; // we should ask master for this version.
|
||||
|
||||
// Cache of the latest commit versions of storage servers.
|
||||
VersionVector ssVersionVectorCache;
|
||||
|
||||
void updateLatencyBandConfig(Optional<LatencyBandConfig> newLatencyBandConfig) {
|
||||
if (newLatencyBandConfig.present() != latencyBandConfig.present() ||
|
||||
(newLatencyBandConfig.present() &&
|
||||
|
@ -543,6 +547,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
|
|||
GetRawCommittedVersionReply repFromMaster = wait(replyFromMasterFuture);
|
||||
grvProxyData->minKnownCommittedVersion =
|
||||
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
|
||||
grvProxyData->ssVersionVectorCache = repFromMaster.ssVersionVector;
|
||||
|
||||
GetReadVersionReply rep;
|
||||
rep.version = repFromMaster.version;
|
||||
|
@ -555,6 +560,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
|
|||
rep.processBusyTime += FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION *
|
||||
(g_network->isSimulated() ? deterministicRandom()->random01()
|
||||
: g_network->networkInfo.metrics.lastRunLoopBusyness);
|
||||
rep.ssVersionVector = grvProxyData->ssVersionVectorCache;
|
||||
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent(
|
||||
|
|
|
@ -1237,6 +1237,7 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
|||
reply.locked = self->databaseLocked;
|
||||
reply.metadataVersion = self->proxyMetadataVersion;
|
||||
reply.minKnownCommittedVersion = self->minKnownCommittedVersion;
|
||||
reply.ssVersionVector = self->ssVersionVector;
|
||||
req.reply.send(reply);
|
||||
}
|
||||
when(ReportRawCommittedVersionRequest req =
|
||||
|
|
Loading…
Reference in New Issue