- Propagate version vector deltas between processes

This commit is contained in:
Sreenath Bodagala 2021-07-14 19:31:01 +00:00
parent 5f504d2148
commit 81001edb2e
6 changed files with 92 additions and 32 deletions

View File

@ -198,7 +198,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
VersionVector ssVersionVector;
VersionVector ssVersionVectorDelta;
GetReadVersionReply() : version(invalidVersion), locked(false) {}
@ -211,7 +211,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
metadataVersion,
tagThrottleInfo,
midShardSize,
ssVersionVector);
ssVersionVectorDelta);
}
};
@ -240,15 +240,18 @@ struct GetReadVersionRequest : TimedRequest {
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
GetReadVersionRequest() : transactionCount(1), flags(0) {}
Version maxVersion; // max version in the client's version vector cache
GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {}
GetReadVersionRequest(SpanID spanContext,
uint32_t transactionCount,
TransactionPriority priority,
Version maxVersion,
uint32_t flags = 0,
TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), priority(priority), flags(flags), tags(tags),
debugID(debugID) {
debugID(debugID), maxVersion(maxVersion) {
flags = flags & ~FLAG_PRIORITY_MASK;
switch (priority) {
case TransactionPriority::BATCH:
@ -269,7 +272,7 @@ struct GetReadVersionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext);
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext, maxVersion);
if (ar.isDeserializing) {
if ((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {
@ -338,7 +341,7 @@ struct GetRawCommittedVersionReply {
bool locked;
Optional<Value> metadataVersion;
Version minKnownCommittedVersion;
VersionVector ssVersionVector;
VersionVector ssVersionVectorDelta;
GetRawCommittedVersionReply()
: debugID(Optional<UID>()), version(invalidVersion), locked(false), metadataVersion(Optional<Value>()),
@ -346,7 +349,7 @@ struct GetRawCommittedVersionReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, debugID, version, locked, metadataVersion, minKnownCommittedVersion, ssVersionVector);
serializer(ar, debugID, version, locked, metadataVersion, minKnownCommittedVersion, ssVersionVectorDelta);
}
};
@ -355,14 +358,17 @@ struct GetRawCommittedVersionRequest {
SpanID spanContext;
Optional<UID> debugID;
ReplyPromise<GetRawCommittedVersionReply> reply;
Version maxVersion; // max version in the grv proxy's version vector cache
explicit GetRawCommittedVersionRequest(SpanID spanContext, Optional<UID> const& debugID = Optional<UID>())
: spanContext(spanContext), debugID(debugID) {}
explicit GetRawCommittedVersionRequest() : spanContext(), debugID() {}
explicit GetRawCommittedVersionRequest(SpanID spanContext,
Optional<UID> const& debugID = Optional<UID>(),
Version maxVersion = invalidVersion)
: spanContext(spanContext), debugID(debugID), maxVersion(maxVersion) {}
explicit GetRawCommittedVersionRequest() : spanContext(), debugID(), maxVersion(invalidVersion) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, debugID, reply, spanContext);
serializer(ar, debugID, reply, spanContext, maxVersion);
}
};

View File

@ -2660,11 +2660,12 @@ ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, Span
loop {
choose {
when(wait(cx->onProxiesChanged())) {}
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(false),
&GrvProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(span.context, 0, TransactionPriority::IMMEDIATE),
cx->taskID))) {
when(GetReadVersionReply v = wait(basicLoadBalance(
cx->getGrvProxies(false),
&GrvProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(
span.context, 0, TransactionPriority::IMMEDIATE, cx->ssVersionVectorCache.getMaxVersion()),
cx->taskID))) {
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
if (v.midShardSize > 0)
cx->smoothMidShardSize.setTotal(v.midShardSize);
@ -2687,11 +2688,12 @@ ACTOR Future<Version> getRawVersion(Database cx, SpanID spanContext) {
loop {
choose {
when(wait(cx->onProxiesChanged())) {}
when(GetReadVersionReply v =
wait(basicLoadBalance(cx->getGrvProxies(false),
&GrvProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE),
cx->taskID))) {
when(GetReadVersionReply v = wait(basicLoadBalance(
cx->getGrvProxies(false),
&GrvProxyInterface::getConsistentReadVersion,
GetReadVersionRequest(
spanContext, 0, TransactionPriority::IMMEDIATE, cx->ssVersionVectorCache.getMaxVersion()),
cx->taskID))) {
return v.version;
}
}
@ -5285,7 +5287,13 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion(SpanID parentSpan,
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
loop {
try {
state GetReadVersionRequest req(span.context, transactionCount, priority, flags, tags, debugID);
state GetReadVersionRequest req(span.context,
transactionCount,
priority,
cx->ssVersionVectorCache.getMaxVersion(),
flags,
tags,
debugID);
choose {
when(wait(cx->onProxiesChanged())) {}
@ -5466,7 +5474,7 @@ ACTOR Future<Version> extractReadVersion(Location location,
}
metadataVersion.send(rep.metadataVersion);
cx->ssVersionVectorCache = rep.ssVersionVector;
cx->ssVersionVectorCache.applyDelta(rep.ssVersionVectorDelta);
return rep.version;
}

View File

@ -23,6 +23,8 @@
#pragma once
#include <map>
#include <set>
#include <unordered_map>
#include "fdbclient/FDBTypes.h"
@ -36,6 +38,8 @@ struct VersionVector {
VersionVector() : maxVersion(invalidVersion) {}
VersionVector(Version version) : maxVersion(version) {}
Version getMaxVersion() const { return maxVersion; }
void setVersion(const Tag& tag, Version version) {
ASSERT(tag != invalidTag);
ASSERT(version > maxVersion);
@ -69,6 +73,46 @@ struct VersionVector {
maxVersion = invalidVersion;
}
// Collects into "delta" every (tag, version) entry of this vector whose version
// is strictly greater than "version".
//
// "delta" is cleared first. If "version" equals this vector's max version there
// is nothing newer to report, and "delta" is left in its cleared state.
//
// Precondition (asserted): "version" must not exceed this vector's max version.
// NOTE(review): callers pass a max version received from a remote peer; confirm
// that a peer can never be ahead of this vector, otherwise the ASSERT fires.
void getDelta(Version version, VersionVector& delta) const {
	ASSERT(version <= maxVersion);

	delta.clear();
	if (version != maxVersion) {
		// Group the newer tags by version. std::map iterates its keys in ascending
		// order, so the versions are handed to setVersion() from oldest to newest
		// (the single-tag setVersion() asserts strictly increasing versions;
		// presumably the set overload does the same).
		std::map<Version, std::set<Tag>> tagsByVersion;
		for (const auto& [tag, tagVersion] : versions) {
			if (tagVersion > version) {
				tagsByVersion[tagVersion].insert(tag);
			}
		}

		for (const auto& [tagVersion, tags] : tagsByVersion) {
			delta.setVersion(tags, tagVersion);
		}
	}
}
void applyDelta(const VersionVector& delta) {
if (delta.maxVersion == invalidVersion) {
return;
}
ASSERT(maxVersion < delta.maxVersion);
std::map<Version, std::set<Tag>> versionMap; // order the versions
for (auto& iter : delta.versions) {
// @todo remove this assert later
ASSERT(iter.second > maxVersion);
versionMap[iter.second].insert(iter.first);
}
for (auto& iter : versionMap) {
setVersion(iter.second, iter.first);
}
}
bool operator==(const VersionVector& vv) const { return maxVersion == vv.maxVersion; }
bool operator!=(const VersionVector& vv) const { return maxVersion != vv.maxVersion; }
bool operator<(const VersionVector& vv) const { return maxVersion < vv.maxVersion; }

View File

@ -1152,7 +1152,7 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
}
when(wait(pProxyCommitData->cx->onProxiesChanged())) {}
when(GetRawCommittedVersionReply v = wait(pProxyCommitData->master.getLiveCommittedVersion.getReply(
GetRawCommittedVersionRequest(waitVersionSpan.context, debugID),
GetRawCommittedVersionRequest(waitVersionSpan.context, debugID, invalidVersion),
TaskPriority::GetLiveCommittedVersionReply))) {
if (v.version > pProxyCommitData->committedVersion.get()) {
pProxyCommitData->locked = v.locked;
@ -1474,9 +1474,8 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat
for (const auto& [_, shard] : reply.results) {
for (auto& ssi : shard) {
auto iter = commitData->storageCache.find(ssi.id());
if (iter != commitData->storageCache.end()) {
reply.resultsTagMapping.emplace_back(ssi.id(), iter->second->tag);
}
ASSERT_WE_THINK(iter != commitData->storageCache.end());
reply.resultsTagMapping.emplace_back(ssi.id(), iter->second->tag);
}
}
}

View File

@ -530,7 +530,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
++grvProxyData->stats.txnStartBatch;
state Future<GetRawCommittedVersionReply> replyFromMasterFuture;
replyFromMasterFuture = grvProxyData->master.getLiveCommittedVersion.getReply(
GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::GetLiveCommittedVersionReply);
GetRawCommittedVersionRequest(span.context, debugID, grvProxyData->ssVersionVectorCache.getMaxVersion()),
TaskPriority::GetLiveCommittedVersionReply);
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags & GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
wait(updateLastCommit(grvProxyData, debugID));
@ -547,7 +548,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
GetRawCommittedVersionReply repFromMaster = wait(replyFromMasterFuture);
grvProxyData->minKnownCommittedVersion =
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
grvProxyData->ssVersionVectorCache = repFromMaster.ssVersionVector;
grvProxyData->ssVersionVectorCache.applyDelta(repFromMaster.ssVersionVectorDelta);
GetReadVersionReply rep;
rep.version = repFromMaster.version;
@ -560,7 +561,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
rep.processBusyTime += FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION *
(g_network->isSimulated() ? deterministicRandom()->random01()
: g_network->networkInfo.metrics.lastRunLoopBusyness);
rep.ssVersionVector = grvProxyData->ssVersionVectorCache;
if (debugID.present()) {
g_traceBatch.addEvent(
@ -580,6 +580,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
// Update GRV statistics according to the request's priority.
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
std::vector<GetReadVersionRequest> requests,
GrvProxyData* grvProxyData,
GrvProxyStats* stats,
Version minKnownCommittedVersion,
PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags,
@ -611,6 +612,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
}
reply.midShardSize = midShardSize;
reply.tagThrottleInfo.clear();
grvProxyData->ssVersionVectorCache.getDelta(request.maxVersion, reply.ssVersionVectorDelta);
if (!request.tags.empty()) {
auto& priorityThrottledTags = throttledTags[request.priority];
@ -880,6 +882,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply,
start[i],
grvProxyData,
&grvProxyData->stats,
grvProxyData->minKnownCommittedVersion,
throttledTags,

View File

@ -248,7 +248,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.
// Captures the latest commit version targeted for each storage server in the cluster.
// @todo We need to ensure that the latest commit versions of storage servers stay
// @todo We need to ensure that the latest commit versions of storage servers stay
// up-to-date in the presence of key range splits/merges.
VersionVector ssVersionVector;
@ -1241,7 +1241,7 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
reply.locked = self->databaseLocked;
reply.metadataVersion = self->proxyMetadataVersion;
reply.minKnownCommittedVersion = self->minKnownCommittedVersion;
reply.ssVersionVector = self->ssVersionVector;
self->ssVersionVector.getDelta(req.maxVersion, reply.ssVersionVectorDelta);
req.reply.send(reply);
}
when(ReportRawCommittedVersionRequest req =