Serve GetReadVersion through master
This commit is contained in:
parent
3fe8db55c5
commit
a038a02cdd
|
@ -358,6 +358,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( PROXY_COMPUTE_BUCKETS, 20000 );
|
||||
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
|
||||
init( TXN_STATE_SEND_AMOUNT, 2 );
|
||||
init( ASK_READ_VERSION_FROM_MASTER, true );
|
||||
|
||||
// Master Server
|
||||
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
|
||||
|
|
|
@ -288,6 +288,7 @@ public:
|
|||
int PROXY_COMPUTE_BUCKETS;
|
||||
double PROXY_COMPUTE_GROWTH_RATE;
|
||||
int TXN_STATE_SEND_AMOUNT;
|
||||
bool ASK_READ_VERSION_FROM_MASTER;
|
||||
|
||||
// Master Server
|
||||
double COMMIT_SLEEP_TIME;
|
||||
|
|
|
@ -37,6 +37,10 @@ struct MasterInterface {
|
|||
RequestStream< struct TLogRejoinRequest > tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new master
|
||||
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
|
||||
RequestStream< struct GetCommitVersionRequest > getCommitVersion;
|
||||
// Report a proxy's committed version.
|
||||
RequestStream< struct ReportLiveCommittedVersionRequest > reportLiveCommittedVersion;
|
||||
// Get the centralized live committed version reported by proxies.
|
||||
RequestStream< struct GetLiveCommittedVersionRequest > getLiveCommittedVersion;
|
||||
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
|
||||
|
||||
NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
|
||||
|
@ -63,6 +67,8 @@ struct MasterInterface {
|
|||
streams.push_back(tlogRejoin.getReceiver(TaskPriority::MasterTLogRejoin));
|
||||
streams.push_back(changeCoordinators.getReceiver());
|
||||
streams.push_back(getCommitVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
|
||||
streams.push_back(getLiveCommittedVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
|
||||
streams.push_back(reportLiveCommittedVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
|
||||
streams.push_back(notifyBackupWorkerDone.getReceiver());
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
}
|
||||
|
@ -170,6 +176,45 @@ struct GetCommitVersionRequest {
|
|||
}
|
||||
};
|
||||
|
||||
struct GetLiveCommittedVersionReply {
|
||||
constexpr static FileIdentifier file_identifier = 6298345;
|
||||
Version version;
|
||||
|
||||
GetLiveCommittedVersionReply() : version(0) {}
|
||||
explicit GetLiveCommittedVersionReply(Version version) : version(version) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetLiveCommittedVersionRequest {
|
||||
constexpr static FileIdentifier file_identifier = 3358313;
|
||||
ReplyPromise<GetLiveCommittedVersionReply> reply;
|
||||
|
||||
GetLiveCommittedVersionRequest() = default;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct ReportLiveCommittedVersionRequest {
|
||||
constexpr static FileIdentifier file_identifier = 1853148;
|
||||
Version version;
|
||||
ReplyPromise<Void> reply;
|
||||
|
||||
ReportLiveCommittedVersionRequest() : version(0) {}
|
||||
explicit ReportLiveCommittedVersionRequest(Version version) : version(version) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct BackupWorkerDoneRequest {
|
||||
constexpr static FileIdentifier file_identifier = 8736351;
|
||||
UID workerUID;
|
||||
|
|
|
@ -1281,6 +1281,9 @@ ACTOR Future<Void> commitBatch(
|
|||
self->locked = lockedAfter;
|
||||
self->metadataVersion = metadataVersionAfter;
|
||||
self->committedVersion.set(commitVersion);
|
||||
|
||||
// Let master know this bigger commit version so that every other proxy can know.
|
||||
wait(self->master.reportLiveCommittedVersion.getReply(ReportLiveCommittedVersionRequest(commitVersion)));
|
||||
}
|
||||
|
||||
if (forceRecovery) {
|
||||
|
@ -1390,9 +1393,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
|||
// and no other proxy could have already committed anything without first ending the epoch
|
||||
++commitData->stats.txnStartBatch;
|
||||
|
||||
state vector<Future<GetReadVersionReply>> proxyVersions;
|
||||
for (auto const& p : *otherProxies)
|
||||
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
|
||||
|
||||
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
|
||||
wait(updateLastCommit(commitData, debugID));
|
||||
|
@ -1404,19 +1404,25 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
|||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
|
||||
}
|
||||
|
||||
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
|
||||
GetReadVersionReply rep;
|
||||
rep.version = commitData->committedVersion.get();
|
||||
state GetReadVersionReply rep;
|
||||
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
|
||||
state GetLiveCommittedVersionReply liveCommittedVersionReply = wait(commitData->master.getLiveCommittedVersion.getReply(GetLiveCommittedVersionRequest()));
|
||||
rep.version = std::max<Version>(liveCommittedVersionReply.version, commitData->committedVersion.get());
|
||||
} else {
|
||||
state vector<Future<GetReadVersionReply>> proxyVersions;
|
||||
for (auto const& p : *otherProxies)
|
||||
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
|
||||
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
|
||||
for (auto v : versions) {
|
||||
if (v.version > rep.version) {
|
||||
rep = v;
|
||||
}
|
||||
}
|
||||
}
|
||||
rep.locked = commitData->locked;
|
||||
rep.metadataVersion = commitData->metadataVersion;
|
||||
rep.recentRequests = commitData->stats.getRecentRequests();
|
||||
|
||||
for (auto v : versions) {
|
||||
if(v.version > rep.version) {
|
||||
rep = v;
|
||||
}
|
||||
}
|
||||
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
|
||||
}
|
||||
|
|
|
@ -167,7 +167,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
|
||||
AsyncTrigger registrationTrigger;
|
||||
Version lastEpochEnd, // The last version in the old epoch not (to be) rolled back in this recovery
|
||||
recoveryTransactionVersion; // The first version in this epoch
|
||||
recoveryTransactionVersion, // The first version in this epoch
|
||||
liveCommittedVersion; // The most recent live committed version reported by proxies.
|
||||
double lastCommitTime;
|
||||
|
||||
DatabaseConfiguration originalConfiguration;
|
||||
|
@ -247,6 +248,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
primaryLocality(tagLocalityInvalid),
|
||||
neverCreated(false),
|
||||
lastEpochEnd(invalidVersion),
|
||||
liveCommittedVersion(invalidVersion),
|
||||
recoveryTransactionVersion(invalidVersion),
|
||||
lastCommitTime(0),
|
||||
registrationCount(0),
|
||||
|
@ -993,6 +995,22 @@ ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
||||
loop {
|
||||
choose {
|
||||
when(GetLiveCommittedVersionRequest req = waitNext(self->myInterface.getLiveCommittedVersion.getFuture())) {
|
||||
if (self->liveCommittedVersion == invalidVersion) {
|
||||
self->liveCommittedVersion = self->recoveryTransactionVersion;
|
||||
}
|
||||
req.reply.send(GetLiveCommittedVersionReply(self->liveCommittedVersion));
|
||||
}
|
||||
when(ReportLiveCommittedVersionRequest req = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
|
||||
self->liveCommittedVersion = std::max<Version>(self->liveCommittedVersion, req.version);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<KeyRangeRef, bool> findRange( CoalescedKeyRangeMap<int>& key_resolver, Standalone<VectorRef<ResolverMoveRef>>& movedRanges, int src, int dest ) {
|
||||
auto ranges = key_resolver.ranges();
|
||||
auto prev = ranges.begin();
|
||||
|
@ -1536,6 +1554,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
self->addActor.send( waitResolverFailure( self->resolvers ) );
|
||||
self->addActor.send( waitProxyFailure( self->proxies ) );
|
||||
self->addActor.send( provideVersions(self) );
|
||||
self->addActor.send( serveLiveCommittedVersion(self) );
|
||||
self->addActor.send( reportErrors(updateRegistration(self, self->logSystem), "UpdateRegistration", self->dbgid) );
|
||||
self->registrationTrigger.trigger();
|
||||
|
||||
|
|
Loading…
Reference in New Issue