Recovered parallelism between the proxy-master rpc and its concurrent events and some minor fixes on formatting and variables naming

This commit is contained in:
Young Liu 2020-06-11 14:07:37 -07:00
parent a47806a966
commit ff7354a7de
2 changed files with 17 additions and 13 deletions

View File

@ -1392,7 +1392,14 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
// and no other proxy could have already committed anything without first ending the epoch
++commitData->stats.txnStartBatch;
state vector<Future<GetReadVersionReply>> proxyVersions;
state Future<GetReadVersionReply> replyFromMasterFuture;
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
replyFromMasterFuture = commitData->master.getLiveCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID));
} else {
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
}
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
wait(updateLastCommit(commitData, debugID));
@ -1407,18 +1414,14 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
state GetReadVersionReply rep;
rep.locked = commitData->locked;
rep.metadataVersion = commitData->metadataVersion;
rep.recentRequests = commitData->stats.getRecentRequests();
rep.version = commitData->committedVersion.get();
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
GetReadVersionReply liveCommittedVersionReply = wait(commitData->master.getLiveCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID)));
if (liveCommittedVersionReply.version > rep.version) {
rep = liveCommittedVersionReply;
GetReadVersionReply replyFromMaster = wait(replyFromMasterFuture);
if (replyFromMaster.version > rep.version) {
rep = replyFromMaster;
}
} else {
state vector<Future<GetReadVersionReply>> proxyVersions;
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
for (auto v : versions) {
if (v.version > rep.version) {
@ -1426,6 +1429,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
}
}
}
rep.recentRequests = commitData->stats.getRecentRequests();
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");

View File

@ -171,7 +171,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
double lastCommitTime;
Version liveCommittedVersion; // The live committed version reported by proxies.
bool proxyLocked;
bool databaseLocked;
Optional<Value> proxyMetadataVersion;
DatabaseConfiguration originalConfiguration;
@ -252,7 +252,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
neverCreated(false),
lastEpochEnd(invalidVersion),
liveCommittedVersion(invalidVersion),
proxyLocked(false),
databaseLocked(false),
recoveryTransactionVersion(invalidVersion),
lastCommitTime(0),
registrationCount(0),
@ -1009,16 +1009,16 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
if (self->liveCommittedVersion == invalidVersion) {
self->liveCommittedVersion = self->recoveryTransactionVersion;
}
state GetReadVersionReply reply;
GetReadVersionReply reply;
reply.version = self->liveCommittedVersion;
reply.locked = self->proxyLocked;
reply.locked = self->databaseLocked;
reply.metadataVersion = self->proxyMetadataVersion;
req.reply.send(reply);
}
when(ReportRawCommittedVersionRequest req = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
if (req.version > self->liveCommittedVersion) {
self->liveCommittedVersion = req.version;
self->proxyLocked = req.locked;
self->databaseLocked = req.locked;
self->proxyMetadataVersion = req.metadataVersion;
}
req.reply.send(Void());